21 Commits

Author SHA1 Message Date
7e6c6af3dd chore: prepare v1.2.3 2024-05-11 22:11:44 -05:00
faf49fafd5 chore: lock 2024-05-11 22:11:40 -05:00
04815f66a4 chore: prepare v1.2.2 2024-05-11 22:10:57 -05:00
fd895de148 fix: ensure-ranges 2024-05-11 22:09:45 -05:00
b618ef1819 chore: prepare v1.2.1 2024-05-11 22:08:38 -05:00
407491f055 fix: generalize all Affs to MonadAffs 2024-05-11 22:08:27 -05:00
2fdf6f0dad chore: prepare v1.2.0 2024-05-11 22:01:44 -05:00
eb01962553 feat: fix inEOS, add uniqHash + invoke 2024-05-11 22:01:06 -05:00
4baf317f43 chore: prepare v1.1.0 2024-05-11 18:02:13 -05:00
634e52fe39 fix: more Pipes.Collect utils, stack-safety 2024-05-11 18:01:43 -05:00
f2f18c3c13 chore: prepare v1.0.5 2024-05-10 18:30:36 -05:00
76958b63ef feat: Pipes.Util.chunked 2024-05-10 18:30:27 -05:00
821a47229c chore: prepare v1.0.4 2024-05-10 18:16:01 -05:00
f373334f77 feat: Pipes.Collect 2024-05-10 18:15:58 -05:00
30fbce3a2d chore: prepare v1.0.3 2024-05-10 18:03:48 -05:00
3c8e497fa2 fix: explicit tailRecM 2024-05-10 18:03:34 -05:00
7a18a7182c chore: prepare v1.0.2 2024-05-10 15:05:20 -05:00
93ef037344 fix: lockfile 2024-05-10 15:05:14 -05:00
b7ace71fc0 chore: prepare v1.0.1 2024-05-10 15:04:30 -05:00
805f3b8887 feat: Pipes.Node.FS, Pipes.Node.Buffer, etc. 2024-05-10 15:04:18 -05:00
01ebfba9ad docs: update readme 2024-05-09 17:33:44 -05:00
21 changed files with 1052 additions and 270 deletions

111
README.md
View File

@@ -1,81 +1,42 @@
# purescript-csv-stream
# purescript-node-stream-pipes
Type-safe bindings for the streaming API of `csv-parse` and `csv-stringify`.
Interact with node streams in object mode using [`Pipes`]!
## Example
```purescript
import Prelude
import Effect.Aff (launchAff_)
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Pipes.Prelude ((>->))
import Pipes.Prelude as Pipes
import Pipes.Core as Pipes.Core
import Pipes.Node.FS.Stream as FS
import Pipes.Node.Zlib as Zlib
import Pipes.CSV.Parse as CSV.Parse
-- == my-zipped-data.csv ==
-- id,foo,is_deleted
-- 1,hello,f
-- 2,goodbye,t
-- Logs:
-- {id: 1, foo: "hello", is_deleted: false}
-- {id: 2, foo: "goodbye", is_deleted: true}
main :: Effect Unit
main =
Pipes.Core.runEffect
$ FS.createReadStream "my-zipped-data.csv.gz"
>-> Zlib.gunzip
>-> CSV.Parse.parse @{id :: Int, foo :: String, is_deleted :: Boolean}
>-> Pipes.mapM (liftEffect <<< log)
```
## Installing
```bash
spago install csv-stream
{bun|yarn|npm|pnpm} install csv-parse csv-stringify
spago install node-stream-pipes
```
## Examples
### Stream
```purescript
module Main where
import Prelude
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Aff (launchAff_)
import Node.Stream (pipe)
import Node.Stream as Stream
import Node.Stream.CSV.Stringify as CSV.Stringify
import Node.Stream.CSV.Parse as CSV.Parse
type MyCSVType1 = {a :: Int, b :: Int, bar :: String, baz :: Boolean}
type MyCSVType2 = {ab :: Int, bar :: String, baz :: Boolean}
atob :: MyCSVType1 -> MyCSVType2
atob {a, b, bar, baz} = {ab: a + b, bar, baz}
myCSV :: String
myCSV = "a,b,bar,baz\n1,2,\"hello, world!\",true\n3,3,,f"
main :: Effect Unit
main = launchAff_ do
parser <- liftEffect $ CSV.Parse.make {}
stringifier <- liftEffect $ CSV.Stringify.make {}
input <- liftEffect $ Stream.readableFromString myCSV
liftEffect $ Stream.pipe input parser
records <- CSV.Parse.readAll parser
liftEffect $ for_ records \r -> CSV.Stringify.write $ atob r
liftEffect $ Stream.end stringifier
-- "ab,bar,baz\n3,\"hello, world!\",true\n6,,false"
csvString <- CSV.Stringify.readAll stringifier
pure unit
```
### Synchronous
```purescript
module Main where
import Prelude
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Aff (launchAff_)
import Node.Stream (pipe)
import Node.Stream as Stream
import Node.Stream.CSV.Stringify as CSV.Stringify
import Node.Stream.CSV.Parse as CSV.Parse
type MyCSVType1 = {a :: Int, b :: Int, bar :: String, baz :: Boolean}
type MyCSVType2 = {ab :: Int, bar :: String, baz :: Boolean}
atob :: MyCSVType1 -> MyCSVType2
atob {a, b, bar, baz} = {ab: a + b, bar, baz}
myCSV :: String
myCSV = "a,b,bar,baz\n1,2,\"hello, world!\",true\n3,3,,f"
main :: Effect Unit
main = launchAff_ do
records :: Array MyCSVType1 <- CSV.Parse.parse myCSV
-- "ab,bar,baz\n3,\"hello, world!\",true\n6,,false"
csvString <- CSV.Stringify.stringify (atob <$> records)
pure unit
```
[`Pipes`]: https://pursuit.purescript.org/packages/purescript-pipes/8.0.0

View File

@@ -1,6 +1,7 @@
{
"name": "purescript-csv-stream",
"version": "v1.0.0",
"name": "purescript-node-stream-pipes",
"version": "v1.2.3",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -4,19 +4,32 @@ workspace:
path: ./
dependencies:
- aff: ">=7.1.0 <8.0.0"
- control: ">=6.0.0 <7.0.0"
- arrays: ">=7.3.0 <8.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console
@@ -26,6 +39,7 @@ workspace:
- quickcheck
- simple-json
- spec
- spec-quickcheck
build_plan:
- aff
- ansi
@@ -87,6 +101,7 @@ workspace:
- safe-coerce
- simple-json
- spec
- spec-quickcheck
- st
- strings
- tailrec
@@ -95,6 +110,7 @@ workspace:
- type-equality
- typelevel-prelude
- unfoldable
- unordered-collections
- unsafe-coerce
- variant
extra_packages: {}
@@ -791,6 +807,16 @@ packages:
- tailrec
- transformers
- tuples
spec-quickcheck:
type: registry
version: 5.0.0
integrity: sha256-iE0iThqZCuDGe3pwg5RvqcL8E5cRQ4txDuloCclOsCs=
dependencies:
- aff
- prelude
- quickcheck
- random
- spec
st:
type: registry
version: 6.2.0
@@ -883,6 +909,21 @@ packages:
- partial
- prelude
- tuples
unordered-collections:
type: registry
version: 3.1.0
integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q=
dependencies:
- arrays
- enums
- functions
- integers
- lists
- prelude
- record
- tuples
- typelevel-prelude
- unfoldable
unsafe-coerce:
type: registry
version: 6.0.0

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.0.0'
version: '1.2.3'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -11,19 +11,32 @@ package:
pedanticPackages: true
dependencies:
- aff: ">=7.1.0 <8.0.0"
- control: ">=6.0.0 <7.0.0"
- arrays: ">=7.3.0 <8.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main
@@ -35,5 +48,6 @@ package:
- quickcheck
- simple-json
- spec
- spec-quickcheck
workspace:
extraPackages: {}

View File

@@ -1,35 +1,39 @@
import Stream from "stream";
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableImpl = s => () => s.readable
export const isReadableImpl = (s) => () => s.readable;
/** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */
export const isClosedImpl = s => () => s.closed
export const isClosedImpl = (s) => () => s.closed;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableImpl = s => () => s.writable
export const isWritableImpl = (s) => () => s.writable;
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableEndedImpl = s => () => s.readableEnded
export const isReadableEndedImpl = (s) => () => s.readableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableEndedImpl = s => () => s.writableEnded
export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end();
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl = ({ok, wouldBlock, closed}) => (s) => (a) => () => {
if (s.closed || s.writableEnded) {
return closed
}
export const writeImpl =
({ ok, wouldBlock, closed }) =>
(s) =>
(a) =>
() => {
if (s.closed || s.writableEnded) {
return closed;
}
if (s.write(a)) {
return ok
} else {
return wouldBlock
}
}
if (s.write(a)) {
return ok;
} else {
return wouldBlock;
}
};
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
export const readImpl =

View File

@@ -2,7 +2,16 @@ module Node.Stream.Object where
import Prelude
import Control.Monad.Error.Class (liftEither)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Control.Parallel (parOneOf)
import Data.Either (Either(..))
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow)
import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, makeAff)
import Effect.Class (liftEffect)
@@ -20,11 +29,22 @@ data ReadResult a
| ReadClosed
| ReadJust a
derive instance Generic (ReadResult a) _
derive instance Functor ReadResult
derive instance Eq a => Eq (ReadResult a)
instance Show (ReadResult a) where
show = genericShow <<< map (const "..")
data WriteResult
= WriteWouldBlock
| WriteClosed
| WriteOk
derive instance Generic WriteResult _
derive instance Eq WriteResult
instance Show WriteResult where
show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }
@@ -42,10 +62,10 @@ foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean
readResultFFI :: forall a. ReadResultFFI a
readResultFFI = {closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust}
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust }
writeResultFFI :: WriteResultFFI
writeResultFFI = {closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk}
writeResultFFI = { closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk }
class Stream :: Type -> Constraint
class Stream s where
@@ -100,6 +120,12 @@ else instance (Write s a) => Write s a where
write s a = write s a
end s = end s
withErrorST :: forall s. Stream s => s -> Effect { cancel :: Effect Unit, error :: STRef Global (Maybe Error) }
withErrorST s = do
error <- liftST $ STRef.new Nothing
cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error
pure { error, cancel }
fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer
fromBufferReadable = unsafeCoerce
@@ -123,34 +149,26 @@ awaitReadableOrClosed s = do
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s
when (not ended && not closed && not readable) $ makeAff \res -> do
cancelClose <- Event.once closeH (res $ Right unit) s
cancelError <- Event.once errorH (res <<< Left) s
cancelReadable <- flip (Event.once readableH) s do
cancelClose
cancelError
res $ Right unit
pure $ effectCanceler do
cancelReadable
cancelClose
cancelError
when (not ended && not closed && not readable)
$ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s
when (not closed && not ended && not writable) $ makeAff \res -> do
cancelClose <- Event.once closeH (res $ Right unit) s
cancelError <- Event.once errorH (res <<< Left) s
cancelDrain <- flip (Event.once drainH) s do
cancelClose
cancelError
res $ Right unit
pure $ effectCanceler do
cancelDrain
cancelClose
cancelError
when (not ended && not closed && not writable)
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
onceAff0 h emitter = makeAff \res -> do
cancel <- Event.once h (res $ Right unit) emitter
pure $ effectCanceler cancel
onceAff1 :: forall e a. EventHandle1 e a -> e -> Aff a
onceAff1 h emitter = makeAff \res -> do
cancel <- Event.once h (res <<< Right) emitter
pure $ effectCanceler cancel
readableH :: forall s a. Read s a => EventHandle0 s
readableH = EventHandle "readable" identity

82
src/Pipes.Collect.purs Normal file
View File

@@ -0,0 +1,82 @@
module Pipes.Collect where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array.ST as Array.ST
import Data.Either (hush)
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.Hashable (class Hashable)
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Pipes (next) as Pipes
import Pipes.Core (Producer)
-- | Fold every value produced
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b
fold f b p =
let
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> Pipes.next p'
pure $ Loop $ f b' a /\ p''
in
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
-- | Fold every value produced with a monadic action
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Unit -> m b
traverse f b p =
let
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> Pipes.next p'
b'' <- lift $ f b' a
pure $ Loop $ b'' /\ p''
in
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
-- | Execute a monadic action on every item in a producer.
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f = traverse (const f) unit
-- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
toArray p = do
st <- liftEffect $ liftST $ Array.ST.new
foreach (void <<< liftEffect <<< liftST <<< flip Array.ST.push st) p
liftEffect $ liftST $ Array.ST.unsafeFreeze st
-- | Collect all values from a `Producer` into a list.
toList :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a)
toList = map List.reverse <<< fold (flip List.Cons) List.Nil
-- | Collect all values from a `Producer` into a Javascript Object.
toObject :: forall a m. MonadRec m => MonadEffect m => Producer (String /\ a) m Unit -> m (Object a)
toObject p = do
st <- liftEffect $ liftST $ Object.ST.new
foreach (\(k /\ v) -> void $ liftEffect $ liftST $ Object.ST.poke k v st) p
liftEffect $ liftST $ Object.ST.Unsafe.unsafeFreeze st
-- | Collect all values from a `Producer` into a `HashMap`
toHashMap :: forall k v m. Hashable k => MonadRec m => Producer (k /\ v) m Unit -> m (HashMap k v)
toHashMap = fold (\map (k /\ v) -> HashMap.insert k v map) HashMap.empty
-- | Collect all values from a `Producer` into a `Map`
toMap :: forall k v m. Ord k => MonadRec m => Producer (k /\ v) m Unit -> m (Map k v)
toMap = fold (\map (k /\ v) -> Map.insert k v map) Map.empty

1
src/Pipes.Construct.purs Normal file
View File

@@ -0,0 +1 @@
module Pipes.Construct where

View File

@@ -0,0 +1,17 @@
module Pipes.Node.Buffer where
import Prelude
import Control.Monad.Morph (hoist)
import Effect.Class (class MonadEffect, liftEffect)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding)
import Pipes.Core (Pipe)
import Pipes.Prelude as Pipes
toString :: forall m. MonadEffect m => Encoding -> Pipe Buffer String m Unit
toString enc = hoist liftEffect $ Pipes.mapM $ Buffer.toString enc
fromString :: forall m. MonadEffect m => Encoding -> Pipe String Buffer m Unit
fromString enc = hoist liftEffect $ Pipes.mapM $ flip Buffer.fromString enc

63
src/Pipes.Node.FS.purs Normal file
View File

@@ -0,0 +1,63 @@
module Pipes.Node.FS where
import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer)
import Node.FS.Stream (WriteStreamOptions)
import Node.FS.Stream as FS.Stream
import Node.Path (FilePath)
import Node.Stream.Object as O
import Pipes.Core (Consumer, Producer)
import Pipes.Node.Stream (fromReadable, fromWritable)
import Prim.Row (class Union)
-- | Creates a `fs.Writable` stream for the file
-- | at the given path.
-- |
-- | Writing `Nothing` to this pipe will close the stream.
-- |
-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a`
-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting.
write
:: forall r trash m
. Union r trash WriteStreamOptions
=> MonadAff m
=> MonadThrow Error m
=> Record r
-> FilePath
-> Consumer (Maybe Buffer) m Unit
write o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w
-- | Open a file in write mode, failing if the file already exists.
-- |
-- | `write {flags: "wx"}`
create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
create = write { flags: "wx" }
-- | Open a file in write mode, truncating it if the file already exists.
-- |
-- | `write {flags: "w"}`
truncate :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
truncate = write { flags: "w" }
-- | Open a file in write mode, appending written contents if the file already exists.
-- |
-- | `write {flags: "a"}`
append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
append = write { flags: "a" }
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
-- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit
read p = do
r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r

View File

@@ -2,94 +2,157 @@ module Pipes.Node.Stream where
import Prelude
import Control.Alternative (empty)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..))
import Data.Newtype (wrap)
import Effect.Aff (Aff, delay)
import Effect.Aff.Class (liftAff)
import Data.Traversable (for_)
import Data.Tuple.Nested ((/\))
import Effect.Aff (delay)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Stream.Object as O
import Pipes (await, yield)
import Pipes.Core (Consumer, Pipe, Producer)
import Pipes.Internal (Proxy)
import Pipes.Internal as P.I
import Pipes (for) as P
import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Prelude (mapFoldable) as P
import Pipes.Util (InvokeResult(..), invoke)
type ProxyFFI :: Type -> Type -> Type -> Type -> Type -> Type -> Type
type ProxyFFI a' a b' b r pipe =
{ pure :: r -> pipe
, request :: a' -> (a -> pipe) -> pipe
, respond :: b -> (b' -> pipe) -> pipe
}
-- | Convert a `Readable` stream to a `Pipe`.
-- |
-- | This will yield `Nothing` before exiting, signaling
-- | End-of-stream.
fromReadable :: forall s a m. MonadThrow Error m => MonadAff m => O.Read s a => s -> Producer_ (Maybe a) m Unit
fromReadable r =
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
pure $ Done unit
proxyFFI :: forall m a' a b' b r. ProxyFFI a' a b' b r (Proxy a' a b' b m r)
proxyFFI = { pure: P.I.Pure, request: P.I.Request, respond: P.I.Respond }
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
fromReadable :: forall s a. O.Read s a => s -> Producer (Maybe a) Aff Unit
fromReadable r = whileJust do
liftAff $ delay $ wrap 0.0
a <- liftEffect $ O.read r
case a of
O.ReadWouldBlock -> do
lift $ O.awaitReadableOrClosed r
pure $ Just unit
O.ReadClosed -> do
yield Nothing
pure Nothing
O.ReadJust a' -> do
yield $ Just a'
pure $ Just unit
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadClosed -> yield Nothing *> cleanup cancel
in
do
e <- liftEffect $ O.withErrorST r
tailRecM go e
fromWritable :: forall s a. O.Write s a => s -> Consumer (Maybe a) Aff Unit
fromWritable w = do
whileJust $ runMaybeT do
liftAff $ delay $ wrap 0.0
a <- MaybeT await
res <- liftEffect $ O.write w a
case res of
O.WriteClosed -> empty
O.WriteOk -> pure unit
O.WriteWouldBlock -> do
liftAff $ O.awaitWritableOrClosed w
pure unit
liftEffect $ O.end w
-- | Convert a `Writable` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit
fromWritable w =
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
liftEffect $ O.end w
pure $ Done unit
fromTransform :: forall a b. O.Transform a b -> Pipe (Maybe a) (Maybe b) Aff Unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
ma <- await
case ma of
Nothing -> cleanup cancel
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop { error, cancel }
O.WriteClosed -> cleanup cancel
in
do
r <- liftEffect $ O.withErrorST w
tailRecM go r
-- | Convert a `Transform` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit
fromTransform t =
let
read' {exitOnWouldBlock} =
whileJust $ runMaybeT do
liftAff $ delay $ wrap 0.0
res <- liftEffect $ O.read t
case res of
O.ReadWouldBlock ->
if exitOnWouldBlock then do
empty
else do
liftAff $ O.awaitReadableOrClosed t
pure unit
O.ReadJust b -> do
lift $ yield $ Just b
pure unit
O.ReadClosed -> do
lift $ yield Nothing
empty
in do
whileJust $ runMaybeT do
cleanup removeErrorListener = do
liftEffect $ O.end t
liftEffect $ removeErrorListener
fromReadable t
pure $ Done unit
yieldFromReadableHalf = do
res <- liftEffect (O.read t)
case res of
O.ReadJust a -> yield (Just a)
O.ReadWouldBlock -> pure unit
O.ReadClosed -> yield Nothing *> pure unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
a <- MaybeT await
writeRes <- liftEffect $ O.write t a
ma <- await
case ma of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
yieldFromReadableHalf
case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff $ O.awaitWritableOrClosed t
pure $ Loop { error, cancel }
in
do
r <- liftEffect $ O.withErrorST t
tailRecM go r
lift $ read' {exitOnWouldBlock: true}
-- | Given a `Producer` of values, wrap them in `Just`.
-- |
-- | Before the `Producer` exits, emits `Nothing` as an End-of-stream signal.
withEOS :: forall a m. Monad m => Producer a m Unit -> Producer (Maybe a) m Unit
withEOS a = do
P.for a (yield <<< Just)
yield Nothing
case writeRes of
O.WriteOk -> pure unit
O.WriteClosed -> empty
O.WriteWouldBlock -> do
liftAff $ O.awaitWritableOrClosed t
pure unit
liftEffect $ O.end t
read' {exitOnWouldBlock: false}
-- | Strip a pipeline of the EOS signal
unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = P.mapFoldable identity
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.
-- |
-- | Allows easily using pipes not concerned with the EOS signal with
-- | pipes that do need this signal.
-- |
-- | (ex. `Pipes.Node.Buffer.toString` doesn't need an EOS signal, but `Pipes.Node.FS.create` does.)
-- |
-- | `Just` values will be passed to the pipe, and the response(s) will be wrapped in `Just`.
-- |
-- | `Nothing` will bypass the given pipe entirely, and the pipe will not be invoked again.
inEOS :: forall a b m. MonadRec m => Pipe a b m Unit -> Pipe (Maybe a) (Maybe b) m Unit
inEOS p = flip tailRecM p \p' -> do
ma <- await
case ma of
Just a -> do
res <- lift $ invoke p' a
case res of
Yielded (as /\ p'') -> do
for_ (Just <$> as) yield
pure $ Loop p''
DidNotYield p'' -> pure $ Loop p''
Exited -> yield Nothing $> Done unit
_ -> yield Nothing $> Done unit

42
src/Pipes.Node.Zlib.purs Normal file
View File

@@ -0,0 +1,42 @@
module Pipes.Node.Zlib where
import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe)
import Effect (Effect)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer)
import Node.Stream.Object as O
import Node.Zlib as Zlib
import Node.Zlib.Types (ZlibStream)
import Pipes.Core (Pipe)
import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.fromBufferTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip
gunzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip = fromZlib Zlib.createGunzip
unzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip = fromZlib Zlib.createUnzip
inflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate = fromZlib Zlib.createInflate
deflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate = fromZlib Zlib.createDeflate
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress

43
src/Pipes.String.purs Normal file
View File

@@ -0,0 +1,43 @@
module Pipes.String where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array.ST as Array.ST
import Data.Foldable (fold, traverse_)
import Data.Maybe (Maybe(..))
import Data.String (Pattern)
import Data.String as String
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield)
import Pipes.Core (Pipe)
-- | Accumulate string chunks until `pat` is seen, then `yield` the buffered
-- | string up to (and not including) the pattern.
-- |
-- | When end-of-stream is reached, yields the remaining buffered string then `Nothing`.
-- |
-- | ```
-- | toList $ yield "foo,bar,baz" >-> split ","
-- | -- "foo" : "bar" : "baz" : Nil
-- | ```
split :: forall m. MonadEffect m => Pattern -> Pipe (Maybe String) (Maybe String) m Unit
split pat = do
buf <- liftEffect $ liftST $ Array.ST.new
whileJust $ runMaybeT do
chunk <- MaybeT await
case String.indexOf pat chunk of
Nothing -> void $ liftEffect $ liftST $ Array.ST.push chunk buf
Just ix -> do
let
{ before, after } = String.splitAt ix chunk
len <- liftEffect $ liftST $ Array.ST.length buf
buf' <- liftEffect $ liftST $ Array.ST.splice 0 len [] buf
lift $ yield $ Just $ (fold buf') <> before
void $ liftEffect $ liftST $ Array.ST.push (String.drop 1 after) buf
buf' <- liftEffect $ liftST $ Array.ST.unsafeFreeze buf
traverse_ yield (Just <$> String.split pat (fold buf'))
yield Nothing

123
src/Pipes.Util.purs Normal file
View File

@@ -0,0 +1,123 @@
module Pipes.Util where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, forever, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Array.ST (STArray)
import Data.Array.ST as Array.ST
import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..))
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield)
import Pipes.Core (Pipe)
import Pipes.Internal (Proxy(..))
-- | Yields a separator value `sep` between received values
-- |
-- | ```purescript
-- | toList $ (yield "a" *> yield "b" *> yield "c") >-> intersperse ","
-- | -- "a" : "," : "b" : "," : "c" : Nil
-- | ```
intersperse :: forall m a. MonadEffect m => a -> Pipe (Maybe a) (Maybe a) m Unit
intersperse sep = do
isFirstST <- liftEffect $ liftST $ STRef.new true
let
getIsFirst = liftEffect $ liftST $ STRef.read isFirstST
markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST
whileJust $ runMaybeT do
a <- MaybeT await
isFirst <- getIsFirst
if isFirst then markNotFirst else lift $ yield $ Just sep
lift $ yield $ Just a
yield Nothing
-- | Accumulate values in chunks of a given size.
-- |
-- | If the pipe closes without yielding a multiple of `size` elements,
-- | the remaining elements are yielded at the end.
chunked :: forall m a. MonadEffect m => Int -> Pipe (Maybe a) (Maybe (Array a)) m Unit
chunked size = do
chunkST :: STRef _ (STArray _ a) <- liftEffect $ liftST $ STRef.new =<< Array.ST.new
let
chunkPut a = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
void $ Array.ST.push a chunkArray
chunkLength = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
Array.ST.length chunkArray
chunkTake = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
void $ flip STRef.write chunkST =<< Array.ST.new
Array.ST.unsafeFreeze chunkArray
whileJust $ runMaybeT do
a <- MaybeT await
chunkPut a
len <- chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake
yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.
-- |
-- | Uses a `HashSet` of hashes of `a`; for `n` elements `awaited`, this pipe
-- | will occupy O(n) space, and `yield` in O(1) time.
uniqHash :: forall a m. Hashable a => MonadEffect m => MonadRec m => Pipe a a m Unit
uniqHash = do
seenHashesST <- liftEffect $ liftST $ STRef.new HashSet.empty
forever do
a <- await
seenHashes <- liftEffect $ liftST $ STRef.read seenHashesST
when (not $ HashSet.member (hash a) seenHashes) do
void $ liftEffect $ liftST $ STRef.modify (HashSet.insert $ hash a) seenHashesST
yield a
-- | The result of a single step forward of a pipe.
data InvokeResult a b m
-- | The pipe `await`ed the value, but did not `yield` a response.
= DidNotYield (Pipe a b m Unit)
-- | The pipe `await`ed the value, and `yield`ed 1 or more responses.
| Yielded (NonEmptyList b /\ Pipe a b m Unit)
-- | The pipe `await`ed the value, and exited.
| Exited
data IntermediateInvokeResult a b m
= IDidNotYield (Pipe a b m Unit)
| IYielded (NonEmptyList b /\ Pipe a b m Unit)
| IDidNotAwait (Pipe a b m Unit)
-- | Pass a single value to a pipe, returning the result of the pipe's invocation.
invoke :: forall m a b. Monad m => Pipe a b m Unit -> a -> m (InvokeResult a b m)
invoke m a =
let
go :: IntermediateInvokeResult a b m -> m (InvokeResult a b m)
go (IYielded (as /\ n)) =
case n of
Request _ _ -> pure $ Yielded $ as /\ n
Respond rep f -> go (IYielded $ (as <> pure rep) /\ f unit)
M o -> go =<< IYielded <$> (as /\ _) <$> o
Pure _ -> pure Exited
go (IDidNotYield n) =
case n of
Request _ _ -> pure $ DidNotYield n
Respond rep f -> go (IYielded $ pure rep /\ f unit)
M o -> go =<< IDidNotYield <$> o
Pure _ -> pure Exited
go (IDidNotAwait n) =
case n of
Request _ f -> go (IDidNotYield (f a))
Respond rep f -> go (IYielded $ pure rep /\ f unit)
M o -> go =<< IDidNotAwait <$> o
Pure _ -> pure Exited
in
go (IDidNotAwait m)

38
test/Test/Common.purs Normal file
View File

@@ -0,0 +1,38 @@
module Test.Common where
import Prelude
import Control.Monad.Error.Class (class MonadError, liftEither, try)
import Data.Bifunctor (lmap)
import Data.String.Gen (genAlphaString)
import Data.Tuple (fst)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff (Aff, bracket)
import Effect.Class (liftEffect)
import Effect.Exception (Error, error)
import Node.FS.Sync as FS
import Pipes.Core (Pipe)
import Pipes.Prelude as Pipes
import Simple.JSON (class ReadForeign, class WriteForeign, readJSON, writeJSON)
import Test.QuickCheck.Gen (randomSampleOne, resize)
tmpFile :: (String -> Aff Unit) -> Aff Unit
tmpFile f = tmpFiles (f <<< fst)
tmpFiles :: (String /\ String -> Aff Unit) -> Aff Unit
tmpFiles =
let
acq = do
randa <- liftEffect $ randomSampleOne $ resize 10 genAlphaString
randb <- liftEffect $ randomSampleOne $ resize 10 genAlphaString
void $ try $ liftEffect $ FS.mkdir ".tmp"
pure $ (".tmp/tmp." <> randa) /\ (".tmp/tmp." <> randb)
rel (a /\ b) = liftEffect (try (FS.rm a) *> void (try $ FS.rm b))
in
bracket acq rel
jsonStringify :: forall m a. Monad m => WriteForeign a => Pipe a String m Unit
jsonStringify = Pipes.map writeJSON
jsonParse :: forall m @a. MonadError Error m => ReadForeign a => Pipe String a m Unit
jsonParse = Pipes.mapM (liftEither <<< lmap (error <<< show) <<< readJSON)

View File

@@ -6,9 +6,15 @@ import Data.Maybe (Maybe(..))
import Effect (Effect)
import Effect.Aff (launchAff_)
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Spec.Reporter (consoleReporter, specReporter)
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Collect as Test.Pipes.Collect
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit
main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ specReporter ] do
main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do
Test.Pipes.Node.Stream.spec
Test.Pipes.Node.Buffer.spec
Test.Pipes.Node.FS.spec
Test.Pipes.Collect.spec

View File

@@ -0,0 +1,111 @@
module Test.Pipes.Collect where
import Prelude
import Control.Monad.Gen (chooseInt)
import Control.Monad.Rec.Class (Step(..), tailRecM)
import Control.Monad.ST as ST
import Control.Monad.ST.Ref as STRef
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Traversable (traverse)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Effect.Unsafe (unsafePerformEffect)
import Foreign.Object (Object)
import Foreign.Object as Object
import Pipes (yield)
import Pipes.Collect as Pipes.Collect
import Pipes.Core (Producer)
import Test.QuickCheck.Gen (randomSampleOne)
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
testData
:: { array :: Array (Int /\ Int)
, list :: List (Int /\ Int)
, strarray :: Array (String /\ Int)
, object :: Object Int
, map :: Map Int Int
, hashMap :: HashMap Int Int
, stream :: Producer (Int /\ Int) Aff Unit
, streamStr :: Producer (String /\ Int) Aff Unit
}
testData =
unsafePerformEffect $ do
array <-
flip traverse (Array.range 0 99999) \k -> do
v <- liftEffect $ randomSampleOne $ chooseInt 0 99999
pure $ k /\ v
let
strarray = lmap show <$> array
object = Object.fromFoldable strarray
map' :: forall m. m -> (Int -> Int -> m -> m) -> m
map' empty insert = ST.run do
st <- STRef.new empty
ST.foreach array \(k /\ v) -> void $ STRef.modify (insert k v) st
STRef.read st
hashMap = map' HashMap.empty HashMap.insert
map = map' Map.empty Map.insert
pure
{ array
, strarray
, list: List.fromFoldable array
, object
, hashMap
, map
, stream: flip tailRecM 0 \ix -> case Array.index array ix of
Just a -> yield a $> Loop (ix + 1)
Nothing -> pure $ Done unit
, streamStr: flip tailRecM 0 \ix -> case Array.index strarray ix of
Just a -> yield a $> Loop (ix + 1)
Nothing -> pure $ Done unit
}
spec :: Spec Unit
spec =
describe "Test.Pipes.Collect" do
describe "toArray" do
it "collects an array" do
act <- Pipes.Collect.toArray testData.stream
act `shouldEqual` testData.array
it "empty ok" do
act :: Array Int <- Pipes.Collect.toArray (pure unit)
act `shouldEqual` []
describe "toObject" do
it "collects" do
act <- Pipes.Collect.toObject $ testData.streamStr
act `shouldEqual` testData.object
it "empty ok" do
act :: Object Int <- Pipes.Collect.toObject (pure unit)
act `shouldEqual` Object.empty
describe "toMap" do
it "collects" do
act <- Pipes.Collect.toMap testData.stream
act `shouldEqual` testData.map
it "empty ok" do
act :: Map String Int <- Pipes.Collect.toMap (pure unit)
act `shouldEqual` Map.empty
describe "toHashMap" do
it "collects" do
act <- Pipes.Collect.toHashMap testData.stream
act `shouldEqual` testData.hashMap
it "empty ok" do
act :: HashMap String Int <- Pipes.Collect.toHashMap (pure unit)
act `shouldEqual` HashMap.empty
describe "toList" do
it "collects" do
act <- Pipes.Collect.toList testData.stream
act `shouldEqual` testData.list
it "empty ok" do
act :: List (String /\ Int) <- Pipes.Collect.toList (pure unit)
act `shouldEqual` List.Nil

View File

@@ -0,0 +1,71 @@
module Test.Pipes.Node.Buffer where
import Prelude
import Control.Monad.Error.Class (catchError)
import Control.Monad.Gen (chooseInt, sized)
import Data.Array as Array
import Data.FoldableWithIndex (forWithIndex_)
import Data.Int as Int
import Data.String.Gen (genAsciiString)
import Data.Tuple (fst, snd)
import Data.Tuple.Nested ((/\))
import Effect.Class (liftEffect)
import Effect.Unsafe (unsafePerformEffect)
import Node.Buffer (Buffer, BufferValueType(..))
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
import Pipes ((>->))
import Pipes (each) as Pipes
import Pipes.Core (runEffect) as Pipes
import Pipes.Node.Buffer as Pipes.Node.Buffer
import Pipes.Prelude (drain, toListM) as Pipes
import Test.QuickCheck (class Arbitrary)
import Test.QuickCheck.Arbitrary (arbitrary)
import Test.QuickCheck.Gen (randomSample', vectorOf)
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (fail, shouldEqual)
data BufferJunk = BufferJunk Buffer
instance Arbitrary BufferJunk where
arbitrary = sized \s -> do
ns <- vectorOf s (chooseInt 0 7)
pure $ unsafePerformEffect do
buf <- Buffer.alloc s
forWithIndex_ ns \ix n -> Buffer.write UInt8 (Int.toNumber n) ix buf
pure $ BufferJunk buf
data BufferUTF8 = BufferUTF8 String Buffer
instance Arbitrary BufferUTF8 where
arbitrary = do
s <- genAsciiString
pure $ BufferUTF8 s $ unsafePerformEffect $ Buffer.fromString s UTF8
spec :: Spec Unit
spec = describe "Pipes.Node.Buffer" do
describe "toString" do
it "fails when encoding wrong" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
let
uut = Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString UTF8 >-> Pipes.drain
ok = do
uut
fail "Should have thrown"
err _ = pure unit
catchError ok err
it "junk OK in hex" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString Hex >-> Pipes.drain
it "UTF8 ok" do
vals <- (map \(BufferUTF8 s b) -> s /\ b) <$> liftEffect (randomSample' 100 arbitrary)
let
bufs = Pipes.each $ snd <$> vals
strs = fst <$> vals
act <- Array.fromFoldable <$> Pipes.toListM (bufs >-> Pipes.Node.Buffer.toString UTF8)
act `shouldEqual` strs
describe "fromString" do
it "ok" do
vals <- Pipes.each <$> liftEffect (randomSample' 100 genAsciiString)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.fromString UTF8 >-> Pipes.drain

View File

@@ -0,0 +1,86 @@
module Test.Pipes.Node.FS where
import Prelude
import Control.Monad.Error.Class (catchError)
import Data.Foldable (fold, intercalate)
import Data.Newtype (wrap)
import Data.Tuple.Nested ((/\))
import Effect.Class (liftEffect)
import Node.Encoding (Encoding(..))
import Node.FS.Sync as FS
import Pipes (yield, (>->))
import Pipes.Core (runEffect) as Pipes
import Pipes.Node.Buffer as Pipes.Node.Buffer
import Pipes.Node.FS as Pipes.Node.FS
import Pipes.Node.Stream (inEOS, unEOS, withEOS)
import Pipes.Prelude (drain, map, toListM) as Pipes
import Pipes.String as Pipes.String
import Pipes.Util as Pipes.Util
import Simple.JSON (writeJSON)
import Test.Common (jsonParse, tmpFile, tmpFiles)
import Test.Spec (Spec, around, describe, it)
import Test.Spec.Assertions (fail, shouldEqual)
spec :: Spec Unit
spec = describe "Pipes.Node.FS" do
describe "read" do
around tmpFile $ it "fails if the file does not exist" \p -> do
flip catchError (const $ pure unit) do
Pipes.runEffect $ Pipes.Node.FS.read p >-> Pipes.drain
fail "should have thrown"
around tmpFile $ it "reads ok" \p -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "create" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "append" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "appends" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "\n" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo\nbar"
describe "truncate" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "overwrites contents" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "bar"
around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do
let
exp = [ { foo: "a" }, { foo: "bar" }, { foo: "123" } ]
liftEffect $ FS.writeTextFile UTF8 a $ intercalate "\n" $ writeJSON <$> exp
Pipes.runEffect $
Pipes.Node.FS.read a
>-> inEOS (Pipes.Node.Buffer.toString UTF8)
>-> Pipes.String.split (wrap "\n")
>-> inEOS (jsonParse @{ foo :: String })
>-> inEOS (Pipes.map _.foo)
>-> Pipes.Util.intersperse "\n"
>-> inEOS (Pipes.Node.Buffer.fromString UTF8)
>-> Pipes.Node.FS.create b
act <- liftEffect $ FS.readTextFile UTF8 b
act `shouldEqual` "a\nbar\n123"

View File

@@ -1,4 +1,19 @@
import Stream from 'stream'
export const discardTransform = () => new Stream.Transform({
transform: function(_ck, _enc, cb) {
cb()
},
objectMode: true
})
export const charsTransform = () => new Stream.Transform({
transform: function(ck, _enc, cb) {
ck.split('').filter(s => !!s).forEach(s => this.push(s))
cb()
},
objectMode: true,
})
/** @type {(a: Array<unknown>) => Stream.Readable}*/
export const readableFromArray = a => Stream.Readable.from(a)

View File

@@ -2,24 +2,18 @@ module Test.Pipes.Node.Stream where
import Prelude
import Control.Monad.Error.Class (liftEither, try)
import Control.Monad.Morph (hoist)
import Control.Monad.Trans.Class (lift)
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.Foldable (fold, intercalate)
import Data.Foldable (fold)
import Data.List ((:))
import Data.List as List
import Data.Maybe (Maybe(..), fromMaybe)
import Data.Maybe (Maybe)
import Data.Newtype (wrap)
import Data.String.Gen (genAlphaString)
import Data.Traversable (for_, traverse)
import Data.Tuple (fst)
import Data.Tuple.Nested (type (/\), (/\))
import Effect (Effect)
import Effect.Aff (Aff, bracket, delay)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Aff (Aff, delay)
import Effect.Class (class MonadEffect, liftEffect)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
@@ -27,53 +21,30 @@ import Node.FS.Stream as FS.Stream
import Node.FS.Sync as FS
import Node.Stream.Object as O
import Node.Zlib as Zlib
import Pipes (each) as Pipes
import Pipes (yield, (>->))
import Pipes.Core (Consumer, Producer, Pipe, runEffect)
import Pipes.Core (Consumer, Producer, runEffect)
import Pipes.Node.Buffer as Pipes.Buffer
import Pipes.Node.Stream as S
import Pipes.Prelude as Pipe
import Simple.JSON (class ReadForeign, class WriteForeign, readJSON, writeJSON)
import Pipes.Prelude (mapFoldable, toListM) as Pipes
import Simple.JSON (writeJSON)
import Test.Common (jsonParse, jsonStringify, tmpFile, tmpFiles)
import Test.QuickCheck.Arbitrary (arbitrary)
import Test.QuickCheck.Gen (randomSample', randomSampleOne, resize)
import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, around, describe, it)
import Test.Spec.Assertions (shouldEqual)
foreign import readableFromArray :: forall @a. Array a -> O.Readable a
foreign import discardTransform :: forall a b. Effect (O.Transform a b)
foreign import charsTransform :: Effect (O.Transform String String)
str2buf :: Pipe (Maybe String) (Maybe Buffer) Aff Unit
str2buf = hoist liftEffect $ Pipe.mapM (traverse $ flip Buffer.fromString UTF8)
writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
writer a = do
stream <- liftEffect $ O.fromBufferWritable <$> FS.Stream.createWriteStream a
pure $ stream /\ S.fromWritable stream
buf2str :: Pipe (Maybe Buffer) (Maybe String) Aff Unit
buf2str = hoist liftEffect $ Pipe.mapM (traverse $ Buffer.toString UTF8)
buf2hex :: Pipe (Maybe Buffer) (Maybe String) Aff Unit
buf2hex = hoist liftEffect $ Pipe.mapM (traverse $ Buffer.toString Hex)
jsonStringify :: forall a. WriteForeign a => Pipe (Maybe a) (Maybe String) Aff Unit
jsonStringify = Pipe.map (map writeJSON)
jsonParse :: forall @a. ReadForeign a => Pipe (Maybe String) (Maybe a) Aff Unit
jsonParse = Pipe.mapM (traverse (liftEither <<< lmap (error <<< show) <<< readJSON))
writer :: String -> Effect (Consumer (Maybe Buffer) Aff Unit)
writer a = S.fromWritable <$> O.fromBufferWritable <$> FS.Stream.createWriteStream a
reader :: String -> Effect (Producer (Maybe Buffer) Aff Unit)
reader a = S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a
tmpFile :: (String -> Aff Unit) -> Aff Unit
tmpFile f = tmpFiles (f <<< fst)
tmpFiles :: (String /\ String -> Aff Unit) -> Aff Unit
tmpFiles =
let
acq = do
randa <- liftEffect $ randomSampleOne $ resize 10 genAlphaString
randb <- liftEffect $ randomSampleOne $ resize 10 genAlphaString
void $ try $ liftEffect $ FS.mkdir ".tmp"
pure $ ("tmp." <> randa) /\ ("tmp." <> randb)
rel (a /\ b) = liftEffect (try (FS.rm a) *> void (try $ FS.rm b))
in
bracket acq rel
reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
reader a = liftEffect $ S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a
spec :: Spec Unit
spec =
@@ -81,42 +52,42 @@ spec =
describe "Readable" do
describe "Readable.from(<Iterable>)" do
it "empty" do
vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray @{ foo :: String } [])
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } []) >-> S.unEOS
vals `shouldEqual` List.Nil
it "singleton" do
vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ])
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> S.unEOS
vals `shouldEqual` ({ foo: "1" } : List.Nil)
it "many elements" do
let exp = (\n -> { foo: show n }) <$> Array.range 0 100
vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray exp)
vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray exp) >-> S.unEOS
vals `shouldEqual` (List.fromFoldable exp)
describe "Writable" $ around tmpFile do
describe "fs.WriteStream" do
it "pipe to file" \p -> do
w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p)
stream <- O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p)
let
w = S.fromWritable stream
source = do
buf <- liftEffect $ Buffer.fromString "hello" UTF8
yield $ Just buf
yield Nothing
runEffect $ source >-> w
yield buf
runEffect $ S.withEOS source >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello"
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
it "async pipe to file" \p -> do
w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p)
let
source = do
yield $ Just "hello, "
yield "hello, "
lift $ delay $ wrap 5.0
yield $ Just "world!"
yield "world!"
lift $ delay $ wrap 5.0
yield $ Just " "
yield " "
lift $ delay $ wrap 5.0
yield $ Just "this is a "
yield "this is a "
lift $ delay $ wrap 5.0
yield $ Just "test."
yield Nothing
runEffect $ source >-> str2buf >-> w
yield "test."
runEffect $ S.withEOS (source >-> Pipes.Buffer.fromString UTF8) >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello, world! this is a test."
it "chained pipes" \p -> do
@@ -125,34 +96,45 @@ spec =
str :: String <- genAlphaString
num :: Int <- arbitrary
stuff :: Array String <- arbitrary
pure {str, num, stuff}
objs <- liftEffect $ randomSample' 1 obj
pure { str, num, stuff }
objs <- liftEffect (randomSample' 1 obj)
let
exp = fold (writeJSON <$> objs)
objs' = for_ (Just <$> objs) yield *> yield Nothing
w <- liftEffect $ writer p
runEffect $ objs' >-> jsonStringify >-> str2buf >-> w
stream /\ w <- liftEffect $ writer p
runEffect $ S.withEOS (Pipes.each objs >-> jsonStringify >-> Pipes.Buffer.fromString UTF8) >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` exp
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
describe "Transform" do
it "gzip" do
let
json = do
yield $ Just $ writeJSON {foo: "bar"}
yield Nothing
json = yield $ writeJSON { foo: "bar" }
exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000"
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
outs :: List.List String <- List.catMaybes <$> Pipe.toListM (json >-> str2buf >-> gzip >-> buf2hex)
outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex)
fold outs `shouldEqual` exp
around tmpFiles
$ it "file >-> gzip >-> file >-> gunzip" \(a /\ b) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [1, 2, 3, 4]
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ]
areader <- liftEffect $ reader a
bwriter <- liftEffect $ writer b
bwritestream /\ bwriter <- liftEffect $ writer b
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
runEffect $ areader >-> gzip >-> bwriter
shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream)
gunzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip)
breader <- liftEffect $ reader b
nums <- Pipe.toListM (breader >-> gunzip >-> buf2str >-> jsonParse @(Array Int) >-> Pipe.mapFoldable (fromMaybe []))
Array.fromFoldable nums `shouldEqual` [1, 2, 3, 4]
nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity)
Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ]
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p
discard' <- liftEffect discardTransform
out :: List.List Int <- Pipes.toListM $ r >-> S.fromTransform discard' >-> S.unEOS
out `shouldEqual` List.Nil
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo bar"
r <- reader p
chars' <- liftEffect charsTransform
out :: List.List String <- Pipes.toListM $ r >-> S.inEOS (Pipes.Buffer.toString UTF8) >-> S.fromTransform chars' >-> S.unEOS
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]