21 Commits

Author SHA1 Message Date
7d03850623 chore: prepare v2.1.5 2024-07-08 12:43:58 -05:00
dc7a2d3387 feat: Pipe.Util.buffered 2024-07-08 12:40:25 -05:00
43ff92a4ad chore: prepare v2.1.4 2024-06-25 14:26:40 -05:00
bb9790b2f4 chore: prepare v2.1.34 2024-06-25 14:24:50 -05:00
6b64956034 chore: prepare v2.1.3 2024-06-25 14:24:46 -05:00
271ca13f8b chore: prepare v2.1.2 2024-06-25 14:20:38 -05:00
4a9dbf0a3c fix: relax pipeAsync constraints 2024-06-25 14:20:25 -05:00
c73d934a5c chore: prepare v2.1.1 2024-06-23 20:49:22 -05:00
dc1ba322a9 fix: asyncpipe is mfunctor 2024-06-23 20:49:17 -05:00
cda67508d4 chore: prepare v2.1.0 2024-06-23 18:34:01 -05:00
860ace3990 feat: AsyncPipe really should be a monad 2024-06-23 18:33:45 -05:00
7f11c303fb chore: prepare v2.0.2 2024-06-22 19:33:56 -05:00
2e0be4ac62 fix: loop bug 2024-06-22 19:33:45 -05:00
0ba315ede0 chore: prepare v2.0.1 2024-06-22 19:12:56 -05:00
08bd9a817a fix: asyncpipe is profunctor 2024-06-22 19:12:31 -05:00
970d890a00 chore: prepare v2.0.0 2024-06-22 18:42:49 -05:00
5b3eda707e feat!: add AsyncPipe abstraction, significantly improve throughput of Transform streams 2024-06-22 18:42:22 -05:00
4cd44367a8 chore: prepare v1.6.1 2024-06-21 13:21:22 -05:00
d76f55e267 fix: introduced transform bug 2024-06-21 13:21:19 -05:00
4b91ab7d5c chore: prepare v1.6.0 2024-06-20 15:40:32 -05:00
a8702f4849 fix: finish may not emit until all chunks are read 2024-06-20 15:40:17 -05:00
16 changed files with 8887 additions and 130 deletions

BIN
bun.lockb

Binary file not shown.

View File

@@ -1,12 +1,13 @@
{
"name": "purescript-node-stream-pipes",
"version": "v1.5.0",
"version": "v2.1.5",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"
"csv-parse": "^5.5.6",
"csv-stringify": "^6.5.0"
},
"devDependencies": {
"cbor-x": "^1.5.9",
"typescript": "^5.4.5"
}
}

View File

@@ -5,21 +5,26 @@ workspace:
dependencies:
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- console: ">=6.1.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-fs: ">=9.1.0 <9.2.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- now: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
@@ -297,8 +302,8 @@ packages:
- unfoldable
exceptions:
type: registry
version: 6.0.0
integrity: sha256-y/xTAEIZIARCE+50/u1di0ncebJ+CIwNOLswyOWzMTw=
version: 6.1.0
integrity: sha256-K0T89IHtF3vBY7eSAO7eDOqSb2J9kZGAcDN5+IKsF8E=
dependencies:
- effect
- either
@@ -862,8 +867,8 @@ packages:
- refs
transformers:
type: registry
version: 6.0.0
integrity: sha256-Pzw40HjthX77tdPAYzjx43LK3X5Bb7ZspYAp27wksFA=
version: 6.1.0
integrity: sha256-3Bm+Z6tsC/paG888XkywDngJ2JMos+JfOhRlkVfb7gI=
dependencies:
- control
- distributive
@@ -876,6 +881,7 @@ packages:
- maybe
- newtype
- prelude
- st
- tailrec
- tuples
- unfoldable

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.5.0'
version: '2.1.5'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -12,21 +12,26 @@ package:
dependencies:
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- console: ">=6.1.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-fs: ">=9.1.0 <9.2.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- now: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"

View File

@@ -21,6 +21,9 @@ export const isReadableEndedImpl = (s) => () => s.readableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableFinishedImpl = (s) => () => s.writableFinished;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end();

View File

@@ -14,8 +14,9 @@ import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow)
import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, makeAff)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Effect.Exception (Error, error)
import Effect.Uncurried (mkEffectFn1)
import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..))
@@ -61,6 +62,7 @@ foreign import isReadableImpl :: forall s. s -> Effect Boolean
foreign import isWritableImpl :: forall s. s -> Effect Boolean
foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableFinishedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean
foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int
@@ -94,6 +96,7 @@ class Stream s <= Write s a | s -> a where
isWritable :: s -> Effect Boolean
needsDrain :: s -> Effect Boolean
isWritableEnded :: s -> Effect Boolean
isWritableFinished :: s -> Effect Boolean
write :: s -> a -> Effect WriteResult
end :: s -> Effect Unit
@@ -116,18 +119,21 @@ else instance (Read s a) => Read s a where
instance Write (Writable a) a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance Write (Transform a b) a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance (Write s a) => Write s a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s a = write s a
end s = end s
needsDrain = needsDrainImpl
@@ -166,26 +172,46 @@ unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String
unsafeFromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do
awaitReadableOrClosed s = Aff.supervise do
fiber <-
Aff.forkAff
$ parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
closed <- liftEffect $ isClosed s
readEnded <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s
length <- liftEffect $ readableLength s
when (readable && length == 0)
$ liftEither
=<< parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
if (not closed && not readEnded && readable && length == 0) then
liftEither =<< Aff.joinFiber fiber
else
Aff.killFiber (error "") fiber
awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = onceAff0 finishH s
awaitFinished s = Aff.supervise do
fiber <- Aff.forkAff $ onceAff0 finishH s
finished <- liftEffect $ isWritableFinished s
if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
awaitWritableOrClosed s = Aff.supervise do
fiber <-
Aff.forkAff
$ parOneOf
[ onceAff0 drainH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
closed <- liftEffect $ isClosed s
writeEnded <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s
needsDrain <- liftEffect $ needsDrain s
when (writable && needsDrain)
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
if not closed && not writeEnded && writable && needsDrain then
liftEither =<< Aff.joinFiber fiber
else
Aff.killFiber (error "") fiber
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
onceAff0 h emitter = makeAff \res -> do

368
src/Pipes.Async.purs Normal file
View File

@@ -0,0 +1,368 @@
module Pipes.Async where
import Prelude hiding (join)
import Control.Alternative (class Alternative, empty, guard)
import Control.Monad.Cont (class MonadTrans)
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Morph (class MFunctor, hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel, parOneOf)
import Data.Array as Array
import Data.DateTime.Instant as Instant
import Data.Either (Either(..), either)
import Data.Foldable (class Foldable, fold)
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), fromMaybe, isNothing)
import Data.Newtype (unwrap)
import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds)
import Data.Traversable (class Traversable, traverse, traverse_)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Console (log)
import Effect.Now as Now
import Pipes (await, yield)
import Pipes.Collect as Collect
import Pipes.Core (Pipe, Proxy, Producer)
data WriteSignal
= WriteSignalOk
| WriteSignalEnded
derive instance Generic WriteSignal _
derive instance Eq WriteSignal
derive instance Ord WriteSignal
instance Show WriteSignal where show = genericShow
instance Discard WriteSignal where
discard = bind
data ReadSignal
= ReadSignalOk
| ReadSignalEnded
derive instance Generic ReadSignal _
derive instance Eq ReadSignal
derive instance Ord ReadSignal
instance Show ReadSignal where show = genericShow
instance Discard ReadSignal where
discard = bind
data WriteResult
= WriteAgain
| WriteNeedsDrain
| WriteEnded
derive instance Generic WriteResult _
derive instance Eq WriteResult
derive instance Ord WriteResult
instance Show WriteResult where show = genericShow
data ReadResult a
= ReadOk a
| ReadWouldBlock
derive instance Generic (ReadResult a) _
derive instance Eq a => Eq (ReadResult a)
derive instance Ord a => Ord (ReadResult a)
derive instance Functor ReadResult
derive instance Foldable ReadResult
derive instance Traversable ReadResult
instance Show a => Show (ReadResult a) where show = genericShow
type AsyncIO a b m r =
{ write :: a -> m WriteResult
, read :: m (ReadResult b)
, awaitWrite :: m WriteSignal
, awaitRead :: m ReadSignal
}
/\ AsyncPipe a b m r
-- | An `AsyncPipe` is a `Pipe`-like struct that allows
-- | concurrently reading from a `Producer` and writing to a `Consumer`.
-- |
-- | An implementation of `AsyncPipe` for Node `Transform` streams
-- | is provided in `Pipes.Node.Stream`.
data AsyncPipe a b m r
-- | A pure return value
= Pure r
-- | An `AsyncPipe` behind a computation
| M (m (AsyncPipe a b m r))
-- | Interface to write & read from the backing resource
| AsyncIO (AsyncIO a b m r)
-- | Modify request / response types
mapIO :: forall aa ab ba bb m r. Monad m => (ab -> aa) -> (ba -> bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
mapIO _ _ (Pure a) = Pure a
mapIO a b (M m) = M $ mapIO a b <$> m
mapIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: write <<< a, awaitWrite, read: map b <$> read, awaitRead} /\ mapIO a b m
-- | Modify request / response types
bindIO :: forall aa ab ba bb m r. Monad m => (ab -> m aa) -> (ba -> m bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
bindIO _ _ (Pure a) = Pure a
bindIO a b (M m) = M $ bindIO a b <$> m
bindIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: flip bind write <<< a, awaitWrite, read: traverse b =<< read, awaitRead} /\ bindIO a b m
-- | Remove the `AsyncPipe` wrapper by discarding the IO
stripIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m r
stripIO (Pure r) = pure r
stripIO (M m) = m >>= stripIO
stripIO (AsyncIO (_ /\ m)) = stripIO m
-- | Execute the `AsyncPipe` monad stack until `AsyncIO` is reached (if any)
getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))
getAsyncIO (AsyncIO a) = pure $ Just a
getAsyncIO (M m) = m >>= getAsyncIO
getAsyncIO (Pure _) = pure Nothing
instance MonadTrans (AsyncPipe a b) where
lift = M <<< map Pure
instance MFunctor (AsyncPipe a b) where
hoist _ (Pure a) = Pure a
hoist f (M m) = M $ f $ hoist f <$> m
hoist f (AsyncIO ({read, write, awaitWrite, awaitRead} /\ m)) =
AsyncIO
$ { read: f read
, write: f <<< write
, awaitWrite: f awaitWrite
, awaitRead: f awaitRead
}
/\ hoist f m
instance Monad m => Functor (AsyncPipe a b m) where
map f (Pure r) = Pure $ f r
map f (M m) = M $ map f <$> m
map f (AsyncIO (io /\ m)) = AsyncIO $ io /\ (f <$> m)
instance Monad m => Apply (AsyncPipe a b m) where
apply (Pure f) ma = f <$> ma
apply (M mf) ma = M $ (_ <*> ma) <$> mf
apply (AsyncIO (io /\ mf)) ma = AsyncIO $ io /\ (mf <*> ma)
instance Monad m => Applicative (AsyncPipe a b m) where
pure = Pure
instance Monad m => Bind (AsyncPipe a b m) where
bind (Pure a) f = f a
bind (M ma) f = M $ (_ >>= f) <$> ma
bind (AsyncIO (io /\ m)) f = AsyncIO $ io /\ (m >>= f)
instance Monad m => Monad (AsyncPipe a b m)
instance MonadThrow e m => MonadThrow e (AsyncPipe a b m) where
throwError = lift <<< throwError
instance MonadError e m => MonadError e (AsyncPipe a b m) where
catchError m f = lift $ catchError (stripIO m) (stripIO <<< f)
instance MonadEffect m => MonadEffect (AsyncPipe a b m) where
liftEffect = lift <<< liftEffect
instance MonadAff m => MonadAff (AsyncPipe a b m) where
liftAff = lift <<< liftAff
-- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing.
debug :: forall a b m r. MonadAff m => String -> AsyncPipe (Maybe a) (Maybe b) m r -> AsyncPipe (Maybe a) (Maybe b) m r
debug c m =
let
logL :: forall m'. MonadEffect m' => _ -> m' Unit
logL msg = liftEffect $ log $ "[" <> c <> "] " <> msg
logR :: forall m'. MonadEffect m' => _ -> m' Unit
logR msg = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> msg
time :: forall m' a'. MonadEffect m' => m' a' -> m' (Milliseconds /\ a')
time ma = do
start <- liftEffect Now.now
a <- ma
end <- liftEffect Now.now
pure $ (end `Instant.diff` start) /\ a
in
flip bind (fromMaybe m)
$ runMaybeT do
(io /\ done') <- MaybeT $ lift $ getAsyncIO m
let
write a = do
logL "write >"
elapsed /\ w <- time $ io.write a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
read = do
logR "read >"
elapsed /\ r <- time $ io.read
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite = do
logL "awaitWrite >"
elapsed /\ w <- time $ io.awaitWrite
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
awaitRead = do
logR "awaitRead >"
elapsed /\ r <- time $ io.awaitRead
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
done = do
logL "done >"
elapsed /\ r <- time done'
logL $ "< done (" <> show (unwrap elapsed) <> "ms)"
pure r
pure $ AsyncIO $ {write, read, awaitWrite, awaitRead} /\ done
-- | Convert an `AsyncPipe` to a regular `Pipe`.
-- |
-- | Rather than two concurrently-running halves (producer & consumer),
-- | this requires the `AsyncPipe` to occasionally stop `await`ing data
-- | written by the upstream `Producer` so that it can `yield` to the downstream `Consumer`.
-- |
-- | This implementation chooses to prioritize `yield`ing data to the `Consumer` over
-- | `await`ing written chunks.
-- |
-- | Note that using this limits the potential parallelism of the entire pipeline, ex:
-- |
-- | ```purs
-- | Pipe.FS.read "foo.csv" -- read
-- | >-> sync Pipe.CSV.parse -- parse
-- | >-> sync Pipe.CBOR.encode -- encode
-- | >-> Pipe.FS.write "foo.bin" -- write
-- | ```
-- |
-- | In the above example, this is what happens when the pipeline
-- | is executed:
-- | 1. `write` asks `encode` "do you have any data yet?" (fast)
-- | 1. `encode` asks `parse` "do you have any data yet?" (fast)
-- | 1. `parse` asks `read` "do you have any data yet?" (fast)
-- | 1. `read` passes 1 chunk to `parse` (fast)
-- | 1. `parse` blocks until the chunk is parsed (slow)
-- | 1. `parse` passes 1 chunk to `encode` (fast)
-- | 1. `encode` blocks until the chunk is encoded (slow)
-- | 1. `write` writes the block (fast)
-- |
-- | For larger workloads, changing this to use `asyncPipe` would be preferable, ex:
-- | ```purs
-- | Pipe.FS.read "foo.csv" -- read
-- | >-/-> Pipe.CSV.parse -- parse
-- | >-/-> Pipe.CBOR.encode -- encode
-- | >-> Pipe.FS.write "foo.bin" -- write
-- | ```
-- |
-- | With this change:
-- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r
sync m =
let
liftPipe :: forall r'. (Proxy _ _ _ _ m) r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftPipe = lift
liftM :: forall r'. m r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftM = liftPipe <<< lift
in
lift (getAsyncIO m) >>=
case _ of
Nothing -> lift $ stripIO m
Just ({write, awaitWrite, read, awaitRead} /\ done) ->
let
awaitRW onR onW =
liftM (parOneOf [Right <$> awaitWrite, Left <$> awaitRead])
>>= either (const onR) onW
wSignal WriteSignalOk = WriteAgain
wSignal WriteSignalEnded = WriteEnded
tailRecEarly f a = tailRecM (map (either identity identity) <<< runExceptT <<< f) a
continue a = throwError (Loop a)
break = (Done <$> liftM (stripIO done)) >>= throwError
in do
flip tailRecEarly WriteAgain \writable -> do
rb <- liftM read
case rb of
ReadWouldBlock
| writable == WriteEnded -> liftM awaitRead *> continue writable
| writable == WriteNeedsDrain -> awaitRW (continue writable) (continue <<< wSignal)
| otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue writable
ReadOk Nothing -> liftPipe (yield Nothing) *> break
when (writable /= WriteAgain) $ continue writable
a <- liftPipe await
writable' <- liftM $ write a
when (isNothing a) $ continue WriteEnded
pure $ Loop writable'
-- | Implementation of `(>-/->)`
-- |
-- | In the current `MonadFork` "thread", read data from the `AsyncPipe` as it
-- | is yielded and `yield` to the downstream `Consumer`.
-- |
-- | Concurrently, in a separate thread, read data from the upstream `Producer`
-- | and write to the `AsyncPipe` at max throughput.
-- |
-- | If the producing half fails, the error is caught and rethrown.
-- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync
:: forall e f m a b
. MonadRec m
=> MonadAff m
=> MonadBracket e f m
=> Producer (Maybe a) m Unit
-> AsyncPipe (Maybe a) (Maybe b) m Unit
-> Producer (Maybe b) m Unit
pipeAsync prod m =
lift (getAsyncIO m)
>>= case _ of
Nothing -> pure unit
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
errST :: STRef _ (Maybe e) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
let
killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST
threadKilled = liftEffect $ liftST $ ST.Ref.read killST
putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just
getThreadError = liftEffect $ liftST $ ST.Ref.read errST
rx a = do
killed <- threadKilled
guard $ not killed
w <- lift $ write a
case w of
WriteNeedsDrain -> lift $ void awaitWrite
WriteEnded -> empty
WriteAgain -> pure unit
spawn = lift <<< fork <<< flip catchError putThreadError
_thread <- spawn $ void $ runMaybeT $ Collect.foreach rx (hoist lift prod)
flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError
rb <- lift read
case rb of
ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void (lift awaitRead) $> Loop unit
lift $ stripIO done
infixl 7 pipeAsync as >-/->

View File

@@ -31,12 +31,8 @@ traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Uni
traverse f b0 p0 =
flip tailRecM (p0 /\ b0) \(p /\ b) ->
case p of
Respond a m -> do
b' <- f b a
pure $ Loop $ m unit /\ b'
M m -> do
n <- m
pure $ Loop $ (n /\ b)
Respond a m -> Loop <$> (m unit /\ _) <$> f b a
M m -> Loop <$> (_ /\ b) <$> m
Request _ _ -> pure $ Done b
Pure _ -> pure $ Done b

View File

@@ -3,23 +3,25 @@ module Pipes.Node.Stream where
import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..), maybe)
import Data.Newtype (wrap)
import Data.Traversable (for_, traverse, traverse_)
import Data.Traversable (for_, traverse_)
import Data.Tuple.Nested ((/\))
import Effect.Aff (delay)
import Effect (Effect)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Stream.Object (ReadResult(..), WriteResult(..))
import Node.Stream.Object as O
import Pipes (await, yield)
import Pipes (for) as P
import Pipes.Async (AsyncPipe(..))
import Pipes.Async as AsyncPipe
import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Prelude (mapFoldable) as P
import Pipes.Util (InvokeResult(..), invoke)
-- | Convert a `Readable` stream to a `Pipe`.
@@ -86,45 +88,60 @@ fromWritable w = do
tailRecM go unit
-- | Convert a `Transform` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit
fromTransform t = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST t
let
maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST)
fromTransformEffect
:: forall a b m
. MonadThrow Error m
=> MonadAff m
=> Effect (O.Transform a b)
-> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransformEffect = fromTransform <=< liftEffect
cleanup = do
liftAff $ O.awaitFinished t
fromReadable t
maybeThrow
liftEffect $ removeErrorListener
-- | Convert a `Transform` stream to an `AsyncPipe`.
fromTransform
:: forall a b m
. MonadThrow Error m
=> MonadAff m
=> O.Transform a b
-> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransform stream = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream
let
rethrow = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
cleanup = liftEffect removeErrorListener
yieldWhileReadable = void $ whileJust $ maybeYield1
writeSignal =
liftEffect (O.isWritableEnded stream)
<#> if _ then AsyncPipe.WriteSignalEnded else AsyncPipe.WriteSignalOk
maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t)
readSignal =
liftEffect (O.isReadableEnded stream)
<#> if _ then AsyncPipe.ReadSignalEnded else AsyncPipe.ReadSignalOk
onEOS = liftEffect (O.end t) *> cleanup $> Done unit
onChunk a =
liftEffect (O.write t a)
>>= case _ of
O.WriteOk -> maybeYield1 $> Loop unit
O.WriteWouldBlock -> yieldWhileReadable $> Loop unit
writeResult WriteOk = AsyncPipe.WriteAgain
writeResult WriteWouldBlock = AsyncPipe.WriteNeedsDrain
go _ = do
maybeThrow
needsDrain <- liftEffect $ O.needsDrain t
ended <- liftEffect $ O.isWritableEnded t
if needsDrain then
liftAff (delay $ wrap 0.0) *> yieldWhileReadable $> Loop unit
else if ended then
cleanup $> Done unit
else
await >>= maybe onEOS onChunk
readResult (ReadJust a) = AsyncPipe.ReadOk (Just a)
readResult ReadWouldBlock = AsyncPipe.ReadWouldBlock
tailRecM go unit
awaitWritable = liftAff $ O.awaitWritableOrClosed stream
awaitReadable = liftAff $ O.awaitReadableOrClosed stream
awaitWrite = rethrow *> awaitWritable *> writeSignal
awaitRead = rethrow *> awaitReadable *> readSignal
whenReadNotEnded m =
liftEffect (O.isReadableEnded stream)
>>= if _ then pure $ AsyncPipe.ReadOk Nothing else m
readNow = readResult <$> liftEffect (O.read stream)
writeNow a = writeResult <$> liftEffect (O.write stream a)
read = rethrow *> whenReadNotEnded readNow
write Nothing = liftEffect (O.end stream) $> AsyncPipe.WriteEnded
write (Just a) = rethrow *> writeNow a
AsyncIO ({write, awaitWrite, read, awaitRead} /\ cleanup)
-- | Given a `Producer` of values, wrap them in `Just`.
-- |
@@ -136,7 +153,7 @@ withEOS a = do
-- | Strip a pipeline of the EOS signal
unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = P.mapFoldable identity
unEOS = tailRecM (const $ maybe (pure $ Done unit) (\a -> yield a $> Loop unit) =<< await) unit
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.
-- |

View File

@@ -12,31 +12,32 @@ import Node.Buffer (Buffer)
import Node.Stream.Object as O
import Node.Zlib as Zlib
import Node.Zlib.Types (ZlibStream)
import Pipes.Core (Pipe)
import Pipes.Async (AsyncPipe)
import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.unsafeCoerceTransform raw
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z =
do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip
gunzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip = fromZlib Zlib.createGunzip
unzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip = fromZlib Zlib.createUnzip
inflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate = fromZlib Zlib.createInflate
deflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate = fromZlib Zlib.createDeflate
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress

View File

@@ -15,9 +15,12 @@ import Data.Either (hush)
import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..), fromMaybe)
import Data.Maybe (Maybe(..), fromMaybe, maybe)
import Data.Traversable (traverse_)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Pipes (await, yield)
import Pipes as Pipes
import Pipes.Core (Pipe, Producer)
@@ -86,9 +89,38 @@ chunked size = do
a <- MaybeT await
chunkPut a
len <- lift chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
when (len >= size) do
chunk <- lift chunkTake
lift $ yield $ Just chunk
len <- chunkLength
when (len > 0) $ yield =<< Just <$> chunkTake
when (len > 0) do
chunk <- chunkTake
yield $ Just chunk
yield Nothing
-- | Buffers input to the given size before passing to subsequent pipes
buffered :: forall m. MonadEffect m => Int -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
buffered size = do
chunkST :: STRef _ (Maybe Buffer) <- liftEffect $ liftST $ STRef.new Nothing
let
chunkClear = liftEffect $ liftST $ STRef.write Nothing chunkST
chunkPeek = liftEffect $ liftST $ STRef.read chunkST
chunkLen = maybe (pure 0) (liftEffect <<< Buffer.size) =<< chunkPeek
chunkPut b = liftEffect do
new <- liftST (STRef.read chunkST) >>= maybe (pure b) (\a -> Buffer.concat [a, b])
void $ liftST $ STRef.write (Just new) chunkST
pure new
Rec.whileJust $ runMaybeT do
a <- MaybeT await
buf <- chunkPut a
len <- lift chunkLen
when (len > size) $ chunkClear *> lift (yield $ Just buf)
len <- chunkLen
chunkPeek >>= traverse_ (when (len > 0) <<< yield <<< Just)
yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.

View File

@@ -3,18 +3,19 @@ module Test.Main where
import Prelude
import Data.Maybe (Maybe(..))
import Data.Time.Duration (Milliseconds(..))
import Effect (Effect)
import Effect.Aff (launchAff_)
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Collect as Test.Pipes.Collect
import Test.Pipes.Construct as Test.Pipes.Construct
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit
main = launchAff_ $ runSpec' (defaultConfig { exit = false, timeout = Nothing }) [ specReporter ] do
main = launchAff_ $ runSpec' (defaultConfig { slow = Milliseconds 0.0, failFast = true, exit = false, timeout = Nothing }) [ specReporter ] do
Test.Pipes.Node.Stream.spec
Test.Pipes.Node.Buffer.spec
Test.Pipes.Node.FS.spec

View File

@@ -1,4 +1,16 @@
import Stream from 'stream'
import * as CBOR from "cbor-x";
import * as CSVDecode from "csv-parse";
import * as CSVEncode from "csv-stringify";
export const cborDecode = () => new CBOR.DecoderStream({useRecords: false, allowHalfOpen: true});
export const cborEncode = () => new CBOR.EncoderStream({useRecords: false, allowHalfOpen: true});
export const cborDecodeSync = a => () => CBOR.decodeMultiple(a);
export const cborEncodeSync = a => () => CBOR.encode(a, {useRecords: false});
export const csvDecode = () => CSVDecode.parse({columns: true, allowHalfOpen: true})
export const csvEncode = () => CSVEncode.stringify({header: true, allowHalfOpen: true})
export const discardTransform = () => new Stream.Transform({
transform: function(_ck, _enc, cb) {
@@ -7,6 +19,16 @@ export const discardTransform = () => new Stream.Transform({
objectMode: true
})
export const slowTransform = () => {
return new Stream.Transform({
transform: function(ck, _enc, cb) {
this.push(ck)
setTimeout(() => cb(), 4)
},
objectMode: true
})
}
export const charsTransform = () => new Stream.Transform({
transform: function(ck, _enc, cb) {
ck.split('').filter(s => !!s).forEach(s => this.push(s))

View File

@@ -2,49 +2,72 @@ module Test.Pipes.Node.Stream where
import Prelude
import Control.Monad.Trans.Class (lift)
import Control.Monad.Cont (lift)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Data.Array as Array
import Data.Foldable (fold)
import Data.Bifunctor (lmap)
import Data.Either (Either(..))
import Data.Foldable (fold, intercalate)
import Data.FoldableWithIndex (forWithIndex_)
import Data.FunctorWithIndex (mapWithIndex)
import Data.Int as Int
import Data.List ((:))
import Data.List as List
import Data.Maybe (Maybe)
import Data.Newtype (wrap)
import Data.Profunctor.Strong (first)
import Data.String as String
import Data.String.Gen (genAlphaString)
import Data.Traversable (for_, traverse)
import Data.Tuple.Nested (type (/\), (/\))
import Effect (Effect)
import Effect.Aff (Aff, delay)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception (error)
import Effect.Unsafe (unsafePerformEffect)
import Foreign (Foreign)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
import Node.FS.Stream as FS.Stream
import Node.FS.Sync as FS
import Node.Stream.Object as O
import Node.Zlib as Zlib
import Pipes (each) as Pipes
import Pipes (each) as Pipe
import Pipes (yield, (>->))
import Pipes.Async (sync, (>-/->))
import Pipes.Collect as Pipe.Collect
import Pipes.Core (Consumer, Producer, runEffect)
import Pipes.Node.Buffer as Pipes.Buffer
import Pipes.Node.Stream as S
import Pipes.Prelude (mapFoldable, toListM) as Pipes
import Simple.JSON (writeJSON)
import Test.Common (jsonParse, jsonStringify, tmpFile, tmpFiles)
import Pipes.Node.Buffer as Pipe.Buffer
import Pipes.Node.FS as Pipe.FS
import Pipes.Node.Stream as Pipe.Node
import Pipes.Node.Zlib as Pipe.Zlib
import Pipes.Prelude (toListM) as Pipe
import Simple.JSON (readImpl, readJSON, writeJSON)
import Test.Common (jsonStringify, tmpFile, tmpFiles)
import Test.QuickCheck.Arbitrary (arbitrary)
import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, around, describe, it)
import Test.Spec.Assertions (shouldEqual)
import Test.Spec.Assertions (fail, shouldEqual)
foreign import readableFromArray :: forall @a. Array a -> O.Readable a
foreign import discardTransform :: forall a b. Effect (O.Transform a b)
foreign import slowTransform :: forall a b. Effect (O.Transform a b)
foreign import charsTransform :: Effect (O.Transform String String)
foreign import cborEncodeSync :: forall a. a -> Effect Buffer
foreign import cborDecodeSync :: forall a. Buffer -> Effect a
foreign import cborEncode :: forall a. Effect (O.Transform a Buffer)
foreign import cborDecode :: forall a. Effect (O.Transform Buffer a)
foreign import csvEncode :: forall a. Effect (O.Transform a String)
foreign import csvDecode :: forall a. Effect (O.Transform String a)
writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
writer a = do
stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a
pure $ stream /\ S.fromWritable stream
pure $ stream /\ Pipe.Node.fromWritable stream
reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
reader a = liftEffect $ S.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a
reader a = liftEffect $ Pipe.Node.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a
spec :: Spec Unit
spec =
@@ -52,30 +75,30 @@ spec =
describe "Readable" do
describe "Readable.from(<Iterable>)" do
it "empty" do
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } []) >-> S.unEOS
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } []) >-> Pipe.Node.unEOS
vals `shouldEqual` List.Nil
it "singleton" do
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> S.unEOS
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> Pipe.Node.unEOS
vals `shouldEqual` ({ foo: "1" } : List.Nil)
it "many elements" do
let exp = (\n -> { foo: show n }) <$> Array.range 0 100
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray exp) >-> S.unEOS
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray exp) >-> Pipe.Node.unEOS
vals `shouldEqual` (List.fromFoldable exp)
describe "Writable" $ around tmpFile do
describe "fs.WriteStream" do
it "pipe to file" \p -> do
stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let
w = S.fromWritable stream
w = Pipe.Node.fromWritable stream
source = do
buf <- liftEffect $ Buffer.fromString "hello" UTF8
yield buf
runEffect $ S.withEOS source >-> w
runEffect $ Pipe.Node.withEOS source >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello"
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
it "async pipe to file" \p -> do
w <- S.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
w <- Pipe.Node.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let
source = do
yield "hello, "
@@ -87,7 +110,7 @@ spec =
yield "this is a "
lift $ delay $ wrap 5.0
yield "test."
runEffect $ S.withEOS (source >-> Pipes.Buffer.fromString UTF8) >-> w
runEffect $ Pipe.Node.withEOS (source >-> Pipe.Buffer.fromString UTF8) >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello, world! this is a test."
it "chained pipes" \p -> do
@@ -101,40 +124,105 @@ spec =
let
exp = fold (writeJSON <$> objs)
stream /\ w <- liftEffect $ writer p
runEffect $ S.withEOS (Pipes.each objs >-> jsonStringify >-> Pipes.Buffer.fromString UTF8) >-> w
runEffect $ Pipe.Node.withEOS (Pipe.each objs >-> jsonStringify >-> Pipe.Buffer.fromString UTF8) >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` exp
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
describe "Transform" do
it "gzip" do
let
json = yield $ writeJSON { foo: "bar" }
exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000"
gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex)
fold outs `shouldEqual` exp
around tmpFiles
$ it "file >-> gzip >-> file >-> gunzip" \(a /\ b) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ]
areader <- liftEffect $ reader a
bwritestream /\ bwriter <- liftEffect $ writer b
gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
runEffect $ areader >-> gzip >-> bwriter
shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream)
let
bignums = Array.range 1 1000
firstNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/first_names.txt")
lastNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/last_names.txt")
names n = do
first <- firstNames
last <- Array.take (Int.round $ Int.toNumber n / Int.toNumber (Array.length firstNames)) lastNames
pure $ first <> " " <> last
people n = mapWithIndex (\ix name -> {id: show $ ix + 1, name}) (names n)
peopleCSV n = "id,name\n" <> intercalate "\n" ((\{id, name} -> id <> "," <> name) <$> people n)
gunzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip)
breader <- liftEffect $ reader b
nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity)
Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ]
for_ [4000, 8000, 32000, 64000, 200000] \n -> do
let
csv = peopleCSV n
people' = people n
around tmpFiles
$ it (show n <> " row csv >-/-> csv-parse >-/-> cborEncode") \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a csv
cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransformEffect csvDecode
>-/-> Pipe.Node.fromTransformEffect cborEncode
>-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
ppl `shouldEqual` people'
around tmpFiles
$ it (show n <> " row csv >-> sync csv-parse >-> sync cborEncode") \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a csv
cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-> sync (Pipe.Node.fromTransformEffect csvDecode)
>-> sync (Pipe.Node.fromTransformEffect cborEncode)
>-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
ppl `shouldEqual` people'
around tmpFiles
$ it "file >-> sync gzip >-> sync gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <- Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip
>-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFiles
$ it "file >-/-> gzip >-/-> slow >-/-> gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <-
Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-/-> Pipe.Zlib.gzip
>-/-> Pipe.Node.fromTransformEffect slowTransform
>-/-> Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFiles
$ it "file >-> sync gzip >-> sync slow >-> sync gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <-
Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip
>-> sync (Pipe.Node.fromTransformEffect slowTransform)
>-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p
discard' <- liftEffect discardTransform
out :: List.List Int <- Pipes.toListM $ r >-> S.fromTransform discard' >-> S.unEOS
out :: List.List Int <-
Pipe.toListM
$ r
>-/-> Pipe.Node.fromTransformEffect discardTransform
>-> Pipe.Node.unEOS
out `shouldEqual` List.Nil
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo bar"
r <- reader p
chars' <- liftEffect charsTransform
out :: List.List String <- Pipes.toListM $ r >-> S.inEOS (Pipes.Buffer.toString UTF8) >-> S.fromTransform chars' >-> S.unEOS
out :: List.List String <-
Pipe.toListM $
r
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransformEffect charsTransform
>-> Pipe.Node.unEOS
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]

4095
test/Test/first_names.txt Normal file

File diff suppressed because it is too large Load Diff

4096
test/Test/last_names.txt Normal file

File diff suppressed because it is too large Load Diff