Basit mesaj gönderme ve alma
Özelleştirilmiş mesaj dönüştürücü kullanımı
Mesaj yayınlama (Fanout Exchange)
Mesaj önceliği belirleme
Mesaj TTL (Time-to-Live) ayarlama
Dead Letter Exchange kullanımı
Mesaj onaylama (Manual Acknowledgment)
Toplu mesaj gönderme
Mesaj dinleyici havuzu oluşturma
Mesaj yeniden deneme mekanizması
Mesaj filtreleme
Dinamik Queue oluşturma
Mesaj sıralama (Consistent Hash Exchange)
Mesaj içeriği şifreleme
Mesaj yönlendirme (Topic Exchange)
Mesaj izleme ve loglama
Mesaj doğrulama
Mesaj gruplandırma
Mesaj yayılımı kontrolü (Publisher Confirms)
Mesaj işleme performans ölçümü
// 1. Basit Mesaj Gönderme ve Alma
// Gönderici
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);
}
// Alıcı
@RabbitListener(queues = "queueName")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
// 2. Özelleştirilmiş Mesaj Dönüştürücü Kullanımı
public class CustomMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
// Nesneyi mesaja dönüştürme işlemi
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// Mesajı nesneye dönüştürme işlemi
}
}
@Bean
public MessageConverter jsonMessageConverter() {
return new CustomMessageConverter();
}
// 3. Mesaj Yayınlama (Fanout Exchange)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchangeName");
}
public void publishMessage(String message) {
rabbitTemplate.convertAndSend("fanoutExchangeName", "", message);
}
// 4. Mesaj Önceliği Belirleme
MessageProperties properties = new MessageProperties();
properties.setPriority(10);
Message message = new Message("High priority message".getBytes(), properties);
rabbitTemplate.send("exchangeName", "routingKey", message);
// 5. Mesaj TTL (Time-to-Live) Ayarlama
@Bean
public Queue ttlQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000); // 5 saniye
return new Queue("ttlQueueName", true, false, false, args);
}
// 6. Dead Letter Exchange Kullanımı
@Bean
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "deadLetterExchangeName");
args.put("x-dead-letter-routing-key", "deadLetterRoutingKey");
return new Queue("queueName", true, false, false, args);
}
// 7. Mesaj Onaylama (Manual Acknowledgment)
@RabbitListener(queues = "queueName", ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// Mesaj işleme
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
}
// 8. Toplu Mesaj Gönderme
List<String> messages = Arrays.asList("Message 1", "Message 2", "Message 3");
messages.forEach(message -> rabbitTemplate.convertAndSend("exchangeName", "routingKey", message));
// 9. Mesaj Dinleyici Havuzu Oluşturma
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
// 10. Mesaj Yeniden Deneme Mekanizması
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffPolicy(new ExponentialBackOffPolicy())
.build();
}
@Bean
public SimpleRabbitListenerContainerFactory retryableRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(new Advice[] { retryInterceptor() });
return factory;
}
// 11. Mesaj Filtreleme
@RabbitListener(queues = "queueName", condition = "headers['custom-header'] == 'important'")
public void receiveImportantMessage(String message) {
System.out.println("Received important message: " + message);
}
// 12. Dinamik Queue Oluşturma
@Bean
public Queue dynamicQueue() {
return new AnonymousQueue();
}
// 13. Mesaj Sıralama (Consistent Hash Exchange)
@Bean
public CustomExchange consistentHashExchange() {
Map<String, Object> args = new HashMap<>();
args.put("hash-property", "hash-on");
return new CustomExchange("consistentHashExchange", "x-consistent-hash", true, false, args);
}
// 14. Mesaj İçeriği Şifreleme
@Bean
public MessageConverter encryptingMessageConverter() {
return new SimpleMessageConverter() {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
Message message = super.createMessage(object, messageProperties);
// Mesaj içeriğini şifreleme işlemi
return message;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// Mesaj içeriğini çözme işlemi
return super.fromMessage(message);
}
};
}
// 15. Mesaj Yönlendirme (Topic Exchange)
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchangeName");
}
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("routing.#");
}
// 16. Mesaj İzleme ve Loglama
@Bean
public SimpleRabbitListenerContainerFactory loggingRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setBeforeSendReplyPostProcessors(message -> {
log.info("Sending reply: " + message);
return message;
});
return factory;
}
// 17. Mesaj Doğrulama
@RabbitListener(queues = "queueName")
public void receiveAndValidateMessage(@Valid @Payload CustomMessage message, @Headers MessageHeaders headers) {
// Mesaj işleme
}
// 18. Mesaj Gruplandırma
@RabbitListener(queues = "queueName", group = "processingGroup")
public void processGroupedMessage(String message) {
// Gruplandırılmış mesaj işleme
}
// 19. Mesaj Yayılımı Kontrolü (Publisher Confirms)
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("Message confirmed: " + correlationData);
} else {
log.error("Message not confirmed: " + cause);
}
});
}
// 20. Mesaj İşleme Performans Ölçümü
@RabbitListener(queues = "queueName")
@Timed(value = "rabbitmq.message.processing", description = "Time taken to process RabbitMQ messages")
public void processMessageWithMetrics(String message) {
// Mesaj işleme
}