7 Commits

Author SHA1 Message Date
43ff92a4ad chore: prepare v2.1.4 2024-06-25 14:26:40 -05:00
bb9790b2f4 chore: prepare v2.1.34 2024-06-25 14:24:50 -05:00
6b64956034 chore: prepare v2.1.3 2024-06-25 14:24:46 -05:00
271ca13f8b chore: prepare v2.1.2 2024-06-25 14:20:38 -05:00
4a9dbf0a3c fix: relax pipeAsync constraints 2024-06-25 14:20:25 -05:00
c73d934a5c chore: prepare v2.1.1 2024-06-23 20:49:22 -05:00
dc1ba322a9 fix: asyncpipe is mfunctor 2024-06-23 20:49:17 -05:00
4 changed files with 21 additions and 10 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v2.1.0",
"version": "v2.1.4",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.6",

View File

@@ -20,7 +20,7 @@ workspace:
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-fs: ">=9.1.0 <9.2.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '2.1.0'
version: '2.1.4'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -27,7 +27,7 @@ package:
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-fs: ">=9.1.0 <9.2.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"

View File

@@ -8,7 +8,7 @@ import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError
import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Morph (hoist)
import Control.Monad.Morph (class MFunctor, hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
@@ -29,7 +29,6 @@ import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Console (log)
import Effect.Exception (Error, error)
import Effect.Now as Now
import Pipes (await, yield)
import Pipes.Collect as Collect
@@ -131,6 +130,18 @@ getAsyncIO (Pure _) = pure Nothing
instance MonadTrans (AsyncPipe a b) where
lift = M <<< map Pure
instance MFunctor (AsyncPipe a b) where
hoist _ (Pure a) = Pure a
hoist f (M m) = M $ f $ hoist f <$> m
hoist f (AsyncIO ({read, write, awaitWrite, awaitRead} /\ m)) =
AsyncIO
$ { read: f read
, write: f <<< write
, awaitWrite: f awaitWrite
, awaitRead: f awaitRead
}
/\ hoist f m
instance Monad m => Functor (AsyncPipe a b m) where
map f (Pure r) = Pure $ f r
map f (M m) = M $ map f <$> m
@@ -310,19 +321,19 @@ sync m =
-- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync
:: forall f m a b
:: forall e f m a b
. MonadRec m
=> MonadAff m
=> MonadBracket Error f m
=> MonadBracket e f m
=> Producer (Maybe a) m Unit
-> AsyncPipe (Maybe a) (Maybe b) m Unit
-> Producer (Maybe b) m Unit
pipeAsync prod m =
lift (getAsyncIO m)
>>= case _ of
Nothing -> throwError $ error "`pipeAsync` invoked on `AsyncPipe` that did not have `AsyncIO`"
Nothing -> pure unit
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
errST :: STRef _ (Maybe e) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
let