summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2020-04-01 22:51:47 -0700
committerBen Sima <ben@bsima.me>2020-04-01 22:51:47 -0700
commit4ef954f7b3e9b5d99d1030843c2633dbd76f37c0 (patch)
treeb72249c4a5a348a193d9995f1f6228d779150c92
parent1877ee48948e5952a694cfc666ff65a986211c46 (diff)
Bound ques to about 10 items
I also block before taking, instead of doing a mult and tap. This is a simple way to fix the memory issue, and makes them conceptually simpler to work with I think. The channels are still mutli-consumer and multi-producer, which is fine. I'm not sure now I will implement the regular pubsub, but I'm not sure there is a great usecase for that anyway.
-rw-r--r--Com/Simatime/Go.hs12
-rw-r--r--Run/Que/Server.hs15
2 files changed, 10 insertions, 17 deletions
diff --git a/Com/Simatime/Go.hs b/Com/Simatime/Go.hs
index e552a90..bd9296a 100644
--- a/Com/Simatime/Go.hs
+++ b/Com/Simatime/Go.hs
@@ -23,12 +23,14 @@ where
import Com.Simatime.Alpha hiding ( read )
import qualified Control.Concurrent as Concurrent
-import qualified Control.Concurrent.Chan.Unagi as Chan
+import qualified Control.Concurrent.Chan.Unagi.Bounded
+ as Chan
-- | A standard channel.
data Channel a = Channel
{ _in :: Chan.InChan a
, _out :: Chan.OutChan a
+ , _size :: Int
}
-- | Starts a background process.
@@ -36,10 +38,10 @@ fork :: IO () -> IO Concurrent.ThreadId
fork = Concurrent.forkIO
-- | Make a new channel.
-chan :: IO (Channel a)
-chan = do
- (i, o) <- Chan.newChan
- return <| Channel i o
+chan :: Int -> IO (Channel a)
+chan n = do
+ (i, o) <- Chan.newChan n
+ return <| Channel i o n
-- | A channel for broadcasting to multiple consumers. See 'mult'.
type Mult a = Chan.OutChan a
diff --git a/Run/Que/Server.hs b/Run/Que/Server.hs
index 69af529..e5094cd 100644
--- a/Run/Que/Server.hs
+++ b/Run/Que/Server.hs
@@ -96,7 +96,7 @@ routes = do
case poll of
True -> Scotty.stream $ streamQue q
_ -> do
- r <- liftIO <| takeQue q
+ r <- liftIO <| Go.read q
Scotty.html <| fromStrict <| Encoding.decodeUtf8 r
-- | POST que
@@ -119,7 +119,7 @@ routes = do
app . modify <| upsertNamespace ns
q <- app <| que ns qp
qdata <- Scotty.body
- liftIO <| pushQue (BSL.toStrict qdata) q
+ liftIO <| Go.write q <| BSL.toStrict qdata
return ()
-- | Given `guardNs ns whitelist`, if `ns` is not in the `whitelist`
@@ -222,15 +222,6 @@ que ns qp = do
if queExists
then return <| grab qp qbase
else do
- c <- liftIO Go.chan
+ c <- liftIO <| Go.chan 5
modify (insertQue ns qp c)
gets ques /> grab ns /> grab qp
-
--- | Put data on the que.
-pushQue :: Quedata -> Que -> IO ()
-pushQue = flip Go.write
-
--- | Tap and read from the Que. Tap first because a Que is actually a
--- broadcast channel. This allows for multiconsumer Ques.
-takeQue :: Que -> IO Quedata
-takeQue ch = Go.mult ch >>= Go.tap