Java CompletableFuture allOf() & anyOf() Sinhala Guide | Asynchronous Programming

ආයුබෝවන් යාළුවනේ! 🚀 අද අපි කතා කරන්න යන්නේ Java වල Asynchronous Programming වලදී ගොඩක් වැදගත් වෙන, ඒ වගේම ඔයාගේ Application වල Performance අහස උසට වැඩි කරන්න පුළුවන් මාතෘකාවක් ගැන – ඒ තමයි CompletableFuture. විශේෂයෙන්ම, එක පාර CompletableFuture
කීපයක් එකට පාවිච්චි කරලා අපේ වැඩ පහසු කරගන්නේ කොහොමද කියලා අපි බලමු.
අපි මේ tutorial එකෙන් CompletableFuture.allOf()
සහ CompletableFuture.anyOf()
කියන methods දෙක ගැන සවිස්තරාත්මකව ඉගෙන ගන්නවා. මේවා ඔයාට එකම වෙලාවේ Task කීපයක් handle කරන්නත්, ඒවාගේ ප්රතිඵල එනකම් කාර්යක්ෂමව බලාගෙන ඉන්නත් උදව් වෙනවා. එහෙනම්, අපි පටන් ගමු!
1. CompletableFuture කියන්නේ මොකක්ද? (What is CompletableFuture?)
මුලින්ම, CompletableFuture
කියන්නේ මොකක්ද කියලා පොඩ්ඩක් මතක් කරගමු. Java 8 වලින් හඳුන්වා දුන්න CompletableFuture
කියන්නේ Future
Interface එකේ Extended Version එකක්. මේක අපිට Asynchronous computations කරන්නයි, ඒ results එනකම් Non-Blocking විදියට බලාගෙන ඉන්නයි උදව් කරනවා.
සාමාන්ය Future
එකකට වඩා CompletableFuture
ගොඩක් නම්යශීලී (flexible) යි. මොකද, මේකෙන් අපිට Callbacks (thenApply
, thenAccept
, thenRun
, etc.) පාවිච්චි කරන්න පුළුවන් වගේම, අපිට Manually Future එක Complete කරන්නත් (complete()
, completeExceptionally()
) පුළුවන්. ඒ වගේම තමයි, Future කීපයක් එකට එකතු කරන්නත් පුළුවන් – අද අපි බලන්න යන්නේ ඒ ගැන තමයි!
සරල CompletableFuture උදාහරණයක්:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class SimpleCompletableFuture {
private static String doSomethingExpensive(String taskName) {
try {
// Simulate some time-consuming operation
TimeUnit.SECONDS.sleep(2);
System.out.println(taskName + " completed.");
return "Result of " + taskName;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Task " + taskName + " interrupted.";
}
}
public static void main(String[] args) throws Exception {
System.out.println("Main thread started.");
// Create a CompletableFuture that runs asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Future task started by thread: " + Thread.currentThread().getName());
return doSomethingExpensive("Async Task 1");
});
// Do something else while the future is running
System.out.println("Main thread continuing its work...");
TimeUnit.SECONDS.sleep(1);
// Get the result (this will block until the future completes)
String result = future.get();
System.out.println("Future result: " + result);
System.out.println("Main thread finished.");
}
}
මේ Code එකෙන් doSomethingExpensive
කියන Method එක වෙනම Thread එකක (Asynchronously) execute කරලා, ඒකේ Result එක එනකම් main
thread එක Block නොවී තව වැඩ කරන හැටි පෙන්නනවා. future.get()
කියන තැනදී විතරයි Result එක එනකම් Block වෙන්නේ.
2. CompletableFuture.allOf() - හැමෝම ඉවර වෙනකම් ඉමු! (Waiting for Everyone to Finish!)
හිතන්න ඔයාට different services කීපයකින් data ගන්න තියෙනවා, නැත්නම් එක පාර Task කීපයක් execute කරන්න තියෙනවා. හැබැයි මේ හැම Task එකකින්ම නැත්නම් හැම service එකකින්ම data ආවට පස්සේ තමයි ඔයාට ඊළඟට තියෙන වැඩේ (ඊළඟ Business Logic එක) පටන් ගන්න පුළුවන්. ඒ වගේ වෙලාවකදී තමයි CompletableFuture.allOf()
කියන Method එක ගොඩක් ප්රයෝජනවත් වෙන්නේ.
allOf()
Method එකට අපිට CompletableFuture
Instances කීපයක් දෙන්න පුළුවන්. ඒකෙන් Return වෙන්නේ CompletableFuture<Void>
එකක්. මේ CompletableFuture<Void>
එක Complete වෙන්නේ, අපිට දීපු අනිත් හැම CompletableFuture
එකක්ම Complete වුනාට පස්සේ විතරයි. මේක ගොඩක් වෙලාවට Parallel tasks set එකක් execute කරලා, ඒ හැම එකකම results Aggregation එකක් කරන්න ඕන වෙලාවට පාවිච්චි කරනවා.
allOf()
පාවිච්චි කරලා Multiple Results Collect කරගන්න හැටි:
allOf()
එකෙන් CompletableFuture<Void>
එකක් Return කරන නිසා, ඒක තනි Result එකක් දෙන්නේ නැහැ. ඒ නිසා, අපිට තනි තනි CompletableFuture
වල Results ඕන නම්, අපි ඒ හැම CompletableFuture
එකක්ම Separate විදියට join()
කරන්න ඕන. හැබැයි, allOf()
එකත් එක්ක ඒක join()
කලාට පස්සේ තමයි ඒක Safe වෙන්නේ. පහත Code Example එක බලන්න.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class AllOfExample {
private static ExecutorService executor = Executors.newFixedThreadPool(3);
// Simulate fetching data from a service
private static CompletableFuture<String> fetchData(String source, int delaySeconds) {
return CompletableFuture.supplyAsync(() -> {
try {
System.out.println("Fetching data from " + source + "... (Thread: " + Thread.currentThread().getName() + ")");
TimeUnit.SECONDS.sleep(delaySeconds);
System.out.println("Finished fetching from " + source + ".");
return "Data from " + source;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}, executor);
}
public static void main(String[] args) {
System.out.println("Main thread started.");
// Create multiple CompletableFuture tasks
CompletableFuture<String> future1 = fetchData("Service A", 3);
CompletableFuture<String> future2 = fetchData("Service B", 2);
CompletableFuture<String> future3 = fetchData("Service C", 4);
// Combine all futures using allOf()
// This future completes only when all individual futures complete
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
// When allFutures completes, we can then process the results of individual futures
CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> {
System.out.println("All futures completed. Collecting results...");
// You can safely get results here because all futures are guaranteed to be complete
return Arrays.asList(future1.join(), future2.join(), future3.join());
});
// Attach a callback to print the collected results
allResults.thenAccept(results -> {
System.out.println("Final combined results: " + results);
}).exceptionally(ex -> {
System.err.println("One or more futures failed: " + ex.getMessage());
return null;
});
// Wait for the final result to be printed, otherwise main thread might exit early
try {
allResults.join(); // Blocks until all results are processed
} catch (Exception e) {
System.err.println("Error waiting for all results: " + e.getMessage());
}
System.out.println("Main thread continuing/finished its work.");
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ex) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
මේ Example එකේදී, Service A, B, C
කියන තුන් දෙනාගෙන්ම data එනකම් අපි බලාගෙන ඉන්නවා. හැම Future එකක්ම Complete වුනාට පස්සේ, allResults
කියන CompletableFuture
එකෙන් අපිට ඒ Results ටික List<String>
එකක් විදියට ගන්න පුළුවන්.
3. CompletableFuture.anyOf() - කවුරුහරි ඉස්සෙල්ලා ඉවර වුණොත් ඇති! (Whoever Finishes First is Enough!)
සමහර වෙලාවට අපිට data ගන්න Sources කීපයක් තියෙනවා, හැබැයි මේ Sources වලින්, මුලින්ම data දෙන Source එකේ data එක විතරක් අපිට ඇති. අනිත් ඒවා ආවේ නැතත්, ඒක ප්රශ්නයක් නැහැ. උදාහරණයක් විදියට, ඔයාගේ System එකේ Database Server එකේ Replicas කීපයක් තියෙනවා නම්, ඉක්මනින්ම response එක දෙන Server එකෙන් data එක ගන්න එකෙන් System එකේ Speed එක වැඩි කරගන්න පුළුවන්. මේ වගේ scenarios වලදී තමයි CompletableFuture.anyOf()
Method එක පාවිච්චි කරන්නේ.
anyOf()
Method එකටත් අපිට CompletableFuture
Instances කීපයක් දෙන්න පුළුවන්. හැබැයි මේකෙන් Return වෙන්නේ CompletableFuture<Object>
එකක්. මේ CompletableFuture<Object>
එක Complete වෙන්නේ, අපිට දීපු අනිත් CompletableFuture
වලින් ඕනම එකක් මුලින්ම Complete වුනාම විතරයි. ඒ කියන්නේ, ඒ මුලින්ම Complete වුණු Future එකේ Result එක තමයි මේ anyOf()
එකෙන් අපිට ලැබෙන්නේ.
anyOf()
පාවිච්චි කරන හැටි:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class AnyOfExample {
private static ExecutorService executor = Executors.newFixedThreadPool(3);
// Simulate fetching data from a service, some faster than others
private static CompletableFuture<String> fetchData(String source, int delaySeconds) {
return CompletableFuture.supplyAsync(() -> {
try {
System.out.println("Fetching data from " + source + "... (Thread: " + Thread.currentThread().getName() + ")");
TimeUnit.SECONDS.sleep(delaySeconds);
System.out.println("Finished fetching from " + source + ".");
return "Data from " + source;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}, executor);
}
public static void main(String[] args) {
System.out.println("Main thread started.");
// Create multiple CompletableFuture tasks with different delays
CompletableFuture<String> futureFast = fetchData("Fast Service", 1);
CompletableFuture<String> futureMedium = fetchData("Medium Service", 3);
CompletableFuture<String> futureSlow = fetchData("Slow Service", 5);
// Combine all futures using anyOf()
// This future completes as soon as any one of the individual futures completes
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(futureFast, futureMedium, futureSlow);
// When anyOfFuture completes, get the result (which will be from the fastest future)
anyOfFuture.thenAccept(result -> {
System.out.println("First service responded with: " + result);
}).exceptionally(ex -> {
System.err.println("One of the futures failed before any completed successfully: " + ex.getMessage());
return null;
});
// Wait for the result to be printed, otherwise main thread might exit early
try {
anyOfFuture.join(); // Blocks until one result is available
} catch (Exception e) {
System.err.println("Error waiting for any result: " + e.getMessage());
}
System.out.println("Main thread continuing/finished its work.");
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ex) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
මේ Example එකේදී, අපි Services තුනකින් data ගන්නවා. හැබැයි Fast Service
එකෙන් ඉක්මනටම data එන නිසා, anyOfFuture
එක ඒකේ Result එකත් එක්ක Complete වෙනවා. ඊට පස්සේ අනිත් Futures වල Result එනකම් අපි බලාගෙන ඉන්නේ නැහැ. මේක Resources Save කරන්න වගේම, User Experience එක වැඩි දියුණු කරන්නත් උදව් වෙනවා.
4. වැදගත් Tips සහ Best Practices (Important Tips & Best Practices)
Error Handling (වැරදි හසුරුවන ක්රම)
Asynchronous operations වලදී Errors එන එක සාමාන්ය දෙයක්. CompletableFuture
වලදී අපිට Errors handle කරන්න Method කීපයක් තියෙනවා:
exceptionally(Function<Throwable, T> fn)
: Future එක Fail වුනොත්, ඒ Error එක Recover කරලා Default Value එකක් Return කරන්න පුළුවන්.handle(BiFunction<T, Throwable, R> fn)
: Future එක Complete වුනත්, Fail වුනත් මේ Method එක Execute වෙනවා. Result එකයි, Exception එකයි දෙකම අපිට මේ Method එක ඇතුලෙන් Handle කරන්න පුළුවන්.whenComplete(BiConsumer<T, Throwable> action)
: Future එක Complete වුනාම හෝ Fail වුනාම, ඒකේ Result එකයි, Exception එකයි දෙකම සමඟ මේ Action එක Execute වෙනවා. මේකෙන් Future එකේ Result එකට බලපෑමක් වෙන්නේ නැහැ.
allOf()
vs. anyOf()
- කවදාද මොකක්ද පාවිච්චි කරන්නේ?
allOf()
: ඔයාට Task Set එකක හැම Task එකක්ම Complete වෙන්න ඕන නම්, ඒ හැම Result එකක්ම Collect කරලා Process කරන්න ඕන නම්allOf()
පාවිච්චි කරන්න. (e.g., Reports Generate කරන විටදී, multiple data sources වලින් data අරගෙන combine කරන විටදී).anyOf()
: Task Set එකකින් ඕනම එකක් ඉස්සෙල්ලා Complete වුනාම ඇති නම්, ඒකේ Result එක විතරක් වැදගත් නම්anyOf()
පාවිච්චි කරන්න. (e.g., Content එකක් multiple CDN (Content Delivery Network) වලින් Load කරන විටදී, nearest සහ fastest CDN එකෙන් content එක ගන්න).
Custom Executor Service (අපේම Thread Pool එකක්)
CompletableFuture.supplyAsync()
වගේ Methods වලට අපි Executor
එකක් specify නොකලොත්, ඒක Default විදියට ForkJoinPool.commonPool()
එක පාවිච්චි කරනවා. හැබැයි, ඔයාගේ Asynchronous tasks වල Blocking Operations (e.g., Database calls, Network calls) තියෙනවා නම්, commonPool
එක Busy වෙලා, Overall Performance එක අඩු වෙන්න පුළුවන්.
ඒ නිසා, ඔයාගේ Task එකේ ස්වභාවයට (nature) අනුව newFixedThreadPool
, newCachedThreadPool
වගේ Custom ExecutorService
එකක් හදලා ඒක CompletableFuture
Methods වලට දෙන්න පුරුදු වෙන්න. මේකෙන් ඔයාට Threads ගණන පාලනය කරගන්නත්, System එකේ Resources කාර්යක්ෂමව පාවිච්චි කරන්නත් පුළුවන්.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// ... in your code
ExecutorService customExecutor = Executors.newFixedThreadPool(5); // 5 threads in the pool
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// your blocking or long-running task
return "Result";
}, customExecutor); // Pass the custom executor here
// Don't forget to shutdown the executor when done
// customExecutor.shutdown();
Conclusion (අවසන් කිරීම)
ඉතින් යාළුවනේ, මේ ලිපියෙන් ඔයාලට Java වල CompletableFuture.allOf()
සහ CompletableFuture.anyOf()
කියන powerful Methods දෙක පාවිච්චි කරලා Asynchronous operations කාර්යක්ෂමව handle කරගන්න හැටි ගැන හොඳ අවබෝධයක් ලැබෙන්න ඇති කියලා හිතනවා. මේවා ඔයාගේ Application වල Performance සහ Responsiveness වැඩි දියුණු කරන්න ගොඩක් උදව් වෙනවා.
Asynchronous Programming කියන්නේ අද දවසේ Modern Applications වලට නැතුවම බැරි දෙයක්. මේ Concepts ඔයාගේ Project වලට එකතු කරලා බලන්න. ඒ වගේම, Experiment කරලා බලන්න! ඔයාට ප්රශ්න තියෙනවා නම්, පහළින් Comment කරන්න. ඔයාගේ අත්දැකීම් අපිත් එක්ක බෙදාගන්නත් අමතක කරන්න එපා. ජය වේවා! 🇱🇰