Tabii ki! Kafka'yı kullanan bir projeyi adım adım anlatmak için aşağıda bir senaryo sunuyorum:
Senaryo: Bir IoT Veri Toplama ve İşleme Projesi
Amaç: IoT cihazlarından gelen sensör verilerini toplamak, Kafka kullanarak işlemek ve sonuçları analiz etmek.
Adım 1: Kafka Cluster Kurulumu
Kafka'yı kullanmak için bir Kafka kümesi (cluster) kurun. Bu küme birden fazla Kafka broker'ı içermelidir.
Her broker'ı farklı bir sunucuda veya sanal makinede çalıştırın.
ZooKeeper veya Kafka'nın kendi dahili koordinasyon hizmetini (group coordinator) kullanarak küme koordinasyonunu sağlayın.
Adım 2: Konu (Topic) Oluşturma
İlk adımda, verileri saklayacağınız Kafka konularını oluşturun. Örneğin, "sensor_verileri" adında bir konu oluşturabilirsiniz.
Konular, verilerin mantıksal kategorilerini temsil eder ve bu senaryoda IoT sensör verilerini içerir.
Adım 3: Veri Üretici (Producer) Geliştirme
IoT cihazlarından gelen sensör verilerini temsil eden bir veri üretici (producer) uygulaması geliştirin.
Bu uygulama, sensör verilerini Kafka konularına düzenli aralıklarla göndermelidir.
Her gönderilen verinin bir offset'i olacak ve bu offset, verinin konu içindeki sırasını temsil eder.
Adım 4: Tüketici Grubu (Consumer Group) Geliştirme
Sensör verilerini işlemek için bir tüketici grubu (consumer group) oluşturun. Bu grup birden fazla tüketici uygulamasını içermelidir.
Tüketici grubu, "sensor_verileri" konusundan veri okuyacak.
Tüketici uygulamaları, veriyi işleyip analiz etmek için geliştirilmelidir.
Her bir tüketici uygulaması, kendi offset yönetimini yapacak ve işlenen verilerin son durumunu saklayacaktır.
Adım 5: Veri İşleme ve Analiz
Tüketici uygulamaları, gelen sensör verilerini işleyip analiz ederek özgün iş gereksinimlerinizi karşılayacak sonuçlar üretecektir.
Örneğin, sıcaklık verilerini analiz ederek anormal durumları algılayabilir veya veriyi bir veritabanına kaydedebilirsiniz.
İşlenmiş verileri raporlamak veya görselleştirmek için uygun araçları kullanabilirsiniz.
Adım 6: Hata Yönetimi ve İzleme
Proje içinde hata yönetimi mekanizmalarını tanımlayın. Hataların yönetilmesi ve izlenmesi, sistemin güvenilirliğini artırır.
Kafka'daki offset yönetimi, işlenmeyen veya başarısız olan verilerin tekrar işlenmesi için kullanılabilir.
İzleme araçları veya Kafka Cluster üzerindeki loglar, sistemin sağlığını izlemek için kullanılabilir.
Sonuç:
Bu senaryo, IoT cihazlarından gelen verilerin toplanması, işlenmesi ve analiz edilmesi için Kafka'nın nasıl kullanılacağını adım adım açıklamaktadır. Bu, büyük ölçekli veri işleme projeleri için güçlü ve ölçeklenebilir bir yaklaşım sunar. Projenizin karmaşıklığına ve ihtiyaçlarına göre Kafka'yı daha fazla özelleştirebilirsiniz.
Partition (Bölüm):
Kafka konuları (topics), birden fazla partition'a bölünebilir. Her partition, ayrı bir sıralı veri akışını temsil eder. Örneğin, "sensor_verileri" konusu üç partition'a sahipse, bu üç farklı sıralı veri akışını temsil eder.
Partition'lar Kafka'nın ölçeklenebilirliğini sağlar. Mesajların farklı partition'lar arasında dağıtılması, işlemi paralel hale getirir ve yükü dengeler.
Her partition, içinde sıralı olarak saklanan mesajların bir offset dizisini korur. Offset, tüketici uygulamalarının hangi mesajları okuduklarını izlemek için kullanılır.
Partition'lar Kafka kümesi içinde farklı broker'lar üzerinde barındırılabilir. Bu, yüksek ölçeklenebilirlik ve yüksek erişilebilirlik sağlar.
Replication (Replikasyon):
Kafka, mesajların yedeklenmesini (replication) destekler. Her bir partition, belirli bir sayıda replica (kopya) ile çoğaltılabilir.
Replication, verilerin yedeklenmesi ve yüksek erişilebilirlik sağlar. Bir broker veya partition çökerse, replica'lar diğer broker'lar üzerinde çalışmaya devam edebilir ve veri kaybını önler.
Replica'lar lider (leader) ve takipçi (follower) olarak ayrılır. Lider partition üzerinde yazma ve okuma işlemleri gerçekleştirirken, takipçi replica'lar yedeklenmiş verileri tutar ve liderden güncellemeleri alır.
Replication faktörü, bir partition'ın kaç replica ile çoğaltılacağını belirler. Örneğin, replication faktörü 3 ise, bir partition 2 takipçi ile birlikte toplamda 3 replica içerir.
Partition ve replication, Kafka'nın dayanıklılık, ölçeklenebilirlik ve yüksek erişilebilirlik sağlamasına yardımcı olan temel kavramlardır. Bu özellikler, büyük ölçekli ve güvenilir veri akışı işleme senaryolarında kritik öneme sahiptir.
Bu senaryoda consumerlardaki grop oluşumu ve partitionlardaki ilişkiyi anlat
Elbette, senaryoda tüketicilerin grup oluşturması ve partition'lar arasındaki ilişkiyi daha ayrıntılı bir şekilde anlatalım.
Senaryo: Bir IoT Veri Toplama ve İşleme Projesi
Adım 4: Tüketici Grubu (Consumer Group) Geliştirme
Önceki açıklamalara dayanarak, tüketici grubu oluşturmak ve partition'lar arasındaki ilişkiyi anlatmak için aşağıdaki adımları göz önünde bulundurun:
Tüketici Grubu Oluşturma:
Tüketici grubunu oluştururken, grup kimliği (group id) belirtmeniz gerekir. Örneğin, "veri_isleme_grubu" olarak adlandırabilirsiniz.
Tüketici Üyelerin Katılması:
Tüketici grubu üyeleri, bu gruba katılmak için aynı grup kimliğini kullanmalıdır. Her bir tüketici uygulaması bu grup kimliğini kullanarak Kafka'ya katılır.
Partition Atama:
Kafka, her bir partition'ı grup üyeleri arasında adil bir şekilde paylaştırır. Örneğin, "sensor_verileri" konusu 3 partition'a sahipse, her bir partition bir tüketici üyesine atanır.
Bu atama işlemi, her partition için sıralı veri akışını sürdürmeyi ve yükü denglemeyi sağlar.
İşleme Başlama:
Tüketici grupu üyeleri, kendilerine atanan partition'lar üzerinde veri okumaya başlarlar. Her bir grup üyesi, kendi partition'ı üzerinde sıralı veri işler.
Offset Yönetimi:
Her tüketici uygulaması, kendi partition'ı için offset yönetimini yapar. Yani, hangi mesajların işlendiğini ve işlenmeyi bekleyen sonraki mesajın hangisi olduğunu belirler.
Bu sayede, her bir grup üyesi kendi ilerlemesini bağımsız olarak takip eder.
Örnek:
Tüketici grubu "veri_isleme_grubu" olarak adlandırıldı.
"sensor_verileri" konusu 3 partition'a sahip.
Tüketici grubu üyeleri şunlardır: Tüketici 1, Tüketici 2 ve Tüketici 3.
Atama şu şekilde olabilir:
Tüketici 1 -> Partition 0
Tüketici 2 -> Partition 1
Tüketici 3 -> Partition 2
Her bir tüketici kendi partition'ında veri okur ve offset yönetimini yapar.
Bu, paralel veri işlemenin sağlandığı ve yükün adil bir şekilde paylaşıldığı bir senaryoyu temsil eder.
Tüketici grupları, partition'lar arasındaki iş bölümünü ve koordinasyonunu kolaylaştırır ve veri işleme işini daha etkili bir şekilde yönetmenizi sağlar. Bu sayede, büyük veri akışlarını paralel olarak işlemek ve yüksek ölçeklenebilirlik elde etmek mümkün olur.
Bir partitionu grup içersinde birden fazla consumer okuyabilir mi?
Evet, bir partition, aynı tüketici grubuna üye birden fazla tüketici tarafından okunabilir. Kafka, aynı tüketici grubuna bağlı birden fazla tüketici uygulamasının aynı partition'dan veri okumasına izin verir ve bu sayede paralel veri işleme sağlanır.
Bu durumda, her bir tüketici aynı partition'dan gelen verileri okur ve her biri kendi offset yönetimini yapar. Kafka, partition'dan gelen verileri tüketici grup üyeleri arasında otomatik olarak paylaştırır ve her bir tüketici kendi sıralı veri akışını sürdürür.
Bu özellik, Kafka'nın ölçeklenebilirliğini artırır ve büyük veri akışlarını daha hızlı işlemek için kullanılır. Ancak, dikkat edilmesi gereken birkaç önemli nokta vardır:
Aynı partition'dan veri okuyan tüketici uygulamaları arasında veri işleme işinin adil bir şekilde paylaştırılması gerekir. Her bir tüketici aynı miktarda veriyi işlemelidir.
Offset yönetimi her tüketici tarafından bağımsız olarak yapılır. Bu nedenle, tüketici uygulamalarının, işledikleri verinin offset'lerini doğru bir şekilde saklaması ve yönetmesi gereklidir.
Tüketici grupu içindeki tüketici sayısı ve partition sayısı, projenizin ihtiyaçlarına ve Kafka kümenizin özelliklerine göre dikkatlice ayarlanmalıdır. Aşırı tüketici sayısı veya partition sayısı gereksiz karmaşıklığa yol açabilir.
Özetle, aynı partition'dan birden fazla tüketici tarafından veri okuma, Kafka'nın ölçeklenebilirliğini ve paralel veri işleme yeteneklerini artırır. Bu nedenle, büyük veri işleme senaryolarında yaygın olarak kullanılır.
Örneğin 3 partitionum ve 5 consumerım var, partitionşlar nasıl paylaşılır?
Eğer 3 partition'ınız varsa ve 5 tüketici uygulamanız varsa, Kafka, partition'ları tüketici grup üyeleri arasında adil bir şekilde paylaştıracaktır. Kafka, bu atamayı yaparken aşağıdaki stratejilere göre hareket eder:
Round-Robin Atama: Kafka, tüketici grup üyeleri arasında partition'ları sırayla paylaştırabilir. Örneğin, 3 partition ve 5 tüketici olduğunda, atama şu şekilde olabilir:
Tüketici 1 -> Partition 0
Tüketici 2 -> Partition 1
Tüketici 3 -> Partition 2
Tüketici 4 -> Partition 0 (Round-Robin ile başa döner)
Bu durumda, her tüketici farklı bir partition üzerinde veri okur ve işler.
Paralel Okuma: Tüketici grupu üyeleri arasında partition'lar eşit bir şekilde dağıtılır. Örneğin, 3 partition ve 5 tüketici olduğunda, atama şu şekilde olabilir:
Tüketici 1 -> Partition 0
Tüketici 2 -> Partition 1
Tüketici 3 -> Partition 2
Tüketici 4 -> Boşta (Herhangi bir partition'ı alabilir)
Bu durumda, her tüketici farklı bir partition üzerinde veri okur ve işlerken bir tüketici bir partition'ı işlemiyor olabilir.
Bu atama stratejileri sayesinde, partition'lar arasındaki iş yükü adil bir şekilde dağıtılır. Ancak unutulmaması gereken önemli bir nokta, partition sayısı tüketici sayısına göre her zaman tam olarak bölünmeyebilir. Bu durumda bazı tüketici uygulamaları boşta kalabilir.
Özellikle yüksek ölçekli projelerde, partition sayısı ve tüketici sayısı iyi bir şekilde planlanmalı ve gerektiğinde ölçeklendirilmelidir. Kafka'nın dinamik ölçeklenebilirlik yetenekleri ile bu tür durumlar yönetilebilir.
3 partition 5 consumer olan bu senaryoyu spring boot ve java ile consumer tarafını kodlar mısın?
Elbette, Kafka tüketici uygulamalarını Spring Boot ve Java ile geliştirmeniz için aşağıda örnek bir kod sunuyorum. Bu örnek, 3 partition'ı olan bir Kafka konusundan veri okuyan ve işleyen 5 tüketici uygulamasını içerir.
Öncelikle, Kafka'ya bağlanmak için gerekli olan bağımlılıkları eklemelisiniz. pom.xml dosyanıza aşağıdaki bağımlılıkları ekleyebilirsiniz:
xml
Copy code
<dependencies>
<!-- Spring Kafka Dependency -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Kafka Client Dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
Ardından, tüketici uygulamasını aşağıdaki gibi kodlayabilirsiniz:
java
Copy code
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.config.MapConsumerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
public class KafkaConsumerConfig {
// Kafka Consumer Factory
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName());
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config);
}
// Kafka Listener Container Factory
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // Adjust to the number of partitions
factory.getContainerProperties().setConsumerRebalanceListener(new CustomConsumerRebalanceListener());
return factory;
}
// Kafka Listener
@KafkaListener(topics = "your-topic")
public void listen(String message) {
// Your message processing logic here
System.out.println("Received: " + message);
}
}
Yukarıdaki kod örneği, Spring Boot ve Spring Kafka kullanarak Kafka'ya bağlanan ve belirtilen bir konudan veri dinleyen bir tüketici uygulamasını temsil eder. your-kafka-broker ve your-consumer-group değerlerini kendi Kafka kümeniz ve tüketici grubunuzla değiştirmeniz gerekecektir.
Ayrıca, ConcurrentKafkaListenerContainerFactory içinde factory.setConcurrency(3) satırını, partition sayısına uygun şekilde ayarlamanız gerekecektir.
Bu kod, Spring Kafka'nın sağladığı özellikleri kullanarak Kafka'dan veri okuma işlemi için temel bir yapı sunar. Geriye kalan işlem, @KafkaListener ile işlenen veriyi işlemek ve iş uygulamanızı geliştirmek olacaktır.
Consumerı nasıl oluşturmalıyım, meseşla uygulamayı dockerize edip 5 instancela mı çalıştırmamı istiyorsun veya bu örnekte 5 thread oluşturuop paralel işlem mi yaptın?
Kafka tüketici uygulamalarını oluştururken, aynı tüketici grubuna bağlı birden fazla tüketici uygulamasını çalıştırmanız gerektiğini unutmamanız önemlidir. Bu, Kafka'dan gelen verileri paralel olarak işlemenizi sağlar. İşte nasıl yapılması gerektiği hakkında daha fazla ayrıntı:
Tüketici Grubu Oluşturma: Kafka tüketici uygulamalarını aynı tüketici grubuna kaydetmelisiniz. Bu, Kafka'nın tüketici grup üyeleri arasında partition'ları paylaştırmasını sağlar.
Tüketici Uygulama Sayısını Ayarlama: İhtiyacınıza bağlı olarak, aynı tüketici uygulamasını birden fazla örnekle çalıştırabilirsiniz. Bu, aynı tüketici grubu içinde daha fazla tüketici uygulamasını temsil eder. Örneğin, 5 tüketici uygulaması çalıştırmak için 5 farklı uygulama örneği oluşturabilirsiniz.
Concurrency (Paralel İşlem): Spring Kafka'da, ConcurrentKafkaListenerContainerFactory ile factory.setConcurrency() ayarını kullanarak her tüketici uygulamasının kaç tane eşzamanlı işleme sahip olacağını belirleyebilirsiniz. Örneğin, 5 tüketici uygulamasını çalıştırıyorsanız ve her biri 3 partition'ı işliyorsa, setConcurrency(3) ile her bir tüketici uygulaması için 3 eşzamanlı işlem oluşturabilirsiniz. Bu, toplamda 15 eşzamanlı işleme sahip olmanızı sağlar.
Docker ile Çalıştırma: Tüketici uygulamalarını Docker konteynerleri içinde çalıştırmak, ölçeklenebilirliği artırmanın bir yoludur. Docker kullanarak her bir tüketici uygulamasını ayrı konteynerlerde çalıştırabilir ve bu konteynerleri ölçeklendirebilirsiniz.
Örnek olarak, 5 tüketici uygulamasını Docker konteynerleri içinde çalıştırmak için her bir tüketici uygulamasının bir Docker konteyneri olarak çalıştırılabilir. Her biri farklı bir tüketici grubu üyesi olacak ve partition'ları paylaşacaklardır.
Bu şekilde yapılandırılmış bir Kafka tüketici uygulaması sistemi, veri işlemeyi paralel hale getirir ve Kafka'dan gelen yüksek veri akışlarını daha iyi yönetmenizi sağlar.
3 partionun herbirinin farklı groupid si mi olmalıdır?
Hayır, 3 partition'ın her birinin farklı groupid ile olması gerekmez. Partition'lar, aynı tüketici grubu içindeki tüketici uygulamaları arasında paylaştırılır. Yani, aynı tüketici grubuna ait birden fazla tüketici uygulaması, aynı partition'ları paylaşabilir.
Tüketici grubunun her bir tüketici uygulaması arasında farklı bir groupid kullanmanız gerekir. Tüketici grubu, aynı partition'ları paylaşan ve verileri işleyen tüketici uygulamalarını gruplamak için kullanılır. Her bir tüketici uygulamasının aynı groupid ile kaydolması, aynı tüketici grubunun bir parçası olduklarını ve partition'ları paylaşacaklarını belirtir.
Örneğin, 3 partition'ı olan bir Kafka konusunda 5 tüketici uygulaması çalıştırmak istiyorsanız, bu 5 tüketici uygulamasının aynı groupid ile kaydolması gerekir. Bu sayede Kafka, bu tüketici grubunu oluşturur ve partition'ları bu tüketici grubu üyeleri arasında adil bir şekilde paylaştırır.
Her bir tüketici uygulaması farklı bir groupid kullanırsa, Kafka onları farklı tüketici grupları olarak algılar ve partition'ları ayrı ayrı paylaştırır. Bu, verilerin istediğiniz gibi işlenmesini engelleyebilir ve beklenmeyen sonuçlara yol açabilir. Bu nedenle, aynı tüketici grubu içinde aynı groupid'i kullanmalısınız.
Yani psartitionlların ve consumerların group id leri aynı olmalıdır
Evet, partition'ların ve tüketici uygulamalarının groupid'leri aynı olmalıdır. İşte bu konseptin nasıl çalıştığına dair daha fazla açıklama:
Partition'ların Grup Kimliği (groupid): Her bir partition, tüketici grupları arasında paylaşılmak üzere bir grup kimliği (groupid) ile ilişkilendirilir. Örneğin, "sensor_verileri" adlı bir konunun 3 partition'ı varsa, bu 3 partition'ın hepsi aynı tüketici grubu üyeleri tarafından okunacaksa, bu partition'ların grup kimliği aynı olmalıdır.
Tüketici Gruplarının Grup Kimliği (groupid): Tüketici uygulamaları da bir tüketici grubu içinde grup kimliği (groupid) kullanarak kaydolur. Tüketici grupları, aynı partition'ları paylaşacak ve verileri işleyecektir. Bu nedenle, aynı tüketici grubundaki tüm tüketici uygulamalarının grup kimliği aynı olmalıdır.
Örnek olarak, 3 partition'ı olan bir Kafka konusundan veri okumak istiyorsanız ve 5 tüketici uygulaması çalıştırmak istiyorsanız:
Tüketici Grubu Üyeleri (Consumer Group Members): Bu 5 tüketici uygulaması, aynı tüketici grubu içinde kaydolmalıdır ve grup kimliği aynı olmalıdır. Örneğin, "veri_isleme_grubu" adlı bir tüketici grubu ve bu grubun groupid'i "veri_isleme_grubu" olarak ayarlanabilir.
Partition'ların Grup Kimliği: Bu 3 partition, aynı tüketici grubu üyeleri tarafından paylaşılmak üzere aynı grup kimliği (groupid) ile ilişkilendirilmelidir. Bu, Kafka'nın bu partition'ları tüketici grubu üyeleri arasında adil bir şekilde paylaştırmasını sağlar.
Sonuç olarak, tüketici gruplarının ve partition'ların grup kimlikleri aynı olmalıdır, böylece veriler adil bir şekilde işlenebilir ve partition'lar tüketici grupları arasında paylaştırılabilir.
Bu proje için aynı kdun devamı olarak manual offset kontrolünü kodlar mısın?
Elbette, Manuel Offset Kontrolü'nü uygulamak için Spring Kafka ile Java'da bir Kafka tüketici uygulaması kodlamak için aşağıdaki örnekleri kullanabilirsiniz. Bu örnekler, Kafka'nın içsel offset yönetimini devre dışı bırakacak ve manuel olarak offset kontrolünü sağlayacaktır.
Öncelikle, Kafka bağımlılıklarını eklemelisiniz (pom.xml dosyası):
xml
Copy code
<dependencies>
<!-- Spring Kafka Dependency -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Kafka Client Dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
Manuel Offset Kontrolü'nü etkinleştirmek için ConsumerConfig ayarlarınızı yapmalısınız:
java
Copy code
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class ManualOffsetControlConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Starting from the beginning
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe to the topic(s)
consumer.subscribe(Arrays.asList("your-topic"));
while (true) {
// Poll for records
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Process the record
System.out.println("Received: " + record.value());
// Manually commit the offset
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1);
consumer.commitSync(Collections.singletonMap(topicPartition, offset));
}
}
}
}
Yukarıdaki örnek, AUTO_OFFSET_RESET_CONFIG ayarını "earliest" olarak ayarlar, bu da tüketici uygulamasının en baştan itibaren okumaya başlayacağı anlamına gelir. Manuel olarak offset'i her kayıt işlendikten sonra commit eder.
Bu şekilde, Manuel Offset Kontrolü'nü kullanarak Kafka tüketici uygulamanızı oluşturabilir ve verileri işlerken offset'i manuel olarak yönetebilirsiniz.