Kafka’dan 5 partition’a sahip bir topic okuyarak, Spring Boot uygulaması ile 5 thread kullanarak paralel okuma gerçekleştiren bir senaryo oluşturacağız. Bu senaryoda, Java ExecutorService kullanarak 5 thread ile paralel işlem yapacağız.
Adımlar
- Spring Boot Projesi Oluşturma
- Kafka Konfigürasyonu
- Kafka Consumer Oluşturma
- Thread Pool (ExecutorService) ile Paralel Okuma
1. Spring Boot Projesi Oluşturma
Spring Initializr kullanarak yeni bir Spring Boot projesi oluşturun ve aşağıdaki bağımlılıkları ekleyin:
- Spring Web
- Spring for Apache Kafka
2. Kafka Konfigürasyonu
application.properties
dosyasını aşağıdaki gibi konfigüre edin:
propertiesspring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. Kafka Consumer Oluşturma
KafkaConsumerConfig
sınıfını oluşturarak Kafka Consumer konfigürasyonunu yapalım:
javaimport org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
4. Kafka Listener ve Thread Pool (ExecutorService) ile Paralel Okuma
KafkaConsumerService
sınıfını oluşturarak Kafka mesajlarını paralel olarak işleyelim:
javaimport org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Service
public class KafkaConsumerService {
private final ExecutorService executorService;
public KafkaConsumerService() {
this.executorService = Executors.newFixedThreadPool(5);
}
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
executorService.submit(() -> processMessage(message));
}
public void processMessage(String message) {
System.out.println("Processing message: " + message);
// Mesajı işlemek için gereken kodları buraya ekleyin
}
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
Spring Boot Uygulamasını Başlatma
Uygulamayı başlatarak Kafka topic’inden mesajları paralel olarak işlemeye başlayabilirsiniz.
Tüm Proje Dosya Yapısı:
csssrc
└── main
└── java
└── com
└── example
└── kafka
├── KafkaConsumerConfig.java
├── KafkaConsumerService.java
└── KafkaApplication.java
└── resources
└── application.properties
KafkaApplication.java
javaimport org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
Bu yapılandırma ile Kafka’dan 5 partition’a sahip bir topic’i okuyarak, 5 thread ile paralel okuma gerçekleştirebilirsiniz. Mesajlar processMessage
metodu içerisinde işlenir ve ExecutorService ile paralel işlenir.