Java Reactive Programming (Reactor, Flux, Mono) Sinhala Guide

Java Reactive Programming (Reactor, Flux, Mono) Sinhala Guide

ආයුබෝවන් යාළුවනේ! අද අපි කතා කරන්න යන්නේ මෘදුකාංග ඉංජිනේරු ක්ෂේත්‍රයේ දැන් ගොඩක් කතාබහට ලක්වෙන, ඒ වගේම අනාගතය වෙනුවෙන් අත්‍යවශ්‍ය සංකල්පයක් ගැන – ඒ තමයි Reactive Programming. විශේෂයෙන්ම Java ලෝකයේ Reactive Programming කියන්නේ මොකක්ද, ඒක කොහොමද අපේ දේවල් පහසු කරන්නේ, ඒ වගේම Project Reactor framework එකේ ප්‍රධානම කොටස් වන Flux සහ Mono කියන්නේ මොනවාද කියලා අපි සවිස්තරාත්මකව බලමු. ඔයා දැනටමත් Java developer කෙනෙක් නම්, නැත්නම් අලුතින් මේ පැත්තට එන්න බලාපොරොත්තු වෙන කෙනෙක් නම්, මේ ලිපිය ඔයාට Reactive Programming වලට හොඳ පදනමක් දාගන්න උදව් වෙයි.

අපි දන්නවා අද ලෝකේ හැම application එකක්ම වගේ බලාපොරොත්තු වෙන්නේ ඉක්මන්කම, කාර්යක්ෂමතාවය සහ කිසිම ප්‍රමාදයක් නැතුව වැඩ කරන්න. ඒත් සාම්ප්‍රදායිකව අපි ලියන Code වලින් මේ හැමදේම achieve කරන එක ටිකක් අමාරුයි. Blocking operations නිසා අපේ application එකේ performance එක අඩුවෙන්න පුළුවන්. Reactive Programming එන්නේ මේ ප්‍රශ්නෙට විසඳුමක් විදිහට. ඒකෙන් අපිට පුළුවන් Asynchronous, non-blocking applications හදන්න. එහෙනම් අපි බලමුකෝ මේක හරියටම මොකක්ද කියලා.

Reactive Programming යනු කුමක්ද?

සාම්ප්‍රදායිකව අපි Code ලියනකොට, එක task එකක් ඉවර වෙනකම් ඊළඟ task එකට යන්නේ නැහැ (blocking). උදාහරණයක් විදිහට, database එකකින් data load කරනකොට, ඒ data load වෙනකම් අපේ program එකේ අනිත් operations ටික නතර වෙලා තියෙනවා. මේක "blocking I/O" කියලා හඳුන්වනවා. පොඩ්ඩක් හිතන්න, ඔයා තේ එකක් හදාගන්න වතුර ටිකක් ලිපේ තියලා, ඒ වතුර ටික රත් වෙනකම් කිසිම දෙයක් කරන්නේ නැතුව බලාගෙන ඉන්නවා වගේ වැඩක්. ඒත් Reactive Programming වලදී, වතුර ටික රත් වෙන්න ලිපේ තියලා, ඒක රත් වෙන අතරතුරේ තේ කොළ ටිකයි, සීනි ටිකයි ලෑස්ති කරගන්න පුළුවන්. වතුර රත් වුණාම notification එකක් එනවා වගේ, අපිට ඊළඟ වැඩේට යන්න පුළුවන්. මේක තමයි non-blocking කියන්නේ.

Reactive Programming කියන්නේ data streams මත පදනම් වූ Asynchronous data processing එකක්. මේක Observer Pattern එකේම දියුණු කළ version එකක්. මෙහිදී, producer කෙනෙක් data stream එකක් publish කරනවා, ඒ stream එකට subscribe වෙලා ඉන්න consumers (subscribers) ඒ data එනකොට receive කරගෙන process කරනවා.

මේකෙන් ලැබෙන වාසි කිහිපයක් මෙන්න:

  • Responsiveness: Application එකේ responsiveness එක වැඩි වෙනවා, මොකද user requests වලට ඉක්මනින් ප්‍රතිචාර දක්වන්න පුළුවන්.
  • Resilience: System එකක කොටසක් fail වුණත්, සමස්ත system එකේම ක්‍රියාකාරීත්වයට ලොකු බලපෑමක් වෙන්නේ නැහැ.
  • Elasticity: System එකට එන requests ගණන අනුව automatically scale වෙන්න පුළුවන්.
  • Message Driven: Components එකිනෙක අතර asynchronous message passing හරහා සන්නිවේදනය කරනවා.

මේ සංකල්ප "The Reactive Manifesto" එකෙන් පැහැදිලිව ඉදිරිපත් කරලා තියෙනවා. Java වලදී Reactive Programming වලට ප්‍රධාන වශයෙන් භාවිතා වෙන frameworks තමයි RxJava සහ Project Reactor. අද අපි Project Reactor ගැන අවධානය යොමු කරමු.

Reactor Framework එකට හඳුන්වාදීම

Project Reactor කියන්නේ Spring Framework එකේ core library එකක් විදිහටත් භාවිතා වෙන, JVM එක සඳහා හදපු reactive library එකක්. Spring WebFlux වගේ frameworks වලට පදනම වෙන්නේ මේ Reactor තමයි. Reactor ප්‍රධාන වශයෙන්ම data publishers දෙකක් භාවිතා කරනවා – ඒ තමයි Flux සහ Mono.

  • Flux: Flux එකකින් 0 සිට N දක්වා (කිසිවක් නැති, එකක් හෝ කිහිපයක්) items ප්‍රමාණයක් emit කරන්න පුළුවන්. උදාහරණයක් විදිහට, database එකකින් ගොඩක් user records retrieve කරනකොට, web service එකකට එන incoming requests stream එකක් වගේ දේවල් සඳහා Flux එකක් භාවිතා කරන්න පුළුවන්. ඒ කියන්නේ "array of data" වගේ දෙයක් stream කරනවා නම්, Flux තමයි නියම.
  • Mono: Mono එකකින් 0 හෝ 1 item එකක් විතරයි emit කරන්න පුළුවන්. උදාහරණයක් විදිහට, database එකකින් එකම user record එකක් retrieve කරනකොට, save operation එකකින් ලැබෙන confirmation එකක් වගේ දේවල් සඳහා Mono එකක් භාවිතා කරන්න පුළුවන්. ඒ කියන්නේ "single data element" වගේ දෙයක් stream කරනවා නම්, Mono තමයි නියම.

මේ දෙකම Publisher interface එක implement කරනවා. ඒවාට දත්ත stream කරන්න, ඒවා transform කරන්න, filter කරන්න සහ end consumers ලට subscribe වෙන්න පහසුකම් සපයනවා.

Flux සහ Mono සමග වැඩ කිරීම

Flux සහ Mono objects හදාගන්න පුළුවන් විවිධ ක්‍රම තියෙනවා. අපි මේ කිහිපයක් බලමු.

Creating Flux and Mono:

range(): සංඛ්‍යා පරාසයක් generate කරන්න.

Flux<Integer> numberFlux = Flux.range(1, 10); // Emits 1 to 10

error(): Error එකක් emit කරන publisher කෙනෙක්.

Mono<String> errorMono = Mono.error(new RuntimeException("Something went wrong!"));

empty(): කිසිම data එකක් emit නොකරන publisher කෙනෙක්.

Mono<String> emptyMono = Mono.empty();
Flux<String> emptyFlux = Flux.empty();

fromIterable() / fromStream(): Collection එකකින් හෝ Stream එකකින්.

List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
Flux<String> nameFlux = Flux.fromIterable(names);

just(): Static data එකක් publish කරන්න.

Mono<String> mono = Mono.just("Hello Reactor!");
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);

Subscribing to Flux and Mono:

publisher කෙනෙක්ගෙන් data receive කරන්න නම්, අපි subscribe() method එක use කරන්න ඕනේ. subscribe() method එකට parameters කිහිපයක් දාන්න පුළුවන්:

  • Consumer<T> onNext: සෑම item එකක්ම receive කරන විට execute වෙනවා.
  • Consumer<Throwable> onError: Error එකක් occur වුණොත් execute වෙනවා.
  • Runnable onComplete: Stream එක සාර්ථකව complete වුණාම execute වෙනවා.

Code Example 1: Basic Flux and Mono Creation and Subscription


import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;

public class BasicReactiveExample {

    public static void main(String[] args) {
        System.out.println("--- Mono Example ---");
        // Mono එකක් හදමු: එකම item එකක් විතරයි
        Mono<String> greetingMono = Mono.just("ආයුබෝවන්, Reactor!");

        // Mono එකට subscribe වෙමු
        greetingMono.subscribe(
            data -> System.out.println("Mono Data: " + data), // onNext - දත්ත ලැබෙන විට
            error -> System.err.println("Mono Error: " + error.getMessage()), // onError - දෝෂයක් ඇති වුවහොත්
            () -> System.out.println("Mono Completed!") // onComplete - දත්ත ලැබීම අවසන් වූ විට
        );

        System.out.println("\n--- Flux Example ---");
        // Flux එකක් හදමු: බහු items (0 සිට N)
        List<String> fruits = Arrays.asList("ඇපල්", "කෙසෙල්", "දොඩම්", "අඹ");
        Flux<String> fruitFlux = Flux.fromIterable(fruits);

        // Flux එකට subscribe වෙමු
        fruitFlux.subscribe(
            fruit -> System.out.println("ලැබුණු පලතුර: " + fruit), // onNext
            error -> System.err.println("Flux Error: " + error.getMessage()), // onError
            () -> System.out.println("සියලු පලතුරු ලැබුණා!") // onComplete
        );

        System.out.println("\n--- Empty Mono Example ---");
        Mono<String> emptyDataMono = Mono.empty();
        emptyDataMono.subscribe(
            data -> System.out.println("Empty Mono Data: " + data),
            error -> System.err.println("Empty Mono Error: " + error.getMessage()),
            () -> System.out.println("Empty Mono Completed! (කිසිවක් නොලැබුණි)")
        );

        System.out.println("\n--- Error Mono Example ---");
        Mono<String> errorGeneratingMono = Mono.error(new IllegalArgumentException("අවලංගු ආදානයක්!"));
        errorGeneratingMono.subscribe(
            data -> System.out.println("Error Mono Data: " + data),
            error -> System.err.println("Error Mono Error: " + error.getMessage()), // මෙතනදී error එක print වෙයි
            () -> System.out.println("Error Mono Completed! (මේක කවදාවත් වෙන්නේ නෑ)") // error එකක් ආවොත් මේක run වෙන්නේ නෑ
        );
    }
}

මේ example එකෙන් ඔයාට තේරෙයි Mono එකක් එක් item එකක් publish කරන හැටිත්, Flux එකක් items කිහිපයක් publish කරන හැටිත්. ඒ වගේම subscribe method එකේ onNext, onError, onComplete callbacks කොහොමද වැඩ කරන්නේ කියලත්.

ප්‍රායෝගික උදාහරණ සහ Operators

Reactive Programming වල තියෙන ලොකුම වාසියක් තමයි streams transform කරන්න, combine කරන්න සහ manipulate කරන්න පුළුවන් operators ගොඩක් තියෙන එක. මේ operators "immutable" (වෙනස් නොවන) විදිහට තමයි වැඩ කරන්නේ. ඒ කියන්නේ operator එකක් apply කළාම අලුත් Flux හෝ Mono object එකක් return කරනවා.

Operators කිහිපයක්:

  • map(): Stream එකේ හැම item එකක්ම වෙනස් කරන්න.
  • filter(): Stream එකෙන් ඇතැම් items ඉවත් කරන්න.
  • flatMap(): Asynchronous operations එකිනෙක සම්බන්ධ කරන්න (complex transformations).
  • concatWith() / mergeWith(): Streams දෙකක් එකට join කරන්න.

Code Example 2: Transforming data with map and filter

අපි හිතමු අපිට user names ටිකක් තියෙනවා, ඒවාට "Mr./Ms." prefix එක දාලා, "A" අකුරෙන් පටන් ගන්න names විතරක් filter කරගන්න ඕනේ කියලා.


import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.List;

public class TransformExample {

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Amara", "Chamal", "Anushka");

        Flux<String> processedNames = Flux.fromIterable(names)
            .filter(name -> name.startsWith("A")) // 'A' අකුරෙන් පටන් ගන්න names විතරක් තෝරගන්න
            .map(name -> "Mr./Ms. " + name); // හැම name එකකටම prefix එකක් එකතු කරන්න

        System.out.println("--- Processed Names ---");
        processedNames.subscribe(
            finalName -> System.out.println("නව නම: " + finalName),
            error -> System.err.println("දෝෂයකි: " + error.getMessage()),
            () -> System.out.println("නම් සැකසීම අවසන්!")
        );
    }
}

මේ code එක run කළාම, "Mr./Ms. Alice", "Mr./Ms. Amara", "Mr./Ms. Anushka" කියලා print වෙයි. filter එකෙන් අදාල නැති නම් ඉවත් කරලා, map එකෙන් ඉතිරි නම් transform කරනවා.

Code Example 3: Error Handling (onErrorReturn)

Reactive streams වල error handling කියන්නේ ගොඩක් වැදගත් දෙයක්. onErrorReturn වගේ operators වලින් අපිට පුළුවන් error එකක් ආවොත් default value එකක් return කරන්න, නැත්නම් onErrorResume වගේ එකකින් වෙනත් publisher කෙනෙක්ට control එක දෙන්න.


import reactor.core.publisher.Flux;

public class ErrorHandlingExample {

    public static void main(String[] args) {
        Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5)
            .map(number -> {
                if (number == 3) {
                    throw new RuntimeException("අපි 3ට අකමැතියි!"); // අංක 3ට error එකක් දාමු
                }
                return number * 10;
            })
            .onErrorReturn(0); // error එකක් ආවොත්, stream එක 0 විදිහට ඉවර කරන්න

        System.out.println("--- Error Handling with onErrorReturn ---");
        numbers.subscribe(
            data -> System.out.println("ලැබුණු අංකය: " + data),
            error -> System.err.println("Error: " + error.getMessage()), // මේක print වෙන්නේ නැහැ
            () -> System.out.println("Stream එක අවසන්!")
        );

        System.out.println("\n--- Error Handling with onErrorResume ---");
        Flux<Integer> numbersWithErrorResume = Flux.just(1, 2, 3, 4, 5)
            .map(number -> {
                if (number == 3) {
                    throw new RuntimeException("3 වෙනස් සංඛ්‍යාවක්!");
                }
                return number * 10;
            })
            .onErrorResume(e -> {
                System.err.println("Error එකක් අහු වුණා: " + e.getMessage());
                return Flux.just(100, 200, 300); // error එකක් ආවොත් මේ alternative stream එක දාමු
            });

        numbersWithErrorResume.subscribe(
            data -> System.out.println("ලැබුණු අංකය (Resume): " + data),
            error -> System.err.println("Error (Resume): " + error.getMessage()), // මේකත් print වෙන්නේ නැහැ
            () -> System.out.println("Stream එක (Resume) අවසන්!")
        );
    }
}

onErrorReturn එකෙන් stream එක අවසන් කරලා default value එකක් දෙනකොට, onErrorResume එකෙන් error එකක් ආවොත් වෙනත් stream එකක් publish කරන්න පුළුවන්. මේකෙන් අපේ application එකේ resilience එක වැඩි වෙනවා.

Backpressure (කෙටි හැඳින්වීමක්):

Reactive Programming වල තවත් වැදගත් සංකල්පයක් තමයි Backpressure. සමහර වෙලාවට producer කෙනෙක් consumer කෙනෙක්ට process කරන්න පුළුවන් ප්‍රමාණයට වඩා වැඩි data ප්‍රමාණයක් ඉක්මනින් emit කරන්න පුළුවන්. මේකෙන් consumer over-load වෙලා crash වෙන්න පුළුවන්. Backpressure කියන්නේ consumer කෙනෙක්ට, producer කෙනෙක්ගෙන් එන data ප්‍රවාහයේ වේගය පාලනය කරන්න පුළුවන් යාන්ත්‍රණයක්. ඒ කියන්නේ "මට මේ data ටික process කරන්න ඔය වේගය වැඩියි, ටිකක් ස්ලෝ කරන්න" කියලා consumer ට producer ට කියන්න පුළුවන්. Reactor framework එකේ මේක automatically handle කරනවා. ඒක නිසා අපි සාමාන්‍යයෙන් මේ ගැන වැඩිය හිතන්න ඕනේ නැහැ.

Conclusion

ඉතින් යාළුවනේ, මේ ලිපියෙන් ඔයාලට Reactive Programming ගැන, විශේෂයෙන්ම Java වල Project Reactor framework එක ගැන සහ ඒකේ තියෙන ප්‍රධානම Publishers වන Flux හා Mono ගැන හොඳ අවබෝධයක් ලැබෙන්න ඇති කියලා මම හිතනවා. අපි දැක්කා Reactive Programming මගින් Asynchronous, non-blocking applications හදලා performance, scalability, සහ responsiveness වැඩි කරගන්න පුළුවන් හැටි. ඒ වගේම Flux සහ Mono භාවිතා කරලා data streams හදන හැටි, operators යොදාගෙන ඒවා manipulate කරන හැටි, ඒ වගේම error handling කොහොමද කරන්නේ කියලත් අපි ඉගෙන ගත්තා.

Reactive Programming කියන්නේ ටිකක් අලුත් සංකල්පයක් වුණාට, Microservices, Cloud Native applications වගේ දේවල් වලට මේක නැතිවම බැරි දෙයක් වෙලා. Spring WebFlux වගේ frameworks වලදී Reactor කියන්නේ අත්‍යවශ්‍යම දෙයක්.

දැන් ඔයාලා මේ concept ටික තේරුම් අරන් ඇති කියලා මම විශ්වාස කරනවා. මේක තවත් හොඳට තේරුම් ගන්න නම්, පුළුවන් තරම් Code ලියලා practice කරන්න. ඔයාලගේ අදහස්, මේ ගැන තියෙන ප්‍රශ්න, නැත්නම් මේක implement කරද්දී ආපු අත්දැකීම් පහළ comment section එකේ අපිට කියන්න. ඒ වගේම මේ ලිපිය ඔයාගේ යාලුවෝ අතරත් share කරන්න අමතක කරන්න එපා. ජයවේවා!