Java parallelStream() Sinhala Guide - වේගවත් දත්ත සැකසීම | Parallel Processing

Java parallelStream() Sinhala Guide - වේගවත් දත්ත සැකසීම | Parallel Processing

Java Streams සමඟින් Parallel Processing: ඔබේ Code එක වේගවත් කරමු | Sinhala Guide

ඉතින් කොහොමද යාලුවනේ! 🚀 අද අපි කතා කරන්න යන්නේ Java Programming වල තවත් නියම Topic එකක් ගැන. ඒ තමයි Java Streams භාවිතා කරලා කොහොමද අපේ Code එකේ Parallel Processing කරනවා කියන එක. විශේෂයෙන්ම, parallelStream() කියන Method එක අපේ application වල performance එක වැඩි කරගන්න කොච්චර උදව් වෙනවද කියලා අපි බලමු.

දැන් කාලේ software applications කියන්නේ ගොඩක් දත්ත (data) එක්ක වැඩ කරන ඒවානේ. සමහර වෙලාවට මේ දත්ත ප්‍රමාණය කොච්චරද කියනවනම් ඒවා සාමාන්‍ය විදිහට, ඒ කියන්නේ එකින් එකට (sequentially) process කරන්න ගියොත් අපේ application එකේ වේගය ගොඩක් අඩු වෙන්න පුළුවන්. එතකොට තමයි Parallel Processing කියන concept එක අපිට උදව්වට එන්නේ. මේකෙන් වෙන්නේ එක වැඩක් කොටස් වලට කඩලා එකවර processor cores කිහිපයකින් ඒ කොටස් process කරන එක.

අද අපි මේ Guide එකෙන් ඉගෙන ගන්නවා:

  • Java Streams මොනවද සහ ඒවා වැදගත් වෙන්නේ ඇයි කියලා.
  • ඇයි Parallel Processing අවශ්‍ය වෙන්නේ කියන එක.
  • parallelStream() කියන්නේ මොකක්ද සහ ඒක කොහොමද පාවිච්චි කරන්නේ කියලා.
  • ප්‍රායෝගික උදාහරණයක් එක්ක parallelStream() වල බලය බලන හැටි.
  • parallelStream() පාවිච්චි කරනකොට සැලකිලිමත් විය යුතු කරුණු.

හරි, එහෙනම් අපි පටන් ගමු!

Java Streams මොනවද? (Java Streams කියන්නේ මොනවද?)

ඔයාලා Java 8 පාවිච්චි කරනවනම්, Streams API කියන්නේ ඔබට දත්ත එකතුවක් (collections of data) process කරන්න තියෙන හරිම පහසු සහ බලවත් ක්‍රමයක්. සරලව කිව්වොත්, Stream එකක් කියන්නේ data source එකකින් එන elements flow එකක්. මේක අපිට Filter, Map, Reduce වගේ operations කරන්න පුළුවන්.

Stream එකක ප්‍රධාන වාසි කිහිපයක් තියෙනවා:

  • Declarative programming: අපි කොහොමද කියන එකට වඩා මොනවද කරන්න ඕනේ කියන එක ගැන අවධානය යොමු කරන්න පුළුවන්.
  • Functional programming: Lambda expressions එක්ක හොඳට වැඩ කරනවා.
  • Chaining operations: Operations ගණනාවක් එක දිගට දාගෙන යන්න පුළුවන්.
  • Internal iteration: අපි for loop එකක වගේ manually iterate කරන්න අවශ්‍ය නෑ.

සාමාන්‍යයෙන් අපි Stream එකක් හදන්නේ මෙහෙමයි:

List<String> names = Arrays.asList("Amara", "Bandara", "Chandra", "Dilini", "Eshan");

// Sequential Stream එකක්
names.stream()
     .filter(name -> name.startsWith("A"))
     .map(String::toUpperCase)
     .forEach(System.out::println);

මේ code එකෙන් වෙන්නේ 'A' අකුරෙන් පටන් ගන්න names අරගෙන ඒවා uppercase කරලා print කරන එක. මේක තමයි Sequential Stream එකක්. ඒ කියන්නේ එක වැඩක් ඉවර වෙලා තමයි ඊළඟ වැඩේ පටන් ගන්නේ.

ඇයි Parallel Processing අවශ්‍ය වෙන්නේ?

දැන් ඔයාලා හිතනවා ඇති, Stream එකක් හොඳට වැඩ කරනවනම්, ඇයි අපිට Parallel Processing ඕනෙ කියලා. උත්තරේ සරලයි: වේගය!

අද තියෙන computers වල බොහෝ වෙලාවට තනි CPU එකක් ඇතුළේ multiple cores (cores කිහිපයක්) තියෙනවා. සාමාන්‍යයෙන් අපි ලියන program එකක් එක core එකක තමයි run වෙන්නේ. නමුත්, Parallel Processing කියන එකෙන් වෙන්නේ අපේ වැඩේ කොටස් වලට කඩලා ඒ කොටස් එකවර cores කිහිපයකින් run කරන එක. හිතන්න ඔයාලට බර වැඩ ගොඩක් කරන්න තියෙනවා කියලා. ඒ වැඩ ටික එක පුද්ගලයෙක් කරනවට වඩා, කිහිප දෙනෙක් බෙදාගෙන කරනවට වැඩේ ඉක්මනින් ඉවර වෙනවනේ. මේකත් ඒ වගේම තමයි.

විශාල දත්ත ප්‍රමාණයක් (big data sets) process කරනකොට, හෝ CPU එකට බර වැඩ (CPU-intensive tasks) කරනකොට Parallel Processing වලින් විශාල වාසියක් ලබා ගන්න පුළුවන්. ඒකෙන් අපේ application එකේ response time එක අඩු කරලා overall performance එක වැඩි කරනවා.

parallelStream() ගැන දැනගමු

Java Streams API එකේ තියෙන ලොකුම වාසියක් තමයි, අපිට කිසිම අමාරුවක් නැතුව Sequential Stream එකක් Parallel Stream එකක් බවට පත් කරන්න පුළුවන් එක. මේකට අපිට තියෙන්නේ එක Method එකක් විතරයි:

collection.parallelStream();

මෙන්න මෙහෙමයි ඒක පාවිච්චි කරන්නේ:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

numbers.parallelStream()
       .filter(n -> n % 2 == 0) // Even numbers විතරක් ගන්නවා
       .map(n -> n * n)       // වර්ග කරනවා
       .forEach(System.out::println); // එකින් එක print කරනවා

මේ Code එකෙන් වෙන්නේ ඉරට්ටේ සංඛ්‍යා (even numbers) විතරක් අරගෙන, ඒවා වර්ග කරලා (square කරලා), print කරන එක. මේ operations ටික එකවර cores කිහිපයකින් execute වෙන්න පුළුවන්.

parallelStream() ක්‍රියාත්මක වන ආකාරය

parallelStream() එකක් භාවිතා කරනකොට, Java Virtual Machine (JVM) එක විසින් Fork/Join Framework එක යොදාගෙන තමයි මේ Parallel Processing එක manage කරන්නේ. මේ Framework එකෙන් Data Source එක කුඩා කොටස් වලට කඩලා, ඒ කොටස් ForkJoinPool එකේ තියෙන threads වලට distribute කරනවා. වැඩේ ඉවර වුනාම, ප්‍රතිඵල (results) එකතු කරලා final result එක දෙනවා. මේ සියල්ලම අපිට නොදැනීම JVM එක විසින් සිදු කරනවා. හරිම පහසුයි නේද?

parallelStream() පාවිච්චි කරන්න ඕනෙ කවදද? (When to Use parallelStream())

parallelStream() එකක් සෑම විටම වේගවත් නැහැ. ඒක පාවිච්චි කරන්න ඕනේ විශේෂ අවස්ථා වලදී පමණයි. මෙන්න බලන්න ඒ වගේ අවස්ථා කිහිපයක්:

  1. විශාල දත්ත ප්‍රමාණයක් (Large Data Sets): ඔයාලා ගොඩක් විශාල දත්ත එකතුවක් (List, Set, Map) process කරනවනම්, parallelStream() වලින් හොඳ performance boost එකක් ගන්න පුළුවන්.
  2. CPU-intensive Operations: එක් එක් element එක process කරන්න ගොඩක් computational power එකක් අවශ්‍ය වෙනවනම් (උදාහරණයක් ලෙස, Complex calculations, Image processing, Encryption වගේ දේවල්), parallelStream() එකක් පාවිච්චි කිරීමෙන් වේගය වැඩි කරගන්න පුළුවන්.
  3. Non-interfering, Stateless Operations: Stream operations එකිනෙකට බලපෑමක් නොකරනවා නම් (non-interfering) සහ ඒවාට වෙනත් shared state එකක් අවශ්‍ය නැතිනම් (stateless), parallelStream() එක හොඳින් ක්‍රියා කරනවා. උදාහරණයක් ලෙස map(), filter() වැනි operations.
  4. Data Structure එක: ArrayList, HashMap වගේ data structures වලට parallelStream() එක හොඳින් ක්‍රියා කරනවා. මොකද ඒවා පහසුවෙන් කොටස් වලට කඩන්න පුළුවන් (splittable).

කවදාද පාවිච්චි නොකළ යුත්තේ?

  • කුඩා දත්ත ප්‍රමාණයක් (Small Data Sets): දත්ත ප්‍රමාණය කුඩා නම්, parallel processing setup කරන්න යන overhead එක නිසා sequential stream එකකට වඩා parallelStream() එකක් slow වෙන්න පුළුවන්.
  • I/O-bound Operations: File කියවීම, Network calls වගේ I/O operations කරනකොට parallel processing එකෙන් ලොකු වාසියක් ලැබෙන්නේ නැහැ. මොකද bottleneck එක CPU එක නෙමෙයි, I/O එකයි.
  • Shared Mutable State: Stream operations ඇතුළේ shared variables modify කරනවනම්, race conditions ඇති වෙලා වැරදි ප්‍රතිඵල එන්න පුළුවන්. ඒ වගේ වෙලාවට synchronization කරන්න ගියොත් parallel processing වල වාසිය නැති වෙලා යනවා.

ප්‍රායෝගික උදාහරණයක්: දත්ත වෙනස් කිරීම (Practical Example: Data Transformation)

අපි දැන් ප්‍රායෝගික උදාහරණයක් බලමු. අපි හිතමු අපිට numbers මිලියන ගණනක් තියෙනවා, ඒ හැම number එකක්ම වර්ග කරලා, ඊට පස්සේ ඒවායින් 500ට වඩා වැඩි ඒවා විතරක් අරගෙන, අන්තිමට ඒ හැම number එකටම 10ක් එකතු කරන්න ඕනේ කියලා. මේක අපි Sequential Stream එකකින් සහ Parallel Stream එකකින් කරලා ඒ දෙකේ performance එක සන්සන්දනය කරලා බලමු.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelStreamDemo {

    public static void main(String[] args) {
        int dataSize = 10_000_000; // මිලියන 10ක දත්ත ප්‍රමාණයක්

        // විශාල integer list එකක් හදමු
        List<Integer> numbers = IntStream.rangeClosed(1, dataSize)
                                        .boxed()
                                        .collect(Collectors.toList());

        System.out.println("Processing " + dataSize + " numbers...\n");

        // --- Sequential Stream Demonstration ---
        long startTimeSequential = System.currentTimeMillis();
        List<Integer> resultSequential = numbers.stream()
                                                .map(n -> n * n) // වර්ග කරනවා
                                                .filter(n -> n > 500) // 500ට වඩා වැඩි ඒවා විතරක්
                                                .map(n -> n + 10) // 10ක් එකතු කරනවා
                                                .collect(Collectors.toList());
        long endTimeSequential = System.currentTimeMillis();
        System.out.println("Sequential Stream Processing Time: " + (endTimeSequential - startTimeSequential) + " ms");
        // System.out.println("Sequential Result Size: " + resultSequential.size()); // ප්‍රතිඵලයේ ප්‍රමාණය පරීක්ෂා කිරීමට

        System.out.println("-----------------------------------
");

        // --- Parallel Stream Demonstration ---
        long startTimeParallel = System.currentTimeMillis();
        List<Integer> resultParallel = numbers.parallelStream()
                                              .map(n -> n * n) // වර්ග කරනවා
                                              .filter(n -> n > 500) // 500ට වඩා වැඩි ඒවා විතරක්
                                              .map(n -> n + 10) // 10ක් එකතු කරනවා
                                              .collect(Collectors.toList());
        long endTimeParallel = System.currentTimeMillis();
        System.out.println("Parallel Stream Processing Time: " + (endTimeParallel - startTimeParallel) + " ms");
        // System.out.println("Parallel Result Size: " + resultParallel.size()); // ප්‍රතිඵලයේ ප්‍රමාණය පරීක්ෂා කිරීමට

        // දෙකේම ප්‍රතිඵල එක සමානදැයි පරීක්ෂා කිරීම
        if (resultSequential.size() == resultParallel.size() && resultSequential.containsAll(resultParallel)) {
            System.out.println("\nResults are consistent between sequential and parallel streams.");
        } else {
            System.out.println("\nWARNING: Results are NOT consistent!");
        }
    }
}

මේ Code එක run කරලා බලන්න. ඔයාලට පැහැදිලිව පෙනේවි parallelStream() එක භාවිතා කරනකොට ගතවන කාලය ගොඩක් අඩු බව. මේක performance test එකක් නෙමෙයි, සාමාන්‍ය demonstration එකක් විතරයි. නිවැරදි performance benchmarks ගන්නවා නම් JMH (Java Microbenchmark Harness) වගේ tools පාවිච්චි කරන්න වෙනවා.

සැලකිලිමත් විය යුතු කරුණු (Things to Consider & Cautions)

parallelStream() කියන්නේ ඉතා බලවත් tool එකක් වුනත්, ඒක පාවිච්චි කරනකොට අපිට ගොඩක් දේවල් ගැන සැලකිලිමත් වෙන්න ඕනේ. නැත්නම් හිතපු ප්‍රතිඵල නොලැබී යන්න පුළුවන්, නැත්නම් application එක slow වෙන්නත් පුළුවන්.

1. Overhead එක

Parallel processing set up කරන්නත් යම්කිසි කාලයක් සහ resources ප්‍රමාණයක් වැය වෙනවා (overhead). දත්ත ප්‍රමාණය කුඩා නම්, මේ overhead එක නිසා parallelStream() එකක් sequential එකකට වඩා slow වෙන්න පුළුවන්.

2. Shared State (බෙදාගත් තත්ත්වය)

මෙන්න මේක තමයි ගොඩක්ම වැදගත් කාරණාව. parallelStream() එකක් ඇතුළේ shared mutable state (එකිනෙකට වෙනස් කළ හැකි, share කරන ලද variables) පාවිච්චි කරන එක සම්පූර්ණයෙන්ම වගේ නවත්තන්න ඕනේ. උදාහරණයක් ලෙස, stream එකක් ඇතුළේ external List එකකට elements add කරනවා නම්, Race Conditions ඇති වෙලා වැරදි ප්‍රතිඵල එන්න පුළුවන්. Stream operations හැමවිටම Stateless සහ Non-interfering වෙන්න ඕනේ.

මේ වගේ තත්ත්වයක් වළක්වා ගන්න collect() වගේ terminal operations භාවිතා කරන්න. ඒකෙන් thread-safe විදිහට ප්‍රතිඵල එකතු කරන්න පුළුවන්.

3. ForkJoinPool

parallelStream() එක විසින් default thread pool එකක් භාවිතා කරනවා, ඒ තමයි ForkJoinPool.commonPool(). මේ pool එකෙන් තමයි parallel tasks execute කරන්න threads manage කරන්නේ. මේ common pool එක global එකක් නිසා, ඔයාලගේ application එකේ වෙන තැන් වලත් heavy parallel tasks run වෙනවනම්, මේ pool එකේ threads block වෙලා හෝ exhausted වෙලා application එකේ performance එකට බලපෑමක් වෙන්න පුළුවන්.

අවශ්‍ය නම්, අපිට වෙනම ForkJoinPool එකක් හදාගෙන parallelStream() එක ඒක ඇතුළේ run කරන්න පුළුවන්, නමුත් ඒක advanced use case එකක්.

4. Order of Elements (Elements වල අනුපිළිවෙල)

සාමාන්‍යයෙන් parallelStream() එකක් පාවිච්චි කරනකොට, operations execute වන අනුපිළිවෙල guarantee වෙන්නේ නැහැ. ඒ කියන්නේ forEach() එකක් භාවිතා කරනකොට elements print වෙන order එක stream එකේ මුල් order එකට වඩා වෙනස් වෙන්න පුළුවන්.

ඔයාලට අනිවාර්යයෙන්ම මුල් order එක maintain කරන්න ඕනේ නම්, forEachOrdered() කියන Method එක භාවිතා කරන්න පුළුවන්. නමුත් ඒකෙන් parallel processing වල performance වාසිය යම් ප්‍රමාණයකට අඩු වෙන්න පුළුවන්.

5. I/O Bound vs. CPU Bound

parallelStream() එක වඩාත් හොඳින් වැඩ කරන්නේ CPU-bound tasks (CPU එකට බර වැඩ) වලදී. ඒ කියන්නේ operation එකක් කරන්න ගොඩක් CPU time එකක් වැය වෙනවනම්. I/O-bound tasks (file read/write, network requests වගේ දේවල්) වලදී parallelStream() එකෙන් ලොකු වාසියක් ලැබෙන්නේ නැහැ. මොකද මේවා බොහෝ වෙලාවට CPU එක එනතුරු බලාගෙන ඉන්නවා (waiting time). ඒ වගේ අවස්ථා වලට Asynchronous programming (උදා: CompletableFuture) වගේ දේවල් වඩාත් සුදුසුයි.

නිගමනය (Conclusion)

ඉතින් යාලුවනේ, අද අපි Java Streams වල parallelStream() Method එක ගැන ගොඩක් දේවල් ඉගෙන ගත්තා. මේකෙන් අපිට පුළුවන් Java application වල performance එක වැඩි කරගෙන, විශාල දත්ත ප්‍රමාණයක් කාර්යක්ෂමව process කරන්න. හැබැයි, ඕනෑම බලවත් tool එකක් වගේම, parallelStream() එකත් නිවැරදිව සහ සැලකිලිමත්ව භාවිතා කරන්න ඕනේ.

මතක තියාගන්න:

  • parallelStream() එකක් පාවිච්චි කරන්න ඕනේ විශාල දත්ත ප්‍රමාණයක් සහ CPU-bound operations වලදී.
  • හැමවිටම stateless සහ non-interfering operations පාවිච්චි කරන්න. Shared mutable state වලින් වළකින්න.
  • Small data sets වලට සහ I/O-bound tasks වලට sequential streams හෝ වෙනත් concurrency mechanisms භාවිතා කරන්න.

ඔයාලගේ next Java project එකේදී parallelStream() පාවිච්චි කරලා බලන්න. ඒකෙන් ඔයාලගේ application එකේ වේගය කොච්චර වැඩි වෙනවද කියලා ඔයාලටම බලාගන්න පුළුවන් වේවි!

මේ ගැන ඔයාලගේ අත්දැකීම් කොහොමද? පහලින් Comment එකක් දාගෙන යන්න. ප්‍රශ්න තියෙනවනම් අහන්න බය වෙන්න එපා! අපි නැවතත් තවත් වැදගත් Topic එකකින් හමුවෙමු! Happy Coding! 😊