Spring Boot - RabbitMQ 20 Kullanım Senaryosu ve Kodları

 


  1. Basit mesaj gönderme ve alma

  2. Özelleştirilmiş mesaj dönüştürücü kullanımı

  3. Mesaj yayınlama (Fanout Exchange)

  4. Mesaj önceliği belirleme

  5. Mesaj TTL (Time-to-Live) ayarlama

  6. Dead Letter Exchange kullanımı

  7. Mesaj onaylama (Manual Acknowledgment)

  8. Toplu mesaj gönderme

  9. Mesaj dinleyici havuzu oluşturma

  10. Mesaj yeniden deneme mekanizması

  11. Mesaj filtreleme

  12. Dinamik Queue oluşturma

  13. Mesaj sıralama (Consistent Hash Exchange)

  14. Mesaj içeriği şifreleme

  15. Mesaj yönlendirme (Topic Exchange)

  16. Mesaj izleme ve loglama

  17. Mesaj doğrulama

  18. Mesaj gruplandırma

  19. Mesaj yayılımı kontrolü (Publisher Confirms)

  20. Mesaj işleme performans ölçümü

// 1. Basit Mesaj Gönderme ve Alma

// Gönderici

@Autowired

private RabbitTemplate rabbitTemplate;


public void sendMessage(String message) {

rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);

}


// Alıcı

@RabbitListener(queues = "queueName")

public void receiveMessage(String message) {

System.out.println("Received message: " + message);

}


// 2. Özelleştirilmiş Mesaj Dönüştürücü Kullanımı

public class CustomMessageConverter implements MessageConverter {

@Override

public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {

// Nesneyi mesaja dönüştürme işlemi

}


@Override

public Object fromMessage(Message message) throws MessageConversionException {

// Mesajı nesneye dönüştürme işlemi

}

}


@Bean

public MessageConverter jsonMessageConverter() {

return new CustomMessageConverter();

}


// 3. Mesaj Yayınlama (Fanout Exchange)

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange("fanoutExchangeName");

}


public void publishMessage(String message) {

rabbitTemplate.convertAndSend("fanoutExchangeName", "", message);

}


// 4. Mesaj Önceliği Belirleme

MessageProperties properties = new MessageProperties();

properties.setPriority(10);

Message message = new Message("High priority message".getBytes(), properties);

rabbitTemplate.send("exchangeName", "routingKey", message);


// 5. Mesaj TTL (Time-to-Live) Ayarlama

@Bean

public Queue ttlQueue() {

Map<String, Object> args = new HashMap<>();

args.put("x-message-ttl", 5000); // 5 saniye

return new Queue("ttlQueueName", true, false, false, args);

}


// 6. Dead Letter Exchange Kullanımı

@Bean

public Queue deadLetterQueue() {

Map<String, Object> args = new HashMap<>();

args.put("x-dead-letter-exchange", "deadLetterExchangeName");

args.put("x-dead-letter-routing-key", "deadLetterRoutingKey");

return new Queue("queueName", true, false, false, args);

}


// 7. Mesaj Onaylama (Manual Acknowledgment)

@RabbitListener(queues = "queueName", ackMode = "MANUAL")

public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

try {

// Mesaj işleme

channel.basicAck(tag, false);

} catch (Exception e) {

channel.basicNack(tag, false, true);

}

}


// 8. Toplu Mesaj Gönderme

List<String> messages = Arrays.asList("Message 1", "Message 2", "Message 3");

messages.forEach(message -> rabbitTemplate.convertAndSend("exchangeName", "routingKey", message));


// 9. Mesaj Dinleyici Havuzu Oluşturma

@Bean

public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(connectionFactory);

factory.setConcurrentConsumers(3);

factory.setMaxConcurrentConsumers(10);

return factory;

}


// 10. Mesaj Yeniden Deneme Mekanizması

@Bean

public RetryOperationsInterceptor retryInterceptor() {

return RetryInterceptorBuilder.stateless()

.maxAttempts(5)

.backOffPolicy(new ExponentialBackOffPolicy())

.build();

}


@Bean

public SimpleRabbitListenerContainerFactory retryableRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(connectionFactory);

factory.setAdviceChain(new Advice[] { retryInterceptor() });

return factory;

}


// 11. Mesaj Filtreleme

@RabbitListener(queues = "queueName", condition = "headers['custom-header'] == 'important'")

public void receiveImportantMessage(String message) {

System.out.println("Received important message: " + message);

}


// 12. Dinamik Queue Oluşturma

@Bean

public Queue dynamicQueue() {

return new AnonymousQueue();

}


// 13. Mesaj Sıralama (Consistent Hash Exchange)

@Bean

public CustomExchange consistentHashExchange() {

Map<String, Object> args = new HashMap<>();

args.put("hash-property", "hash-on");

return new CustomExchange("consistentHashExchange", "x-consistent-hash", true, false, args);

}


// 14. Mesaj İçeriği Şifreleme

@Bean

public MessageConverter encryptingMessageConverter() {

return new SimpleMessageConverter() {

@Override

protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {

Message message = super.createMessage(object, messageProperties);

// Mesaj içeriğini şifreleme işlemi

return message;

}


@Override

public Object fromMessage(Message message) throws MessageConversionException {

// Mesaj içeriğini çözme işlemi

return super.fromMessage(message);

}

};

}


// 15. Mesaj Yönlendirme (Topic Exchange)

@Bean

public TopicExchange topicExchange() {

return new TopicExchange("topicExchangeName");

}


@Bean

public Binding binding(Queue queue, TopicExchange topicExchange) {

return BindingBuilder.bind(queue).to(topicExchange).with("routing.#");

}


// 16. Mesaj İzleme ve Loglama

@Bean

public SimpleRabbitListenerContainerFactory loggingRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(connectionFactory);

factory.setBeforeSendReplyPostProcessors(message -> {

log.info("Sending reply: " + message);

return message;

});

return factory;

}


// 17. Mesaj Doğrulama

@RabbitListener(queues = "queueName")

public void receiveAndValidateMessage(@Valid @Payload CustomMessage message, @Headers MessageHeaders headers) {

// Mesaj işleme

}


// 18. Mesaj Gruplandırma

@RabbitListener(queues = "queueName", group = "processingGroup")

public void processGroupedMessage(String message) {

// Gruplandırılmış mesaj işleme

}


// 19. Mesaj Yayılımı Kontrolü (Publisher Confirms)

@Autowired

private RabbitTemplate rabbitTemplate;


@PostConstruct

public void init() {

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

log.info("Message confirmed: " + correlationData);

} else {

log.error("Message not confirmed: " + cause);

}

});

}


// 20. Mesaj İşleme Performans Ölçümü

@RabbitListener(queues = "queueName")

@Timed(value = "rabbitmq.message.processing", description = "Time taken to process RabbitMQ messages")

public void processMessageWithMetrics(String message) {

// Mesaj işleme

}

Please Select Embedded Mode To Show The Comment System.*

Daha yeni Daha eski

نموذج الاتصال