9 Commits

13 changed files with 295 additions and 186 deletions

139
README.md
View File

@@ -2,41 +2,114 @@
Interact with node streams in object mode using [`Pipes`]! Interact with node streams in object mode using [`Pipes`]!
## Example ## Install
```purescript
import Prelude
import Effect.Aff (launchAff_)
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Pipes.Prelude ((>->))
import Pipes.Prelude as Pipes
import Pipes.Core as Pipes.Core
import Pipes.Node.FS.Stream as FS
import Pipes.Node.Zlib as Zlib
import Pipes.CSV.Parse as CSV.Parse
-- == my-zipped-data.csv ==
-- id,foo,is_deleted
-- 1,hello,f
-- 2,goodbye,t
-- Logs:
-- {id: 1, foo: "hello", is_deleted: false}
-- {id: 2, foo: "goodbye", is_deleted: true}
main :: Effect Unit
main =
Pipes.Core.runEffect
$ FS.createReadStream "my-zipped-data.csv.gz"
>-> Zlib.gunzip
>-> CSV.Parse.parse @{id :: Int, foo :: String, is_deleted :: Boolean}
>-> Pipes.mapM (liftEffect <<< log)
```
## Installing
```bash ```bash
spago install node-stream-pipes spago install node-stream-pipes
``` ```
## Usage
### Node Streams
#### Raw Streams
Raw `objectMode` Node streams are represented in `Node.Stream.Object`:
- `Writable a` accepts chunks of type `a`
- `Readable a` emits chunks of type `a`
- `Transform a b` transforms chunks from `a` to `b`
Non-Object streams can also be represented with these types; for example an `fs.WriteStream`
can be coerced to `Writable Buffer` without issue.
Interop between these types and `Node.Stream` are provided in `Node.Stream.Object`:
- `unsafeFrom{String,Buffer}{Writable,Readable,Transform}`
- `unsafeCoerce{Writable,Readable,Transform}`
#### Pipes
Streams in `Node.Stream.Object` can be converted to `Producer`s, `Consumer`s and `Pipe`s with `Pipes.Node.Stream`:
- `fromReadable :: forall a. <Readable a> -> Producer (Maybe a) <Aff> Unit`
- `fromWritable :: forall a. <Writable a> -> Consumer (Maybe a) <Aff> Unit`
- `fromTransform :: forall a b. <Transform a b> -> Pipe (Maybe a) (Maybe b) <Aff> Unit`
#### EOS Marker
Normally, pipe computations will not be executed once any computation in a pipeline exits.
To allow for resource cleanup and awareness that the stream is about to close,
`Maybe a` is used occasionally in this package as an End-of-Stream marker:
```purescript
-- foo.txt is "hello, world!\n"
chunks <- Pipes.Collect.toArray $ Pipes.FS.read "foo.txt" >-> Pipes.Node.Stream.inEOS (Pipes.Buffer.toString UTF8)
chunks `shouldEqual` [Just "hello, world!\n", Nothing]
```
Pipes from `a -> b` unaware of EOS can be lifted to `Maybe a -> Maybe b` with `Pipes.Node.Stream.inEOS`.
Producers of `Maybe a` can drop the EOS marker and emit `a` with `Pipes.Node.Stream.unEOS`.
Producers of `a` can have an EOS marker added with `Pipes.Node.Stream.withEOS`.
#### Example
`Pipes.PassThrough.js`
```javascript
import {PassThrough} from 'stream'
export const makePassThrough = () => new PassThrough()
```
`Pipes.PassThrough.purs`
```purescript
module Pipes.PassThrough where
import Prelude
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Aff (Aff)
import Pipes.Core (Pipe)
import Node.Stream.Object as ObjectStream
import Pipes.Node.Stream as Pipes.Node.Stream
type PassThroughStream a = ObjectStream.Transform a a
foreign import makeRaw :: Effect PassThroughStream
passThrough :: forall a. Pipe a a Aff Unit
passThrough = do
raw <- liftEffect $ makeRaw
Pipes.Node.Stream.fromTransform raw
```
### Utilities
This package provides utilities that explicitly use `MonadRec` to ensure stack-safety
when dealing with producers of large amounts of data.
- `Pipes.Collect` provides stack-safe utilities for executing a pipeline and collecting results into a collection, `Buffer`, `Monoid` etc.
- `Pipes.Construct` provides stack-safe utilities for creating producers from in-memory collections.
- `Pipes.Util` provides some miscellaneous utilities missing from `pipes`.
### Zlib
Pipes for compression & decompression using `zlib` are provided in `Pipes.Node.Zlib`.
### FS
Read files with:
- `Pipes.Node.FS.read <path>`
- `Pipes.Node.FS.read' <WriteStreamOptions> <path>`
```purescript
Pipes.Collect.toStringWith UTF8 $ Pipes.Node.FS.read "foo.txt" >-> Pipes.Stream.unEOS
```
Write files with:
- `Pipes.Node.FS.write' <WriteStreamOptions> <path>`
- `Pipes.Node.FS.trunc <path>`
- `Pipes.Node.FS.create <path>`
- `Pipes.Node.FS.append <path>`
```purescript
Pipes.Stream.withEOS (
Pipes.Construct.eachArray ["id,name", "1,henry", "2,suzie"]
>-> Pipes.Util.intersperse "\n"
>-> Pipes.Buffer.fromString UTF8
)
>-> Pipes.Node.FS.create "foo.csv"
```
[`Pipes`]: https://pursuit.purescript.org/packages/purescript-pipes/8.0.0 [`Pipes`]: https://pursuit.purescript.org/packages/purescript-pipes/8.0.0

View File

@@ -1,6 +1,6 @@
{ {
"name": "purescript-node-stream-pipes", "name": "purescript-node-stream-pipes",
"version": "v1.4.0", "version": "v1.6.0",
"type": "module", "type": "module",
"dependencies": { "dependencies": {
"csv-parse": "^5.5.5", "csv-parse": "^5.5.5",

View File

@@ -13,7 +13,6 @@ workspace:
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0" - mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0" - node-fs: ">=9.1.0 <10.0.0"

View File

@@ -1,7 +1,7 @@
package: package:
name: node-stream-pipes name: node-stream-pipes
publish: publish:
version: '1.4.0' version: '1.6.0'
license: 'GPL-3.0-or-later' license: 'GPL-3.0-or-later'
location: location:
githubOwner: 'cakekindel' githubOwner: 'cakekindel'
@@ -20,7 +20,6 @@ package:
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0" - mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0" - node-fs: ">=9.1.0 <10.0.0"

View File

@@ -21,19 +21,18 @@ export const isReadableEndedImpl = (s) => () => s.readableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */ /** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableEndedImpl = (s) => () => s.writableEnded; export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableFinishedImpl = (s) => () => s.writableFinished;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */ /** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end(); export const endImpl = (s) => () => s.end();
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */ /** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl = export const writeImpl =
({ ok, wouldBlock, closed }) => ({ ok, wouldBlock }) =>
(s) => (s) =>
(a) => (a) =>
() => { () => {
if (s.closed || s.writableEnded) {
return closed;
}
if (s.write(a)) { if (s.write(a)) {
return ok; return ok;
} else { } else {
@@ -41,15 +40,11 @@ export const writeImpl =
} }
}; };
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */ /** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
export const readImpl = export const readImpl =
({ just, closed, wouldBlock }) => ({ just, wouldBlock }) =>
(s) => (s) =>
() => { () => {
if (s.closed || s.readableEnded) {
return closed;
}
const a = s.read(); const a = s.read();
if (a === null) { if (a === null) {
return wouldBlock; return wouldBlock;

View File

@@ -14,8 +14,9 @@ import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow) import Data.Show.Generic (genericShow)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, makeAff) import Effect.Aff (Aff, effectCanceler, makeAff)
import Effect.Aff as Aff
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error) import Effect.Exception (Error, error)
import Effect.Uncurried (mkEffectFn1) import Effect.Uncurried (mkEffectFn1)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..)) import Node.EventEmitter (EventHandle(..))
@@ -26,7 +27,6 @@ import Unsafe.Coerce (unsafeCoerce)
data ReadResult a data ReadResult a
= ReadWouldBlock = ReadWouldBlock
| ReadClosed
| ReadJust a | ReadJust a
derive instance Generic (ReadResult a) _ derive instance Generic (ReadResult a) _
@@ -35,9 +35,12 @@ derive instance Eq a => Eq (ReadResult a)
instance Show (ReadResult a) where instance Show (ReadResult a) where
show = genericShow <<< map (const "..") show = genericShow <<< map (const "..")
maybeReadResult :: forall a. ReadResult a -> Maybe a
maybeReadResult (ReadWouldBlock) = Nothing
maybeReadResult (ReadJust a) = Just a
data WriteResult data WriteResult
= WriteWouldBlock = WriteWouldBlock
| WriteClosed
| WriteOk | WriteOk
derive instance Generic WriteResult _ derive instance Generic WriteResult _
@@ -45,8 +48,8 @@ derive instance Eq WriteResult
instance Show WriteResult where instance Show WriteResult where
show = genericShow show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a } type ReadResultFFI a = { wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult } type WriteResultFFI = { wouldBlock :: WriteResult, ok :: WriteResult }
foreign import data Writable :: Type -> Type foreign import data Writable :: Type -> Type
foreign import data Readable :: Type -> Type foreign import data Readable :: Type -> Type
@@ -59,15 +62,16 @@ foreign import isReadableImpl :: forall s. s -> Effect Boolean
foreign import isWritableImpl :: forall s. s -> Effect Boolean foreign import isWritableImpl :: forall s. s -> Effect Boolean
foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableFinishedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean foreign import isClosedImpl :: forall s. s -> Effect Boolean
foreign import needsDrainImpl :: forall s. s -> Effect Boolean foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int foreign import readableLengthImpl :: forall s. s -> Effect Int
readResultFFI :: forall a. ReadResultFFI a readResultFFI :: forall a. ReadResultFFI a
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust } readResultFFI = { wouldBlock: ReadWouldBlock, just: ReadJust }
writeResultFFI :: WriteResultFFI writeResultFFI :: WriteResultFFI
writeResultFFI = { closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk } writeResultFFI = { wouldBlock: WriteWouldBlock, ok: WriteOk }
class Stream :: Type -> Constraint class Stream :: Type -> Constraint
class Stream s where class Stream s where
@@ -92,6 +96,7 @@ class Stream s <= Write s a | s -> a where
isWritable :: s -> Effect Boolean isWritable :: s -> Effect Boolean
needsDrain :: s -> Effect Boolean needsDrain :: s -> Effect Boolean
isWritableEnded :: s -> Effect Boolean isWritableEnded :: s -> Effect Boolean
isWritableFinished :: s -> Effect Boolean
write :: s -> a -> Effect WriteResult write :: s -> a -> Effect WriteResult
end :: s -> Effect Unit end :: s -> Effect Unit
@@ -114,18 +119,21 @@ else instance (Read s a) => Read s a where
instance Write (Writable a) a where instance Write (Writable a) a where
isWritable = isWritableImpl isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s write s = writeImpl writeResultFFI s
end = endImpl end = endImpl
needsDrain = needsDrainImpl needsDrain = needsDrainImpl
else instance Write (Transform a b) a where else instance Write (Transform a b) a where
isWritable = isWritableImpl isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s write s = writeImpl writeResultFFI s
end = endImpl end = endImpl
needsDrain = needsDrainImpl needsDrain = needsDrainImpl
else instance (Write s a) => Write s a where else instance (Write s a) => Write s a where
isWritable = isWritableImpl isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s a = write s a write s a = write s a
end s = end s end s = end s
needsDrain = needsDrainImpl needsDrain = needsDrainImpl
@@ -136,45 +144,67 @@ withErrorST s = do
cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error
pure { error, cancel } pure { error, cancel }
fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer unsafeCoerceWritable :: forall r a. Stream.Writable r -> Writable a
fromBufferReadable = unsafeCoerce unsafeCoerceWritable = unsafeCoerce
fromBufferTransform :: Stream.Duplex -> Transform Buffer Buffer unsafeCoerceReadable :: forall r a. Stream.Readable r -> Readable a
fromBufferTransform = unsafeCoerce unsafeCoerceReadable = unsafeCoerce
fromBufferWritable :: forall r. Stream.Writable r -> Writable Buffer unsafeCoerceTransform :: forall a b. Stream.Duplex -> Transform a b
fromBufferWritable = unsafeCoerce unsafeCoerceTransform = unsafeCoerce
fromStringReadable :: forall r. Stream.Readable r -> Readable String unsafeFromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer
fromStringReadable = unsafeCoerce unsafeFromBufferReadable = unsafeCoerce
fromStringTransform :: Stream.Duplex -> Transform String String unsafeFromBufferTransform :: forall a. Stream.Duplex -> Transform Buffer a
fromStringTransform = unsafeCoerce unsafeFromBufferTransform = unsafeCoerce
fromStringWritable :: forall r. Stream.Writable r -> Writable String unsafeFromBufferWritable :: forall r. Stream.Writable r -> Writable Buffer
fromStringWritable = unsafeCoerce unsafeFromBufferWritable = unsafeCoerce
unsafeFromStringReadable :: forall r. Stream.Readable r -> Readable String
unsafeFromStringReadable = unsafeCoerce
unsafeFromStringTransform :: forall a. Stream.Duplex -> Transform String a
unsafeFromStringTransform = unsafeCoerce
unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String
unsafeFromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do awaitReadableOrClosed s = do
fiber <-
Aff.forkAff $ parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
closed <- liftEffect $ isClosed s
readEnded <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s readable <- liftEffect $ isReadable s
length <- liftEffect $ readableLength s length <- liftEffect $ readableLength s
when (readable && length == 0) if (not closed && not readEnded && readable && length == 0) then
$ liftEither liftEither =<< Aff.joinFiber fiber
=<< parOneOf else
[ onceAff0 readableH s $> Right unit Aff.killFiber (error "") fiber
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
awaitFinished :: forall s a. Write s a => s -> Aff Unit awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = onceAff0 finishH s awaitFinished s = do
fiber <- Aff.forkAff $ onceAff0 finishH s
finished <- liftEffect $ isWritableFinished s
if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do awaitWritableOrClosed s = do
fiber <- Aff.forkAff $ parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
closed <- liftEffect $ isClosed s
writeEnded <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s writable <- liftEffect $ isWritable s
needsDrain <- liftEffect $ needsDrain s needsDrain <- liftEffect $ needsDrain s
when (writable && needsDrain) if not closed && not writeEnded && writable && needsDrain then
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] liftEither =<< Aff.joinFiber fiber
else
Aff.killFiber (error "") fiber
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
onceAff0 h emitter = makeAff \res -> do onceAff0 h emitter = makeAff \res -> do

View File

@@ -20,6 +20,7 @@ import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Buffer as Buffer import Node.Buffer as Buffer
import Node.Encoding (Encoding)
import Pipes.Core (Producer) import Pipes.Core (Producer)
import Pipes.Internal (Proxy(..)) import Pipes.Internal (Proxy(..))
@@ -51,7 +52,16 @@ fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f p0 = traverse (\_ a -> f a) unit p0 foreach f p0 = traverse (\_ a -> f a) unit p0
-- | Concatenate all produced buffers -- | `append` all emitted values to `mempty`
toMonoid :: forall a m. Monoid a => MonadRec m => MonadEffect m => Producer a m Unit -> m a
toMonoid = fold (<>) mempty
-- | Concatenate all buffers to a single buffer, then decode with the
-- | provided encoding.
toStringWith :: forall m. MonadRec m => MonadEffect m => Encoding -> Producer Buffer m Unit -> m String
toStringWith enc = (liftEffect <<< Buffer.toString enc) <=< toBuffer
-- | Concatenate all produced buffers to a single buffer
toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer
toBuffer p = toBuffer p =
(liftEffect <<< maybe (Buffer.alloc 0) pure) (liftEffect <<< maybe (Buffer.alloc 0) pure)
@@ -74,9 +84,18 @@ toArray p = do
liftEffect $ liftST $ Array.ST.unsafeFreeze st liftEffect $ liftST $ Array.ST.unsafeFreeze st
-- | Collect all values from a `Producer` into a list. -- | Collect all values from a `Producer` into a list.
-- |
-- | Reverses the list after collecting, so that values will be
-- | in the order they were emitted.
toList :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a) toList :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toList = map List.reverse <<< fold (flip List.Cons) List.Nil toList = map List.reverse <<< fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a list.
-- |
-- | Does not reverse the list after collecting.
toListRev :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toListRev = map List.reverse <<< fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a Javascript Object. -- | Collect all values from a `Producer` into a Javascript Object.
toObject :: forall a m. MonadRec m => MonadEffect m => Producer (String /\ a) m Unit -> m (Object a) toObject :: forall a m. MonadRec m => MonadEffect m => Producer (String /\ a) m Unit -> m (Object a)
toObject p = do toObject p = do

View File

@@ -23,7 +23,7 @@ import Prim.Row (class Union)
-- | -- |
-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a` -- | See `Pipes.Node.Stream.withEOS` for converting `Producer a`
-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting. -- | into `Producer (Maybe a)`, emitting `Nothing` before exiting.
write write'
:: forall r trash m :: forall r trash m
. Union r trash WriteStreamOptions . Union r trash WriteStreamOptions
=> MonadAff m => MonadAff m
@@ -31,27 +31,27 @@ write
=> Record r => Record r
-> FilePath -> FilePath
-> Consumer (Maybe Buffer) m Unit -> Consumer (Maybe Buffer) m Unit
write o p = do write' o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w fromWritable $ O.unsafeCoerceWritable w
-- | Open a file in write mode, failing if the file already exists. -- | Open a file in write mode, failing if the file already exists.
-- | -- |
-- | `write {flags: "wx"}` -- | `write' {flags: "wx"}`
create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
create = write { flags: "wx" } create = write' { flags: "wx" }
-- | Open a file in write mode, truncating it if the file already exists. -- | Open a file in write mode, truncating it if the file already exists.
-- | -- |
-- | `write {flags: "w"}` -- | `write' {flags: "w"}`
truncate :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit trunc :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
truncate = write { flags: "w" } trunc = write' { flags: "w" }
-- | Open a file in write mode, appending written contents if the file already exists. -- | Open a file in write mode, appending written contents if the file already exists.
-- | -- |
-- | `write {flags: "a"}` -- | `write' {flags: "a"}`
append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
append = write { flags: "a" } append = write' { flags: "a" }
-- | Creates a `fs.Readable` stream for the file at the given path. -- | Creates a `fs.Readable` stream for the file at the given path.
-- | -- |
@@ -60,7 +60,7 @@ append = write { flags: "a" }
read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit
read p = do read p = do
r <- liftEffect $ FS.Stream.createReadStream p r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r fromReadable $ O.unsafeCoerceReadable r
-- | Creates a `fs.Readable` stream for the file at the given path. -- | Creates a `fs.Readable` stream for the file at the given path.
-- | -- |
@@ -76,4 +76,4 @@ read'
-> Producer (Maybe Buffer) m Unit -> Producer (Maybe Buffer) m Unit
read' opts p = do read' opts p = do
r <- liftEffect $ FS.Stream.createReadStream' p opts r <- liftEffect $ FS.Stream.createReadStream' p opts
fromReadable $ O.fromBufferReadable r fromReadable $ O.unsafeCoerceReadable r

View File

@@ -3,15 +3,13 @@ module Pipes.Node.Stream where
import Prelude hiding (join) import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError) import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..), maybe)
import Data.Newtype (wrap) import Data.Traversable (for_, traverse, traverse_)
import Data.Traversable (for_)
import Data.Tuple.Nested ((/\)) import Data.Tuple.Nested ((/\))
import Effect.Aff (delay)
import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error) import Effect.Exception (Error)
@@ -19,7 +17,6 @@ import Node.Stream.Object as O
import Pipes (await, yield) import Pipes (await, yield)
import Pipes (for) as P import Pipes (for) as P
import Pipes.Core (Consumer, Pipe, Producer, Producer_) import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Prelude (mapFoldable) as P
import Pipes.Util (InvokeResult(..), invoke) import Pipes.Util (InvokeResult(..), invoke)
-- | Convert a `Readable` stream to a `Pipe`. -- | Convert a `Readable` stream to a `Pipe`.
@@ -40,8 +37,13 @@ fromReadable r =
res <- liftEffect $ O.read r res <- liftEffect $ O.read r
case res of case res of
O.ReadJust a -> yield (Just a) $> Loop { error, cancel } O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel } O.ReadWouldBlock -> do
O.ReadClosed -> yield Nothing *> cleanup cancel ended <- liftEffect $ O.isReadableEnded r
if ended then do
yield Nothing
cleanup cancel
else
liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
in in
do do
e <- liftEffect $ O.withErrorST r e <- liftEffect $ O.withErrorST r
@@ -52,86 +54,78 @@ fromReadable r =
-- | When `Nothing` is piped to this, the stream will -- | When `Nothing` is piped to this, the stream will
-- | be `end`ed, and the pipe will noop if invoked again. -- | be `end`ed, and the pipe will noop if invoked again.
fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit
fromWritable w = fromWritable w = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST w
let let
cleanup rmErrorListener = do maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST)
liftEffect rmErrorListener
liftEffect $ O.end w waitCanWrite = do
shouldWait <- liftEffect $ O.needsDrain w
when shouldWait $ liftAff $ O.awaitWritableOrClosed w
cleanup = do
liftAff $ O.awaitFinished w liftAff $ O.awaitFinished w
pure $ Done unit maybeThrow
liftEffect removeErrorListener
go { error, cancel } = do onEOS = liftEffect (O.end w) *> cleanup $> Done unit
err <- liftEffect $ liftST $ STRef.read error onChunk a = liftEffect (O.write w a) $> Loop unit
for_ err throwError
needsDrain <- liftEffect $ O.needsDrain w go _ = do
when needsDrain $ liftAff $ O.awaitWritableOrClosed w maybeThrow
ma <- await waitCanWrite
case ma of ended <- liftEffect $ O.isWritableEnded w
Nothing -> cleanup cancel if ended then
Just a -> do cleanup $> Done unit
res <- liftEffect $ O.write w a else
case res of await >>= maybe onEOS onChunk
O.WriteClosed -> cleanup cancel
_ -> pure $ Loop { error, cancel } tailRecM go unit
in
do
r <- liftEffect $ O.withErrorST w
tailRecM go r
-- | Convert a `Transform` stream to a `Pipe`. -- | Convert a `Transform` stream to a `Pipe`.
-- | -- |
-- | When `Nothing` is piped to this, the `Transform` stream will -- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again. -- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit
fromTransform t = fromTransform t = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST t
let let
cleanup removeErrorListener = do maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST)
liftEffect $ O.end t
liftEffect $ removeErrorListener
fromReadable t
pure $ Done unit
yieldWhileReadable = do cleanup = do
flip tailRecM unit \_ -> do flip tailRecM unit $ const do
res <- liftEffect $ O.read t liftAff $ O.awaitReadableOrClosed t
case res of readEnded <- liftEffect $ O.isReadableEnded t
O.ReadJust a -> yield (Just a) $> Loop unit
_ -> pure $ Done unit
maybeYield1 = do
res <- liftEffect $ O.read t
case res of
O.ReadJust a -> yield $ Just a
_ -> pure unit
go { error, cancel } = do
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
needsDrain <- liftEffect $ O.needsDrain t
if needsDrain then do
liftAff $ delay $ wrap 0.0
yieldWhileReadable yieldWhileReadable
pure $ Loop { error, cancel } pure $ (if readEnded then Done else Loop) unit
else do
ma <- await liftAff $ O.awaitFinished t
case ma of maybeThrow
Nothing -> cleanup cancel liftEffect $ removeErrorListener
Just a' -> do yield Nothing
res <- liftEffect $ O.write t a'
case res of yieldWhileReadable = void $ whileJust $ maybeYield1
O.WriteClosed -> cleanup cancel
O.WriteOk -> do maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t)
maybeYield1
pure $ Loop { error, cancel } onEOS = liftEffect (O.end t) *> cleanup $> Done unit
O.WriteWouldBlock -> do onChunk a = liftEffect (O.write t a) $> Loop unit
yieldWhileReadable
pure $ Loop { error, cancel } go _ = do
in maybeThrow
do needsDrain <- liftEffect $ O.needsDrain t
r <- liftEffect $ O.withErrorST t ended <- liftEffect $ O.isWritableEnded t
tailRecM go r if needsDrain then do
yieldWhileReadable
liftAff $ O.awaitWritableOrClosed t
pure $ Loop unit
else if ended then
cleanup $> Done unit
else
await >>= maybe onEOS onChunk
tailRecM go unit
-- | Given a `Producer` of values, wrap them in `Just`. -- | Given a `Producer` of values, wrap them in `Just`.
-- | -- |
@@ -143,7 +137,7 @@ withEOS a = do
-- | Strip a pipeline of the EOS signal -- | Strip a pipeline of the EOS signal
unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = P.mapFoldable identity unEOS = tailRecM (const $ maybe (pure $ Done unit) (\a -> yield a $> Loop unit) =<< await) unit
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`. -- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.
-- | -- |

View File

@@ -18,7 +18,7 @@ import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z = do fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.fromBufferTransform raw fromTransform $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip gzip = fromZlib Zlib.createGzip

View File

@@ -86,9 +86,14 @@ chunked size = do
a <- MaybeT await a <- MaybeT await
chunkPut a chunkPut a
len <- lift chunkLength len <- lift chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake when (len >= size) do
chunk <- lift chunkTake
lift $ yield $ Just chunk
len <- chunkLength len <- chunkLength
when (len > 0) $ yield =<< Just <$> chunkTake when (len > 0) do
chunk <- chunkTake
yield $ Just chunk
yield Nothing yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it. -- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.

View File

@@ -33,11 +33,6 @@ spec = describe "Pipes.Node.FS" do
liftEffect $ FS.writeTextFile UTF8 p "foo" liftEffect $ FS.writeTextFile UTF8 p "foo"
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8) s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo" s `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "create" do describe "create" do
around tmpFile $ it "creates the file when not exists" \p -> do around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
@@ -59,14 +54,14 @@ spec = describe "Pipes.Node.FS" do
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo\nbar" contents `shouldEqual` "foo\nbar"
describe "truncate" do describe "trunc" do
around tmpFile $ it "creates the file when not exists" \p -> do around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo" contents `shouldEqual` "foo"
around tmpFile $ it "overwrites contents" \p -> do around tmpFile $ it "overwrites contents" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "bar" contents `shouldEqual` "bar"
around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do

View File

@@ -40,11 +40,11 @@ foreign import charsTransform :: Effect (O.Transform String String)
writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit) writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
writer a = do writer a = do
stream <- liftEffect $ O.fromBufferWritable <$> FS.Stream.createWriteStream a stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a
pure $ stream /\ S.fromWritable stream pure $ stream /\ S.fromWritable stream
reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit) reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
reader a = liftEffect $ S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a reader a = liftEffect $ S.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a
spec :: Spec Unit spec :: Spec Unit
spec = spec =
@@ -64,7 +64,7 @@ spec =
describe "Writable" $ around tmpFile do describe "Writable" $ around tmpFile do
describe "fs.WriteStream" do describe "fs.WriteStream" do
it "pipe to file" \p -> do it "pipe to file" \p -> do
stream <- O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let let
w = S.fromWritable stream w = S.fromWritable stream
source = do source = do
@@ -75,7 +75,7 @@ spec =
contents `shouldEqual` "hello" contents `shouldEqual` "hello"
shouldEqual true =<< liftEffect (O.isWritableEnded stream) shouldEqual true =<< liftEffect (O.isWritableEnded stream)
it "async pipe to file" \p -> do it "async pipe to file" \p -> do
w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) w <- S.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let let
source = do source = do
yield "hello, " yield "hello, "
@@ -110,7 +110,7 @@ spec =
let let
json = yield $ writeJSON { foo: "bar" } json = yield $ writeJSON { foo: "bar" }
exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000" exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000"
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex) outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex)
fold outs `shouldEqual` exp fold outs `shouldEqual` exp
around tmpFiles around tmpFiles
@@ -118,11 +118,11 @@ spec =
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ] liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ]
areader <- liftEffect $ reader a areader <- liftEffect $ reader a
bwritestream /\ bwriter <- liftEffect $ writer b bwritestream /\ bwriter <- liftEffect $ writer b
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
runEffect $ areader >-> gzip >-> bwriter runEffect $ areader >-> gzip >-> bwriter
shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream) shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream)
gunzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip) gunzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip)
breader <- liftEffect $ reader b breader <- liftEffect $ reader b
nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity) nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity)
Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ] Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ]