From ea3039d585d1c7462084aa4822be230b411d15e9 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sun, 12 Apr 2020 15:08:11 -0700 Subject: Move Com.Simatime.Go to Control.Concurrent.Go --- Com/Simatime/Go.hs | 106 ---------------------------------------------- Control/Concurrent/Go.hs | 107 +++++++++++++++++++++++++++++++++++++++++++++++ Run/Que/Server.hs | 2 +- 3 files changed, 108 insertions(+), 107 deletions(-) delete mode 100644 Com/Simatime/Go.hs create mode 100644 Control/Concurrent/Go.hs diff --git a/Com/Simatime/Go.hs b/Com/Simatime/Go.hs deleted file mode 100644 index 01555f3..0000000 --- a/Com/Simatime/Go.hs +++ /dev/null @@ -1,106 +0,0 @@ -{- | An EDSL to make working with concurrent in-process code a bit easier - to read. - -This module is expected to be imported qualified as `Go`. Inspired by -Golang and Clojure's core.async. - -$example --} -{-# LANGUAGE NoImplicitPrelude #-} -{-# LANGUAGE OverloadedStrings #-} -module Com.Simatime.Go - ( - -- * Running and forking - fork - -- * Channels - , Channel - , chan - , read - , write - , mult - , tap - ) -where - -import Alpha hiding ( read ) -import qualified Control.Concurrent as Concurrent -import qualified Control.Concurrent.Chan.Unagi.Bounded - as Chan -import qualified Data.Aeson as Aeson -import qualified System.IO.Unsafe as Unsafe - --- | A standard channel. -data Channel a = Channel - { _in :: Chan.InChan a - , _out :: Chan.OutChan a - , _size :: Int - } - -instance Aeson.ToJSON (Channel a) where - toJSON c = Aeson.String ("# len c <> ">" :: Text) - where len = show . Unsafe.unsafePerformIO . Chan.estimatedLength . _in - --- | Starts a background process. -fork :: IO () -> IO Concurrent.ThreadId -fork = Concurrent.forkIO - --- | Make a new channel. -chan :: Int -> IO (Channel a) -chan n = do - (i, o) <- Chan.newChan n - return <| Channel i o n - --- | A channel for broadcasting to multiple consumers. See 'mult'. -type Mult a = Chan.OutChan a - --- | Duplicates a channel, but then anything written to the source will --- be available to both. This is like Clojure's `core.async/mult` -mult :: Channel a -> IO (Mult a) -mult = Chan.dupChan . _in - --- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`. --- --- You can use this to read from a channel in a background process, e.g.: --- --- >>> c <- Go.chan --- >>> Go.fork . forever <| Go.mult c >>= Go.tap >>= print -tap :: Mult a -> IO a -tap = Chan.readChan - --- | Take from a channel. Blocks until a value is received. -read :: Channel a -> IO a -read = Chan.readChan . _out - --- | Write to a channel. Blocks if the channel is full. -write :: Channel a -> a -> IO Bool -write = Chan.tryWriteChan . _in - -{- $example - -A simple example from ghci: - ->>> import qualified Com.Simatime.Go as Go ->>> c <- Go.chan :: IO (Go.Channel Text) ->>> Go.write c "test" ->>> Go.read c -"test" - -An example with tap and mult: - ->>> c <- Go.chan :: IO (Go.Channel Text) ->>> Go.write c "hi" ->>> Go.read c -"hi" ->>> Go.fork ->>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("one: " <> t) -ThreadId 810 ->>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("two: " <> t) -ThreadId 825 ->>> Go.write c "test" -"two: t"eosnte": - test" - -The text is garbled because the actions are happening concurrently and -trying to serialize to write the output, but you get the idea. - --} diff --git a/Control/Concurrent/Go.hs b/Control/Concurrent/Go.hs new file mode 100644 index 0000000..1bb0b86 --- /dev/null +++ b/Control/Concurrent/Go.hs @@ -0,0 +1,107 @@ +{- | An EDSL to make working with concurrent in-process code a bit easier + to read. + +This module is expected to be imported qualified as `Go`. Inspired by +Golang and Clojure's core.async. + +$example +-} +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE OverloadedStrings #-} +module Control.Concurrent.Go + ( + -- * Running and forking + fork + -- * Channels + , Channel + , chan + , read + , write + , mult + , tap + ) +where + +import Alpha +import qualified Control.Concurrent as Concurrent +import qualified Control.Concurrent.Chan.Unagi.Bounded + as Chan +import qualified Data.Aeson as Aeson +import Data.Text ( Text ) +import qualified System.IO.Unsafe as Unsafe + +-- | A standard channel. +data Channel a = Channel + { _in :: Chan.InChan a + , _out :: Chan.OutChan a + , _size :: Int + } + +instance Aeson.ToJSON (Channel a) where + toJSON c = Aeson.String ("# len c <> ">" :: Text) + where len = show . Unsafe.unsafePerformIO . Chan.estimatedLength . _in + +-- | Starts a background process. +fork :: IO () -> IO Concurrent.ThreadId +fork = Concurrent.forkIO + +-- | Make a new channel. +chan :: Int -> IO (Channel a) +chan n = do + (i, o) <- Chan.newChan n + return $ Channel i o n + +-- | A channel for broadcasting to multiple consumers. See 'mult'. +type Mult a = Chan.OutChan a + +-- | Duplicates a channel, but then anything written to the source will +-- be available to both. This is like Clojure's `core.async/mult` +mult :: Channel a -> IO (Mult a) +mult = Chan.dupChan . _in + +-- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`. +-- +-- You can use this to read from a channel in a background process, e.g.: +-- +-- >>> c <- Go.chan +-- >>> Go.fork . forever <| Go.mult c >>= Go.tap >>= print +tap :: Mult a -> IO a +tap = Chan.readChan + +-- | Take from a channel. Blocks until a value is received. +read :: Channel a -> IO a +read = Chan.readChan . _out + +-- | Write to a channel. Blocks if the channel is full. +write :: Channel a -> a -> IO Bool +write = Chan.tryWriteChan . _in + +{- $example + +A simple example from ghci: + +>>> import qualified Control.Concurrent.Go as Go +>>> c <- Go.chan :: IO (Go.Channel Text) +>>> Go.write c "test" +>>> Go.read c +"test" + +An example with tap and mult: + +>>> c <- Go.chan :: IO (Go.Channel Text) +>>> Go.write c "hi" +>>> Go.read c +"hi" +>>> Go.fork +>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("one: " <> t) +ThreadId 810 +>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("two: " <> t) +ThreadId 825 +>>> Go.write c "test" +"two: t"eosnte": + test" + +The text is garbled because the actions are happening concurrently and +trying to serialize to write the output, but you get the idea. + +-} diff --git a/Run/Que/Server.hs b/Run/Que/Server.hs index e807502..f729795 100644 --- a/Run/Que/Server.hs +++ b/Run/Que/Server.hs @@ -26,7 +26,7 @@ import Alpha hiding ( Text , modify , poll ) -import qualified Com.Simatime.Go as Go +import qualified Control.Concurrent.Go as Go import qualified Control.Concurrent.STM as STM import qualified Control.Exception as Exception import Control.Monad.Reader ( MonadTrans ) -- cgit v1.2.3