Rive,Go 语言中快速且可靠的后台任务处理框架
Rive, Fast and reliable background jobs in Go

原始链接: https://github.com/riverqueue/river

River 是一个为 Go 语言设计的高性能后台任务处理系统,旨在与 Postgres 无缝集成。通过将应用程序数据和任务队列存储在同一个数据库中,River 实现了事务性入队。这确保了只有在相关的数据库事务提交后才会处理任务,从而有效避免了分布式系统中常见的数据一致性问题。 主要特性包括: * **简单的定义**:任务由结构体对(实现 `JobArgs` 和 `Worker`)定义,并具有唯一的“kind”标识符。 * **灵活的客户端**:River 客户端负责管理任务插入、工作进程注册和维护。它支持跨多个队列的并发任务执行,并提供优雅的关闭机制。 * **事务完整性**:通过使用 `InsertTx`,开发者可以在单个事务中将业务逻辑与任务入队操作一并执行。 * **互操作性**:尽管 River 使用 Go 编写,但它支持跨语言的任务插入,允许非 Go 服务将性能敏感的任务委托给基于 Go 的工作进程。 得益于详尽的文档和配套的 UI 界面,River 为基于 Postgres 的 Go 应用程序提供了一套稳健且可靠的后台任务管理方案。

Hacker News 最新 | 过往 | 评论 | 提问 | 展示 | 招聘 | 提交 登录 Rive, Go 语言中快速可靠的后台任务队列 (github.com/riverqueue) 14 点,由 mountainview 发布于 5 小时前 | 隐藏 | 过往 | 收藏 | 2 条评论 leetrout 23 分钟前 | 下一条 [-] 标题有错别字……是 River 而不是 Rivere 回复 jeffbee 23 分钟前 | 上一条 [-] 他们通过制造一个非分布式系统,避开了所有那些棘手的分布式系统问题。真是个惊人的声明。 回复 指南 | 常见问题 | 列表 | API | 安全 | 法律 | 加入 YC | 联系我们 搜索:
相关文章

原文

River is a robust high-performance job processing system for Go and Postgres.

See homepage, docs, and godoc, as well as the River UI and its live demo.

Being built for Postgres, River encourages the use of the same database for application data and job queue. By enqueueing jobs transactionally along with other database changes, whole classes of distributed systems problems are avoided. Jobs are guaranteed to be enqueued if their transaction commits, are removed if their transaction rolls back, and aren't visible for work until commit. See transactional enqueueing for more background on this philosophy.

Jobs are defined in struct pairs, with an implementation of JobArgs and one of Worker.

Job args contain json annotations and define how jobs are serialized to and from the database, along with a "kind", a stable string that uniquely identifies the job.

type SortArgs struct {
    // Strings is a slice of strings to sort.
    Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

Workers expose a Work function that dictates how jobs run.

type SortWorker struct {
    // An embedded WorkerDefaults sets up default methods to fulfill the rest of
    // the Worker interface:
    river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    sort.Strings(job.Args.Strings)
    fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
    return nil
}

Jobs are uniquely identified by their "kind" string. Workers are registered on start up so that River knows how to assign jobs to workers:

workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &SortWorker{})

A River Client provides an interface for job insertion and manages job processing and maintenance services. A client's created with a database pool, driver, and config struct containing a Workers bundle and other settings. Here's a Client working one queue ("default") with up to 100 worker goroutines at a time:

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    Queues: map[string]river.QueueConfig{
        river.QueueDefault: {MaxWorkers: 100},
    },
    Workers: workers,
})
if err != nil {
    panic(err)
}

// Run the client inline. All executed jobs will inherit from ctx:
if err := riverClient.Start(ctx); err != nil {
    panic(err)
}

Workers can also be omitted, but it's better to include it so River can check that inserted job kinds have a worker that can run them.

The client should also be stopped on program shutdown. There's a number of ways to go about this (see graceful shutdown), but the shortest is to cancel the context send to Start when the program's ready to stop. For example, to stop on SIGINT/SIGTERM:

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    SoftStopTimeout: 10 * time.Second,
    ...
})
if err != nil {
    panic(err)
}

signalCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// Stop fetching new work and wait for active jobs to finish. Cancel jobs after
// SoftStopTimeout elapses.
if err := riverClient.Start(signalCtx); err != nil {
    panic(err)
}

<-riverClient.Stopped()

Alternatively, use an explicit call to Stop:

if err := riverClient.Stop(ctx); err != nil {
    panic(err)
}

Insert-only clients will insert jobs, but not work them, and don't need to be started or stopped.

Client.InsertTx is used in conjunction with an instance of job args to insert a job to work on a transaction:

_, err = riverClient.InsertTx(ctx, tx, SortArgs{
    Strings: []string{
        "whale", "tiger", "bear",
    },
}, nil)

if err != nil {
    panic(err)
}

See the InsertAndWork example for complete code.

Cross language enqueueing

River supports inserting jobs in some non-Go languages which are then worked by Go implementations. This may be desirable in performance sensitive cases so that jobs can take advantage of Go's fast runtime.

See developing River.

River was in large part inspired by our experiences with other background job libraries over the years, most notably:

Thank you for driving the software ecosystem forward.

联系我们 contact @ memedata.com