Understanding Java's Asynchronous Journey

Original link: https://amritpandey.io/understanding-javas-asynchronous-journey/

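Java's concurrent programming has evolved from manually managed threads (Java 1) to higher-level APIs. ExecutorService (Java 5) simplified thread lifecycle management by returning Future objects, but `Future.get()` blocks the caller. ForkJoinPool (Java 7) optimises CPU-intensive tasks through work stealing. CompletableFuture (Java 8) supports non-blocking task chaining, which addresses the blocking `Future.get()` problem. ParallelStreams (Java 8) offer a simplified way to process data concurrently. The Flow API (Java 9) supports reactive programming for event-driven systems. Virtual threads (Java 21) improve CPU utilisation by letting many JVM-managed threads share a small number of operating system threads, which benefits I/O-bound tasks. Structured concurrency (Java 21, preview) provides scoped task management: it treats concurrent tasks as a single unit and supports fail-fast behaviour. The choice of API depends on the use case, from simple parallel processing to reactive systems, and on factors such as scale, data size, and whether the work is I/O-bound or CPU-bound.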

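A Hacker News discussion revolves around this article's comparison of asynchronous programming in Java and JavaScript. Commenters criticised the article's Java examples as overly verbose and as failing to show the language's more modern features, such as `var` and the updated entry-point conventions in Java 24. One commenter provided simplified Java code using `CompletableFuture` and instance main methods (Java 25), showing more concise asynchronous execution and thread interruption handling. Another commenter clarified that `CompletableFuture` is named after its `complete()` method, not merely because it completes in the future. Some users considered the use of `Thread.sleep` in the original Java example bad practice and suggested `Timer` or `ScheduledExecutorService` as better alternatives. The discussion also touched on a core difference in how the two languages execute, namely that Java is multi-core while JS remains mostly single-core. Overall, the discussion highlighted how Java's concurrency features have developed and improved on the original examples.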

Original article

Asynchronous programming skills are no longer “nice-to-have”; almost every programming language supports asynchronous code in some form.

Languages like Go and JavaScript (in Node.js) have concurrency baked into their syntax.

Java, on the other hand, has concurrency, but it’s not quite as seamless at the syntax level when compared to something like JavaScript.

For instance, take a look at how JavaScript handles asynchronous operations. It’s way more compact and arguably easier to write than Java.

JavaScript
function fetchData() {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('Data fetched');
    }, 10000);
  });
}

fetchData().then(data => console.log(data));
console.log('Prints first'); // prints before the resolved data

Now, here is the equivalent in Java. 👇

Java
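import java.util.concurrent.CompletableFuture;

public class Example {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            return "Data Fetched";
        });

        CompletableFuture<Void> printed = future.thenAccept(result -> System.out.println(result));
        System.out.println("Prints first"); // prints before the async result

        // supplyAsync uses daemon threads by default, so without this wait
        // the JVM could exit before the callback ever runs.
        printed.join();
    }
}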

Coming from languages like JavaScript or Go, you might wonder 🤔

  • What even is a CompletableFuture?
  • Why does it take this much boilerplate to do something asynchronous in Java?
  • And why do we need to import a whole API just to do it?

🤓 These are valid questions, and believe it or not, this is about the simplest way to achieve concurrency in Java.

This article walks through the evolution of concurrent programming in Java, from the early days of Threads in Java 1 to the StructuredTaskScope in Java 21, and explains each step along the way.

Threads in Java 1

Early Java concurrency meant managing Thread objects directly.

To start execution of code in a thread, you’d have to create a thread object and pass a runnable with the actual logic you want to execute.

Take a look at the following example.

Java
public class Example {
    public static void main(String[] args) throws InterruptedException {
        final String[] result1 = new String[1];
        final String[] result2 = new String[1];

        Thread t1 = new Thread(() -> result1[0] = fetchData1());
        Thread t2 = new Thread(() -> result2[0] = fetchData2());

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println(result1[0] + " & " + result2[0]); // A & B
    }

    static String fetchData1() { return "A"; }
    static String fetchData2() { return "B"; }
}

👉 What’s bad with achieving concurrency using Thread objects? 😪

I can think of these:

  1. You start, join, and stop threads by hand.
  2. You track thread state (started, running, finished, failed) yourself.
  3. If a thread fails and throws an exception, you have to handle that manually too.
  4. More code means more room for errors ❗.
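
Point 3 is easy to underestimate: an exception thrown inside a thread never reaches the code that called start(). The following is a minimal sketch of the manual plumbing this requires, using the standard Thread.setUncaughtExceptionHandler. The class and method names (ThreadFailureDemo, runAndCapture) are illustrative and not from the article.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadFailureDemo {
    // Runs the task on a new thread and returns the exception it threw, or null.
    static Throwable runAndCapture(Runnable task) throws InterruptedException {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread t = new Thread(task);
        // Without a handler, the exception only reaches the default handler (stderr).
        t.setUncaughtExceptionHandler((thread, ex) -> failure.set(ex));
        t.start();
        t.join(); // wait for the thread to terminate
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable ex = runAndCapture(() -> { throw new IllegalStateException("fetch failed"); });
        System.out.println("Captured: " + ex.getMessage()); // Captured: fetch failed
        System.out.println("No failure: " + runAndCapture(() -> {})); // No failure: null
    }
}
```

Every thread that can fail needs this kind of wiring, which is part of what the later APIs take over.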

ExecutorService in Java 5

Java 5 introduced ExecutorService, which abstracts away much of the thread lifecycle management and returns results through Future objects.

Java
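import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Example {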
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Future<String> f1 = executor.submit(() -> fetchData1());
        Future<String> f2 = executor.submit(() -> fetchData2());

        System.out.println(f1.get() + " & " + f2.get());
        executor.shutdown();
    }

    static String fetchData1() { return "A"; }
    static String fetchData2() { return "B"; }
}

👉 A Future is similar to the Promise object we saw in JavaScript: once the job finishes, it holds the result, which can then be retrieved with the get() method.

What got better with the ExecutorService API? 🤔

  1. No more manual thread lifecycle handling.
  2. You can retrieve results via Future.

⚠️ But one major issue remains:

Calling get() blocks the calling thread until the result is ready.

For example, if f1 is slow, the calling thread waits at f1.get() and cannot do anything else in the meantime, even if f2 finished long ago. The tasks themselves still run concurrently, but the caller consumes their results synchronously, which defeats much of the purpose of being “concurrent”.
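
One way to make the blocking visible, and to at least bound it, is the timed overload Future.get(long, TimeUnit) from the standard library. This is a minimal sketch under stated assumptions: the CountDownLatch only stands in for slow work, and the class and method names are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingGetDemo {
    // Returns true if get() gave up waiting because the task was still running.
    static boolean timesOutWhileRunning() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1); // stands in for slow work
        try {
            Future<String> slow = executor.submit(() -> {
                release.await(); // the task cannot finish until the latch is released
                return "A";
            });
            try {
                slow.get(100, TimeUnit.MILLISECONDS); // the caller is parked here
                return false;
            } catch (TimeoutException e) {
                return true; // the result was not ready in time
            } finally {
                release.countDown(); // let the task finish
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("get() timed out: " + timesOutWhileRunning()); // get() timed out: true
    }
}
```

A timeout limits how long the caller waits, but the caller is still blocked while it waits.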

ForkJoinPool in Java 7

Java 7 introduced the ForkJoinPool API, designed for CPU-intensive parallel tasks using a work-stealing algorithm.

ForkJoinPool does not replace the ExecutorService API; it is itself an ExecutorService implementation, specialised for tasks that split into smaller subtasks.

Java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Supplier;

public class Example {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();

        FetchTask task1 = new FetchTask(() -> fetchData1());
        FetchTask task2 = new FetchTask(() -> fetchData2());

        // Submit to our pool. Calling fork() from the main thread would
        // run the tasks on the common pool and leave this pool unused.
        pool.execute(task1);
        pool.execute(task2);

        String result1 = task1.join();
        String result2 = task2.join();

        System.out.println("Combined: " + result1 + " & " + result2);

        pool.shutdown();
    }

    static class FetchTask extends RecursiveTask<String> {
        private final Supplier<String> supplier;

        FetchTask(Supplier<String> supplier) {
            this.supplier = supplier;
        }

        @Override
        protected String compute() {
            return supplier.get();
        }
    }

    static String fetchData1() { return "A"; }
    static String fetchData2() { return "B"; }
}

What’s special about ForkJoinPool? 🤔

  1. RecursiveTask: an abstract task type whose compute() method returns a result. It is meant for work that splits itself into smaller subtasks (fork) and then combines their results (join). The Supplier in the example is only a convenient way to pass in the work; it is not part of the ForkJoinPool API.
  2. 👉 Work-stealing: Idle threads can “steal” queued tasks from busy threads.
  3. Best for CPU-bound tasks (not I/O-bound).
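
The fetch example above never actually splits work, so here is a hedged sketch of the divide-and-conquer style that RecursiveTask is designed for: summing an array by recursively halving it. The class name SumTask and the threshold of 1,000 elements are assumptions chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cut-off; tune per workload
    private final long[] data;
    private final int from, to; // half-open range [from, to)

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) { // small enough: sum directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                     // queue the left half; an idle worker may steal it
        long rightSum = right.compute(); // compute the right half on this worker
        return left.join() + rightSum;
    }

    static long parallelSum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(parallelSum(data)); // 5000050000
    }
}
```

Here fork() is called from inside compute(), that is, on a pool worker thread, which is where work-stealing can take effect.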

CompletableFuture in Java 8

This is where things start getting nice. 😀

CompletableFuture implements Future and adds non-blocking chaining of tasks. By default, supplyAsync runs its task on the common ForkJoinPool; you can also pass your own Executor.

Recall the problem with Future.get(): the caller has to block before it can use the result. CompletableFuture avoids this by letting you attach follow-up operations that run once the data arrives.

Java
import java.util.concurrent.CompletableFuture;

public class Example {
    public static void main(String[] args) {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> fetchData1());
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> fetchData2());

        f1.thenCombine(f2, (resultFromF1, resultFromF2) -> resultFromF1 + " & " + resultFromF2)
          .thenAccept(System.out::println) // once combined then print
          .join(); // Waits for everything to complete
    }

    static String fetchData1() { return "A"; }
    static String fetchData2() { return "B"; }
}

What’s better about CompletableFuture compared to Future?

  1. 👉 Chaining operations instead of blocking on get().
  2. thenCombine combines two async results.
  3. thenAccept consumes the final result.

With CompletableFuture we got much closer to the modern needs of concurrent programming.
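
Chaining also covers the failure path. The sketch below uses the standard thenApply and exceptionally methods; the fetchUpper helper and its fail flag are hypothetical and exist only to trigger both paths.

```java
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    // Hypothetical fetch that either yields "data" or fails, depending on the flag.
    static CompletableFuture<String> fetchUpper(boolean fail) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (fail) throw new IllegalStateException("fetch failed");
                return "data";
            })
            .thenApply(String::toUpperCase)   // runs only if the previous stage succeeded
            .exceptionally(ex -> "fallback"); // runs only if an earlier stage failed
    }

    public static void main(String[] args) {
        System.out.println(fetchUpper(false).join()); // DATA
        System.out.println(fetchUpper(true).join());  // fallback
    }
}
```

The join() calls are there only so that the demo waits for its own output; the chain itself never blocks a thread while waiting for data.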

ParallelStreams in Java 8

ParallelStreams are not a concurrency API as such; they use multiple threads under the hood (the common ForkJoinPool by default) to speed up stream operations in Java.

Java
import java.util.List;

public class Example {
    public static void main(String[] args) {
        List<String> names = List.of("Alice", "Bob", "Charlie", "David");

        names.parallelStream()
             .map(String::toUpperCase)
             .forEach(System.out::println); // Order not guaranteed
    }
}

In the above code, the names list is processed concurrently by the parallel stream.

This feature is great for processing large quantities of data whose elements can be handled independently and in any order.
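
When the order of results does matter, a parallel stream can still be used: forEach gives no ordering guarantee, but collecting into a list preserves the encounter order of the source. A minimal sketch, with illustrative class and method names:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class ParallelStreamDemo {
    // Elements are mapped in parallel, yet the collected list keeps the source order.
    static List<String> upperCased(List<String> names) {
        return names.parallelStream()
                    .map(String::toUpperCase)
                    .collect(Collectors.toList());
    }

    // A reduction such as sum() is also safe to run in parallel.
    static long sumOfSquares(long n) {
        return LongStream.rangeClosed(1, n).parallel().map(x -> x * x).sum();
    }

    public static void main(String[] args) {
        System.out.println(upperCased(List.of("Alice", "Bob", "Charlie", "David")));
        // [ALICE, BOB, CHARLIE, DAVID]
        System.out.println(sumOfSquares(1_000)); // 333833500
    }
}
```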

Flow API in Java 9

Java 9 introduced the Flow API to support reactive programming patterns: think streams of asynchronous data.

What’s reactive programming? The Reactive Manifesto describes it in detail.

In short, reactive programming is the realm of modern-day event-driven systems where systems have to process large quantities of real-time and historical data.

Think of Kafka or any other message queue that handles large volumes of data, where concurrency with efficient resource utilisation is paramount.

Java
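import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class Example {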
    public static void main(String[] args) throws Exception {
        SubmissionPublisher<String> publisher1 = new SubmissionPublisher<>();
        SubmissionPublisher<String> publisher2 = new SubmissionPublisher<>();

        Subscriber<String> subscriber = new Subscriber<>() {
            private String latest1, latest2;

            public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }

            // synchronized: the two publishers may deliver on different threads
            public synchronized void onNext(String item) {
                if (item.startsWith("A")) latest1 = item;
                else latest2 = item;

                if (latest1 != null && latest2 != null)
                    System.out.println(latest1 + " & " + latest2);
            }

            public void onError(Throwable t) {}
            public void onComplete() {}
        };

        publisher1.subscribe(subscriber);
        publisher2.subscribe(subscriber);

        publisher1.submit(fetchData1());
        publisher2.submit(fetchData2());

        Thread.sleep(100);
        publisher1.close();
        publisher2.close();
    }

    static String fetchData1() { return "A"; }
    static String fetchData2() { return "B"; }
}

👉 Why use the Flow API?

  1. It’s ideal for streaming large volumes of data.
  2. Perfect for event-driven systems.

Virtual Threads in Java 21

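A platform thread in Java is a thin wrapper around an operating system thread, which makes it relatively expensive to create and to keep around.

A common issue is that a thread blocked on I/O sits idle while still occupying its OS thread; with a bounded pool, enough blocked threads leave new tasks starved of a thread to run on. ForkJoinPool's work-stealing keeps threads busy, but it is better suited to computation-heavy tasks than to I/O-bound ones.

👉 Virtual threads in Java 21 are lightweight JVM-managed threads that run code concurrently while using a small number of actual OS threads.

Many virtual threads can share a single platform thread: when a virtual thread blocks, the JVM unmounts it so that the platform thread can run another one, improving CPU utilisation.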

They were introduced as a preview in Java 19 and became a stable feature in Java 21.

Java
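import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Example {
    public static void main(String[] args) {
        // Each submitted task runs on its own new virtual thread;
        // closing the executor waits for the submitted tasks to finish.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {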
            Future<String> f1 = executor.submit(() -> fetchData1());
            Future<String> f2 = executor.submit(() -> fetchData2());

            String result1 = f1.get();
            String result2 = f2.get();

            System.out.println(result1 + " & " + result2);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    static String fetchData1() { return "A"; }
    static String fetchData2() { return "B"; }
}

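👉 Unlike CompletableFuture, this style keeps ordinary blocking code. What changes is the cost: when a task running on a virtual thread blocks, for example on an I/O operation, the JVM parks it and releases the underlying OS thread for other work, so blocking no longer leads to thread starvation.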
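
To get a feel for how cheap virtual threads are, the following sketch starts one per task for thousands of blocking tasks. It assumes Java 21 or later; the sleep merely stands in for blocking I/O, and the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class VirtualThreadsDemo {
    // Starts one virtual thread per task; each blocks briefly, then returns 1.
    static int runBlockingTasks(int tasks) throws Exception {
        List<Future<Integer>> futures = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    Thread.sleep(50); // stands in for blocking I/O
                    return 1;
                }));
            }
        } // close() waits for all submitted tasks to finish
        int completed = 0;
        for (Future<Integer> f : futures) completed += f.get();
        return completed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runBlockingTasks(10_000)); // 10000
    }
}
```

Creating the same number of platform threads would be far more expensive, which is why classic code uses bounded pools instead.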

Structured Concurrency in Java 21 (Preview)

All the work up to CompletableFuture in Java 8 and virtual threads in Java 21 made concurrent programming in Java easier. However, some fundamental issues remain 😲.

These issues have more to do with managing concurrent tasks.

One way to use threads is to break a big task into small chunks and execute them concurrently, much like ParallelStreams but in a custom manner.

Say I break task A into two subtasks, A1 and A2; the result R should then be R = A1 + A2.

But what if either A1 or A2 fails? What happens to the result R?

In the simplest sense, R should fail too, because the task as a whole, A, did not succeed.

Without structured concurrency, this kind of coordination, where a result is built from subtasks running in parallel, has to be written by hand: Java has no built-in mechanism for running several subtasks as one atomic task.
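
To make the manual effort concrete, here is one possible sketch of hand-written fail-fast coordination using only long-standing java.util.concurrent classes. It is an illustration of the problem, not the article's code: the names ManualFailFast and runBoth are hypothetical, and summing two integers stands in for R = A1 + A2.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ManualFailFast {
    // Computes R = A1 + A2; if either subtask fails, the other is cancelled and the failure rethrown.
    static int runBoth(Callable<Integer> a1, Callable<Integer> a2)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletionService<Integer> completed = new ExecutorCompletionService<>(executor);
        List<Future<Integer>> subtasks = List.of(completed.submit(a1), completed.submit(a2));
        try {
            int result = 0;
            for (int i = 0; i < subtasks.size(); i++) {
                result += completed.take().get(); // next subtask to finish; throws if it failed
            }
            return result;
        } finally {
            subtasks.forEach(f -> f.cancel(true)); // no effect on finished tasks, interrupts the rest
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runBoth(() -> 1, () -> 2)); // 3
        try {
            runBoth(() -> { throw new IllegalStateException("A1 failed"); },
                    () -> { Thread.sleep(60_000); return 2; });
        } catch (ExecutionException e) {
            System.out.println("Failed fast: " + e.getCause().getMessage()); // Failed fast: A1 failed
        }
    }
}
```

All of the bookkeeping in runBoth (tracking futures, cancelling siblings, shutting down the executor) is exactly what a scope is meant to take over.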

Structured Concurrency with the StructuredTaskScope API (introduced in Java 21 as a preview) provides a way to group concurrent tasks together and treat them as a single unit of work.

Java
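import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;

class Example {
    // Preview API as of Java 21: compile and run with --enable-preview.
    // Later JDK previews revised this API, so treat the exact calls as Java 21 specific.
    public static void main(String[] args) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Integer> x = scope.fork(() -> fun1());
            Supplier<Integer> y = scope.fork(() -> fun2());

            scope.join().throwIfFailed(); // Wait for both tasks; rethrow the first failure
            System.out.println("Both tasks completed successfully: " + (x.get() + y.get()));
        } catch (Exception e) {
            System.out.println("One of the tasks failed. All tasks are now stopped.");
        }
    }

    // Placeholder subtasks; the original listing did not define them
    static Integer fun1() { return 1; }
    static Integer fun2() { return 2; }
}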

👉 Why does it matter?

  1. Scoped concurrency means tasks are grouped and managed together.
  2. If one task fails, the others are cancelled automatically, reducing complexity and improving safety.
  3. Subtasks are treated as steps of one atomic task.
  4. It is much cleaner than hand-written coordination logic for start, stop, abort, and so on.

What should you choose to achieve concurrency in Java?

We can clearly see that Java offers multiple fronts for achieving concurrency. It even offers concurrency in differing paradigms like reactive programming.

Hence, the choice of concurrency API in Java depends on factors like scale, data size, and the nature of the work (I/O-bound or CPU-bound).

Here is a summary that you can use to better decide which technique will be suitable for you.

Use Case | Recommended API
Simple parallel tasks | Thread, ExecutorService or CompletableFuture
CPU-bound tasks | ForkJoinPool, ParallelStreams
Multiple I/O-bound tasks | Virtual Threads
Event-driven/reactive systems | Flow API

If you liked this piece on Java, you’ll also like the new features introduced in JDK24.
