Project Reactor සමග Reactive Systems ගොඩනැගීම | Java Asynchronous Programming Sinhala

Project Reactor සමග Reactive Systems ගොඩනැගීම | Java Asynchronous Programming Sinhala

හලෝ යාළුවනේ! අද අපි කතා කරන්න යන්නේ නූතන Software Engineering වල ඉතාම වැදගත් මාතෘකාවක් ගැන. ඒ තමයි Reactive Systems. විශේෂයෙන්ම අපි Java පරිසරය තුළ Project Reactor කියන බලගතු Library එක පාවිච්චි කරලා Non-blocking Applications හදන්නේ කොහොමද කියලා විස්තරාත්මකව බලමු.

අද කාලේ Application වලට High Concurrency සහ Scalability අත්‍යවශ්‍යයි. සාම්ප්‍රදායික Blocking I/O ක්‍රම වලදී එක Request එකක් එනකල් thread එකක් block වෙනවා. මේක Application එකේ Performance වලට හානි කරනවා වගේම, වැඩි ගානකට Concurrent Users ලා manage කරන්නත් අමාරු කරනවා. මෙන්න මේකට විසඳුමක් තමයි Reactive Programming කියන්නේ.

මේ Tutorial එකෙන් අපි Reactive Programming කියන්නේ මොකක්ද, ඒකේ වාසි මොනවද, Project Reactor කියන්නේ මොකක්ද, ඒකේ ප්‍රධානම සංරචක වන Flux සහ Mono කියන්නේ මොනවද, ඒ වගේම Reactive Operators සහ Concurrency Management (Schedulers) ගැනත් දැනගනිමු. අවසානයේදී සරල Code Example එකක් හරහා මේ සංකල්ප ප්‍රායෝගිකව ක්‍රියාත්මක කරන්නේ කොහොමද කියලා බලමු. එහෙනම්, අපි පටන් ගමුද?

Reactive Programming යනු කුමක්ද?

සරලව කිව්වොත්, Reactive Programming කියන්නේ Asynchronous Data Streams එක්ක වැඩ කරන Programming Paradigm එකක්. ඒ කියන්නේ, අපිට Data එකක් එනකල් බලාගෙන ඉන්නේ නැතුව, Data එකක් ආවම ඒක Process කරන්න පුළුවන් විදිහට Code ලියන එක. මේක Event-driven වෙනවා වගේම, Non-blocking වෙනවා.

සාම්ප්‍රදායික Imperative Programming වලදී අපි Code එක ලියන්නේ Step-by-step. එක Step එකක් ඉවර වෙනකල් ඊළඟ Step එක පටන් ගන්නේ නැහැ. උදාහරණයක් විදිහට, Database Call එකක් ගත්තොත්, ඒකේ Result එක එනකල් Thread එක block වෙනවා. ඒක තමයි Blocking I/O.

හැබැයි Reactive Programming වලදී, අපි Data Stream එකකට "subscribe" කරනවා. Data එකක් ආපු ගමන් (හෝ Error එකක් ආපු ගමන්, නැත්නම් Stream එක ඉවර වුණාම) අපේ Subscriber එකට ඒ බව දැනුම් දෙනවා. මේ ක්‍රමය නිසා, එක Thread එකකට එක වෙලාවකදී Request ගොඩකට Process කරන්න පුළුවන් වෙනවා.

Reactive Programming වල ප්‍රධාන වාසි:

  • Responsiveness: Application එක හැම වෙලාවෙම Responsive එකක් විදිහට තියෙනවා. Blocking නිසා UI Freeze වෙන එකක් නැහැ.
  • Resilience: වැරදීම් (Errors) වලට හොඳින් මුහුණ දෙන්න පුළුවන්. Stream එකක Error එකක් ආවත්, ඒක gracefully handle කරන්න Mechanisms තියෙනවා.
  • Elasticity: Workload එක වෙනස් වෙනකොට Resource Allocation එක dynamically adjust කරන්න පුළුවන්.
  • Message-Driven: Asynchronous Message Passing හරහා Components අතර Communication සිද්ධ වෙනවා.

මේ සංකල්ප Reactive Manifesto එකෙන් තමයි මුලින්ම ඉදිරිපත් කළේ.

Project Reactor හඳුන්වා දීම

Project Reactor කියන්නේ JVM (Java Virtual Machine) එකට Reactive Programming හැකියාවන් ගෙන එන Spring Framework එකේම කොටසක් වන, Netty වගේ Non-blocking I/O Libraries මත පදනම් වූ Reactive Streams Specification එක Implement කරන Library එකක්. මේක විශේෂයෙන්ම Spring WebFlux වගේ Reactive Web Frameworks වලට යටින් තියෙන Foundation එක.

Reactive Streams කියන්නේ Standard එකක්. ඒකෙන් Asynchronous Stream Processing වලට අවශ්‍ය Interfaces සහ Protocols නිර්වචනය කරනවා. Project Reactor මේ Standard එකට අනුකූලව Java Implementation එකක් ලබා දෙනවා.

ප්‍රධානම Building Blocks: Flux සහ Mono

Project Reactor වලදී අපි වැඩිපුරම පාවිච්චි කරන Core Types දෙකක් තියෙනවා:

  • Flux<T>: Asynchronous Sequence එකක් නිරූපණය කරනවා, ඒකේ Zero, එකක්, නැත්නම් අසීමිත (infinite) T Type එකේ Elements ප්‍රමාණයක් අඩංගු වෙන්න පුළුවන්. මේක Stream එකක් වගේ. උදාහරණයක් විදිහට, Database එකකින් Records ගොඩක් එනවා නම්, HTTP Request එකකින් Stream එකක් විදිහට Data එනවා නම්, Flux එකක් පාවිච්චි කරනවා.
  • Mono<T>: Asynchronous Sequence එකක් නිරූපණය කරනවා, ඒකේ Zero නැත්නම් එකම එක T Type එකේ Element එකක් විතරයි අඩංගු වෙන්නේ. උදාහරණයක් විදිහට, Database එකකින් Single Record එකක් එනවා නම්, HTTP GET Request එකකින් එක Response Object එකක් එනවා නම්, Mono එකක් පාවිච්චි කරනවා.

මේ දෙකම Publisher Interface එක Implement කරනවා.

Dependency Setup (Maven):

ඔබේ pom.xml එකට මේ Dependency එක එකතු කරගන්න:


<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.4</version> <!-- Latest stable version එක බලලා දාන්න -->
</dependency>

Flux සහ Mono සමග වැඩ කිරීම

අපි දැන් Flux සහ Mono හදාගෙන, ඒවාට Operators යොදලා Data Transform කරන විදිහ බලමු. Reactive Programming වල තියෙන ලොකුම වාසියක් තමයි මේ Declarative Style එක. Data එකට මොකද වෙන්න ඕනේ කියලා අපි කියනවා මිසක්, ඒක කරන්නේ කොහොමද කියලා Step-by-step කියන්නේ නැහැ.

Flux සහ Mono නිර්මාණය කිරීම:

Flux සහ Mono හදන්න විවිධ ක්‍රම තියෙනවා:

  • Flux.just(...) / Mono.just(...): නියත Elements දාලා හදනවා.
  • Flux.fromIterable(...) / Flux.fromArray(...): Collection එකකින් නැත්නම් Array එකකින් හදනවා.
  • Flux.range(...): අංක Sequence එකක් හදනවා.
  • Flux.generate(...): Programmatic විදිහට Elements ජනනය කරන්න.
  • Mono.empty() / Flux.empty(): හිස් Stream එකක් හදන්න.
  • Mono.error(Throwable) / Flux.error(Throwable): Error Stream එකක් හදන්න.

ප්‍රධාන Operators:

Operators කියන්නේ Stream එකක් Transform කරන්න, Filter කරන්න, Combine කරන්න, නැත්නම් Process කරන්න පාවිච්චි කරන Functions. මේවා Chain කරන්න පුළුවන්.

  • Transformation:
    • map(Function): Stream එකේ තියෙන හැම Element එකක්ම වෙන Type එකකට Transform කරනවා. Synchronous Transformation.
    • flatMap(Function): Asynchronous Transformation එකකට පාවිච්චි කරනවා. හැම Element එකක්ම අරගෙන ඒකෙන් තව Publisher (Flux හෝ Mono) එකක් හදලා, ඒ හැම Publisher එකකින්ම එන Result එක Single Stream එකකට Flatten කරනවා.
  • Filtering:
    • filter(Predicate): යම්කිසි Condition එකක් සපුරන Elements විතරක් Stream එකෙන් එලියට දෙනවා.
  • Combination:
    • zip(Publisher, Publisher, Function): Publishers දෙකකින් (හෝ වැඩි ගානකින්) එකම වෙලාවේ Elements අරගෙන, ඒවා Combine කරලා Tuple එකක් විදිහට දෙනවා.
    • merge(Publisher, Publisher): Publishers දෙකකින් එන Elements ඒවා එන Order එකටම Merge කරලා දෙනවා.
  • Error Handling:
    • onErrorReturn(T): Error එකක් ආවොත්, Stream එක නවත්වලා, නිශ්චිත Default Value එකක් Return කරනවා.
    • onErrorResume(Function): Error එකක් ආවොත්, වෙන Publisher එකක් Resume කරනවා.
  • Consumption:
    • subscribe(): මේක තමයි වැදගත්ම දේ. Reactive Stream එකක් ඇත්තටම Execute වෙන්නේ subscribe() Method එකට Call කරාමයි. Publisher එකට Subscriber කෙනෙක් නැත්නම්, Data Flow එකක් සිද්ධ වෙන්නේ නැහැ. (Remember: "Nothing happens until you subscribe!")
    • subscribe(Consumer<? super T> consumer): Data එක Process කරන්න Lambda Expression එකක් දෙනවා.
    • subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer): Error එකක් ආවොත් Handle කරන්නත් Lambda Expression එකක් දෙනවා.

සරල Code Example එකක්:

අපි දැන් මේවා පාවිච්චි කරලා සරල Code Example එකක් බලමු. මෙතනදී අපි Flux එකක් හදලා, map එකකින් Data Transform කරලා, filter එකකින් Filter කරලා, subscribe කරලා ඒක Output කරනවා.


import reactor.core.publisher.Flux;

public class ReactorExample {

    public static void main(String[] args) {
        // Flux එකක් හදනවා String names වලින්
        Flux<String> namesFlux = Flux.just("Amal", "Kamal", "Nimal", "Sunil", "Bandara", "Gayani");

        // Operations chain එකක් හදනවා
        namesFlux
            .map(String::toUpperCase) // හැම නමක්ම uppercase කරනවා
            .filter(name -> name.startsWith("S")) // "S" වලින් පටන් ගන්න නම් විතරක් තෝරනවා
            .doOnNext(name -> System.out.println("Processing: " + name)) // හැම element එකක්ම process කරනකොට print කරනවා
            .subscribe(
                name -> System.out.println("Found Name: " + name), // OnNext
                error -> System.err.println("An error occurred: " + error.getMessage()), // OnError
                () -> System.out.println("Finished processing names!") // OnComplete
            );

        System.out.println("Main thread finished."); // මේක reactive stream එකට කලින් print වෙනවා
    }
}

මේ Code එක Run කරපුවාම, "Main thread finished." කියන එක මුලින්ම Print වෙන්න පුළුවන්. ඒකට හේතුව Flux එක Asynchronous විදිහට Process වෙන නිසා. Output එක මෙහෙම වෙයි:


Main thread finished.
Processing: SUNIL
Found Name: SUNIL
Finished processing names!

"Amal", "Kamal", "Nimal", "Sunil", "Bandara", "Gayani" කියන නම් ටික ගත්තම, ඒවා toUpperCase වෙනවා. ඊට පස්සේ "S" වලින් පටන් ගන්න නම් විතරක් filter වෙනවා. ඒ නිසා "SUNIL" කියන නම විතරක් Output වෙනවා.

Backpressure සහ Schedulers

Reactive Programming වල තවත් වැදගත් සංකල්ප දෙකක් තමයි Backpressure සහ Schedulers.

Backpressure:

හිතන්න Producer කෙනෙක් Data ගොඩක් හරි වේගෙන් Generate කරනවා කියලා. හැබැයි Consumer කෙනෙක්ට ඒක ඒ වේගෙන් Process කරන්න බැහැ. මේ වගේ තත්ත්වයක් තමයි Backpressure එකක් ඇති කරන්නේ. සාම්ප්‍රදායික Systems වල මේක Buffer Overflow, Memory Issues, නැත්නම් Crashes වලට හේතු වෙන්න පුළුවන්.

Reactive Streams Specification එකේ Backpressure Mechanism එකක් තියෙනවා. ඒකෙන් Consumer එකට පුළුවන් Producer එකට "මට තව Elements කීයක් ඕනෙද" කියලා ඉල්ලන්න. මේක නිසා Producer එක Consumer එකට දරාගන්න පුළුවන් ප්‍රමාණයට විතරක් Data Publish කරනවා. Project Reactor මේ Backpressure එක AutomaticHandle කරනවා, ඒ නිසා අපිට ඒ ගැන වැඩිය හිතන්න අවශ්‍ය නැහැ.

Schedulers:

Schedulers කියන්නේ Project Reactor වල Concurrency සහ Threading Manage කරන්න පාවිච්චි කරන Mechanism එකක්. Reactive Streams Specification එකෙන් Concurrency එක ගැන වැඩි විස්තරයක් කියන්නේ නැහැ. ඒක Implementation එකට බාර දෙනවා. Project Reactor මේකට Schedulers කියන Abstraction එක දෙනවා.

ප්‍රධාන Schedulers වර්ග කීපයක් තියෙනවා:

  • Schedulers.immediate(): Current Thread එකේම Task එක Execute කරනවා.
  • Schedulers.single(): Reusable Single Thread එකක Task එක Execute කරනවා.
  • Schedulers.elastic() (Deprecated in Reactor 3.4+): අවශ්‍යතාවය අනුව Threads හදනවා, I/O Blocking operations (Database calls, Network calls) වලට හොඳයි. Reactor 3.4 න් පස්සේ මේක වෙනුවට Schedulers.boundedElastic() ආවා.
  • Schedulers.boundedElastic(): Elastic Scheduler එක වගේමයි, නමුත් Thread Pool එකේ Size එක bounded වෙනවා, ඒ කියන්නේ Threads ගානට Limit එකක් තියෙනවා. I/O Blocking operations වලට හොඳම Scheduler එක.
  • Schedulers.parallel(): CPU Cores ගානට Threads තියෙන Thread Pool එකක් පාවිච්චි කරනවා. Computation-intensive tasks වලට හොඳයි.

publishOn() vs subscribeOn():

මේ Methods දෙක Scheduler එකක් Apply කරන විදිහ තීරණය කරනවා:

  • subscribeOn(Scheduler): සම්පූර්ණ Reactive Chain එකම Execute වෙන්න ඕන Thread එක තීරණය කරනවා. මේක Chain එකේ කොහේ දැම්මත්, මුළු Chain එකේම Subscription එක සිද්ධ වෙන්නේ ඒ Scheduler එකේ.
  • publishOn(Scheduler): මේක Chain එකේ Operator එකකට කලින් දැම්මොත්, ඊට පස්සේ එන Operators ටික Execute වෙන්නේ publishOn() එකට දීපු Scheduler එකේ. ඒ කියන්නේ, මේකෙන් Subsequent Operators වල Execution Context එක වෙනස් කරනවා.

Scheduler Example:

සරල Example එකකින් මේක තේරුම් ගමු:


import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ReactorSchedulerExample {

    public static void main(String[] args) throws InterruptedException {
        Flux.range(1, 10)
            .map(i -> {
                System.out.println("Mapping " + i + " on thread: " + Thread.currentThread().getName());
                return i * 2;
            })
            .subscribeOn(Schedulers.parallel()) // Entire chain will be executed on parallel scheduler
            .filter(i -> {
                System.out.println("Filtering " + i + " on thread: " + Thread.currentThread().getName());
                return i % 4 == 0;
            })
            .publishOn(Schedulers.boundedElastic()) // Subsequent operations will be on boundedElastic scheduler
            .subscribe(
                result -> System.out.println("Result: " + result + " on thread: " + Thread.currentThread().getName()),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed!")
            );

        // Main thread එක instantly ඉවර වෙන්න පුළුවන් නිසා, පොඩ්ඩක් ඉන්න කියලා කියනවා
        // Production code එකේදී මේ වගේ Thread.sleep() පාවිච්චි කරන්නේ නැහැ
        Thread.sleep(1000);
    }
}

මේ Example එක Run කරපුවාම, Output එකේ Thread Names වෙනස් වෙන හැටි ඔබට පේයි. subscribeOn(Schedulers.parallel()) නිසා මුළු Subscription එකම parallel Scheduler එකේ පටන් ගන්නවා. ඊට පස්සේ publishOn(Schedulers.boundedElastic()) නිසා filter එකෙන් පස්සේ එන subscribe කොටස boundedElastic Scheduler එකට මාරු වෙනවා.

අවසානයට (Conclusion)

ඉතින් යාළුවනේ, මේ Tutorial එකෙන් අපි Java Reactive Systems, විශේෂයෙන්ම Project Reactor ගැන ගොඩක් දේවල් ඉගෙන ගත්තා. Reactive Programming කියන්නේ අපේ Applications වඩාත් Responsive, Resilient, සහ Scalable කරන්න පුළුවන් බලගතු Paradigm එකක්.

අපි Flux සහ Mono කියන්නේ මොනවද, ඒවා හදන්නේ කොහොමද, ඒ වගේම map, filter වගේ Operators පාවිච්චි කරලා Data Transform කරන්නේ කොහොමද කියලා බැලුවා. ඒ වගේම Backpressure සහ Schedulers පාවිච්චි කරලා Concurrency Manage කරන විදිහ ගැනත් පොඩි අවබෝධයක් ලබා ගත්තා.

Project Reactor කියන්නේ ලොකු Library එකක්. මේක හැඳින්වීමක් විතරයි. මේකේ තව ගොඩක් Operators සහ Features තියෙනවා. ඒ නිසා මම ඔයාලට ආරාධනා කරනවා Project Reactor Reference Guide එක බලලා, මේ ගැන තවත් හොයා බලන්න කියලා. ඔයාලගේ ඊළඟ Project එකට Reactive Approach එකක් යොදාගෙන බලන්න!

මේ Tutorial එක ගැන ඔයාලගේ අදහස්, ප්‍රශ්න, නැත්නම් ඔයාලගේ Reactive Programming Experiences පහලින් Comment එකක් දාන්න! ඊළඟ Tutorial එකෙන් හමුවෙමු!