24 Commits

Author SHA1 Message Date
4cd44367a8 chore: prepare v1.6.1 2024-06-21 13:21:22 -05:00
d76f55e267 fix: introduced transform bug 2024-06-21 13:21:19 -05:00
4b91ab7d5c chore: prepare v1.6.0 2024-06-20 15:40:32 -05:00
a8702f4849 fix: finish may not emit until all chunks are read 2024-06-20 15:40:17 -05:00
f3ea830379 chore: prepare v1.5.0 2024-05-15 13:30:23 -05:00
4242330bef fix: write readme, clarify API 2024-05-15 13:30:08 -05:00
1b6e6423b1 chore: prepare v1.4.1 2024-05-14 15:02:39 -05:00
657af14bb6 fix: await finished before exiting 2024-05-14 14:07:20 -05:00
33d42034fc fix: continue to read after readableEnded 2024-05-14 13:51:57 -05:00
c8822aeffe fix: I WAS INCORRECT HHEHE 2024-05-14 13:44:33 -05:00
7076b13df4 fix: catch incorrect stream implementations 2024-05-14 13:42:39 -05:00
3f4bc12d36 chore: prepare v1.4.0 2024-05-14 13:12:56 -05:00
fd53b6520f feat: Collect.toBuffer 2024-05-14 13:12:41 -05:00
0ef7240d61 wip: explore removing delays(10) 2024-05-14 12:44:31 -05:00
de22f44f86 wip: explore removing delays(9) 2024-05-14 12:43:38 -05:00
f9c0e20777 wip: explore removing delays(8) 2024-05-14 11:08:46 -05:00
edc7d40dbc wip: explore removing delays(7) 2024-05-14 11:04:50 -05:00
ba8d90038d wip: explore removing delays(6) 2024-05-14 10:55:14 -05:00
dfdca9f5e9 wip: explore removing delays(5) 2024-05-14 10:39:06 -05:00
67ae171532 wip: explore removing delays(4) 2024-05-13 21:18:27 -05:00
a347c05062 wip: explore removing delays(3) 2024-05-13 21:17:28 -05:00
f9446c97a0 wip: explore removing delays(2) 2024-05-13 21:15:45 -05:00
d3b8d1792d wip: explore removing delays 2024-05-13 21:06:41 -05:00
e05c74f42f fix: minor fixes 2024-05-13 15:04:34 -05:00
14 changed files with 348 additions and 183 deletions

139
README.md
View File

@@ -2,41 +2,114 @@
Interact with node streams in object mode using [`Pipes`]!
## Example
```purescript
import Prelude
import Effect.Aff (launchAff_)
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Pipes.Prelude ((>->))
import Pipes.Prelude as Pipes
import Pipes.Core as Pipes.Core
import Pipes.Node.FS.Stream as FS
import Pipes.Node.Zlib as Zlib
import Pipes.CSV.Parse as CSV.Parse
-- == my-zipped-data.csv ==
-- id,foo,is_deleted
-- 1,hello,f
-- 2,goodbye,t
-- Logs:
-- {id: 1, foo: "hello", is_deleted: false}
-- {id: 2, foo: "goodbye", is_deleted: true}
main :: Effect Unit
main =
Pipes.Core.runEffect
$ FS.createReadStream "my-zipped-data.csv.gz"
>-> Zlib.gunzip
>-> CSV.Parse.parse @{id :: Int, foo :: String, is_deleted :: Boolean}
>-> Pipes.mapM (liftEffect <<< log)
```
## Installing
## Install
```bash
spago install node-stream-pipes
```
## Usage
### Node Streams
#### Raw Streams
Raw `objectMode` Node streams are represented in `Node.Stream.Object`:
- `Writable a` accepts chunks of type `a`
- `Readable a` emits chunks of type `a`
- `Transform a b` transforms chunks from `a` to `b`
Non-Object streams can also be represented with these types; for example an `fs.WriteStream`
can be coerced to `Writable Buffer` without issue.
Interop between these types and `Node.Stream` are provided in `Node.Stream.Object`:
- `unsafeFrom{String,Buffer}{Writable,Readable,Transform}`
- `unsafeCoerce{Writable,Readable,Transform}`
#### Pipes
Streams in `Node.Stream.Object` can be converted to `Producer`s, `Consumer`s and `Pipe`s with `Pipes.Node.Stream`:
- `fromReadable :: forall a. <Readable a> -> Producer (Maybe a) <Aff> Unit`
- `fromWritable :: forall a. <Writable a> -> Consumer (Maybe a) <Aff> Unit`
- `fromTransform :: forall a b. <Transform a b> -> Pipe (Maybe a) (Maybe b) <Aff> Unit`
#### EOS Marker
Normally, pipe computations will not be executed once any computation in a pipeline exits.
To allow for resource cleanup and awareness that the stream is about to close,
`Maybe a` is used occasionally in this package as an End-of-Stream marker:
```purescript
-- foo.txt is "hello, world!\n"
chunks <- Pipes.Collect.toArray $ Pipes.FS.read "foo.txt" >-> Pipes.Node.Stream.inEOS (Pipes.Buffer.toString UTF8)
chunks `shouldEqual` [Just "hello, world!\n", Nothing]
```
Pipes from `a -> b` unaware of EOS can be lifted to `Maybe a -> Maybe b` with `Pipes.Node.Stream.inEOS`.
Producers of `Maybe a` can drop the EOS marker and emit `a` with `Pipes.Node.Stream.unEOS`.
Producers of `a` can have an EOS marker added with `Pipes.Node.Stream.withEOS`.
#### Example
`Pipes.PassThrough.js`
```javascript
import {PassThrough} from 'stream'
export const makePassThrough = () => new PassThrough()
```
`Pipes.PassThrough.purs`
```purescript
module Pipes.PassThrough where
import Prelude
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Aff (Aff)
import Pipes.Core (Pipe)
import Node.Stream.Object as ObjectStream
import Pipes.Node.Stream as Pipes.Node.Stream
type PassThroughStream a = ObjectStream.Transform a a
foreign import makeRaw :: Effect PassThroughStream
passThrough :: forall a. Pipe a a Aff Unit
passThrough = do
raw <- liftEffect $ makeRaw
Pipes.Node.Stream.fromTransform raw
```
### Utilities
This package provides utilities that explicitly use `MonadRec` to ensure stack-safety
when dealing with producers of large amounts of data.
- `Pipes.Collect` provides stack-safe utilities for executing a pipeline and collecting results into a collection, `Buffer`, `Monoid` etc.
- `Pipes.Construct` provides stack-safe utilities for creating producers from in-memory collections.
- `Pipes.Util` provides some miscellaneous utilities missing from `pipes`.
### Zlib
Pipes for compression & decompression using `zlib` are provided in `Pipes.Node.Zlib`.
### FS
Read files with:
- `Pipes.Node.FS.read <path>`
- `Pipes.Node.FS.read' <WriteStreamOptions> <path>`
```purescript
Pipes.Collect.toStringWith UTF8 $ Pipes.Node.FS.read "foo.txt" >-> Pipes.Stream.unEOS
```
Write files with:
- `Pipes.Node.FS.write' <WriteStreamOptions> <path>`
- `Pipes.Node.FS.trunc <path>`
- `Pipes.Node.FS.create <path>`
- `Pipes.Node.FS.append <path>`
```purescript
Pipes.Stream.withEOS (
Pipes.Construct.eachArray ["id,name", "1,henry", "2,suzie"]
>-> Pipes.Util.intersperse "\n"
>-> Pipes.Buffer.fromString UTF8
)
>-> Pipes.Node.FS.create "foo.csv"
```
[`Pipes`]: https://pursuit.purescript.org/packages/purescript-pipes/8.0.0

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v1.3.3",
"version": "v1.6.1",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",

View File

@@ -13,7 +13,6 @@ workspace:
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.3.3'
version: '1.6.1'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -20,7 +20,6 @@ package:
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"

View File

@@ -3,31 +3,36 @@ import Stream from "stream";
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableImpl = (s) => () => s.readable;
/** @type {(s: Stream.Readable | Stream.Transform) => () => number} */
export const readableLengthImpl = (s) => () => s.readableLength;
/** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */
export const isClosedImpl = (s) => () => s.closed;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableImpl = (s) => () => s.writable;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const needsDrainImpl = (s) => () => s.writableNeedDrain;
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableEndedImpl = (s) => () => s.readableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableFinishedImpl = (s) => () => s.writableFinished;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end();
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl =
({ ok, wouldBlock, closed }) =>
({ ok, wouldBlock }) =>
(s) =>
(a) =>
() => {
if (s.closed || s.writableEnded) {
return closed;
}
if (s.write(a)) {
return ok;
} else {
@@ -35,15 +40,11 @@ export const writeImpl =
}
};
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
export const readImpl =
({ just, closed, wouldBlock }) =>
({ just, wouldBlock }) =>
(s) =>
() => {
if (s.closed || s.readableEnded) {
return closed;
}
const a = s.read();
if (a === null) {
return wouldBlock;

View File

@@ -14,8 +14,9 @@ import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow)
import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, makeAff)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Effect.Exception (Error, error)
import Effect.Uncurried (mkEffectFn1)
import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..))
@@ -26,7 +27,6 @@ import Unsafe.Coerce (unsafeCoerce)
data ReadResult a
= ReadWouldBlock
| ReadClosed
| ReadJust a
derive instance Generic (ReadResult a) _
@@ -35,9 +35,12 @@ derive instance Eq a => Eq (ReadResult a)
instance Show (ReadResult a) where
show = genericShow <<< map (const "..")
maybeReadResult :: forall a. ReadResult a -> Maybe a
maybeReadResult (ReadWouldBlock) = Nothing
maybeReadResult (ReadJust a) = Just a
data WriteResult
= WriteWouldBlock
| WriteClosed
| WriteOk
derive instance Generic WriteResult _
@@ -45,8 +48,8 @@ derive instance Eq WriteResult
instance Show WriteResult where
show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }
type ReadResultFFI a = { wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { wouldBlock :: WriteResult, ok :: WriteResult }
foreign import data Writable :: Type -> Type
foreign import data Readable :: Type -> Type
@@ -59,13 +62,16 @@ foreign import isReadableImpl :: forall s. s -> Effect Boolean
foreign import isWritableImpl :: forall s. s -> Effect Boolean
foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableFinishedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean
foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int
readResultFFI :: forall a. ReadResultFFI a
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust }
readResultFFI = { wouldBlock: ReadWouldBlock, just: ReadJust }
writeResultFFI :: WriteResultFFI
writeResultFFI = { closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk }
writeResultFFI = { wouldBlock: WriteWouldBlock, ok: WriteOk }
class Stream :: Type -> Constraint
class Stream s where
@@ -81,25 +87,31 @@ else instance Stream s => Stream s where
isClosed s = isClosed s
class Stream s <= Read s a | s -> a where
readableLength :: s -> Effect Int
isReadable :: s -> Effect Boolean
isReadableEnded :: s -> Effect Boolean
read :: s -> Effect (ReadResult a)
class Stream s <= Write s a | s -> a where
isWritable :: s -> Effect Boolean
needsDrain :: s -> Effect Boolean
isWritableEnded :: s -> Effect Boolean
isWritableFinished :: s -> Effect Boolean
write :: s -> a -> Effect WriteResult
end :: s -> Effect Unit
instance Read (Readable a) a where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI
else instance Read (Transform a b) b where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI
else instance (Read s a) => Read s a where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read s = read s
@@ -107,18 +119,24 @@ else instance (Read s a) => Read s a where
instance Write (Writable a) a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance Write (Transform a b) a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance (Write s a) => Write s a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s a = write s a
end s = end s
needsDrain = needsDrainImpl
withErrorST :: forall s. Stream s => s -> Effect { cancel :: Effect Unit, error :: STRef Global (Maybe Error) }
withErrorST s = do
@@ -126,42 +144,74 @@ withErrorST s = do
cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error
pure { error, cancel }
fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer
fromBufferReadable = unsafeCoerce
unsafeCoerceWritable :: forall r a. Stream.Writable r -> Writable a
unsafeCoerceWritable = unsafeCoerce
fromBufferTransform :: Stream.Duplex -> Transform Buffer Buffer
fromBufferTransform = unsafeCoerce
unsafeCoerceReadable :: forall r a. Stream.Readable r -> Readable a
unsafeCoerceReadable = unsafeCoerce
fromBufferWritable :: forall r. Stream.Writable r -> Writable Buffer
fromBufferWritable = unsafeCoerce
unsafeCoerceTransform :: forall a b. Stream.Duplex -> Transform a b
unsafeCoerceTransform = unsafeCoerce
fromStringReadable :: forall r. Stream.Readable r -> Readable String
fromStringReadable = unsafeCoerce
unsafeFromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer
unsafeFromBufferReadable = unsafeCoerce
fromStringTransform :: Stream.Duplex -> Transform String String
fromStringTransform = unsafeCoerce
unsafeFromBufferTransform :: forall a. Stream.Duplex -> Transform Buffer a
unsafeFromBufferTransform = unsafeCoerce
fromStringWritable :: forall r. Stream.Writable r -> Writable String
fromStringWritable = unsafeCoerce
unsafeFromBufferWritable :: forall r. Stream.Writable r -> Writable Buffer
unsafeFromBufferWritable = unsafeCoerce
unsafeFromStringReadable :: forall r. Stream.Readable r -> Readable String
unsafeFromStringReadable = unsafeCoerce
unsafeFromStringTransform :: forall a. Stream.Duplex -> Transform String a
unsafeFromStringTransform = unsafeCoerce
unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String
unsafeFromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do
awaitReadableOrClosed s = Aff.supervise do
fiber <-
Aff.forkAff
$ parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isReadableEnded s
readEnded <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s
when (not ended && not closed && not readable)
$ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
length <- liftEffect $ readableLength s
if (not closed && not readEnded && readable && length == 0) then
liftEither =<< Aff.joinFiber fiber
else
Aff.killFiber (error "") fiber
awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = onceAff0 finishH s
awaitFinished s = Aff.supervise do
fiber <- Aff.forkAff $ onceAff0 finishH s
finished <- liftEffect $ isWritableFinished s
if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
awaitWritableOrClosed s = Aff.supervise do
fiber <-
Aff.forkAff
$ parOneOf
[ onceAff0 drainH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isWritableEnded s
writeEnded <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s
when (not ended && not closed && not writable)
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
needsDrain <- liftEffect $ needsDrain s
if not closed && not writeEnded && writable && needsDrain then
liftEither =<< Aff.joinFiber fiber
else
Aff.killFiber (error "") fiber
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
onceAff0 h emitter = makeAff \res -> do

View File

@@ -1 +0,0 @@
module Pipes.CSV.Parse where

View File

@@ -12,11 +12,15 @@ import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), maybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding)
import Pipes.Core (Producer)
import Pipes.Internal (Proxy(..))
@@ -27,8 +31,12 @@ traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Uni
traverse f b0 p0 =
flip tailRecM (p0 /\ b0) \(p /\ b) ->
case p of
Respond a m -> Loop <$> (m unit /\ _) <$> f b a
M m -> Loop <$> (_ /\ b) <$> m
Respond a m -> do
b' <- f b a
pure $ Loop $ m unit /\ b'
M m -> do
n <- m
pure $ Loop $ (n /\ b)
Request _ _ -> pure $ Done b
Pure _ -> pure $ Done b
@@ -44,6 +52,30 @@ fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f p0 = traverse (\_ a -> f a) unit p0
-- | `append` all emitted values to `mempty`
toMonoid :: forall a m. Monoid a => MonadRec m => MonadEffect m => Producer a m Unit -> m a
toMonoid = fold (<>) mempty
-- | Concatenate all buffers to a single buffer, then decode with the
-- | provided encoding.
toStringWith :: forall m. MonadRec m => MonadEffect m => Encoding -> Producer Buffer m Unit -> m String
toStringWith enc = (liftEffect <<< Buffer.toString enc) <=< toBuffer
-- | Concatenate all produced buffers to a single buffer
toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer
toBuffer p =
(liftEffect <<< maybe (Buffer.alloc 0) pure)
=<< traverse
( flip \b ->
case _ of
Just acc -> do
new <- liftEffect $ Buffer.concat [ acc, b ]
pure $ Just new
_ -> pure $ Just b
)
Nothing
p
-- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
toArray p = do
@@ -52,9 +84,18 @@ toArray p = do
liftEffect $ liftST $ Array.ST.unsafeFreeze st
-- | Collect all values from a `Producer` into a list.
-- |
-- | Reverses the list after collecting, so that values will be
-- | in the order they were emitted.
toList :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toList = map List.reverse <<< fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a list.
-- |
-- | Does not reverse the list after collecting.
toListRev :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toListRev = map List.reverse <<< fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a Javascript Object.
toObject :: forall a m. MonadRec m => MonadEffect m => Producer (String /\ a) m Unit -> m (Object a)
toObject p = do

View File

@@ -23,7 +23,7 @@ import Prim.Row (class Union)
-- |
-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a`
-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting.
write
write'
:: forall r trash m
. Union r trash WriteStreamOptions
=> MonadAff m
@@ -31,27 +31,27 @@ write
=> Record r
-> FilePath
-> Consumer (Maybe Buffer) m Unit
write o p = do
write' o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w
fromWritable $ O.unsafeCoerceWritable w
-- | Open a file in write mode, failing if the file already exists.
-- |
-- | `write {flags: "wx"}`
-- | `write' {flags: "wx"}`
create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
create = write { flags: "wx" }
create = write' { flags: "wx" }
-- | Open a file in write mode, truncating it if the file already exists.
-- |
-- | `write {flags: "w"}`
truncate :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
truncate = write { flags: "w" }
-- | `write' {flags: "w"}`
trunc :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
trunc = write' { flags: "w" }
-- | Open a file in write mode, appending written contents if the file already exists.
-- |
-- | `write {flags: "a"}`
-- | `write' {flags: "a"}`
append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
append = write { flags: "a" }
append = write' { flags: "a" }
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
@@ -60,7 +60,7 @@ append = write { flags: "a" }
read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit
read p = do
r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r
fromReadable $ O.unsafeCoerceReadable r
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
@@ -76,4 +76,4 @@ read'
-> Producer (Maybe Buffer) m Unit
read' opts p = do
r <- liftEffect $ FS.Stream.createReadStream' p opts
fromReadable $ O.fromBufferReadable r
fromReadable $ O.unsafeCoerceReadable r

View File

@@ -1,17 +1,16 @@
module Pipes.Node.Stream where
import Prelude
import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..))
import Data.Newtype (wrap)
import Data.Traversable (for_)
import Control.Parallel (parOneOf)
import Data.Maybe (Maybe(..), maybe)
import Data.Traversable (for_, traverse, traverse_)
import Data.Tuple.Nested ((/\))
import Effect.Aff (delay)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
@@ -19,7 +18,6 @@ import Node.Stream.Object as O
import Pipes (await, yield)
import Pipes (for) as P
import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Prelude (mapFoldable) as P
import Pipes.Util (InvokeResult(..), invoke)
-- | Convert a `Readable` stream to a `Pipe`.
@@ -34,15 +32,19 @@ fromReadable r =
pure $ Done unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadClosed -> yield Nothing *> cleanup cancel
O.ReadWouldBlock -> do
ended <- liftEffect $ O.isReadableEnded r
if ended then do
yield Nothing
cleanup cancel
else
liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
in
do
e <- liftEffect $ O.withErrorST r
@@ -53,78 +55,78 @@ fromReadable r =
-- | When `Nothing` is piped to this, the stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit
fromWritable w =
fromWritable w = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST w
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
liftEffect $ O.end w
maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST)
waitCanWrite = do
shouldWait <- liftEffect $ O.needsDrain w
when shouldWait $ liftAff $ O.awaitWritableOrClosed w
cleanup = do
liftAff $ O.awaitFinished w
pure $ Done unit
maybeThrow
liftEffect removeErrorListener
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
onEOS = liftEffect (O.end w) *> cleanup $> Done unit
onChunk a = liftEffect (O.write w a) $> Loop unit
ma <- await
case ma of
Nothing -> cleanup cancel
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop { error, cancel }
O.WriteClosed -> cleanup cancel
in
do
r <- liftEffect $ O.withErrorST w
tailRecM go r
go _ = do
maybeThrow
waitCanWrite
ended <- liftEffect $ O.isWritableEnded w
if ended then
cleanup $> Done unit
else
await >>= maybe onEOS onChunk
tailRecM go unit
-- | Convert a `Transform` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit
fromTransform t =
fromTransform t = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST t
let
cleanup removeErrorListener = do
liftEffect $ O.end t
liftEffect $ removeErrorListener
fromReadable t
pure $ Done unit
yieldFromReadableHalf =
flip tailRecM unit $ const do
res <- liftEffect (O.read t)
case res of
O.ReadJust a -> do
yield $ Just a
pure $ Loop unit
O.ReadWouldBlock -> pure $ Done unit
O.ReadClosed -> yield Nothing $> Done unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST)
yieldFromReadableHalf
ma <- await
case ma of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
yieldFromReadableHalf
case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff $ O.awaitWritableOrClosed t
pure $ Loop { error, cancel }
in
do
r <- liftEffect $ O.withErrorST t
tailRecM go r
cleanup = do
flip tailRecM unit $ const do
liftAff $ O.awaitReadableOrClosed t
readEnded <- liftEffect $ O.isReadableEnded t
yieldWhileReadable
pure $ (if readEnded then Done else Loop) unit
liftAff $ O.awaitFinished t
maybeThrow
liftEffect $ removeErrorListener
yield Nothing
yieldWhileReadable = void $ whileJust $ maybeYield1
maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t)
onEOS = liftEffect (O.end t) *> cleanup $> Done unit
onChunk a = liftEffect (O.write t a) $> Loop unit
go _ = do
maybeThrow
needsDrain <- liftEffect $ O.needsDrain t
ended <- liftEffect $ O.isWritableEnded t
if needsDrain then do
yieldWhileReadable
liftAff $ parOneOf [O.awaitWritableOrClosed t, O.awaitReadableOrClosed t]
pure $ Loop unit
else if ended then
cleanup $> Done unit
else
await >>= maybe onEOS onChunk
tailRecM go unit
-- | Given a `Producer` of values, wrap them in `Just`.
-- |
@@ -136,7 +138,7 @@ withEOS a = do
-- | Strip a pipeline of the EOS signal
unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = P.mapFoldable identity
unEOS = tailRecM (const $ maybe (pure $ Done unit) (\a -> yield a $> Loop unit) =<< await) unit
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.
-- |

View File

@@ -18,7 +18,7 @@ import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.fromBufferTransform raw
fromTransform $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip

View File

@@ -85,9 +85,15 @@ chunked size = do
Rec.whileJust $ runMaybeT do
a <- MaybeT await
chunkPut a
len <- chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake
len <- lift chunkLength
when (len >= size) do
chunk <- lift chunkTake
lift $ yield $ Just chunk
len <- chunkLength
when (len > 0) do
chunk <- chunkTake
yield $ Just chunk
yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.

View File

@@ -33,11 +33,6 @@ spec = describe "Pipes.Node.FS" do
liftEffect $ FS.writeTextFile UTF8 p "foo"
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "create" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
@@ -59,14 +54,14 @@ spec = describe "Pipes.Node.FS" do
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo\nbar"
describe "truncate" do
describe "trunc" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "overwrites contents" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "bar"
around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do

View File

@@ -40,11 +40,11 @@ foreign import charsTransform :: Effect (O.Transform String String)
writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
writer a = do
stream <- liftEffect $ O.fromBufferWritable <$> FS.Stream.createWriteStream a
stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a
pure $ stream /\ S.fromWritable stream
reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
reader a = liftEffect $ S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a
reader a = liftEffect $ S.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a
spec :: Spec Unit
spec =
@@ -64,7 +64,7 @@ spec =
describe "Writable" $ around tmpFile do
describe "fs.WriteStream" do
it "pipe to file" \p -> do
stream <- O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p)
stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let
w = S.fromWritable stream
source = do
@@ -75,7 +75,7 @@ spec =
contents `shouldEqual` "hello"
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
it "async pipe to file" \p -> do
w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p)
w <- S.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let
source = do
yield "hello, "
@@ -110,7 +110,7 @@ spec =
let
json = yield $ writeJSON { foo: "bar" }
exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000"
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex)
fold outs `shouldEqual` exp
around tmpFiles
@@ -118,11 +118,11 @@ spec =
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ]
areader <- liftEffect $ reader a
bwritestream /\ bwriter <- liftEffect $ writer b
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
runEffect $ areader >-> gzip >-> bwriter
shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream)
gunzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip)
gunzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip)
breader <- liftEffect $ reader b
nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity)
Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ]