52 Commits

Author SHA1 Message Date
cda67508d4 chore: prepare v2.1.0 2024-06-23 18:34:01 -05:00
860ace3990 feat: AsyncPipe really should be a monad 2024-06-23 18:33:45 -05:00
7f11c303fb chore: prepare v2.0.2 2024-06-22 19:33:56 -05:00
2e0be4ac62 fix: loop bug 2024-06-22 19:33:45 -05:00
0ba315ede0 chore: prepare v2.0.1 2024-06-22 19:12:56 -05:00
08bd9a817a fix: asyncpipe is profunctor 2024-06-22 19:12:31 -05:00
970d890a00 chore: prepare v2.0.0 2024-06-22 18:42:49 -05:00
5b3eda707e feat!: add AsyncPipe abstraction, significantly improve throughput of Transform streams 2024-06-22 18:42:22 -05:00
4cd44367a8 chore: prepare v1.6.1 2024-06-21 13:21:22 -05:00
d76f55e267 fix: introduced transform bug 2024-06-21 13:21:19 -05:00
4b91ab7d5c chore: prepare v1.6.0 2024-06-20 15:40:32 -05:00
a8702f4849 fix: finish may not emit until all chunks are read 2024-06-20 15:40:17 -05:00
f3ea830379 chore: prepare v1.5.0 2024-05-15 13:30:23 -05:00
4242330bef fix: write readme, clarify API 2024-05-15 13:30:08 -05:00
1b6e6423b1 chore: prepare v1.4.1 2024-05-14 15:02:39 -05:00
657af14bb6 fix: await finished before exiting 2024-05-14 14:07:20 -05:00
33d42034fc fix: continue to read after readableEnded 2024-05-14 13:51:57 -05:00
c8822aeffe fix: I WAS INCORRECT HHEHE 2024-05-14 13:44:33 -05:00
7076b13df4 fix: catch incorrect stream implementations 2024-05-14 13:42:39 -05:00
3f4bc12d36 chore: prepare v1.4.0 2024-05-14 13:12:56 -05:00
fd53b6520f feat: Collect.toBuffer 2024-05-14 13:12:41 -05:00
0ef7240d61 wip: explore removing delays(10) 2024-05-14 12:44:31 -05:00
de22f44f86 wip: explore removing delays(9) 2024-05-14 12:43:38 -05:00
f9c0e20777 wip: explore removing delays(8) 2024-05-14 11:08:46 -05:00
edc7d40dbc wip: explore removing delays(7) 2024-05-14 11:04:50 -05:00
ba8d90038d wip: explore removing delays(6) 2024-05-14 10:55:14 -05:00
dfdca9f5e9 wip: explore removing delays(5) 2024-05-14 10:39:06 -05:00
67ae171532 wip: explore removing delays(4) 2024-05-13 21:18:27 -05:00
a347c05062 wip: explore removing delays(3) 2024-05-13 21:17:28 -05:00
f9446c97a0 wip: explore removing delays(2) 2024-05-13 21:15:45 -05:00
d3b8d1792d wip: explore removing delays 2024-05-13 21:06:41 -05:00
e05c74f42f fix: minor fixes 2024-05-13 15:04:34 -05:00
e1c2481e70 chore: prepare v1.3.3 2024-05-13 14:42:48 -05:00
820351f800 fix: more yields 2024-05-13 14:42:23 -05:00
9d8b500b8d chore: prepare v1.3.2 2024-05-13 14:35:55 -05:00
b7bead090e fix: transform should read more than just 1 chunk after writing 2024-05-13 14:35:44 -05:00
3db5cc44a9 chore: prepare v1.3.1 2024-05-13 13:27:28 -05:00
1a5ca66e83 fix: Pipes.Node.FS.read' 2024-05-13 13:27:18 -05:00
54d9d57927 chore: prepare v1.3.0 2024-05-13 11:21:23 -05:00
a5c535fb1e feat: Pipes.Construct 2024-05-13 11:21:06 -05:00
7e6c6af3dd chore: prepare v1.2.3 2024-05-11 22:11:44 -05:00
faf49fafd5 chore: lock 2024-05-11 22:11:40 -05:00
04815f66a4 chore: prepare v1.2.2 2024-05-11 22:10:57 -05:00
fd895de148 fix: ensure-ranges 2024-05-11 22:09:45 -05:00
b618ef1819 chore: prepare v1.2.1 2024-05-11 22:08:38 -05:00
407491f055 fix: generalize all Affs to MonadAffs 2024-05-11 22:08:27 -05:00
2fdf6f0dad chore: prepare v1.2.0 2024-05-11 22:01:44 -05:00
eb01962553 feat: fix inEOS, add uniqHash + invoke 2024-05-11 22:01:06 -05:00
4baf317f43 chore: prepare v1.1.0 2024-05-11 18:02:13 -05:00
634e52fe39 fix: more Pipes.Collect utils, stack-safety 2024-05-11 18:01:43 -05:00
f2f18c3c13 chore: prepare v1.0.5 2024-05-10 18:30:36 -05:00
76958b63ef feat: Pipes.Util.chunked 2024-05-10 18:30:27 -05:00
25 changed files with 9699 additions and 359 deletions

README.md

@@ -2,41 +2,114 @@
Interact with Node streams in object mode using [`Pipes`]!
## Example
```purescript
import Prelude
import Effect.Aff (launchAff_)
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Pipes ((>->))
import Pipes.Prelude as Pipes
import Pipes.Core as Pipes.Core
import Pipes.Node.FS.Stream as FS
import Pipes.Node.Zlib as Zlib
import Pipes.CSV.Parse as CSV.Parse
-- == my-zipped-data.csv ==
-- id,foo,is_deleted
-- 1,hello,f
-- 2,goodbye,t
-- Logs:
-- {id: 1, foo: "hello", is_deleted: false}
-- {id: 2, foo: "goodbye", is_deleted: true}
main :: Effect Unit
main =
  launchAff_
    $ Pipes.Core.runEffect
    $ FS.createReadStream "my-zipped-data.csv.gz"
    >-> Zlib.gunzip
    >-> CSV.Parse.parse @{id :: Int, foo :: String, is_deleted :: Boolean}
    >-> Pipes.mapM (liftEffect <<< log)
```
## Install
```bash
spago install node-stream-pipes
```
## Usage
### Node Streams
#### Raw Streams
Raw `objectMode` Node streams are represented in `Node.Stream.Object`:
- `Writable a` accepts chunks of type `a`
- `Readable a` emits chunks of type `a`
- `Transform a b` transforms chunks from `a` to `b`
Non-object-mode streams can also be represented with these types; for example, an `fs.WriteStream`
can be coerced to `Writable Buffer` without issue.
Interop between these types and `Node.Stream` is provided in `Node.Stream.Object`:
- `unsafeFrom{String,Buffer}{Writable,Readable,Transform}`
- `unsafeCoerce{Writable,Readable,Transform}`
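For example, a file write stream can be wrapped like so (a sketch; `createWriteStream` from `purescript-node-fs`'s `Node.FS.Stream` is an assumption here):
```purescript
module Example.Sink where

import Prelude

import Effect (Effect)
import Node.Buffer (Buffer)
import Node.FS.Stream (createWriteStream)
import Node.Stream.Object as ObjectStream

-- An fs.WriteStream consumes Buffer chunks, so treating it
-- as `Writable Buffer` is safe.
fileSink :: Effect (ObjectStream.Writable Buffer)
fileSink = ObjectStream.unsafeFromBufferWritable <$> createWriteStream "out.bin"
```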
#### Pipes
Streams in `Node.Stream.Object` can be converted to `Producer`s, `Consumer`s and `Pipe`s with `Pipes.Node.Stream`:
- `fromReadable :: forall a. <Readable a> -> Producer (Maybe a) <Aff> Unit`
- `fromWritable :: forall a. <Writable a> -> Consumer (Maybe a) <Aff> Unit`
- `fromTransform :: forall a b. <Transform a b> -> Pipe (Maybe a) (Maybe b) <Aff> Unit`
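As a sketch of how these compose (using the coercions above), copying one raw stream into another while forwarding the EOS marker:
```purescript
import Prelude

import Effect.Aff (Aff)
import Node.Buffer (Buffer)
import Node.Stream.Object as ObjectStream
import Pipes ((>->))
import Pipes.Core as Pipes.Core
import Pipes.Node.Stream as Pipes.Node.Stream

-- Pipe every chunk (and the final `Nothing`) from a raw
-- readable into a raw writable.
copy :: ObjectStream.Readable Buffer -> ObjectStream.Writable Buffer -> Aff Unit
copy r w =
  Pipes.Core.runEffect
    $ Pipes.Node.Stream.fromReadable r
    >-> Pipes.Node.Stream.fromWritable w
```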
#### EOS Marker
Normally, no computation in a pipeline runs after any stage of the pipeline exits.
To allow for resource cleanup and to signal that the stream is about to close,
this package occasionally uses `Maybe a` as an End-of-Stream (EOS) marker:
```purescript
-- foo.txt is "hello, world!\n"
chunks <- Pipes.Collect.toArray $ Pipes.FS.read "foo.txt" >-> Pipes.Node.Stream.inEOS (Pipes.Buffer.toString UTF8)
chunks `shouldEqual` [Just "hello, world!\n", Nothing]
```
Pipes from `a -> b` unaware of EOS can be lifted to `Maybe a -> Maybe b` with `Pipes.Node.Stream.inEOS`.
Producers of `Maybe a` can drop the EOS marker and emit `a` with `Pipes.Node.Stream.unEOS`.
Producers of `a` can have an EOS marker added with `Pipes.Node.Stream.withEOS`.
#### Example
`Pipes.PassThrough.js`
```javascript
import {PassThrough} from 'stream'
export const makePassThrough = () => new PassThrough()
```
`Pipes.PassThrough.purs`
```purescript
module Pipes.PassThrough where
import Prelude

import Data.Maybe (Maybe)
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Aff (Aff)
import Pipes.Core (Pipe)
import Node.Stream.Object as ObjectStream
import Pipes.Node.Stream as Pipes.Node.Stream

type PassThroughStream a = ObjectStream.Transform a a

foreign import makePassThrough :: forall a. Effect (PassThroughStream a)

passThrough :: forall a. Pipe (Maybe a) (Maybe a) Aff Unit
passThrough = do
  raw <- liftEffect makePassThrough
  Pipes.Node.Stream.fromTransform raw
```
### Utilities
This package provides utilities that explicitly use `MonadRec` to ensure stack-safety
when dealing with producers of large amounts of data.
- `Pipes.Collect` provides stack-safe utilities for executing a pipeline and collecting results into a collection, `Buffer`, `Monoid` etc.
- `Pipes.Construct` provides stack-safe utilities for creating producers from in-memory collections.
- `Pipes.Util` provides some miscellaneous utilities missing from `pipes`.
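For example, a stack-safe round trip through a producer (a sketch; `Pipes.map` is `Pipes.Prelude.map` from `pipes`, and `toArray` / `eachArray` are the functions described above):
```purescript
import Prelude

import Effect.Aff (Aff)
import Pipes ((>->))
import Pipes.Prelude as Pipes
import Pipes.Collect as Pipes.Collect
import Pipes.Construct as Pipes.Construct

-- Works on arrays of any size without stack overflow.
doubleAll :: Array Int -> Aff (Array Int)
doubleAll xs =
  Pipes.Collect.toArray
    $ Pipes.Construct.eachArray xs
    >-> Pipes.map (_ * 2)
```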
### Zlib
Pipes for compression & decompression using `zlib` are provided in `Pipes.Node.Zlib`.
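For example, gzipping a file end-to-end (a sketch: a `gzip` pipe is assumed to exist alongside the `gunzip` used in the example at the top of this README):
```purescript
import Prelude

import Effect.Aff (Aff)
import Pipes ((>->))
import Pipes.Core as Pipes.Core
import Pipes.Node.FS as Pipes.Node.FS
import Pipes.Node.Zlib as Zlib

-- Read foo.txt, compress, write foo.txt.gz.
compress :: Aff Unit
compress =
  Pipes.Core.runEffect
    $ Pipes.Node.FS.read "foo.txt"
    >-> Zlib.gzip
    >-> Pipes.Node.FS.create "foo.txt.gz"
```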
### FS
Read files with:
- `Pipes.Node.FS.read <path>`
- `Pipes.Node.FS.read' <ReadStreamOptions> <path>`
```purescript
Pipes.Collect.toStringWith UTF8 $ Pipes.Node.FS.read "foo.txt" >-> Pipes.Stream.unEOS
```
Write files with:
- `Pipes.Node.FS.write' <WriteStreamOptions> <path>`
- `Pipes.Node.FS.trunc <path>`
- `Pipes.Node.FS.create <path>`
- `Pipes.Node.FS.append <path>`
```purescript
Pipes.Stream.withEOS (
Pipes.Construct.eachArray ["id,name", "1,henry", "2,suzie"]
>-> Pipes.Util.intersperse "\n"
>-> Pipes.Buffer.fromString UTF8
)
>-> Pipes.Node.FS.create "foo.csv"
```
[`Pipes`]: https://pursuit.purescript.org/packages/purescript-pipes/8.0.0

bun.lockb (binary file not shown)


@@ -1,11 +1,13 @@
{
"name": "purescript-csv-stream",
"version": "v1.0.4",
"name": "purescript-node-stream-pipes",
"version": "v2.1.0",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"
"csv-parse": "^5.5.6",
"csv-stringify": "^6.5.0"
},
"devDependencies": {
"cbor-x": "^1.5.9",
"typescript": "^5.4.5"
}
}


@@ -5,10 +5,16 @@ workspace:
dependencies:
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- console: ">=6.1.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
@@ -18,6 +24,8 @@ workspace:
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- now: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
@@ -26,6 +34,7 @@ workspace:
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console
@@ -106,6 +115,7 @@ workspace:
- type-equality
- typelevel-prelude
- unfoldable
- unordered-collections
- unsafe-coerce
- variant
extra_packages: {}
@@ -904,6 +914,21 @@ packages:
- partial
- prelude
- tuples
unordered-collections:
type: registry
version: 3.1.0
integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q=
dependencies:
- arrays
- enums
- functions
- integers
- lists
- prelude
- record
- tuples
- typelevel-prelude
- unfoldable
unsafe-coerce:
type: registry
version: 6.0.0


@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.0.4'
version: '2.1.0'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -12,10 +12,16 @@ package:
dependencies:
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- console: ">=6.1.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
@@ -25,6 +31,8 @@ package:
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- now: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
@@ -32,9 +40,13 @@ package:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main
build:
strict: true
dependencies:
- console
- gen


@@ -1,45 +1,50 @@
import Stream from "stream";
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableImpl = s => () => s.readable
export const isReadableImpl = (s) => () => s.readable;
/** @type {(s: Stream.Readable | Stream.Transform) => () => number} */
export const readableLengthImpl = (s) => () => s.readableLength;
/** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */
export const isClosedImpl = s => () => s.closed
export const isClosedImpl = (s) => () => s.closed;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableImpl = s => () => s.writable
export const isWritableImpl = (s) => () => s.writable;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const needsDrainImpl = (s) => () => s.writableNeedDrain;
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableEndedImpl = s => () => s.readableEnded
export const isReadableEndedImpl = (s) => () => s.readableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableEndedImpl = s => () => s.writableEnded
export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableFinishedImpl = (s) => () => s.writableFinished;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end();
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl = ({ok, wouldBlock, closed}) => (s) => (a) => () => {
if (s.closed || s.writableEnded) {
return closed
}
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl =
({ ok, wouldBlock }) =>
(s) =>
(a) =>
() => {
if (s.write(a)) {
return ok;
} else {
return wouldBlock;
}
};
if (s.write(a)) {
return ok
} else {
return wouldBlock
}
}
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
export const readImpl =
({ just, closed, wouldBlock }) =>
({ just, wouldBlock }) =>
(s) =>
() => {
if (s.closed || s.readableEnded) {
return closed;
}
const a = s.read();
if (a === null) {
return wouldBlock;


@@ -14,8 +14,9 @@ import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow)
import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, makeAff)
import Effect.Aff as Aff
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Effect.Exception (Error, error)
import Effect.Uncurried (mkEffectFn1)
import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..))
@@ -26,24 +27,29 @@ import Unsafe.Coerce (unsafeCoerce)
data ReadResult a
= ReadWouldBlock
| ReadClosed
| ReadJust a
derive instance Generic (ReadResult a) _
derive instance Functor ReadResult
derive instance Eq a => Eq (ReadResult a)
instance Show (ReadResult a) where
show = genericShow <<< map (const "..")
maybeReadResult :: forall a. ReadResult a -> Maybe a
maybeReadResult (ReadWouldBlock) = Nothing
maybeReadResult (ReadJust a) = Just a
data WriteResult
= WriteWouldBlock
| WriteClosed
| WriteOk
derive instance Generic WriteResult _
derive instance Eq WriteResult
instance Show WriteResult where show = genericShow
instance Show WriteResult where
show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }
type ReadResultFFI a = { wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { wouldBlock :: WriteResult, ok :: WriteResult }
foreign import data Writable :: Type -> Type
foreign import data Readable :: Type -> Type
@@ -56,13 +62,16 @@ foreign import isReadableImpl :: forall s. s -> Effect Boolean
foreign import isWritableImpl :: forall s. s -> Effect Boolean
foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableFinishedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean
foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int
readResultFFI :: forall a. ReadResultFFI a
readResultFFI = {closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust}
readResultFFI = { wouldBlock: ReadWouldBlock, just: ReadJust }
writeResultFFI :: WriteResultFFI
writeResultFFI = {closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk}
writeResultFFI = { wouldBlock: WriteWouldBlock, ok: WriteOk }
class Stream :: Type -> Constraint
class Stream s where
@@ -78,25 +87,31 @@ else instance Stream s => Stream s where
isClosed s = isClosed s
class Stream s <= Read s a | s -> a where
readableLength :: s -> Effect Int
isReadable :: s -> Effect Boolean
isReadableEnded :: s -> Effect Boolean
read :: s -> Effect (ReadResult a)
class Stream s <= Write s a | s -> a where
isWritable :: s -> Effect Boolean
needsDrain :: s -> Effect Boolean
isWritableEnded :: s -> Effect Boolean
isWritableFinished :: s -> Effect Boolean
write :: s -> a -> Effect WriteResult
end :: s -> Effect Unit
instance Read (Readable a) a where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI
else instance Read (Transform a b) b where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI
else instance (Read s a) => Read s a where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read s = read s
@@ -104,58 +119,99 @@ else instance (Read s a) => Read s a where
instance Write (Writable a) a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance Write (Transform a b) a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance (Write s a) => Write s a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
isWritableFinished = isWritableFinishedImpl
write s a = write s a
end s = end s
needsDrain = needsDrainImpl
withErrorST :: forall s. Stream s => s -> Effect {cancel :: Effect Unit, error :: STRef Global (Maybe Error)}
withErrorST :: forall s. Stream s => s -> Effect { cancel :: Effect Unit, error :: STRef Global (Maybe Error) }
withErrorST s = do
error <- liftST $ STRef.new Nothing
cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error
pure {error, cancel}
pure { error, cancel }
fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer
fromBufferReadable = unsafeCoerce
unsafeCoerceWritable :: forall r a. Stream.Writable r -> Writable a
unsafeCoerceWritable = unsafeCoerce
fromBufferTransform :: Stream.Duplex -> Transform Buffer Buffer
fromBufferTransform = unsafeCoerce
unsafeCoerceReadable :: forall r a. Stream.Readable r -> Readable a
unsafeCoerceReadable = unsafeCoerce
fromBufferWritable :: forall r. Stream.Writable r -> Writable Buffer
fromBufferWritable = unsafeCoerce
unsafeCoerceTransform :: forall a b. Stream.Duplex -> Transform a b
unsafeCoerceTransform = unsafeCoerce
fromStringReadable :: forall r. Stream.Readable r -> Readable String
fromStringReadable = unsafeCoerce
unsafeFromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer
unsafeFromBufferReadable = unsafeCoerce
fromStringTransform :: Stream.Duplex -> Transform String String
fromStringTransform = unsafeCoerce
unsafeFromBufferTransform :: forall a. Stream.Duplex -> Transform Buffer a
unsafeFromBufferTransform = unsafeCoerce
fromStringWritable :: forall r. Stream.Writable r -> Writable String
fromStringWritable = unsafeCoerce
unsafeFromBufferWritable :: forall r. Stream.Writable r -> Writable Buffer
unsafeFromBufferWritable = unsafeCoerce
unsafeFromStringReadable :: forall r. Stream.Readable r -> Readable String
unsafeFromStringReadable = unsafeCoerce
unsafeFromStringTransform :: forall a. Stream.Duplex -> Transform String a
unsafeFromStringTransform = unsafeCoerce
unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String
unsafeFromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do
awaitReadableOrClosed s = Aff.supervise do
fiber <-
Aff.forkAff
$ parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isReadableEnded s
readEnded <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s
when (not ended && not closed && not readable)
$ liftEither =<< parOneOf [onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s]
length <- liftEffect $ readableLength s
if (not closed && not readEnded && readable && length == 0) then
liftEither =<< Aff.joinFiber fiber
else
Aff.killFiber (error "") fiber
awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = Aff.supervise do
fiber <- Aff.forkAff $ onceAff0 finishH s
finished <- liftEffect $ isWritableFinished s
if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
awaitWritableOrClosed s = Aff.supervise do
fiber <-
Aff.forkAff
$ parOneOf
[ onceAff0 drainH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isWritableEnded s
writeEnded <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s
when (not ended && not closed && not writable)
$ liftEither =<< parOneOf [onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s]
needsDrain <- liftEffect $ needsDrain s
if not closed && not writeEnded && writable && needsDrain then
liftEither =<< Aff.joinFiber fiber
else
Aff.killFiber (error "") fiber
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
onceAff0 h emitter = makeAff \res -> do
@@ -181,3 +237,6 @@ errorH = EventHandle "error" mkEffectFn1
endH :: forall s a. Write s a => EventHandle0 s
endH = EventHandle "end" identity
finishH :: forall s a. Write s a => EventHandle0 s
finishH = EventHandle "finish" identity

src/Pipes.Async.purs (new file)

@@ -0,0 +1,357 @@
module Pipes.Async where
import Prelude hiding (join)
import Control.Alternative (class Alternative, empty, guard)
import Control.Monad.Cont (class MonadTrans)
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Morph (hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel, parOneOf)
import Data.Array as Array
import Data.DateTime.Instant as Instant
import Data.Either (Either(..), either)
import Data.Foldable (class Foldable, fold)
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), fromMaybe, isNothing)
import Data.Newtype (unwrap)
import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds)
import Data.Traversable (class Traversable, traverse, traverse_)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Console (log)
import Effect.Exception (Error, error)
import Effect.Now as Now
import Pipes (await, yield)
import Pipes.Collect as Collect
import Pipes.Core (Pipe, Proxy, Producer)
data WriteSignal
= WriteSignalOk
| WriteSignalEnded
derive instance Generic WriteSignal _
derive instance Eq WriteSignal
derive instance Ord WriteSignal
instance Show WriteSignal where show = genericShow
instance Discard WriteSignal where
discard = bind
data ReadSignal
= ReadSignalOk
| ReadSignalEnded
derive instance Generic ReadSignal _
derive instance Eq ReadSignal
derive instance Ord ReadSignal
instance Show ReadSignal where show = genericShow
instance Discard ReadSignal where
discard = bind
data WriteResult
= WriteAgain
| WriteNeedsDrain
| WriteEnded
derive instance Generic WriteResult _
derive instance Eq WriteResult
derive instance Ord WriteResult
instance Show WriteResult where show = genericShow
data ReadResult a
= ReadOk a
| ReadWouldBlock
derive instance Generic (ReadResult a) _
derive instance Eq a => Eq (ReadResult a)
derive instance Ord a => Ord (ReadResult a)
derive instance Functor ReadResult
derive instance Foldable ReadResult
derive instance Traversable ReadResult
instance Show a => Show (ReadResult a) where show = genericShow
type AsyncIO a b m r =
{ write :: a -> m WriteResult
, read :: m (ReadResult b)
, awaitWrite :: m WriteSignal
, awaitRead :: m ReadSignal
}
/\ AsyncPipe a b m r
-- | An `AsyncPipe` is a `Pipe`-like struct that allows
-- | concurrently reading from a `Producer` and writing to a `Consumer`.
-- |
-- | An implementation of `AsyncPipe` for Node `Transform` streams
-- | is provided in `Pipes.Node.Stream`.
data AsyncPipe a b m r
-- | A pure return value
= Pure r
-- | An `AsyncPipe` behind a computation
| M (m (AsyncPipe a b m r))
-- | Interface to write & read from the backing resource
| AsyncIO (AsyncIO a b m r)
-- | Modify request / response types
mapIO :: forall aa ab ba bb m r. Monad m => (ab -> aa) -> (ba -> bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
mapIO _ _ (Pure a) = Pure a
mapIO a b (M m) = M $ mapIO a b <$> m
mapIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: write <<< a, awaitWrite, read: map b <$> read, awaitRead} /\ mapIO a b m
-- | Modify request / response types
bindIO :: forall aa ab ba bb m r. Monad m => (ab -> m aa) -> (ba -> m bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
bindIO _ _ (Pure a) = Pure a
bindIO a b (M m) = M $ bindIO a b <$> m
bindIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: flip bind write <<< a, awaitWrite, read: traverse b =<< read, awaitRead} /\ bindIO a b m
-- | Remove the `AsyncPipe` wrapper by discarding the IO
stripIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m r
stripIO (Pure r) = pure r
stripIO (M m) = m >>= stripIO
stripIO (AsyncIO (_ /\ m)) = stripIO m
-- | Execute the `AsyncPipe` monad stack until `AsyncIO` is reached (if any)
getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))
getAsyncIO (AsyncIO a) = pure $ Just a
getAsyncIO (M m) = m >>= getAsyncIO
getAsyncIO (Pure _) = pure Nothing
instance MonadTrans (AsyncPipe a b) where
lift = M <<< map Pure
instance Monad m => Functor (AsyncPipe a b m) where
map f (Pure r) = Pure $ f r
map f (M m) = M $ map f <$> m
map f (AsyncIO (io /\ m)) = AsyncIO $ io /\ (f <$> m)
instance Monad m => Apply (AsyncPipe a b m) where
apply (Pure f) ma = f <$> ma
apply (M mf) ma = M $ (_ <*> ma) <$> mf
apply (AsyncIO (io /\ mf)) ma = AsyncIO $ io /\ (mf <*> ma)
instance Monad m => Applicative (AsyncPipe a b m) where
pure = Pure
instance Monad m => Bind (AsyncPipe a b m) where
bind (Pure a) f = f a
bind (M ma) f = M $ (_ >>= f) <$> ma
bind (AsyncIO (io /\ m)) f = AsyncIO $ io /\ (m >>= f)
instance Monad m => Monad (AsyncPipe a b m)
instance MonadThrow e m => MonadThrow e (AsyncPipe a b m) where
throwError = lift <<< throwError
instance MonadError e m => MonadError e (AsyncPipe a b m) where
catchError m f = lift $ catchError (stripIO m) (stripIO <<< f)
instance MonadEffect m => MonadEffect (AsyncPipe a b m) where
liftEffect = lift <<< liftEffect
instance MonadAff m => MonadAff (AsyncPipe a b m) where
liftAff = lift <<< liftAff
-- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing.
debug :: forall a b m r. MonadAff m => String -> AsyncPipe (Maybe a) (Maybe b) m r -> AsyncPipe (Maybe a) (Maybe b) m r
debug c m =
let
logL :: forall m'. MonadEffect m' => _ -> m' Unit
logL msg = liftEffect $ log $ "[" <> c <> "] " <> msg
logR :: forall m'. MonadEffect m' => _ -> m' Unit
logR msg = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> msg
time :: forall m' a'. MonadEffect m' => m' a' -> m' (Milliseconds /\ a')
time ma = do
start <- liftEffect Now.now
a <- ma
end <- liftEffect Now.now
pure $ (end `Instant.diff` start) /\ a
in
flip bind (fromMaybe m)
$ runMaybeT do
(io /\ done') <- MaybeT $ lift $ getAsyncIO m
let
write a = do
logL "write >"
elapsed /\ w <- time $ io.write a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
read = do
logR "read >"
elapsed /\ r <- time $ io.read
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite = do
logL "awaitWrite >"
elapsed /\ w <- time $ io.awaitWrite
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w
awaitRead = do
logR "awaitRead >"
elapsed /\ r <- time $ io.awaitRead
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
done = do
logL "done >"
elapsed /\ r <- time done'
logL $ "< done (" <> show (unwrap elapsed) <> "ms)"
pure r
pure $ AsyncIO $ {write, read, awaitWrite, awaitRead} /\ done
-- | Convert an `AsyncPipe` to a regular `Pipe`.
-- |
-- | Rather than two concurrently-running halves (producer & consumer),
-- | this requires the `AsyncPipe` to occasionally stop `await`ing data
-- | written by the upstream `Producer` so that it can `yield` to the downstream `Consumer`.
-- |
-- | This implementation chooses to prioritize `yield`ing data to the `Consumer` over
-- | `await`ing written chunks.
-- |
-- | Note that using this limits the potential parallelism of the entire pipeline, ex:
-- |
-- | ```purs
-- | Pipe.FS.read "foo.csv" -- read
-- | >-> sync Pipe.CSV.parse -- parse
-- | >-> sync Pipe.CBOR.encode -- encode
-- | >-> Pipe.FS.write "foo.bin" -- write
-- | ```
-- |
-- | In the above example, this is what happens when the pipeline
-- | is executed:
-- | 1. `write` asks `encode` "do you have any data yet?" (fast)
-- | 1. `encode` asks `parse` "do you have any data yet?" (fast)
-- | 1. `parse` asks `read` "do you have any data yet?" (fast)
-- | 1. `read` passes 1 chunk to `parse` (fast)
-- | 1. `parse` blocks until the chunk is parsed (slow)
-- | 1. `parse` passes 1 chunk to `encode` (fast)
-- | 1. `encode` blocks until the chunk is encoded (slow)
-- | 1. `write` writes the block (fast)
-- |
-- | For larger workloads, changing this to use `asyncPipe` would be preferable, ex:
-- | ```purs
-- | Pipe.FS.read "foo.csv" -- read
-- | >-/-> Pipe.CSV.parse -- parse
-- | >-/-> Pipe.CBOR.encode -- encode
-- | >-> Pipe.FS.write "foo.bin" -- write
-- | ```
-- |
-- | With this change:
-- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r
sync m =
let
liftPipe :: forall r'. (Proxy _ _ _ _ m) r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftPipe = lift
liftM :: forall r'. m r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftM = liftPipe <<< lift
in
lift (getAsyncIO m) >>=
case _ of
Nothing -> lift $ stripIO m
Just ({write, awaitWrite, read, awaitRead} /\ done) ->
let
awaitRW onR onW =
liftM (parOneOf [Right <$> awaitWrite, Left <$> awaitRead])
>>= either (const onR) onW
wSignal WriteSignalOk = WriteAgain
wSignal WriteSignalEnded = WriteEnded
tailRecEarly f a = tailRecM (map (either identity identity) <<< runExceptT <<< f) a
continue a = throwError (Loop a)
break = (Done <$> liftM (stripIO done)) >>= throwError
in do
flip tailRecEarly WriteAgain \writable -> do
rb <- liftM read
case rb of
ReadWouldBlock
| writable == WriteEnded -> liftM awaitRead *> continue writable
| writable == WriteNeedsDrain -> awaitRW (continue writable) (continue <<< wSignal)
| otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue writable
ReadOk Nothing -> liftPipe (yield Nothing) *> break
when (writable /= WriteAgain) $ continue writable
a <- liftPipe await
writable' <- liftM $ write a
when (isNothing a) $ continue WriteEnded
pure $ Loop writable'
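The demand-driven stepping described in the doc comment above can be sketched with plain async generators. This is a hedged illustration, not this library's API; the stage names `read`/`parse`/`encode`/`write` are hypothetical and only mirror the example pipeline:

```javascript
// Demand-driven (pull) pipeline: each stage pulls one chunk at a time
// from its upstream, so only one chunk is "in flight" at any moment.
async function* read() {
  for (const chunk of ['a', 'b', 'c']) yield chunk;
}
async function* parse(upstream) {
  for await (const chunk of upstream) yield chunk.toUpperCase(); // the "slow" work
}
async function* encode(upstream) {
  for await (const chunk of upstream) yield `<${chunk}>`; // more "slow" work
}
async function write(upstream) {
  const out = [];
  for await (const chunk of upstream) out.push(chunk);
  return out;
}

// write pulls from encode, which pulls from parse, which pulls from read.
write(encode(parse(read()))).then(out => console.log(out)); // [ '<A>', '<B>', '<C>' ]
```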
-- | Implementation of `(>-/->)`
-- |
-- | In the current `MonadFork` "thread", read data from the `AsyncPipe` as it
-- | is yielded and `yield` to the downstream `Consumer`.
-- |
-- | Concurrently, in a separate thread, read data from the upstream `Producer`
-- | and write to the `AsyncPipe` at max throughput.
-- |
-- | If the producing half fails, the error is caught and rethrown.
-- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync
:: forall f m a b
. MonadRec m
=> MonadAff m
=> MonadBracket Error f m
=> Producer (Maybe a) m Unit
-> AsyncPipe (Maybe a) (Maybe b) m Unit
-> Producer (Maybe b) m Unit
pipeAsync prod m =
lift (getAsyncIO m)
>>= case _ of
Nothing -> throwError $ error "`pipeAsync` invoked on `AsyncPipe` that did not have `AsyncIO`"
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
let
killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST
threadKilled = liftEffect $ liftST $ ST.Ref.read killST
putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just
getThreadError = liftEffect $ liftST $ ST.Ref.read errST
rx a = do
killed <- threadKilled
guard $ not killed
w <- lift $ write a
case w of
WriteNeedsDrain -> lift $ void awaitWrite
WriteEnded -> empty
WriteAgain -> pure unit
spawn = lift <<< fork <<< flip catchError putThreadError
_thread <- spawn $ void $ runMaybeT $ Collect.foreach rx (hoist lift prod)
flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError
rb <- lift read
case rb of
ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void (lift awaitRead) $> Loop unit
lift $ stripIO done
infixl 7 pipeAsync as >-/->


@@ -1 +0,0 @@
module Pipes.CSV.Parse where

@@ -2,17 +2,107 @@ module Pipes.Collect where
import Prelude
import Control.Monad.Rec.Class (class MonadRec)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Data.Array.ST as Array.ST
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.Hashable (class Hashable)
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), maybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (for) as Pipes
import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding)
import Pipes.Core (Producer)
import Pipes.Core (runEffect) as Pipes
import Pipes.Internal (Proxy(..))
-- | Traverse a pipe, collecting into a mutable array with constant stack usage
collectArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
collectArray p = do
-- | Fold every value produced with a monadic action
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Unit -> m b
traverse f b0 p0 =
flip tailRecM (p0 /\ b0) \(p /\ b) ->
case p of
Respond a m -> Loop <$> (m unit /\ _) <$> f b a
M m -> Loop <$> (_ /\ b) <$> m
Request _ _ -> pure $ Done b
Pure _ -> pure $ Done b
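`traverse` above steps the producer with `tailRecM` so the fold runs in constant stack space. An iterative JavaScript analogue (the `foldStream` helper is hypothetical): a plain loop plays the role of `tailRecM`, so stack usage stays constant for arbitrarily long streams.

```javascript
// Fold every value of an async iterable with a (possibly async) action.
async function foldStream(f, init, iterable) {
  let acc = init;
  for await (const a of iterable) acc = await f(acc, a); // loop, not recursion
  return acc;
}

async function* nums(n) { for (let i = 1; i <= n; i++) yield i; }

// 100k elements: no stack growth.
foldStream(async (b, a) => b + a, 0, nums(100000)).then(console.log); // 5000050000
```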
-- | Fold every value produced
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b
fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0
-- | Execute a monadic action on every item in a producer.
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f p0 = traverse (\_ a -> f a) unit p0
-- | `append` all emitted values to `mempty`
toMonoid :: forall a m. Monoid a => MonadRec m => MonadEffect m => Producer a m Unit -> m a
toMonoid = fold (<>) mempty
-- | Concatenate all buffers to a single buffer, then decode with the
-- | provided encoding.
toStringWith :: forall m. MonadRec m => MonadEffect m => Encoding -> Producer Buffer m Unit -> m String
toStringWith enc = (liftEffect <<< Buffer.toString enc) <=< toBuffer
-- | Concatenate all produced buffers to a single buffer
toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer
toBuffer p =
(liftEffect <<< maybe (Buffer.alloc 0) pure)
=<< traverse
( flip \b ->
case _ of
Just acc -> do
new <- liftEffect $ Buffer.concat [ acc, b ]
pure $ Just new
_ -> pure $ Just b
)
Nothing
p
-- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
toArray p = do
st <- liftEffect $ liftST $ Array.ST.new
Pipes.runEffect $ Pipes.for p \a -> void $ liftEffect $ liftST $ Array.ST.push a st
foreach (void <<< liftEffect <<< liftST <<< flip Array.ST.push st) p
liftEffect $ liftST $ Array.ST.unsafeFreeze st
-- | Collect all values from a `Producer` into a list.
-- |
-- | Reverses the list after collecting, so that values will be
-- | in the order they were emitted.
toList :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toList = map List.reverse <<< fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a list.
-- |
-- | Does not reverse the list after collecting.
toListRev :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toListRev = fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a Javascript Object.
toObject :: forall a m. MonadRec m => MonadEffect m => Producer (String /\ a) m Unit -> m (Object a)
toObject p = do
st <- liftEffect $ liftST $ Object.ST.new
foreach (\(k /\ v) -> void $ liftEffect $ liftST $ Object.ST.poke k v st) p
liftEffect $ liftST $ Object.ST.Unsafe.unsafeFreeze st
-- | Collect all values from a `Producer` into a `HashMap`
toHashMap :: forall k v m. Hashable k => MonadRec m => Producer (k /\ v) m Unit -> m (HashMap k v)
toHashMap = fold (\map (k /\ v) -> HashMap.insert k v map) HashMap.empty
-- | Collect all values from a `Producer` into a `Map`
toMap :: forall k v m. Ord k => MonadRec m => Producer (k /\ v) m Unit -> m (Map k v)
toMap = fold (\map (k /\ v) -> Map.insert k v map) Map.empty

src/Pipes.Construct.purs Normal file

@@ -0,0 +1,64 @@
module Pipes.Construct where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map.Internal as Map.Internal
import Data.Maybe (fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (yield, (>->))
import Pipes.Core (Producer)
import Pipes.Prelude as Pipe
import Pipes.Util as Pipe.Util
-- | Producer that emits monotonically increasing integers,
-- | e.g. `monotonic 0` emits `0 1 2 3 4 5 6 7 ..`
monotonic :: forall m. MonadRec m => Int -> Producer Int m Unit
monotonic start = flip tailRecM start \n -> yield n $> Loop (n + 1)
-- | Producer that emits integers from `start` (inclusive) to `end` (exclusive)
range :: forall m. MonadRec m => Int -> Int -> Producer Int m Unit
range start end = monotonic start >-> Pipe.take (end - start)
-- | Stack-safe producer that yields every value in an Array
eachArray :: forall a m. MonadRec m => Array a -> Producer a m Unit
eachArray as = monotonic 0 >-> Pipe.map (Array.index as) >-> Pipe.Util.whileJust
-- | Stack-safe producer that yields every value in a List
eachList :: forall a m. MonadRec m => List a -> Producer a m Unit
eachList init =
flip tailRecM init \as -> fromMaybe (Done unit) <$> runMaybeT do
head <- MaybeT $ pure $ List.head as
tail <- MaybeT $ pure $ List.tail as
lift $ yield head
pure $ Loop tail
-- | Stack-safe producer that yields every value in a Map
eachMap :: forall k v m. MonadEffect m => MonadRec m => Map k v -> Producer (k /\ v) m Unit
eachMap init = do
stack <- liftEffect $ liftST $ Array.ST.new
let
push a = void $ liftEffect $ liftST $ Array.ST.push a stack
pop = liftEffect $ liftST $ Array.ST.pop stack
flip tailRecM init case _ of
Map.Internal.Leaf -> fromMaybe (Done unit) <$> runMaybeT do
a <- MaybeT pop
pure $ Loop a
Map.Internal.Node _ _ k v Map.Internal.Leaf Map.Internal.Leaf -> do
yield $ k /\ v
pure $ Loop Map.Internal.Leaf
Map.Internal.Node _ _ k v Map.Internal.Leaf r -> do
yield $ k /\ v
pure $ Loop r
Map.Internal.Node a b k v l r -> do
push $ Map.Internal.Node a b k v Map.Internal.Leaf r
pure $ Loop l
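`eachMap` gets stack safety by walking `Map`'s internal tree with an explicit mutable stack rather than recursion. The same idea for a plain binary tree in JavaScript, assuming a hypothetical `{ key, value, left, right }` node shape with `null` leaves:

```javascript
// In-order traversal with an explicit stack: descend left pushing nodes,
// pop and yield, then switch to the right subtree. No recursion, so the
// call stack stays flat no matter how deep the tree is.
function* eachTree(root) {
  const stack = [];
  let node = root;
  while (node !== null || stack.length > 0) {
    while (node !== null) { stack.push(node); node = node.left; }
    node = stack.pop();
    yield [node.key, node.value];
    node = node.right;
  }
}

const leaf = (k, v) => ({ key: k, value: v, left: null, right: null });
const tree = { key: 'b', value: 2, left: leaf('a', 1), right: leaf('c', 3) };
console.log([...eachTree(tree)]); // [ [ 'a', 1 ], [ 'b', 2 ], [ 'c', 3 ] ]
```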

@@ -2,11 +2,13 @@ module Pipes.Node.FS where
import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe)
import Effect.Aff (Aff)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer)
import Node.FS.Stream (WriteStreamOptions)
import Node.FS.Stream (WriteStreamOptions, ReadStreamOptions)
import Node.FS.Stream as FS.Stream
import Node.Path (FilePath)
import Node.Stream.Object as O
@@ -21,37 +23,57 @@ import Prim.Row (class Union)
-- |
-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a`
-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting.
write
:: forall r trash
. Union r trash WriteStreamOptions
=> Record r -> FilePath -> Consumer (Maybe Buffer) Aff Unit
write o p = do
write'
:: forall r trash m
. Union r trash WriteStreamOptions
=> MonadAff m
=> MonadThrow Error m
=> Record r
-> FilePath
-> Consumer (Maybe Buffer) m Unit
write' o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w
fromWritable $ O.unsafeCoerceWritable w
-- | Open a file in write mode, failing if the file already exists.
-- |
-- | `write {flags: "wx"}`
create :: FilePath -> Consumer (Maybe Buffer) Aff Unit
create = write {flags: "wx"}
-- | `write' {flags: "wx"}`
create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
create = write' { flags: "wx" }
-- | Open a file in write mode, truncating it if the file already exists.
-- |
-- | `write {flags: "w"}`
truncate :: FilePath -> Consumer (Maybe Buffer) Aff Unit
truncate = write {flags: "w"}
-- | `write' {flags: "w"}`
trunc :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
trunc = write' { flags: "w" }
-- | Open a file in write mode, appending written contents if the file already exists.
-- |
-- | `write {flags: "a"}`
append :: FilePath -> Consumer (Maybe Buffer) Aff Unit
append = write {flags: "a"}
-- | `write' {flags: "a"}`
append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
append = write' { flags: "a" }
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
-- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read :: FilePath -> Producer (Maybe Buffer) Aff Unit
read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit
read p = do
r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r
fromReadable $ O.unsafeCoerceReadable r
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
-- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read'
:: forall r trash m
. Union r trash ReadStreamOptions
=> MonadAff m
=> MonadThrow Error m
=> Record r
-> FilePath
-> Producer (Maybe Buffer) m Unit
read' opts p = do
r <- liftEffect $ FS.Stream.createReadStream' p opts
fromReadable $ O.unsafeCoerceReadable r

@@ -1,131 +1,159 @@
module Pipes.Node.Stream where
import Prelude
import Prelude hiding (join)
import Control.Monad.Error.Class (throwError)
import Control.Monad.Rec.Class (Step(..), tailRecM, whileJust)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..), maybe)
import Data.Newtype (wrap)
import Data.Traversable (for_)
import Effect.Aff (Aff, delay)
import Effect.Aff.Class (liftAff)
import Data.Traversable (for_, traverse_)
import Data.Tuple.Nested ((/\))
import Effect (Effect)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Stream.Object (ReadResult(..), WriteResult(..))
import Node.Stream.Object as O
import Pipes (await, yield, (>->))
import Pipes (await, yield)
import Pipes (for) as P
import Pipes.Async (AsyncPipe(..))
import Pipes.Async as AsyncPipe
import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Prelude (mapFoldable, map) as P
import Pipes.Util (InvokeResult(..), invoke)
-- | Convert a `Readable` stream to a `Pipe`.
-- |
-- | This will yield `Nothing` before exiting, signaling
-- | End-of-stream.
fromReadable :: forall s a. O.Read s a => s -> Producer_ (Maybe a) Aff Unit
fromReadable :: forall s a m. MonadThrow Error m => MonadAff m => O.Read s a => s -> Producer_ (Maybe a) m Unit
fromReadable r =
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
pure $ Done unit
go {error, cancel} = do
liftAff $ delay $ wrap 0.0
go { error, cancel } = do
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) $> Loop {error, cancel}
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop {error, cancel}
O.ReadClosed -> yield Nothing *> cleanup cancel
in do
e <- liftEffect $ O.withErrorST r
tailRecM go e
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> do
ended <- liftEffect $ O.isReadableEnded r
if ended then do
yield Nothing
cleanup cancel
else
liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
in
do
e <- liftEffect $ O.withErrorST r
tailRecM go e
-- | Convert a `Writable` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromWritable :: forall s a. O.Write s a => s -> Consumer (Maybe a) Aff Unit
fromWritable w =
fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit
fromWritable w = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST w
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
liftEffect $ O.end w
pure $ Done unit
maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST)
go {error, cancel} = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
waitCanWrite = do
shouldWait <- liftEffect $ O.needsDrain w
when shouldWait $ liftAff $ O.awaitWritableOrClosed w
ma <- await
case ma of
Nothing -> cleanup cancel
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteOk -> pure $ Loop {error, cancel}
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop {error, cancel}
O.WriteClosed -> cleanup cancel
in do
r <- liftEffect $ O.withErrorST w
tailRecM go r
cleanup = do
liftAff $ O.awaitFinished w
maybeThrow
liftEffect removeErrorListener
-- | Convert a `Transform` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b. O.Transform a b -> Pipe (Maybe a) (Maybe b) Aff Unit
fromTransform t =
let
cleanup removeErrorListener = do
liftEffect $ O.end t
liftEffect $ removeErrorListener
fromReadable t
pure $ Done unit
yieldFromReadableHalf = do
res <- liftEffect (O.read t)
case res of
O.ReadJust a -> yield (Just a)
O.ReadWouldBlock -> pure unit
O.ReadClosed -> yield Nothing *> pure unit
go {error, cancel} = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
onEOS = liftEffect (O.end w) *> cleanup $> Done unit
onChunk a = liftEffect (O.write w a) $> Loop unit
ma <- await
case ma of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
yieldFromReadableHalf
case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop {error, cancel}
O.WriteWouldBlock -> do
lift (O.awaitWritableOrClosed t)
pure $ Loop {error, cancel}
in do
r <- liftEffect $ O.withErrorST t
tailRecM go r
go _ = do
maybeThrow
waitCanWrite
ended <- liftEffect $ O.isWritableEnded w
if ended then
cleanup $> Done unit
else
await >>= maybe onEOS onChunk
tailRecM go unit
fromTransformEffect
:: forall a b m
. MonadThrow Error m
=> MonadAff m
=> Effect (O.Transform a b)
-> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransformEffect = fromTransform <=< liftEffect
-- | Convert a `Transform` stream to an `AsyncPipe`.
fromTransform
:: forall a b m
. MonadThrow Error m
=> MonadAff m
=> O.Transform a b
-> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransform stream = do
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream
let
rethrow = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
cleanup = liftEffect removeErrorListener
writeSignal =
liftEffect (O.isWritableEnded stream)
<#> if _ then AsyncPipe.WriteSignalEnded else AsyncPipe.WriteSignalOk
readSignal =
liftEffect (O.isReadableEnded stream)
<#> if _ then AsyncPipe.ReadSignalEnded else AsyncPipe.ReadSignalOk
writeResult WriteOk = AsyncPipe.WriteAgain
writeResult WriteWouldBlock = AsyncPipe.WriteNeedsDrain
readResult (ReadJust a) = AsyncPipe.ReadOk (Just a)
readResult ReadWouldBlock = AsyncPipe.ReadWouldBlock
awaitWritable = liftAff $ O.awaitWritableOrClosed stream
awaitReadable = liftAff $ O.awaitReadableOrClosed stream
awaitWrite = rethrow *> awaitWritable *> writeSignal
awaitRead = rethrow *> awaitReadable *> readSignal
whenReadNotEnded m =
liftEffect (O.isReadableEnded stream)
>>= if _ then pure $ AsyncPipe.ReadOk Nothing else m
readNow = readResult <$> liftEffect (O.read stream)
writeNow a = writeResult <$> liftEffect (O.write stream a)
read = rethrow *> whenReadNotEnded readNow
write Nothing = liftEffect (O.end stream) $> AsyncPipe.WriteEnded
write (Just a) = rethrow *> writeNow a
AsyncIO ({write, awaitWrite, read, awaitRead} /\ cleanup)
-- | Given a `Producer` of values, wrap them in `Just`.
-- |
-- | Before the `Producer` exits, emits `Nothing` as an End-of-stream signal.
withEOS :: forall a. Producer a Aff Unit -> Producer (Maybe a) Aff Unit
withEOS :: forall a m. Monad m => Producer a m Unit -> Producer (Maybe a) m Unit
withEOS a = do
P.for a (yield <<< Just)
yield Nothing
-- | Strip a pipeline of the EOS signal
unEOS :: forall a. Pipe (Maybe a) a Aff Unit
unEOS = P.mapFoldable identity
unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = tailRecM (const $ maybe (pure $ Done unit) (\a -> yield a $> Loop unit) =<< await) unit
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.
-- |
@@ -137,8 +165,16 @@ unEOS = P.mapFoldable identity
-- | `Just` values will be passed to the pipe, and the response(s) will be wrapped in `Just`.
-- |
-- | `Nothing` will bypass the given pipe entirely, and the pipe will not be invoked again.
inEOS :: forall a b. Pipe a b Aff Unit -> Pipe (Maybe a) (Maybe b) Aff Unit
inEOS p = whileJust do
inEOS :: forall a b m. MonadRec m => Pipe a b m Unit -> Pipe (Maybe a) (Maybe b) m Unit
inEOS p = flip tailRecM p \p' -> do
ma <- await
maybe (yield Nothing) (\a -> yield a >-> p >-> P.map Just) ma
pure $ void ma
case ma of
Just a -> do
res <- lift $ invoke p' a
case res of
Yielded (as /\ p'') -> do
for_ (Just <$> as) yield
pure $ Loop p''
DidNotYield p'' -> pure $ Loop p''
Exited -> yield Nothing $> Done unit
_ -> yield Nothing $> Done unit

@@ -2,39 +2,42 @@ module Pipes.Node.Zlib where
import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe)
import Effect (Effect)
import Effect.Aff (Aff)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer)
import Node.Stream.Object as O
import Node.Zlib as Zlib
import Node.Zlib.Types (ZlibStream)
import Pipes.Core (Pipe)
import Pipes.Async (AsyncPipe)
import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r. Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.fromBufferTransform raw
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z =
do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.unsafeCoerceTransform raw
gzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip
gunzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip = fromZlib Zlib.createGunzip
unzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip = fromZlib Zlib.createUnzip
inflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate = fromZlib Zlib.createInflate
deflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate = fromZlib Zlib.createDeflate
brotliCompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress

@@ -33,7 +33,7 @@ split pat = do
Nothing -> void $ liftEffect $ liftST $ Array.ST.push chunk buf
Just ix -> do
let
{before, after} = String.splitAt ix chunk
{ before, after } = String.splitAt ix chunk
len <- liftEffect $ liftST $ Array.ST.length buf
buf' <- liftEffect $ liftST $ Array.ST.splice 0 len [] buf
lift $ yield $ Just $ (fold buf') <> before

@@ -2,13 +2,35 @@ module Pipes.Util where
import Prelude
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM)
import Control.Monad.Rec.Class as Rec
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Data.Maybe (Maybe(..))
import Control.Monad.Trans.Class (lift)
import Data.Array.ST (STArray)
import Data.Array.ST as Array.ST
import Data.Either (hush)
import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..), fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield)
import Pipes.Core (Pipe)
import Pipes as Pipes
import Pipes.Core (Pipe, Producer)
import Pipes.Internal (Proxy(..))
-- | Re-yield all `Just`s, and close when `Nothing` is encountered
whileJust :: forall m a. MonadRec m => Pipe (Maybe a) a m Unit
whileJust = do
first <- await
flip tailRecM first $ \ma -> fromMaybe (Done unit) <$> runMaybeT do
a <- MaybeT $ pure ma
lift $ yield a
lift $ Loop <$> await
-- | Yields a separator value `sep` between received values
-- |
@@ -18,15 +40,114 @@ import Pipes.Core (Pipe)
-- | ```
intersperse :: forall m a. MonadEffect m => a -> Pipe (Maybe a) (Maybe a) m Unit
intersperse sep = do
isFirst <- liftEffect $ liftST $ STRef.new true
whileJust do
ma <- await
isFirst' <- liftEffect $ liftST $ STRef.read isFirst
case ma of
Just a
| isFirst' -> do
void $ liftEffect $ liftST $ STRef.write false isFirst
yield $ Just a
| otherwise -> yield (Just sep) *> yield (Just a)
Nothing -> yield Nothing
pure $ void ma
isFirstST <- liftEffect $ liftST $ STRef.new true
let
getIsFirst = liftEffect $ liftST $ STRef.read isFirstST
markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST
Rec.whileJust $ runMaybeT do
a <- MaybeT await
isFirst <- getIsFirst
if isFirst then markNotFirst else lift $ yield $ Just sep
lift $ yield $ Just a
yield Nothing
-- | Pair every emitted value from 2 producers together, exiting when either exits.
zip :: forall a b m. MonadRec m => Producer a m Unit -> Producer b m Unit -> Producer (a /\ b) m Unit
zip as bs =
flip tailRecM (as /\ bs) \(as' /\ bs') ->
fromMaybe (Done unit) <$> runMaybeT do
a /\ as'' <- MaybeT $ lift $ hush <$> Pipes.next as'
b /\ bs'' <- MaybeT $ lift $ hush <$> Pipes.next bs'
lift $ yield $ a /\ b
pure $ Loop $ as'' /\ bs''
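`zip` stops as soon as either producer is exhausted. The same termination rule for plain generators (illustrative sketch):

```javascript
// Pair values from two iterables; stop when either runs out.
function* zip(as, bs) {
  const ia = as[Symbol.iterator]();
  const ib = bs[Symbol.iterator]();
  while (true) {
    const a = ia.next();
    const b = ib.next();
    if (a.done || b.done) return; // either exhausted: exit
    yield [a.value, b.value];
  }
}

console.log([...zip([1, 2, 3], ['a', 'b'])]); // [ [ 1, 'a' ], [ 2, 'b' ] ]
```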
-- | Accumulate values in chunks of a given size.
-- |
-- | If the pipe closes without yielding a multiple of `size` elements,
-- | the remaining elements are yielded at the end.
chunked :: forall m a. MonadEffect m => Int -> Pipe (Maybe a) (Maybe (Array a)) m Unit
chunked size = do
chunkST :: STRef _ (STArray _ a) <- liftEffect $ liftST $ STRef.new =<< Array.ST.new
let
chunkPut a = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
void $ Array.ST.push a chunkArray
chunkLength = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
Array.ST.length chunkArray
chunkTake = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
void $ flip STRef.write chunkST =<< Array.ST.new
Array.ST.unsafeFreeze chunkArray
Rec.whileJust $ runMaybeT do
a <- MaybeT await
chunkPut a
len <- lift chunkLength
when (len >= size) do
chunk <- lift chunkTake
lift $ yield $ Just chunk
len <- chunkLength
when (len > 0) do
chunk <- chunkTake
yield $ Just chunk
yield Nothing
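The chunking rule above — emit full chunks of `size`, then flush any remainder at the end — in a minimal JavaScript sketch (the `chunked` function is illustrative):

```javascript
// Group items into arrays of `size`; a final partial chunk is kept.
function chunked(size, items) {
  const out = [];
  let buf = [];
  for (const a of items) {
    buf.push(a);
    if (buf.length >= size) { out.push(buf); buf = []; }
  }
  if (buf.length > 0) out.push(buf); // flush the remainder
  return out;
}

console.log(chunked(2, [1, 2, 3, 4, 5])); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```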
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.
-- |
-- | Uses a `HashSet` of hashes of `a`; for `n` elements awaited, this pipe
-- | occupies `O(n)` space and `yield`s in `O(1)` time.
uniqHash :: forall a m. Hashable a => MonadEffect m => MonadRec m => Pipe a a m Unit
uniqHash = do
seenHashesST <- liftEffect $ liftST $ STRef.new HashSet.empty
forever do
a <- await
seenHashes <- liftEffect $ liftST $ STRef.read seenHashesST
when (not $ HashSet.member (hash a) seenHashes) do
void $ liftEffect $ liftST $ STRef.modify (HashSet.insert $ hash a) seenHashesST
yield a
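`uniqHash` keeps a set of previously seen hashes and drops repeats. The analogous JavaScript, using a `Set` of the values themselves (illustrative sketch):

```javascript
// Yield each value the first time it is seen; O(n) space for the
// seen-set, O(1) membership check per element.
function* uniq(items) {
  const seen = new Set();
  for (const a of items) {
    if (!seen.has(a)) {
      seen.add(a);
      yield a;
    }
  }
}

console.log([...uniq([1, 2, 1, 3, 2])]); // [ 1, 2, 3 ]
```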
-- | The result of a single step forward of a pipe.
data InvokeResult a b m
-- | The pipe `await`ed the value, but did not `yield` a response.
= DidNotYield (Pipe a b m Unit)
-- | The pipe `await`ed the value, and `yield`ed 1 or more responses.
| Yielded (NonEmptyList b /\ Pipe a b m Unit)
-- | The pipe `await`ed the value, and exited.
| Exited
data IntermediateInvokeResult a b m
= IDidNotYield (Pipe a b m Unit)
| IYielded (NonEmptyList b /\ Pipe a b m Unit)
| IDidNotAwait (Pipe a b m Unit)
-- | Pass a single value to a pipe, returning the result of the pipe's invocation.
invoke :: forall m a b. Monad m => Pipe a b m Unit -> a -> m (InvokeResult a b m)
invoke m a =
let
go :: IntermediateInvokeResult a b m -> m (InvokeResult a b m)
go (IYielded (as /\ n)) =
case n of
Request _ _ -> pure $ Yielded $ as /\ n
Respond rep f -> go (IYielded $ (as <> pure rep) /\ f unit)
M o -> go =<< IYielded <$> (as /\ _) <$> o
Pure _ -> pure Exited
go (IDidNotYield n) =
case n of
Request _ _ -> pure $ DidNotYield n
Respond rep f -> go (IYielded $ pure rep /\ f unit)
M o -> go =<< IDidNotYield <$> o
Pure _ -> pure Exited
go (IDidNotAwait n) =
case n of
Request _ f -> go (IDidNotYield (f a))
Respond rep f -> go (IYielded $ pure rep /\ f unit)
M o -> go =<< IDidNotAwait <$> o
Pure _ -> pure Exited
in
go (IDidNotAwait m)

@@ -3,16 +3,21 @@ module Test.Main where
import Prelude
import Data.Maybe (Maybe(..))
import Data.Time.Duration (Milliseconds(..))
import Effect (Effect)
import Effect.Aff (launchAff_)
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Pipes.Collect as Test.Pipes.Collect
import Test.Pipes.Construct as Test.Pipes.Construct
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit
main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do
main = launchAff_ $ runSpec' (defaultConfig { slow = Milliseconds 0.0, failFast = true, exit = false, timeout = Nothing }) [ specReporter ] do
Test.Pipes.Node.Stream.spec
Test.Pipes.Node.Buffer.spec
Test.Pipes.Node.FS.spec
Test.Pipes.Collect.spec
Test.Pipes.Construct.spec

@@ -0,0 +1,111 @@
module Test.Pipes.Collect where
import Prelude
import Control.Monad.Gen (chooseInt)
import Control.Monad.Rec.Class (Step(..), tailRecM)
import Control.Monad.ST as ST
import Control.Monad.ST.Ref as STRef
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Traversable (traverse)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Effect.Unsafe (unsafePerformEffect)
import Foreign.Object (Object)
import Foreign.Object as Object
import Pipes (yield)
import Pipes.Collect as Pipes.Collect
import Pipes.Core (Producer)
import Test.QuickCheck.Gen (randomSampleOne)
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
testData
:: { array :: Array (Int /\ Int)
, list :: List (Int /\ Int)
, strarray :: Array (String /\ Int)
, object :: Object Int
, map :: Map Int Int
, hashMap :: HashMap Int Int
, stream :: Producer (Int /\ Int) Aff Unit
, streamStr :: Producer (String /\ Int) Aff Unit
}
testData =
unsafePerformEffect $ do
array <-
flip traverse (Array.range 0 99999) \k -> do
v <- liftEffect $ randomSampleOne $ chooseInt 0 99999
pure $ k /\ v
let
strarray = lmap show <$> array
object = Object.fromFoldable strarray
map' :: forall m. m -> (Int -> Int -> m -> m) -> m
map' empty insert = ST.run do
st <- STRef.new empty
ST.foreach array \(k /\ v) -> void $ STRef.modify (insert k v) st
STRef.read st
hashMap = map' HashMap.empty HashMap.insert
map = map' Map.empty Map.insert
pure
{ array
, strarray
, list: List.fromFoldable array
, object
, hashMap
, map
, stream: flip tailRecM 0 \ix -> case Array.index array ix of
Just a -> yield a $> Loop (ix + 1)
Nothing -> pure $ Done unit
, streamStr: flip tailRecM 0 \ix -> case Array.index strarray ix of
Just a -> yield a $> Loop (ix + 1)
Nothing -> pure $ Done unit
}
spec :: Spec Unit
spec =
describe "Test.Pipes.Collect" do
describe "toArray" do
it "collects an array" do
act <- Pipes.Collect.toArray testData.stream
act `shouldEqual` testData.array
it "empty ok" do
act :: Array Int <- Pipes.Collect.toArray (pure unit)
act `shouldEqual` []
describe "toObject" do
it "collects" do
act <- Pipes.Collect.toObject $ testData.streamStr
act `shouldEqual` testData.object
it "empty ok" do
act :: Object Int <- Pipes.Collect.toObject (pure unit)
act `shouldEqual` Object.empty
describe "toMap" do
it "collects" do
act <- Pipes.Collect.toMap testData.stream
act `shouldEqual` testData.map
it "empty ok" do
act :: Map String Int <- Pipes.Collect.toMap (pure unit)
act `shouldEqual` Map.empty
describe "toHashMap" do
it "collects" do
act <- Pipes.Collect.toHashMap testData.stream
act `shouldEqual` testData.hashMap
it "empty ok" do
act :: HashMap String Int <- Pipes.Collect.toHashMap (pure unit)
act `shouldEqual` HashMap.empty
describe "toList" do
it "collects" do
act <- Pipes.Collect.toList testData.stream
act `shouldEqual` testData.list
it "empty ok" do
act :: List (String /\ Int) <- Pipes.Collect.toList (pure unit)
act `shouldEqual` List.Nil


@@ -0,0 +1,58 @@
module Test.Pipes.Construct where
import Prelude
import Data.Array as Array
import Data.List as List
import Data.Map as Map
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (liftEffect)
import Pipes.Collect as Pipes.Collect
import Pipes.Construct as Pipes.Construct
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
spec :: Spec Unit
spec =
describe "Test.Pipes.Construct" do
describe "eachMap" do
it "empty map" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachMap Map.empty
kvs `shouldEqual` ([] :: Array (Int /\ Int))
it "nonempty map" do
let
exp = (\n -> n /\ n) <$> Array.range 0 99999
map = Map.fromFoldable exp
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachMap
$ map
kvs `shouldEqual` exp
describe "eachArray" do
it "empty array" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachArray []
kvs `shouldEqual` ([] :: Array Int)
it "nonempty array" do
let
inp = (\n -> n /\ n) <$> Array.range 0 99999
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachArray
$ inp
kvs `shouldEqual` inp
describe "eachList" do
it "empty list" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachList List.Nil
kvs `shouldEqual` ([] :: Array Int)
it "nonempty list" do
let
inp = (\n -> n /\ n) <$> Array.range 0 99999
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachList
$ List.fromFoldable
$ inp
kvs `shouldEqual` inp


@@ -27,6 +27,7 @@ import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (fail, shouldEqual)
data BufferJunk = BufferJunk Buffer
instance Arbitrary BufferJunk where
arbitrary = sized \s -> do
ns <- vectorOf s (chooseInt 0 7)
@@ -36,6 +37,7 @@ instance Arbitrary BufferJunk where
pure $ BufferJunk buf
data BufferUTF8 = BufferUTF8 String Buffer
instance Arbitrary BufferUTF8 where
arbitrary = do
s <- genAsciiString
@@ -43,27 +45,27 @@ instance Arbitrary BufferUTF8 where
spec :: Spec Unit
spec = describe "Pipes.Node.Buffer" do
describe "toString" do
it "fails when encoding wrong" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
let
uut = Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString UTF8 >-> Pipes.drain
ok = do
uut
fail "Should have thrown"
err _ = pure unit
catchError ok err
it "junk OK in hex" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString Hex >-> Pipes.drain
it "UTF8 ok" do
vals <- (map \(BufferUTF8 s b) -> s /\ b) <$> liftEffect (randomSample' 100 arbitrary)
let
bufs = Pipes.each $ snd <$> vals
strs = fst <$> vals
act <- Array.fromFoldable <$> Pipes.toListM (bufs >-> Pipes.Node.Buffer.toString UTF8)
act `shouldEqual` strs
describe "fromString" do
it "ok" do
vals <- Pipes.each <$> liftEffect (randomSample' 100 genAsciiString)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.fromString UTF8 >-> Pipes.drain


@@ -24,63 +24,58 @@ import Test.Spec.Assertions (fail, shouldEqual)
spec :: Spec Unit
spec = describe "Pipes.Node.FS" do
describe "read" do
around tmpFile $ it "fails if the file does not exist" \p -> do
flip catchError (const $ pure unit) do
Pipes.runEffect $ Pipes.Node.FS.read p >-> Pipes.drain
fail "should have thrown"
around tmpFile $ it "reads ok" \p -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo"
describe "create" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
describe "append" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "appends" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "\n" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo\nbar"
describe "trunc" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "overwrites contents" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "bar"
around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do
let
exp = [ { foo: "a" }, { foo: "bar" }, { foo: "123" } ]
liftEffect $ FS.writeTextFile UTF8 a $ intercalate "\n" $ writeJSON <$> exp
Pipes.runEffect $
Pipes.Node.FS.read a
>-> inEOS (Pipes.Node.Buffer.toString UTF8)
>-> Pipes.String.split (wrap "\n")
>-> inEOS (jsonParse @{ foo :: String })
>-> inEOS (Pipes.map _.foo)
>-> Pipes.Util.intersperse "\n"
>-> inEOS (Pipes.Node.Buffer.fromString UTF8)
>-> Pipes.Node.FS.create b
act <- liftEffect $ FS.readTextFile UTF8 b
act `shouldEqual` "a\nbar\n123"


@@ -1,4 +1,16 @@
import Stream from 'stream'
import * as CBOR from "cbor-x";
import * as CSVDecode from "csv-parse";
import * as CSVEncode from "csv-stringify";
export const cborDecode = () => new CBOR.DecoderStream({useRecords: false, allowHalfOpen: true});
export const cborEncode = () => new CBOR.EncoderStream({useRecords: false, allowHalfOpen: true});
export const cborDecodeSync = a => () => CBOR.decodeMultiple(a);
export const cborEncodeSync = a => () => CBOR.encode(a, {useRecords: false});
export const csvDecode = () => CSVDecode.parse({columns: true, allowHalfOpen: true})
export const csvEncode = () => CSVEncode.stringify({header: true, allowHalfOpen: true})
export const discardTransform = () => new Stream.Transform({
transform: function(_ck, _enc, cb) {
@@ -7,6 +19,16 @@ export const discardTransform = () => new Stream.Transform({
objectMode: true
})
export const slowTransform = () => {
return new Stream.Transform({
transform: function(ck, _enc, cb) {
this.push(ck)
setTimeout(() => cb(), 4)
},
objectMode: true
})
}
export const charsTransform = () => new Stream.Transform({
transform: function(ck, _enc, cb) {
ck.split('').filter(s => !!s).forEach(s => this.push(s))


@@ -2,49 +2,72 @@ module Test.Pipes.Node.Stream where
import Prelude
import Control.Monad.Trans.Class (lift)
import Control.Monad.Cont (lift)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Data.Array as Array
import Data.Foldable (fold)
import Data.Bifunctor (lmap)
import Data.Either (Either(..))
import Data.Foldable (fold, intercalate)
import Data.FoldableWithIndex (forWithIndex_)
import Data.FunctorWithIndex (mapWithIndex)
import Data.Int as Int
import Data.List ((:))
import Data.List as List
import Data.Maybe (Maybe)
import Data.Newtype (wrap)
import Data.Profunctor.Strong (first)
import Data.String as String
import Data.String.Gen (genAlphaString)
import Data.Traversable (for_, traverse)
import Data.Tuple.Nested (type (/\), (/\))
import Effect (Effect)
import Effect.Aff (Aff, delay)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception (error)
import Effect.Unsafe (unsafePerformEffect)
import Foreign (Foreign)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
import Node.FS.Stream as FS.Stream
import Node.FS.Sync as FS
import Node.Stream.Object as O
import Node.Zlib as Zlib
import Pipes (each) as Pipe
import Pipes (yield, (>->))
import Pipes.Async (sync, (>-/->))
import Pipes.Collect as Pipe.Collect
import Pipes.Core (Consumer, Producer, runEffect)
import Pipes.Node.Buffer as Pipe.Buffer
import Pipes.Node.FS as Pipe.FS
import Pipes.Node.Stream as Pipe.Node
import Pipes.Node.Zlib as Pipe.Zlib
import Pipes.Prelude (toListM) as Pipe
import Simple.JSON (readImpl, readJSON, writeJSON)
import Test.Common (jsonStringify, tmpFile, tmpFiles)
import Test.QuickCheck.Arbitrary (arbitrary)
import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, around, describe, it)
import Test.Spec.Assertions (fail, shouldEqual)
foreign import readableFromArray :: forall @a. Array a -> O.Readable a
foreign import discardTransform :: forall a b. Effect (O.Transform a b)
foreign import slowTransform :: forall a b. Effect (O.Transform a b)
foreign import charsTransform :: Effect (O.Transform String String)
foreign import cborEncodeSync :: forall a. a -> Effect Buffer
foreign import cborDecodeSync :: forall a. Buffer -> Effect a
foreign import cborEncode :: forall a. Effect (O.Transform a Buffer)
foreign import cborDecode :: forall a. Effect (O.Transform Buffer a)
foreign import csvEncode :: forall a. Effect (O.Transform a String)
foreign import csvDecode :: forall a. Effect (O.Transform String a)
writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
writer a = do
stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a
pure $ stream /\ Pipe.Node.fromWritable stream
reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
reader a = liftEffect $ Pipe.Node.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a
spec :: Spec Unit
spec =
@@ -52,30 +75,30 @@ spec =
describe "Readable" do
describe "Readable.from(<Iterable>)" do
it "empty" do
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } []) >-> Pipe.Node.unEOS
vals `shouldEqual` List.Nil
it "singleton" do
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> Pipe.Node.unEOS
vals `shouldEqual` ({ foo: "1" } : List.Nil)
it "many elements" do
let exp = (\n -> { foo: show n }) <$> Array.range 0 100
vals <- Pipe.toListM $ (Pipe.Node.fromReadable $ readableFromArray exp) >-> Pipe.Node.unEOS
vals `shouldEqual` (List.fromFoldable exp)
describe "Writable" $ around tmpFile do
describe "fs.WriteStream" do
it "pipe to file" \p -> do
stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let
w = Pipe.Node.fromWritable stream
source = do
buf <- liftEffect $ Buffer.fromString "hello" UTF8
yield buf
runEffect $ Pipe.Node.withEOS source >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello"
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
it "async pipe to file" \p -> do
w <- Pipe.Node.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p)
let
source = do
yield "hello, "
@@ -87,7 +110,7 @@ spec =
yield "this is a "
lift $ delay $ wrap 5.0
yield "test."
runEffect $ Pipe.Node.withEOS (source >-> Pipe.Buffer.fromString UTF8) >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello, world! this is a test."
it "chained pipes" \p -> do
@@ -96,45 +119,110 @@ spec =
str :: String <- genAlphaString
num :: Int <- arbitrary
stuff :: Array String <- arbitrary
pure { str, num, stuff }
objs <- liftEffect (randomSample' 1 obj)
let
exp = fold (writeJSON <$> objs)
stream /\ w <- liftEffect $ writer p
runEffect $ Pipe.Node.withEOS (Pipe.each objs >-> jsonStringify >-> Pipe.Buffer.fromString UTF8) >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` exp
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
describe "Transform" do
let
bignums = Array.range 1 1000
firstNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/first_names.txt")
lastNames = String.split (wrap "\n") $ unsafePerformEffect (FS.readTextFile UTF8 "./test/Test/last_names.txt")
names n = do
first <- firstNames
last <- Array.take (Int.round $ Int.toNumber n / Int.toNumber (Array.length firstNames)) lastNames
pure $ first <> " " <> last
people n = mapWithIndex (\ix name -> {id: show $ ix + 1, name}) (names n)
peopleCSV n = "id,name\n" <> intercalate "\n" ((\{id, name} -> id <> "," <> name) <$> people n)
for_ [4000, 8000, 32000, 64000, 200000] \n -> do
let
csv = peopleCSV n
people' = people n
around tmpFiles
$ it (show n <> " row csv >-/-> csv-parse >-/-> cborEncode") \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a csv
cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransformEffect csvDecode
>-/-> Pipe.Node.fromTransformEffect cborEncode
>-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
ppl `shouldEqual` people'
around tmpFiles
$ it (show n <> " row csv >-> sync csv-parse >-> sync cborEncode") \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a csv
cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-> sync (Pipe.Node.fromTransformEffect csvDecode)
>-> sync (Pipe.Node.fromTransformEffect cborEncode)
>-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
ppl `shouldEqual` people'
around tmpFiles
$ it "file >-> sync gzip >-> sync gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <- Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip
>-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFiles
$ it "file >-/-> gzip >-/-> slow >-/-> gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <-
Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-/-> Pipe.Zlib.gzip
>-/-> Pipe.Node.fromTransformEffect slowTransform
>-/-> Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFiles
$ it "file >-> sync gzip >-> sync slow >-> sync gunzip" \(a /\ _) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON bignums
json <-
Pipe.Collect.toMonoid
$ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip
>-> sync (Pipe.Node.fromTransformEffect slowTransform)
>-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8
readJSON json `shouldEqual` (Right bignums)
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p
out :: List.List Int <-
Pipe.toListM
$ r
>-/-> Pipe.Node.fromTransformEffect discardTransform
>-> Pipe.Node.unEOS
out `shouldEqual` List.Nil
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo bar"
r <- reader p
out :: List.List String <-
Pipe.toListM $
r
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransformEffect charsTransform
>-> Pipe.Node.unEOS
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]

test/Test/first_names.txt (new file, 4095 lines): diff suppressed because it is too large

test/Test/last_names.txt (new file, 4096 lines): diff suppressed because it is too large