Spring Cloud Stream Kullanarak Olay Odaklı Mimari: Kod Örnekleriyle Adım Adım Uygulama

2024 yılında, kafka kullanan bir Spring Cloud microservice projemiz var. Event driven architecture kurmak istiyoruz. Event'lerın yanında kafka ile ms'ler arası mesajlarda iletmek istiyoruz. Bazı ms'lerim producer iken, bazı ms'lerim consumer durumunda. Event ve message'ların formatı ne olmalı? Serialization ve deserialization nasıl yapılmalı? Pub-Sub sistemi nasıl kurulmalı? Projenin bağımlılıkları neler olmalı? Spring boot ms'lerimde implementesyon nasıl olmalı?



Spring Cloud ve Apache Kafka kullanarak event-driven mimari kurarken dikkate almanız gereken birkaç önemli nokta bulunmaktadır. Bu mimari, mikroservisler arasında asenkron iletişim sağlayarak sistemlerin daha esnek ve ölçeklenebilir olmasına yardımcı olur. Aşağıda, bir event-driven mimari kurulumunda dikkate almanız gereken adımlar ve uygulama detayları bulunmaktadır:


1. Event ve Mesaj Formatı

JSON: Kolayca okunabilir ve insan tarafından anlaşılır. Farklı platformlar ve diller arasında geniş destek görür.

Avro: Schema tabanlı bir serialization framework’üdür. Veri yapısını ve türlerini tanımlamak için schema kullanır, bu da verilerin daha kompakt olmasını ve hızlı işlenmesini sağlar.

Protobuf: Google tarafından geliştirilen, veri yapılarını sıkı bir şekilde tanımlayan ve verileri çok kompakt bir biçimde serialize eden bir sistemdir.

2. Serialization ve Deserialization

Apache Kafka, veri serializasyonu ve deserializasyonu için kendi Serializer ve Deserializer sınıflarını sağlar. Spring Kafka, bu işlemleri daha da kolaylaştırır ve Spring'in @KafkaListener ve KafkaTemplate kullanımı ile entegre eder.


3. Pub-Sub Sistemi Kurulumu

Topic Oluşturma: İletişim kurulacak her bir event veya mesaj türü için Kafka'da bir topic oluşturulmalıdır.

Producer Konfigürasyonu: Mikroservislerin producer olarak ayarlanması, Kafka'ya mesaj göndermek için KafkaTemplate kullanılmalıdır.

Consumer Konfigürasyonu: Mikroservislerin consumer olarak ayarlanması, mesajları almak için @KafkaListener anotasyonu kullanılmalıdır.

4. Projenin Bağımlılıkları

Spring Boot mikroservisleriniz için Maven veya Gradle bağımlılıklarınız şunları içermelidir:


xml

Copy code

<!-- Kafka -->

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

<!-- Kafka Streams (Eğer stream işlemleri yapılacaksa) -->

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka-streams</artifactId>

</dependency>

5. Spring Boot Mikroservislerde Implementasyon

Producer Konfigürasyonu: KafkaTemplate kullanarak mesaj gönderme işlemleri için bir servis sınıfı oluşturulur.

Consumer Konfigürasyonu: @KafkaListener anotasyonu ile mesajları dinleyen bir metot tanımlanır. Bu metod, belirli bir topic'ten mesajları alır ve işler.

6. İyi Uygulama Örnekleri

Idempotence ve Deduplication: Mesaj işleme idempotent olmalıdır; yani, aynı mesaj birden fazla kez işlense bile, yalnızca bir kez etki eder.

Error Handling: Hatalı mesajların nasıl ele alınacağını planlayın. Örneğin, bir hata kuyruğuna yönlendirme veya yeniden deneme mekanizmaları.

Monitoring ve Logging: Kafka ve mikroservislerinizin durumunu izlemek için uygun araçları ve yöntemleri kullanın.

Bu yapıyı kurarken, Spring Cloud Stream gibi daha yüksek seviyeli abstraksiyonlar sağlayan kütüphaneleri de değerlendirebilirsiniz. Bu, Kafka ile iletişim kurarken daha az boilerplate kodu ile çalışmanızı sağlayabilir ve daha hızlı geliştirme yapmanıza olanak tanır.


Spring Cloud Stream, Spring Cloud ve Apache Kafka kullanarak event-driven mikroservisler geliştirmek için daha yüksek seviyeli bir abstraksiyon sağlar. Bu, konfigürasyon ve kodlama yükünü azaltırken, mesajlaşma sistemleri arasında kolay geçiş yapmanıza olanak tanır. Spring Cloud Stream ile çalışırken dikkate almanız gereken temel kavramlar şunlardır:


Spring Cloud Stream ile Çalışmak

Binder Konfigürasyonu

Spring Cloud Stream, mesajlaşma sistemlerine (Kafka, RabbitMQ vb.) bağlanmak için "Binder" kavramını kullanır. Bu, uygulamanızın hangi mesajlaşma sistemiyle iletişim kuracağını belirlemenize olanak tanır. Kafka için spring-cloud-stream-binder-kafka bağımlılığını projenize eklemeniz gerekir:


xml

Copy code

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-stream-binder-kafka</artifactId>

</dependency>

Uygulama Yapılandırması

Spring Cloud Stream, application.properties veya application.yml dosyasında yapılandırılabilir. Örneğin, Kafka ile çalışırken, broker adresleri, topic isimleri ve diğer Kafka özellikleri bu dosyada tanımlanır.


StreamListener ve EnableBinding

Spring Cloud Stream, mesajları işlemek için @StreamListener anotasyonunu kullanır. Ayrıca, @EnableBinding anotasyonu ile hangi kanalların kullanılacağını belirtirsiniz. Örneğin, bir mesajı dinlemek ve işlemek için bir kanala bağlanabilirsiniz.


java

Copy code

@EnableBinding(Processor.class)

public class MyMessageProcessor {


    @StreamListener(target = Processor.INPUT)

    public void processMyMessage(String message) {

        // Mesaj işleme mantığı

    }

}

Mesaj Gönderme

Mesaj göndermek için, Spring Cloud Stream StreamBridge veya @Output anotasyonu kullanılarak tanımlanan bir kanal üzerinden MessageChannel aracılığıyla yapılabilir. StreamBridge kullanımı, programatik olarak mesaj göndermenize olanak tanır.


java

Copy code

@Autowired

private StreamBridge streamBridge;


public void sendMessage(String message) {

    streamBridge.send("myChannel", message);

}

İyi Uygulama Örnekleri ve Dikkat Edilmesi Gerekenler

Dinamik Özellikler: Spring Cloud Stream, runtime'da özellikleri değiştirmek için destek sunar. Bu, canlı sistemlerde ayarları değiştirmenize olanak tanır.

Partitioning: Kafka ile çalışırken, mesajların birden fazla partition arasında dengeli bir şekilde dağıtılmasını sağlayarak işlem kapasitesini artırabilirsiniz.

Güvenlik: Kafka ve Spring Cloud Stream kullanırken, mesajlaşma sistemlerinize erişimi güvenli hale getirmek için uygun güvenlik önlemlerini alın.

Test Desteği: Spring Cloud Stream, mikroservislerinizin mesajlaşma yönlerini test etmek için araçlar ve metodolojiler sağlar. Bu, entegrasyon testleri sırasında faydalıdır.

Spring Cloud Stream kullanımı, Kafka ve diğer mesajlaşma sistemleri ile çalışırken, uygulamanızın bağlantı detaylarından soyutlanmasını sağlar. Bu, kodunuzu daha temiz, yönetilebilir ve adaptif hale getirirken, aynı zamanda geliştirme sürecini hızlandırır.


Spring Cloud Stream'in sunduğu esneklik ve kolay kullanımın yanı sıra, bir mikroservis mimarisinde event-driven iletişim kurarken dikkate almanız gereken ileri düzey konseptler ve en iyi uygulamalar da bulunmaktadır. Bu konseptler, sisteminizin ölçeklenebilirliğini, dayanıklılığını ve bakım kolaylığını artırabilir.


İleri Düzey Konseptler ve En İyi Uygulamalar

Error Handling ve Retry Mechanisms

Spring Cloud Stream, hata yönetimi için kapsamlı destek sunar. Mesaj işleme sırasında bir hata meydana geldiğinde, sistem otomatik olarak yeniden deneme (retry) mekanizmalarını devreye sokabilir veya hatalı mesajları bir "dead letter" kuyruğuna yönlendirebilir. Bu işlevsellik, application.properties dosyasında yapılandırılabilir ve işlem mantığınızın dayanıklılığını artırır.


Dinamik Özellikler

Spring Cloud Stream, runtime sırasında özelliklerin dinamik olarak değiştirilmesini destekler. Bu, canlı sistemlerde, uygulamanızın davranışını değiştirmenize olanak tanırken, sistemin yeniden başlatılmasına gerek kalmaz. Örneğin, log seviyeleri, yeniden deneme politikaları ve hatta bağlı olduğunuz topic'ler gibi özellikler dinamik olarak yönetilebilir.


Monitoring ve Logging

Mikroservis mimarisinde, her bir servisin ve mesajlaşma altyapısının izlenmesi ve loglanması kritik öneme sahiptir. Spring Cloud Stream, Spring Boot'un Actuator desteği ile entegre çalışır, bu da sağlık durumu, metrikler ve loglama için kapsamlı destek sağlar. Kafka'nın kendi monitoring araçları ile birlikte, bu özellikler sistem genelinde görünürlüğü ve izlenebilirliği artırır.


Event Sourcing ve CQRS

Event-driven mimarilerde, Event Sourcing ve Command Query Responsibility Segregation (CQRS) desenleri sıkça kullanılır. Spring Cloud Stream, bu desenlerin uygulanmasını kolaylaştırabilir. Event Sourcing, sistemdeki değişiklikleri bir dizi event olarak kaydetmeyi, CQRS ise okuma ve yazma işlemlerini ayrı modeller üzerinden yönetmeyi ifade eder. Bu yaklaşımlar, sistemlerin ölçeklenebilirliğini ve performansını artırabilir.


Multi-Binder Destek

Bir uygulama içinde birden fazla mesajlaşma sistemiyle iletişim kurma ihtiyacı doğabilir. Spring Cloud Stream, aynı uygulama içinde birden fazla binder (Kafka, RabbitMQ vb.) kullanımını destekler. Bu, farklı sistemler arasında köprü kurmanıza ve mesajlaşma altyapınızı esnek bir şekilde genişletmenize olanak tanır.


Sonuç

Spring Cloud Stream ve Apache Kafka kullanarak kurulan event-driven mimari, mikroservisler arasında güçlü, esnek ve ölçeklenebilir asenkron iletişim sağlar. En iyi uygulamaları ve ileri düzey konseptleri takip etmek, sisteminizin performansını, dayanıklılığını ve bakım kolaylığını artırır. Sisteminizi tasarlarken bu prensipleri ve uygulamaları dikkate almak, uzun vadede faydalar sağlayacaktır.


User

Örnek 2 ms'li bir spring cloud, kafka ve spring cloud stream senaryosu ve kodu

ChatGPT

Bir Spring Cloud, Kafka ve Spring Cloud Stream kullanarak basit bir senaryo üzerinden gitmek, konseptleri daha iyi anlamanıza yardımcı olacaktır. Bu örnekte, iki mikroservisimiz olacak: biri mesaj üreten bir Producer ve diğeri bu mesajları tüketen bir Consumer.


Ortam Kurulumu

Öncelikle, Apache Kafka'nın sisteminizde çalışır durumda olduğundan emin olun. Kafka ve Zookeeper'ı başlatmak için resmi Kafka Quickstart dokümantasyonunu takip edebilirsiniz.


Maven Bağımlılıkları

Her iki mikroservis için pom.xml dosyasına Spring Cloud Stream ve Kafka bağımlılıklarını ekleyin.


xml

Copy code

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-stream-kafka</artifactId>

</dependency>

Producer Servisi

Producer mikroservisi, belirli aralıklarla mesajları bir Kafka topic'ine gönderir.


Application.properties

properties

Copy code

spring.cloud.stream.bindings.output.destination=my-topic

spring.cloud.stream.kafka.binder.brokers=localhost:9092

Producer Kodu

java

Copy code

package com.example.producer;


import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.cloud.stream.function.StreamBridge;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;


@Component

public class MessageProducer {


    @Autowired

    private StreamBridge streamBridge;


    @Scheduled(fixedDelay = 5000)

    public void sendMessage() {

        String message = "Hello from Producer at " + System.currentTimeMillis();

        streamBridge.send("output", message);

        System.out.println("Sent: " + message);

    }

}

@Scheduled anotasyonu ile sendMessage metodunu her 5 saniyede bir çalışacak şekilde ayarlayabilirsiniz.


Consumer Servisi

Consumer mikroservisi, Producer tarafından gönderilen mesajları tüketir.


Application.properties

properties

Copy code

spring.cloud.stream.bindings.input.destination=my-topic

spring.cloud.stream.kafka.binder.brokers=localhost:9092

Consumer Kodu

java

Copy code

package com.example.consumer;


import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.cloud.stream.messaging.Sink;

import org.springframework.stereotype.Component;


@Component

public class MessageConsumer {


    @StreamListener(Sink.INPUT)

    public void handleMessage(String message) {

        System.out.println("Received: " + message);

    }

}

Uygulamaları Çalıştırmak

Her iki uygulamayı da Spring Boot uygulaması olarak başlatın. Producer uygulaması, mesajları Kafka topic'ine gönderirken, Consumer uygulaması bu mesajları alacak ve konsola yazdıracaktır.


Sonuç

Bu basit senaryo, bir Producer ve bir Consumer kullanarak Spring Cloud Stream ve Kafka ile event-driven mimari kurulumunun temellerini göstermektedir. Gerçek dünya uygulamalarında, bu yapı daha karmaşık iş akışlarına, hata yönetimi mekanizmalarına ve performans iyileştirmelerine genişletilebilir.


User

Örnek bir event için json yazar mısın?

ChatGPT

Elbette, bir mikroservis mimarisinde kullanılabilecek örnek bir event için basit bir JSON formatı aşağıdaki gibidir. Bu örnek, bir sipariş sisteminde bir siparişin başarıyla oluşturulduğunu temsil eden bir event'i göstermektedir. JSON formatı, event'in tipini, zaman damgasını ve siparişle ilgili detayları içerir.


json

Copy code

{

  "eventType": "OrderCreated",

  "timestamp": "2024-02-04T12:34:56Z",

  "orderId": "123456",

  "userId": "78910",

  "orderDetails": {

    "totalAmount": 299.99,

    "currency": "USD",

    "items": [

      {

        "itemId": "001",

        "productName": "Wireless Mouse",

        "quantity": 1,

        "price": 29.99

      },

      {

        "itemId": "002",

        "productName": "Keyboard",

        "quantity": 1,

        "price": 49.99

      },

      {

        "itemId": "003",

        "productName": "USB-C Charging Cable",

        "quantity": 2,

        "price": 19.99

      }

    ]

  }

}

Bu JSON yapısı, bir event'in temel unsurlarını içerir:


eventType: Event'in tipi, bu durumda bir siparişin oluşturulduğunu belirten "OrderCreated".

timestamp: Event'in oluşturulduğu zaman damgası. ISO 8601 formatında.

orderId: Oluşturulan siparişin benzersiz kimliği.

userId: Siparişi oluşturan kullanıcının kimliği.

orderDetails: Siparişle ilgili detayları içeren bir nesne. Bu nesne, siparişin toplam tutarını, para birimini ve siparişteki ürünlerin bir listesini içerir.

Bu tür bir JSON yapısı, mikroservisler arasında iletişim kurarken veya bir event sourcing sisteminde event'leri saklarken kullanılabilir. Event'in yapısı, uygulamanızın ihtiyaçlarına göre özelleştirilebilir.


Bu event'i spring cloud stream ile nasıl gönderir ve okurum?


Spring Cloud Stream ile bir JSON event'ini göndermek ve okumak için, Spring Boot uygulamanızda birkaç adımı takip etmeniz gerekmektedir. Bu işlemleri gerçekleştirmek için önce sisteminizi yapılandırmanız, ardından event gönderme ve okuma için kodunuzu yazmanız gerekir. Aşağıda, bir önceki örnekte oluşturduğumuz JSON event'ini kullanarak bir Producer ve bir Consumer servisi oluşturacağız.


Producer Servisi Yapılandırması ve Kodu

Öncelikle, event'leri Kafka'ya gönderen bir Producer servisi oluşturalım. Bu servis, önceden tanımladığımız JSON event'ini kullanacak.


application.properties Yapılandırması

properties

Copy code

spring.cloud.stream.bindings.output-out-0.destination=order-events

spring.cloud.stream.kafka.binder.brokers=localhost:9092

Bu yapılandırma, order-events adlı Kafka topic'ine mesaj göndermek için kullanılacak output kanalını tanımlar.


Event Gönderme Kodu

java

Copy code

package com.example.producer;


import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.Message;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.cloud.stream.function.StreamBridge;

import org.springframework.stereotype.Service;


@Service

public class OrderEventProducer {


    @Autowired

    private StreamBridge streamBridge;


    public void sendOrderCreatedEvent(OrderCreatedEvent orderEvent) {

        Message<OrderCreatedEvent> message = MessageBuilder.withPayload(orderEvent).build();

        streamBridge.send("output-out-0", message);

    }

}

OrderCreatedEvent sınıfınız, örnek JSON event'imize karşılık gelen bir Java sınıfıdır. Bu sınıfı, JSON event'inizdeki alanları temsil edecek şekilde oluşturmalısınız.


Consumer Servisi Yapılandırması ve Kodu

Şimdi, Kafka'dan event'leri okuyan bir Consumer servisi oluşturalım.


application.properties Yapılandırması

properties

Copy code

spring.cloud.stream.bindings.input-in-0.destination=order-events

spring.cloud.stream.kafka.binder.brokers=localhost:9092

Bu yapılandırma, order-events adlı Kafka topic'inden mesaj almak için kullanılacak input kanalını tanımlar.


Event Okuma Kodu

java

Copy code

package com.example.consumer;


import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.cloud.stream.messaging.Sink;

import org.springframework.stereotype.Component;


@Component

public class OrderEventConsumer {


    @StreamListener(Sink.INPUT)

    public void handleOrderCreatedEvent(OrderCreatedEvent orderEvent) {

        System.out.println("Received order event: " + orderEvent);

    }

}

Bu kod parçası, order-events topic'inden gelen mesajları dinler ve konsola yazdırır. OrderCreatedEvent sınıfını, JSON event'inizdeki alanları temsil edecek şekilde oluşturmalısınız ve bu sınıf @StreamListener metodunun parametresi olarak kullanılmalıdır.


JSON Serileştirme/Çözümleme

Spring Cloud Stream otomatik olarak Java nesnelerini JSON'a serileştirir ve JSON'dan Java nesnelerine çözümler. Ancak, bu işlevsellikten faydalanmak için spring-cloud-stream bağımlılığınızın yanı sıra jackson-databind gibi bir JSON işleme kütüphanesine ihtiyacınız olacaktır. Bu genellikle Spring Boot'un spring-boot-starter-web bağımlılığı ile otomatik olarak sağlanır.


Notlar

Örnek kodlar, Spring Cloud Stream'in temel kullanımını göstermektedir ve gerçek uygulama geliştirme sırasında uygulamanızın ihtiyaçlarına göre özelleştirilmelidir.

Kafka ve Zookeeper'ın çalışır durumda olduğundan ve application.properties dosyasındaki broker adreslerinin doğru yapılandırıldığından emin olun.

OrderCreatedEvent sınıfı, örnek JSON event'inize karşılık gelen bir POJO olmalıdır. Bu sınıf, event içindeki verileri temsil eder ve Spring'in Jackson kütüphanesi tarafından otomatik olarak JSON formatına serileştirilip, JSON'dan Java nesnesine çözümlenebilir. İşte bu sınıfın bir örneği:


OrderCreatedEvent Sınıfı

java

Copy code

package com.example.domain;


import java.math.BigDecimal;

import java.util.List;


public class OrderCreatedEvent {


    private String eventType;

    private String timestamp;

    private String orderId;

    private String userId;

    private OrderDetails orderDetails;


    // Getter'lar ve Setter'lar


    public static class OrderDetails {

        private BigDecimal totalAmount;

        private String currency;

        private List<Item> items;


        // Getter'lar ve Setter'lar

    }


    public static class Item {

        private String itemId;

        private String productName;

        private int quantity;

        private BigDecimal price;


        // Getter'lar ve Setter'lar

    }

}

Bu sınıf, önceki JSON örneğinizde belirtilen alanları temsil eder. OrderCreatedEvent, OrderDetails ve Item sınıfları iç içe tanımlanmıştır. Bu sınıfların her biri için gerekli getter ve setter metodlarını eklemeyi unutmayın.


Uygulamaları Çalıştırma

Uygulamalarınızı çalıştırmadan önce, Kafka'nın ve gerekirse Zookeeper'ın çalışır durumda olduğundan emin olun. Daha sonra, Producer ve Consumer mikroservislerinizi Spring Boot uygulaması olarak başlatın:


Producer uygulaması başladığında, OrderEventProducer sınıfındaki sendOrderCreatedEvent metodunu çağırarak bir OrderCreatedEvent mesajı gönderebilirsiniz. Bu, order-events topic'ine bir mesaj gönderir.

Consumer uygulaması bu mesajı alır ve OrderEventConsumer sınıfındaki handleOrderCreatedEvent metodunda işler. Alınan mesaj konsola yazdırılır.

Bu basit örnek, Spring Cloud Stream ile Kafka üzerinden JSON tabanlı mesajlaşmanın nasıl gerçekleştirileceğini göstermektedir. Gerçek bir uygulama senaryosunda, mesaj gönderme ve alma işlemlerini tetiklemek için HTTP endpoint'leri, zamanlanmış görevler veya sistem içi diğer olaylar kullanabilirsiniz.


Özet

Spring Cloud Stream, Kafka gibi mesajlaşma sistemleri ile çalışırken, mikroservisler arası iletişimi kolaylaştıran güçlü bir araçtır. Bu rehberde gösterilen yapılandırma ve kod örnekleri, bu teknolojileri kullanarak event-driven mikroservis mimarileri geliştirmenin temellerini sağlar. JSON tabanlı mesajların serileştirilmesi, gönderilmesi, alınması ve çözümlenmesi işlemleri, Spring Cloud Stream'in otomatik serileştirme ve çözümleme özellikleri sayesinde kolaylaştırılmıştır. Bu özellikler, modern mikroservis uygulamalarınızda esnek ve ölçeklenebilir event-driven mimariler oluşturmanıza olanak tanır.

Please Select Embedded Mode To Show The Comment System.*

Daha yeni Daha eski

نموذج الاتصال