Spring Boot සහ Kafka Streams: Real-time Data Processing වලට අලුත් පණක්! | SC Guide

Real-time Data Powerhouse: Spring Boot with Kafka Streams SC Guide
ආයුබෝවන් යාළුවනේ! අද මම ඔයාලත් එක්ක කතා කරන්න යන්නේ මේ දවස් වල තාක්ෂණික ලෝකයේ ගොඩක් දෙනෙක් කතා කරන, ඒ වගේම අපේ දෛනික ජීවිතයටත් සෑහෙන බලපෑමක් කරන අතිශය වැදගත් මාතෘකාවක් ගැන – ඒ තමයි Real-time Data Processing. හිතන්නකෝ ඔයාලා Online food order එකක් දැම්මා කියලා, ඒකෙ Status එක Update වෙන්න විනාඩි ගණන් ගියා නම් කොහොම වෙයිද? නැත්නම් Bank එකක Transactions Real-time Processing වෙන්න නැති වුණා නම්? එතනදි තමයි මේ Real-time Data Processing කියන එකේ වැදගත්කම ඉස්මතු වෙන්නේ.
අද මම විශේෂයෙන්ම කතා කරන්නේ Spring Boot Framework එකත් එක්ක Apache Kafka Streams library එක පාවිච්චි කරලා කොහොමද මේ Real-time Data Process කරන්නේ කියලා. පොඩ්ඩක් සංකීර්ණ වගේ පෙනුනත්, මේක හරිම සුන්දර journey එකක් කියලා මම සහතික වෙනවා! මේ ලිපිය අවසානයේදී, ඔබට Real-time stream processing පිළිබඳ පැහැදිලි අවබෝධයක් ලැබෙනවා වගේම, සරල data aggregation එකක් Spring Boot සහ Kafka Streams යොදාගෙන සාර්ථකව සිදු කරගන්නත් පුළුවන් වේවි.
Kafka Streams කියන්නේ මොකක්ද?
හරි, මුලින්ම බලමු මේ Kafka Streams කියන්නේ මොකක්ද කියලා. සරලවම කිව්වොත්, මේක Apache Kafka Ecosystem එකේම කොටසක්. Kafka කියන්නේ Distribution Streaming Platform එකක් කියලා ඔයාලා දන්නවා ඇති. ඒකෙන් පුළුවන් දත්ත විශාල ප්රමාණයක් වේගයෙන්, විශ්වාසවන්තව සහ fault-tolerant විදියට එහා මෙහා කරන්න. හැබැයි Kafka එකෙන් දත්ත transfer කරනවා මිසක්, ඒ දත්ත Process කරන්න directly උදව් වෙන්නේ නෑ. මෙතනදි තමයි Kafka Streams picture එකට එන්නේ.
Kafka Streams කියන්නේ client library එකක්. ඒකෙන් පුළුවන් ඔයාලගේ Java/Scala applications ඇතුලෙන්ම Kafka Topics වල තියෙන දත්ත Process කරන්න. මේක වෙනම Cluster එකක් දාන්න ඕන එකක් නෙවෙයි, ඔයාලගේ application එකේම thread එකක් විදියට run කරන්න පුළුවන්. ඒක තමයි මේකේ ලොකුම වාසියක්.
Kafka Streams වල තියෙන Core concepts දෙකක් තමයි KStream සහ KTable කියන්නේ:
- KStream: මේක නිකන් data stream එකක් වගේ. එක event එකක් ඇවිත් Process වෙලා යනවා. උදාහරණයක් විදියට, bank transactions, log events වගේ දේවල්. මේක immutable (වෙනස් කළ නොහැකි) sequence of records එකක්. අලුත් event එකක් එනකොට, ඒක කලින් event එකට බලපාන්නේ නෑ.
- KTable: මේක database table එකක් වගේ. යම්කිසි key එකකට අදාලව latest value එක තියාගෙන ඉන්නවා. උදාහරණයක් විදියට, user profile data, stock prices වගේ දේවල්. මේක updatable view එකක් වගේ ක්රියා කරනවා, key එකක් සඳහා අලුත් value එකක් ආවොත්, කලින් value එක replace වෙනවා.
මේ දෙකේ duality එක තේරුම් ගැනීම ගොඩක් වැදගත්. KStream එකක් KTable එකකට convert කරන්න පුළුවන්, ඒ වගේම අනෙක් පැත්තටත් පුළුවන්. මේකෙන් තමයි complex real-time transformations සහ aggregations කරන්න පුළුවන් වෙන්නේ. සරලව කිව්වොත්, KStream එකක් කියන්නේ වෙනස්වීම් වල (changes) ධාරාවක්, KTable එකක් කියන්නේ ඒ වෙනස්වීම් වලින් පස්සේ ලැබෙන වර්තමාන තත්වය (current state).
Spring Boot සහ Kafka Streams එකට?
දැන් අපි බලමු Spring Boot මේකට කොහොමද උදව් කරන්නේ කියලා. Spring Boot කියන්නේ Rapid Application Development වලට කිරුළු පැලඳූ framework එකක් කියලා අපි හැමෝම දන්නවා. Auto-configuration, Dependency Injection, Externalized Configuration වගේ දේවල් නිසා අපේ වැඩ ගොඩක් ලේසි වෙනවා.
Kafka Streams applications හදනකොට Spring Boot එකෙන් අපිට පුළුවන් ගොඩක් දේවල් සරල කරගන්න. Spring Boot Kafka module එකේ තියෙනවා spring-kafka-streams
කියන dependency එක. ඒක add කරාම Spring Boot එකෙන් automatic configure කරනවා Kafka Streams client එකට අවශ්ය infrastructure ටික. ඔයාලට කරන්න තියෙන්නේ @EnableKafkaStreams
කියන annotation එක main application class එකට add කරන එක විතරයි. එතකොට Spring Boot එකෙන් StreamsBuilderFactoryBean
එකක් initialize කරලා දෙනවා. ඒක පාවිච්චි කරලා අපිට පුළුවන් අපේ Streaming topology එක define කරන්න.
මේකෙන් වෙන්නේ මොකක්ද? සාමාන්යයෙන් Kafka Streams application එකක් හදනකොට අපිට StreamsConfig
, Serdes
වගේ දේවල් ගොඩක් manual configure කරන්න වෙනවා. හැබැයි Spring Boot එකත් එක්ක නම්, මේ ගොඩක් දේවල් Spring Boot එකෙන් automatic කරනවා. අපිට පුළුවන් අපේ business logic එක ගැන විතරක් හිතලා code කරන්න. ඒක තමයි මේකේ ලොකුම වාසිය. අඩු code ප්රමාණයකින්, වැඩි functionality එකක් හදාගන්න පුළුවන්. ඒ වගේම Spring Boot ගොඩක් production-ready විශේෂාංග (උදා: Health Checks, Metrics) ලබා දෙන නිසා, ඔයාලගේ Streaming Application එක monitoring සහ managing කරන එකත් ලේසි වෙනවා.
ප්රායෝගික අභ්යාසය: Real-time Transaction Data එකතු කිරීම
හරි, දැන් අපි බලමු ප්රායෝගිකව මේක කොහොමද කරන්නේ කියලා. අපි හිතමු අපිට බැංකුවක Transactions ටිකක් Kafka topic එකකට එනවා කියලා. අපේ අරමුණ තමයි මේ transactions ටික Real-time aggregate කරලා, එක් එක් user කීයක් transactions කරලා තියෙනවද කියලා බලන එක. මේක අපි Spring Boot සහ Kafka Streams පාවිච්චි කරලා කරමු.
පියවර 1: Dependencies එකතු කිරීම (pom.xml)
මුලින්ම, ඔයාලගේ pom.xml
එකට මේ dependency ටික add කරගන්න ඕනේ:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Lombok for boilerplate code reduction -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test dependencies (optional for basic setup) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
පියවර 2: application.properties Configuration
දැන් application.properties
එකට Kafka Configuration ටික දාගමු:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=transaction-aggregator-app
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.state.dir=/tmp/kafka-streams
spring.kafka.streams.properties.commit.interval.ms=1000
bootstrap-servers
: ඔයාලගේ Kafka broker එක තියෙන තැන.application-id
: මේක unique වෙන්න ඕනේ. මේක තමයි Consumer Group ID එක විදියට පාවිච්චි වෙන්නේ.default.key.serde
,default.value.serde
: Data Serialization/Deserialization වලට පාවිච්චි කරන classes.state.dir
: Kafka Streams state store එකට data save කරන location එක.commit.interval.ms
: Consumer offsets Kafka වලට commit කරන interval එක. Real-time applications වලට මේක අඩු අගයක් දෙනවා.
පියවර 3: Transaction Model එක (Java Class)
අපි සරල Transaction model එකක් හදාගමු. මේක JSON format එකට serialize/deserialize වෙන්න ඕනේ.
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
@JsonProperty("transactionId")
private String transactionId;
@JsonProperty("userId")
private String userId;
@JsonProperty("amount")
private double amount;
@JsonProperty("timestamp")
private long timestamp;
// A simple method to serialize/deserialize for demonstration
// In a real application, consider a dedicated custom Serde class
public static Transaction fromJson(String json) {
try {
return new ObjectMapper().readValue(json, Transaction.class);
} catch (Exception e) {
throw new RuntimeException("Error deserializing transaction: " + json, e);
}
}
public String toJson() {
try {
return new ObjectMapper().writeValueAsString(this);
} catch (Exception e) {
throw new RuntimeException("Error serializing transaction", e);
}
}
}
lombok
dependencies add කරගන්න අමතක කරන්න එපා. නැත්නම් getters/setters manual generate කරන්න වෙනවා.ObjectMapper
එක පාවිච්චි කරලා JSON String එකක්Transaction
Object එකකට convert කරන හැටි සහ අනෙක් පැත්තට කරන හැටිත් මම මෙතන පෙන්නුවා.
පියවර 4: Kafka Streams Processor එක
මේක තමයි අපේ ප්රධාන Logic එක. මෙතනදි අපි Kafka Streams KStream
එකක් build කරලා data aggregate කරනවා.
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class TransactionAggregatorStream {
private static final String INPUT_TOPIC = "transactions-input";
private static final String OUTPUT_TOPIC = "user-transaction-counts-output";
@Autowired
private StreamsBuilder streamsBuilder;
@PostConstruct
public void buildProcessingGraph() {
// Custom Serde for Transaction object
final Serde<Transaction> transactionSerde = Serdes.serdeFrom(
(topic, data) -> {
if (data == null) return null;
return data.toJson().getBytes();
},
(topic, data) -> {
if (data == null) return null;
return Transaction.fromJson(new String(data));
}
);
KStream<String, String> rawTransactionsStream = // Input key is String (userId or null), Value is String (JSON)
streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, Long> userTransactionCounts =
rawTransactionsStream
.map((key, value) -> {
Transaction transaction = Transaction.fromJson(value);
// Re-key the stream by userId before grouping for aggregation
return new KeyValue<>(transaction.getUserId(), transaction);
})
.groupByKey(Consumed.with(Serdes.String(), transactionSerde)) // Group by userId
.count(Materialized.as("user-transaction-counts-store")); // State store name
userTransactionCounts.toStream()
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
System.out.println("Kafka Streams Topology built successfully for transaction aggregation.");
}
}
පියවර 5: Main Application Class
අපේ Spring Boot Application එක ආරම්භ කරන Class එක.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;
@SpringBootApplication
@EnableKafkaStreams
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
}
මේ Code එකෙන් වෙන්නේ මොකක්ද?
@EnableKafkaStreams
නිසා Spring Boot එක Kafka Streams infrastructure එක configure කරනවා.TransactionAggregatorStream
class එක@Component
එකක් නිසා Spring Boot එකෙන් ඒක instantiate කරනවා.@PostConstruct
annotation එක නිසා application එක initialize වෙනකොටbuildProcessingGraph()
method එක execute වෙනවා.transactionSerde
කියන එකෙන් අපි අපේTransaction
object එකString
එකකට convert කරන්න (සහ අනෙක් පැත්තට) custom serializer/deserializer එකක් හදාගත්තා. Kafka Streams වලදී data process කරනකොට ඒ data වල format එක Serde (Serializer/Deserializer) එකක් විදියට specify කරන්න ඕනේ.streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
කියන line එකෙන්transactions-input
කියන topic එකෙන් එන dataKStream
එකක් විදියට ගන්නවා. මේකේ Key එකString
එකක්, Value එකString
(JSON) එකක්..map((key, value) -> ...)
: මෙතනදි අපි input stream එකේString
JSON value එකTransaction
object එකකට deserialize කරලා, ඒTransaction
object එකේuserId
එක අලුත් Key එක විදියට set කරනවා. මේKeyValue
operation එකෙන් Stream එක re-key කරනවා..groupByKey(Consumed.with(Serdes.String(), transactionSerde))
: මේකෙන් කරන්නේKStream
එකේ records ටික අලුත් Key (userId
) එක අනුව group කරන එක. මෙතනදි අපි customtransactionSerde
එක පාවිච්චි කරනවා, මොකද දැන් group වෙන්නේuserId
සහTransaction
object එක..count(Materialized.as("user-transaction-counts-store"))
: Group කරපු records වල ගණන (count) ගන්නවා. මේකKTable
එකක් විදියට return කරනවා, ඒකuser-transaction-counts-store
කියන State Store එකේ maintain කරනවා. මේ State Store එක Local Disk එකේ තියෙන්න පුළුවන් (අපේapplication.properties
වල specify කරපුstate.dir
එකේ) නැත්නම් Remote storage එකක (RocksDB වගේ). මේක තමයි Kafka Streams වල State Management වල බලය. Event එකක් ආවම කලින් state එක මතක තියාගෙන, ඒක update කරන්න පුළුවන්.userTransactionCounts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()))
: අවසාන වශයෙන්, අපිKTable
එකKStream
එකකට convert කරලා, ඒකuser-transaction-counts-output
කියන topic එකට publish කරනවා. මේ output එකේ Key එකuserId
එක, Value එකLong
(count) එක.
මේක Run කරන්නේ කොහොමද?
- මුලින්ම Kafka Broker එකක් run වෙලා තියෙන්න ඕනේ (
localhost:9092
). transactions-input
සහuser-transaction-counts-output
කියන topics දෙක create කරගන්න. Command line එකෙන් මේ වගේ:kafka-topics --create --topic transactions-input --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
kafka-topics --create --topic user-transaction-counts-output --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1- Application එක run කරන්න (IDE එකකින් හෝ
mvn spring-boot:run
command එකෙන්). - දැන් ඔයාලට පුළුවන්
transactions-input
topic එකට sample data publish කරන්න. (උදා:kafka-console-producer --broker-list localhost:9092 --topic transactions-input
)- Example Input (paste one by one, press Enter after each):
{"transactionId":"T001","userId":"user1","amount":100.0,"timestamp":1678886400000}
{"transactionId":"T002","userId":"user2","amount":250.0,"timestamp":1678886401000}
{"transactionId":"T003","userId":"user1","amount":50.0,"timestamp":1678886402000}
{"transactionId":"T004","userId":"user3","amount":120.0,"timestamp":1678886403000}
{"transactionId":"T005","userId":"user2","amount":80.0,"timestamp":1678886404000}
{"transactionId":"T006","userId":"user1","amount":200.0,"timestamp":1678886405000}
- Example Input (paste one by one, press Enter after each):
user-transaction-counts-output
topic එක consume කරලා බලන්න Output එක එනවද කියලා. (උදා:kafka-console-consumer --bootstrap-server localhost:9092 --topic user-transaction-counts-output --from-beginning --property print.key=true --property key.separator=:
)- Expected Output (might show intermediate counts for each key update):
user1:1
user2:1
user1:2
user3:1
user2:2
user1:3
- Expected Output (might show intermediate counts for each key update):
මේකෙන් ඔයාලට තේරෙන්න ඕනේ Real-time data එකක් කොහොමද Stream එකක් විදියට Process කරලා, අවශ්ය විදියට Aggregate කරලා, නැවත වෙන topic එකකට publish කරන්නේ කියලා. මේක නිකන්ම නිකන් "Count" එකක් වුණාට, මේක ඇතුලේ තියෙන Logic එකෙන් කොච්චර දේවල් කරන්න පුළුවන්ද කියලා හිතන්න!
ප්රයෝජන සහ යෙදුම් (Benefits and Use Cases)
දැන් ඔයාලට අදහසක් එන්න ඇති Spring Boot සහ Kafka Streams එකට පාවිච්චි කරන එකේ වටිනාකම. මේකෙන් ලැබෙන ප්රධාන ප්රයෝජන ටිකක් ගැන කතා කරමු:
- Scalability: Kafka Streams Applications horizontally scale කරන්න පුළුවන්. එකම Application ID එකෙන් instances ගණනාවක් run කරාම Kafka Stream එක automatically workload එක partition කරලා share කරනවා.
- Fault Tolerance: යම් instance එකක් fail වුනොත්, Kafka Streams framework එක automatically ඒක identify කරලා, workload එක වෙන instance එකකට transfer කරනවා. Data Loss වීමක් වෙන්නේ නෑ.
- Low Latency: Real-time processing නිසා data ආපු ගමන්ම Process වෙලා, Output එක ක්ෂණිකව ලැබෙනවා. Milliseconds ගණනක් ඇතුලත.
- Stateful Processing: State Store එක පාවිච්චි කරලා කලින් ආපු data මතක තියාගෙන, ඒ අනුව වර්තමාන data process කරන්න පුළුවන්. අපේ උදාහරණයේදී වගේ Transaction Count එක maintain කරන එක.
- Simple Development: Spring Boot එකත් එක්ක Kafka Streams integration එක ගොඩක් සරලයි. Boilerplate code අඩුයි.
Real-world Use Cases (අපේ ලංකාවේ වුණත්):
- Real-time Analytics Dashboard: Online sales, website traffic වගේ දේවල් වලට instantly analytics generate කරන එක. E-commerce platforms වලට මේක ගොඩක් වැදගත්. උදාහරණයක් විදියට, Daraz, PickMe Food වගේ platforms වලට ඒ වෙලාවෙම තියෙන Orders, Active users ගණන dashboard එකක බලන්න මේක පාවිච්චි කරන්න පුළුවන්.
- Fraud Detection: බැංකු සහ මුල්ය ආයතන වල transactions වල pattern එක analyze කරලා, අසාමාන්ය transaction එකක් වුනොත් instantly detect කරන එක. මේක අපේ රටේ බැංකු සහ Credit Card companies වලට අතිශය වැදගත්.
- Personalized Recommendations: User activities (views, purchases) analyze කරලා, ඒ වෙලාවෙම ඒ අයට recommend කරන්න පුළුවන් දේවල් suggest කරන එක. Netflix, Amazon වගේ platforms කරන විදියට. Dialog, Mobitel වගේ සමාගම් වලට මේක පාවිච්චි කරලා personalized data packages හෝ offers දෙන්න පුළුවන්.
- IoT Data Processing: Sensors වලින් එන data (temperature, pressure, GPS coordinates) වගේ දේවල් Real-time process කරලා anomalies detect කරන එක. Smart agriculture, smart cities වගේ project වලට මේවා ගොඩක් ප්රයෝජනවත්.
- Log Monitoring and Alerting: Server logs analyze කරලා, critical errors වුනොත් instantly alerts යවන එක. ඕනෑම large-scale software system එකකට මේක අත්යවශ්යයි.
මේ හැම දේටම Kafka Streams සහ Spring Boot කියන්නේ කදිම සංකලනයක්.
නිගමනය
ඉතින් යාළුවනේ, මේ ලිපියෙන් මම ඔයාලට Real-time Data Processing වල වැදගත්කම, Apache Kafka Streams සහ Spring Boot එකට පාවිච්චි කරලා කොහොමද data aggregate කරන්නේ කියලා පොඩි අදහසක් දෙන්න උත්සාහ කළා. මේක හරියට විශාල ගඟක වතුර ගලනකොට, ඒ වතුරේ තියෙන අපද්රව්ය පෙරලා, පිරිසිදු වතුර අවශ්ය තැනට හරවනවා වගේ වැඩක්. Technology ලෝකයේ මේ වගේ Real-time solutions වලට තියෙන ඉල්ලුම දවසින් දවස වැඩි වෙනවා.
මේ subject එක ගොඩක් විශාලයි. මම දීපු උදාහරණය සරල එකක් වුණත්, මේක ඔයාලට වැඩිදුර අධ්යයනය කරන්න හොඳ ආරම්භයක් වෙයි කියලා මම හිතනවා. Kafka Streams API එකේ තවත් ගොඩක් operations තියෙනවා (join, windowing, transform, branch වගේ). ඒ හැම එකක් ගැනම ඉගෙනගෙන, තව complex use cases හදා බලන්න ඔයාලට පුළුවන්.
ඔයාලත් මේක try කරලා බලන්න. මොනවා හරි ප්රශ්න තියෙනවා නම්, මේ ගැන තව දැනගන්න කැමති නම්, පහලින් comment එකක් දාන්න. මට පුළුවන් විදියට උදව් කරන්නම්. එහෙනම්, තවත් මෙවැනිම ලිපියකින් හමුවෙමු! සුභ දවසක්!