#!/usr/bin/env run.sh
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE NoImplicitPrelude #-}

-- | An EDSL to make working with concurrent in-process code a bit easier
-- to read.
--
-- This module is expected to be imported qualified as `Go`. Inspired by
-- Golang and Clojure's core.async.
--
-- \$example
-- : out go
module Control.Concurrent.Go
  ( -- * Running and forking
    fork,

    -- * Channels
    Channel,
    Mult,
    chan,
    read,
    write,
    mult,
    tap,

    -- * internal
    sleep,
    test,
    main,
  )
where

import Alpha
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Chan.Unagi.Bounded as Chan
import qualified Data.Aeson as Aeson
import qualified Omni.Cli as Cli
import Omni.Test ((@?=))
import qualified Omni.Test as Test
import qualified System.IO.Unsafe as Unsafe

-- | Command-line entry point. The only thing this module's plan does is
-- expose 'test'; @move@ is a no-op and @tidy@ passes its argument through.
main :: IO ()
main =
  Cli.main
    <| Cli.Plan
      { Cli.help = help,
        Cli.move = \_ -> pure (),
        Cli.test = test,
        Cli.tidy = pure
      }
  where
    help =
      [Cli.docopt|
go

Usage:
  go test
|]

-- | A standard channel: the write end, the read end, and the capacity
-- that was passed to 'chan'.
data Channel a = Channel
  { _in :: !(Chan.InChan a),
    _out :: !(Chan.OutChan a),
    _size :: !Int
  }

-- | Renders a channel as an opaque string of the form
-- @#<channel N>@, where @N@ is the channel's estimated length.
instance Aeson.ToJSON (Channel a) where
  toJSON c = Aeson.String ("#<channel " <> len c <> ">" :: Text)
    where
      -- NOTE(review): 'Unsafe.unsafePerformIO' is used here so that the
      -- pure 'Aeson.toJSON' can call 'Chan.estimatedLength'. The number
      -- reported is whatever the estimate happens to be when this value
      -- is forced, so treat it as debugging output only.
      len = show <. Unsafe.unsafePerformIO <. Chan.estimatedLength <. _in

-- | Starts a background process. This is 'Concurrent.forkIO'.
fork :: IO () -> IO Concurrent.ThreadId
fork = Concurrent.forkIO

-- | Make a new channel with room for @n@ elements.
chan :: Int -> IO (Channel a)
chan n = do
  (i, o) <- Chan.newChan n
  pure <| Channel i o n

-- | A channel for broadcasting to multiple consumers. See 'mult'.
type Mult a = Chan.OutChan a

-- | Duplicates a channel, but then anything written to the source will
-- be available to both. This is like Clojure's `core.async/mult`
mult :: Channel a -> IO (Mult a)
mult = Chan.dupChan <. _in

-- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`.
--
-- You can use this to read from a channel in a background process, e.g.:
--
-- >>> c <- Go.chan 1
-- >>> m <- Go.mult c
-- >>> Go.fork <| forever (Go.tap m +> print)
tap :: Mult a -> IO a
tap = Chan.readChan

-- | Take from a channel. Blocks until a value is received.
read :: Channel a -> IO a
read = Chan.readChan <. _out

-- | Write to a channel without blocking. Returns 'True' if the value was
-- written and 'False' if it was not (i.e. the channel is full); callers
-- that must not drop values need to check the result.
write :: Channel a -> a -> IO Bool
write = Chan.tryWriteChan <. _in

-- | Sleep for some number of milliseconds
sleep :: Int -> IO ()
sleep n = threadDelay <| n * 1_000

-- $example
--
-- A simple example from ghci:
--
-- >>> import qualified Control.Concurrent.Go as Go
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> Go.write c "test"
-- True
-- >>> Go.read c
-- "test"
--
-- An example with tap and mult:
--
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> Go.write c "hi"
-- True
-- >>> Go.read c
-- "hi"
-- >>> m <- Go.mult c
-- >>> Go.fork <| forever (Go.tap m +> \t -> print ("one: " <> t))
-- ThreadId 810
-- >>> Go.fork <| forever (Go.tap m +> \t -> print ("two: " <> t))
-- ThreadId 825
-- >>> Go.write c "test"
-- "two: t"eosnte":
-- test"
--
-- The text is garbled because the actions are happening concurrently and
-- trying to serialize to write the output, but you get the idea.
--

-- | Unit tests for this module.
test :: Test.Tree
test =
  Test.group
    "Control.Concurrent.Go"
    [ Test.unit "simple example" <| do
        c <- chan 1 :: IO (Channel Text)
        recv <- mult c
        _ <- fork (forever (tap recv +> pure))
        ret <- write c "simple example"
        True @?= ret,
      Test.unit "simple MVar counter" <| do
        counter <- newEmptyMVar
        putMVar counter (0 :: Integer)
        modifyMVar_ counter (pure <. (+ 1))
        modifyMVar_ counter (pure <. (+ 1))
        modifyMVar_ counter (pure <. (+ 1))
        r <- takeMVar counter
        r @?= 3
      -- NOTE(review): the disabled tests below all synchronise with
      -- their forked readers via `threadDelay 1`. If `threadDelay` here
      -- is the one from base it takes microseconds, so the assertions
      -- presumably run before the readers have consumed anything --
      -- TODO confirm before re-enabling.
      {- Why don't these work?
      Test.unit "subscription counter" <| do
        counter <- newEmptyMVar :: IO (MVar Integer)
        putMVar counter 0
        let dec = modifyMVar_ counter (\x -> pure <| x -1)
        let inc = modifyMVar_ counter (pure <. (+ 1))
        c <- chan 10 :: IO (Channel Bool)
        c1 <- mult c
        _ <- fork (forever (tap c1 +> bool dec inc))
        _ <- write c True
        _ <- write c True
        _ <- write c True
        threadDelay 1
        r1 <- takeMVar counter
        r1 @?= 3,
      Test.unit "SPMC" <| do
        out1 <- newEmptyMVar
        out2 <- newEmptyMVar
        putMVar out1 "init"
        putMVar out2 "init"
        c <- chan 10 :: IO (Channel Text)
        c1 <- mult c
        c2 <- mult c
        _ <- fork <| forever (tap c1 +> swapMVar out1 >> pure ())
        _ <- fork <| forever (tap c2 +> swapMVar out2 >> pure ())
        _ <- write c "test1"
        _ <- write c "test2"
        threadDelay 1
        r1 <- takeMVar out1
        r2 <- takeMVar out2
        r1 @?= r2
        r1 @?= "test2",
      Test.unit "Unagi SPMC" <| do
        out1 <- newEmptyMVar
        out2 <- newEmptyMVar
        putMVar out1 "init"
        putMVar out2 "init"
        (i, _) <- Chan.newChan 10 :: IO (Chan.InChan Text, Chan.OutChan Text)
        o1 <- Chan.dupChan i
        o2 <- Chan.dupChan i
        _ <- forkIO <| forever (Chan.readChan o1 +> swapMVar out1 >> pure ())
        _ <- forkIO <| forever (Chan.readChan o2 +> swapMVar out2 >> pure ())
        _ <- Chan.writeChan i "test1"
        _ <- Chan.writeChan i "test2"
        threadDelay 1
        r1 <- takeMVar out1
        r2 <- takeMVar out2
        r1 @?= r2
        r1 @?= "test2"
      -}
    ]