Spring Boot හා KSQL: Real-time Data Processing වලට අලුත් පාරක් | Stream Processing Guide

ආයුබෝවන් යාළුවනේ! කොහොමද ඉතින්?
අද අපි කතා කරන්න යන්නේ software engineering ලෝකයේ ගොඩක් දෙනෙක්ට ප්රයෝජනවත් වෙන, ටිකක් අලුත් විෂය ක්ෂේත්රයක් ගැන. ඒ තමයි Real-time data processing. අපේ දවසේ දත්ත (data) කියන්නේ රත්තරං වගේ. හැබැයි ඒ දත්ත වලින් උපරිම ප්රයෝජන ගන්න නම්, ඒවා තේරුම් ගන්න විතරක් මදි, වෙලාවට වැඩේට යොදවන්නත් ඕනේ. හරියට වෙලාවට එන බස් එක වගේ. නේද?
අද වෙද්දි Internet of Things (IoT), E-commerce, Financial transactions වගේ දේවල් නිසා දත්ත ගලාගෙන එන්නේ (streams) නතර වෙන්නේ නැති ගඟක් වගේ. මේවා සාමාන්ය databases වල දාලා පස්සේ analyze කරනවට වඩා, එන ගමන්ම analyse කරලා තීරණ ගන්න එක තමයි Smart වෙන්නේ. ඒකට තමයි stream processing කියන්නේ.
අපි අද මේ කතාවට Spring Boot වගේම, Apache Kafka සහ ඒකත් එක්ක වැඩ කරන KSQL කියන powerful tool එකක් ගැනත් එකතු කරගන්නවා. මේක ටිකක් අලුත් දෙයක් වුණත්, දැනගෙන ඉන්න එක ඔයාගේ career එකට ලොකු boost එකක් වෙයි. එහෙනම් අපි පටන් ගමු නේද?
Kafka සහ KSQL - මොනවද මේ?
Apache Kafka: දත්ත ගංවතුරට පාලම
මුලින්ම අපි Kafka ගැන පොඩ්ඩක් බලමු. Apache Kafka කියන්නේ distributed streaming platform එකක්. සරලව කිව්වොත්, data streams handling කරන්න හදපු high-performance, fault-tolerant system එකක්. මේක Messages publish කරන්න, subscribe කරන්න, store කරන්න වගේම process කරන්නත් පුළුවන්. ඒක හරියට මහා පරිමාණ data pipeline එකක් වගේ. ලොකු සමාගම් වල දත්ත හුවමාරු කරගන්න මේක ගොඩක් භාවිතා කරනවා.
- Publish-Subscribe Model: Producer කෙනෙක් data Kafka topic එකකට publish කරනවා, Consumer කෙනෙක් ඒකෙන් subscribe කරලා ගන්නවා.
- Durability: Data loss වෙන්නේ නැහැ. Hard disk එකේ store කරනවා.
- Scalability: ඕන තරම් servers එකතු කරලා capacity වැඩි කරන්න පුළුවන්.
- High Throughput: තප්පරේට ගොඩාක් data messages process කරන්න පුළුවන්.
KSQL: Kafka වලට SQL
දැන් එතකොට KSQL කියන්නේ මොකක්ද? KSQL කියන්නේ Apache Kafka Streams API එක උඩ හදපු SQL-like interface එකක්. හිතල බලන්න, ඔයා relational database එකක SQL queries ලියනවා වගේ, Kafka topic වල එන real-time data stream කරන්න පුළුවන් නම්? ඒක තමයි KSQL වලින් කරන්නේ!
KSQL වලින් ඔයාට පුළුවන් Kafka topics වලින් එන data filter කරන්න, transform කරන්න, join කරන්න, aggregations කරන්න වගේ දේවල් කරන්න. ඒක stream processing වලට අලුත් පහසුවක් ගෙනාවා. ඒකට Programming knowledge එච්චරම ඕනේ නැහැ, SQL දන්නවා නම් ඇති.
Spring Boot එකට Kafka ගලපගමු
අපි හැමෝම දන්නවා Spring Boot කියන්නේ Java applications ඉක්මනට, පහසුවෙන් හදන්න පුළුවන් framework එකක් කියලා. Spring Boot, Kafka එක්ක integrate කරන්න පුදුම විදිහට පහසුයි. Spring Framework එකේ තියෙන spring-kafka
project එකෙන් මේ වැඩේ පහසුවෙන්ම කරගන්න පුළුවන්. ඔයාට Producer කෙනෙක් හරි Consumer කෙනෙක් හරි හදන එකට වැඩි code ප්රමාණයක් ලියන්න ඕනේ නැහැ.
// Example Kafka Producer in Spring Boot
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Message sent: " + message + " to topic: " + topic);
}
}
මේ වගේම Spring Boot Consumer කෙනෙක්ට පුළුවන් Kafka topics වලින් data consume කරන්න. මෙහෙම data produce කරන එක සහ consume කරන එක තමයි KSQL stream processing වලට අවශ්ය දත්ත සැපයීම සහ ප්රතිඵල ලබාගැනීම කරන්නේ.
KSQL Queries බලමු - Practical Examples!
දැන් අපි KSQL වල නියම වැඩ ටිකට බහිමු! KSQL වල ප්රධාන concepts දෙකක් තියෙනවා: STREAM සහ TABLE. මේ දෙක SQL වල වගේම හැසිරුණත්, stream processing වලට විශේෂයි.
- STREAM: දත්ත ගලාගෙන එන අඛණ්ඩ ප්රවාහයක්. දත්ත එකතු වෙන පිළිවෙළට process කරනවා. (e.g., Log events, Sensor readings, Order placements)
- TABLE: Stream එකකින් එන දත්ත වල state එකක් maintain කරනවා. key එකක් මත පදනම් වෙලා අලුත්ම value එක update කරනවා. (e.g., User profiles, Latest stock prices)
KSQL CLI එකට ගිහින් බලමු
KSQL CLI (Command Line Interface) එකෙන් තමයි අපි මේ queries execute කරන්නේ. ඔයාට Confluent Platform එක set up කරගත්තා නම්, ksql
command එකෙන් CLI එකට යන්න පුළුවන්.
1. Stream එකක් හදමු: (Creating a Stream)
මුලින්ම අපි raw data stream එකක් හදමු. හිතන්නකෝ, අපේ website එකේ user clicks ගැන data එනවා කියලා. ඒ data user_clicks
කියන Kafka topic එකේ JSON format එකෙන් තියෙනවා.
CREATE STREAM user_clicks_stream (
user_id VARCHAR,
page_id VARCHAR,
click_time BIGINT
) WITH (
KAFKA_TOPIC='user_clicks',
VALUE_FORMAT='JSON',
PARTITIONS=1
);
මේ query එකෙන් කරන්නේ user_clicks
topic එකෙන් data අරගෙන user_clicks_stream
කියලා KSQL stream එකක් හදන එක. අපි define කරනවා column names සහ data types මොනවද කියලා. VALUE_FORMAT='JSON'
කියන්නේ එන data JSON format එකෙන් කියන එකයි.
2. Stream එකක් බලමු: (Peeking into a Stream)
දැන් මේ stream එකේ මොනවද යන්නේ කියලා බලන්න පුළුවන් මේ විදිහට:
SELECT user_id, page_id FROM user_clicks_stream EMIT CHANGES LIMIT 5;
EMIT CHANGES
කියන්නේ අලුතින් එන data පෙන්නන්න කියන එකයි. LIMIT 5
කියන්නේ මුල්ම පේළි 5 විතරක් පෙන්නන්න කියන එකයි.
3. දත්ත පෙරීම: (Filtering Data)
අපිට ඕනේ blog page එකට ආපු clicks විතරක් බලන්න. ඒක KSQL වලින් කරන්න පුළුවන් SQL වල WHERE clause එක වගේමයි:
CREATE STREAM blog_page_clicks AS
SELECT user_id, click_time
FROM user_clicks_stream
WHERE page_id = 'blog_page'
EMIT CHANGES;
මේකෙන් අලුත් stream එකක් හැදෙනවා blog_page_clicks
කියලා, ඒකට යන්නේ 'blog_page' එකට අදාළ clicks විතරයි.
4. දත්ත ගණනය කිරීම (Aggregations and Grouping)
දැන් අපි බලමු, හැම විනාඩියකටම blog page එකට එන clicks කීයක්ද කියලා. මේකට අපි WINDOWING concept එක භාවිතා කරනවා. Windowing කියන්නේ stream එකේ දත්ත ටිකක් තෝරාගත් කාල සීමාවක් තුළ එකතු කරලා analyze කරන එක.
CREATE TABLE blog_clicks_per_minute AS
SELECT page_id, COUNT(*) AS total_clicks
FROM blog_page_clicks
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY page_id
EMIT CHANGES;
WINDOW TUMBLING (SIZE 1 MINUTE)
කියන්නේ හැම විනාඩියකටම (non-overlapping windows) data analyze කරන්න කියන එකයි. GROUP BY page_id
කියන්නේ page_id එක අනුව group කරන්න කියන එකයි. මෙතනදී අපි STREAM එකකින් TABLE එකක් හැදුවා, මොකද අපිට ඕනේ මේ window එක ඇතුලේ අන්තිම aggregate value එක maintain කරන්න.
5. Stream දෙකක් Join කිරීම: (Joining Streams)
හිතන්න, ඔයාගේ ළඟ user_clicks_stream
එක වගේම, user_profiles
කියන Kafka topic එකේ users ලගේ details තියෙනවා කියලා. අපිට පුළුවන් user clicks එක්ක user profile data join කරන්න.
මුලින්ම user_profiles
කියන topic එකෙන් TABLE එකක් හදාගමු. මේකේ user_id
එක key එක විදිහට තියෙනවා කියලා හිතමු.
CREATE TABLE user_profiles_table (
user_id VARCHAR PRIMARY KEY,
user_name VARCHAR,
region VARCHAR
) WITH (
KAFKA_TOPIC='user_profiles',
VALUE_FORMAT='JSON'
);
CREATE STREAM enriched_clicks AS
SELECT
c.user_id,
p.user_name,
p.region,
c.page_id,
c.click_time
FROM user_clicks_stream c
INNER JOIN user_profiles_table p ON c.user_id = p.user_id
EMIT CHANGES;
මේ query එකෙන් කරන්නේ user_clicks_stream
එක user_profiles_table
එකත් එක්ක join කරලා, click එකත් එක්ක user name එක සහ region එකත් අලුත් stream එකකට (enriched_clicks
) එකතු කරන එක.
Real-world Scenario එකක්: E-commerce Order Analytics
අපි හිතමු ඔයා online store එකක් හදනවා කියලා. හැම order එකක්ම Kafka topic එකකට යනවා. අපිට ඕනේ මේ order stream එක analyze කරලා, popular products මොනවද, වැඩිපුර orders එන්නේ කොයි region එකෙන්ද වගේ දේවල් real-time බලන්න. Spring Boot app එකෙන් order data Kafka එකට දානවා, KSQL ඒක process කරනවා.
1. Order Stream එක හදමු:
Spring Boot application එකෙන් orders
topic එකට මේ වගේ JSON data දානවා කියලා හිතමු:
{
"orderId": "ORD001",
"productId": "PROD005",
"quantity": 2,
"price": 1500.00,
"userId": "USER123",
"region": "Western",
"timestamp": 1678886400000
}
දැන් KSQL වලින් මේක stream එකක් විදිහට හදාගමු:
CREATE STREAM orders_raw (
orderId VARCHAR,
productId VARCHAR,
quantity INT,
price DOUBLE,
userId VARCHAR,
region VARCHAR,
timestamp BIGINT
) WITH (
KAFKA_TOPIC='orders',
VALUE_FORMAT='JSON'
);
2. ජනප්රිය නිෂ්පාදන සොයාගැනීම:
හැම පැයකටම වැඩිපුරම විකිණෙන භාණ්ඩ මොනවද කියලා හොයන්න පුළුවන්:
CREATE TABLE popular_products_hourly AS
SELECT
productId,
COUNT(*) AS totalOrders,
SUM(quantity) AS totalQuantitySold
FROM orders_raw
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY productId
EMIT CHANGES;
මේ query එක popular_products_hourly
කියන TABLE එකක් හදනවා. ඒක හැම පැයකටම, productId
එක අනුව, orders ගණනයි, විකිණුණ ප්රමාණයයි calculate කරලා update කරනවා. මේ data Spring Boot app එකකින් consume කරලා Dashboard එකක පෙන්නන්න පුළුවන්.
3. කලාපීය විකුණුම් විශ්ලේෂණය:
ඒ වගේම, හැම දවසකම වැඩිපුරම orders එන්නේ මොන region එකෙන්ද කියලා බලන්නත් පුළුවන්:
CREATE TABLE regional_sales_daily AS
SELECT
region,
COUNT(*) AS totalOrders,
SUM(price * quantity) AS totalRevenue
FROM orders_raw
WINDOW TUMBLING (SIZE 24 HOURS)
GROUP BY region
EMIT CHANGES;
මේ විදිහට KSQL වලින් complex real-time analytics කරන්න පුළුවන්, SQL syntax එක භාවිතා කරලා. මේවා හදලා ඉවර උනාම, Spring Boot applications වලට පුළුවන් මේ KSQL වලින් හදන output topics subscribe කරලා, dashboard වලට, alert systems වලට data supply කරන්න.
අවසාන වශයෙන්:
ඉතින් යාළුවනේ, අපි අද Spring Boot, Apache Kafka සහ KSQL කියන ත්රිත්වය ගැන කතා කළා. Real-time data processing කියන එක අද ලෝකයේ නැතුවම බැරි දෙයක් වෙලා. Spring Boot වලින් ඔයාගේ application එක පහසුවෙන් හදාගන්න පුළුවන් වගේම, Kafka වලින් විශ්වාසවන්තව data stream කරන්න පුළුවන්. ඒ වගේම KSQL වලින් ඒ data stream එක SQL වගේ පහසුවෙන් analyse කරන්න, transform කරන්න පුළුවන්.
මේ concepts ටික අලුත් නම්, පොඩ්ඩක් explore කරන්න, අත ගහලා බලන්න. Confluent Community Edition එක download කරලා install කරගත්තා නම් ඔයාට KSQL CLI එකත් එක්කම practice කරන්න පුළුවන්. ඒක ඔයාගේ skillset එකට අලුත් Dimension එකක් එකතු කරයි, ගොඩක් වටින දෙයක් වෙයි.
මේ ලිපිය ගැන ඔයාගේ අදහස් මොනවද? ඔයා මේ ගැන දැනගෙන හිටියද? KSQL පාවිච්චි කරලා තියෙනවද? පහළින් comment එකක් දාලා යන්න අමතක කරන්න එපා. අපි තවත් මේ වගේ දේවල් ගැන කතා කරමු!
සැමට ජය!