- Basit mesaj gönderme ve alma 
- Özelleştirilmiş mesaj dönüştürücü kullanımı 
- Mesaj yayınlama (Fanout Exchange) 
- Mesaj önceliği belirleme 
- Mesaj TTL (Time-to-Live) ayarlama 
- Dead Letter Exchange kullanımı 
- Mesaj onaylama (Manual Acknowledgment) 
- Toplu mesaj gönderme 
- Mesaj dinleyici havuzu oluşturma 
- Mesaj yeniden deneme mekanizması 
- Mesaj filtreleme 
- Dinamik Queue oluşturma 
- Mesaj sıralama (Consistent Hash Exchange) 
- Mesaj içeriği şifreleme 
- Mesaj yönlendirme (Topic Exchange) 
- Mesaj izleme ve loglama 
- Mesaj doğrulama 
- Mesaj gruplandırma 
- Mesaj yayılımı kontrolü (Publisher Confirms) 
- Mesaj işleme performans ölçümü 
// 1. Basit Mesaj Gönderme ve Alma
// Gönderici
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);
}
// Alıcı
@RabbitListener(queues = "queueName")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
// 2. Özelleştirilmiş Mesaj Dönüştürücü Kullanımı
public class CustomMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
// Nesneyi mesaja dönüştürme işlemi
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// Mesajı nesneye dönüştürme işlemi
}
}
@Bean
public MessageConverter jsonMessageConverter() {
return new CustomMessageConverter();
}
// 3. Mesaj Yayınlama (Fanout Exchange)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchangeName");
}
public void publishMessage(String message) {
rabbitTemplate.convertAndSend("fanoutExchangeName", "", message);
}
// 4. Mesaj Önceliği Belirleme
MessageProperties properties = new MessageProperties();
properties.setPriority(10);
Message message = new Message("High priority message".getBytes(), properties);
rabbitTemplate.send("exchangeName", "routingKey", message);
// 5. Mesaj TTL (Time-to-Live) Ayarlama
@Bean
public Queue ttlQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000); // 5 saniye
return new Queue("ttlQueueName", true, false, false, args);
}
// 6. Dead Letter Exchange Kullanımı
@Bean
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "deadLetterExchangeName");
args.put("x-dead-letter-routing-key", "deadLetterRoutingKey");
return new Queue("queueName", true, false, false, args);
}
// 7. Mesaj Onaylama (Manual Acknowledgment)
@RabbitListener(queues = "queueName", ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// Mesaj işleme
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
}
// 8. Toplu Mesaj Gönderme
List<String> messages = Arrays.asList("Message 1", "Message 2", "Message 3");
messages.forEach(message -> rabbitTemplate.convertAndSend("exchangeName", "routingKey", message));
// 9. Mesaj Dinleyici Havuzu Oluşturma
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
// 10. Mesaj Yeniden Deneme Mekanizması
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffPolicy(new ExponentialBackOffPolicy())
.build();
}
@Bean
public SimpleRabbitListenerContainerFactory retryableRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(new Advice[] { retryInterceptor() });
return factory;
}
// 11. Mesaj Filtreleme
@RabbitListener(queues = "queueName", condition = "headers['custom-header'] == 'important'")
public void receiveImportantMessage(String message) {
System.out.println("Received important message: " + message);
}
// 12. Dinamik Queue Oluşturma
@Bean
public Queue dynamicQueue() {
return new AnonymousQueue();
}
// 13. Mesaj Sıralama (Consistent Hash Exchange)
@Bean
public CustomExchange consistentHashExchange() {
Map<String, Object> args = new HashMap<>();
args.put("hash-property", "hash-on");
return new CustomExchange("consistentHashExchange", "x-consistent-hash", true, false, args);
}
// 14. Mesaj İçeriği Şifreleme
@Bean
public MessageConverter encryptingMessageConverter() {
return new SimpleMessageConverter() {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
Message message = super.createMessage(object, messageProperties);
// Mesaj içeriğini şifreleme işlemi
return message;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// Mesaj içeriğini çözme işlemi
return super.fromMessage(message);
}
};
}
// 15. Mesaj Yönlendirme (Topic Exchange)
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchangeName");
}
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("routing.#");
}
// 16. Mesaj İzleme ve Loglama
@Bean
public SimpleRabbitListenerContainerFactory loggingRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setBeforeSendReplyPostProcessors(message -> {
log.info("Sending reply: " + message);
return message;
});
return factory;
}
// 17. Mesaj Doğrulama
@RabbitListener(queues = "queueName")
public void receiveAndValidateMessage(@Valid @Payload CustomMessage message, @Headers MessageHeaders headers) {
// Mesaj işleme
}
// 18. Mesaj Gruplandırma
@RabbitListener(queues = "queueName", group = "processingGroup")
public void processGroupedMessage(String message) {
// Gruplandırılmış mesaj işleme
}
// 19. Mesaj Yayılımı Kontrolü (Publisher Confirms)
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("Message confirmed: " + correlationData);
} else {
log.error("Message not confirmed: " + cause);
}
});
}
// 20. Mesaj İşleme Performans Ölçümü
@RabbitListener(queues = "queueName")
@Timed(value = "rabbitmq.message.processing", description = "Time taken to process RabbitMQ messages")
public void processMessageWithMetrics(String message) {
// Mesaj işleme
}
