Spring Boot Flink: Real-time Data Processing | සිංහලෙන් - SC Guide

Spring Boot Flink: Real-time Data Processing | සිංහලෙන් - SC Guide

අයියෝ, කට්ටිය කොහොමද? ඔයාලා හැමෝම සැප සනීපෙන් ඉන්නවා කියලා හිතනවා. අද අපි කතා කරන්න යන්නේ හරිම ආකර්ෂණීය දෙයක් ගැන. මේ දවස්වල හැමෝම කියන්නේ Real-time Data Processing ගැනනේ. සිරාවටම කිව්වොත්, data කියන්නේ දැන් රත්තරන් වගේ දෙයක්. ඒ වගේම, ඒ data එක instantly, live විදියට process කරන්න පුළුවන් නම්, ඒක නියම power එකක්.

හිතන්න, ක්‍රිකට් මැච් එකක ලකුණු, වෙළඳපොලේ කොටස් මිල, ඔන්ලයින් ඕඩර්ස්, නැත්නම් අපේ දවසෙන් දවස වෙන smart device sensor data – මේ හැමදේම තප්පරෙන් තප්පර අපිට update වෙන්න ඕනේ, නේද? ඒ වගේම ඒ update වෙන ගමන්ම අපිට ඒ data එකෙන් මොනවා හරි තේරුමක් ගන්නත් පුළුවන් වෙන්න ඕනේ. ඉතින්, මේ වගේ ලොකු data ප්‍රමාණයක් අකුණක් වගේ වේගෙන් හසුරුවන්නේ කොහොමද?

ඕකට තමා Spring Boot එකයි, අපේ Apache Flink කියන සුපිරි කොල්ලායි එකතු වෙන්නේ. Spring Boot කියන්නේ Java Application හදන්න තියෙන ලේසිම, වේගවත්ම Framework එකක් කියල ඔයාලා දන්නවනේ. Apache Flink කියන්නේ High-throughput, Low-latency Stream Processing වලට තියෙන පට්ටම Tool එකක්. මේ දෙන්නා එකට අල්ලගෙන වැඩ කරනකොට මොන වගේ වැඩක් වෙයිද? අද මේ blog post එකෙන් අපි බලමු මේ දෙන්නා එකට දාලා කොහොමද real-time data processing වල කප් එක ගහන්නේ කියලා. අද අපි බලමු Flink use කරලා events process කරන හැටිත්.

මුලින්ම බලමු ඇයි මේ දෙන්නා එකට පාවිච්චි කරන්න හොඳම කියලා. ඒ කියන්නේ මේ දෙන්නාගෙම තියෙන විශේෂතා මොනවද? ඒ විශේෂතා එකට එකතු වුනාම අපිට මොන වගේ වාසිද තියෙන්නේ?

Spring Boot – අපේ කම්මැලි නැති යාළුවා

Spring Boot කියන්නේ Java Developersලා අතර හරිම ජනප්‍රිය Framework එකක්. ඒකට හේතු කීපයක්ම තියෙනවා:

  • ඉක්මන් Application Development: Boilerplate code අඩුයි. ඉක්මනටම project එකක් පටන් අරන් වැඩේට බහින්න පුළුවන්.
  • Microservices වලට නියමයි: කුඩා, ස්වාධීන service හදන්න හරිම ලේසියි. මේවා වෙන වෙනම deploy කරන්නත් පුළුවන්.
  • REST API වලට සුපිරි: RESTful APIs හදන්න තියෙන පහසුව නිසා front-end application එක්ක connect වෙන්න පහසුයි.
  • Dependency Management: Project එකට අවශ්‍ය libraries add කරන එක auto-configure කරන නිසා හරිම පහසුයි.

ඉතින්, Spring Boot එකෙන් අපිට පුළුවන් වේගයෙන්, පහසුවෙන් scalable applications හදන්න. ඒ වගේම REST endpoints හරහා data receive කරන්න, Database connect කරන්න වගේ දේවල් වලටත් නියමයි.

Apache Flink කියන්නේ Distributed Stream Processing Framework එකක්. ඒ කියන්නේ විශාල data ප්‍රමාණයක් විවිධ servers මත (distributed) live stream එකක් විදියට process කරන්න පුළුවන්. ඒකෙ තියෙන විශේෂතා ටිකක් බලමු:

  • Real-time Data Processing: Data එන ගමන්ම process කරන නිසා low-latency results ගන්න පුළුවන්. ඒ කියන්නේ "කෙලින්ම වැඩේ" තමා!
  • Stateful Computations: Flink වලට පුළුවන් data stream එකක state එක maintain කරන්න. උදාහරණයක් විදියට, customer කෙනෙක්ගේ කලින් order ගණන මතක තියාගෙන අලුත් order එකක් ආවම ඒක update කරන්න පුළුවන්.
  • Fault Tolerance: මොනවා හරි අවුලක් ගියොත්, system එක automatic restart වෙලා කලින් තිබ්බ තැනින්ම data processing එක පටන් ගන්නවා. Data නැති වෙන්නේ නැහැ.
  • Event Time Processing: Flink වලට පුළුවන් events ඇත්තටම සිද්ධ වුන වෙලාව අනුව process කරන්න. ඒ කියන්නේ data එක එන order එක වැදගත් නැහැ, ඒකේ තියෙන timestamp එක වැදගත්.
  • High Throughput: තප්පරයකට ලක්ෂ ගණන් data records process කරන්න පුළුවන්කම තියෙනවා.

ඉතින්, Spring Boot එකෙන් අපිට පුළුවන් ඉක්මනට, හොඳට හදපු application එකක් හදාගන්න. ඒ application එකට එන data stream එක Flink එකට දීලා, ඒකෙන් ඒ data එක real-time analyse කරලා, අවශ්‍ය නම් ආයෙත් Spring Boot application එකට result එක එවන්න පුළුවන්. මේ දෙන්නාගේ එකතුව කියන්නේ Data Processing වලට නියම combination එකක්. Microservices architecture එකක් ඇතුළේ real-time analytics කරන්න, event-driven architectures හදන්න මේ දෙන්නා සුපිරියටම ගැලපෙනවා.

අපි වැඩේට බහිමු – අවශ්‍ය දේවල් සෙට් කරගනිමු!

හරි, දැන් අපි බලමු මේ වැඩේ පටන් ගන්න අපිට මොනවාද ඕනේ කියලා. මේ ටික ඔයාලා ළඟ තියෙනවා නම් හරි ලේසියි වැඩේට බහින්න:

  • Java Development Kit (JDK): Java 8 හෝ ඊට ඉහළ (Java 11 or 17 recommended).
  • Maven හෝ Gradle: Project එක manage කරන්න. අපි මේ tutorial එකේදී Maven පාවිච්චි කරමු.
  • IDE: IntelliJ IDEA, Eclipse, Spring Tool Suite (STS) වගේ එකක්.

මුලින්ම, අපි Spring Initializr (start.spring.io) එකෙන් අලුත් Spring Boot project එකක් හදාගමු. Dependencies විදියට Spring Web එක add කරගන්න. project එක download කරගෙන, ඔයාලගේ IDE එකෙන් open කරගන්න.

දැන් අපි Flink dependencies ටික pom.xml එකට add කරමු. මේවා තමයි අපිට අවශ්‍ය මූලිකම dependencies:

<dependencies>
    <!-- Spring Boot Web Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Apache Flink Core API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.1</version> <!-- Flink version එක වෙනස් වෙන්න පුළුවන් -->
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.1</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.1</version>
        <scope>compile</scope>
    </dependency>

    <!-- Spring Boot Test Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<version> එක ඔයාලා use කරන Flink version එක අනුව වෙනස් වෙන්න පුළුවන්. ඒක හැමවෙලේම Flink official website එකෙන් check කරගන්න. <scope>compile</scope> කියන්නේ application එක compile කරනකොට මේ libraries ඕනේ කියලා. Flink runtime එක වෙනම deploy කරනවා නම්, මේවා provided scope එකට දාන්න පුළුවන්. හැබැයි අපි මේක Spring Boot app එක ඇතුළෙම run කරන නිසා compile හොඳයි.

Apache Flink වල ප්‍රධානම API එක තමයි DataStream API එක. මේක තමයි live data streams process කරන්න පාවිච්චි කරන්නේ. අපි පොඩ්ඩක් බලමු මේකෙ මූලික සංකල්ප ටිකක්.

  • StreamExecutionEnvironment: මේක තමයි Flink job එකක් execute කරන්න අවශ්‍ය මූලිකම object එක. මේකෙන් තමයි අපි data sources set කරන්නේ, transformations apply කරන්නේ, සහ job එක run කරන්නේ.
  • DataStream: මේක තමයි Flink වල data stream එකක් නියෝජනය කරන්නේ. එන හැම data record එකක්ම මේ stream එක හරහා ගලාගෙන යනවා.
  • Transformations: මේවා තමයි data stream එකක data එක වෙනස් කරන්න හෝ analyse කරන්න පාවිච්චි කරන operations.
    • map(): Stream එකේ හැම record එකක්ම වෙනස් කරනවා (e.g., String එකක් Int එකක් බවට).
    • filter(): Stream එකෙන් සමහර records remove කරනවා (e.g., මිල අඩු orders filter කරන්න).
    • keyBy(): Stream එක key එකක් අනුව group කරනවා. මේක distributed processing වලට හරිම වැදගත්. උදාහරණයක් විදියට, productId එක අනුව data group කරන්න පුළුවන්.
    • window(): Stream එකේ data, time හෝ count එකක් අනුව chunks (windows) වලට කඩනවා. Real-time analytics වලට මේක අත්‍යවශ්‍යයි. උදාහරණයක් විදියට, අවසාන මිනිත්තු 5 ඇතුලත ආපු හැම order එකක්ම.
    • sum()/count()/reduce()/aggregate(): Window එකක් ඇතුළේ data aggregate කරන්න.

සරල Flink job එකක් මේ වගේ වෙන්න පුළුවන්:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;

public class SimpleFlinkJob {
    public static void main(String[] args) throws Exception {
        // 1. StreamExecutionEnvironment එක හදාගන්නවා
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Data Source එකක් define කරනවා (මේක demo එකක් නිසා collection එකක්)
        DataStream<String> text = env.fromElements(
                "hello flink",
                "hello world",
                "flink stream processing"
        );

        // 3. Transformation එකක් apply කරනවා: හැම line එකක්ම වචන වලට කඩලා, "hello" තියෙන ඒවා විතරක් print කරනවා
        DataStream<String> filteredWords = text
                .flatMap((value, out) -> {
                    for (String word : value.split(" ")) {
                        out.collect(word);
                    }
                })
                .filter(word -> word.startsWith("hello"));

        // 4. Result එක print කරනවා
        filteredWords.print();

        // 5. Job එක execute කරනවා
        env.execute("Simple Word Filter Job");
    }
}

මේ Code එක Flink job එකක මූලිකම කොටස් පෙන්නනවා. අපි StreamExecutionEnvironment එකක් හදාගෙන, data source එකක් (මේකෙදි static text collection එකක්) දාලා, ඒක flatMap කරලා වචන වලට කඩලා, filter කරලා "hello" වලින් පටන් ගන්න වචන විතරක් අරන්, ඒවා print() කරනවා. අන්තිමට env.execute() කරලා job එක run කරනවා.

හරි, දැන් අපි එමු මේ post එකේ ප්‍රධානම කොටසට. Spring Boot application එකක් ඇතුළේ Flink job එකක් run කරලා, external events process කරන්නේ කොහොමද කියලා අපි බලමු. අපේ exercise එක තමයි "Process events with Flink".

අපි හිතමු අපිට online order events එනවා කියලා. හැම order එකක්ම JSON object එකක් විදියට Spring Boot REST endpoint එකකට එනවා. අපේ Flink job එකෙන් මේ order events real-time process කරලා, සෑම මිනිත්තුවකටම product එකක් අනුව total orders ගණනයි, total sales එකයි calculate කරලා print කරන්න ඕනේ.

1. OrderEvent DTO එක හදාගනිමු

මුලින්ම, එන Order events represent කරන්න OrderEvent කියන Class එක හදාගමු. මේකෙදි timestamp field එක event time processing වලට වැදගත් වෙනවා.

package com.example.flinkspringboot.model;

import java.io.Serializable;

public class OrderEvent implements Serializable {
    private String orderId;
    private String productId;
    private int quantity;
    private double price;
    private long timestamp; // Event time in milliseconds

    public OrderEvent() {}

    public OrderEvent(String orderId, String productId, int quantity, double price, long timestamp) {
        this.orderId = orderId;
        this.productId = productId;
        this.quantity = quantity;
        this.price = price;
        this.timestamp = timestamp;
    }

    // Getters and Setters
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }
    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "OrderEvent{" +
               "orderId='" + orderId + '\'' +
               ", productId='" + productId + '\'' +
               ", quantity=" + quantity +
               ", price=" + price +
               ", timestamp=" + timestamp +
               '}';
    }
}

Serializable කියන Interface එක implement කරන්න මතක තියාගන්න. ඒක Flink වලට data serialize කරන්න වැදගත්.

සාමාන්‍යයෙන් Flink external systems (Kafka වගේ) වලින් data ගන්නවා. හැබැයි අපේ demo එකට, Spring Boot REST endpoint එකකට එන events ටික Flink job එකට feed කරන්න පුළුවන් Custom Source Function එකක් හදාගමු. මේක BlockingQueue එකකින් data කියවනවා.

package com.example.flinkspringboot.service;

import com.example.flinkspringboot.model.OrderEvent;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OrderEventSource extends RichSourceFunction<OrderEvent> {

    private transient volatile boolean isRunning;
    private final BlockingQueue<OrderEvent> queue;

    public OrderEventSource(BlockingQueue<OrderEvent> queue) {
        this.queue = queue;
    }

    @Override
    public void run(SourceContext<OrderEvent> ctx) throws Exception {
        isRunning = true;
        while (isRunning) {
            // Queue එකෙන් event එකක් ගන්නකම් බලන් ඉන්නවා, නැත්නම් පොඩි වෙලාවක් pause වෙනවා
            OrderEvent event = queue.poll(100, TimeUnit.MILLISECONDS);
            if (event != null) {
                ctx.collect(event); // Flink stream එකට event එක add කරනවා
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false; // Source එක නවත්වන්න සිද්ධ වුනොත් මේක run වෙනවා
    }
}

මෙතනදී RichSourceFunction එකෙන් අපිට පුළුවන් stateful operations වගේ දේවල් කරන්න. BlockingQueue එකක් පාවිච්චි කරන්නේ Spring Boot thread එකෙන් data එක Flink thread එකට safely pass කරන්න.

මේ @Service එකෙන් තමයි Flink job එක initialize කරලා run කරන්නේ. ඒ වගේම Spring Boot REST Controller එකෙන් එන OrderEvent එක OrderEventSource එකට අයිති BlockingQueue එකට දාන්නේ මේ Service එක හරහායි.

package com.example.flinkspringboot.service;

import com.example.flinkspringboot.model.OrderEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Service
public class FlinkProcessingService implements CommandLineRunner {

    // Spring Boot thread එකෙන් Flink thread එකට data pass කරන්න BlockingQueue එකක්
    private final BlockingQueue<OrderEvent> eventQueue = new LinkedBlockingQueue<>();
    private StreamExecutionEnvironment env;

    @PostConstruct
    public void init() {
        // Flink Execution Environment එක initialize කරනවා
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism එක 1 ට සීමා කරමු Demo එකට (Production වලදී වෙනස් කරන්න)
        env.setParallelism(1);
    }

    // Spring Boot REST Controller එකෙන් මේ method එකට event එකක් දානවා
    public void processOrderEvent(OrderEvent event) {
        eventQueue.offer(event); // Queue එකට event එක දානවා
        System.out.println("Received order event: " + event.getOrderId() + " for Flink processing.");
    }

    @Override
    public void run(String... args) throws Exception {
        // Flink job එක thread එකක් ඇතුළේ run කරනවා Spring Boot main thread එක block නොවෙන්න
        new Thread(() -> {
            try {
                // Custom Source එකෙන් data ගන්නවා
                DataStream<OrderEvent> orders = env.addSource(new OrderEventSource(eventQueue));

                // Watermark Strategy එක define කරනවා Event Time Processing වලට
                // events වල timestamp එක පාවිච්චි කරනවා
                DataStream<OrderEvent> watermarkedOrders = orders.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));

                // Flink Transformation: Product ID එක අනුව group කරලා,
                // මිනිත්තුවක window එකක් ඇතුළේ orders ගණන් කරනවා
                watermarkedOrders.keyBy(OrderEvent::getProductId)
                        .window(TumblingEventTimeWindows.of(Time.minutes(1))) // 1 minute tumbling window
                        .process(new ProcessWindowFunction<OrderEvent, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<OrderEvent> elements, Collector<String> out) throws Exception {
                                long count = 0;
                                double totalSales = 0.0;
                                for (OrderEvent event : elements) {
                                    count++;
                                    totalSales += event.getPrice() * event.getQuantity();
                                }
                                // Result එක print කරනවා
                                out.collect("--- Flink Result --- Product: " + key +
                                            " - Orders: " + count +
                                            ", Total Sales: " + String.format("%.2f", totalSales) +
                                            " (Window End: " + new Date(context.window().getEnd()) + ")");
                            }
                        })
                        .print(); // Processed results console එකේ print කරනවා

                // Flink job එක execute කරනවා
                env.execute("Spring Boot Flink Order Processing Job");
            } catch (Exception e) {
                System.err.println("Error running Flink job: " + e.getMessage());
                e.printStackTrace();
            }
        }).start(); // අලුත් Thread එකක් විදියට job එක start කරනවා
    }
}

මෙතනදී අපි CommandLineRunner Interface එක implement කරනවා. ඒ කියන්නේ Spring Boot Application එක start වෙනකොටම මේ run() method එක execute වෙනවා. ඒකෙන් අපි Flink job එක අලුත් Thread එකක් ඇතුළේ run කරනවා, නැත්නම් Spring Boot main thread එක block වෙන්න පුළුවන්. WatermarkStrategy එක වැදගත් වෙන්නේ Flink වල Event Time processing වලට. මේකෙන් Flink වලට කියනවා events වලට අයිති real timestamp එක මොකක්ද කියලා. forBoundedOutOfOrderness කියන්නේ events පොඩ්ඩක් order නැතුව ආවත් ඒක handle කරන්න පුළුවන් කියලා.

keyBy(OrderEvent::getProductId) කරලා අපි orders productId එක අනුව group කරනවා. ඊට පස්සේ TumblingEventTimeWindows.of(Time.minutes(1)) කරලා, හැම මිනිත්තුවකටම අලුත් window එකක් හදනවා. process() method එක ඇතුළේ අපි window එකට ආපු හැම event එකක්ම iterate කරලා, orders ගණනයි, total sales එකයි calculate කරනවා. අන්තිමට ඒ result එක out.collect() කරලා console එකට print කරනවා.

4. Spring Boot REST Controller එක හදමු

දැන් අපි හදපු Flink service එකට events යවන්න පුළුවන් REST Endpoint එකක් හදමු.

package com.example.flinkspringboot.controller;

import com.example.flinkspringboot.model.OrderEvent;
import com.example.flinkspringboot.service.FlinkProcessingService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final FlinkProcessingService flinkProcessingService;

    public OrderController(FlinkProcessingService flinkProcessingService) {
        this.flinkProcessingService = flinkProcessingService;
    }

    @PostMapping("/submit")
    public ResponseEntity<String> submitOrder(@RequestBody OrderEvent event) {
        // Event එකට current timestamp එක දානවා (real world වලදී client side එකෙන් එන්න පුgwan)
        event.setTimestamp(System.currentTimeMillis());
        flinkProcessingService.processOrderEvent(event); // Flink service එකට event එක යවනවා
        return new ResponseEntity<>("Order submitted for Flink processing!", HttpStatus.OK);
    }
}

මේ OrderController එක /api/orders/submit කියන URL එකට එන POST request handle කරනවා. Request Body එකෙන් එන OrderEvent object එක, අපි කලින් හදපු FlinkProcessingService එකට යවනවා. System.currentTimeMillis() දාන්නේ demo එකට. ඇත්තටම timestamp එක client side එකෙන්, හෝ data source එකෙන් එනවනම් වඩා හොඳයි.

අපි දැන් Run කරලා බලමු!

ඔයාලගේ Spring Boot main application class එක Run කරන්න (FlinkSpringbootApplication.java වගේ එක).

Application එක start වුනාට පස්සේ, ඔයාලට පුළුවන් Postman, cURL, නැත්නම් වෙනත් REST client එකක් පාවිච්චි කරලා මේ endpoint එකට POST request යවන්න:

URL: http://localhost:8080/api/orders/submit

Method: POST

Content-Type: application/json

Request Body:

{
    "orderId": "ORD001",
    "productId": "PROD001",
    "quantity": 2,
    "price": 50.0
}
{
    "orderId": "ORD002",
    "productId": "PROD002",
    "quantity": 1,
    "price": 120.0
}
{
    "orderId": "ORD003",
    "productId": "PROD001",
    "quantity": 3,
    "price": 45.0
}

ඔයාලා මේ requests යවනකොට, Spring Boot console එකේ "Received order event..." වගේ message එකක් පෙන්වයි. ඒ වගේම, හැම මිනිත්තුවකටම Flink job එකෙන් process කරපු results "--- Flink Result ---" කියලා print වෙයි.

ඔයාලට පේයි Product ID එක අනුව orders ගණනයි, total sales එකයි Flink එකෙන් calculate කරන හැටි. Window එක ඉවර වුනාට පස්සේ තමයි result එක print වෙන්නේ. ඒක තමයි streaming analytics වල හැටි!

ඉදිරි පියවර සහ හොඳම ක්‍රම (Best Practices)

අපි මේ කරපු setup එක demo එකකට හොඳ වුනත්, real-world Production application එකකට යද්දි තව ගොඩක් දේවල් බලන්න ඕනේ:

  • Data Source/Sink:
    • Kafka/RabbitMQ: Real-time data ingest කරන්න සහ processed results ආයෙත් publish කරන්න Kafka වගේ Message Queue එකක් පාවිච්චි කරන එක තමයි හොඳම විසඳුම. Flink වලට Kafka connector තියෙනවා.
    • Databases/Storage: Processed results Cassandra, Elasticsearch, S3 වගේ තැන්වල save කරන්න පුළුවන්.
  • State Management: Flink වලට state maintain කරන්න පුළුවන් (උදා: customer කෙනෙක්ගේ total purchases). Production වලදී මේ state එක reliable විදියට store කරන්න Checkpointing සහ Savepoints හරිම වැදගත්. මේවා configure කරන්න අමතක කරන්න එපා.
  • Fault Tolerance: Flink inherent fault tolerant. Node එකක් fail වුනත් data නැති වෙන්නේ නැතුව, කලින් තිබ්බ තැනින් job එක continue කරන්න පුළුවන්. ඒකට Checkpointing enable කරන්න ඕනේ.
  • Deployment: අපි මේ demo එකේදී Flink job එක Spring Boot application එක ඇතුළෙම run කළා. Production වලදී Flink jobs, Flink cluster එකක් උඩ (Standalone, YARN, Kubernetes වගේ) deploy කරන එක තමයි හොඳම. Spring Boot application එකෙන් Flink Job Gateway එකක් විදියට job එක submit කරන්න පුළුවන්.
  • Monitoring and Logging: Flink job එක monitor කරන්න metrics සහ logs එකතු කරන එක හරිම වැදගත්. Prometheus, Grafana වගේ tools පාවිච්චි කරන්න පුළුවන්.
  • Backpressure: Data එන වේගය processing වේගයට වඩා වැඩි වුනොත් (backpressure) system එක collapse වෙන්න පුළුවන්. මේක manage කරන්න Flink වලට built-in mechanisms තියෙනවා.

අවසාන වචන

ඉතින් යාළුවනේ, මේ blog post එකෙන් අපි Spring Boot සහ Apache Flink එකට පාවිච්චි කරලා real-time event processing කරන හැටි ගැන ලොකු අදහසක් ගන්න ඇති කියලා හිතනවා. මේ දෙන්නාගේ එකතුව කියන්නේ modern, scalable, real-time applications හදන්න පුළුවන් සුපිරි combination එකක්.

අද අපි කරපු සරල උදාහරණයෙන් ඔයාලට Flink වල බලය සහ Spring Boot එක්ක ඒක integrate කරන හැටි ගැන පොඩි අදහසක් එන්න ඇති. මේක ඔයාලට පුළුවන් තවත් සංකීර්ණ use cases වලට යොදාගන්න. උදාහරණයක් විදියට, live fraud detection, real-time recommendation systems, IoT data analytics වගේ දේවල් වලට මේක නියමයි.

අනිවාර්යයෙන්ම මේ code එක ඔයාලගේ machine එකේ run කරලා බලන්න. එතකොට තමයි හරියටම තේරෙන්නේ. මොනවා හරි ප්‍රශ්න තියෙනවා නම්, තේරෙන්නේ නැති දෙයක් තියෙනවා නම්, පහළින් comment එකක් දාන්න. අපි ඒ ගැන කතා කරමු. ඒ වගේම, ඔයාලා Flink සහ Spring Boot එකට පාවිච්චි කරලා කරපු project මොනවාද කියලාත් කියන්න. තවත් මේ වගේ වැදගත්, ප්‍රයෝජනවත් blog posts එක්ක හමුවෙමු. ගිහින් එන්නම්! සුභ දවසක්!