diff options
Diffstat (limited to 'lore/Control')
-rw-r--r-- | lore/Control/Concurrent/Go.hs | 101 | ||||
-rw-r--r-- | lore/Control/Concurrent/Sima.hs | 12 |
2 files changed, 113 insertions, 0 deletions
diff --git a/lore/Control/Concurrent/Go.hs b/lore/Control/Concurrent/Go.hs new file mode 100644 index 0000000..05626d7 --- /dev/null +++ b/lore/Control/Concurrent/Go.hs @@ -0,0 +1,101 @@ +-- based on +-- https://stackoverflow.com/questions/4522387/how-can-i-emulate-gos-channels-with-haskell +-- but this version encodes end-of-stream on the communication channel, as a Nothing + +module Control.Concurrent.Go + ( chan + , readCh + , (-<-) + , writeCh + , (->-) + , go + ) +where + +import Control.Concurrent ( forkIO + , ThreadId + , threadDelay + ) +import Control.Concurrent.STM.TChan ( newTChan + , readTChan + , writeTChan + , isEmptyTChan + , TChan + ) +import Control.Monad ( forM_ ) +import GHC.Conc ( atomically ) + +-- | Make a new channel. +chan :: _ +chan = atomically . newTChan + +-- | Take from a channel. +readCh :: TChan a -> IO a +readCh = atomically . readTChan + +-- | Alias for 'readCh'. +-- +-- >>> c <- chan +-- >>> writeCh c "val" +-- >>> -<- c +-- "val" +-- +-- I don't think this looks terrible with do-notation: +-- +-- >>> c <- chan +-- >>> writeCh c "val" +-- >>> result <- -<- c +-- >>> print result +-- "val" +(-<-) :: TChan a -> IO a +(-<-) = readCh + +-- | Write to a channel. +writeCh :: TChan a -> a -> IO () +writeCh ch v = atomically $ writeTChan ch v + +-- | Alias for 'writeCh', but flipped to make it read better. +-- +-- >>> c <- chan +-- >>> "val" ->- c +-- >>> readCh c +-- "val" +(->-) :: TChan a -> a -> IO () +(->-) = flip writeCh + +-- | Starts a background process. +go :: IO () -> IO ThreadId +go = forkIO + + +{- Example: (TODO: move to module-level docs) + +forRange :: TChan (Maybe a) -> (a -> IO b) -> IO [b] +forRange ch fn = helper fn [] where + -- I could not get Intero or ghc to accept a type-annotation for the helper function, + -- even using the exact inferred type from Intero. + -- helper :: (a -> IO b) -> [b] -> IO [b] + helper fn acc = do + jv <- readCh ch + case jv of + Nothing -> return $ reverse acc + Just v -> do + b <- fn v + helper fn (b : acc) + +generate :: (Num a, Enum a) => TChan (Maybe a) -> IO () +generate ch = do + forM_ [1 .. 9999] (\x -> writeCh ch (Just x)) + writeQ ch Nothing -- EOF value + +process :: TChan (Maybe Int) -> IO () +process c = do + forRange c (print :: Int -> IO ()) + return () + +main :: IO () +main = do + ch <- chan + go $ generate ch + process ch +-} diff --git a/lore/Control/Concurrent/Sima.hs b/lore/Control/Concurrent/Sima.hs new file mode 100644 index 0000000..3588a2a --- /dev/null +++ b/lore/Control/Concurrent/Sima.hs @@ -0,0 +1,12 @@ +module Control.Concurrent.Simaphore + ( mapPool + ) +where + +import Control.Concurrent.MSem + +-- | Simaphore-based throttled 'mapConcurrently'. +mapPool :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b) +mapPool lim f xs = do + sima <- new lim + mapConcurrently (with sima . f) xs |