Spring Cloud Stream ile Event-Driven Architecture oluşturmak, mikroservislerin asenkron olarak iletişim kurmasına olanak tanır. Mesaj tabanlı bir mimari, servislerin birbirinden bağımsız olarak çalışabilmesini ve daha esnek bir yapının oluşmasını sağlar. Aşağıda verdiğiniz kod parçası, Spring Cloud Stream kullanarak bir KStream işleme örneği sunmaktadır. Bu kod parçasını detaylandırarak ve açıklamalar ekleyerek bir Event-Driven Architecture oluşturmanın temel adımlarını inceleyelim.
1. Spring Cloud Stream Bağımlılıklarını Ekleme
İlk olarak, pom.xml
dosyasına Spring Cloud Stream bağımlılıklarını eklemelisiniz. Örneğin, Kafka kullanıyorsanız aşağıdaki bağımlılıkları ekleyin:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
</dependencies>
2. Konfigürasyon Dosyasını Hazırlama
application.yml
veya application.properties
dosyasında, mesaj broker'ı (örneğin Kafka) için gerekli konfigürasyonları yapın:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
input:
destination: input-topic
contentType: application/json
output:
destination: output-topic
contentType: application/json
3. Stream Application Sınıfı
Aşağıda, Spring Cloud Stream ile bir KStream işleme uygulamasının adım adım açıklaması yer almaktadır:
import org.apache.kafka.streams.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableKafkaStreams
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public KStream<String, Long> process(KStream<String, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.as("counts-store"))
.toStream();
}
}
Açıklamalar:
Annotation'lar:
@SpringBootApplication
: Spring Boot uygulamasını başlatır.@EnableBinding(Processor.class)
: Processor bağlayıcıyı etkinleştirir ve input/output kanallarını kullanır.@EnableKafkaStreams
: Kafka Streams kullanımı için gerekli konfigürasyonu etkinleştirir.
KStream İşleme:
@StreamListener(Processor.INPUT)
: Gelen mesajları dinler.@SendTo(Processor.OUTPUT)
: İşlenmiş mesajları output kanalına gönderir.input.flatMapValues(...)
: Gelen mesajları kelimelerine ayırır ve küçük harfe dönüştürür.groupBy(...)
: Kelimeleri gruplar.count(Materialized.as("counts-store"))
: Kelime sayısını tutar.toStream()
: Sonuçları bir stream olarak döner.
Kafka Streams Konfigürasyonu:
@Bean
: Kafka Streams konfigürasyonunu sağlar.KafkaStreamsConfiguration
: Kafka Streams için gerekli yapılandırmaları yapar.
4. Mikroservislerin Bağımsız Çalışması
Bu yapıda, her mikroservis kendi başına çalışabilir ve diğer mikroservislerden gelen mesajları asenkron olarak işleyebilir. Mesaj tabanlı mimari, mikroservislerin bağımsızlığını ve esnekliğini artırır, aynı zamanda daha ölçeklenebilir bir yapı sağlar.
Özet
Spring Cloud Stream ve Kafka Streams kullanarak mikroservisler arasında asenkron iletişim sağlamak, Event-Driven Architecture oluşturmanın etkili bir yoludur. Yukarıdaki örnek, bu mimariyi uygulamak için temel adımları ve örnek bir uygulamayı içermektedir. Bu yapı, daha dayanıklı ve esnek mikroservisler oluşturmanıza yardımcı olabilir.