|
|
|
|
@@ -8,7 +8,7 @@ import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError
|
|
|
|
|
import Control.Monad.Except (ExceptT, runExceptT)
|
|
|
|
|
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
|
|
|
|
|
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
|
|
|
|
|
import Control.Monad.Morph (hoist)
|
|
|
|
|
import Control.Monad.Morph (class MFunctor, hoist)
|
|
|
|
|
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
|
|
|
|
|
import Control.Monad.ST.Class (liftST)
|
|
|
|
|
import Control.Monad.ST.Ref (STRef)
|
|
|
|
|
@@ -29,7 +29,6 @@ import Data.Tuple.Nested (type (/\), (/\))
|
|
|
|
|
import Effect.Aff.Class (class MonadAff, liftAff)
|
|
|
|
|
import Effect.Class (class MonadEffect, liftEffect)
|
|
|
|
|
import Effect.Console (log)
|
|
|
|
|
import Effect.Exception (Error, error)
|
|
|
|
|
import Effect.Now as Now
|
|
|
|
|
import Pipes (await, yield)
|
|
|
|
|
import Pipes.Collect as Collect
|
|
|
|
|
@@ -131,6 +130,18 @@ getAsyncIO (Pure _) = pure Nothing
|
|
|
|
|
instance MonadTrans (AsyncPipe a b) where
|
|
|
|
|
lift = M <<< map Pure
|
|
|
|
|
|
|
|
|
|
instance MFunctor (AsyncPipe a b) where
|
|
|
|
|
hoist _ (Pure a) = Pure a
|
|
|
|
|
hoist f (M m) = M $ f $ hoist f <$> m
|
|
|
|
|
hoist f (AsyncIO ({read, write, awaitWrite, awaitRead} /\ m)) =
|
|
|
|
|
AsyncIO
|
|
|
|
|
$ { read: f read
|
|
|
|
|
, write: f <<< write
|
|
|
|
|
, awaitWrite: f awaitWrite
|
|
|
|
|
, awaitRead: f awaitRead
|
|
|
|
|
}
|
|
|
|
|
/\ hoist f m
|
|
|
|
|
|
|
|
|
|
instance Monad m => Functor (AsyncPipe a b m) where
|
|
|
|
|
map f (Pure r) = Pure $ f r
|
|
|
|
|
map f (M m) = M $ map f <$> m
|
|
|
|
|
@@ -310,19 +321,19 @@ sync m =
|
|
|
|
|
-- |
|
|
|
|
|
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
|
|
|
|
|
pipeAsync
|
|
|
|
|
:: forall f m a b
|
|
|
|
|
:: forall e f m a b
|
|
|
|
|
. MonadRec m
|
|
|
|
|
=> MonadAff m
|
|
|
|
|
=> MonadBracket Error f m
|
|
|
|
|
=> MonadBracket e f m
|
|
|
|
|
=> Producer (Maybe a) m Unit
|
|
|
|
|
-> AsyncPipe (Maybe a) (Maybe b) m Unit
|
|
|
|
|
-> Producer (Maybe b) m Unit
|
|
|
|
|
pipeAsync prod m =
|
|
|
|
|
lift (getAsyncIO m)
|
|
|
|
|
>>= case _ of
|
|
|
|
|
Nothing -> throwError $ error "`pipeAsync` invoked on `AsyncPipe` that did not have `AsyncIO`"
|
|
|
|
|
Nothing -> pure unit
|
|
|
|
|
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
|
|
|
|
|
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
|
|
|
|
|
errST :: STRef _ (Maybe e) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
|
|
|
|
|
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
|
|
|
|
|
|
|
|
|
|
let
|
|
|
|
|
|