summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2020-03-29 18:41:14 -0700
committerBen Sima <ben@bsima.me>2020-03-30 07:58:13 -0700
commit4fab0c52d2bae4076e79bf86ee4f3b8e08392a86 (patch)
tree1e60f49065fffc80ae7208403390ef897d499846
parentf7381fe3d559e0e583b89d96dee595fe0a14d9d5 (diff)
Rewrite Go to use unagi-chan
The performance is reportedly better. The API is simpler. Also with STM channels, I couldn't get multiconsumer to work. I was able to get it to work with unagi. Also I could write 'mult' and 'tap' which bring me back to my Clojure days.
-rw-r--r--Com/Simatime/Go.hs94
1 files changed, 57 insertions, 37 deletions
diff --git a/Com/Simatime/Go.hs b/Com/Simatime/Go.hs
index d9fb2d2..e552a90 100644
--- a/Com/Simatime/Go.hs
+++ b/Com/Simatime/Go.hs
@@ -10,68 +10,88 @@ $example
module Com.Simatime.Go
(
-- * Running and forking
- Go
- , run
- , fork
+ fork
-- * Channels
, Channel
, chan
- , broadcast
- , tap
, read
, write
+ , mult
+ , tap
)
where
-import Control.Concurrent ( forkIO
- , ThreadId
- )
-import qualified Control.Concurrent.STM.TChan as TChan
-import GHC.Conc ( STM
- , atomically
- )
-import Protolude ( IO )
+import Com.Simatime.Alpha hiding ( read )
+import qualified Control.Concurrent as Concurrent
+import qualified Control.Concurrent.Chan.Unagi as Chan
-type Go = STM
-type Channel = TChan.TChan
-
--- | Runs a Go command in IO.
-run :: Go a -> IO a
-run = atomically
+-- | A standard channel.
+data Channel a = Channel
+ { _in :: Chan.InChan a
+ , _out :: Chan.OutChan a
+ }
-- | Starts a background process.
-fork :: IO () -> IO ThreadId
-fork = forkIO
+fork :: IO () -> IO Concurrent.ThreadId
+fork = Concurrent.forkIO
-- | Make a new channel.
-chan :: Go (Channel a)
-chan = TChan.newTChan
+chan :: IO (Channel a)
+chan = do
+ (i, o) <- Chan.newChan
+ return <| Channel i o
--- | Make a read-only channel.
-broadcast :: Go (Channel a)
-broadcast = TChan.newBroadcastTChan
+-- | A channel for broadcasting to multiple consumers. See 'mult'.
+type Mult a = Chan.OutChan a
-- | Duplicates a channel, but then anything written to the source will
--- be available to both. This is like a combination of Clojure's
--- `core.async/mult` and `core.async/tap` but.
-tap :: Channel a -> Go (Channel a)
-tap = TChan.dupTChan
+-- be available to both. This is like Clojure's `core.async/mult`
+mult :: Channel a -> IO (Mult a)
+mult = Chan.dupChan . _in
+
+-- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`.
+--
+-- You can use this to read from a channel in a background process, e.g.:
+--
+-- >>> c <- Go.chan
+-- >>> Go.fork . forever <| Go.mult c >>= Go.tap >>= print
+tap :: Mult a -> IO a
+tap = Chan.readChan
-- | Take from a channel. Blocks until a value is received.
-read :: Channel a -> Go a
-read = TChan.readTChan
+read :: Channel a -> IO a
+read = Chan.readChan . _out
-- | Write to a channel.
-write :: Channel a -> a -> Go ()
-write = TChan.writeTChan
+write :: Channel a -> a -> IO ()
+write = Chan.writeChan . _in
{- $example
A simple example from ghci:
>>> import qualified Com.Simatime.Go as Go
->>> c <- Go.run Go.chan :: IO (Go.Channel Text)
->>> Go.run $ Go.write c "test"
->>> Go.run $ Go.read c
+>>> c <- Go.chan :: IO (Go.Channel Text)
+>>> Go.write c "test"
+>>> Go.read c
"test"
+
+An example with tap and mult:
+
+>>> c <- Go.chan :: IO (Go.Channel Text)
+>>> Go.write c "hi"
+>>> Go.read c
+"hi"
+>>> Go.fork
+>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("one: " <> t)
+ThreadId 810
+>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("two: " <> t)
+ThreadId 825
+>>> Go.write c "test"
+"two: t"eosnte":
+ test"
+
+The text is garbled because the actions are happening concurrently and
+trying to serialize to write the output, but you get the idea.
+
-}