Spring Boot ile Kafka'dan Çoklu Partition Okuma: 5 Thread Kullanarak Paralel İşleme Rehberi

Kafka’dan 5 partition’a sahip bir topic okuyarak, Spring Boot uygulaması ile 5 thread kullanarak paralel okuma gerçekleştiren bir senaryo oluşturacağız. Bu senaryoda, Java ExecutorService kullanarak 5 thread ile paralel işlem yapacağız.



Adımlar

  1. Spring Boot Projesi Oluşturma
  2. Kafka Konfigürasyonu
  3. Kafka Consumer Oluşturma
  4. Thread Pool (ExecutorService) ile Paralel Okuma

1. Spring Boot Projesi Oluşturma

Spring Initializr kullanarak yeni bir Spring Boot projesi oluşturun ve aşağıdaki bağımlılıkları ekleyin:

  • Spring Web
  • Spring for Apache Kafka

2. Kafka Konfigürasyonu

application.properties dosyasını aşağıdaki gibi konfigüre edin:

properties
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3. Kafka Consumer Oluşturma

KafkaConsumerConfig sınıfını oluşturarak Kafka Consumer konfigürasyonunu yapalım:

java
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import java.util.HashMap; import java.util.Map; @EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }

4. Kafka Listener ve Thread Pool (ExecutorService) ile Paralel Okuma

KafkaConsumerService sınıfını oluşturarak Kafka mesajlarını paralel olarak işleyelim:

java
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Service public class KafkaConsumerService { private final ExecutorService executorService; public KafkaConsumerService() { this.executorService = Executors.newFixedThreadPool(5); } @KafkaListener(topics = "my-topic", groupId = "my-group") public void listen(String message) { executorService.submit(() -> processMessage(message)); } public void processMessage(String message) { System.out.println("Processing message: " + message); // Mesajı işlemek için gereken kodları buraya ekleyin } public void shutdown() { executorService.shutdown(); try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); } } }

Spring Boot Uygulamasını Başlatma

Uygulamayı başlatarak Kafka topic’inden mesajları paralel olarak işlemeye başlayabilirsiniz.

Tüm Proje Dosya Yapısı:

css
src └── main └── java └── com └── example └── kafka ├── KafkaConsumerConfig.java ├── KafkaConsumerService.java └── KafkaApplication.java └── resources └── application.properties

KafkaApplication.java

java
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } }

Bu yapılandırma ile Kafka’dan 5 partition’a sahip bir topic’i okuyarak, 5 thread ile paralel okuma gerçekleştirebilirsiniz. Mesajlar processMessage metodu içerisinde işlenir ve ExecutorService ile paralel işlenir.

Please Select Embedded Mode To Show The Comment System.*

Daha yeni Daha eski

نموذج الاتصال