4 Commits

Author SHA1 Message Date
271ca13f8b chore: prepare v2.1.2 2024-06-25 14:20:38 -05:00
4a9dbf0a3c fix: relax pipeAsync constraints 2024-06-25 14:20:25 -05:00
c73d934a5c chore: prepare v2.1.1 2024-06-23 20:49:22 -05:00
dc1ba322a9 fix: asyncpipe is mfunctor 2024-06-23 20:49:17 -05:00
3 changed files with 19 additions and 8 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v2.1.0",
"version": "v2.1.2",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.6",

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '2.1.0'
version: '2.1.2'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'

View File

@@ -8,7 +8,7 @@ import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError
import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Morph (hoist)
import Control.Monad.Morph (class MFunctor, hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
@@ -29,7 +29,6 @@ import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Console (log)
import Effect.Exception (Error, error)
import Effect.Now as Now
import Pipes (await, yield)
import Pipes.Collect as Collect
@@ -131,6 +130,18 @@ getAsyncIO (Pure _) = pure Nothing
instance MonadTrans (AsyncPipe a b) where
lift = M <<< map Pure
instance MFunctor (AsyncPipe a b) where
hoist _ (Pure a) = Pure a
hoist f (M m) = M $ f $ hoist f <$> m
hoist f (AsyncIO ({read, write, awaitWrite, awaitRead} /\ m)) =
AsyncIO
$ { read: f read
, write: f <<< write
, awaitWrite: f awaitWrite
, awaitRead: f awaitRead
}
/\ hoist f m
instance Monad m => Functor (AsyncPipe a b m) where
map f (Pure r) = Pure $ f r
map f (M m) = M $ map f <$> m
@@ -310,19 +321,19 @@ sync m =
-- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync
:: forall f m a b
:: forall e f m a b
. MonadRec m
=> MonadAff m
=> MonadBracket Error f m
=> MonadBracket e f m
=> Producer (Maybe a) m Unit
-> AsyncPipe (Maybe a) (Maybe b) m Unit
-> Producer (Maybe b) m Unit
pipeAsync prod m =
lift (getAsyncIO m)
>>= case _ of
Nothing -> throwError $ error "`pipeAsync` invoked on `AsyncPipe` that did not have `AsyncIO`"
Nothing -> pure unit
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
errST :: STRef _ (Maybe e) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
let