Spring Boot හා Debezium CDC: Real-time Data Updates Kafka සමග

Spring Boot හා Debezium CDC: Real-time Data Updates Kafka සමග

ආයුබෝවන් කොහොමද ඔයාලට! 😊 අද අපි කතා කරමු අලුත්ම වැඩක් ගැන. ඔයාලා දන්නවාද අද කාලේ applications හදනකොට data real-time update වෙන එක කොච්චර වැදගත්ද කියලා? සාමාන්‍යයෙන් අපිට database එකක වෙනස්කම් ගැන දැනගන්න ඕන වුණාම අපි කරන්නේ නිකන්ම නිකන් polling කරන එක නැත්තම් ඒකෙන් data අදින එක. ඒත් ඒක එච්චර smart විදියක් නෙවෙයි, විශේෂයෙන්ම ලොකු scale එකකදී. ඔන්න ඔතනට තමයි Change Data Capture (CDC) කියන concept එක එන්නේ.

මේ article එකෙන් අපි කතා කරන්නේ CDC කියන්නේ මොකක්ද, ඒකට Debezium, Kafka වගේ tools කොහොමද පාවිච්චි කරන්නේ, ඒ වගේම අපේ Spring Boot application එකක් එක්ක මේක කොහොමද integrate කරන්නේ කියලා. සූදානම් වෙන්න, අපි ලෝකෙටම real-time data ගෙනත් දෙමු! 🚀

🤔 CDC කියන්නේ මොකක්ද? (What is Change Data Capture?)

සරලවම කිව්වොත්, CDC කියන්නේ database එකක වෙන හැම වෙනස්කමක්ම (insert, update, delete) අඳුරගෙන ඒවා වෙනම stream එකක් විදියට අනිත් system වලට දැනුම් දෙන ක්‍රමයක්. හිතන්න, ඔයාගේ bank account එකේ transaction එකක් සිද්ධ වුණාම, ඒක වෙනත් system වලට (SMS gateway, email service, analytics dashboard) ක්ෂණිකව දැනගන්න ඕනනේ. ඒ වගේ තැන්වලට තමයි CDC ගොඩක් වැදගත් වෙන්නේ.

සාම්ප්‍රදායිකව දත්ත සමමුහුර්ත (synchronize) කරන්න අපි කළේ ටික ටික වෙලාවට database එක check කරන එක. (ඒ කියන්නේ polling). නැත්නම් complex triggers ලිවීම වගේ දේවල්. ඒත් polling කරනකොට network traffic වැඩිවෙනවා, database එකට බරක් වෙනවා, real-time update වෙන්නේ නැහැ. Triggers ලිවීම එච්චරම flexible නැහැ වගේම database එකේ performance එකටත් බලපෑම් කරන්න පුළුවන්. CDC, විශේෂයෙන්ම log-based CDC, මේ ප්‍රශ්න ගොඩකට විසඳුම් දෙනවා.

Log-based CDC කියන්නේ database එකේ transaction log (උදාහරණයක් විදියට PostgreSQL වල WAL – Write-Ahead Log, MySQL වල binlog) කියවලා ඒ වෙනස්කම් extract කරගන්න එක. database එකට පොඩ්ඩක්වත් හානියක් නැහැ, performance එකට බලපෑමක් නැහැ. ඒක ගොඩක් efficient.

🛠️ Debezium සහ Kafka – දත්ත ගමනට සපෝර්ට් (Debezium and Kafka - Powering the Data Flow)

දැන් අපි බලමු CDC කරන්න අපිට ඕන කරන ප්‍රධාන tools දෙකක් ගැන.

Debezium – දත්ත රහස් පරීක්ෂකයා 🕵️‍♂️

Debezium කියන්නේ open-source platform එකක්. මේක තමයි database එකේ transaction logs කියවලා, සිද්ධවෙන වෙනස්කම් අඳුරගෙන, ඒ data message විදියට එළියට දාන වැඩේ කරන්නේ. Debezium එකේ විවිධ databases වලට connectors තියෙනවා (MySQL, PostgreSQL, MongoDB, SQL Server, Oracle). ඔයාට ඕන database එකට අදාළ connector එක deploy කළාම ඒක database එක monitor කරනවා.

Debezium connector එකක් deploy කරන්නේ Kafka Connect කියන framework එක ඇතුළේ. Kafka Connect කියන්නේ Kafka ecosystem එකේ තියෙන component එකක්, ඒකෙන් data sources වලින් data Kafka topics වලට අරගෙන එන්නත්, Kafka topics වලින් data sinks (වෙනත් systems) වලට යවන්නත් උදව් වෙනවා. ඉතින්, Debezium connector එක Kafka Connect එකේ Source connector එකක් විදියට වැඩ කරනවා.

Kafka – දත්ත මහා මාර්ගය 🛣️

දැන් Debezium අරගෙන එන data messages ගොඩට යවන්න තැනක් ඕනනේ? ඔන්න ඔතනට තමයි Apache Kafka එන්නේ. Kafka කියන්නේ distributed streaming platform එකක්. Debezium capture කරන හැම database change එකක්ම Kafka topic එකකට publish කරනවා. මේ topics වලට ඕනම ගානක් consumers ලට listen කරන්න පුළුවන්.

Kafka වල තියෙන වාසි තමයි: high throughput, fault-tolerance, scalability, සහ persistence. ඒ කියන්නේ data කොච්චර ආවත් handle කරන්න පුළුවන්, එක server එකක් වැටුණත් data නැති වෙන්නේ නැහැ, ඕන තරම් expand කරන්න පුළුවන්, වගේම data ටික කාලයක් topic එකේ තියාගන්නත් පුළුවන්.

ඉතින්, මේ දෙක එකට වැඩ කරනකොට මොකද වෙන්නේ? Debezium database එකේ වෙනස්කම් අඳුරගෙන, ඒ changes Kafka Connect හරහා Kafka topic එකකට දානවා. ඊට පස්සේ අපේ Spring Boot application එක වගේ consumers ලා ඒ Kafka topic එකට listen කරලා, ඒ වෙනස්කම් දැනගත්ත ගමන් ඕන වැඩක් කරන්න පුළුවන්.

💡 Spring Boot එකට CDC ගලපමු (Integrating CDC with Spring Boot)

දැන් තමයි කතාවේ වැදගත්ම ටික. Spring Boot application එකක් ඇතුලේ මේ CDC data consume කරලා අපේ logic එක implement කරන්නේ කොහොමද කියලා.

1. Local Setup එක (Docker!)

මේ වගේ system එකක් local environment එකක set up කරන එක ටිකක් බර වැඩක් වෙන්න පුළුවන් මුලදී. ඒකට හොඳම විසඳුම තමයි Docker. අපි Docker Compose පාවිච්චි කරලා PostgreSQL database එකක්, Zookeeper (Kafka වලට ඕන), Kafka broker එකක්, සහ Kafka Connect (Debezium connector එක deploy කරන්න) එක පාරටම up කරගමු.

ඔයාට මේ වගේ docker-compose.yml file එකක් පාවිච්චි කරන්න පුළුවන් (සම්පූර්ණ code එක මෙතන දාන්නේ නැහැ, නමුත් මේ වගේ structure එකක්):

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    # ... configurations ...
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    # ... configurations ...
    depends_on:
      - zookeeper
  postgres:
    image: debezium/postgres:16
    # ... configurations ...
  connect:
    image: debezium/connect:2.4
    # ... configurations ...
    depends_on:
      - kafka
      - postgres

Docker Compose up කරලා, ඊට පස්සේ Debezium PostgreSQL connector එක Kafka Connect REST API එක හරහා deploy කරන්න පුළුවන්. ඒක කරන්න පුළුවන් CURL request එකක් පාවිච්චි කරලා. (dbserver1 වගේ නමක් connector එකට දීලා, database details specify කරලා).

Connector එක හරියට deploy වුණාම, ඔයාගේ PostgreSQL database එකේ public schema එකේ tables වලට වෙන changes dbserver1.public.<table_name> කියන format එකට Kafka topics වලට එන්න පටන් ගනීවි.

2. Spring Boot Project එක (Kafka Consumer)

දැන් අපි Spring Boot application එක හදාගමු. මේක තමයි Kafka topic එකට publish වෙන messages consume කරන්නේ.

Dependencies

pom.xml එකට මේ ටික add කරගන්න:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- Optionally, if you need to interact with the DB directly -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

application.properties

Kafka consumer එක configure කරන්න application.properties file එකට මේ ටික එකතු කරන්න:

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=product-cdc-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# If Debezium sends Avro, you'll need a different deserializer

Kafka Listener

දැන් තමයි වැදගත්ම කොටස. අපි @KafkaListener annotation එක පාවිච්චි කරලා Kafka topic එකෙන් messages consume කරමු. Debezium messages JSON format එකෙන් එන්නේ. ඒකේ before, after, op (operation type: c-create, u-update, d-delete, r-read/snapshot), source වගේ fields තියෙනවා.

හිතන්න අපිට products කියන table එකේ වෙන changes listen කරන්න ඕන කියලා. ඒ table එකේ id, name, price වගේ columns තියෙනවා.

package com.scguide.cdc.listener;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ProductCdcListener {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = "dbserver1.public.products", groupId = "product-cdc-group")
    public void listenProductChanges(String message) {
        try {
            JsonNode rootNode = objectMapper.readTree(message);
            JsonNode payload = rootNode.get("payload");

            if (payload == null) {
                System.out.println("Received message with no payload: " + message);
                return;
            }

            String operationType = payload.get("op").asText(); // c, u, d, r

            JsonNode afterNode = payload.get("after");
            JsonNode beforeNode = payload.get("before");

            System.out.println("Operation Type: " + operationType);

            switch (operationType) {
                case "c": // Create
                    if (afterNode != null) {
                        System.out.println("Product Created: " + afterNode.toPrettyString());
                        // Logic for new product: e.g., send notification, update search index
                    }
                    break;
                case "u": // Update
                    if (beforeNode != null && afterNode != null) {
                        System.out.println("Product Updated - Before: " + beforeNode.toPrettyString());
                        System.out.println("Product Updated - After: " + afterNode.toPrettyString());
                        // Logic for product update: e.g., invalidate cache, update analytics
                        // You can compare beforeNode and afterNode to see what changed
                    }
                    break;
                case "d": // Delete
                    if (beforeNode != null) {
                        System.out.println("Product Deleted: " + beforeNode.toPrettyString());
                        // Logic for deleted product: e.g., remove from cache, archive
                    }
                    break;
                case "r": // Read / Snapshot
                    if (afterNode != null) {
                        System.out.println("Product Snapshot Read: " + afterNode.toPrettyString());
                        // This happens during initial sync
                    }
                    break;
                default:
                    System.out.println("Unknown operation type: " + operationType + " for message: " + message);
            }
        } catch (Exception e) {
            System.err.println("Error processing Kafka message: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

මේ code එකෙන් අපි කරන්නේ:

  1. @KafkaListener එකෙන් dbserver1.public.products topic එකට එන messages අහනවා.
  2. ආපු message එක JSON විදියට parse කරනවා.
  3. Debezium message එකේ payload එක ඇතුළේ තියෙන op (operation type) field එක බලලා, ඒක insert එකක්ද, update එකක්ද, delete එකක්ද කියලා තීරණය කරනවා.
  4. ඒ අනුව before සහ after fields වල තියෙන data (change එකට කලින් සහ පස්සේ data) අරගෙන අපිට ඕන logic එක implement කරනවා.

හිතන්න ඔයාට ඕන product එකක price එක වෙනස් වුණාම ක්ෂණිකවම cache එක update කරන්න. එතකොට ඔයාට මේ ProductCdcListener එක ඇතුළේ u (update) operation එක එනකොට, afterNode එකේ තියෙන new price එක අරගෙන, cache update කරන method එකක් call කරන්න පුළුවන්. ඒක මාර පහසුවක් නේද?

3. Data Table එකක් හදමු (PostgreSQL)

දැන් අපි test කරන්න database එකේ table එකක් හදමු. PostgreSQL command line එකෙන් වගේ මේක කරන්න පුළුවන්:

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Add some data
INSERT INTO products (name, price) VALUES ('Laptop', 1200.00);
INSERT INTO products (name, price) VALUES ('Mouse', 25.00);

දැන් ඔයා මේ table එකට වෙනස්කම් කරලා බලන්න. උදාහරණයක් විදියට:

UPDATE products SET price = 1250.00 WHERE name = 'Laptop';
INSERT INTO products (name, price) VALUES ('Keyboard', 75.00);
DELETE FROM products WHERE name = 'Mouse';

ඔයාගේ Spring Boot application එකේ console එක බලන්න, ඒ වෙනස්කම් real-time එකේදී detect කරලා print වෙනවා දැකගන්න පුළුවන්. ඒක මාර සිරා වැඩක්!

➕ වාසි සහ අභියෝග (Advantages and Challenges)

ඕනම තාක්ෂණයක වගේම CDC වලත් වාසි අවාසි දෙකම තියෙනවා.

වාසි (Advantages):

  • Real-time data synchronization: දත්ත වෙනස් වුණ ගමන්ම අනිත් system වලට දැනුම් දෙන්න පුළුවන්.
  • Decoupling services: Microservices architecture එකකදී සේවාවන් අතර දත්ත සමමුහුර්ත කරන්න හොඳ ක්‍රමයක්. එක් සේවාවක් අනිකට කෙලින්ම බලපාන්නේ නැහැ.
  • Auditing and Compliance: දත්තවල වෙනස්කම් ගැන complete record එකක් තියාගන්න පුළුවන්.
  • Scalability: Kafka වගේ distributed system එකක් නිසා ඕනෑම level එකකට scale කරන්න පුළුවන්.
  • Minimal impact on source DB: Transaction logs කියවන නිසා database performance එකට ලොකු බලපෑමක් වෙන්නේ නැහැ.
  • Event-driven architecture: දත්ත වෙනස්වීම් events විදියට handle කරන්න පුළුවන් නිසා event-driven systems හදන්න හොඳට ගැලපෙනවා.

අභියෝග (Challenges):

  • Complexity of setup: මුලින්ම Debezium, Kafka, Kafka Connect වගේ දේවල් configure කරගන්න ටිකක් මහන්සි වෙන්න වෙනවා.
  • Schema changes: Source database එකේ schema එක වෙනස් වුණොත් (උදා: column එකක් add/remove කළොත්) Debezium connector එක update කරන්න සිද්ධ වෙන්න පුළුවන්, නැත්නම් consumer එකේ code එකත් වෙනස් කරන්න වෙන්න පුළුවන්.
  • Data consistency: At-least-once delivery වගේ දේවල් නිසා duplicate messages එන්න පුළුවන්. ඒ වගේ තැන්වලදී consumer logic එක idempotent වෙන්න ඕන. (එකම message එක කීප සැරයක් ආවත් අවුලක් වෙන්නේ නැති විදියට).
  • Monitoring and Troubleshooting: Distributed system එකක් නිසා ගැටලු ආවොත් diagnose කරන එක ටිකක් අමාරු වෙන්න පුළුවන්.

අවසාන වශයෙන් (Wrapping Up)

හිතන්න, ඔයාලගේ e-commerce site එකේ product එකක price එක වෙනස් වුණ ගමන්, website එකේ front-end එක refresh නොකරම update වෙනවා, ඒ වගේම marketing team එකට auto-generated email එකක් යනවා, analytics dashboard එකේ data refresh වෙනවා. ඒ හැමදේම සිද්ධ වෙන්නේ database එකේ වෙනස්කම ක්ෂණිකව detect කරලා. ඒකට තමයි මේ CDC කියන concept එක අපිට උදව් කරන්නේ.

Spring Boot එක්ක Debezium සහ Kafka එකතු කරලා අපි කොහොමද database changes real-time stream කරන්නේ කියලා අපි මේ article එකෙන් කතා කළා. මේක microservices, data synchronization, auditing, real-time analytics වගේ ගොඩක් දේවල් වලට ගොඩක්ම වැදගත් වෙනවා.

දැන් ඔයාලට පුළුවන් මේ concepts ඔයාලගේ projects වලට අරගෙන යන්න. Try කරලා බලන්න. ඔයාලට මේ ගැන තියෙන අදහස්, ප්‍රශ්න පහළ comment section එකේ දාගෙන යන්න. අපි උත්තර දෙන්නම්. ඒ වගේම ඔයාලා මේ වගේ setup එකක් පාවිච්චි කරලා තියෙනවා නම්, ඔයාලගේ අත්දැකීම් share කරන්නත් අමතක කරන්න එපා. අපි මේ වගේ තවත් තාක්ෂණික තොරතුරු ඔයාලට අරගෙන එන්න බලාපොරොත්තු වෙනවා. එහෙනම් ආයෙත් දවසක හමුවෙමු! 🫡