summaryrefslogtreecommitdiff
path: root/Control/Concurrent/Go.hs
blob: 5057bfefa7642c4bc980e857b5b2741b3c5dac4b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}

-- | An EDSL to make working with concurrent in-process code a bit easier
--   to read.
--
-- This module is expected to be imported qualified as `Go`. Inspired by
-- Golang and Clojure's core.async.
--
-- \$example
module Control.Concurrent.Go
  ( -- * Running and forking
    fork,

    -- * Channels
    Channel,
    chan,
    read,
    write,

    -- * Broadcasting

    -- | 'Mult' is exported so that callers can name the type that 'mult'
    -- returns and 'tap' consumes in their own signatures.
    Mult,
    mult,
    tap,
  )
where

import Alpha
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Chan.Unagi.Bounded as Chan
import qualified Data.Aeson as Aeson
import Data.Text (Text)
import qualified System.IO.Unsafe as Unsafe

-- | A standard channel.
--
-- Bundles both ends of a bounded Unagi channel together with the size that
-- was given to 'chan' when the channel was created.
data Channel a = Channel
  { -- | Write end; this is what 'write' and 'mult' operate on.
    _in :: Chan.InChan a,
    -- | Read end; this is what 'read' operates on.
    _out :: Chan.OutChan a,
    -- | The size argument that was passed to 'chan'.
    _size :: Int
  }

-- | Renders a channel as an opaque JSON string tag, for example:
--
-- > #<channel 3>
--
-- The number is the channel's estimated length; the contents are never
-- serialised. The length is read with 'Unsafe.unsafePerformIO', so it is
-- only a snapshot taken whenever the resulting value happens to be forced.
instance Aeson.ToJSON (Channel a) where
  toJSON c = Aeson.String tag
    where
      tag :: Text
      tag = "#<channel " <> show queued <> ">"
      -- Peeking at the length needs IO; it is smuggled into this pure
      -- instance method on purpose, since the tag is only informational.
      queued = Unsafe.unsafePerformIO (Chan.estimatedLength (_in c))

-- | Starts a background process.
--
-- Runs the given action in a new thread via 'Concurrent.forkIO' and hands
-- back the id of that thread.
fork :: IO () -> IO Concurrent.ThreadId
fork action = Concurrent.forkIO action

-- | Make a new channel.
--
-- The argument is passed to the underlying bounded queue as its size and is
-- also remembered in the resulting 'Channel'.
chan :: Int -> IO (Channel a)
chan capacity = do
  (writeEnd, readEnd) <- Chan.newChan capacity
  pure (Channel {_in = writeEnd, _out = readEnd, _size = capacity})

-- | A channel for broadcasting to multiple consumers. See 'mult'.
--
-- This is only an alias for the read end of a bounded Unagi channel; values
-- are taken from it with 'tap'.
type Mult a = Chan.OutChan a

-- | Creates a 'Mult' for a channel by duplicating its write end: values
-- written to the source afterwards are available both from the source's own
-- read end and from the returned 'Mult'. This is like Clojure's
-- `core.async/mult`.
mult :: Channel a -> IO (Mult a)
mult source = Chan.dupChan (_in source)

-- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`.
--
-- You can use this to read from a channel in a background process. Create
-- the 'Mult' once and tap it repeatedly, e.g.:
--
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> m <- Go.mult c
-- >>> Go.fork <. forever <| Go.tap m +> print
tap :: Mult a -> IO a
tap m = Chan.readChan m

-- | Takes the next value from the channel's read end, blocking until one is
-- available.
read :: Channel a -> IO a
read c = Chan.readChan (_out c)

-- | Try to write to a channel. This does /not/ block.
--
-- The write goes through 'Chan.tryWriteChan': the result is 'True' when the
-- value was written and 'False' when the write was abandoned because it
-- would likely exceed the channel's bounds. Callers that must not drop
-- values have to inspect the result and retry themselves.
write :: Channel a -> a -> IO Bool
write c = Chan.tryWriteChan (_in c)

-- $example
--
-- A simple example from ghci:
--
-- >>> import qualified Control.Concurrent.Go as Go
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> Go.write c "test"
-- True
-- >>> Go.read c
-- "test"
--
-- An example with tap and mult:
--
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> Go.write c "hi"
-- True
-- >>> Go.read c
-- "hi"
-- >>> Go.fork <| forever <| Go.mult c +> Go.tap +> \t -> print ("one: " <> t)
-- ThreadId 810
-- >>> Go.fork <| forever <| Go.mult c +> Go.tap +> \t -> print ("two: " <> t)
-- ThreadId 825
-- >>> Go.write c "test"
-- "two: t"eosnte":
--  test"
--
-- The output is garbled because both background threads print to stdout
-- concurrently, so their characters interleave, but you get the idea.