Java Parallel Streams | වැඩි දියුණු කළ කාර්යසාධනය | Sinhala Guide

ඉතින් කොහොමද යාලුවනේ! අද අපි කතා කරන්න යන්නේ Java 8 එකේ තියෙන හරිම වැදගත්, ඒ වගේම අපේ applications වල performance එක වැඩි කරන්න පුළුවන් කමක් තියෙන topic එකක් ගැන. ඒ තමයි Parallel Streams.
අද කාලේ තියෙන computers වල CPUs වලට තියෙන්නේ එක core එකක් නෙවෙයි. ගොඩක් වෙලාවට multiple cores. ඒ කියන්නේ අපේ program එකේ task කීපයක් එකම වෙලාවේ, එකට කරන්න පුළුවන්. හැබැයි මේ multi-core CPUs වල උපරිම ප්රයෝජන ගන්න නම් අපේ code එකත් ඒකට ගැලපෙන්න ලියන්න ඕනේ. සාමාන්යයෙන් Concurrency (එකම වෙලාවේ දේවල් කිහිපයක් කරන එක) කියන එක program කරන්න හරිම අමාරු දෙයක්. Threads manage කරන එක, deadlocks, race conditions වගේ issues නැතිව code ලියන එක developers ලට ලොකු headache එකක්.
හැබැයි Java 8 එකත් එක්ක ආපු Stream API එක මේ වැඩේට ලොකු පහසුවක් දුන්නා. Stream API එක functional programming style එකක් ගෙනාවා, ඒ වගේම Collections process කරන එක හරිම ලේසි කළා. ඒත් එක්කම, Stream API එකේ තියෙන විශේෂ හැකියාවක් තමයි Parallel Streams. මේකෙන් වෙන්නේ අපේ stream operations multi-core CPUs වලට බෙදලා, එකම වෙලාවේ run කරන එක. ඒ කියන්නේ code එකේ එකම Line of Code එකක් වෙනස් නොකර අපිට performance එක වැඩි කරගන්න පුළුවන්!
මේ tutorial එකේදී අපි Parallel Streams කියන්නේ මොනවද, ඒවා භාවිතා කරන්නේ කොහොමද, ඒ වගේම ඒවා භාවිතා කරද්දි සැලකිලිමත් වෙන්න ඕන දේවල් මොනවද කියලා පැහැදිලිව කතා කරමු. අවසානයේ අපි practical examples කීපයකුත් බලමු, එහෙනම් වැඩේට බහිමු!
Stream API එකේ පොඩි හැඳින්වීමක්
Parallel Streams ගැන කතා කරන්න කලින්, අපි පොඩ්ඩක් මතක් කරගමු සාමාන්ය Stream API එක ගැන. Java 8 වලදී හඳුන්වා දුන්න Stream API එක Collection එකක තියෙන elements වලට sequence of operations apply කරන්න පුළුවන් විදියක්. මේකෙන් Data Processing pipelines හදන්න පුළුවන්. Stream එකක් කියන්නේ data structure එකක් නෙවෙයි, data source එකකට (e.g., List
, Set
, Array
) උඩින් "pipeline" එකක් වගේ ක්රියා කරන එකක්.
උදාහරණයක් විදියට, List
එකක තියෙන සංඛ්යා filter කරලා, ඒවා square කරලා, අන්තිමට ඒවගේ එකතුව හොයනවා කියලා හිතන්න. සාමාන්ය for-each
loop එකකින් මේක කරන්න පුළුවන් වුණත්, Stream API එකෙන් මේක හරිම elegant විදියට කරන්න පුළුවන්.
import java.util.Arrays;
import java.util.List;
public class SequentialStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Sequential Stream එකක් භාවිතා කරලා, ඉරට්ටේ සංඛ්යා (even numbers)
// filter කරලා, ඒවායේ වර්ග (squares) හොයලා, එකතු කරනවා.
long sumOfSquaresOfEvenNumbers = numbers.stream()
.filter(n -> n % 2 == 0) // ඉරට්ටේ සංඛ්යා තෝරනවා
.mapToLong(n -> (long) n * n) // ඒවායේ වර්ග හොයනවා
.sum(); // එකතු කරනවා
System.out.println("Sequential Stream Result: " + sumOfSquaresOfEvenNumbers);
}
}
මේ code එක බලන්න. කෙටියි, කියවන්න ලේසියි නේද? මෙතන .stream()
කියන method එකෙන් තමයි අපි sequential stream එකක් හදන්නේ.
Parallel Streams මොනවද?
හරි, දැන් අපි එමු අපේ ප්රධාන මාතෘකාවට. Parallel Streams කියන්නේ මොනවද? සරලවම කිව්වොත්, stream එකක් sequential විදියට process කරනවා වෙනුවට, ඒක task කීපයකට කඩලා, ඒ task ටික එකම වෙලාවේ වෙන වෙනම threads වල run කරලා, අන්තිමට ඒ result ටික එකතු කරන එක තමයි Parallel Streams වලින් කරන්නේ.
මේ වැඩේට Java 8 වල තියෙන Fork/Join Framework එක තමයි යටින් use කරන්නේ. මේ Framework එකෙන් ලොකු task එකක් පොඩි පොඩි sub-tasks වලට බෙදලා (fork), ඒ sub-tasks execute කරලා, අන්තිමට ඒ result ටික එකතු කරනවා (join). මේකෙන් Multi-core CPUs වල උපරිම ප්රයෝජන ගන්න පුළුවන්.
Stream එකක් Parallel Stream එකක් බවට පත් කරන එක හරිම ලේසියි. අපිට තියෙන්නේ .stream()
වෙනුවට .parallelStream()
කියන method එක Collection එකට call කරන එක. නැත්නම්, දැනට තියෙන sequential stream එකකට .parallel()
කියන intermediate operation එක add කරන එක.
// Collection එකකින් කෙලින්ම Parallel Stream එකක් හදනවා
List<String> names = Arrays.asList("Ruwan", "Kamal", "Nimal", "Sunil");
names.parallelStream()
.forEach(System.out::println);
// Sequential Stream එකක් Parallel Stream එකක් බවට පත් කරනවා
numbers.stream()
.parallel() // මෙන්න මෙතනින් තමයි stream එක parallel වෙන්නේ
.filter(n -> n % 2 == 0)
.mapToLong(n -> (long) n * n)
.sum();
අපිට පේනවා නේද code එකේ ලොකු වෙනසක් නෑ. හැබැයි යටින් වෙන වැඩේ නම් ලොකු වෙනසක්!
Sequential Stream එකක් Parallel Stream එකක් බවට හරවමු
අපි කලින් Sequential Stream එකෙන් කරපු උදාහරණයම Parallel Stream එකකින් කරලා බලමු. ඒ වගේම මේ දෙකේ performance එකත් අපි සසඳලා බලමු. සාමාන්යයෙන් පොඩි data sets වලට Parallel Streams වලින් ලොකු performance boost එකක් ලැබෙන්නේ නෑ. මොකද threads create කරන එකටයි, task manage කරන එකටයි වෙන overhead එකක් තියෙනවා. හැබැයි ලොකු data sets වලට නම් ලොකු වෙනසක් බලාගන්න පුළුවන්.
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class StreamPerformanceComparison {
public static void main(String[] args) {
// ලොකු Integer List එකක් හදමු
List<Integer> bigListOfNumbers = IntStream.range(0, 10_000_000) // මිලියන 10ක්
.boxed()
.collect(Collectors.toList());
System.out.println("Processing " + bigListOfNumbers.size() + " numbers...");
// Sequential Stream එකකින් Process කරලා බලමු
long startTimeSequential = System.nanoTime();
long sequentialSum = bigListOfNumbers.stream()
.filter(n -> n % 2 == 0)
.mapToLong(n -> (long) n * n)
.sum();
long endTimeSequential = System.nanoTime();
long durationSequential = (endTimeSequential - startTimeSequential) / 1_000_000; // milliseconds
System.out.println("Sequential Stream Sum: " + sequentialSum);
System.out.println("Sequential Stream took: " + durationSequential + " ms");
System.out.println("------------------------------------");
// Parallel Stream එකකින් Process කරලා බලමු
long startTimeParallel = System.nanoTime();
long parallelSum = bigListOfNumbers.parallelStream() // මෙතනින් තමයි වෙනස වෙන්නේ!
.filter(n -> n % 2 == 0)
.mapToLong(n -> (long) n * n)
.sum();
long endTimeParallel = System.nanoTime();
long durationParallel = (endTimeParallel - startTimeParallel) / 1_000_000; // milliseconds
System.out.println("Parallel Stream Sum: " + parallelSum);
System.out.println("Parallel Stream took: " + durationParallel + " ms");
System.out.println("\nPerformance Improvement: " + (double) durationSequential / durationParallel + "x");
}
}
මේ program එක run කරලා බලන්න. ඔයාලගේ computer එකේ cores කීයක් තියෙනවද කියන එක අනුව, වගේම JVM එකේ settings අනුව result වෙනස් වෙන්න පුළුවන්. හැබැයි ගොඩක් වෙලාවට Parallel Stream එක sequential stream එකට වඩා වේගවත් වෙයි, විශේෂයෙන්ම Data set එක ලොකු නිසා.
Parallel Streams භාවිතයේදී සැලකිලිමත් විය යුතු කරුණු
Parallel Streams කියන්නේ magic wand එකක් නෙවෙයි. හැම වෙලාවෙම ඒවා භාවිතා කිරීමෙන් performance එක වැඩි වෙන්නේ නෑ. සමහර වෙලාවට ඒක අඩු වෙන්නත් පුළුවන්. ඒ නිසා මේ කරුණු ටික සැලකිල්ලට ගන්න.
1. Performance Benefits: කවදාද වැඩ කරන්නේ?
- Data Set එකේ Size එක: Parallel Streams වලින් ලොකුම වාසිය ලැබෙන්නේ ලොකු data sets process කරද්දි. පොඩි data sets වලට threads create කරන්නයි, task manage කරන්නයි යන overhead එක නිසා performance එක අඩු වෙන්න පුළුවන්.
- Operation එකේ ස්වභාවය: CPU-bound operations (calculation වගේ දේවල්) සඳහා Parallel Streams හරිම හොඳයි. I/O-bound operations (file read/write, network calls) වලට නම් සාමාන්යයෙන් Parallel Streams වලින් ලොකු වාසියක් ලැබෙන්නේ නෑ, මොකද ගොඩක් වෙලාවට I/O එක ඉවර වෙනකන් threads wait කරන්න වෙන නිසා.
- Cores ගණන: ඔයාගේ CPU එකේ තියෙන cores ගණන අනුව performance benefit එක වෙනස් වෙනවා. Cores වැඩි වෙන්න වැඩි වෙන්න, Parallel Streams වලින් ගන්න පුළුවන් වාසියත් වැඩි වෙනවා.
2. Stateful Operations වලින් පරිස්සම් වෙන්න
Stream operations stateless වෙන්න ඕනේ කියන එක ගොඩක් වැදගත්. ඒ කියන්නේ operation එකක් run වෙද්දි, external mutable state එකක් modify කරන්න හොඳ නෑ. විශේෂයෙන්ම forEach()
වගේ terminal operation එකක් භාවිතා කරද්දි, ඒක ඇතුලේ external variable එකක් modify කරන්න ගියොත්, race conditions ඇති වෙලා වැරදි results එන්න පුළුවන්.
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelStreamStatefulExample {
public static void main(String[] args) {
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> results = new ArrayList<>(); // External mutable state
// Parallel Stream එකක් භාවිතා කරලා, stateful operation එකක් කරනවා
// මේකෙන් වැරදි results එන්න පුළුවන්!
numbers.parallelStream()
.filter(n -> n % 2 == 0)
.forEach(results::add); // NOT thread-safe!
// Output එක බලන්න. හැම වෙලාවෙම හරිම results එන්නේ නෑ.
// Threads එකම වෙලාවේ add කරන්න හදන නිසා duplicate entries හෝ missing entries වෙන්න පුළුවන්.
System.out.println("Results (may be incorrect): " + results);
System.out.println("Size (may be incorrect): " + results.size());
// මේකට නිවැරදි ක්රමය තමයි collector එකක් භාවිතා කරන එක
List<Integer> correctResults = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.collect(Collectors.toList()); // Thread-safe collector
System.out.println("Correct Results: " + correctResults);
System.out.println("Correct Size: " + correctResults.size());
}
}
මේ example එකේදී forEach(results::add)
කියන එක Parallel Stream එකකදී භාවිතා කිරීම වැරදියි. මොකද results
කියන්නේ shared mutable state එකක්. ඒ වෙනුවට collect(Collectors.toList())
වගේ thread-safe collector එකක් භාවිතා කරන්න.
3. Ordering: Data එකේ පිළිවෙල
සාමාන්යයෙන් Parallel Streams වල operations run වෙන්නේ වෙන වෙනම. ඒ නිසා elements වල original order එක guarantee කරන්නේ නෑ. හැබැයි සමහර operation වලදී (e.g., forEachOrdered()
) order එක maintain කරන්න පුළුවන්. හැබැයි එහෙම කරද්දි Parallel Stream එකේ performance benefit එක අඩු වෙන්න පුළුවන්.
import java.util.List;
public class ParallelStreamOrderExample {
public static void main(String[] args) {
List<String> names = List.of("Apple", "Banana", "Cherry", "Date", "Elderberry");
System.out.println("Using forEach (order not guaranteed):");
names.parallelStream()
.forEach(s -> System.out.println(Thread.currentThread().getName() + ": " + s));
System.out.println("\nUsing forEachOrdered (order guaranteed):");
names.parallelStream()
.forEachOrdered(s -> System.out.println(Thread.currentThread().getName() + ": " + s));
}
}
පළවෙනි forEach
එකේ output එකේදී elements print වෙන පිළිවෙල වෙනස් වෙන්න පුළුවන්. දෙවැනි forEachOrdered
එකේදී elements original order එකටම print වෙයි, නමුත් ඒක Parallel Stream එකේ performance එකට බලපාලා තියෙන්නේ. මෙතනදී threads ගොඩක් තිබුණත්, output එක print වෙන්නේ පිළිවෙලකට නිසා, threads වලට එකිනෙකාට wait කරන්න සිද්ධ වෙනවා.
4. Common ForkJoinPool එක
Java Parallel Streams වලට threads create කරන්නේ ForkJoinPool.commonPool()
එකෙන්. මේ pool එක Java application එකේ හැම Parallel Stream එකකටම share වෙනවා. මේ pool එකේ threads ගණන සාමාන්යයෙන් ඔයාගේ CPU එකේ cores ගණනට සමානයි (ඒක JVM එකේ -Djava.util.concurrent.ForkJoinPool.common.parallelism
property එකෙන් වෙනස් කරන්න පුළුවන්). ඒ නිසා, ඔයාගේ application එකේ ගොඩක් Parallel Streams එකම වෙලාවේ run වෙනවා නම්, මේ pool එකේ threads අවසන් වෙලා performance එක අඩු වෙන්න පුළුවන්.
Practical Example: ලොකු දත්ත ගොනුවක් Process කරමු
අපි තවත් පොඩි practical example එකක් බලමු. User objects ලොකු list එකක් තියෙනවා කියලා හිතන්න. අපිට ඕනේ මේ users ලාගෙන් යම්කිසි වයසකට වඩා වැඩි අයව filter කරලා, ඒ අයගේ නම් uppercase කරලා, අන්තිමට ඒ names collect කරන එක.
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.concurrent.ThreadLocalRandom;
class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() { return name; }
public int getAge() { return age; }
@Override
public String toString() { return "User{" + "name='" + name + '\'' + ", age=" + age + '}'; }
}
public class ParallelStreamUserProcessing {
private static List<User> generateUsers(int count) {
List<User> users = new ArrayList<>();
for (int i = 0; i < count; i++) {
users.add(new User("User" + i, ThreadLocalRandom.current().nextInt(18, 65)));
}
return users;
}
public static void main(String[] args) {
List<User> allUsers = generateUsers(5_000_000); // මිලියන 5ක Users ලා
int thresholdAge = 30;
System.out.println("Processing " + allUsers.size() + " users, filtering for age > " + thresholdAge + "...");
// Sequential Processing
long startTimeSequential = System.nanoTime();
List<String> youngAdultsSequential = allUsers.stream()
.filter(user -> user.getAge() > thresholdAge)
.map(user -> user.getName().toUpperCase())
.collect(Collectors.toList());
long endTimeSequential = System.nanoTime();
long durationSequential = (endTimeSequential - startTimeSequential) / 1_000_000;
System.out.println("Sequential Processing took: " + durationSequential + " ms");
// System.out.println("Sequential Result size: " + youngAdultsSequential.size()); // Uncomment to verify size
// Parallel Processing
long startTimeParallel = System.nanoTime();
List<String> youngAdultsParallel = allUsers.parallelStream()
.filter(user -> user.getAge() > thresholdAge)
.map(user -> user.getName().toUpperCase())
.collect(Collectors.toList());
long endTimeParallel = System.nanoTime();
long durationParallel = (endTimeParallel - startTimeParallel) / 1_000_000;
System.out.println("Parallel Processing took: " + durationParallel + " ms");
// System.out.println("Parallel Result size: " + youngAdultsParallel.size()); // Uncomment to verify size
System.out.println("\nPerformance Improvement: " + (double) durationSequential / durationParallel + "x");
}
}
මේ example එකෙන් අපිට පැහැදිලිවම පේනවා CPU-intensive operation එකක් (filter, map) ලොකු data set එකක් මත කරද්දි Parallel Streams වලින් කොච්චර performance improvement එකක් ගන්න පුළුවන්ද කියලා. මේ වගේ scenarios වලදී තමයි Parallel Streams සැබෑ විදියටම shine වෙන්නේ.
නිගමනය (Conclusion)
Java 8 Parallel Streams කියන්නේ multi-core CPUs වල බලය හරිම සරලව, efficently භාවිතා කරන්න පුළුවන් ප්රබල tool එකක්. අපිට Concurrency ගැන ගැඹුරින් නොදැනම, අමතර boilerplate code ලියන්නේ නැතුව performance optimization කරන්න මේකෙන් ලොකු අවස්ථාවක් ලැබෙනවා.
ඒත් අපි මතක තියාගන්න ඕනේ, මේවා හැම වෙලාවෙම solution එකක් නෙවෙයි කියලා. ලොකු data sets, CPU-bound operations වගේ තැන් වලදී තමයි මේවායේ උපරිම වාසිය ගන්න පුළුවන්. ඒ වගේම stateful operations වලින් පරිස්සම් වෙන්නත්, ordering ගැන සැලකිලිමත් වෙන්නත් අමතක කරන්න එපා. හොඳම දේ තමයි, ඔයාගේ application එකේ performance critical parts වලට Parallel Streams භාවිතා කරද්දි, sequential stream එකත් එක්ක benchmark කරලා බලන එක.
ඉතින්, මේ tutorial එකෙන් Parallel Streams ගැන පැහැදිලි අවබෝධයක් ලැබෙන්න ඇති කියලා හිතනවා. ඔයාලත් මේවා ඔයාලගේ project වලට integrate කරලා බලන්න. මොකද, මේවා දැනගෙන ඉන්න එක අද කාලේ software engineering field එකේදී ගොඩක් වැදගත්.
මේ ගැන ඔයාලගේ අදහස්, අත්දැකීම් පහලින් comment කරන්න. තවත් මේ වගේ වැදගත් topic එකකින් හමුවෙමු!