Java Parallel Streams Sinhala Guide | Concurrency in Java 8 | Performance Optimization

Java Parallel Streams Sinhala Guide | Concurrency in Java 8 | Performance Optimization

ආයුබෝවන් යාළුවනේ! අද අපි කතා කරන්න යන්නේ Java programming වල ගොඩක් වැදගත් සහ බලගතු විශේෂාංගයක් ගැන. ඒ තමයි Java 8 එක්ක ආපු Parallel Streams.

දැන් තියෙන computers වල multi-core CPUs තියෙනවා නේ. ඒ core ඔක්කොම එකට වැඩට දාලා, අපේ programs වේගවත් කරගන්න පුළුවන් නම්, ඒක නියමයි නේද? Parallel Streams කියන්නේ හරියට ඒකටම තියෙන solution එකක්. මේකෙන් අපි බලමු කොහොමද අපේ application එකේ performance වැඩි කරගන්නේ කියල.

අද අපි මේ tutorial එකෙන් Stream API එක මොකක්ද, Parallel Streams කියන්නේ මොනවද, ඒවා කොහොමද implement කරන්නේ, ඒවගේ වාසි අවාසි මොනවද වගේ ගොඩක් දේවල් ගැන කතා කරමු. අන්තිමට, ඔයාගේ Project එකේදී මේවා පාවිච්චි කරන්න කලින් දැනගන්න ඕන වැදගත් කරුණු කිහිපයකුත් අපි සාකච්ඡා කරමු.

Stream API එක ගැන කෙටි හැඳින්වීමක් (A Quick Intro to the Stream API)

Parallel Streams ගැන කතා කරන්න කලින්, අපි Stream API එක ගැන පොඩ්ඩක් මතක් කරගමු. Java 8 එක්ක ආපු Stream API එක කියන්නේ Collections process කරන්න තියෙන අලුත්ම, functional-style විදිහක්.

සාමාන්‍යයෙන් අපි for-each loop එකක් වගේ පාවිච්චි කරලා Collection එකක තියෙන elements process කරනවා නේද? Stream API එකෙන් අපිට ඒක declarative විදිහට, data transformations chain එකක් විදිහට කරන්න පුළුවන්. මේකෙන් Code එක කියවන්න ලේසියි, වගේම වැඩේ කරන විදිහ ගැන වැඩි focus එකක් දෙන්නේ නැතුව, මොකක්ද කරන්න ඕනේ කියන එක ගැන focus කරන්න පුළුවන්.

සරල Sequential Stream එකක උදාහරණයක් බලමු:

import java.util.Arrays;
import java.util.List;

public class SequentialStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // සාමාන්‍ය sequential stream එකක්
        long sumOfSquaresOfEvenNumbers = numbers.stream()
                                             .filter(n -> n % 2 == 0) // Even numbers (ඉරට්ටේ සංඛ්‍යා) විතරක් ගන්නවා
                                             .map(n -> n * n)         // ඒවා වර්ග කරනවා
                                             .reduce(0, Integer::sum); // වර්ග කරපු සංඛ්‍යා එකතු කරනවා

        System.out.println("Sequential Stream: Sum of squares of even numbers: " + sumOfSquaresOfEvenEvenNumbers);
        // Output: Sequential Stream: Sum of squares of even numbers: 220 (4+16+36+64+100)
    }
}

මේ code එකේදී filter, map, reduce වගේ operations pipeline එකක් වගේ එකින් එකට වැඩ කරනවා. මේක Sequential Stream එකක් නිසා, හැම operation එකක්ම එක thread එකකින්, එකින් එකට තමයි process වෙන්නේ.

Parallel Streams මොකටද? (Why Parallel Streams?)

හරි, දැන් අපි Sequential Streams ගැන දන්නවා. එහෙනම් Parallel Streams මොකටද? Multi-core processors තියෙනවා නම්, එකම core එකක ඔක්කොම වැඩ කරගෙන ඉන්න එක වැඩක් නෑ නේද? Parallel Streams වලින් වෙන්නේ, Stream එකේ තියෙන data sets ටික පොඩි කෑලි වලට කඩලා (splitting), ඒ කෑලි ටික එකපාරට, එක එක core වල process කරන එක.

හිතන්න ඔයාට ගොඩක් ලොකු data set එකක් තියෙනවා කියලා process කරන්න. උදාහරණයක් විදිහට, මිලියන ගණනක අගයන් analyze කරන්න, නැත්නම් ලොකු Collection එකක තියෙන දේවල් sort කරන්න වගේ. Sequential stream එකක් පාවිච්චි කරනකොට, ඒක වෙලාවක් යන වැඩක් වෙන්න පුළුවන්. Parallel stream එකකින් වෙන්නේ ඒ වැඩේ speed up කරන එක.

Parallel Streams හැමවිටම හොඳද?

කොහොම වුණත්, හැම වෙලාවෙම Parallel Streams හොඳ නැහැ. ඒක වැදගත් කරුණක්. සමහර අවස්ථාවලදී Parallel Streams පාවිච්චි කිරීමෙන් performance අඩු වෙන්නත් පුළුවන්.

  • Data set එක පොඩි නම්: Stream එක parallel කරන්න, threads manage කරන්න වගේ දේවලට යම් overhead එකක් තියෙනවා. Data set එක පොඩි නම්, මේ overhead එක නිසා sequential stream එකකට වඩා performance අඩුවෙන්න පුළුවන්.
  • I/O-bound operations (ෆයිල් කියවන එක, network calls වගේ): මේවා CPU එක මත රඳා පවතින්නේ නැති නිසා, core කීපයක වැඩ කරන එකෙන් ලොකු වාසියක් නෑ. මේවාට සාමාන්‍යයෙන් asynchronous programming techniques තමයි වඩා සුදුසු.
  • Operations වලට Order එක වැදගත් නම්: සමහර operations වලදී order එක වැදගත් වෙනවා. Parallel Streams වලදී order එක හැමවිටම guarantee වෙන්නේ නැහැ.

Parallel Streams හදන්නේ කොහොමද? (How to Create Parallel Streams?)

Parallel Streams හදන එක හරිම ලේසියි. ප්‍රධාන ක්‍රම දෙකක් තියෙනවා.

1. parallelStream() method එක පාවිච්චි කිරීම

Collections වලට parallelStream() කියලා method එකක් තියෙනවා. ඒක call කරාම Collection එකෙන් direct Parallel Stream එකක් හදාගන්න පුළුවන්.

import java.util.Arrays;
import java.util.List;

public class ParallelStreamCreationExample1 {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "banana", "cat", "dog", "elephant", "fish", "grape", "house");

        // Collection එකකින් direct Parallel Stream එකක් හදනවා
        long startTime = System.nanoTime();
        long countUpperCaseWords = words.parallelStream()
                                        .map(String::toUpperCase) // හැම word එකක්ම uppercase කරනවා
                                        .filter(s -> s.startsWith("A")) // 'A' වලින් පටන් ගන්න ඒවා විතරක් ගන්නවා
                                        .count(); // ගණන් කරනවා
        long endTime = System.nanoTime();

        System.out.println("Parallel Stream: Count of uppercase words starting with 'A': " + countUpperCaseWords);
        System.out.println("Time taken: " + (endTime - startTime) / 1_000_000.0 + " ms");
        // Output: Parallel Stream: Count of uppercase words starting with 'A': 2
    }
}

2. parallel() method එක පාවිච්චි කිරීම

ඔයාට දැනටමත් sequential stream එකක් තියෙනවා නම්, ඒකට parallel() method එක call කරලා parallel stream එකක් බවට පත් කරන්න පුළුවන්.

import java.util.stream.IntStream;

public class ParallelStreamCreationExample2 {
    public static void main(String[] args) {
        // ලොකු IntStream එකක් හදාගෙන, ඒක parallel කරනවා
        long limit = 100_000_000; // 100 million

        long startTimeSequential = System.nanoTime();
        long sumSequential = IntStream.rangeClosed(1, (int) limit) // 1 ඉඳන් limit දක්වා සංඛ්‍යා
                                        .sum(); // එකතු කරනවා
        long endTimeSequential = System.nanoTime();

        System.out.println("Sequential Stream: Sum of numbers: " + sumSequential);
        System.out.println("Sequential Time taken: " + (endTimeSequential - startTimeSequential) / 1_000_000.0 + " ms");

        long startTimeParallel = System.nanoTime();
        long sumParallel = IntStream.rangeClosed(1, (int) limit)
                                    .parallel() // Stream එක parallel කරනවා
                                    .sum(); // එකතු කරනවා
        long endTimeParallel = System.nanoTime();

        System.out.println("Parallel Stream: Sum of numbers: " + sumParallel);
        System.out.println("Parallel Time taken: " + (endTimeParallel - startTimeParallel) / 1_000_000.0 + " ms");

        // Output එක ඔයාගේ CPU එක අනුව වෙනස් වේවි. Parallel එක සාමාන්‍යයෙන් වේගවත් විය යුතුයි.
    }
}

මේ code එක run කරලා බැලුවොත්, ලොකු data set එකකදී parallel() stream එක කොච්චර වේගවත්ද කියලා ඔයාටම බලාගන්න පුළුවන්. ඔයාගේ CPU එකේ cores ගාන අනුව, මේකේ වේගය වෙනස් වෙන්න පුළුවන්.

ඇතුලත සිදුවන දේ: Fork/Join Framework එක (Under the Hood: The Fork/Join Framework)

Parallel Streams වැඩ කරන්නේ Java's Fork/Join Framework එක පාවිච්චි කරලා. මේකෙන් වෙන්නේ task එක පොඩි sub-tasks වලට කඩලා (Fork), ඒ sub-tasks එකපාරට process කරලා, ආපහු ඒ results එකතු කරන එක (Join).

  • Fork: ලොකු task එකක් පොඩි, ස්වාධීන sub-tasks වලට බෙදනවා.
  • Join: Sub-tasks වල results එකතු කරලා, අවසාන result එක හදනවා.

සාමාන්‍යයෙන් Parallel Streams වලට Default ForkJoinPool එක පාවිච්චි කරනවා. ඒකේ thread count එක, ඔයාගේ computer එකේ CPU cores ගානට සමානයි. මේකෙන් වෙන්නේ ඔයාගේ CPU එකේ cores ගානට වැඩ ටික බෙදාගෙන, ඵලදායී විදිහට process කරන එක.

වැදගත් කරුණු:

  • Order එක: Parallel processing වලදී order එක හැමවිටම guaranteed නෑ. ඒ නිසා forEachOrdered වගේ operations පාවිච්චි කරන්න වෙනවා, order එක වැදගත් නම්.
  • Shared Mutable State: Shared mutable state (එකම variable එකකට threads කීපයක් ලියන්න හදනවා නම්) තියෙන operations වලින් පරිස්සම් වෙන්න ඕනේ. ඒවයින් data inconsistencies වෙන්න පුළුවන්. ඒ නිසා side-effect-free operations පාවිච්චි කරන්න පුරුදු වෙන්න.

ප්‍රායෝගික කරුණු සහ හොඳම පුරුදු (Practical Considerations & Best Practices)

Parallel Streams කියන්නේ බලගතු tool එකක් වුණත්, ඒවා නිවැරදිව පාවිච්චි කරන එක ගොඩක් වැදගත්. මෙන්න හොඳම පුරුදු කිහිපයක්:

Common ForkJoinPool එක ගැන සැලකිලිමත් වෙන්න

Parallel Streams වලට Default ForkJoinPool එක පාවිච්චි කරනවා. ඔයාගේ application එකේ ಬೇರೆ තැන්වලත් මේ Pool එක පාවිච්චි කරනවා නම්, Parallel Stream එකක් run කරන එකෙන් අනිත් tasks වලට බලපෑමක් වෙන්න පුළුවන්. විශේෂයෙන්ම I/O-bound operations කරන Parallel Streams වලින් Default Pool එකේ threads block වෙන්න පුළුවන්.

Stream Source එක (Characteristics of the Data Source)

ArrayList, HashMap වගේ Collections වලට Parallel Streams හොඳට වැඩ කරනවා. මොකද ඒවා පහසුවෙන් පොඩි කෑලි වලට කඩන්න පුළුවන්. නමුත් Linked Lists වගේ ඒවට එච්චර හොඳ නැහැ, මොකද ඒවා effective විදිහට කඩන්න අමාරුයි, ඒ වගේම element එකක් ගන්න ගොඩක් වෙලා යනවා.

Side-Effects වලින් වළකින්න (Avoid Side-Effects)

Lambda expressions වලදී shared mutable state වෙනස් කරන එකෙන් වළකින්න. මේවට side-effects කියලා කියනවා. Pure functions (input එකට විතරක් වැඩ කරන, පිටත දේවල් වෙනස් නොකරන) පාවිච්චි කරන්න පුරුදු වෙන්න. උදාහරණයක් විදිහට, AtomicInteger වගේ Thread-safe classes පාවිච්චි කරන්න පුළුවන්, නැත්නම් `reduce` operation එක පාවිච්චි කරන්න.

Performance Test කරන්න (Benchmark)

Parallel Streams හැමවිටම වේගවත් නෑ. පොඩි data sets වලට, overhead එක නිසා slow වෙන්න පුළුවන්. ඒ නිසා, ඔයාගේ Use Case එකට Sequential Streams වඩා හොඳද, නැත්නම් Parallel Streams වඩා හොඳද කියලා test කරලා බලන්න ඕනේ. Java Microbenchmark Harness (JMH) වගේ tools මේකට පාවිච්චි කරන්න පුළුවන්.

නිගමනය (Conclusion)

ඉතින් යාළුවනේ, අපි අද Java 8 Parallel Streams ගැන ගොඩක් දේවල් ඉගෙන ගත්තා. මේකෙන් අපිට පුළුවන් අපේ applications වල multi-core CPU power එකෙන් උපරිම ප්‍රයෝජන අරගෙන performance වැඩි කරගන්න.

හැබැයි, හැමවිටම මේක හොඳම solution එක නෙවෙයි කියලත් අපි දැක්කා. නියම තැනට, නියම වෙලාවට Parallel Streams පාවිච්චි කරන එක තමයි වැදගත්. Performance gain එකක් තියෙනවා නම් විතරක් මේවා පාවිච්චි කරන්න, හැබැයි overhead එක සහ side-effects ගැනත් සැලකිලිමත් වෙන්න ඕනේ.

ඔයාලත් මේක ඔයාලගේ project වලට implement කරලා බලන්න. ඔයාලගේ අත්දැකීම් පහලින් comment කරන්නත් අමතක කරන්න එපා. තවත් මේ වගේ tutorial එකකින් හමුවෙමු! සුභ දවසක්!