summaryrefslogtreecommitdiff
path: root/Com/Simatime/Go.hs
blob: f19511f78a1a9cf55fb471c26ea7b858f93f4742 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
{- | An EDSL to make working with concurrent in-process code a bit easier
   to read.

This module is expected to be imported qualified as `Go`. Inspired by
Golang and Clojure's core.async.

$example
-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
module Com.Simatime.Go
  (
  -- * Running and forking
    fork
  -- * Channels
  , Channel
  , chan
  , read
  , write
  , mult
  , tap
  )
where

import           Com.Simatime.Alpha      hiding ( read )
import qualified Control.Concurrent            as Concurrent
import qualified Control.Concurrent.Chan.Unagi.Bounded
                                               as Chan
import qualified Data.Aeson                    as Aeson
import qualified System.IO.Unsafe              as Unsafe

-- | A standard channel: both ends of a bounded Unagi channel, kept
-- together with the size the channel was created with.
data Channel a = Channel
  { _in :: Chan.InChan a
  -- ^ Write end. 'write' pushes values here and 'mult' duplicates it.
  , _out :: Chan.OutChan a
  -- ^ Read end. 'read' takes values from here.
  , _size :: Int
  -- ^ The size that was passed to 'chan' when the channel was made.
  }

instance Aeson.ToJSON (Channel a) where
  -- | Renders a channel as an opaque JSON string of the form
  -- @#\<channel N\>@, where @N@ is the estimated number of queued items.
  --
  -- The estimate is read with 'Unsafe.unsafePerformIO' because 'toJSON'
  -- is pure; the number is therefore a snapshot taken whenever the
  -- resulting value happens to be forced, and is only meant for
  -- debugging output.
  toJSON channel = Aeson.String label
    where
      label :: Text
      label = "#<channel " <> show queued <> ">"
      queued = Unsafe.unsafePerformIO (Chan.estimatedLength (_in channel))

-- | Runs the given action in the background on a new thread (via
-- 'Concurrent.forkIO') and returns that thread's id.
fork :: IO () -> IO Concurrent.ThreadId
fork action = Concurrent.forkIO action

-- | Creates a new bounded channel.
--
-- The argument is the size handed to the underlying Unagi channel; it
-- is also remembered in the '_size' field of the result.
chan :: Int -> IO (Channel a)
chan capacity = do
  (writeEnd, readEnd) <- Chan.newChan capacity
  return (Channel { _in = writeEnd, _out = readEnd, _size = capacity })

-- | A channel for broadcasting to multiple consumers: an additional
-- read end of a 'Channel', created by 'mult' and consumed with 'tap'.
--
-- NOTE(review): this alias is not in the module's export list, so
-- importing modules cannot name it in their own type signatures.
type Mult a = Chan.OutChan a

-- | Creates an extra read end for a channel by duplicating its write
-- end. Values written to the source channel afterwards are available
-- both from the source and from the returned 'Mult'. This is like
-- Clojure's `core.async/mult`.
--
-- Every call produces a new, independent duplicate.
mult :: Channel a -> IO (Mult a)
mult source = Chan.dupChan (_in source)

-- | Takes the next value from a 'Mult'. This is like Clojure's
-- `core.async/tap`.
--
-- You can use this to consume a channel from a background process.
-- Create the 'Mult' once and tap it repeatedly, since every call to
-- 'mult' makes a fresh duplicate:
--
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> m <- Go.mult c
-- >>> Go.fork (forever (Go.tap m >>= print))
tap :: Mult a -> IO a
tap broadcast = Chan.readChan broadcast

-- | Takes the next value from the read end of a channel, blocking
-- until one is available.
read :: Channel a -> IO a
read channel = Chan.readChan (_out channel)

-- | Puts a value on the write end of a channel.
--
-- The channel is bounded, so this may block while it is full (see
-- @writeChan@ in "Control.Concurrent.Chan.Unagi.Bounded").
write :: Channel a -> a -> IO ()
write channel value = Chan.writeChan (_in channel) value

{- $example

A simple example from ghci:

>>> import qualified Com.Simatime.Go as Go
>>> c <- Go.chan 1 :: IO (Go.Channel Text)
>>> Go.write c "test"
>>> Go.read c
"test"

An example with tap and mult:

>>> c <- Go.chan 1 :: IO (Go.Channel Text)
>>> Go.write c "hi"
>>> Go.read c
"hi"
>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("one: " <> t)
ThreadId 810
>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("two: " <> t)
ThreadId 825
>>> Go.write c "test"
"two: t"eosnte":
 test"

The text is garbled because the actions are happening concurrently and
trying to serialize to write the output, but you get the idea.

-}