8 Commits

17 changed files with 466 additions and 171 deletions

View File

@@ -1,6 +1,7 @@
{
"name": "purescript-csv-stream",
"version": "v1.0.5",
"name": "purescript-node-stream-pipes",
"version": "v1.2.2",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -9,6 +9,8 @@ workspace:
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object
- lists
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
@@ -18,6 +20,7 @@ workspace:
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
@@ -25,6 +28,8 @@ workspace:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples
- unordered-collections
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console
@@ -105,6 +110,7 @@ workspace:
- type-equality
- typelevel-prelude
- unfoldable
- unordered-collections
- unsafe-coerce
- variant
extra_packages: {}
@@ -903,6 +909,21 @@ packages:
- partial
- prelude
- tuples
unordered-collections:
type: registry
version: 3.1.0
integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q=
dependencies:
- arrays
- enums
- functions
- integers
- lists
- prelude
- record
- tuples
- typelevel-prelude
- unfoldable
unsafe-coerce:
type: registry
version: 6.0.0

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.0.5'
version: '1.2.2'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -16,6 +16,8 @@ package:
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
@@ -25,6 +27,7 @@ package:
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
@@ -32,6 +35,8 @@ package:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main

View File

@@ -1,35 +1,39 @@
import Stream from "stream";
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableImpl = s => () => s.readable
export const isReadableImpl = (s) => () => s.readable;
/** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */
export const isClosedImpl = s => () => s.closed
export const isClosedImpl = (s) => () => s.closed;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableImpl = s => () => s.writable
export const isWritableImpl = (s) => () => s.writable;
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableEndedImpl = s => () => s.readableEnded
export const isReadableEndedImpl = (s) => () => s.readableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableEndedImpl = s => () => s.writableEnded
export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end();
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl = ({ok, wouldBlock, closed}) => (s) => (a) => () => {
if (s.closed || s.writableEnded) {
return closed
}
export const writeImpl =
({ ok, wouldBlock, closed }) =>
(s) =>
(a) =>
() => {
if (s.closed || s.writableEnded) {
return closed;
}
if (s.write(a)) {
return ok
} else {
return wouldBlock
}
}
if (s.write(a)) {
return ok;
} else {
return wouldBlock;
}
};
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
export const readImpl =

View File

@@ -28,6 +28,7 @@ data ReadResult a
= ReadWouldBlock
| ReadClosed
| ReadJust a
derive instance Generic (ReadResult a) _
derive instance Functor ReadResult
derive instance Eq a => Eq (ReadResult a)
@@ -38,9 +39,11 @@ data WriteResult
= WriteWouldBlock
| WriteClosed
| WriteOk
derive instance Generic WriteResult _
derive instance Eq WriteResult
instance Show WriteResult where show = genericShow
instance Show WriteResult where
show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }
@@ -59,10 +62,10 @@ foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean
readResultFFI :: forall a. ReadResultFFI a
readResultFFI = {closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust}
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust }
writeResultFFI :: WriteResultFFI
writeResultFFI = {closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk}
writeResultFFI = { closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk }
class Stream :: Type -> Constraint
class Stream s where
@@ -117,11 +120,11 @@ else instance (Write s a) => Write s a where
write s a = write s a
end s = end s
withErrorST :: forall s. Stream s => s -> Effect {cancel :: Effect Unit, error :: STRef Global (Maybe Error)}
withErrorST :: forall s. Stream s => s -> Effect { cancel :: Effect Unit, error :: STRef Global (Maybe Error) }
withErrorST s = do
error <- liftST $ STRef.new Nothing
cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error
pure {error, cancel}
pure { error, cancel }
fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer
fromBufferReadable = unsafeCoerce
@@ -147,7 +150,7 @@ awaitReadableOrClosed s = do
ended <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s
when (not ended && not closed && not readable)
$ liftEither =<< parOneOf [onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s]
$ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
@@ -155,7 +158,7 @@ awaitWritableOrClosed s = do
ended <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s
when (not ended && not closed && not writable)
$ liftEither =<< parOneOf [onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s]
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
onceAff0 h emitter = makeAff \res -> do

View File

@@ -2,17 +2,81 @@ module Pipes.Collect where
import Prelude
import Control.Monad.Rec.Class (class MonadRec)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array.ST as Array.ST
import Data.Either (hush)
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.Hashable (class Hashable)
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (for) as Pipes
import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Pipes (next) as Pipes
import Pipes.Core (Producer)
import Pipes.Core (runEffect) as Pipes
-- | Traverse a pipe, collecting into a mutable array with constant stack usage
collectArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
collectArray p = do
-- | Fold every value produced
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b
fold f b p =
let
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> Pipes.next p'
pure $ Loop $ f b' a /\ p''
in
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
-- | Fold every value produced with a monadic action
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Unit -> m b
traverse f b p =
let
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> Pipes.next p'
b'' <- lift $ f b' a
pure $ Loop $ b'' /\ p''
in
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
-- | Execute a monadic action on every item in a producer.
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f = traverse (const f) unit
-- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
toArray p = do
st <- liftEffect $ liftST $ Array.ST.new
Pipes.runEffect $ Pipes.for p \a -> void $ liftEffect $ liftST $ Array.ST.push a st
foreach (void <<< liftEffect <<< liftST <<< flip Array.ST.push st) p
liftEffect $ liftST $ Array.ST.unsafeFreeze st
-- | Collect all values from a `Producer` into a list.
toList :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toList = map List.reverse <<< fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a Javascript Object.
toObject :: forall a m. MonadRec m => MonadEffect m => Producer (String /\ a) m Unit -> m (Object a)
toObject p = do
st <- liftEffect $ liftST $ Object.ST.new
foreach (\(k /\ v) -> void $ liftEffect $ liftST $ Object.ST.poke k v st) p
liftEffect $ liftST $ Object.ST.Unsafe.unsafeFreeze st
-- | Collect all values from a `Producer` into a `HashMap`
toHashMap :: forall k v m. Hashable k => MonadRec m => Producer (k /\ v) m Unit -> m (HashMap k v)
toHashMap = fold (\map (k /\ v) -> HashMap.insert k v map) HashMap.empty
-- | Collect all values from a `Producer` into a `Map`
toMap :: forall k v m. Ord k => MonadRec m => Producer (k /\ v) m Unit -> m (Map k v)
toMap = fold (\map (k /\ v) -> Map.insert k v map) Map.empty

1
src/Pipes.Construct.purs Normal file
View File

@@ -0,0 +1 @@
module Pipes.Construct where

View File

@@ -2,9 +2,11 @@ module Pipes.Node.FS where
import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe)
import Effect.Aff (Aff)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer)
import Node.FS.Stream (WriteStreamOptions)
import Node.FS.Stream as FS.Stream
@@ -22,9 +24,13 @@ import Prim.Row (class Union)
-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a`
-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting.
write
:: forall r trash
. Union r trash WriteStreamOptions
=> Record r -> FilePath -> Consumer (Maybe Buffer) Aff Unit
:: forall r trash m
. Union r trash WriteStreamOptions
=> MonadAff m
=> MonadThrow Error m
=> Record r
-> FilePath
-> Consumer (Maybe Buffer) m Unit
write o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w
@@ -32,26 +38,26 @@ write o p = do
-- | Open a file in write mode, failing if the file already exists.
-- |
-- | `write {flags: "wx"}`
create :: FilePath -> Consumer (Maybe Buffer) Aff Unit
create = write {flags: "wx"}
create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
create = write { flags: "wx" }
-- | Open a file in write mode, truncating it if the file already exists.
-- |
-- | `write {flags: "w"}`
truncate :: FilePath -> Consumer (Maybe Buffer) Aff Unit
truncate = write {flags: "w"}
truncate :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
truncate = write { flags: "w" }
-- | Open a file in write mode, appending written contents if the file already exists.
-- |
-- | `write {flags: "a"}`
append :: FilePath -> Consumer (Maybe Buffer) Aff Unit
append = write {flags: "a"}
append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
append = write { flags: "a" }
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
-- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read :: FilePath -> Producer (Maybe Buffer) Aff Unit
read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit
read p = do
r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r

View File

@@ -2,53 +2,57 @@ module Pipes.Node.Stream where
import Prelude
import Control.Monad.Error.Class (throwError)
import Control.Monad.Rec.Class (Step(..), tailRecM, whileJust)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..), maybe)
import Data.Maybe (Maybe(..))
import Data.Newtype (wrap)
import Data.Traversable (for_)
import Effect.Aff (Aff, delay)
import Effect.Aff.Class (liftAff)
import Data.Tuple.Nested ((/\))
import Effect.Aff (delay)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Stream.Object as O
import Pipes (await, yield, (>->))
import Pipes (await, yield)
import Pipes (for) as P
import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Prelude (mapFoldable, map) as P
import Pipes.Prelude (mapFoldable) as P
import Pipes.Util (InvokeResult(..), invoke)
-- | Convert a `Readable` stream to a `Pipe`.
-- |
-- | This will yield `Nothing` before exiting, signaling
-- | End-of-stream.
fromReadable :: forall s a. O.Read s a => s -> Producer_ (Maybe a) Aff Unit
fromReadable :: forall s a m. MonadThrow Error m => MonadAff m => O.Read s a => s -> Producer_ (Maybe a) m Unit
fromReadable r =
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
pure $ Done unit
go {error, cancel} = do
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) $> Loop {error, cancel}
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop {error, cancel}
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadClosed -> yield Nothing *> cleanup cancel
in do
e <- liftEffect $ O.withErrorST r
tailRecM go e
in
do
e <- liftEffect $ O.withErrorST r
tailRecM go e
-- | Convert a `Writable` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromWritable :: forall s a. O.Write s a => s -> Consumer (Maybe a) Aff Unit
fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit
fromWritable w =
let
cleanup rmErrorListener = do
@@ -56,7 +60,7 @@ fromWritable w =
liftEffect $ O.end w
pure $ Done unit
go {error, cancel} = do
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
@@ -67,20 +71,21 @@ fromWritable w =
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteOk -> pure $ Loop {error, cancel}
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop {error, cancel}
pure $ Loop { error, cancel }
O.WriteClosed -> cleanup cancel
in do
r <- liftEffect $ O.withErrorST w
tailRecM go r
in
do
r <- liftEffect $ O.withErrorST w
tailRecM go r
-- | Convert a `Transform` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b. O.Transform a b -> Pipe (Maybe a) (Maybe b) Aff Unit
fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit
fromTransform t =
let
cleanup removeErrorListener = do
@@ -94,7 +99,7 @@ fromTransform t =
O.ReadJust a -> yield (Just a)
O.ReadWouldBlock -> pure unit
O.ReadClosed -> yield Nothing *> pure unit
go {error, cancel} = do
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
@@ -107,24 +112,25 @@ fromTransform t =
yieldFromReadableHalf
case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop {error, cancel}
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
lift (O.awaitWritableOrClosed t)
pure $ Loop {error, cancel}
in do
r <- liftEffect $ O.withErrorST t
tailRecM go r
liftAff $ O.awaitWritableOrClosed t
pure $ Loop { error, cancel }
in
do
r <- liftEffect $ O.withErrorST t
tailRecM go r
-- | Given a `Producer` of values, wrap them in `Just`.
-- |
-- | Before the `Producer` exits, emits `Nothing` as an End-of-stream signal.
withEOS :: forall a. Producer a Aff Unit -> Producer (Maybe a) Aff Unit
withEOS :: forall a m. Monad m => Producer a m Unit -> Producer (Maybe a) m Unit
withEOS a = do
P.for a (yield <<< Just)
yield Nothing
-- | Strip a pipeline of the EOS signal
unEOS :: forall a. Pipe (Maybe a) a Aff Unit
unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = P.mapFoldable identity
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.
@@ -137,8 +143,16 @@ unEOS = P.mapFoldable identity
-- | `Just` values will be passed to the pipe, and the response(s) will be wrapped in `Just`.
-- |
-- | `Nothing` will bypass the given pipe entirely, and the pipe will not be invoked again.
inEOS :: forall a b. Pipe a b Aff Unit -> Pipe (Maybe a) (Maybe b) Aff Unit
inEOS p = whileJust do
inEOS :: forall a b m. MonadRec m => Pipe a b m Unit -> Pipe (Maybe a) (Maybe b) m Unit
inEOS p = flip tailRecM p \p' -> do
ma <- await
maybe (yield Nothing) (\a -> yield a >-> p >-> P.map Just) ma
pure $ void ma
case ma of
Just a -> do
res <- lift $ invoke p' a
case res of
Yielded (as /\ p'') -> do
for_ (Just <$> as) yield
pure $ Loop p''
DidNotYield p'' -> pure $ Loop p''
Exited -> yield Nothing $> Done unit
_ -> yield Nothing $> Done unit

View File

@@ -2,10 +2,12 @@ module Pipes.Node.Zlib where
import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe)
import Effect (Effect)
import Effect.Aff (Aff)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer)
import Node.Stream.Object as O
import Node.Zlib as Zlib
@@ -13,28 +15,28 @@ import Node.Zlib.Types (ZlibStream)
import Pipes.Core (Pipe)
import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r. Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.fromBufferTransform raw
gzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip
gunzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gunzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip = fromZlib Zlib.createGunzip
unzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
unzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip = fromZlib Zlib.createUnzip
inflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
inflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate = fromZlib Zlib.createInflate
deflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
deflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate = fromZlib Zlib.createDeflate
brotliCompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress

View File

@@ -33,7 +33,7 @@ split pat = do
Nothing -> void $ liftEffect $ liftST $ Array.ST.push chunk buf
Just ix -> do
let
{before, after} = String.splitAt ix chunk
{ before, after } = String.splitAt ix chunk
len <- liftEffect $ liftST $ Array.ST.length buf
buf' <- liftEffect $ liftST $ Array.ST.splice 0 len [] buf
lift $ yield $ Just $ (fold buf') <> before

View File

@@ -3,17 +3,22 @@ module Pipes.Util where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.Rec.Class (class MonadRec, forever, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Array.ST (STArray)
import Data.Array.ST as Array.ST
import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..))
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield)
import Pipes.Core (Pipe)
import Pipes.Internal (Proxy(..))
-- | Yields a separator value `sep` between received values
-- |
@@ -62,3 +67,57 @@ chunked size = do
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake
yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.
-- |
-- | Uses a `HashSet` of hashes of `a`; for `n` elements `awaited`, this pipe
-- | will occupy O(n) space, and `yield` in O(1) time.
uniqHash :: forall a m. Hashable a => MonadEffect m => MonadRec m => Pipe a a m Unit
uniqHash = do
seenHashesST <- liftEffect $ liftST $ STRef.new HashSet.empty
forever do
a <- await
seenHashes <- liftEffect $ liftST $ STRef.read seenHashesST
when (not $ HashSet.member (hash a) seenHashes) do
void $ liftEffect $ liftST $ STRef.modify (HashSet.insert $ hash a) seenHashesST
yield a
-- | The result of a single step forward of a pipe.
data InvokeResult a b m
-- | The pipe `await`ed the value, but did not `yield` a response.
= DidNotYield (Pipe a b m Unit)
-- | The pipe `await`ed the value, and `yield`ed 1 or more responses.
| Yielded (NonEmptyList b /\ Pipe a b m Unit)
-- | The pipe `await`ed the value, and exited.
| Exited
data IntermediateInvokeResult a b m
= IDidNotYield (Pipe a b m Unit)
| IYielded (NonEmptyList b /\ Pipe a b m Unit)
| IDidNotAwait (Pipe a b m Unit)
-- | Pass a single value to a pipe, returning the result of the pipe's invocation.
invoke :: forall m a b. Monad m => Pipe a b m Unit -> a -> m (InvokeResult a b m)
invoke m a =
let
go :: IntermediateInvokeResult a b m -> m (InvokeResult a b m)
go (IYielded (as /\ n)) =
case n of
Request _ _ -> pure $ Yielded $ as /\ n
Respond rep f -> go (IYielded $ (as <> pure rep) /\ f unit)
M o -> go =<< IYielded <$> (as /\ _) <$> o
Pure _ -> pure Exited
go (IDidNotYield n) =
case n of
Request _ _ -> pure $ DidNotYield n
Respond rep f -> go (IYielded $ pure rep /\ f unit)
M o -> go =<< IDidNotYield <$> o
Pure _ -> pure Exited
go (IDidNotAwait n) =
case n of
Request _ f -> go (IDidNotYield (f a))
Respond rep f -> go (IYielded $ pure rep /\ f unit)
M o -> go =<< IDidNotAwait <$> o
Pure _ -> pure Exited
in
go (IDidNotAwait m)

View File

@@ -8,6 +8,7 @@ import Effect.Aff (launchAff_)
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Collect as Test.Pipes.Collect
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
@@ -16,3 +17,4 @@ main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing
Test.Pipes.Node.Stream.spec
Test.Pipes.Node.Buffer.spec
Test.Pipes.Node.FS.spec
Test.Pipes.Collect.spec

View File

@@ -0,0 +1,111 @@
module Test.Pipes.Collect where
import Prelude
import Control.Monad.Gen (chooseInt)
import Control.Monad.Rec.Class (Step(..), tailRecM)
import Control.Monad.ST as ST
import Control.Monad.ST.Ref as STRef
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Traversable (traverse)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Effect.Unsafe (unsafePerformEffect)
import Foreign.Object (Object)
import Foreign.Object as Object
import Pipes (yield)
import Pipes.Collect as Pipes.Collect
import Pipes.Core (Producer)
import Test.QuickCheck.Gen (randomSampleOne)
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
testData
:: { array :: Array (Int /\ Int)
, list :: List (Int /\ Int)
, strarray :: Array (String /\ Int)
, object :: Object Int
, map :: Map Int Int
, hashMap :: HashMap Int Int
, stream :: Producer (Int /\ Int) Aff Unit
, streamStr :: Producer (String /\ Int) Aff Unit
}
testData =
unsafePerformEffect $ do
array <-
flip traverse (Array.range 0 99999) \k -> do
v <- liftEffect $ randomSampleOne $ chooseInt 0 99999
pure $ k /\ v
let
strarray = lmap show <$> array
object = Object.fromFoldable strarray
map' :: forall m. m -> (Int -> Int -> m -> m) -> m
map' empty insert = ST.run do
st <- STRef.new empty
ST.foreach array \(k /\ v) -> void $ STRef.modify (insert k v) st
STRef.read st
hashMap = map' HashMap.empty HashMap.insert
map = map' Map.empty Map.insert
pure
{ array
, strarray
, list: List.fromFoldable array
, object
, hashMap
, map
, stream: flip tailRecM 0 \ix -> case Array.index array ix of
Just a -> yield a $> Loop (ix + 1)
Nothing -> pure $ Done unit
, streamStr: flip tailRecM 0 \ix -> case Array.index strarray ix of
Just a -> yield a $> Loop (ix + 1)
Nothing -> pure $ Done unit
}
spec :: Spec Unit
spec =
describe "Test.Pipes.Collect" do
describe "toArray" do
it "collects an array" do
act <- Pipes.Collect.toArray testData.stream
act `shouldEqual` testData.array
it "empty ok" do
act :: Array Int <- Pipes.Collect.toArray (pure unit)
act `shouldEqual` []
describe "toObject" do
it "collects" do
act <- Pipes.Collect.toObject $ testData.streamStr
act `shouldEqual` testData.object
it "empty ok" do
act :: Object Int <- Pipes.Collect.toObject (pure unit)
act `shouldEqual` Object.empty
describe "toMap" do
it "collects" do
act <- Pipes.Collect.toMap testData.stream
act `shouldEqual` testData.map
it "empty ok" do
act :: Map String Int <- Pipes.Collect.toMap (pure unit)
act `shouldEqual` Map.empty
describe "toHashMap" do
it "collects" do
act <- Pipes.Collect.toHashMap testData.stream
act `shouldEqual` testData.hashMap
it "empty ok" do
act :: HashMap String Int <- Pipes.Collect.toHashMap (pure unit)
act `shouldEqual` HashMap.empty
describe "toList" do
it "collects" do
act <- Pipes.Collect.toList testData.stream
act `shouldEqual` testData.list
it "empty ok" do
act :: List (String /\ Int) <- Pipes.Collect.toList (pure unit)
act `shouldEqual` List.Nil

View File

@@ -27,6 +27,7 @@ import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (fail, shouldEqual)
data BufferJunk = BufferJunk Buffer
instance Arbitrary BufferJunk where
arbitrary = sized \s -> do
ns <- vectorOf s (chooseInt 0 7)
@@ -36,6 +37,7 @@ instance Arbitrary BufferJunk where
pure $ BufferJunk buf
data BufferUTF8 = BufferUTF8 String Buffer
instance Arbitrary BufferUTF8 where
arbitrary = do
s <- genAsciiString
@@ -43,27 +45,27 @@ instance Arbitrary BufferUTF8 where
spec :: Spec Unit
spec = describe "Pipes.Node.Buffer" do
describe "toString" do
it "fails when encoding wrong" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
let
uut = Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString UTF8 >-> Pipes.drain
ok = do
uut
fail "Should have thrown"
err _ = pure unit
catchError ok err
it "junk OK in hex" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString Hex >-> Pipes.drain
it "UTF8 ok" do
vals <- (map \(BufferUTF8 s b) -> s /\ b) <$> liftEffect (randomSample' 100 arbitrary)
let
bufs = Pipes.each $ snd <$> vals
strs = fst <$> vals
act <- Array.fromFoldable <$> Pipes.toListM (bufs >-> Pipes.Node.Buffer.toString UTF8)
act `shouldEqual` strs
describe "fromString" do
it "ok" do
vals <- Pipes.each <$> liftEffect (randomSample' 100 genAsciiString)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.fromString UTF8 >-> Pipes.drain
describe "toString" do
it "fails when encoding wrong" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
let
uut = Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString UTF8 >-> Pipes.drain
ok = do
uut
fail "Should have thrown"
err _ = pure unit
catchError ok err
it "junk OK in hex" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString Hex >-> Pipes.drain
it "UTF8 ok" do
vals <- (map \(BufferUTF8 s b) -> s /\ b) <$> liftEffect (randomSample' 100 arbitrary)
let
bufs = Pipes.each $ snd <$> vals
strs = fst <$> vals
act <- Array.fromFoldable <$> Pipes.toListM (bufs >-> Pipes.Node.Buffer.toString UTF8)
act `shouldEqual` strs
describe "fromString" do
it "ok" do
vals <- Pipes.each <$> liftEffect (randomSample' 100 genAsciiString)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.fromString UTF8 >-> Pipes.drain

View File

@@ -24,63 +24,63 @@ import Test.Spec.Assertions (fail, shouldEqual)
spec :: Spec Unit
spec = describe "Pipes.Node.FS" do
describe "read" do
around tmpFile $ it "fails if the file does not exist" \p -> do
flip catchError (const $ pure unit) do
Pipes.runEffect $ Pipes.Node.FS.read p >-> Pipes.drain
fail "should have thrown"
around tmpFile $ it "reads ok" \p -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "create" do
around tmpFile $ it "creates the file when not exists" \p -> do
describe "read" do
around tmpFile $ it "fails if the file does not exist" \p -> do
flip catchError (const $ pure unit) do
Pipes.runEffect $ Pipes.Node.FS.read p >-> Pipes.drain
fail "should have thrown"
around tmpFile $ it "reads ok" \p -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "append" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "appends" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "\n" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo\nbar"
describe "truncate" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "overwrites contents" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "bar"
around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do
let
exp = [{foo: "a"}, {foo: "bar"}, {foo: "123"}]
liftEffect $ FS.writeTextFile UTF8 a $ intercalate "\n" $ writeJSON <$> exp
Pipes.runEffect $
Pipes.Node.FS.read a
fail "should have thrown"
describe "create" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "append" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "appends" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "\n" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo\nbar"
describe "truncate" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "overwrites contents" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "bar"
around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do
let
exp = [ { foo: "a" }, { foo: "bar" }, { foo: "123" } ]
liftEffect $ FS.writeTextFile UTF8 a $ intercalate "\n" $ writeJSON <$> exp
Pipes.runEffect $
Pipes.Node.FS.read a
>-> inEOS (Pipes.Node.Buffer.toString UTF8)
>-> Pipes.String.split (wrap "\n")
>-> inEOS (jsonParse @{foo :: String})
>-> inEOS (jsonParse @{ foo :: String })
>-> inEOS (Pipes.map _.foo)
>-> Pipes.Util.intersperse "\n"
>-> inEOS (Pipes.Node.Buffer.fromString UTF8)
>-> Pipes.Node.FS.create b
act <- liftEffect $ FS.readTextFile UTF8 b
act `shouldEqual` "a\nbar\n123"
act <- liftEffect $ FS.readTextFile UTF8 b
act `shouldEqual` "a\nbar\n123"

View File

@@ -96,7 +96,7 @@ spec =
str :: String <- genAlphaString
num :: Int <- arbitrary
stuff :: Array String <- arbitrary
pure {str, num, stuff}
pure { str, num, stuff }
objs <- liftEffect (randomSample' 1 obj)
let
exp = fold (writeJSON <$> objs)
@@ -108,14 +108,14 @@ spec =
describe "Transform" do
it "gzip" do
let
json = yield $ writeJSON {foo: "bar"}
json = yield $ writeJSON { foo: "bar" }
exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000"
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex)
fold outs `shouldEqual` exp
around tmpFiles
$ it "file >-> gzip >-> file >-> gunzip" \(a /\ b) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [1, 2, 3, 4]
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ]
areader <- liftEffect $ reader a
bwritestream /\ bwriter <- liftEffect $ writer b
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
@@ -125,7 +125,7 @@ spec =
gunzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip)
breader <- liftEffect $ reader b
nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity)
Array.fromFoldable nums `shouldEqual` [1, 2, 3, 4]
Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ]
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p
@@ -137,4 +137,4 @@ spec =
r <- reader p
chars' <- liftEffect charsTransform
out :: List.List String <- Pipes.toListM $ r >-> S.inEOS (Pipes.Buffer.toString UTF8) >-> S.fromTransform chars' >-> S.unEOS
out `shouldEqual` List.fromFoldable ["f", "o", "o", " ", "b", "a", "r"]
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]