summaryrefslogtreecommitdiff
path: root/Com
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2020-03-29 03:59:33 -0700
committerBen Sima <ben@bsima.me>2020-03-30 07:58:13 -0700
commitf7381fe3d559e0e583b89d96dee595fe0a14d9d5 (patch)
tree6e4a0b9ce37d695cfa0076d60ac6a6c3fdecd5cb /Com
parentec671288cb026e8f4f4e2bc946a43bd9278efd2e (diff)
Add 'broadcast' and 'tap'
Diffstat (limited to 'Com')
-rw-r--r--Com/Simatime/Go.hs40
1 files changed, 23 insertions, 17 deletions
diff --git a/Com/Simatime/Go.hs b/Com/Simatime/Go.hs
index 0eef38d..d9fb2d2 100644
--- a/Com/Simatime/Go.hs
+++ b/Com/Simatime/Go.hs
@@ -1,7 +1,8 @@
-{- | A Go-like EDSL to make working with concurrent in-process code a bit
- easier to read.
+{- | An EDSL to make working with concurrent in-process code a bit easier
+ to read.
-This module is expected to be imported qualified as `Go`.
+This module is expected to be imported qualified as `Go`. Inspired by
+Golang and Clojure's core.async.
$example
-}
@@ -15,6 +16,8 @@ module Com.Simatime.Go
-- * Channels
, Channel
, chan
+ , broadcast
+ , tap
, read
, write
)
@@ -23,21 +26,14 @@ where
import Control.Concurrent ( forkIO
, ThreadId
)
-import Control.Concurrent.STM.TChan ( newTChan
- , readTChan
- , writeTChan
- , TChan
- )
+import qualified Control.Concurrent.STM.TChan as TChan
import GHC.Conc ( STM
, atomically
)
-import Protolude ( IO()
- , MonadIO(liftIO)
- , flip
- , (.)
- )
+import Protolude ( IO )
+
type Go = STM
-type Channel = TChan
+type Channel = TChan.TChan
-- | Runs a Go command in IO.
run :: Go a -> IO a
@@ -49,15 +45,25 @@ fork = forkIO
-- | Make a new channel.
chan :: Go (Channel a)
-chan = newTChan
+chan = TChan.newTChan
+
+-- | Make a read-only channel.
+broadcast :: Go (Channel a)
+broadcast = TChan.newBroadcastTChan
+
+-- | Duplicates a channel, but then anything written to the source will
+-- be available to both. This is like a combination of Clojure's
+-- `core.async/mult` and `core.async/tap` but.
+tap :: Channel a -> Go (Channel a)
+tap = TChan.dupTChan
-- | Take from a channel. Blocks until a value is received.
read :: Channel a -> Go a
-read = readTChan
+read = TChan.readTChan
-- | Write to a channel.
write :: Channel a -> a -> Go ()
-write = writeTChan
+write = TChan.writeTChan
{- $example