summaryrefslogtreecommitdiff
path: root/Control/Concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'Control/Concurrent')
-rw-r--r--Control/Concurrent/Go.hs107
1 files changed, 107 insertions, 0 deletions
diff --git a/Control/Concurrent/Go.hs b/Control/Concurrent/Go.hs
new file mode 100644
index 0000000..1bb0b86
--- /dev/null
+++ b/Control/Concurrent/Go.hs
@@ -0,0 +1,107 @@
+{- | An EDSL to make working with concurrent in-process code a bit easier
+ to read.
+
+This module is expected to be imported qualified as `Go`. Inspired by
+Golang and Clojure's core.async.
+
+$example
+-}
+{-# LANGUAGE NoImplicitPrelude #-}
+{-# LANGUAGE OverloadedStrings #-}
+module Control.Concurrent.Go
+ (
+ -- * Running and forking
+ fork
+ -- * Channels
+ , Channel
+ , chan
+ , read
+ , write
+ , mult
+ , tap
+ )
+where
+
+import Alpha
+import qualified Control.Concurrent as Concurrent
+import qualified Control.Concurrent.Chan.Unagi.Bounded
+ as Chan
+import qualified Data.Aeson as Aeson
+import Data.Text ( Text )
+import qualified System.IO.Unsafe as Unsafe
+
+-- | A standard channel.
+data Channel a = Channel
+ { _in :: Chan.InChan a
+ , _out :: Chan.OutChan a
+ , _size :: Int
+ }
+
+instance Aeson.ToJSON (Channel a) where
+ toJSON c = Aeson.String ("#<channel " <> len c <> ">" :: Text)
+ where len = show . Unsafe.unsafePerformIO . Chan.estimatedLength . _in
+
+-- | Starts a background process.
+fork :: IO () -> IO Concurrent.ThreadId
+fork = Concurrent.forkIO
+
+-- | Make a new channel.
+chan :: Int -> IO (Channel a)
+chan n = do
+ (i, o) <- Chan.newChan n
+ return $ Channel i o n
+
+-- | A channel for broadcasting to multiple consumers. See 'mult'.
+type Mult a = Chan.OutChan a
+
+-- | Duplicates a channel, but then anything written to the source will
+-- be available to both. This is like Clojure's `core.async/mult`
+mult :: Channel a -> IO (Mult a)
+mult = Chan.dupChan . _in
+
+-- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`.
+--
+-- You can use this to read from a channel in a background process, e.g.:
+--
+-- >>> c <- Go.chan
+-- >>> Go.fork . forever <| Go.mult c >>= Go.tap >>= print
+tap :: Mult a -> IO a
+tap = Chan.readChan
+
+-- | Take from a channel. Blocks until a value is received.
+read :: Channel a -> IO a
+read = Chan.readChan . _out
+
+-- | Write to a channel. Blocks if the channel is full.
+write :: Channel a -> a -> IO Bool
+write = Chan.tryWriteChan . _in
+
+{- $example
+
+A simple example from ghci:
+
+>>> import qualified Control.Concurrent.Go as Go
+>>> c <- Go.chan :: IO (Go.Channel Text)
+>>> Go.write c "test"
+>>> Go.read c
+"test"
+
+An example with tap and mult:
+
+>>> c <- Go.chan :: IO (Go.Channel Text)
+>>> Go.write c "hi"
+>>> Go.read c
+"hi"
+>>> Go.fork
+>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("one: " <> t)
+ThreadId 810
+>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("two: " <> t)
+ThreadId 825
+>>> Go.write c "test"
+"two: t"eosnte":
+ test"
+
+The text is garbled because the actions are happening concurrently and
+trying to serialize to write the output, but you get the idea.
+
+-}