异步 Rust 的视觉之旅
A Visual Journey Through Async Rust

原始链接: https://github.com/alexpusch/rust-magic-patterns/blob/master/visual-journey-through-async-rust/Readme.md

本实验通过可视化方式探索 Rust 的 Tokio 异步运行时行为。首先,它绘制由并发 Future 计算的正弦波,演示了区分并发和并行的交替执行。然后,可视化展示了 CPU 密集型任务对其他并发 Future 的影响,说明即使是短暂的阻塞操作(500µs)也会使整个系统停滞。 为了缓解这个问题,实验使用 `tokio::spawn` 在单独的线程中运行 CPU 密集型任务,有效地实现了并行执行并隔离了阻塞代码。但是,它也演示了 Tokio 的线程池具有有限的线程数,当产生过多的 CPU 密集型任务时会导致竞争。 最后,引入了 `tokio::task::spawn_blocking`,它利用一个专门的线程池来处理同步阻塞代码。可视化结果显示了高效的线程利用率,但同时也承认在同一核心上仍然会发生上下文切换和资源共享。结论强调,仔细考虑 CPU 密集型代码并策略性地使用生成机制对于 Tokio 异步应用程序的最佳性能至关重要。

Hacker News 的一个帖子讨论了文章“异步 Rust 的可视化之旅”。评论者称赞这些可视化效果解释了异步原理。OskarS 强调,虽然 CPU 密集型任务不可避免,但阻塞式 I/O 操作是异步环境中的一个主要问题,它会不必要地占用 CPU 并阻碍并发。PaulHoule 分享了他使用异步 Python 和 Java 的经验,他认为对于 Web 服务器来说,除非处理极高的流量,否则线程通常就足够了。 一些用户深入探讨了技术细节。remram 解释说阻塞式 I/O 不会占用 CPU 内核,而 binary132 反驳说,在单核每线程的异步设置中,它确实会占用。Inglor 指出 Node.js 的 `worker_threads` 作为一种并行替代方案。Vermilingua 观察到文章可视化效果中正弦波的失真,alphaXp 将其归因于 sin 方法的运行时可变性,而 ninkendo 则认为是调度可变性造成的。讨论还涉及到习语“the penny drops”(恍然大悟)的起源,将其与早期投币式老虎机的延迟联系起来。

原文

I'm a visual and experimental learner. To truly understand something, I need to tinker with it and see it run with my own eyes. To truly understand async execution, I want to visualize it. What order are things happening in? Do concurrent futures affect each other? How do tasks and threads relate? Plotting network calls or file system operations is boring. Let's draw something.

All visualization code is available here

To visualize the passing of time, we'll plot out some shape. A simple sine wave will do nicely.
First, we create a future that asynchronously computes N values of the sine function. In each iteration, we compute a value and yield_now().await to yield execution to other futures. We send these computed values to a channel and later use matplotlib and pyo3 to graphically display the results.

/// Each calculated sine value is a sample we keep track of
struct Sample {
    fut_name: String,
    value: f32,
    t: u128
}

async fn produce_sin(run_start: Instant, fut_name: String, tx: mpsc::UnboundedSender<Sample>) {
    for i in 1..N {
        let t = run_start.elapsed().as_micros();
        let value = sin(i);

        tx.send(Sample { fut_name, value, t }).unwrap();
        // yield execution so that other futures can do their thing
        tokio::task::yield_now().await;
    }
}

Now, let's create a couple of these and see our two sine waves calculated side by side:

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    let mut futs = Vec::new();

    let run_start = Instant::now();

    futs.push(produce_sin(run_start, "fut1", tx.clone()).boxed());
    futs.push(produce_sin(run_start, "fut2", tx.clone()).boxed());

    futures::future::join_all(futs).await;

    drop(tx);
    plot_samples(rx).await;
}

This is what we get:

Alright! We managed to plot two sine waves in the most convoluted method ever! And both graphs are plotted parallel to one another. Could it be that Tokio futures are actually parallel and not just concurrent? Let's take a closer look.

First, let's add the sine value calculation times to our generated samples and update the plotting code to show this duration in the graphics.

struct Sample {
    fut_name: String,
    value: f32,
    start_t: u128,
    end_t: u128
}

async fn produce_sin(run_start: Instant, fut_name: String, tx: mpsc::UnboundedSender<Sample>) {
    for i in 1..N {
        let start_t = run_start.elapsed().as_micros();
        let value = sin(i);
        let end_t = run_start.elapsed().as_micros();

        tx.send(Sample { fut_name, value, start_t, end_t }).unwrap();
        tokio::task::yield_now().await;
    }
}

This is what we see now:

The blue blocks represent the duration it took us to calculate the sine value. In other words, this is the time our future was executed by the runtime, aka polled. Note that for the purpose of this visualization, our sin() function has a built-in 100-microsecond synchronous sleep. This is useful to make the timings more prominent and uniform.

Now, let's zoom in a bit toward the first few samples:

Aha! Sine value calculations alternate! When the first future calculates the sine value on its invocation, the other future is idle, or in more correct terminology, the second future yielded execution and let the other future continue with its work.

This is exactly the difference between concurrency and parallelism. If the async runtime ran futures in parallel, we would have seen all the blue blocks line up one below the other, more or less in the same time frames.

"CPU-intensive code will block the async executor" is common knowledge in async programming. From Node.js and Python's asyncio to Rust's async executors, CPU-bound code is the devil. It can slow down or even hang different concurrent async operations. How intensive is "CPU-intensive" anyway? Is calculating sin() intensive? SHA1? JSON parsing?

Let's see how CPU-intensive code affects our visualization. We'll define a new sin_high_cpu() method. This sine-generating method is more CPU-intensive than the regular f32::sin; in fact, it takes 500 microseconds.

Let's add a future that produces sine values using sin_high_cpu():

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    let mut futs = Vec::new();

    let run_start = Instant::now();

    futs.push(produce_sin(run_start, "fut0", tx.clone()).boxed());
    futs.push(produce_sin(run_start, "fut1", tx.clone()).boxed());
    futs.push(produce_sin_high_cpu(run_start, "high cpu", tx.clone()).boxed());

    futures::future::join_all(futs).await;

    drop(tx);
    plot_samples(rx).await;
}

This is the resulting plot:

As expected, produce_sin_high_cpu() takes much longer to complete, almost 500ms, compared to 160ms in the previous example. But behold, the other futures, the ones that use the regular sin() method, take just as long.

A zoomed-in view reveals what's going on:

The CPU-heavy future hogs all the CPU for itself. While it works, the two other futures wait for it to yield and cannot perform their fast operations. This demonstrates the effect of CPU-intensive code. Even a "short" operation of 500µs has effects on other concurrently running futures.

Is this test realistic? Does your async code loop around and perform small CPU-bound tasks? Think about HTTP servers that handle lots of multiple requests, parse incoming and outgoing JSON strings, send and receive DB query results, etc. Another common example is message queue consumers. This is an important insight: Using a single Tokio task limits multicore utilization. Your machine might have multiple available cores, but it's your job to utilize them. One approach for this is spawning Tokio tasks. Lets check that next.

Unlike Node.js, Rust's Tokio allows us to spawn a new Task and run futures within it. In the multithreaded runtime, Tokio will create worker threads that will host and run the tasks. By default, Tokio's thread pool will maintain a thread for each core your CPU has, in addition to the main thread. I work on a 4-core GitHub workspace, so I have 1+4 available threads.

Let's spawn produce_sin_high_cpu in a Tokio task and see how it affects the plot:

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    let mut futs = Vec::new();

    let run_start = Instant::now();

    futs.push(produce_sin(run_start, "fut0", tx.clone()).boxed());
    futs.push(produce_sin(run_start, "fut1", tx.clone()).boxed());
    futs.push(
        tokio::spawn(produce_sin_high_cpu(run_start, "spawned", tx.clone()).boxed())
            .map(|_| ()) // we need to replace the return value with () to match the other futures
            .boxed(),
    );

    futures::future::join_all(futs).await;

    drop(tx);
    plot_samples(rx).await;
}

This is the resulting plot:

And zoomed in:

Awesome! Our first two futures produce fast sine waves again, while the CPU-bounded sin_high_cpu is contained and does not affect the rest of the code. Note how the first two futures finish their calculation of N values much faster now when they don't need to compete with sin_high_cpu for CPU time.

This is an important lesson: spawning a new task allowed us to easily take advantage of our CPU's multiple cores and have the "spawned" future execute in parallel with the other two futures.

As the more perceptive of you might have noticed, I colored the execution periods in a different color according to the thread the code was executed on. "fut0" and "fut1" are executed on the main thread (blue), and the "spawned" future is executed on some worker thread (turquoise).

Ok, so if Tokio's multithreaded runtime has 1+4 available threads, what will happen if we spawn multiple tasks? Let's add some more sin_high_cpu tasks for our plot:

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    let mut futs = Vec::new();

    let run_start = Instant::now();

    futs.push(produce_sin(run_start, "fut0", tx.clone()).boxed());

    for i in 1..7 {
        let fut_name = format!("spawned{i}");
        futs.push(
            tokio::spawn(produce_sin_heavy(run_start, fut_name, tx.clone()).boxed())
                .map(|_| ())
                .boxed(),
        );
    }

    futures::future::join_all(futs).await;

    drop(tx);
    plot_samples(rx).await;
}

This is the resulting plot:

And zoomed in:

Tokio juggles the tasks between the threads. Each future polling might run in a different thread. Since we don't have enough available threads, our CPU-bound issues manifest again, and sometimes one task can stall work on other tasks. Interesting. Spawning new tasks improves parallelism, but with a hard limit. This is something we should definitely remember. In our example, look at the "spawned2" future. Note how this future and task are contending with "spawned4" and "spawned5" on the brown and gray threads.

Another relevant tool in Tokio's tool belt is tokio::task::spawn_blocking(). spawn_blocking() will spawn a block of (non-async) code in a dedicated thread pool — the blocking pool. This thread pool maintains a much larger thread pool.

Let's add a version of our sine producer that runs sin_heavy() under a spawn_blocking call:

async fn produce_sin_heavy_blocking(
    run_start: Instant,
    fut_name: String,
    tx: mpsc::UnboundedSender<Sample>,
) {
    for i in 1..N {
        let start = run_start.elapsed().as_micros();
        let tx = tx.clone();

        let (t_id, value) = tokio::task::spawn_blocking(move || {
            let value = sin_heavy(i);
            let t_id = thread_id::get();

            (t_id, value)
        })
        .await
        .unwrap();

        let end = run_start.elapsed().as_micros();

        let sample = Sample { fut_name, value, start, end, thread_id: t_id };

        tx.send(sample).unwrap();

        tokio::task::yield_now().await;
    }
}

And add a bunch of these to our visualization:

for i in 1..7 {
    futs.push(
        produce_sin_heavy_blocking(run_start, format!("spawn_blocking{i}"), tx.clone())
            .boxed(),
    );
}

This is the resulting plot:

And zoomed in:

I don't know about you, but this zoomed-in plot is oddly satisfying for me. Each sine calculation is done on a free thread, and supposedly we've reached peak efficiency. Of course, we need to remember that we didn't grow CPU cores out of thin air. Some of these threads are running on the same core, context switching and sharing resources after all.

In fact spawn_blocking is not always the best choice for CPU-heavy tasks. Often we want to dedicate a limited number of threads to such code. Async: What is blocking? is a nice post by Alice Ryhl that goes into details and surveys some alternatives.

Visualizing futures runtime like this really makes some pennies drop. The difference between concurrent and parallel execution pops out, and multicore utilization becomes intuitive.

It's much easier now to reason about Tokios behavior, even without deep familiarity with its codebase. Async programming in Rust is nuanced, but I hope this post has helped you grasp it a little better.

Appendix: Running Demo Code

The demo code uses some Python code to plot the visualizations. To run it:

  • Make sure you have rye installed
  • Install Python dependencies: rye sync
  • Activate Python virtual environment: source .venv/bin/activate
  • Run the code: cargo run
联系我们 contact @ memedata.com