19 Commits

SHA1 Message Date
7d03850623 chore: prepare v2.1.5 2024-07-08 12:43:58 -05:00
dc7a2d3387 feat: Pipe.Util.buffered 2024-07-08 12:40:25 -05:00
43ff92a4ad chore: prepare v2.1.4 2024-06-25 14:26:40 -05:00
bb9790b2f4 chore: prepare v2.1.34 2024-06-25 14:24:50 -05:00
6b64956034 chore: prepare v2.1.3 2024-06-25 14:24:46 -05:00
271ca13f8b chore: prepare v2.1.2 2024-06-25 14:20:38 -05:00
4a9dbf0a3c fix: relax pipeAsync constraints 2024-06-25 14:20:25 -05:00
c73d934a5c chore: prepare v2.1.1 2024-06-23 20:49:22 -05:00
dc1ba322a9 fix: asyncpipe is mfunctor 2024-06-23 20:49:17 -05:00
cda67508d4 chore: prepare v2.1.0 2024-06-23 18:34:01 -05:00
860ace3990 feat: AsyncPipe really should be a monad 2024-06-23 18:33:45 -05:00
7f11c303fb chore: prepare v2.0.2 2024-06-22 19:33:56 -05:00
2e0be4ac62 fix: loop bug 2024-06-22 19:33:45 -05:00
0ba315ede0 chore: prepare v2.0.1 2024-06-22 19:12:56 -05:00
08bd9a817a fix: asyncpipe is profunctor 2024-06-22 19:12:31 -05:00
970d890a00 chore: prepare v2.0.0 2024-06-22 18:42:49 -05:00
5b3eda707e feat!: add AsyncPipe abstraction, significantly improve throughput of Transform streams 2024-06-22 18:42:22 -05:00
4cd44367a8 chore: prepare v1.6.1 2024-06-21 13:21:22 -05:00
d76f55e267 fix: introduced transform bug 2024-06-21 13:21:19 -05:00
15 changed files with 8850 additions and 119 deletions

bun.lockb (binary file; diff not shown)


@@ -1,12 +1,13 @@
 {
   "name": "purescript-node-stream-pipes",
-  "version": "v1.6.0",
+  "version": "v2.1.5",
   "type": "module",
   "dependencies": {
-    "csv-parse": "^5.5.5",
-    "csv-stringify": "^6.4.6"
+    "csv-parse": "^5.5.6",
+    "csv-stringify": "^6.5.0"
   },
   "devDependencies": {
+    "cbor-x": "^1.5.9",
     "typescript": "^5.4.5"
   }
 }


@@ -5,20 +5,26 @@ workspace:
   dependencies:
     - aff: ">=7.1.0 <8.0.0"
     - arrays: ">=7.3.0 <8.0.0"
+    - console: ">=6.1.0 <7.0.0"
+    - control: ">=6.0.0 <7.0.0"
+    - datetime: ">=6.1.0 <7.0.0"
     - effect: ">=4.0.0 <5.0.0"
     - either: ">=6.1.0 <7.0.0"
     - exceptions: ">=6.0.0 <7.0.0"
     - foldable-traversable: ">=6.0.0 <7.0.0"
     - foreign-object: ">=4.1.0 <5.0.0"
+    - fork: ">=6.0.0 <7.0.0"
     - lists: ">=7.0.0 <8.0.0"
     - maybe: ">=6.0.0 <7.0.0"
     - mmorph: ">=7.0.0 <8.0.0"
+    - newtype: ">=5.0.0 <6.0.0"
     - node-buffer: ">=9.0.0 <10.0.0"
     - node-event-emitter: ">=3.0.0 <4.0.0"
-    - node-fs: ">=9.1.0 <10.0.0"
+    - node-fs: ">=9.1.0 <9.2.0"
     - node-path: ">=5.0.0 <6.0.0"
     - node-streams: ">=9.0.0 <10.0.0"
     - node-zlib: ">=0.4.0 <0.5.0"
+    - now: ">=6.0.0 <7.0.0"
     - ordered-collections: ">=3.2.0 <4.0.0"
     - parallel: ">=6.0.0 <7.0.0"
     - pipes: ">=8.0.0 <9.0.0"
@@ -296,8 +302,8 @@ packages:
       - unfoldable
   exceptions:
     type: registry
-    version: 6.0.0
-    integrity: sha256-y/xTAEIZIARCE+50/u1di0ncebJ+CIwNOLswyOWzMTw=
+    version: 6.1.0
+    integrity: sha256-K0T89IHtF3vBY7eSAO7eDOqSb2J9kZGAcDN5+IKsF8E=
     dependencies:
       - effect
       - either
@@ -861,8 +867,8 @@ packages:
       - refs
   transformers:
     type: registry
-    version: 6.0.0
-    integrity: sha256-Pzw40HjthX77tdPAYzjx43LK3X5Bb7ZspYAp27wksFA=
+    version: 6.1.0
+    integrity: sha256-3Bm+Z6tsC/paG888XkywDngJ2JMos+JfOhRlkVfb7gI=
     dependencies:
       - control
       - distributive
@@ -875,6 +881,7 @@ packages:
       - maybe
       - newtype
       - prelude
+      - st
       - tailrec
      - tuples
      - unfoldable


@@ -1,7 +1,7 @@
 package:
   name: node-stream-pipes
   publish:
-    version: '1.6.0'
+    version: '2.1.5'
     license: 'GPL-3.0-or-later'
     location:
       githubOwner: 'cakekindel'
@@ -12,20 +12,26 @@ package:
   dependencies:
     - aff: ">=7.1.0 <8.0.0"
     - arrays: ">=7.3.0 <8.0.0"
+    - console: ">=6.1.0 <7.0.0"
+    - control: ">=6.0.0 <7.0.0"
+    - datetime: ">=6.1.0 <7.0.0"
     - effect: ">=4.0.0 <5.0.0"
     - either: ">=6.1.0 <7.0.0"
     - exceptions: ">=6.0.0 <7.0.0"
     - foldable-traversable: ">=6.0.0 <7.0.0"
     - foreign-object: ">=4.1.0 <5.0.0"
+    - fork: ">=6.0.0 <7.0.0"
     - lists: ">=7.0.0 <8.0.0"
     - maybe: ">=6.0.0 <7.0.0"
     - mmorph: ">=7.0.0 <8.0.0"
+    - newtype: ">=5.0.0 <6.0.0"
     - node-buffer: ">=9.0.0 <10.0.0"
     - node-event-emitter: ">=3.0.0 <4.0.0"
-    - node-fs: ">=9.1.0 <10.0.0"
+    - node-fs: ">=9.1.0 <9.2.0"
     - node-path: ">=5.0.0 <6.0.0"
     - node-streams: ">=9.0.0 <10.0.0"
     - node-zlib: ">=0.4.0 <0.5.0"
+    - now: ">=6.0.0 <7.0.0"
     - ordered-collections: ">=3.2.0 <4.0.0"
     - parallel: ">=6.0.0 <7.0.0"
     - pipes: ">=8.0.0 <9.0.0"


@@ -172,9 +172,10 @@ unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String
 unsafeFromStringWritable = unsafeCoerce

 awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
-awaitReadableOrClosed s = do
+awaitReadableOrClosed s = Aff.supervise do
   fiber <-
-    Aff.forkAff $ parOneOf
+    Aff.forkAff
+      $ parOneOf
       [ onceAff0 readableH s $> Right unit
       , onceAff0 closeH s $> Right unit
       , Left <$> onceAff1 errorH s
@@ -189,14 +190,20 @@ awaitReadableOrClosed s = do
   Aff.killFiber (error "") fiber

 awaitFinished :: forall s a. Write s a => s -> Aff Unit
-awaitFinished s = do
+awaitFinished s = Aff.supervise do
   fiber <- Aff.forkAff $ onceAff0 finishH s
   finished <- liftEffect $ isWritableFinished s
   if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber

 awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
-awaitWritableOrClosed s = do
-  fiber <- Aff.forkAff $ parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
+awaitWritableOrClosed s = Aff.supervise do
+  fiber <-
+    Aff.forkAff
+      $ parOneOf
+        [ onceAff0 drainH s $> Right unit
+        , onceAff0 closeH s $> Right unit
+        , Left <$> onceAff1 errorH s
+        ]
   closed <- liftEffect $ isClosed s
   writeEnded <- liftEffect $ isWritableEnded s
   writable <- liftEffect $ isWritable s

src/Pipes.Async.purs (new file, 368 lines)

@@ -0,0 +1,368 @@
module Pipes.Async where
import Prelude hiding (join)
import Control.Alternative (class Alternative, empty, guard)
import Control.Monad.Cont (class MonadTrans)
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Morph (class MFunctor, hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel, parOneOf)
import Data.Array as Array
import Data.DateTime.Instant as Instant
import Data.Either (Either(..), either)
import Data.Foldable (class Foldable, fold)
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), fromMaybe, isNothing)
import Data.Newtype (unwrap)
import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds)
import Data.Traversable (class Traversable, traverse, traverse_)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Console (log)
import Effect.Now as Now
import Pipes (await, yield)
import Pipes.Collect as Collect
import Pipes.Core (Pipe, Proxy, Producer)
data WriteSignal
= WriteSignalOk
| WriteSignalEnded
derive instance Generic WriteSignal _
derive instance Eq WriteSignal
derive instance Ord WriteSignal
instance Show WriteSignal where show = genericShow
instance Discard WriteSignal where
discard = bind
data ReadSignal
= ReadSignalOk
| ReadSignalEnded
derive instance Generic ReadSignal _
derive instance Eq ReadSignal
derive instance Ord ReadSignal
instance Show ReadSignal where show = genericShow
instance Discard ReadSignal where
discard = bind
data WriteResult
= WriteAgain
| WriteNeedsDrain
| WriteEnded
derive instance Generic WriteResult _
derive instance Eq WriteResult
derive instance Ord WriteResult
instance Show WriteResult where show = genericShow
data ReadResult a
= ReadOk a
| ReadWouldBlock
derive instance Generic (ReadResult a) _
derive instance Eq a => Eq (ReadResult a)
derive instance Ord a => Ord (ReadResult a)
derive instance Functor ReadResult
derive instance Foldable ReadResult
derive instance Traversable ReadResult
instance Show a => Show (ReadResult a) where show = genericShow
type AsyncIO a b m r =
{ write :: a -> m WriteResult
, read :: m (ReadResult b)
, awaitWrite :: m WriteSignal
, awaitRead :: m ReadSignal
}
/\ AsyncPipe a b m r
-- | An `AsyncPipe` is a `Pipe`-like struct that allows
-- | concurrently reading from a `Producer` and writing to a `Consumer`.
-- |
-- | An implementation of `AsyncPipe` for Node `Transform` streams
-- | is provided in `Pipes.Node.Stream`.
data AsyncPipe a b m r
-- | A pure return value
= Pure r
-- | An `AsyncPipe` behind a computation
| M (m (AsyncPipe a b m r))
-- | Interface to write & read from the backing resource
| AsyncIO (AsyncIO a b m r)
-- | Modify request / response types
mapIO :: forall aa ab ba bb m r. Monad m => (ab -> aa) -> (ba -> bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
mapIO _ _ (Pure a) = Pure a
mapIO a b (M m) = M $ mapIO a b <$> m
mapIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: write <<< a, awaitWrite, read: map b <$> read, awaitRead} /\ mapIO a b m
-- | Modify request / response types
bindIO :: forall aa ab ba bb m r. Monad m => (ab -> m aa) -> (ba -> m bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
bindIO _ _ (Pure a) = Pure a
bindIO a b (M m) = M $ bindIO a b <$> m
bindIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: flip bind write <<< a, awaitWrite, read: traverse b =<< read, awaitRead} /\ bindIO a b m
-- | Remove the `AsyncPipe` wrapper by discarding the IO
stripIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m r
stripIO (Pure r) = pure r
stripIO (M m) = m >>= stripIO
stripIO (AsyncIO (_ /\ m)) = stripIO m
-- | Execute the `AsyncPipe` monad stack until `AsyncIO` is reached (if any)
getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))
getAsyncIO (AsyncIO a) = pure $ Just a
getAsyncIO (M m) = m >>= getAsyncIO
getAsyncIO (Pure _) = pure Nothing
instance MonadTrans (AsyncPipe a b) where
lift = M <<< map Pure
instance MFunctor (AsyncPipe a b) where
hoist _ (Pure a) = Pure a
hoist f (M m) = M $ f $ hoist f <$> m
hoist f (AsyncIO ({read, write, awaitWrite, awaitRead} /\ m)) =
AsyncIO
$ { read: f read
, write: f <<< write
, awaitWrite: f awaitWrite
, awaitRead: f awaitRead
}
/\ hoist f m
instance Monad m => Functor (AsyncPipe a b m) where
map f (Pure r) = Pure $ f r
map f (M m) = M $ map f <$> m
map f (AsyncIO (io /\ m)) = AsyncIO $ io /\ (f <$> m)
instance Monad m => Apply (AsyncPipe a b m) where
apply (Pure f) ma = f <$> ma
apply (M mf) ma = M $ (_ <*> ma) <$> mf
apply (AsyncIO (io /\ mf)) ma = AsyncIO $ io /\ (mf <*> ma)
instance Monad m => Applicative (AsyncPipe a b m) where
pure = Pure
instance Monad m => Bind (AsyncPipe a b m) where
bind (Pure a) f = f a
bind (M ma) f = M $ (_ >>= f) <$> ma
bind (AsyncIO (io /\ m)) f = AsyncIO $ io /\ (m >>= f)
instance Monad m => Monad (AsyncPipe a b m)
instance MonadThrow e m => MonadThrow e (AsyncPipe a b m) where
throwError = lift <<< throwError
instance MonadError e m => MonadError e (AsyncPipe a b m) where
catchError m f = lift $ catchError (stripIO m) (stripIO <<< f)
instance MonadEffect m => MonadEffect (AsyncPipe a b m) where
liftEffect = lift <<< liftEffect
instance MonadAff m => MonadAff (AsyncPipe a b m) where
liftAff = lift <<< liftAff
-- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing.
debug :: forall a b m r. MonadAff m => String -> AsyncPipe (Maybe a) (Maybe b) m r -> AsyncPipe (Maybe a) (Maybe b) m r
debug c m =
let
logL :: forall m'. MonadEffect m' => _ -> m' Unit
logL msg = liftEffect $ log $ "[" <> c <> "] " <> msg
logR :: forall m'. MonadEffect m' => _ -> m' Unit
logR msg = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> msg
time :: forall m' a'. MonadEffect m' => m' a' -> m' (Milliseconds /\ a')
time ma = do
start <- liftEffect Now.now
a <- ma
end <- liftEffect Now.now
pure $ (end `Instant.diff` start) /\ a
in
flip bind (fromMaybe m)
$ runMaybeT do
(io /\ done') <- MaybeT $ lift $ getAsyncIO m
let
write a = do
logL "write >"
elapsed /\ w <- time $ io.write a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
read = do
logR "read >"
elapsed /\ r <- time $ io.read
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite = do
logL "awaitWrite >"
elapsed /\ w <- time $ io.awaitWrite
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
awaitRead = do
logR "awaitRead >"
elapsed /\ r <- time $ io.awaitRead
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
done = do
logL "done >"
elapsed /\ r <- time done'
logL $ "< done (" <> show (unwrap elapsed) <> "ms)"
pure r
pure $ AsyncIO $ {write, read, awaitWrite, awaitRead} /\ done
-- | Convert an `AsyncPipe` to a regular `Pipe`.
-- |
-- | Rather than two concurrently-running halves (producer & consumer),
-- | this requires the `AsyncPipe` to occasionally stop `await`ing data
-- | written by the upstream `Producer` so that it can `yield` to the downstream `Consumer`.
-- |
-- | This implementation chooses to prioritize `yield`ing data to the `Consumer` over
-- | `await`ing written chunks.
-- |
-- | Note that using this limits the potential parallelism of the entire pipeline, ex:
-- |
-- | ```purs
-- | Pipe.FS.read "foo.csv" -- read
-- | >-> sync Pipe.CSV.parse -- parse
-- | >-> sync Pipe.CBOR.encode -- encode
-- | >-> Pipe.FS.write "foo.bin" -- write
-- | ```
-- |
-- | In the above example, this is what happens when the pipeline
-- | is executed:
-- | 1. `write` asks `encode` "do you have any data yet?" (fast)
-- | 1. `encode` asks `parse` "do you have any data yet?" (fast)
-- | 1. `parse` asks `read` "do you have any data yet?" (fast)
-- | 1. `read` passes 1 chunk to `parse` (fast)
-- | 1. `parse` blocks until the chunk is parsed (slow)
-- | 1. `parse` passes 1 chunk to `encode` (fast)
-- | 1. `encode` blocks until the chunk is encoded (slow)
-- | 1. `write` writes the block (fast)
-- |
-- | For larger workloads, changing this to use `asyncPipe` would be preferable, ex:
-- | ```purs
-- | Pipe.FS.read "foo.csv" -- read
-- | >-/-> Pipe.CSV.parse -- parse
-- | >-/-> Pipe.CBOR.encode -- encode
-- | >-> Pipe.FS.write "foo.bin" -- write
-- | ```
-- |
-- | With this change:
-- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r
sync m =
let
liftPipe :: forall r'. (Proxy _ _ _ _ m) r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftPipe = lift
liftM :: forall r'. m r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftM = liftPipe <<< lift
in
lift (getAsyncIO m) >>=
case _ of
Nothing -> lift $ stripIO m
Just ({write, awaitWrite, read, awaitRead} /\ done) ->
let
awaitRW onR onW =
liftM (parOneOf [Right <$> awaitWrite, Left <$> awaitRead])
>>= either (const onR) onW
wSignal WriteSignalOk = WriteAgain
wSignal WriteSignalEnded = WriteEnded
tailRecEarly f a = tailRecM (map (either identity identity) <<< runExceptT <<< f) a
continue a = throwError (Loop a)
break = (Done <$> liftM (stripIO done)) >>= throwError
in do
flip tailRecEarly WriteAgain \writable -> do
rb <- liftM read
case rb of
ReadWouldBlock
| writable == WriteEnded -> liftM awaitRead *> continue writable
| writable == WriteNeedsDrain -> awaitRW (continue writable) (continue <<< wSignal)
| otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue writable
ReadOk Nothing -> liftPipe (yield Nothing) *> break
when (writable /= WriteAgain) $ continue writable
a <- liftPipe await
writable' <- liftM $ write a
when (isNothing a) $ continue WriteEnded
pure $ Loop writable'
-- | Implementation of `(>-/->)`
-- |
-- | In the current `MonadFork` "thread", read data from the `AsyncPipe` as it
-- | is yielded and `yield` to the downstream `Consumer`.
-- |
-- | Concurrently, in a separate thread, read data from the upstream `Producer`
-- | and write to the `AsyncPipe` at max throughput.
-- |
-- | If the producing half fails, the error is caught and rethrown.
-- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync
:: forall e f m a b
. MonadRec m
=> MonadAff m
=> MonadBracket e f m
=> Producer (Maybe a) m Unit
-> AsyncPipe (Maybe a) (Maybe b) m Unit
-> Producer (Maybe b) m Unit
pipeAsync prod m =
lift (getAsyncIO m)
>>= case _ of
Nothing -> pure unit
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
errST :: STRef _ (Maybe e) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
let
killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST
threadKilled = liftEffect $ liftST $ ST.Ref.read killST
putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just
getThreadError = liftEffect $ liftST $ ST.Ref.read errST
rx a = do
killed <- threadKilled
guard $ not killed
w <- lift $ write a
case w of
WriteNeedsDrain -> lift $ void awaitWrite
WriteEnded -> empty
WriteAgain -> pure unit
spawn = lift <<< fork <<< flip catchError putThreadError
_thread <- spawn $ void $ runMaybeT $ Collect.foreach rx (hoist lift prod)
flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError
rb <- lift read
case rb of
ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void (lift awaitRead) $> Loop unit
lift $ stripIO done
infixl 7 pipeAsync as >-/->
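
For orientation, here is a minimal sketch of how the new `>-/->` operator is meant to compose with ordinary `Pipe`s, following the module's own doc-comment example. `Pipe.FS.read`/`Pipe.FS.write` refer to `Pipes.Node.FS` helpers assumed to exist as in that example, and the file names are placeholders:

```purs
module Example.AsyncGzip where

import Prelude

import Effect (Effect)
import Effect.Aff (launchAff_)
import Pipes ((>->))
import Pipes.Async ((>-/->))
import Pipes.Core (runEffect)
import Pipes.Node.FS as Pipe.FS
import Pipes.Node.Zlib as Pipe.Zlib

-- Gzip a file: the read, the Transform-backed gzip AsyncPipe, and the write
-- can all make progress concurrently, because `>-/->` forks the writing half.
main :: Effect Unit
main = launchAff_ $ runEffect
  $ Pipe.FS.read "foo.json"          -- Producer (Maybe Buffer)
  >-/-> Pipe.Zlib.gzip               -- AsyncPipe (Maybe Buffer) (Maybe Buffer)
  >-> Pipe.FS.write "foo.json.gz"    -- Consumer (Maybe Buffer)
```

Both `>-/->` and `>->` are `infixl 7`, so the pipeline associates left to right as written.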


@@ -31,12 +31,8 @@ traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Uni
 traverse f b0 p0 =
   flip tailRecM (p0 /\ b0) \(p /\ b) ->
     case p of
-      Respond a m -> do
-        b' <- f b a
-        pure $ Loop $ m unit /\ b'
-      M m -> do
-        n <- m
-        pure $ Loop $ (n /\ b)
+      Respond a m -> Loop <$> (m unit /\ _) <$> f b a
+      M m -> Loop <$> (_ /\ b) <$> m
       Request _ _ -> pure $ Done b
       Pure _ -> pure $ Done b


@@ -3,19 +3,24 @@ module Pipes.Node.Stream where
 import Prelude hiding (join)

 import Control.Monad.Error.Class (class MonadThrow, throwError)
-import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust)
+import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
 import Control.Monad.ST.Class (liftST)
+import Control.Monad.ST.Ref as ST.Ref
 import Control.Monad.ST.Ref as STRef
 import Control.Monad.Trans.Class (lift)
 import Data.Maybe (Maybe(..), maybe)
-import Data.Traversable (for_, traverse, traverse_)
+import Data.Traversable (for_, traverse_)
 import Data.Tuple.Nested ((/\))
+import Effect (Effect)
 import Effect.Aff.Class (class MonadAff, liftAff)
 import Effect.Class (liftEffect)
 import Effect.Exception (Error)
+import Node.Stream.Object (ReadResult(..), WriteResult(..))
 import Node.Stream.Object as O
 import Pipes (await, yield)
 import Pipes (for) as P
+import Pipes.Async (AsyncPipe(..))
+import Pipes.Async as AsyncPipe
 import Pipes.Core (Consumer, Pipe, Producer, Producer_)
 import Pipes.Util (InvokeResult(..), invoke)
@@ -83,49 +88,60 @@ fromWritable w = do
   tailRecM go unit

--- | Convert a `Transform` stream to a `Pipe`.
--- |
--- | When `Nothing` is piped to this, the `Transform` stream will
--- | be `end`ed, and the pipe will noop if invoked again.
-fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit
-fromTransform t = do
-  { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST t
-  let
-    maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST)
-    cleanup = do
-      flip tailRecM unit $ const do
-        liftAff $ O.awaitReadableOrClosed t
-        readEnded <- liftEffect $ O.isReadableEnded t
-        yieldWhileReadable
-        pure $ (if readEnded then Done else Loop) unit
-
-      liftAff $ O.awaitFinished t
-      maybeThrow
-      liftEffect $ removeErrorListener
-      yield Nothing
-
-    yieldWhileReadable = void $ whileJust $ maybeYield1
-    maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t)
-
-    onEOS = liftEffect (O.end t) *> cleanup $> Done unit
-    onChunk a = liftEffect (O.write t a) $> Loop unit
-
-    go _ = do
-      maybeThrow
-      needsDrain <- liftEffect $ O.needsDrain t
-      ended <- liftEffect $ O.isWritableEnded t
-      if needsDrain then do
-        yieldWhileReadable
-        liftAff $ O.awaitWritableOrClosed t
-        pure $ Loop unit
-      else if ended then
-        cleanup $> Done unit
-      else
-        await >>= maybe onEOS onChunk
-
-  tailRecM go unit
+fromTransformEffect
+  :: forall a b m
+   . MonadThrow Error m
+  => MonadAff m
+  => Effect (O.Transform a b)
+  -> AsyncPipe (Maybe a) (Maybe b) m Unit
+fromTransformEffect = fromTransform <=< liftEffect
+
+-- | Convert a `Transform` stream to an `AsyncPipe`.
+fromTransform
+  :: forall a b m
+   . MonadThrow Error m
+  => MonadAff m
+  => O.Transform a b
+  -> AsyncPipe (Maybe a) (Maybe b) m Unit
+fromTransform stream = do
+  { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream
+  let
+    rethrow = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
+    cleanup = liftEffect removeErrorListener
+    writeSignal =
+      liftEffect (O.isWritableEnded stream)
+        <#> if _ then AsyncPipe.WriteSignalEnded else AsyncPipe.WriteSignalOk
+    readSignal =
+      liftEffect (O.isReadableEnded stream)
+        <#> if _ then AsyncPipe.ReadSignalEnded else AsyncPipe.ReadSignalOk
+    writeResult WriteOk = AsyncPipe.WriteAgain
+    writeResult WriteWouldBlock = AsyncPipe.WriteNeedsDrain
+    readResult (ReadJust a) = AsyncPipe.ReadOk (Just a)
+    readResult ReadWouldBlock = AsyncPipe.ReadWouldBlock
+    awaitWritable = liftAff $ O.awaitWritableOrClosed stream
+    awaitReadable = liftAff $ O.awaitReadableOrClosed stream
+    awaitWrite = rethrow *> awaitWritable *> writeSignal
+    awaitRead = rethrow *> awaitReadable *> readSignal
+    whenReadNotEnded m =
+      liftEffect (O.isReadableEnded stream)
+        >>= if _ then pure $ AsyncPipe.ReadOk Nothing else m
+    readNow = readResult <$> liftEffect (O.read stream)
+    writeNow a = writeResult <$> liftEffect (O.write stream a)
+    read = rethrow *> whenReadNotEnded readNow
+    write Nothing = liftEffect (O.end stream) $> AsyncPipe.WriteEnded
+    write (Just a) = rethrow *> writeNow a
+  AsyncIO ({write, awaitWrite, read, awaitRead} /\ cleanup)

 -- | Given a `Producer` of values, wrap them in `Just`.
 -- |


@@ -12,31 +12,32 @@ import Node.Buffer (Buffer)
 import Node.Stream.Object as O
 import Node.Zlib as Zlib
 import Node.Zlib.Types (ZlibStream)
-import Pipes.Core (Pipe)
+import Pipes.Async (AsyncPipe)
 import Pipes.Node.Stream (fromTransform)

-fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
-fromZlib z = do
-  raw <- liftEffect $ Zlib.toDuplex <$> z
-  fromTransform $ O.unsafeCoerceTransform raw
+fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
+fromZlib z =
+  do
+    raw <- liftEffect $ Zlib.toDuplex <$> z
+    fromTransform $ O.unsafeCoerceTransform raw

-gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
+gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
 gzip = fromZlib Zlib.createGzip

-gunzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
+gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
 gunzip = fromZlib Zlib.createGunzip

-unzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
+unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
 unzip = fromZlib Zlib.createUnzip

-inflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
+inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
 inflate = fromZlib Zlib.createInflate

-deflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
+deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
 deflate = fromZlib Zlib.createDeflate

-brotliCompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
+brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
 brotliCompress = fromZlib Zlib.createBrotliCompress

-brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
+brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
 brotliDecompress = fromZlib Zlib.createBrotliDecompress
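
Since these combinators now return `AsyncPipe` rather than `Pipe`, call sites that still need a plain `Pipe` can wrap them with `sync` from `Pipes.Async`, as the updated tests below do. A small sketch, specialized to `Aff` (which should satisfy `sync`'s concurrency constraints):

```purs
module Example.SyncGzip where

import Prelude

import Data.Maybe (Maybe)
import Effect.Aff (Aff)
import Node.Buffer (Buffer)
import Pipes.Async (sync)
import Pipes.Core (Pipe)
import Pipes.Node.Zlib as Pipe.Zlib

-- `sync` drains the AsyncPipe from inside a single Pipe, trading the added
-- concurrency for drop-in compatibility with v1.x-style pipelines.
gzipPipe :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gzipPipe = sync Pipe.Zlib.gzip
```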


@@ -15,9 +15,12 @@ import Data.Either (hush)
 import Data.HashSet as HashSet
 import Data.Hashable (class Hashable, hash)
 import Data.List.NonEmpty (NonEmptyList)
-import Data.Maybe (Maybe(..), fromMaybe)
+import Data.Maybe (Maybe(..), fromMaybe, maybe)
+import Data.Traversable (traverse_)
 import Data.Tuple.Nested (type (/\), (/\))
 import Effect.Class (class MonadEffect, liftEffect)
+import Node.Buffer (Buffer)
+import Node.Buffer as Buffer
 import Pipes (await, yield)
 import Pipes as Pipes
 import Pipes.Core (Pipe, Producer)
@@ -96,6 +99,30 @@ chunked size = do
   yield Nothing

+-- | Buffers input to the given size before passing to subsequent pipes
+buffered :: forall m. MonadEffect m => Int -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
+buffered size = do
+  chunkST :: STRef _ (Maybe Buffer) <- liftEffect $ liftST $ STRef.new Nothing
+  let
+    chunkClear = liftEffect $ liftST $ STRef.write Nothing chunkST
+    chunkPeek = liftEffect $ liftST $ STRef.read chunkST
+    chunkLen = maybe (pure 0) (liftEffect <<< Buffer.size) =<< chunkPeek
+    chunkPut b = liftEffect do
+      new <- liftST (STRef.read chunkST) >>= maybe (pure b) (\a -> Buffer.concat [a, b])
+      void $ liftST $ STRef.write (Just new) chunkST
+      pure new
+
+  Rec.whileJust $ runMaybeT do
+    a <- MaybeT await
+    buf <- chunkPut a
+    len <- lift chunkLen
+    when (len > size) $ chunkClear *> lift (yield $ Just buf)
+
+  len <- chunkLen
+  chunkPeek >>= traverse_ (when (len > 0) <<< yield <<< Just)
+  yield Nothing
+
 -- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.
 -- |
 -- | Uses a `HashSet` of hashes of `a`; for `n` elements `awaited`, this pipe
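
A brief sketch of how the new `buffered` combinator might be used to coalesce small chunks before a downstream stage; `Pipe.FS.read` comes from `Pipes.Node.FS` as in the tests, and the 64 kB threshold is purely illustrative:

```purs
module Example.Buffered where

import Prelude

import Data.Maybe (Maybe)
import Effect.Aff (Aff)
import Node.Buffer (Buffer)
import Pipes ((>->))
import Pipes.Core (Producer)
import Pipes.Node.FS as Pipe.FS
import Pipes.Util as Pipe.Util

-- Accumulate incoming chunks until they exceed ~64 kB, then yield the
-- concatenated Buffer; any trailing undersized chunk is flushed at end-of-stream.
bigChunks :: String -> Producer (Maybe Buffer) Aff Unit
bigChunks path = Pipe.FS.read path >-> Pipe.Util.buffered 64000
```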


@@ -3,18 +3,19 @@ module Test.Main where
 import Prelude

 import Data.Maybe (Maybe(..))
+import Data.Time.Duration (Milliseconds(..))
 import Effect (Effect)
 import Effect.Aff (launchAff_)
-import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
-import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
-import Test.Pipes.Node.FS as Test.Pipes.Node.FS
 import Test.Pipes.Collect as Test.Pipes.Collect
 import Test.Pipes.Construct as Test.Pipes.Construct
+import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
+import Test.Pipes.Node.FS as Test.Pipes.Node.FS
+import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
 import Test.Spec.Reporter (specReporter)
 import Test.Spec.Runner (defaultConfig, runSpec')

 main :: Effect Unit
-main = launchAff_ $ runSpec' (defaultConfig { exit = false, timeout = Nothing }) [ specReporter ] do
+main = launchAff_ $ runSpec' (defaultConfig { slow = Milliseconds 0.0, failFast = true, exit = false, timeout = Nothing }) [ specReporter ] do
   Test.Pipes.Node.Stream.spec
   Test.Pipes.Node.Buffer.spec
   Test.Pipes.Node.FS.spec


@@ -1,4 +1,16 @@
 import Stream from 'stream'
+import * as CBOR from "cbor-x";
+import * as CSVDecode from "csv-parse";
+import * as CSVEncode from "csv-stringify";
+
+export const cborDecode = () => new CBOR.DecoderStream({useRecords: false, allowHalfOpen: true});
+export const cborEncode = () => new CBOR.EncoderStream({useRecords: false, allowHalfOpen: true});
+export const cborDecodeSync = a => () => CBOR.decodeMultiple(a);
+export const cborEncodeSync = a => () => CBOR.encode(a, {useRecords: false});
+
+export const csvDecode = () => CSVDecode.parse({columns: true, allowHalfOpen: true})
+export const csvEncode = () => CSVEncode.stringify({header: true, allowHalfOpen: true})

 export const discardTransform = () => new Stream.Transform({
   transform: function(_ck, _enc, cb) {
@@ -7,6 +19,16 @@ export const discardTransform = () => new Stream.Transform({
   objectMode: true
 })

+export const slowTransform = () => {
+  return new Stream.Transform({
+    transform: function(ck, _enc, cb) {
+      this.push(ck)
+      setTimeout(() => cb(), 4)
+    },
+    objectMode: true
+  })
+}
+
 export const charsTransform = () => new Stream.Transform({
   transform: function(ck, _enc, cb) {
     ck.split('').filter(s => !!s).forEach(s => this.push(s))


@@ -2,49 +2,72 @@ module Test.Pipes.Node.Stream where
 import Prelude

-import Control.Monad.Trans.Class (lift)
+import Control.Monad.Cont (lift)
+import Control.Monad.Error.Class (liftEither)
+import Control.Monad.Except (runExcept)
 import Data.Array as Array
-import Data.Foldable (fold)
+import Data.Bifunctor (lmap)
+import Data.Either (Either(..))
+import Data.Foldable (fold, intercalate)
+import Data.FoldableWithIndex (forWithIndex_)
+import Data.FunctorWithIndex (mapWithIndex)
+import Data.Int as Int
 import Data.List ((:))
 import Data.List as List
 import Data.Maybe (Maybe)
 import Data.Newtype (wrap)
+import Data.Profunctor.Strong (first)
+import Data.String as String
 import Data.String.Gen (genAlphaString)
+import Data.Traversable (for_, traverse)
 import Data.Tuple.Nested (type (/\), (/\))
 import Effect (Effect)
 import Effect.Aff (Aff, delay)
 import Effect.Class (class MonadEffect, liftEffect)
+import Effect.Exception (error)
+import Effect.Unsafe (unsafePerformEffect)
+import Foreign (Foreign)
 import Node.Buffer (Buffer)
 import Node.Buffer as Buffer
 import Node.Encoding (Encoding(..))
 import Node.FS.Stream as FS.Stream
 import Node.FS.Sync as FS
 import Node.Stream.Object as O
-import Node.Zlib as Zlib
-import Pipes (each) as Pipes
+import Pipes (each) as Pipe
 import Pipes (yield, (>->))
+import Pipes.Async (sync, (>-/->))
+import Pipes.Collect as Pipe.Collect
 import Pipes.Core (Consumer, Producer, runEffect)
-import Pipes.Node.Buffer as Pipes.Buffer
-import Pipes.Node.Stream as S
-import Pipes.Prelude (mapFoldable, toListM) as Pipes
-import Simple.JSON (writeJSON)
-import Test.Common (jsonParse, jsonStringify, tmpFile, tmpFiles)
+import Pipes.Node.Buffer as Pipe.Buffer
+import Pipes.Node.FS as Pipe.FS
+import Pipes.Node.Stream as Pipe.Node
+import Pipes.Node.Zlib as Pipe.Zlib
+import Pipes.Prelude (toListM) as Pipe
+import Simple.JSON (readImpl, readJSON, writeJSON)
+import Test.Common (jsonStringify, tmpFile, tmpFiles)
 import Test.QuickCheck.Arbitrary (arbitrary)
 import Test.QuickCheck.Gen (randomSample')
 import Test.Spec (Spec, around, describe, it)
-import Test.Spec.Assertions (shouldEqual)
+import Test.Spec.Assertions (fail, shouldEqual)

 foreign import readableFromArray :: forall @a. Array a -> O.Readable a
 foreign import discardTransform :: forall a b. Effect (O.Transform a b)
+foreign import slowTransform :: forall a b. Effect (O.Transform a b)
 foreign import charsTransform :: Effect (O.Transform String String)
+foreign import cborEncodeSync :: forall a. a -> Effect Buffer
+foreign import cborDecodeSync :: forall a. Buffer -> Effect a
+foreign import cborEncode :: forall a. Effect (O.Transform a Buffer)
+foreign import cborDecode :: forall a. Effect (O.Transform Buffer a)
+foreign import csvEncode :: forall a. Effect (O.Transform a String)
+foreign import csvDecode :: forall a. Effect (O.Transform String a)

 writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
 writer a = do
   stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a
-  pure $ stream /\ S.fromWritable stream
+  pure $ stream /\ Pipe.Node.fromWritable stream

 reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
-reader a = liftEffect $ S.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a
+reader a = liftEffect $ Pipe.Node.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a

 spec :: Spec Unit
 spec =
@@ -52,30 +75,30 @@ spec =
   describe "Readable" do
     describe "Readable.from(<Iterable>)" do
       it "empty" do
-        vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } []) >-> S.unEOS
+        vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } []) >-> Pipe.Node.unEOS
         vals `shouldEqual` List.Nil
       it "singleton" do
-        vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> S.unEOS
+        vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> Pipe.Node.unEOS
         vals `shouldEqual` ({ foo: "1" } : List.Nil)
       it "many elements" do
         let exp = (\n -> { foo: show n }) <$> Array.range 0 100
-        vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray exp) >-> S.unEOS
+        vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray exp) >-> Pipe.Node.unEOS
         vals `shouldEqual` (List.fromFoldable exp)
   describe "Writable" $ around tmpFile do
     describe "fs.WriteStream" do
       it "pipe to file" \p -> do
         stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
         let
-          w = S.fromWritable stream
+          w = Pipe.Node.fromWritable stream
           source = do
             buf <- liftEffect $ Buffer.fromString "hello" UTF8
            yield buf
-        runEffect $ S.withEOS source >-> w
+        runEffect $ Pipe.Node.withEOS source >-> w
         contents <- liftEffect $ FS.readTextFile UTF8 p
         contents `shouldEqual` "hello"
         shouldEqual true =<< liftEffect (O.isWritableEnded stream)
       it "async pipe to file" \p -> do
-        w <- S.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
+        w <- Pipe.Node.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
         let
           source = do
             yield "hello, "
@@ -87,7 +110,7 @@ spec =
             yield "this is a "
             lift $ delay $ wrap 5.0
            yield "test."
-        runEffect $ S.withEOS (source >-> Pipes.Buffer.fromString UTF8) >-> w
+        runEffect $ Pipe.Node.withEOS (source >-> Pipe.Buffer.fromString UTF8) >-> w
         contents <- liftEffect $ FS.readTextFile UTF8 p
         contents `shouldEqual` "hello, world! this is a test."
       it "chained pipes" \p -> do
@@ -101,40 +124,105 @@ spec =
         let
           exp = fold (writeJSON <$> objs)
         stream /\ w <- liftEffect $ writer p
-        runEffect $ S.withEOS (Pipes.each objs >-> jsonStringify >-> Pipes.Buffer.fromString UTF8) >-> w
+        runEffect $ Pipe.Node.withEOS (Pipe.each objs >-> jsonStringify >-> Pipe.Buffer.fromString UTF8) >-> w
         contents <- liftEffect $ FS.readTextFile UTF8 p
         contents `shouldEqual` exp
         shouldEqual true =<< liftEffect (O.isWritableEnded stream)
   describe "Transform" do
-    it "gzip" do
-      let
-        json = yield $ writeJSON { foo: "bar" }
-        exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000"
-      gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
-      outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex)
-      fold outs `shouldEqual` exp
-    around tmpFiles
-      $ it "file >-> gzip >-> file >-> gunzip" \(a /\ b) -> do
-          liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ]
-          areader <- liftEffect $ reader a
-          bwritestream /\ bwriter <- liftEffect $ writer b
-          gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
-          runEffect $ areader >-> gzip >-> bwriter
-          shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream)
-          gunzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip)
-          breader <- liftEffect $ reader b
-          nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity)
-          Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ]
+    let
+      bignums = Array.range 1 1000
+      firstNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/first_names.txt")
+      lastNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/last_names.txt")
+      names n = do
+        first <- firstNames
+        last <- Array.take (Int.round $ Int.toNumber n / Int.toNumber (Array.length firstNames)) lastNames
+        pure $ first <> " " <> last
+      people n = mapWithIndex (\ix name -> {id: show $ ix + 1, name}) (names n)
+      peopleCSV n = "id,name\n" <> intercalate "\n" ((\{id, name} -> id <> "," <> name) <$> people n)
+
+    for_ [4000, 8000, 32000, 64000, 200000] \n -> do
+      let
+        csv = peopleCSV n
+        people' = people n
+      around tmpFiles
+        $ it (show n <> " row csv >-/-> csv-parse >-/-> cborEncode") \(a /\ _) -> do
+            liftEffect $ FS.writeTextFile UTF8 a csv
+            cbor :: Buffer <- Pipe.Collect.toBuffer
+              $ Pipe.FS.read a
+              >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
+              >-/-> Pipe.Node.fromTransformEffect csvDecode
+              >-/-> Pipe.Node.fromTransformEffect cborEncode
+              >-> Pipe.Node.unEOS
+            f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
+            ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
+            ppl `shouldEqual` people'
+      around tmpFiles
+        $ it (show n <> " row csv >-> sync csv-parse >-> sync cborEncode") \(a /\ _) -> do
+            liftEffect $ FS.writeTextFile UTF8 a csv
+            cbor :: Buffer <- Pipe.Collect.toBuffer
+              $ Pipe.FS.read a
+              >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
+              >-> sync (Pipe.Node.fromTransformEffect csvDecode)
+              >-> sync (Pipe.Node.fromTransformEffect cborEncode)
+              >-> Pipe.Node.unEOS
+            f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
+            ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
+            ppl `shouldEqual` people'
+    around tmpFiles
+      $ it "file >-> sync gzip >-> sync gunzip" \(a /\ _) -> do
+          liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
+          json <- Pipe.Collect.toMonoid
+            $ Pipe.FS.read a
+            >-> sync Pipe.Zlib.gzip
+            >-> sync Pipe.Zlib.gunzip
+            >-> Pipe.Node.unEOS
+            >-> Pipe.Buffer.toString UTF8
+          readJSON json `shouldEqual` (Right bignums)
+    around tmpFiles
+      $ it "file >-/-> gzip >-/-> slow >-/-> gunzip" \(a /\ _) -> do
+          liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
+          json <-
+            Pipe.Collect.toMonoid
+              $ Pipe.FS.read a
+              >-/-> Pipe.Zlib.gzip
+              >-/-> Pipe.Node.fromTransformEffect slowTransform
+              >-/-> Pipe.Zlib.gunzip
+              >-> Pipe.Node.unEOS
+              >-> Pipe.Buffer.toString UTF8
+          readJSON json `shouldEqual` (Right bignums)
+    around tmpFiles
+      $ it "file >-> sync gzip >-> sync slow >-> sync gunzip" \(a /\ _) -> do
+          liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
+          json <-
+            Pipe.Collect.toMonoid
+              $ Pipe.FS.read a
+              >-> sync Pipe.Zlib.gzip
+              >-> sync (Pipe.Node.fromTransformEffect slowTransform)
+              >-> sync Pipe.Zlib.gunzip
+              >-> Pipe.Node.unEOS
+              >-> Pipe.Buffer.toString UTF8
+          readJSON json `shouldEqual` (Right bignums)
     around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
       liftEffect $ FS.writeTextFile UTF8 p "foo"
       r <- reader p
-      discard' <- liftEffect discardTransform
-      out :: List.List Int <- Pipes.toListM $ r >-> S.fromTransform discard' >-> S.unEOS
+      out :: List.List Int <-
+        Pipe.toListM
+          $ r
+          >-/-> Pipe.Node.fromTransformEffect discardTransform
+          >-> Pipe.Node.unEOS
       out `shouldEqual` List.Nil
     around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
       liftEffect $ FS.writeTextFile UTF8 p "foo bar"
       r <- reader p
-      chars' <- liftEffect charsTransform
-      out :: List.List String <- Pipes.toListM $ r >-> S.inEOS (Pipes.Buffer.toString UTF8) >-> S.fromTransform chars' >-> S.unEOS
+      out :: List.List String <-
+        Pipe.toListM $
+          r
+          >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
+          >-/-> Pipe.Node.fromTransformEffect charsTransform
+          >-> Pipe.Node.unEOS
       out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]

test/Test/first_names.txt (new file, 4095 lines; diff suppressed because it is too large)

test/Test/last_names.txt (new file, 4096 lines; diff suppressed because it is too large)