summaryrefslogtreecommitdiff
path: root/Control/Concurrent/Go.hs
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2021-08-04 11:09:35 -0400
committerBen Sima <ben@bsima.me>2021-11-26 13:47:37 -0500
commit9b1df01fd2cf3ecf41fc68b94051db665821c774 (patch)
treec1a3e68f625679576ff7e47bd1ebcb07bb94e15e /Control/Concurrent/Go.hs
parent0264f4a5dc37b16f872e6fa92bd8f1fc1e2b1826 (diff)
Reimplement Que with Servant
Still todo: add authentication. But that can wait. In re-implementing this, I was able to figure out how to get the Go.mult working properly as well. The problem is that a tap from a mult channel does not remove the message from the original channel. I'm not sure if that should be a core feature or not; for now I'm just draining the channel when it's received in the Que HTTP handler. (Also, this would be a good place to put persistence: have a background job read from the original channel, and write the msg to disk via acid-state; this would obviate the need for a flush to nowhere.) Also, streaming is working now. The problem was that Scotty closes the connection after it sees a newline in the body, or something, so streaming over Scotty doesn't actually work. It's fine, Servant is better anyway.
Diffstat (limited to 'Control/Concurrent/Go.hs')
-rw-r--r--Control/Concurrent/Go.hs117
1 files changed, 110 insertions, 7 deletions
diff --git a/Control/Concurrent/Go.hs b/Control/Concurrent/Go.hs
index 5057bfe..a5eb2b7 100644
--- a/Control/Concurrent/Go.hs
+++ b/Control/Concurrent/Go.hs
@@ -1,4 +1,6 @@
+{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE NoImplicitPrelude #-}
-- | An EDSL to make working with concurrent in-process code a bit easier
@@ -8,32 +10,59 @@
-- Golang and Clojure's core.async.
--
-- \$example
+-- : out go
module Control.Concurrent.Go
( -- * Running and forking
fork,
-- * Channels
Channel,
+ Mult,
chan,
read,
write,
mult,
tap,
+
+ -- * internal
+ sleep,
+ test,
+ main,
)
where
import Alpha
+import qualified Biz.Cli as Cli
+import Biz.Test ((@?=))
+import qualified Biz.Test as Test
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Chan.Unagi.Bounded as Chan
import qualified Data.Aeson as Aeson
-import Data.Text (Text)
import qualified System.IO.Unsafe as Unsafe
+main :: IO ()
+main =
+ Cli.main
+ <| Cli.Plan
+ { Cli.help = help,
+ Cli.move = \_ -> pure (),
+ Cli.test = test,
+ Cli.tidy = pure
+ }
+ where
+ help =
+ [Cli.docopt|
+ go
+
+Usage:
+ go test
+ |]
+
-- | A standard channel.
data Channel a = Channel
- { _in :: Chan.InChan a,
- _out :: Chan.OutChan a,
- _size :: Int
+ { _in :: !(Chan.InChan a),
+ _out :: !(Chan.OutChan a),
+ _size :: !Int
}
instance Aeson.ToJSON (Channel a) where
@@ -76,6 +105,10 @@ read = Chan.readChan <. _out
write :: Channel a -> a -> IO Bool
write = Chan.tryWriteChan <. _in
+-- | Sleep for some number of milliseconds
+sleep :: Int -> IO ()
+sleep n = threadDelay <| n * 1_000
+
-- <|example
--
-- A simple example from ghci:
@@ -92,10 +125,10 @@ write = Chan.tryWriteChan <. _in
-- >>> Go.write c "hi"
-- >>> Go.read c
-- "hi"
--- >>> Go.fork
--- >>> Go.fork <| forever <| Go.mult c +> Go.tap +> \t -> print ("one: " <> t)
+-- m <- Go.mult
+-- >>> Go.fork <| forever (Go.tap m +> \t -> print ("one: " <> t))
-- ThreadId 810
--- >>> Go.fork <| forever <| Go.mult c +> Go.tap +> \t -> print ("two: " <> t)
+-- >>> Go.fork <| forever (Go.tap m +> \t -> print ("two: " <> t))
-- ThreadId 825
-- >>> Go.write c "test"
-- "two: t"eosnte":
@@ -103,3 +136,73 @@ write = Chan.tryWriteChan <. _in
--
-- The text is garbled because the actions are happening concurrently and
-- trying to serialize to write the output, but you get the idea.
+--
+test :: Test.Tree
+test =
+ Test.group
+ "Control.Concurrent.Go"
+ [ Test.unit "simple example" <| do
+ c <- chan 1 :: IO (Channel Text)
+ recv <- mult c
+ _ <- fork (forever (tap recv +> pure))
+ ret <- write c "simple example"
+ True @?= ret,
+ Test.unit "simple MVar counter" <| do
+ counter <- newEmptyMVar
+ putMVar counter (0 :: Integer)
+ modifyMVar_ counter (pure <. (+ 1))
+ modifyMVar_ counter (pure <. (+ 1))
+ modifyMVar_ counter (pure <. (+ 1))
+ r <- takeMVar counter
+ r @?= 3
+ {- Why don't these work?
+ Test.unit "subscription counter" <| do
+ counter <- newEmptyMVar :: IO (MVar Integer)
+ putMVar counter 0
+ let dec = modifyMVar_ counter (\x -> pure <| x -1)
+ let inc = modifyMVar_ counter (pure <. (+ 1))
+ c <- chan 10 :: IO (Channel Bool)
+ c1 <- mult c
+ _ <- fork (forever (tap c1 +> bool dec inc))
+ _ <- write c True
+ _ <- write c True
+ _ <- write c True
+ threadDelay 1
+ r1 <- takeMVar counter
+ r1 @?= 3,
+ Test.unit "SPMC" <| do
+ out1 <- newEmptyMVar
+ out2 <- newEmptyMVar
+ putMVar out1 "init"
+ putMVar out2 "init"
+ c <- chan 10 :: IO (Channel Text)
+ c1 <- mult c
+ c2 <- mult c
+ _ <- fork <| forever (tap c1 +> swapMVar out1 >> pure ())
+ _ <- fork <| forever (tap c2 +> swapMVar out2 >> pure ())
+ _ <- write c "test1"
+ _ <- write c "test2"
+ threadDelay 1
+ r1 <- takeMVar out1
+ r2 <- takeMVar out2
+ r1 @?= r2
+ r1 @?= "test2",
+ Test.unit "Unagi SPMC" <| do
+ out1 <- newEmptyMVar
+ out2 <- newEmptyMVar
+ putMVar out1 "init"
+ putMVar out2 "init"
+ (i, _) <- Chan.newChan 10 :: IO (Chan.InChan Text, Chan.OutChan Text)
+ o1 <- Chan.dupChan i
+ o2 <- Chan.dupChan i
+ _ <- forkIO <| forever (Chan.readChan o1 +> swapMVar out1 >> pure ())
+ _ <- forkIO <| forever (Chan.readChan o2 +> swapMVar out2 >> pure ())
+ _ <- Chan.writeChan i "test1"
+ _ <- Chan.writeChan i "test2"
+ threadDelay 1
+ r1 <- takeMVar out1
+ r2 <- takeMVar out2
+ r1 @?= r2
+ r1 @?= "test2"
+ -}
+ ]