Java CompletableFuture: allOf(), anyOf() භාවිතයෙන් Multiple Futures Manage කරමු | Sinhala Guide

Java CompletableFuture: allOf(), anyOf() භාවිතයෙන් Multiple Futures Manage කරමු | Sinhala Guide

Completing Multiple Futures with CompletableFuture's allOf() and anyOf() | Sinhala Guide

ආයුබෝවන් යාළුවනේ! 👋 අද අපි කතා කරන්න යන්නේ Java වල Asynchronous Programming ගැන තියෙන වැදගත්ම Topic එකක් ගැන. ඔයාලා දන්නවනේ, Modern applications වලදී performance එක වැඩි කරගන්න එක කොච්චර වැදගත්ද කියලා. එක වැඩක් ඉවර වෙනකම් බලන් ඉන්නේ නැතුව එක පාරටම වැඩ කීපයක් කරගන්න අපිට Asynchronous Programming උදව් වෙනවා. Java 8 වලින් පස්සේ ආපු CompletableFuture කියන්නේ මේකට තියෙන නියමම Solution එකක්.

සමහර වෙලාවට අපිට Future Tasks ගොඩක් run කරන්න වෙනවා. ඊට පස්සේ ඒ හැම Task එකක්ම ඉවර වෙනකම් ඉන්න, එහෙමත් නැත්නම් ඒ Tasks අතරින් එකක් හරි ඉවර වුන ගමන් ඊළඟ වැඩේට යන්න ඕනේ වෙනවා. මේ වගේ scenarios වලදී අපිට CompletableFuture.allOf() සහ CompletableFuture.anyOf() කියන Methods දෙක ලොකු උදව්වක් වෙනවා.

අද මේ Guide එකෙන් අපි බලමු, මේ Methods දෙක මොනවද, කොහොමද ඒවා පාවිච්චි කරන්නේ, සහ ඔයාලගේ applications වල performance එක වැඩි කරගන්න මේවා කොහොමද ප්‍රයෝජනවත් වෙන්නේ කියලා. එහෙනම්, අපි පටන් ගමු!

CompletableFuture කියන්නේ මොකක්ද?

සාමාන්‍යයෙන් Future එකක් කියන්නේ Asynchronous Operation එකක Result එකක්. ඔයාලා ExecutorService එක්ක submit() කරලා Future objects අරන් ඇති. හැබැයි Future එකේ Limitations ටිකක් තියෙනවා:

  • get() Method එකෙන් Result එක ගන්න ගියාම ඒක Block කරනවා. ඒ කියන්නේ Result එක එනකම් Main Thread එකට බලාගෙන ඉන්න වෙනවා.
  • ඒවා compose කරන්න (එක Future එකක Result එක තවත් Future එකකට යොදාගන්න) අමාරුයි. Callback mechanisms නැහැ.
  • Error handling කරන්න විශේෂ ක්‍රමයක් නැහැ.

CompletableFuture එකක් කියන්නේ මේ Future එකේ Improved Version එකක්. ඒක Future සහ CompletionStage යන Interfaces දෙකම implement කරන නිසා අපිට Non-blocking asynchronous operations කරන්න, ඒවා එකට chain කරන්න (compose කරන්න), error handling කරන්න, සහ ඒ Results process කරන්න පුළුලුවක් දෙනවා. සරලවම කිව්වොත්, ඒක අපේ asynchronous code එක ලියන එක ගොඩක් පහසු කරනවා.

සරල CompletableFuture උදාහරණයක්

මුලින්ම අපි සරල CompletableFuture එකක් කොහොමද වැඩ කරන්නේ කියලා බලමු.


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class BasicCompletableFutureExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // supplyAsync() uses a ForkJoinPool.commonPool() by default,
        // or a custom ExecutorService if provided.
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("Task started in thread: " + Thread.currentThread().getName());
                Thread.sleep(2000); // Simulate a long-running task (e.g., database query)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Task interrupted: " + e.getMessage());
            }
            System.out.println("Task finished in thread: " + Thread.currentThread().getName());
            return "Hello from Async Task!";
        }, executor);

        // Main thread can continue doing other work
        System.out.println("Main thread is doing other work while the future is running...");

        // Attach a callback to process the result when it's available (non-blocking)
        future.thenAccept(result -> {
            System.out.println("Callback received result: " + result + " in thread: " + Thread.currentThread().getName());
        }).exceptionally(ex -> {
            System.err.println("Error in task: " + ex.getMessage());
            return null; // Return a default value or re-throw
        });

        // To keep the main thread alive until the future completes, if needed for testing
        // In real applications, this might be handled by an event loop or framework.
        future.join(); // This will wait for the future to complete (similar to get() but throws unchecked exceptions)

        System.out.println("Main thread finished.");
        executor.shutdown();
    }
}
    

මේ Example එකෙන් පෙන්නනවා supplyAsync() Method එකෙන් වෙන Thread එකක Task එකක් run කරන්නේ කොහොමද කියලා. thenAccept() වගේ Callback Methods වලින් Result එක ආවට පස්සේ Non-blocking විදියට ඒක process කරන්න අපිට පුළුවන්. join() Method එකත් get() වගේම Result එක එනකම් Block කරනවා, හැබැයි ඒක Checked Exceptions වෙනුවට Unchecked Exceptions විසි කරනවා, ඒක තමයි වෙනස.

allOf() භාවිතයෙන් Futures ඔක්කොම සම්පූර්ණ කිරීම

අපිට Tasks කීපයක් එක පාර run කරලා, ඒ හැම එකක්ම සාර්ථකව complete වෙනකම් බලන් ඉන්න ඕනේ නම් CompletableFuture.allOf() Method එක තමයි පාවිච්චි කරන්නේ. මේක හරිම Useful වෙන්නේ, අපිට වෙන වෙනම data sources කීපයකින් data fetch කරන්න තියෙනකොට, එහෙමත් නැත්නම් වෙන වෙනම Services කීපයකට requests යවලා ඒ හැම response එකක්ම process කරන්න ඕනේ වෙනකොට.

වැදගත්ම දේ තමයි allOf() Return කරන්නේ CompletableFuture<Void> එකක්. ඒ කියන්නේ ඒකේ individual results අඩංගු වෙන්නේ නැහැ. Results ගන්න ඕනේ නම්, අපිට ඒ original CompletableFuture objects වල join() (හෝ get()) Method එක පාවිච්චි කරන්න වෙනවා. හැබැයි allOf().join() call කරාට පස්සේ, original futures වල join() calls Block වෙන්නේ නැහැ, මොකද ඒ වෙලාවට futures already complete වෙලා නිසා.

allOf() Practical උදාහරණයක්


import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

public class CompletableFutureAllOfExample {

    // Method to simulate fetching data from a source with a delay
    public static CompletableFuture<String> fetchData(String dataSource, long delay) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("Fetching data from " + dataSource + "... (Thread: " + Thread.currentThread().getName() + ")");
                Thread.sleep(delay); // Simulate network latency/processing time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Task for " + dataSource + " interrupted.");
            }
            return "Data from " + dataSource;
        });
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        // Create multiple CompletableFuture tasks
        CompletableFuture<String> future1 = fetchData("Database A", 2000);
        CompletableFuture<String> future2 = fetchData("API B", 1500);
        CompletableFuture<String> future3 = fetchData("File System C", 1000);

        // Combine all futures using allOf()
        // This CompletableFuture will complete when all input futures complete
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

        // Wait for all futures to complete (this will block the main thread until then)
        allFutures.join(); 

        // All futures are now completed. We can safely retrieve their results.
        System.out.println("\nAll tasks completed! Retrieving results...");
        
        // A more elegant way to collect results
        List<String> results = Arrays.asList(future1, future2, future3)
                                      .stream()
                                      .map(CompletableFuture::join) // join() here will not block
                                      .collect(Collectors.toList());

        results.forEach(System.out::println);

        long endTime = System.currentTimeMillis();
        System.out.println("Total time taken: " + (endTime - startTime) + " ms");

        // Error Handling Note: If any of the individual futures completes exceptionally,
        // allFutures will also complete exceptionally. You can handle this with .exceptionally()
        // on 'allFutures' or on individual futures before combining.
        // Example: future1.exceptionally(ex -> "Error: " + ex.getMessage()) 
    }
}
    

මෙයින් අපට ලැබෙන වාසි:

  • Performance Advantage: Tasks එක පාරටම run වෙන නිසා, වැඩේට යන මුළු කාලය, දීර්ඝතම Task එකේ කාලයට සමාන වෙනවා (sequential run කරනවාට වඩා වේගවත්). මේ උදාහරණයේදී, 2000ms ක් වගේ කාලයක් ඇතුලත tasks 3ම ඉවර වෙනවා.
  • Result Handling: allOf().join() එකෙන් පස්සේ future1.join() වගේ individual join() call කරද්දී ඒවා Block වෙන්නේ නැහැ, මොකද ඒ වෙලාවට Future එක already complete වෙලා නිසා. Stream API එකත් එක්ක Results collect කරන එක ගොඩක් පහසුයි.
  • Error Handling: allOf() එකට දීපු Futures අතරින් එකක් හරි Exception එකක් දැම්මොත්, allOf() එකෙන් Return වෙන CompletableFuture<Void> එකත් Exception එකකින් Complete වෙනවා. ඒක අපිට exceptionally() වගේ Methods වලින් handle කරන්න පුළුවන්.

anyOf() භාවිතයෙන් එක Future එකක් සම්පූර්ණ වූ විට ක්‍රියා කිරීම

අපිට Tasks කීපයක් එක පාර run කරලා, ඒ Tasks අතරින් මුලින්ම complete වෙන එකේ Result එක ගන්න ඕනේ නම් CompletableFuture.anyOf() Method එක පාවිච්චි කරනවා. මේක හරිම Useful වෙන්නේ, අපිට එකම data set එකක් ගන්න පුළුවන් වෙන වෙනම Sources කීපයක් තියෙනකොට (e.g., primary DB, replica DB, cache, different APIs), ඒ අතරින් වේගවත්ම එකේ response එක ගන්න ඕනේ වෙනකොට. Failover mechanisms වලටත් මේක පාවිච්චි කරන්න පුළුවන්.

anyOf() Return කරන්නේ CompletableFuture<Object> එකක්. මොකද complete වෙන Future එකේ Type එක මොකක්ද කියලා කලින්ම කියන්න බැරි නිසා. ඒ නිසා, Result එක ගත්තට පස්සේ අපිට ඒක අවශ්‍ය Type එකට Cast කරගන්න වෙනවා.

anyOf() Practical උදාහරණයක්


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class CompletableFutureAnyOfExample {

    // Method to simulate fetching data from a service with a delay
    public static CompletableFuture<String> fetchDataFromService(String serviceName, long delay, boolean shouldFail) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("Requesting data from " + serviceName + "... (Thread: " + Thread.currentThread().getName() + ")");
                Thread.sleep(delay); // Simulate network latency
                if (shouldFail) {
                    throw new RuntimeException("Failed to fetch data from " + serviceName + ".");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Task for " + serviceName + " interrupted.");
            } catch (RuntimeException e) {
                System.err.println("Error from " + serviceName + ": " + e.getMessage());
                throw e; // Re-throw to propagate the error
            }
            return "Data from " + serviceName;
        });
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        // Create multiple CompletableFuture tasks
        // One is fast, one is medium, one is slow (and might fail)
        CompletableFuture<String> fastService = fetchDataFromService("Fast Cache", 500, false);
        CompletableFuture<String> mediumService = fetchDataFromService("Primary DB", 1200, false);
        CompletableFuture<String> slowService = fetchDataFromService("Backup API", 2000, true); // This one will fail

        // Combine with anyOf()
        // This CompletableFuture will complete as soon as any of the input futures completes
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(fastService, mediumService, slowService);

        // Wait for any future to complete and get its result
        try {
            Object result = anyOfFuture.join(); // This will block until one future completes
            System.out.println("\nFirst service responded!");
            System.out.println("Result: " + result);
        } catch (Exception e) {
            System.err.println("\nAn error occurred while waiting for any service: " + e.getMessage());
        }

        long endTime = System.currentTimeMillis();
        System.out.println("Total time taken: " + (endTime - startTime) + " ms");

        // Note: Even if slowService fails, if fastService completes successfully first,
        // anyOfFuture will complete successfully with the fastService's result.
        // If the *first* future to complete (regardless of its original position) completes exceptionally,
        // then anyOfFuture will complete exceptionally.
    }
}
    

මෙයින් අපට ලැබෙන වාසි:

  • Fastest Response: Tasks අතරින් වේගවත්ම එකේ result එක අරගෙන user experience එක improve කරන්න පුළුවන්. මේ උදාහරණයේදී, 500ms ක් වගේ කාලයක් ඇතුලත අපිට response එක ලැබෙනවා.
  • Failover Mechanisms: Service එකක් fail වුනත්, තවත් Service එකක් වේගයෙන් response එකක් දුන්නොත් අපිට ඒක පාවිච්චි කරන්න පුළුවන්.
  • Result Type: join() එකෙන් Object එකක් Return වෙන නිසා, ඒක අපිට අවශ්‍ය type එකට cast කරගන්න වෙනවා. (String finalResult = (String) result;)
  • Error Handling: anyOf() එකට දීපු Futures අතරින් මුලින්ම complete වෙන Future එක exception එකකින් complete වුනොත්, anyOf() එකෙන් return වෙන CompletableFuture<Object> එකත් exception එකකින් complete වෙනවා. අනිත් Futures වල errors ඒකට බලපාන්නේ නැහැ, ඒවා පස්සේ complete වෙනවා නම්.

allOf() සහ anyOf() අතර වෙනස

දැන් අපි මේ Methods දෙකේ ප්‍රධාන වෙනස්කම් ගැන සාරාංශයක් බලමු:

Feature CompletableFuture.allOf() CompletableFuture.anyOf()
Purpose පහසුකම් සපයන ලද සියලුම CompletableFutures සම්පූර්ණ වන තෙක් බලා සිටී. පහසුකම් සපයන ලද CompletableFutures අතරින් ඕනෑම එකක් සම්පූර්ණ වන තෙක් බලා සිටී.
Return Type CompletableFuture<Void> (තනි ප්‍රතිඵලයක් ගෙන එන්නේ නැත) CompletableFuture<Object> (පළමු සාර්ථක ප්‍රතිඵලය ගෙන එයි)
Result Retrieval allOf() වෙතින් සෘජු ප්‍රතිඵලයක් නැත. තනි ප්‍රතිඵල ලබා ගත යුත්තේ මුල් CompletableFuture objects වල join() හෝ get() මගිනි. පළමුව සම්පූර්ණ වූ CompletableFuture හි ප්‍රතිඵලය ලබා දෙයි.
Use Cases Batch processing, සියලුම දත්ත අවශ්‍ය වන විට බහු මූලාශ්‍රවලින් දත්ත ලබා ගැනීම. Failover mechanisms, වේගවත්ම දත්ත ලබා ගැනීම සඳහා අනවශ්‍ය ප්‍රභවයන්ගෙන් (cache, multiple APIs) දත්ත ලබා ගැනීම.
Completion (Success) සියලුම Input Futures සාර්ථකව සම්පූර්ණ වුවහොත් පමණක් සාර්ථකව සම්පූර්ණ වේ. ඕනෑම Input Future එකක් සාර්ථකව සම්පූර්ණ වුවහොත් සාර්ථකව සම්පූර්ණ වේ.
Completion (Exception) ඕනෑම Input Future එකක් Exception එකකින් සම්පූර්ණ වුවහොත්, allOf() ද Exception එකකින් සම්පූර්ණ වේ. මුලින්ම සම්පූර්ණ වන Future එක (පිළිවෙල නොසලකා) Exception එකකින් සම්පූර්ණ වුවහොත්, anyOf() ද Exception එකකින් සම්පූර්ණ වේ.

නිගමනය (Conclusion)

ඉතින් යාළුවනේ, මේ Guide එකෙන් ඔයාලට CompletableFuture වල තියෙන allOf() සහ anyOf() Methods ගැන හොඳ අවබෝධයක් ලැබෙන්න ඇති කියලා මම හිතනවා. Asynchronous Programming කියන්නේ Java Developer කෙනෙක් විදියට ඔයාලා අනිවාර්යයෙන්ම දැනගෙන ඉන්න ඕනේ දෙයක්. මේ Methods දෙක පාවිච්චි කරලා ඔයාලට ඔයාලගේ Applications වල performance එක හොඳටම වැඩි කරගන්න පුළුවන්.

allOf() එකෙන් ඔයාලට Parallel tasks ගොඩක් run කරලා, ඒ හැම එකකම results එක පාර process කරන්න පුළුවන්, කාලය ඉතිරි කරගන්න පුළුවන්. anyOf() එකෙන් ඔයාලට තියෙන options අතරින් වේගවත්ම එකේ result එක අරගෙන user experience එක improve කරන්න පුළුවන්.

මේ concept එක නිකම්ම කියවලා තේරුම් ගන්නවට වඩා Practical විදියට implement කරලා බලන්න. ඔයාලගේ next project එකේදී මේවා පාවිච්චි කරන්න උත්සාහ කරන්න. මොකද, Practice තමයි මේ වගේ දේවල් හොඳටම ඉගෙන ගන්න තියෙන හොඳම ක්‍රමය! මේ වගේ Async operations වලදී Thread pools manage කරන එකත් හරිම වැදගත්. ඒ ගැනත් වැඩිදුරටත් හොයලා බලන්න.

මේ ගැන ඔයාලගේ අදහස්, ප්‍රශ්න, එහෙමත් නැත්නම් ඔයාලගේ experiences පහත comment section එකේ share කරන්න. අපි හැමෝම එකතු වෙලා ඉගෙන ගමු! ජය වේවා! 🙏