Spring Cloud Stream ile Event-Driven Architecture: Mikroservisler Arası Asenkron İletişim Rehberi



Spring Cloud Stream ile Event-Driven Architecture oluşturmak, mikroservislerin asenkron olarak iletişim kurmasına olanak tanır. Mesaj tabanlı bir mimari, servislerin birbirinden bağımsız olarak çalışabilmesini ve daha esnek bir yapının oluşmasını sağlar. Aşağıda verdiğiniz kod parçası, Spring Cloud Stream kullanarak bir KStream işleme örneği sunmaktadır. Bu kod parçasını detaylandırarak ve açıklamalar ekleyerek bir Event-Driven Architecture oluşturmanın temel adımlarını inceleyelim.

1. Spring Cloud Stream Bağımlılıklarını Ekleme

İlk olarak, pom.xml dosyasına Spring Cloud Stream bağımlılıklarını eklemelisiniz. Örneğin, Kafka kullanıyorsanız aşağıdaki bağımlılıkları ekleyin:


<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency> </dependencies>

2. Konfigürasyon Dosyasını Hazırlama

application.yml veya application.properties dosyasında, mesaj broker'ı (örneğin Kafka) için gerekli konfigürasyonları yapın:


spring: cloud: stream: kafka: binder: brokers: localhost:9092 bindings: input: destination: input-topic contentType: application/json output: destination: output-topic contentType: application/json

3. Stream Application Sınıfı

Aşağıda, Spring Cloud Stream ile bir KStream işleme uygulamasının adım adım açıklaması yer almaktadır:


import org.apache.kafka.streams.KStream; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.core.StreamsBuilderFactoryBean; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @SpringBootApplication @EnableBinding(Processor.class) @EnableKafkaStreams public class StreamApplication { public static void main(String[] args) { SpringApplication.run(StreamApplication.class, args); } @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); return new KafkaStreamsConfiguration(props); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public KStream<String, Long> process(KStream<String, String> input) { return input .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.as("counts-store")) .toStream(); } }

Açıklamalar:

  1. Annotation'lar:

    • @SpringBootApplication: Spring Boot uygulamasını başlatır.
    • @EnableBinding(Processor.class): Processor bağlayıcıyı etkinleştirir ve input/output kanallarını kullanır.
    • @EnableKafkaStreams: Kafka Streams kullanımı için gerekli konfigürasyonu etkinleştirir.
  2. KStream İşleme:

    • @StreamListener(Processor.INPUT): Gelen mesajları dinler.
    • @SendTo(Processor.OUTPUT): İşlenmiş mesajları output kanalına gönderir.
    • input.flatMapValues(...): Gelen mesajları kelimelerine ayırır ve küçük harfe dönüştürür.
    • groupBy(...): Kelimeleri gruplar.
    • count(Materialized.as("counts-store")): Kelime sayısını tutar.
    • toStream(): Sonuçları bir stream olarak döner.
  3. Kafka Streams Konfigürasyonu:

    • @Bean: Kafka Streams konfigürasyonunu sağlar.
    • KafkaStreamsConfiguration: Kafka Streams için gerekli yapılandırmaları yapar.

4. Mikroservislerin Bağımsız Çalışması

Bu yapıda, her mikroservis kendi başına çalışabilir ve diğer mikroservislerden gelen mesajları asenkron olarak işleyebilir. Mesaj tabanlı mimari, mikroservislerin bağımsızlığını ve esnekliğini artırır, aynı zamanda daha ölçeklenebilir bir yapı sağlar.

Özet

Spring Cloud Stream ve Kafka Streams kullanarak mikroservisler arasında asenkron iletişim sağlamak, Event-Driven Architecture oluşturmanın etkili bir yoludur. Yukarıdaki örnek, bu mimariyi uygulamak için temel adımları ve örnek bir uygulamayı içermektedir. Bu yapı, daha dayanıklı ve esnek mikroservisler oluşturmanıza yardımcı olabilir.

Please Select Embedded Mode To Show The Comment System.*

Daha yeni Daha eski

نموذج الاتصال