diff options
Diffstat (limited to 'Control')
-rw-r--r-- | Control/Concurrent/Go.hs | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/Control/Concurrent/Go.hs b/Control/Concurrent/Go.hs new file mode 100644 index 0000000..1bb0b86 --- /dev/null +++ b/Control/Concurrent/Go.hs @@ -0,0 +1,107 @@ +{- | An EDSL to make working with concurrent in-process code a bit easier + to read. + +This module is expected to be imported qualified as `Go`. Inspired by +Golang and Clojure's core.async. + +$example +-} +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE OverloadedStrings #-} +module Control.Concurrent.Go + ( + -- * Running and forking + fork + -- * Channels + , Channel + , chan + , read + , write + , mult + , tap + ) +where + +import Alpha +import qualified Control.Concurrent as Concurrent +import qualified Control.Concurrent.Chan.Unagi.Bounded + as Chan +import qualified Data.Aeson as Aeson +import Data.Text ( Text ) +import qualified System.IO.Unsafe as Unsafe + +-- | A standard channel. +data Channel a = Channel + { _in :: Chan.InChan a + , _out :: Chan.OutChan a + , _size :: Int + } + +instance Aeson.ToJSON (Channel a) where + toJSON c = Aeson.String ("#<channel " <> len c <> ">" :: Text) + where len = show . Unsafe.unsafePerformIO . Chan.estimatedLength . _in + +-- | Starts a background process. +fork :: IO () -> IO Concurrent.ThreadId +fork = Concurrent.forkIO + +-- | Make a new channel. +chan :: Int -> IO (Channel a) +chan n = do + (i, o) <- Chan.newChan n + return $ Channel i o n + +-- | A channel for broadcasting to multiple consumers. See 'mult'. +type Mult a = Chan.OutChan a + +-- | Duplicates a channel, but then anything written to the source will +-- be available to both. This is like Clojure's `core.async/mult` +mult :: Channel a -> IO (Mult a) +mult = Chan.dupChan . _in + +-- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`. +-- +-- You can use this to read from a channel in a background process, e.g.: +-- +-- >>> c <- Go.chan +-- >>> Go.fork . forever <| Go.mult c >>= Go.tap >>= print +tap :: Mult a -> IO a +tap = Chan.readChan + +-- | Take from a channel. Blocks until a value is received. +read :: Channel a -> IO a +read = Chan.readChan . _out + +-- | Write to a channel. Blocks if the channel is full. +write :: Channel a -> a -> IO Bool +write = Chan.tryWriteChan . _in + +{- $example + +A simple example from ghci: + +>>> import qualified Control.Concurrent.Go as Go +>>> c <- Go.chan :: IO (Go.Channel Text) +>>> Go.write c "test" +>>> Go.read c +"test" + +An example with tap and mult: + +>>> c <- Go.chan :: IO (Go.Channel Text) +>>> Go.write c "hi" +>>> Go.read c +"hi" +>>> Go.fork +>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("one: " <> t) +ThreadId 810 +>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("two: " <> t) +ThreadId 825 +>>> Go.write c "test" +"two: t"eosnte": + test" + +The text is garbled because the actions are happening concurrently and +trying to serialize to write the output, but you get the idea. + +-} |