Glyn: Type-safe PubSub and Registry for Gleam actors with distributed clustering

Original link: https://github.com/mbuhot/glyn

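## Glyn: Type-Safe Distributed Actors for Gleam

Glyn provides type-safe PubSub and Registry systems for building distributed Gleam actors, built on Erlang's `syn` library. It supports actor communication through two core components: **PubSub**, for broadcasting events to subscribers, and **Registry**, for routing commands directly to named processes.

Glyn enforces type safety by requiring explicit decoders, so that messages are handled reliably across the cluster; messages are sent as Erlang terms, not JSON. Developers define message types and corresponding decoder functions to achieve this.

Glyn integrates with Gleam's actor model through selector composition. An actor can subscribe to PubSub topics, register with the Registry, and combine these channels, handling several message types in a single actor.

The library includes examples covering message definitions, PubSub publish/subscribe, Registry registration and command sending, and a complete actor implementation that uses both systems. Glyn is open source under the MIT License.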

Original text

Type-safe PubSub and Registry for Gleam actors with distributed clustering support.

Built on the Erlang syn library.

Glyn provides two complementary systems for actor communication:

  • PubSub: Broadcast events to multiple subscribers
  • Registry: Direct command routing to named processes

Both systems integrate seamlessly with Gleam's actor model using selector composition patterns.
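
As a rough illustration of selector composition, the sketch below uses only `gleam/erlang/process` and makes no Glyn calls. The `Message` type and the two subjects are hypothetical names introduced for this example; the selectors that Glyn returns from subscribing and registering would presumably be merged into a selector like this one.

```gleam
import gleam/erlang/process

// Hypothetical wrapper type: one variant per message source
pub type Message {
  Text(String)
  Number(Int)
}

pub fn main() {
  // Two independent channels carrying different payload types
  let text_subject = process.new_subject()
  let number_subject = process.new_subject()

  // Map each channel into the shared Message type and combine them
  let selector =
    process.new_selector()
    |> process.select_map(text_subject, Text)
    |> process.select_map(number_subject, Number)

  // A message sent on either subject arrives through the one selector
  process.send(number_subject, 42)
  let assert Ok(Number(42)) = process.selector_receive(selector, 100)
}
```

Because every source is mapped into a single wrapper type, the receiving code can pattern match on one `case` expression regardless of where a message came from.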

Creating Message Types and Decoders

First, define your message types and corresponding decoder functions. Explicit decoders are required so that messages sent between nodes in a cluster are handled with type safety. Note that Glyn does not JSON-encode messages: they are sent directly as Erlang terms and should be decoded from tuples.
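
To make the tuple representation concrete, here is a minimal sketch that assumes the Erlang target. On that target a Gleam record such as `OrderCreated("ORDER123", 99)` is the tuple `{order_created, <<"ORDER123">>, 99}`, so its fields sit at indexes 1 and 2 behind the atom tag at index 0. The `to_dynamic` helper is an illustrative name and not part of Glyn.

```gleam
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode

pub type Event {
  OrderCreated(id: String, amount: Int)
}

// Illustrative helper: view any value as Dynamic without changing
// its runtime representation (Erlang target only)
@external(erlang, "gleam_stdlib", "identity")
fn to_dynamic(value: a) -> Dynamic

pub fn main() {
  // At runtime this is the tuple {order_created, <<"ORDER123">>, 99}
  let term = to_dynamic(OrderCreated("ORDER123", 99))

  // Index 0 holds the atom tag, so the fields start at index 1
  let decoder = {
    use id <- decode.field(1, decode.string)
    use amount <- decode.field(2, decode.int)
    decode.success(#(id, amount))
  }

  let assert Ok(#("ORDER123", 99)) = decode.run(term, decoder)
}
```

A variant without fields, such as `Shutdown`, is a bare atom rather than a tuple, which is why it needs a different decoder shape from the variants that carry data.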

// my_app/orders.gleam
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/erlang/atom
import gleam/erlang/process.{type Subject}

// Define your message types
pub type Event {
  OrderCreated(id: String, amount: Int)
  OrderShipped(id: String, tracking: String)
  SystemAlert(message: String)
}

pub type Command {
  ProcessOrder(id: String, reply_with: Subject(Bool))
  GetStatus(reply_with: Subject(String))
  Shutdown
}

// Helper function to match specific atoms
fn expect_atom(expected: String) -> decode.Decoder(atom.Atom) {
  use value <- decode.then(atom.decoder())
  case atom.to_string(value) == expected {
    True -> decode.success(value)
    False -> decode.failure(value, "Expected atom: " <> expected)
  }
}

// Unsafe cast for Subject decoding - use with caution
@external(erlang, "gleam_stdlib", "identity")
fn unsafe_cast_subject(value: Dynamic) -> Subject(a)

// Create decoder functions
pub fn event_decoder() -> decode.Decoder(Event) {
  decode.one_of(
    {
      use _ <- decode.field(0, expect_atom("order_created"))
      use id <- decode.field(1, decode.string)
      use amount <- decode.field(2, decode.int)
      decode.success(OrderCreated(id, amount))
    },
    or: [
      {
        use _ <- decode.field(0, expect_atom("order_shipped"))
        use id <- decode.field(1, decode.string)
        use tracking <- decode.field(2, decode.string)
        decode.success(OrderShipped(id, tracking))
      },
      {
        use _ <- decode.field(0, expect_atom("system_alert"))
        use message <- decode.field(1, decode.string)
        decode.success(SystemAlert(message))
      },
    ]
  )
}

pub fn command_decoder() -> decode.Decoder(Command) {
  decode.one_of(
    // Shutdown is a simple variant - encoded as bare atom
    decode.map(expect_atom("shutdown"), fn(_) { Shutdown }),
    or: [
      // ProcessOrder has data - encoded as tuple
      {
        use _ <- decode.field(0, expect_atom("process_order"))
        use id <- decode.field(1, decode.string)
        use reply_with <- decode.field(2, decode.dynamic)
        decode.success(ProcessOrder(id, unsafe_cast_subject(reply_with)))
      },
      // GetStatus has data - encoded as tuple
      {
        use _ <- decode.field(0, expect_atom("get_status"))
        use reply_with <- decode.field(1, decode.dynamic)
        decode.success(GetStatus(unsafe_cast_subject(reply_with)))
      },
    ]
  )
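}

PubSub Usage

import gleam/erlang/process
import glyn/pubsub
import my_app/orders

// Wrapper message type, so PubSub events can be merged with other channels
pub type Message {
  OrderEvent(orders.Event)
}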

pub fn main() {
  // Create PubSub system with decoder
  let pubsub = pubsub.new(
    "orders",
    orders.event_decoder(),
  )

  // Subscribe returns a Selector that can be composed
  let selector =
    process.new_selector()
    |> process.merge_selector(
      pubsub.subscribe(pubsub, "processing")
      |> process.map_selector(OrderEvent)
    )

  // Publish events
  let assert Ok(Nil) = pubsub.publish(pubsub, "processing",
    orders.OrderCreated("ORDER123", 99))

  // Receive messages through selector
  let assert Ok(OrderEvent(orders.OrderCreated(id, amount))) =
    process.selector_receive(selector, 100)
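}

Registry Usage

import gleam/erlang/process
import glyn/registry
import my_app/orders  // Using same orders module from above

// Wrapper message type for commands received through the Registry
pub type Message {
  UserCommand(orders.Command)
}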

pub fn main() {
  // Create Registry system with decoder and error default
  let registry = registry.new(
    "orders",
    orders.command_decoder(),
    orders.Shutdown
  )

  // Register returns a Selector for receiving commands
  let assert Ok(command_selector) = registry.register(registry, "order_processor", "v1.0")

  // Compose with other selectors
  let selector =
    process.new_selector()
    |> process.merge_selector(
      process.map_selector(command_selector, UserCommand)
    )

  // Send commands to registered processes
  let assert Ok(_) = registry.send(registry, "order_processor",
    orders.ProcessOrder("ORDER123", process.new_subject()))

  // Or use call for request/reply pattern
  let assert Ok(status) = registry.call(registry, "order_processor", waiting: 1000,
    sending: fn(reply) { orders.GetStatus(reply) })
}

Multi-Channel Actor Integration

The real power of Gleam's typed actor system comes from composing multiple message channels in a single actor:

import glyn/pubsub
import glyn/registry
import my_app/orders  // Using the orders module we defined above
import gleam/erlang/process.{type Subject}
import gleam/otp/actor

// Actor message type that composes multiple channels
pub type ProcessorMessage {
  DirectCommand(DirectMessage)  // Direct actor commands
  OrderCommand(orders.Command)  // Registry commands
  OrderEvent(orders.Event)      // PubSub events
  Shutdown
}

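pub type DirectMessage {
  GetStats(reply_with: Subject(String))
  Reset
}

// Actor state used by the initialiser and the message handler
pub type ProcessorState {
  ProcessorState(status: String, processed: Int)
}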

pub fn start_order_processor() {
  actor.new_with_initialiser(5000, fn(subject) {
    // Create PubSub and Registry systems
    let event_pubsub = pubsub.new(
      "events",
      orders.event_decoder(),
    )

    let command_registry = registry.new(
      "commands",
      orders.command_decoder(),
      orders.Shutdown
    )

    // Get selectors from both systems
    let event_selector = pubsub.subscribe(event_pubsub, "orders")
    let assert Ok(command_selector) = registry.register(command_registry, "order_processor", "v1.0")

    // Create direct command channel
    let direct_subject = process.new_subject()

    // Compose all channels using selectors
    let selector =
      process.new_selector()
      |> process.select(subject)
      |> process.select_map(direct_subject, DirectCommand)
      |> process.merge_selector(process.map_selector(command_selector, OrderCommand))
      |> process.merge_selector(process.map_selector(event_selector, OrderEvent))

    // Return initialized actor
    actor.initialised(ProcessorState(status: "ready", processed: 0))
    |> actor.selecting(selector)
    |> actor.returning(direct_subject)  // Return direct command interface
    |> Ok
  })
  |> actor.on_message(handle_processor_message)
  |> actor.start()
}

fn handle_processor_message(state: ProcessorState, message: ProcessorMessage) {
  case message {
    OrderCommand(orders.ProcessOrder(_id, reply_with)) -> {
      // Handle registry command
      process.send(reply_with, True)
      actor.continue(ProcessorState(..state, processed: state.processed + 1))
    }
    OrderCommand(orders.GetStatus(reply_with)) -> {
      // Return status
      process.send(reply_with, state.status)
      actor.continue(state)
    }
    OrderCommand(orders.Shutdown) -> {
      // orders.Shutdown is also the error default passed to registry.new
      actor.stop()
    }
    OrderEvent(orders.OrderCreated(id, _amount)) -> {
      // React to PubSub event
      actor.continue(ProcessorState(..state, status: "Processing order " <> id))
    }
    OrderEvent(orders.OrderShipped(_id, _tracking)) -> {
      // No state change for shipped orders in this example
      actor.continue(state)
    }
    OrderEvent(orders.SystemAlert(message)) -> {
      // Handle system-wide alerts
      actor.continue(ProcessorState(..state, status: "Alert: " <> message))
    }
    DirectCommand(_direct_message) -> {
      // Handle direct commands
      actor.continue(state)
    }
    Shutdown -> {
      actor.stop()
    }
  }
}

Development

gleam test           # Run tests
gleam docs build     # Build documentation

This project is licensed under the MIT License.
