summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lore/Control/Concurrent/Go.hs101
-rw-r--r--lore/Control/Concurrent/Sima.hs12
2 files changed, 113 insertions, 0 deletions
diff --git a/lore/Control/Concurrent/Go.hs b/lore/Control/Concurrent/Go.hs
new file mode 100644
index 0000000..05626d7
--- /dev/null
+++ b/lore/Control/Concurrent/Go.hs
@@ -0,0 +1,101 @@
+-- based on
+-- https://stackoverflow.com/questions/4522387/how-can-i-emulate-gos-channels-with-haskell
+-- but this version encodes end-of-stream on the communication channel, as a Nothing
+
+module Control.Concurrent.Go
+ ( chan
+ , readCh
+ , (-<-)
+ , writeCh
+ , (->-)
+ , go
+ )
+where
+
+import Control.Concurrent ( forkIO
+ , ThreadId
+ , threadDelay
+ )
+import Control.Concurrent.STM.TChan ( newTChan
+ , readTChan
+ , writeTChan
+ , isEmptyTChan
+ , TChan
+ )
+import Control.Monad ( forM_ )
+import GHC.Conc ( atomically )
+
+-- | Make a new channel.
+chan :: _
+chan = atomically . newTChan
+
+-- | Take from a channel.
+readCh :: TChan a -> IO a
+readCh = atomically . readTChan
+
+-- | Alias for 'readCh'.
+--
+-- >>> c <- chan
+-- >>> writeCh c "val"
+-- >>> -<- c
+-- "val"
+--
+-- I don't think this looks terrible with do-notation:
+--
+-- >>> c <- chan
+-- >>> writeCh c "val"
+-- >>> result <- -<- c
+-- >>> print result
+-- "val"
+(-<-) :: TChan a -> IO a
+(-<-) = readCh
+
+-- | Write to a channel.
+writeCh :: TChan a -> a -> IO ()
+writeCh ch v = atomically $ writeTChan ch v
+
+-- | Alias for 'writeCh', but flipped to make it read better.
+--
+-- >>> c <- chan
+-- >>> "val" ->- c
+-- >>> readCh c
+-- "val"
+(->-) :: TChan a -> a -> IO ()
+(->-) = flip writeCh
+
+-- | Starts a background process.
+go :: IO () -> IO ThreadId
+go = forkIO
+
+
+{- Example: (TODO: move to module-level docs)
+
+forRange :: TChan (Maybe a) -> (a -> IO b) -> IO [b]
+forRange ch fn = helper fn [] where
+ -- I could not get Intero or ghc to accept a type-annotation for the helper function,
+ -- even using the exact inferred type from Intero.
+ -- helper :: (a -> IO b) -> [b] -> IO [b]
+ helper fn acc = do
+ jv <- readCh ch
+ case jv of
+ Nothing -> return $ reverse acc
+ Just v -> do
+ b <- fn v
+ helper fn (b : acc)
+
+generate :: (Num a, Enum a) => TChan (Maybe a) -> IO ()
+generate ch = do
+ forM_ [1 .. 9999] (\x -> writeCh ch (Just x))
+ writeQ ch Nothing -- EOF value
+
+process :: TChan (Maybe Int) -> IO ()
+process c = do
+ forRange c (print :: Int -> IO ())
+ return ()
+
+main :: IO ()
+main = do
+ ch <- chan
+ go $ generate ch
+ process ch
+-}
diff --git a/lore/Control/Concurrent/Sima.hs b/lore/Control/Concurrent/Sima.hs
new file mode 100644
index 0000000..3588a2a
--- /dev/null
+++ b/lore/Control/Concurrent/Sima.hs
@@ -0,0 +1,12 @@
+module Control.Concurrent.Simaphore
+ ( mapPool
+ )
+where
+
+import Control.Concurrent.MSem
+
+-- | Simaphore-based throttled 'mapConcurrently'.
+mapPool :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
+mapPool lim f xs = do
+ sima <- new lim
+ mapConcurrently (with sima . f) xs