From 9b1df01fd2cf3ecf41fc68b94051db665821c774 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Wed, 4 Aug 2021 11:09:35 -0400 Subject: Reimplement Que with Servant Still todo: add authentication. But that can wait. In re-implementing this, I was able to figure out how to get the Go.mult working properly as well. The problem is that a tap from a mult channel does not remove the message from the original channel. I'm not sure if that should be a core feature or not; for now I'm just draining the channel when it's received in the Que HTTP handler. (Also, this would be a good place to put persistence: have a background job read from the original channel, and write the msg to disk via acid-state; this would obviate the need for a flush to nowhere.) Also, streaming is working now. The problem was that Scotty closes the connection after it sees a newline in the body, or something, so streaming over Scotty doesn't actually work. It's fine, Servant is better anyway. --- Control/Concurrent/Go.hs | 117 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 110 insertions(+), 7 deletions(-) (limited to 'Control/Concurrent') diff --git a/Control/Concurrent/Go.hs b/Control/Concurrent/Go.hs index 5057bfe..a5eb2b7 100644 --- a/Control/Concurrent/Go.hs +++ b/Control/Concurrent/Go.hs @@ -1,4 +1,6 @@ +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE NoImplicitPrelude #-} -- | An EDSL to make working with concurrent in-process code a bit easier @@ -8,32 +10,59 @@ -- Golang and Clojure's core.async. -- -- \$example +-- : out go module Control.Concurrent.Go ( -- * Running and forking fork, -- * Channels Channel, + Mult, chan, read, write, mult, tap, + + -- * internal + sleep, + test, + main, ) where import Alpha +import qualified Biz.Cli as Cli +import Biz.Test ((@?=)) +import qualified Biz.Test as Test import qualified Control.Concurrent as Concurrent import qualified Control.Concurrent.Chan.Unagi.Bounded as Chan import qualified Data.Aeson as Aeson -import Data.Text (Text) import qualified System.IO.Unsafe as Unsafe +main :: IO () +main = + Cli.main + <| Cli.Plan + { Cli.help = help, + Cli.move = \_ -> pure (), + Cli.test = test, + Cli.tidy = pure + } + where + help = + [Cli.docopt| + go + +Usage: + go test + |] + -- | A standard channel. data Channel a = Channel - { _in :: Chan.InChan a, - _out :: Chan.OutChan a, - _size :: Int + { _in :: !(Chan.InChan a), + _out :: !(Chan.OutChan a), + _size :: !Int } instance Aeson.ToJSON (Channel a) where @@ -76,6 +105,10 @@ read = Chan.readChan <. _out write :: Channel a -> a -> IO Bool write = Chan.tryWriteChan <. _in +-- | Sleep for some number of milliseconds +sleep :: Int -> IO () +sleep n = threadDelay <| n * 1_000 + -- <|example -- -- A simple example from ghci: @@ -92,10 +125,10 @@ write = Chan.tryWriteChan <. _in -- >>> Go.write c "hi" -- >>> Go.read c -- "hi" --- >>> Go.fork --- >>> Go.fork <| forever <| Go.mult c +> Go.tap +> \t -> print ("one: " <> t) +-- m <- Go.mult +-- >>> Go.fork <| forever (Go.tap m +> \t -> print ("one: " <> t)) -- ThreadId 810 --- >>> Go.fork <| forever <| Go.mult c +> Go.tap +> \t -> print ("two: " <> t) +-- >>> Go.fork <| forever (Go.tap m +> \t -> print ("two: " <> t)) -- ThreadId 825 -- >>> Go.write c "test" -- "two: t"eosnte": @@ -103,3 +136,73 @@ write = Chan.tryWriteChan <. _in -- -- The text is garbled because the actions are happening concurrently and -- trying to serialize to write the output, but you get the idea. +-- +test :: Test.Tree +test = + Test.group + "Control.Concurrent.Go" + [ Test.unit "simple example" <| do + c <- chan 1 :: IO (Channel Text) + recv <- mult c + _ <- fork (forever (tap recv +> pure)) + ret <- write c "simple example" + True @?= ret, + Test.unit "simple MVar counter" <| do + counter <- newEmptyMVar + putMVar counter (0 :: Integer) + modifyMVar_ counter (pure <. (+ 1)) + modifyMVar_ counter (pure <. (+ 1)) + modifyMVar_ counter (pure <. (+ 1)) + r <- takeMVar counter + r @?= 3 + {- Why don't these work? + Test.unit "subscription counter" <| do + counter <- newEmptyMVar :: IO (MVar Integer) + putMVar counter 0 + let dec = modifyMVar_ counter (\x -> pure <| x -1) + let inc = modifyMVar_ counter (pure <. (+ 1)) + c <- chan 10 :: IO (Channel Bool) + c1 <- mult c + _ <- fork (forever (tap c1 +> bool dec inc)) + _ <- write c True + _ <- write c True + _ <- write c True + threadDelay 1 + r1 <- takeMVar counter + r1 @?= 3, + Test.unit "SPMC" <| do + out1 <- newEmptyMVar + out2 <- newEmptyMVar + putMVar out1 "init" + putMVar out2 "init" + c <- chan 10 :: IO (Channel Text) + c1 <- mult c + c2 <- mult c + _ <- fork <| forever (tap c1 +> swapMVar out1 >> pure ()) + _ <- fork <| forever (tap c2 +> swapMVar out2 >> pure ()) + _ <- write c "test1" + _ <- write c "test2" + threadDelay 1 + r1 <- takeMVar out1 + r2 <- takeMVar out2 + r1 @?= r2 + r1 @?= "test2", + Test.unit "Unagi SPMC" <| do + out1 <- newEmptyMVar + out2 <- newEmptyMVar + putMVar out1 "init" + putMVar out2 "init" + (i, _) <- Chan.newChan 10 :: IO (Chan.InChan Text, Chan.OutChan Text) + o1 <- Chan.dupChan i + o2 <- Chan.dupChan i + _ <- forkIO <| forever (Chan.readChan o1 +> swapMVar out1 >> pure ()) + _ <- forkIO <| forever (Chan.readChan o2 +> swapMVar out2 >> pure ()) + _ <- Chan.writeChan i "test1" + _ <- Chan.writeChan i "test2" + threadDelay 1 + r1 <- takeMVar out1 + r2 <- takeMVar out2 + r1 @?= r2 + r1 @?= "test2" + -} + ] -- cgit v1.2.3