Spring Boot සහ Kafka Streams: Real-time Data Processing වලට අලුත් පණක්! | SC Guide

Spring Boot සහ Kafka Streams: Real-time Data Processing වලට අලුත් පණක්! | SC Guide

Real-time Data Powerhouse: Spring Boot with Kafka Streams SC Guide

ආයුබෝවන් යාළුවනේ! අද මම ඔයාලත් එක්ක කතා කරන්න යන්නේ මේ දවස් වල තාක්ෂණික ලෝකයේ ගොඩක් දෙනෙක් කතා කරන, ඒ වගේම අපේ දෛනික ජීවිතයටත් සෑහෙන බලපෑමක් කරන අතිශය වැදගත් මාතෘකාවක් ගැන – ඒ තමයි Real-time Data Processing. හිතන්නකෝ ඔයාලා Online food order එකක් දැම්මා කියලා, ඒකෙ Status එක Update වෙන්න විනාඩි ගණන් ගියා නම් කොහොම වෙයිද? නැත්නම් Bank එකක Transactions Real-time Processing වෙන්න නැති වුණා නම්? එතනදි තමයි මේ Real-time Data Processing කියන එකේ වැදගත්කම ඉස්මතු වෙන්නේ.

අද මම විශේෂයෙන්ම කතා කරන්නේ Spring Boot Framework එකත් එක්ක Apache Kafka Streams library එක පාවිච්චි කරලා කොහොමද මේ Real-time Data Process කරන්නේ කියලා. පොඩ්ඩක් සංකීර්ණ වගේ පෙනුනත්, මේක හරිම සුන්දර journey එකක් කියලා මම සහතික වෙනවා! මේ ලිපිය අවසානයේදී, ඔබට Real-time stream processing පිළිබඳ පැහැදිලි අවබෝධයක් ලැබෙනවා වගේම, සරල data aggregation එකක් Spring Boot සහ Kafka Streams යොදාගෙන සාර්ථකව සිදු කරගන්නත් පුළුවන් වේවි.

Kafka Streams කියන්නේ මොකක්ද?

හරි, මුලින්ම බලමු මේ Kafka Streams කියන්නේ මොකක්ද කියලා. සරලවම කිව්වොත්, මේක Apache Kafka Ecosystem එකේම කොටසක්. Kafka කියන්නේ Distribution Streaming Platform එකක් කියලා ඔයාලා දන්නවා ඇති. ඒකෙන් පුළුවන් දත්ත විශාල ප්‍රමාණයක් වේගයෙන්, විශ්වාසවන්තව සහ fault-tolerant විදියට එහා මෙහා කරන්න. හැබැයි Kafka එකෙන් දත්ත transfer කරනවා මිසක්, ඒ දත්ත Process කරන්න directly උදව් වෙන්නේ නෑ. මෙතනදි තමයි Kafka Streams picture එකට එන්නේ.

Kafka Streams කියන්නේ client library එකක්. ඒකෙන් පුළුවන් ඔයාලගේ Java/Scala applications ඇතුලෙන්ම Kafka Topics වල තියෙන දත්ත Process කරන්න. මේක වෙනම Cluster එකක් දාන්න ඕන එකක් නෙවෙයි, ඔයාලගේ application එකේම thread එකක් විදියට run කරන්න පුළුවන්. ඒක තමයි මේකේ ලොකුම වාසියක්.

Kafka Streams වල තියෙන Core concepts දෙකක් තමයි KStream සහ KTable කියන්නේ:

  • KStream: මේක නිකන් data stream එකක් වගේ. එක event එකක් ඇවිත් Process වෙලා යනවා. උදාහරණයක් විදියට, bank transactions, log events වගේ දේවල්. මේක immutable (වෙනස් කළ නොහැකි) sequence of records එකක්. අලුත් event එකක් එනකොට, ඒක කලින් event එකට බලපාන්නේ නෑ.
  • KTable: මේක database table එකක් වගේ. යම්කිසි key එකකට අදාලව latest value එක තියාගෙන ඉන්නවා. උදාහරණයක් විදියට, user profile data, stock prices වගේ දේවල්. මේක updatable view එකක් වගේ ක්‍රියා කරනවා, key එකක් සඳහා අලුත් value එකක් ආවොත්, කලින් value එක replace වෙනවා.

මේ දෙකේ duality එක තේරුම් ගැනීම ගොඩක් වැදගත්. KStream එකක් KTable එකකට convert කරන්න පුළුවන්, ඒ වගේම අනෙක් පැත්තටත් පුළුවන්. මේකෙන් තමයි complex real-time transformations සහ aggregations කරන්න පුළුවන් වෙන්නේ. සරලව කිව්වොත්, KStream එකක් කියන්නේ වෙනස්වීම් වල (changes) ධාරාවක්, KTable එකක් කියන්නේ ඒ වෙනස්වීම් වලින් පස්සේ ලැබෙන වර්තමාන තත්වය (current state).

Spring Boot සහ Kafka Streams එකට?

දැන් අපි බලමු Spring Boot මේකට කොහොමද උදව් කරන්නේ කියලා. Spring Boot කියන්නේ Rapid Application Development වලට කිරුළු පැලඳූ framework එකක් කියලා අපි හැමෝම දන්නවා. Auto-configuration, Dependency Injection, Externalized Configuration වගේ දේවල් නිසා අපේ වැඩ ගොඩක් ලේසි වෙනවා.

Kafka Streams applications හදනකොට Spring Boot එකෙන් අපිට පුළුවන් ගොඩක් දේවල් සරල කරගන්න. Spring Boot Kafka module එකේ තියෙනවා spring-kafka-streams කියන dependency එක. ඒක add කරාම Spring Boot එකෙන් automatic configure කරනවා Kafka Streams client එකට අවශ්‍ය infrastructure ටික. ඔයාලට කරන්න තියෙන්නේ @EnableKafkaStreams කියන annotation එක main application class එකට add කරන එක විතරයි. එතකොට Spring Boot එකෙන් StreamsBuilderFactoryBean එකක් initialize කරලා දෙනවා. ඒක පාවිච්චි කරලා අපිට පුළුවන් අපේ Streaming topology එක define කරන්න.

මේකෙන් වෙන්නේ මොකක්ද? සාමාන්‍යයෙන් Kafka Streams application එකක් හදනකොට අපිට StreamsConfig, Serdes වගේ දේවල් ගොඩක් manual configure කරන්න වෙනවා. හැබැයි Spring Boot එකත් එක්ක නම්, මේ ගොඩක් දේවල් Spring Boot එකෙන් automatic කරනවා. අපිට පුළුවන් අපේ business logic එක ගැන විතරක් හිතලා code කරන්න. ඒක තමයි මේකේ ලොකුම වාසිය. අඩු code ප්‍රමාණයකින්, වැඩි functionality එකක් හදාගන්න පුළුවන්. ඒ වගේම Spring Boot ගොඩක් production-ready විශේෂාංග (උදා: Health Checks, Metrics) ලබා දෙන නිසා, ඔයාලගේ Streaming Application එක monitoring සහ managing කරන එකත් ලේසි වෙනවා.

ප්‍රායෝගික අභ්‍යාසය: Real-time Transaction Data එකතු කිරීම

හරි, දැන් අපි බලමු ප්‍රායෝගිකව මේක කොහොමද කරන්නේ කියලා. අපි හිතමු අපිට බැංකුවක Transactions ටිකක් Kafka topic එකකට එනවා කියලා. අපේ අරමුණ තමයි මේ transactions ටික Real-time aggregate කරලා, එක් එක් user කීයක් transactions කරලා තියෙනවද කියලා බලන එක. මේක අපි Spring Boot සහ Kafka Streams පාවිච්චි කරලා කරමු.

පියවර 1: Dependencies එකතු කිරීම (pom.xml)

මුලින්ම, ඔයාලගේ pom.xml එකට මේ dependency ටික add කරගන්න ඕනේ:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- Lombok for boilerplate code reduction -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Test dependencies (optional for basic setup) -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

පියවර 2: application.properties Configuration

දැන් application.properties එකට Kafka Configuration ටික දාගමු:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=transaction-aggregator-app
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.state.dir=/tmp/kafka-streams
spring.kafka.streams.properties.commit.interval.ms=1000
  • bootstrap-servers: ඔයාලගේ Kafka broker එක තියෙන තැන.
  • application-id: මේක unique වෙන්න ඕනේ. මේක තමයි Consumer Group ID එක විදියට පාවිච්චි වෙන්නේ.
  • default.key.serde, default.value.serde: Data Serialization/Deserialization වලට පාවිච්චි කරන classes.
  • state.dir: Kafka Streams state store එකට data save කරන location එක.
  • commit.interval.ms: Consumer offsets Kafka වලට commit කරන interval එක. Real-time applications වලට මේක අඩු අගයක් දෙනවා.

පියවර 3: Transaction Model එක (Java Class)

අපි සරල Transaction model එකක් හදාගමු. මේක JSON format එකට serialize/deserialize වෙන්න ඕනේ.

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
    @JsonProperty("transactionId")
    private String transactionId;
    @JsonProperty("userId")
    private String userId;
    @JsonProperty("amount")
    private double amount;
    @JsonProperty("timestamp")
    private long timestamp;

    // A simple method to serialize/deserialize for demonstration
    // In a real application, consider a dedicated custom Serde class
    public static Transaction fromJson(String json) {
        try {
            return new ObjectMapper().readValue(json, Transaction.class);
        } catch (Exception e) {
            throw new RuntimeException("Error deserializing transaction: " + json, e);
        }
    }

    public String toJson() {
        try {
            return new ObjectMapper().writeValueAsString(this);
        } catch (Exception e) {
            throw new RuntimeException("Error serializing transaction", e);
        }
    }
}
  • lombok dependencies add කරගන්න අමතක කරන්න එපා. නැත්නම් getters/setters manual generate කරන්න වෙනවා.
  • ObjectMapper එක පාවිච්චි කරලා JSON String එකක් Transaction Object එකකට convert කරන හැටි සහ අනෙක් පැත්තට කරන හැටිත් මම මෙතන පෙන්නුවා.

පියවර 4: Kafka Streams Processor එක

මේක තමයි අපේ ප්‍රධාන Logic එක. මෙතනදි අපි Kafka Streams KStream එකක් build කරලා data aggregate කරනවා.

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class TransactionAggregatorStream {

    private static final String INPUT_TOPIC = "transactions-input";
    private static final String OUTPUT_TOPIC = "user-transaction-counts-output";

    @Autowired
    private StreamsBuilder streamsBuilder;

    @PostConstruct
    public void buildProcessingGraph() {
        // Custom Serde for Transaction object
        final Serde<Transaction> transactionSerde = Serdes.serdeFrom(
            (topic, data) -> {
                if (data == null) return null;
                return data.toJson().getBytes();
            },
            (topic, data) -> {
                if (data == null) return null;
                return Transaction.fromJson(new String(data));
            }
        );

        KStream<String, String> rawTransactionsStream = // Input key is String (userId or null), Value is String (JSON)
            streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

        KTable<String, Long> userTransactionCounts =
            rawTransactionsStream
                .map((key, value) -> {
                    Transaction transaction = Transaction.fromJson(value);
                    // Re-key the stream by userId before grouping for aggregation
                    return new KeyValue<>(transaction.getUserId(), transaction);
                })
                .groupByKey(Consumed.with(Serdes.String(), transactionSerde)) // Group by userId
                .count(Materialized.as("user-transaction-counts-store")); // State store name

        userTransactionCounts.toStream()
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        System.out.println("Kafka Streams Topology built successfully for transaction aggregation.");
    }
}

පියවර 5: Main Application Class

අපේ Spring Boot Application එක ආරම්භ කරන Class එක.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@SpringBootApplication
@EnableKafkaStreams
public class KafkaStreamsApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsApplication.class, args);
    }
}

මේ Code එකෙන් වෙන්නේ මොකක්ද?

  1. @EnableKafkaStreams නිසා Spring Boot එක Kafka Streams infrastructure එක configure කරනවා.
  2. TransactionAggregatorStream class එක @Component එකක් නිසා Spring Boot එකෙන් ඒක instantiate කරනවා.
  3. @PostConstruct annotation එක නිසා application එක initialize වෙනකොට buildProcessingGraph() method එක execute වෙනවා.
  4. transactionSerde කියන එකෙන් අපි අපේ Transaction object එක String එකකට convert කරන්න (සහ අනෙක් පැත්තට) custom serializer/deserializer එකක් හදාගත්තා. Kafka Streams වලදී data process කරනකොට ඒ data වල format එක Serde (Serializer/Deserializer) එකක් විදියට specify කරන්න ඕනේ.
  5. streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())) කියන line එකෙන් transactions-input කියන topic එකෙන් එන data KStream එකක් විදියට ගන්නවා. මේකේ Key එක String එකක්, Value එක String (JSON) එකක්.
  6. .map((key, value) -> ...): මෙතනදි අපි input stream එකේ String JSON value එක Transaction object එකකට deserialize කරලා, ඒ Transaction object එකේ userId එක අලුත් Key එක විදියට set කරනවා. මේ KeyValue operation එකෙන් Stream එක re-key කරනවා.
  7. .groupByKey(Consumed.with(Serdes.String(), transactionSerde)): මේකෙන් කරන්නේ KStream එකේ records ටික අලුත් Key (userId) එක අනුව group කරන එක. මෙතනදි අපි custom transactionSerde එක පාවිච්චි කරනවා, මොකද දැන් group වෙන්නේ userId සහ Transaction object එක.
  8. .count(Materialized.as("user-transaction-counts-store")): Group කරපු records වල ගණන (count) ගන්නවා. මේක KTable එකක් විදියට return කරනවා, ඒක user-transaction-counts-store කියන State Store එකේ maintain කරනවා. මේ State Store එක Local Disk එකේ තියෙන්න පුළුවන් (අපේ application.properties වල specify කරපු state.dir එකේ) නැත්නම් Remote storage එකක (RocksDB වගේ). මේක තමයි Kafka Streams වල State Management වල බලය. Event එකක් ආවම කලින් state එක මතක තියාගෙන, ඒක update කරන්න පුළුවන්.
  9. userTransactionCounts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())): අවසාන වශයෙන්, අපි KTable එක KStream එකකට convert කරලා, ඒක user-transaction-counts-output කියන topic එකට publish කරනවා. මේ output එකේ Key එක userId එක, Value එක Long (count) එක.

මේක Run කරන්නේ කොහොමද?

  1. මුලින්ම Kafka Broker එකක් run වෙලා තියෙන්න ඕනේ (localhost:9092).
  2. transactions-input සහ user-transaction-counts-output කියන topics දෙක create කරගන්න. Command line එකෙන් මේ වගේ:
    kafka-topics --create --topic transactions-input --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
    kafka-topics --create --topic user-transaction-counts-output --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  3. Application එක run කරන්න (IDE එකකින් හෝ mvn spring-boot:run command එකෙන්).
  4. දැන් ඔයාලට පුළුවන් transactions-input topic එකට sample data publish කරන්න. (උදා: kafka-console-producer --broker-list localhost:9092 --topic transactions-input)
    • Example Input (paste one by one, press Enter after each):
      {"transactionId":"T001","userId":"user1","amount":100.0,"timestamp":1678886400000}
      {"transactionId":"T002","userId":"user2","amount":250.0,"timestamp":1678886401000}
      {"transactionId":"T003","userId":"user1","amount":50.0,"timestamp":1678886402000}
      {"transactionId":"T004","userId":"user3","amount":120.0,"timestamp":1678886403000}
      {"transactionId":"T005","userId":"user2","amount":80.0,"timestamp":1678886404000}
      {"transactionId":"T006","userId":"user1","amount":200.0,"timestamp":1678886405000}
  5. user-transaction-counts-output topic එක consume කරලා බලන්න Output එක එනවද කියලා. (උදා: kafka-console-consumer --bootstrap-server localhost:9092 --topic user-transaction-counts-output --from-beginning --property print.key=true --property key.separator=:)
    • Expected Output (might show intermediate counts for each key update):
      user1:1
      user2:1
      user1:2
      user3:1
      user2:2
      user1:3

මේකෙන් ඔයාලට තේරෙන්න ඕනේ Real-time data එකක් කොහොමද Stream එකක් විදියට Process කරලා, අවශ්‍ය විදියට Aggregate කරලා, නැවත වෙන topic එකකට publish කරන්නේ කියලා. මේක නිකන්ම නිකන් "Count" එකක් වුණාට, මේක ඇතුලේ තියෙන Logic එකෙන් කොච්චර දේවල් කරන්න පුළුවන්ද කියලා හිතන්න!

ප්‍රයෝජන සහ යෙදුම් (Benefits and Use Cases)

දැන් ඔයාලට අදහසක් එන්න ඇති Spring Boot සහ Kafka Streams එකට පාවිච්චි කරන එකේ වටිනාකම. මේකෙන් ලැබෙන ප්‍රධාන ප්‍රයෝජන ටිකක් ගැන කතා කරමු:

  • Scalability: Kafka Streams Applications horizontally scale කරන්න පුළුවන්. එකම Application ID එකෙන් instances ගණනාවක් run කරාම Kafka Stream එක automatically workload එක partition කරලා share කරනවා.
  • Fault Tolerance: යම් instance එකක් fail වුනොත්, Kafka Streams framework එක automatically ඒක identify කරලා, workload එක වෙන instance එකකට transfer කරනවා. Data Loss වීමක් වෙන්නේ නෑ.
  • Low Latency: Real-time processing නිසා data ආපු ගමන්ම Process වෙලා, Output එක ක්ෂණිකව ලැබෙනවා. Milliseconds ගණනක් ඇතුලත.
  • Stateful Processing: State Store එක පාවිච්චි කරලා කලින් ආපු data මතක තියාගෙන, ඒ අනුව වර්තමාන data process කරන්න පුළුවන්. අපේ උදාහරණයේදී වගේ Transaction Count එක maintain කරන එක.
  • Simple Development: Spring Boot එකත් එක්ක Kafka Streams integration එක ගොඩක් සරලයි. Boilerplate code අඩුයි.

Real-world Use Cases (අපේ ලංකාවේ වුණත්):

  • Real-time Analytics Dashboard: Online sales, website traffic වගේ දේවල් වලට instantly analytics generate කරන එක. E-commerce platforms වලට මේක ගොඩක් වැදගත්. උදාහරණයක් විදියට, Daraz, PickMe Food වගේ platforms වලට ඒ වෙලාවෙම තියෙන Orders, Active users ගණන dashboard එකක බලන්න මේක පාවිච්චි කරන්න පුළුවන්.
  • Fraud Detection: බැංකු සහ මුල්‍ය ආයතන වල transactions වල pattern එක analyze කරලා, අසාමාන්‍ය transaction එකක් වුනොත් instantly detect කරන එක. මේක අපේ රටේ බැංකු සහ Credit Card companies වලට අතිශය වැදගත්.
  • Personalized Recommendations: User activities (views, purchases) analyze කරලා, ඒ වෙලාවෙම ඒ අයට recommend කරන්න පුළුවන් දේවල් suggest කරන එක. Netflix, Amazon වගේ platforms කරන විදියට. Dialog, Mobitel වගේ සමාගම් වලට මේක පාවිච්චි කරලා personalized data packages හෝ offers දෙන්න පුළුවන්.
  • IoT Data Processing: Sensors වලින් එන data (temperature, pressure, GPS coordinates) වගේ දේවල් Real-time process කරලා anomalies detect කරන එක. Smart agriculture, smart cities වගේ project වලට මේවා ගොඩක් ප්‍රයෝජනවත්.
  • Log Monitoring and Alerting: Server logs analyze කරලා, critical errors වුනොත් instantly alerts යවන එක. ඕනෑම large-scale software system එකකට මේක අත්‍යවශ්‍යයි.

මේ හැම දේටම Kafka Streams සහ Spring Boot කියන්නේ කදිම සංකලනයක්.

නිගමනය

ඉතින් යාළුවනේ, මේ ලිපියෙන් මම ඔයාලට Real-time Data Processing වල වැදගත්කම, Apache Kafka Streams සහ Spring Boot එකට පාවිච්චි කරලා කොහොමද data aggregate කරන්නේ කියලා පොඩි අදහසක් දෙන්න උත්සාහ කළා. මේක හරියට විශාල ගඟක වතුර ගලනකොට, ඒ වතුරේ තියෙන අපද්‍රව්‍ය පෙරලා, පිරිසිදු වතුර අවශ්‍ය තැනට හරවනවා වගේ වැඩක්. Technology ලෝකයේ මේ වගේ Real-time solutions වලට තියෙන ඉල්ලුම දවසින් දවස වැඩි වෙනවා.

මේ subject එක ගොඩක් විශාලයි. මම දීපු උදාහරණය සරල එකක් වුණත්, මේක ඔයාලට වැඩිදුර අධ්‍යයනය කරන්න හොඳ ආරම්භයක් වෙයි කියලා මම හිතනවා. Kafka Streams API එකේ තවත් ගොඩක් operations තියෙනවා (join, windowing, transform, branch වගේ). ඒ හැම එකක් ගැනම ඉගෙනගෙන, තව complex use cases හදා බලන්න ඔයාලට පුළුවන්.

ඔයාලත් මේක try කරලා බලන්න. මොනවා හරි ප්‍රශ්න තියෙනවා නම්, මේ ගැන තව දැනගන්න කැමති නම්, පහලින් comment එකක් දාන්න. මට පුළුවන් විදියට උදව් කරන්නම්. එහෙනම්, තවත් මෙවැනිම ලිපියකින් හමුවෙමු! සුභ දවසක්!