Show HN: Rust macro utility for batching expensive async operations

原始链接: https://github.com/hackermondev/batched

The `batched` crate provides a Rust macro for efficiently handling large numbers of expensive async operations, with database inserts as the typical use case. It groups items into batches based on a configurable `limit`, processes them at a specified `concurrent` level, and enforces a minimum `window` delay between batches. The macro turns a function that accepts a `Vec` into batched call variants (`insert_message`, `insert_message_multiple`). The batched function's return type determines how results are handed back: a single value is cloned to every caller, while a `Vec` return value is split across callers by iterating it in input order. Error handling uses `SharedError` to wrap error types that lack a `Clone` implementation. Key features include integration with the Tokio runtime, optional tracing spans for request monitoring, and OpenTelemetry support for correlating spans. The examples demonstrate bulk database inserts with PostgreSQL, covering both single-message and multi-message handling along with proper error handling and return-type management. Note that the macro is not designed to be used *inside* structs.

A new Rust macro utility for batching expensive async operations has been shared on Hacker News ([github.com/hackermondev](https://github.com/hackermondev)). The tool aims to improve performance by combining multiple calls to an async function into one larger operation. Initial reactions were positive, and one user had already found a potential use case. However, commenters also noted that the documentation should explain *why* this abstraction is beneficial and what the performance costs are. Questions came up about window-size configuration (dynamic vs. build-time) and about how it differs from concurrency-control mechanisms such as semaphores: it coalesces inputs rather than limiting concurrent execution. The project appears to maintain vectors of futures and input values, invoking the original function once the defined window is reached.

Original article

Rust macro utility for batching expensive async operations.

Run `cargo add batched`, or add this to your Cargo.toml:

[dependencies]
batched = "0.2.7"
  • limit: Maximum number of items that can be grouped and processed in a single batch.
  • concurrent: Maximum number of concurrent batched tasks running (default: Infinity).
  • window: Minimum amount of time (in milliseconds) the background thread waits before processing a batch.
  • window[x]: Minimum amount of time (in milliseconds) the background thread waits before processing a batch when the current buffer size is <= x.

The target function must have a single argument, a vector of items (Vec<T>).
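
As a rough sketch of how these attributes combine on a function taking a single Vec<T> argument (the function name and body here are illustrative, not part of the crate):

use batched::batched;

// Hypothetical example: group up to 500 lines per batch, run at most 4
// batches concurrently, and wait at least 50 ms before flushing a batch.
#[batched(limit = 500, concurrent = 4, window = 50)]
async fn flush_logs(lines: Vec<String>) {
    println!("flushing {} lines", lines.len());
}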

The return value of the batched function is propagated (cloned) to all async calls in the batch, unless the batched function returns a Vec<T>, in which case the return value for each call is pulled from the iterator in the same order as the input.

If the return value is not an iterator, the target function's return type must implement Clone so the result can be propagated. Use batched::error::SharedError to wrap your error types if they don't implement Clone.
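
For example, a batched function that returns a Vec hands each element back to the caller whose input sits at the same position (a minimal sketch; `square` is not part of the crate):

use batched::batched;

// Hypothetical example: because the return type is Vec<u64>, each caller
// receives the element at the same index as its input instead of a clone
// of the whole batch result.
#[batched(window = 10, limit = 1000)]
async fn square(numbers: Vec<u64>) -> Vec<u64> {
    numbers.into_iter().map(|n| n * n).collect()
}

// A single call such as `square(3).await` resolves to 9 once its batch runs.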

  • Built for async environments (tokio); it will not work without a Tokio async runtime.
  • The target function must be async.
  • Not supported inside structs:
struct A;

impl A {
    #[batched(window = 1000, limit = 100)]
    fn operation() {
        ...
    }
}

This feature automatically adds tracing spans to the generated caller functions for batched requests (x, x_multiple).
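
For the spans to be visible anywhere, the application still needs a subscriber installed; a minimal sketch, assuming the standard tracing-subscriber crate:

// Hypothetical setup: emit spans and events to stdout so the spans added
// around the batched caller functions can be observed.
fn init_tracing() {
    tracing_subscriber::fmt::init();
}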

This feature adds support for linking the caller's span to the inner batched call when using OpenTelemetry. Depending on whether your OpenTelemetry client supports span links, you should be able to see the caller's span linked to the batched call.

use batched::batched;

// Calls arriving within the 100 ms window (up to 1,000 items) are grouped
// into a single invocation of `add`, and the summed result is cloned back
// to every caller in the batch.
#[batched(window = 100, limit = 1000)]
async fn add(numbers: Vec<u32>) -> u32 {
    numbers.iter().sum()
}

#[tokio::main]
async fn main() {
    // 99 spawned calls plus the direct call below land in the same batch,
    // so every caller receives the batch total: 100.
    for _ in 0..99 {
        tokio::task::spawn(async move {
            add(1).await
        });
    }

    let result = add(1).await;
    assert_eq!(result, 100);
}
use batched::{batched, error::SharedError};
use sqlx::PgPool; // assumes the sqlx crate with Postgres support

// The macro creates the functions [`insert_message`] and [`insert_message_multiple`].
// Per the window[x] attribute above, window1 = 10 and window5 = 20 shorten the wait
// to 10 ms / 20 ms when at most 1 / 5 messages are buffered; otherwise the full
// 100 ms window applies.
#[batched(window = 100, window1 = 10, window5 = 20, limit = 100_000)]
async fn insert_message(messages: Vec<String>) -> Result<(), SharedError<anyhow::Error>> {
    let pool = PgPool::connect("postgres://user:password@localhost/dbname").await?;
    let mut query = String::from("INSERT INTO messages (content) VALUES ");
    ...
}

#[post("/message")]
async fn service(message: String) -> Result<(), anyhow::Error> {
    insert_message(message).await?;
    Ok(())
}

#[post("/bulk_messages")]
async fn service(messages: Vec<String>) -> Result<(), anyhow::Error> {
    insert_message_multiple(messages).await?;
    Ok(())
}

Batch insert rows and return them

use batched::{batched, error::SharedError};

struct Row {
    pub id: usize,
    pub content: String,
}

// The macro creates the functions [`insert_message`] and [`insert_message_multiple`]
#[batched(window = 100, window1 = 10, window5 = 20, limit = 100_000)]
async fn insert_message_batched(messages: Vec<String>) -> Result<Vec<Row>, SharedError<anyhow::Error>> {
    let pool = PgPool::connect("postgres://user:password@localhost/dbname").await?;
    let mut query = String::from("INSERT INTO messages (content) VALUES ");
    ...
}

#[post("/message")]
async fn service(message: String) -> Result<(), anyhow::Error> {
    let message: Row = insert_message(message).await?;
    Ok(())
}

#[post("/bulk_messages")]
async fn service(messages: Vec<String>) -> Result<(), anyhow::Error> {
    let messages: Vec<Row> = insert_message_multiple(messages).await?;
    Ok(())
}
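
Taken together, the two handlers above suggest the signatures the macro generates for this example: insert_message accepts a single String and resolves to one Row, while insert_message_multiple accepts a Vec<String> and resolves to a Vec<Row> in the same order as the input.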