Crossfire: High-performance lockless spsc/mpsc/mpmc channels for Rust

Original link: https://github.com/frostyplanet/crossfire-rs

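## Crossfire: High-Performance Lockless Channels

Crossfire is a high-performance lockless channel library for Rust. It provides single-producer/single-consumer (SPSC), multi-producer/single-consumer (MPSC) and multi-producer/multi-consumer (MPMC) communication. v1.0 was released in 2022, and v2.0/v2.1 (2025) brought a major refactor that supports seamless communication between async and blocking contexts.

V2.1 removed the `crossbeam-channel` dependency and uses a modified `crossbeam-queue` to improve performance. It often outperforms other async channels, and in blocking scenarios it even beats the original `crossbeam-channel`. It relies on spinning and yielding, and provides a `detect_backoff_cfg()` function to optimize performance on single-core systems such as VPSs.

Crossfire offers flexible API options for different use cases. These include bounded and unbounded channels, and various sender/receiver combinations for async and blocking contexts. The SP/SC interfaces give up concurrency in exchange for maximum locklessness. The library is compatible with `tokio` and `async-std`, and offers timeouts and safe cancellation.

**Important:** v2.1's performance can put heavy pressure on async runtimes and may expose subtle latent bugs. Benchmarks and debugging tools are available through `cargo bench` and GitHub workflows.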


Original text


High-performance lockless spsc/mpsc/mpmc channels.

It supports async contexts, and communication between async and blocking contexts.

The low level is based on crossbeam-queue.

For the concept, please refer to the wiki.

  • V1.0: Released in 2022.12 and used in production.

  • V2.0: Released in 2025.6. Refactored the codebase and API by removing generic types from the ChannelShared type, which made it easier to code with.

  • v2.1: Released in 2025.9. Removed the dependency on crossbeam-channel and implemented with a modified version of crossbeam-queue, which brings performance improvements for both async and blocking contexts.

Because it is lockless, crossfire outperforms other async-capable channels. Thanks to a lighter notification mechanism, it even beats the original crossbeam-channel in some blocking-context cases.

Benchmark charts (bounded channel, size 100):

  • mpsc, blocking context

  • mpmc, blocking context

  • mpsc, async context

  • mpmc, async context

More benchmark data is posted on wiki.

Because it is lockless, the algorithm relies on spinning and yielding. Spinning works well on multi-core systems but hurts on single-core systems, such as some virtual machines. To handle this, we provide a function, detect_backoff_cfg(), that detects the running platform. Calling it during your program's initialization can give a 2x performance boost on a VPS.
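You can sketch the spin-then-yield idea with the standard library alone. This sketch is not crossfire's implementation and does not call detect_backoff_cfg(). The `Backoff` type, its spin limit of 6, and the CPU-count heuristic are illustrative assumptions. The sketch shows why turning spinning off helps on a single core: a spinning waiter burns the time slice that the other side needs to make progress.

```rust
use std::hint;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical spin-then-yield backoff (illustration only, not crossfire's code).
struct Backoff {
    step: u32,
    spin_limit: u32, // 0 means "never spin, always yield"
}

impl Backoff {
    fn new(spin_limit: u32) -> Self {
        Backoff { step: 0, spin_limit }
    }

    /// Assumed heuristic: spin only when more than one CPU is available.
    fn for_this_machine() -> Self {
        let cpus = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        Backoff::new(if cpus > 1 { 6 } else { 0 })
    }

    fn is_spinning(&self) -> bool {
        self.step < self.spin_limit
    }

    /// Spin for an exponentially growing number of iterations, then fall back to yielding.
    fn snooze(&mut self) {
        if self.is_spinning() {
            for _ in 0..(1u32 << self.step) {
                hint::spin_loop();
            }
        } else {
            thread::yield_now();
        }
        self.step = self.step.saturating_add(1);
    }
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let setter = {
        let flag = Arc::clone(&flag);
        thread::spawn(move || flag.store(true, Ordering::Release))
    };
    // Wait for the other thread using the backoff instead of a lock or condvar.
    let mut backoff = Backoff::for_this_machine();
    while !flag.load(Ordering::Acquire) {
        backoff.snooze();
    }
    setter.join().unwrap();
    println!("flag observed after {} snoozes", backoff.step);
}
```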

The benchmark is written in the criterion framework. You can run the benchmark by:

```shell
cargo bench --bench crossfire
```

NOTE: v2.1 pushes speed to a level no one has reached before, so it can put heavy pressure on the async runtime. Hidden bugs may surface, especially in atomic operations on platforms with weaker memory ordering:

v2.0.26 (legacy):

| arch | runtime | workflow | status |
|------|---------|----------|--------|
| x86_64 | threaded, tokio 1.47.1, async-std | cron_2.0_x86 | PASSED |
| arm | threaded, tokio 1.47.1, async-std | cron_2.0_arm | PASSED |

Debug locally:

Build with --features trace_log and run the bench or test until it hangs. Then press Ctrl+C or send SIGINT, and the latest log will be dumped to /tmp/crossfire_ring.log (see _setup_log() in tests/common.rs).

Debug with the GitHub workflow: #37

There are 3 modules: spsc, mpsc, mpmc, providing functions to allocate different types of channels.

The SP or SC interface is only for non-concurrent operation. It's more memory-efficient than MP or MC implementations, and sometimes slightly faster.
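Here is a textbook bounded single-producer/single-consumer ring buffer that makes the SP/SC trade-off concrete. It is an illustrative assumption, not crossfire's code; the README says crossfire builds on a modified crossbeam-queue. With exactly one writer and one reader, each side owns one counter, so no lock and no compare-and-swap are needed. `Ring`, `push` and `pop` are hypothetical names.

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical bounded SPSC ring buffer (illustration only).
struct Ring<T> {
    buf: Box<[UnsafeCell<MaybeUninit<T>>]>,
    head: AtomicUsize, // count of items popped; written only by the consumer
    tail: AtomicUsize, // count of items pushed; written only by the producer
}

// Sound only under the single-producer/single-consumer contract described below.
unsafe impl<T: Send> Sync for Ring<T> {}

impl<T> Ring<T> {
    fn with_capacity(cap: usize) -> Self {
        assert!(cap > 0);
        let buf: Box<[UnsafeCell<MaybeUninit<T>>]> =
            (0..cap).map(|_| UnsafeCell::new(MaybeUninit::uninit())).collect();
        Ring { buf, head: AtomicUsize::new(0), tail: AtomicUsize::new(0) }
    }

    /// Producer side. Caller contract: only one thread ever calls push.
    fn push(&self, v: T) -> Result<(), T> {
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.head.load(Ordering::Acquire); // see slots the consumer has freed
        if tail.wrapping_sub(head) == self.buf.len() {
            return Err(v); // full: hand the value back
        }
        unsafe { (*self.buf[tail % self.buf.len()].get()).write(v) };
        self.tail.store(tail.wrapping_add(1), Ordering::Release); // publish the write
        Ok(())
    }

    /// Consumer side. Caller contract: only one thread ever calls pop.
    fn pop(&self) -> Option<T> {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire); // see values the producer published
        if head == tail {
            return None; // empty
        }
        let v = unsafe { (*self.buf[head % self.buf.len()].get()).assume_init_read() };
        self.head.store(head.wrapping_add(1), Ordering::Release); // free the slot
        Some(v)
    }
}

impl<T> Drop for Ring<T> {
    fn drop(&mut self) {
        while self.pop().is_some() {} // drop any values still in the buffer
    }
}

fn main() {
    let ring = Arc::new(Ring::with_capacity(4));
    let producer = {
        let ring = Arc::clone(&ring);
        thread::spawn(move || {
            for i in 0..1000u32 {
                let mut v = i;
                while let Err(back) = ring.push(v) {
                    v = back; // full: retry with the returned value
                    thread::yield_now();
                }
            }
        })
    };
    let (mut sum, mut received) = (0u64, 0);
    while received < 1000 {
        match ring.pop() {
            Some(v) => { sum += v as u64; received += 1; }
            None => thread::yield_now(),
        }
    }
    producer.join().unwrap();
    println!("sum = {}", sum); // prints "sum = 499500"
}
```

Nothing in this sketch stops two threads from calling `push` at the same time; here that rule is only a caller contract. Crossfire's SP/SC handles enforce it through their types, since they are neither Clone nor Sync.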

The return types in these 3 modules are different:

  • mpmc::bounded_blocking() : (tx blocking, rx blocking)

  • mpmc::bounded_async() : (tx async, rx async)

  • mpmc::bounded_tx_async_rx_blocking() : (tx async, rx blocking)

  • mpmc::bounded_tx_blocking_rx_async() : (tx blocking, rx async)

  • mpmc::unbounded_blocking() : (tx non-blocking, rx blocking)

  • mpmc::unbounded_async() : (tx non-blocking, rx async)

NOTE: Bounded channels do not yet support size 0; such a channel is temporarily created with size 1.

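| Context | Sender trait | Sender (single) | Sender (multiple) | Receiver trait | Receiver (single) | Receiver (multiple) |
|---------|--------------|-----------------|-------------------|----------------|-------------------|---------------------|
| Blocking | BlockingTxTrait | Tx | MTx | BlockingRxTrait | Rx | MRx |
| Async | AsyncTxTrait | AsyncTx | MAsyncTx | AsyncRxTrait | AsyncRx | MAsyncRx |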

In the SP/SC versions, AsyncTx, AsyncRx, Tx and Rx are neither Clone nor Sync. They can be moved to other threads, but they cannot call send/recv while shared through an Arc (see the compile_fail examples in the type documentation).
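The standard library has a type with the same kind of restriction. `std::sync::mpsc::Receiver` is Send but not Sync, so you can move it into another thread but not share it through an Arc. The sketch below uses std's channel as an analogy only, not crossfire's types. The helper name `sum_on_other_thread` is hypothetical.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Moves the receiving half into a worker thread and sums what it receives.
fn sum_on_other_thread() -> i32 {
    let (tx, rx) = channel::<i32>();
    // OK: Receiver<i32> is Send, so it can be moved into the worker thread.
    let worker = thread::spawn(move || rx.iter().sum::<i32>());
    for v in 1..=3 {
        tx.send(v).unwrap();
    }
    drop(tx); // close the channel so rx.iter() terminates
    worker.join().unwrap()
}

// Does NOT compile: Receiver is !Sync, so Arc<Receiver<_>> is !Send
// and cannot be handed to another thread:
//
//     let rx = std::sync::Arc::new(rx);
//     let rx2 = rx.clone();
//     thread::spawn(move || rx2.recv());

fn main() {
    println!("sum = {}", sum_on_other_thread()); // prints "sum = 6"
}
```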

The benefit of the SP/SC API is completely lockless waker registration, which brings a performance boost in exchange for giving up concurrent use of a single sender or receiver.

The sender/receiver can use the From trait to convert between blocking and async context counterparts.

The error types are the same as in crossbeam-channel: TrySendError, SendError, TryRecvError and RecvError.
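The standard library's `std::sync::mpsc` uses the same four error names with the same shape. An unsent message travels back inside `TrySendError` and `SendError`, while `TryRecvError` separates "empty" from "disconnected". The sketch below uses std's types, not crossfire's, to show how you match on them. The helper function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, RecvError, SendError, TryRecvError, TrySendError};

/// Fills a capacity-1 channel and returns the message handed back by TrySendError::Full.
fn full_error_payload() -> Option<i32> {
    let (tx, _rx) = sync_channel::<i32>(1); // keep _rx alive so the channel is not disconnected
    tx.try_send(1).unwrap();
    match tx.try_send(2) {
        Err(TrySendError::Full(msg)) => Some(msg),
        _ => None,
    }
}

/// Walks through the remaining error cases.
fn check_error_shapes() {
    let (tx, rx) = sync_channel::<i32>(1);
    tx.try_send(1).unwrap();
    assert_eq!(rx.try_recv(), Ok(1));
    // Nothing queued, but a sender is still alive.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    // Once the receiver is gone, sends fail and return the message.
    drop(rx);
    assert!(matches!(tx.try_send(2), Err(TrySendError::Disconnected(2))));
    assert_eq!(tx.send(3), Err(SendError(3)));

    // Once every sender is gone, a blocking recv fails with RecvError.
    let (tx2, rx2) = sync_channel::<i32>(1);
    drop(tx2);
    assert_eq!(rx2.recv(), Err(RecvError));
}

fn main() {
    println!("{:?}", full_error_payload()); // prints "Some(2)"
    check_error_shapes();
}
```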

Feature flags:

  • tokio: Enables the send_timeout and recv_timeout APIs for async contexts, based on tokio. It also detects the right backoff strategy for the runtime type (multi-threaded or current-thread).

  • async_std: Enables the send_timeout and recv_timeout APIs for async contexts, based on async-std.

Crossfire is tested on tokio 1.x and async-std 1.x, and is runtime-agnostic.
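With the tokio or async_std feature, the README says a failed send_timeout returns the unsent message, so no message is silently lost. The sketch below reproduces only that contract, using the standard library's blocking `SyncSender`. `send_timeout` and `SendTimeoutError` here are hypothetical local definitions; the variant names mirror crossbeam-channel's. The polling loop with a 1 ms sleep is a simplification and is not how crossfire waits.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical error type: every failure variant carries the unsent message.
#[derive(Debug, PartialEq)]
enum SendTimeoutError<T> {
    Timeout(T),
    Disconnected(T),
}

/// Either the message is enqueued (Ok) or it comes back in the error.
fn send_timeout<T>(tx: &SyncSender<T>, mut msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
    let deadline = Instant::now() + timeout;
    loop {
        match tx.try_send(msg) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Disconnected(m)) => return Err(SendTimeoutError::Disconnected(m)),
            Err(TrySendError::Full(m)) => {
                if Instant::now() >= deadline {
                    return Err(SendTimeoutError::Timeout(m));
                }
                msg = m; // take ownership back and retry
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel::<&str>(1);
    send_timeout(&tx, "first", Duration::from_millis(50)).unwrap();
    // The buffer is full and nobody is receiving, so the message comes back.
    match send_timeout(&tx, "second", Duration::from_millis(50)) {
        Err(SendTimeoutError::Timeout(msg)) => println!("timed out, still own {:?}", msg),
        other => println!("unexpected: {:?}", other),
    }
    drop(rx);
}
```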

The following scenarios are considered:

  • AsyncTx::send() and AsyncRx::recv() are cancellation-safe in an async context. You can safely combine recv() with the select! macro and the timeout() function from tokio/futures. On cancellation, SendFuture and RecvFuture run drop(), which cleans up the waker state so that there is no memory leak or deadlock. However, you cannot learn the true outcome of a SendFuture, because it is dropped on cancellation. For sends, we therefore suggest AsyncTx::send_timeout() instead.

  • When the "tokio" or "async_std" feature is enabled, we also provide two additional functions:

  • AsyncTx::send_timeout(), which returns the unsent message inside SendTimeoutError. The result is guaranteed to be atomic. Alternatively, you can use AsyncTx::send_with_timer().

  • AsyncRx::recv_timeout(), whose result is also guaranteed to be atomic. Alternatively, you can use AsyncRx::recv_with_timer().

  • Communication between blocking and async contexts, and between different async runtime instances.

  • The async waker footprint.

Multi-producer and multi-consumer channels carry a small memory overhead for passing along Weak references to wakers. Because the design aims to be lockless, a cancelled sending or receiving future (for example under tokio::time::timeout()) is cleaned up immediately if a try-lock succeeds. Otherwise, cleanup is lazy. This is not a problem, because actual message sends and receives consume the weak wakers. In an idle-select scenario, such as waiting for a close notification, the waker is reused as much as possible while poll() returns Pending.
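A single-threaded sketch illustrates the Weak-reference idea. The registry holds only Weak handles, while each waiting future (here a plain `WaiterSlot`) owns the strong Arc. Dropping a waiter, as cancellation does, leaves a dead entry behind. That entry is either purged by an eager cleanup pass or skipped lazily when the next wake happens. `Registry`, `WaiterSlot` and `CountingWaker` are hypothetical names. Crossfire's real structure is lockless and uses a try-lock, which this sketch does not model.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::task::{Wake, Waker};

/// Test waker that only counts wake-ups.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Stand-in for what a pending send/recv future owns while it waits.
struct WaiterSlot {
    waker: Waker,
}

#[derive(Default)]
struct Registry {
    waiters: VecDeque<Weak<WaiterSlot>>, // Weak: the registry never keeps a waiter alive
}

impl Registry {
    fn register(&mut self, slot: &Arc<WaiterSlot>) {
        self.waiters.push_back(Arc::downgrade(slot));
    }

    /// Eager cleanup: remove entries whose waiter was dropped; returns how many were removed.
    fn cleanup(&mut self) -> usize {
        let before = self.waiters.len();
        self.waiters.retain(|w| w.strong_count() > 0);
        before - self.waiters.len()
    }

    /// Lazy cleanup: dead entries are discarded while searching for a live waiter to wake.
    fn wake_one(&mut self) -> bool {
        while let Some(weak) = self.waiters.pop_front() {
            if let Some(slot) = weak.upgrade() {
                slot.waker.wake_by_ref();
                return true;
            }
        }
        false
    }
}

fn main() {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let new_waiter = || Arc::new(WaiterSlot { waker: Waker::from(Arc::clone(&counter)) });

    let mut reg = Registry::default();
    let (a, b, c) = (new_waiter(), new_waiter(), new_waiter());
    reg.register(&a);
    reg.register(&b);
    reg.register(&c);

    drop(a); // simulate a cancelled future
    println!("eagerly removed {}", reg.cleanup()); // prints "eagerly removed 1"
    drop(b); // cancelled after the cleanup pass
    println!("woke a waiter: {}", reg.wake_one()); // skips dead b, wakes c
    println!("wake count: {}", counter.0.load(Ordering::SeqCst)); // prints "wake count: 1"
    drop(c);
}
```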

Cargo.toml:

```toml
[dependencies]
crossfire = "2.1"
# Required by the example below (tokio::main, spawn, select!, time).
tokio = { version = "1", features = ["full"] }
```

```rust
use crossfire::*;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Bounded MPSC channel with capacity 100; both ends are async.
    let (tx, rx) = mpsc::bounded_async::<i32>(100);
    for _ in 0..10 {
        let _tx = tx.clone();
        tokio::spawn(async move {
            for i in 0i32..10 {
                let _ = _tx.send(i).await;
                sleep(Duration::from_millis(100)).await;
                println!("sent {}", i);
            }
        });
    }
    // Drop the original sender so the channel closes once all spawned senders finish.
    drop(tx);
    let mut inv = tokio::time::interval(Duration::from_millis(500));
    loop {
        tokio::select! {
            _ = inv.tick() => {
                println!("tick");
            }
            // recv() is cancellation-safe, so it can be used inside select!.
            r = rx.recv() => {
                if let Ok(_i) = r {
                    println!("recv {}", _i);
                } else {
                    println!("rx closed");
                    break;
                }
            }
        }
    }
}
```