Concurrency in Haskell: Fast, Simple, Correct

Original link: https://bitbashing.io/haskell-concurrency.html

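Haskell takes a distinctive approach to concurrency that combines the benefits of threads and event-driven IO. It uses green threads, which the runtime schedules on a pool of operating system threads, to provide a more approachable concurrency model. The `async` package simplifies thread management with promises and easy cancellation. Haskell's real strength is software transactional memory (STM). STM introduces special data types such as `TVar`, which are read and written atomically inside transactions. These transactions compose seamlessly, so complex logic like the `TBCQueue` example needs no explicit mutexes or locks. STM's `retry` function handles waiting and re-running a transaction without busy-waiting, because the Haskell runtime wakes threads based on changes to the `TVar`s involved. Because only STM actions can run inside an atomic transaction, accidental IO in a critical section is ruled out, which eliminates many concurrency problems. Combined with the automatic cancellation of child threads when a parent thread fails, this encourages declarative, robust concurrent code.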


    After nearly a decade of building embedded systems in C, C++, and Rust, I’ve somehow ended up writing Haskell for a living. If you’d asked me about functional programming a few years ago, I would have told you it was self-indulgent academic baloney—and then I stumbled into people using it for real-time systems where microseconds can mean literal life or death.

    I’m too old to try to convince people what tools they should use, but Haskell has some features that might interest anyone who cares about fast, correct code. Let’s talk about them.

    We’ll start with concurrency.


    Some people, when confronted with a problem, think,
    “I know, I’ll use regular expressions.”
    Now they have two problems.

    —Jamie Zawinski

    Some people, when confronted with a problem, think,
    “I know, I’ll use threads,”
    and then two they hav erpoblesms.

    —Ned Batchelder


    Like we’ve previously discussed, we have two main concerns when going fast:

    • Your computer (even the one in your pocket) has many cores. To use the whole computer, you need to distribute work across them.

    • The outside world is slow—networking and disk IO are many thousands of times slower than computing. Keep computing while you wait!

    And so, we need to break work into independent tasks, usually one of two ways:

    1. Compose the program into several threads of execution, traditionally scheduled and run by the operating system.
    2. Compose the program as a series of callbacks, or continuations, that run once some other action (e.g., IO) completes.

    Option 2 has some nice performance benefits, especially when paired with event-driven IO. Watch Ryan Dahl introduce Node.js to the world: he doesn’t especially care about JavaScript; he’s just trying to make this sort of concurrency more accessible. But continuation passing has its own problems. Even when syntactic sugar like async/await makes it appear to run sequentially, debugging can be a frustrating experience. Traditional stack traces go out the window, and you may ask yourself, “well, how did I get here?”

    Threads and You

    Haskell tries to have the best of both worlds: threads are its concurrency primitive, but they’re green threads, scheduled by the runtime on an (OS) thread pool, and fed by event-driven IO.

    Let’s crunch through the basics so that we can get to the cool stuff. We can spawn threads with forkIO, which runs the given action in a new thread and returns a thread ID:

    import Control.Concurrent
    
    main :: IO ()
    main = do
        _tid <- forkIO $ putStrLn "Hello from thread 2!"
        putStrLn "Look ma, concurrent prints!"
    
    

    That’s a start, but how do we wait for the thread to complete, or see what it returned? There’s not much we can do with a thread’s ID, besides killing it. We find answers in the async package, which gives us a promise for our new thread:

    async :: IO a -> IO (Async a)
    

    …which we can wait for! Or cancel, if we’re in a bad mood:

    wait :: Async a -> IO a
    
    cancel :: Async a -> IO ()
    

    And so,

    import Control.Concurrent.Async
    
    main :: IO ()
    main = do
        hFut <- async $ readFile "hello.txt"
        putStrLn "Reading file..."
        helloContents <- wait hFut
        putStrLn helloContents
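
    To make the idea of a promise for a thread concrete, here is a minimal sketch of how one could be built from forkIO and an MVar. The names Promise, spawn, and await are hypothetical and exist only for this illustration; the real async package offers far more (cancellation, careful handling of asynchronous exceptions), so treat this as a rough model of the concept rather than the library's implementation.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)
import Control.Exception (SomeException, throwIO, try)

-- A hypothetical, stripped-down promise: a slot that will eventually
-- hold either the thread's result or the exception that killed it.
newtype Promise a = Promise (MVar (Either SomeException a))

-- Run the action in a new thread and record its outcome in the slot.
spawn :: IO a -> IO (Promise a)
spawn action = do
    slot <- newEmptyMVar
    _ <- forkIO $ try action >>= putMVar slot
    pure (Promise slot)

-- Block until the thread finishes, rethrowing any exception it raised.
await :: Promise a -> IO a
await (Promise slot) = readMVar slot >>= either throwIO pure
```

    Because await uses readMVar rather than takeMVar, several callers can wait on the same promise and all of them see the result.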
    

    Sometimes we don’t want to wait for the threads we spawn, though. Consider a server that spins one off for each client that connects. It might link these new threads to itself so failures propagate back up.

    import Control.Concurrent.Async (async, link)
    import Network.Socket (SockAddr, Socket, accept)
    
    serveLoop :: Socket -> (Socket -> SockAddr -> IO ()) -> IO ()
    serveLoop listener clientHandler = do
        (clientSock, clientAddr) <- accept listener
        -- Handle each client in their own thread
        clientThread <- async $ clientHandler clientSock clientAddr
        -- Silently swallowing errors is bad, mmk?
        link clientThread
        serveLoop listener clientHandler
    

    No Threads, Only Concurrently

    We still have lots to figure out. How should we wait for several threads? If one fails, can we cancel the others? What happens if we (the caller) are cancelled?

    With the right tools, the correct answer is, “don’t worry about it.”

    -- Runs each action in its own thread and returns the results
    concurrently :: IO a -> IO b -> IO (a, b)
    
    -- Runs each action in its own thread,
    -- returning whichever finishes first.
    race :: IO a -> IO b -> IO (Either a b)
    
    -- Run a function (mapping a to b) in a separate thread
    -- for each element of a data structure
    mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
    
    -- And much more...
    

    In each of these, if one thread fails, the rest are cancelled. And if the parent thread fails, all children are cancelled. This is wonderfully declarative—work happens concurrently, stops as soon as it should, and we don’t concern ourselves with spawning and joining individual threads.
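
    A small sketch of these combinators in use, assuming the async package is installed. The helper slowly is hypothetical: it stands in for real work by sleeping for a number of milliseconds before returning a value.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently, race)

-- Hypothetical stand-in for slow work: sleep for the given number of
-- milliseconds, then return the value.
slowly :: Int -> a -> IO a
slowly ms x = threadDelay (ms * 1000) >> pure x

-- Both actions run at once; we get both results back as a pair.
both :: IO (Int, String)
both = concurrently (slowly 50 1) (slowly 20 "two")

-- Whichever side finishes first wins, and the other is cancelled.
fastest :: IO (Either Int String)
fastest = race (slowly 200 1) (slowly 20 "two")
```

    Here both takes roughly as long as its slower action, while fastest should return after about 20 milliseconds without waiting for the 200 millisecond action.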

    There’s also a Concurrently type we can apply to our own abstractions. Want a concurrently-evaluated tuple?

    (page1, page2, page3) <- runConcurrently $ (,,)
        <$> Concurrently (getURL "url1")
        <*> Concurrently (getURL "url2")
        <*> Concurrently (getURL "url3")
    

    Or to run a whole collection of actions all at once and collect the results?

    runAll :: (Foldable f, Monoid m) => f (IO m) -> IO m
    runAll = runConcurrently . foldMap Concurrently
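
    As a usage sketch (again assuming the async package), runAll can gather lists produced by several actions. The fetches list is made up for illustration; its actions deliberately finish in reverse order to show that results are combined in the order of the collection, not in the order the threads complete.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))

runAll :: (Foldable f, Monoid m) => f (IO m) -> IO m
runAll = runConcurrently . foldMap Concurrently

-- Three hypothetical "fetches" that finish in reverse order.
fetches :: [IO [String]]
fetches =
    [ threadDelay 30000 >> pure ["first"]
    , threadDelay 20000 >> pure ["second"]
    , threadDelay 10000 >> pure ["third"]
    ]

-- Runs all three at once and concatenates their results.
collected :: IO [String]
collected = runAll fetches
```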
    

    (Haskell’s ability to generalize code over “anything foldable” or “anything traversable” is another superpower worth talking about, but let’s gloss over FP jargon today.)

    STM and the art of waiting

    For he who gets hurt will be he who has stalled.
    —Bob Dylan


    Great, we have threads! Next, we need them to talk to each other—this is the part folks think of when they say concurrency is hard. Enter the real magic of Haskell: STM.

    Short for “software transactional memory”, STM defines a few special types. The foundational one is TVar:

    -- A "transactional variable"
    data TVar a
    
    -- Create a TVar holding any type at all, then...
    newTVarIO :: a -> IO (TVar a)
    
    -- ...atomically read...
    readTVar :: TVar a -> STM a
    
    -- ...and atomically write.
    writeTVar :: TVar a -> a -> STM ()
    

    The library uses that to build other useful types, like a bounded queue:

    data TBQueue a
    
    -- Create one of the given length
    newTBQueueIO :: Natural -> IO (TBQueue a)
    
    -- Write to the queue, blocking if full
    writeTBQueue :: TBQueue a -> a -> STM ()
    
    -- Read from the queue, blocking if empty
    readTBQueue :: TBQueue a -> STM a
    
    -- Read from the queue, returning Nothing if empty
    tryReadTBQueue :: TBQueue a -> STM (Maybe a)
    
    -- And so on...
    

    You’ll notice that reads and writes aren’t IO actions—they’re STM actions. How do we use those? As parts of an atomic transaction, of course.

    atomically :: STM a -> IO a
    

    As the name implies, atomically acts as a critical section—everything inside happens all at once. At its most boring, we can use this to read and write our STM types:

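    -- A silly concurrent cat:
    -- read stdin in one thread, write to stdout in the other.
    import Control.Concurrent.Async (concurrently_)
    import Control.Concurrent.STM (atomically, newTBQueueIO, readTBQueue, writeTBQueue)
    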
    main :: IO ()
    main = do
        q <- newTBQueueIO 1024
        let reader = do
                l <- getLine
                atomically $ writeTBQueue q l
                reader -- loop!
        let printer = do
                l <- atomically $ readTBQueue q
                putStrLn l
                printer -- loop!
        -- Run each in their own thread:
        concurrently_ reader printer
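
    To see the critical-section behavior on a bare TVar, here is a small sketch in which several threads bump a shared counter. The names increment and countTo are hypothetical, and the example waits for its workers with plain MVars to keep the focus on the transaction itself.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar, readTVarIO, writeTVar)
import Control.Monad (replicateM, replicateM_)

-- Read-modify-write as a single transaction, so no increment is lost
-- even when many threads run this at the same time.
increment :: TVar Int -> IO ()
increment counter = atomically $ do
    n <- readTVar counter
    writeTVar counter (n + 1)

-- Spawn `workers` threads that each increment the counter `perWorker`
-- times, wait for all of them, and return the final count.
countTo :: Int -> Int -> IO Int
countTo workers perWorker = do
    counter <- newTVarIO 0
    dones <- replicateM workers $ do
        done <- newEmptyMVar
        _ <- forkIO $ replicateM_ perWorker (increment counter) >> putMVar done ()
        pure done
    mapM_ takeMVar dones
    readTVarIO counter
```

    If the read and the write were two separate atomically calls, threads could interleave between them and updates would go missing; inside one transaction they cannot.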
    

    But the real power is how STM functions compose.

    Let’s say we want a queue that can be closed. Our little program only works well until data stops—pipe it a file or hit Ctrl+D and:

    cat.hs: <stdin>: hGetLine: end of file
    

    Let’s fix that.

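    -- q.open and q.queue below use record dot syntax (GHC 9.2 or newer).
    {-# LANGUAGE OverloadedRecordDot #-}
    import Control.Concurrent.STM
    import Control.Monad (when)
    import Numeric.Natural (Natural)
    
    -- The C is for Closeable!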
    data TBCQueue a = TBCQueue {
        queue :: TBQueue a,
        open :: TVar Bool
    }
    
    -- Make a new closeable queue with the given capacity.
    newTBCQueueIO :: Natural -> IO (TBCQueue a)
    newTBCQueueIO n = TBCQueue <$> newTBQueueIO n <*> newTVarIO True
    
    -- Closing means it's no longer open.
    closeTBCQueue :: TBCQueue a -> STM ()
    closeTBCQueue q = writeTVar q.open False
    

    We’ll make writing a no-op if the channel is closed. (Returning the value of open, so the caller can tell whether the write went through, would be another viable option.)

    writeTBCQueue :: TBCQueue a -> a -> STM ()
    writeTBCQueue q v = do
        stillOpen <- readTVar q.open
        when stillOpen $ writeTBQueue q.queue v
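
    One possible reading of that alternative is a write that reports whether the value was accepted. The sketch below is an assumption rather than part of the original library: tryWriteTBCQueue is a hypothetical name, and the type is repeated (with ordinary field accessors instead of record dot syntax) so the block stands on its own.

```haskell
import Control.Concurrent.STM

data TBCQueue a = TBCQueue
    { queue :: TBQueue a
    , open :: TVar Bool
    }

-- Hypothetical variant of writeTBCQueue: returns True if the value was
-- enqueued and False if the queue had already been closed.
tryWriteTBCQueue :: TBCQueue a -> a -> STM Bool
tryWriteTBCQueue q v = do
    stillOpen <- readTVar (open q)
    if stillOpen
        then writeTBQueue (queue q) v >> pure True
        else pure False
```

    Like the original, this still blocks while the queue is open but full; only the closed case turns into an immediate False.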
    

    Reading is a little more interesting—we want to wait for a value when the queue is open, and then once it’s closed (and empty!), return Nothing.

    readTBCQueue :: TBCQueue a -> STM (Maybe a)
    readTBCQueue q = do
        -- Try to read from the queue
        maybeV <- tryReadTBQueue q.queue
        case maybeV of
            -- If there was a value in the queue, just return it.
            Just v -> pure $ Just v
            -- If the queue was empty...
            Nothing -> do
                -- ...Is the queue still open?
                -- If so we need to wait,
                -- otherwise return Nothing to indicate it's closed.
                stillOpen <- readTVar q.open
                if stillOpen
                    then retry
                    else pure Nothing
    

    What’s retry, you might ask? It aborts the entire transaction and tries again.

    Add some logic to check when the party ends, and we can gracefully handle EOF:

    import System.IO (isEOF)
    
    main :: IO ()
    main = do
        q <- newTBCQueueIO 1024
        let reader = do
                eof <- isEOF
                if eof
                    then atomically $ closeTBCQueue q
                    else do
                        l <- getLine
                        atomically $ writeTBCQueue q l
                        reader -- loop!
    
        let printer = do
                maybeL <- atomically $ readTBCQueue q
                case maybeL of
                    Nothing -> pure ()
                    Just l -> do
                        putStrLn l
                        printer -- loop!
    
        concurrently_ reader printer
    

    If you’d like to play with this yourself, TBCQueue and some related goodies are available here.

    But first, stop and appreciate the magic. We’re atomically manipulating both the queue and the open flag, and there’s no mutexes in sight. What’s more, readTBCQueue looks like it busy-loops by calling retry, but no cores are harmed when we run the program! The Haskell runtime tracks the TVars involved in each transaction, and only wakes retrying threads when a writer changes one.
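
    The same wait-then-wake pattern can be shown in isolation. In this sketch (waitFor and demo are hypothetical names), the main thread blocks in a transaction until another thread flips a flag, with no polling loop in the code.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- Block until the flag holds True. When it is False, retry abandons the
-- transaction and the thread sleeps until the TVar is written again.
waitFor :: TVar Bool -> STM ()
waitFor flag = do
    ready <- readTVar flag
    if ready then pure () else retry

-- Start a thread that sets the flag after 50 milliseconds, then wait.
demo :: IO String
demo = do
    flag <- newTVarIO False
    _ <- forkIO $ threadDelay 50000 >> atomically (writeTVar flag True)
    atomically (waitFor flag)
    pure "woken"
```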

    Imagine how you’d implement this wait/wake behavior with condition variables, CASes and futexes, event groups, or whatever other primitives you know and love. It would be tricky, to say the least. Here there’s no spurious wakeups or deadlock to worry about. And, because only STM actions can go in atomically, we can’t accidentally pull arbitrary IO into these critical sections. In the same way Rust makes most memory bugs impossible on the type level, STM wipes out entire categories of concurrency problems.

    I think that’s pretty neat.

