25 Commits

Author SHA1 Message Date
3f4bc12d36 chore: prepare v1.4.0 2024-05-14 13:12:56 -05:00
fd53b6520f feat: Collect.toBuffer 2024-05-14 13:12:41 -05:00
0ef7240d61 wip: explore removing delays(10) 2024-05-14 12:44:31 -05:00
de22f44f86 wip: explore removing delays(9) 2024-05-14 12:43:38 -05:00
f9c0e20777 wip: explore removing delays(8) 2024-05-14 11:08:46 -05:00
edc7d40dbc wip: explore removing delays(7) 2024-05-14 11:04:50 -05:00
ba8d90038d wip: explore removing delays(6) 2024-05-14 10:55:14 -05:00
dfdca9f5e9 wip: explore removing delays(5) 2024-05-14 10:39:06 -05:00
67ae171532 wip: explore removing delays(4) 2024-05-13 21:18:27 -05:00
a347c05062 wip: explore removing delays(3) 2024-05-13 21:17:28 -05:00
f9446c97a0 wip: explore removing delays(2) 2024-05-13 21:15:45 -05:00
d3b8d1792d wip: explore removing delays 2024-05-13 21:06:41 -05:00
e05c74f42f fix: minor fixes 2024-05-13 15:04:34 -05:00
e1c2481e70 chore: prepare v1.3.3 2024-05-13 14:42:48 -05:00
820351f800 fix: more yields 2024-05-13 14:42:23 -05:00
9d8b500b8d chore: prepare v1.3.2 2024-05-13 14:35:55 -05:00
b7bead090e fix: transform should read more than just 1 chunk after writing 2024-05-13 14:35:44 -05:00
3db5cc44a9 chore: prepare v1.3.1 2024-05-13 13:27:28 -05:00
1a5ca66e83 fix: Pipes.Node.FS.read' 2024-05-13 13:27:18 -05:00
54d9d57927 chore: prepare v1.3.0 2024-05-13 11:21:23 -05:00
a5c535fb1e feat: Pipes.Construct 2024-05-13 11:21:06 -05:00
7e6c6af3dd chore: prepare v1.2.3 2024-05-11 22:11:44 -05:00
faf49fafd5 chore: lock 2024-05-11 22:11:40 -05:00
04815f66a4 chore: prepare v1.2.2 2024-05-11 22:10:57 -05:00
fd895de148 fix: ensure-ranges 2024-05-11 22:09:45 -05:00
14 changed files with 293 additions and 82 deletions

View File

@@ -1,6 +1,6 @@
{ {
"name": "purescript-node-stream-pipes", "name": "purescript-node-stream-pipes",
"version": "v1.2.1", "version": "v1.4.0",
"type": "module", "type": "module",
"dependencies": { "dependencies": {
"csv-parse": "^5.5.5", "csv-parse": "^5.5.5",

View File

@@ -9,8 +9,8 @@ workspace:
- either: ">=6.1.0 <7.0.0" - either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object - foreign-object: ">=4.1.0 <5.0.0"
- lists - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0" - mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
@@ -20,7 +20,7 @@ workspace:
- node-path: ">=5.0.0 <6.0.0" - node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0" - node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections - ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
@@ -28,8 +28,8 @@ workspace:
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
- tuples - tuples: ">=7.0.0 <8.0.0"
- unordered-collections - unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies: test_dependencies:
- console - console

View File

@@ -1,7 +1,7 @@
package: package:
name: node-stream-pipes name: node-stream-pipes
publish: publish:
version: '1.2.1' version: '1.4.0'
license: 'GPL-3.0-or-later' license: 'GPL-3.0-or-later'
location: location:
githubOwner: 'cakekindel' githubOwner: 'cakekindel'
@@ -10,17 +10,14 @@ package:
strict: true strict: true
pedanticPackages: true pedanticPackages: true
dependencies: dependencies:
- foreign-object
- lists
- ordered-collections
- tuples
- unordered-collections
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0" - either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0" - mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
@@ -30,6 +27,7 @@ package:
- node-path: ">=5.0.0 <6.0.0" - node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0" - node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
@@ -37,9 +35,13 @@ package:
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0"
test: test:
main: Test.Main main: Test.Main
build:
strict: true
dependencies: dependencies:
- console - console
- gen - gen

View File

@@ -3,12 +3,18 @@ import Stream from "stream";
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */ /** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableImpl = (s) => () => s.readable; export const isReadableImpl = (s) => () => s.readable;
/** @type {(s: Stream.Readable | Stream.Transform) => () => number} */
export const readableLengthImpl = (s) => () => s.readableLength;
/** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */ /** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */
export const isClosedImpl = (s) => () => s.closed; export const isClosedImpl = (s) => () => s.closed;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */ /** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableImpl = (s) => () => s.writable; export const isWritableImpl = (s) => () => s.writable;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const needsDrainImpl = (s) => () => s.writableNeedDrain;
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */ /** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableEndedImpl = (s) => () => s.readableEnded; export const isReadableEndedImpl = (s) => () => s.readableEnded;

View File

@@ -60,6 +60,8 @@ foreign import isWritableImpl :: forall s. s -> Effect Boolean
foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean foreign import isClosedImpl :: forall s. s -> Effect Boolean
foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int
readResultFFI :: forall a. ReadResultFFI a readResultFFI :: forall a. ReadResultFFI a
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust } readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust }
@@ -81,25 +83,30 @@ else instance Stream s => Stream s where
isClosed s = isClosed s isClosed s = isClosed s
class Stream s <= Read s a | s -> a where class Stream s <= Read s a | s -> a where
readableLength :: s -> Effect Int
isReadable :: s -> Effect Boolean isReadable :: s -> Effect Boolean
isReadableEnded :: s -> Effect Boolean isReadableEnded :: s -> Effect Boolean
read :: s -> Effect (ReadResult a) read :: s -> Effect (ReadResult a)
class Stream s <= Write s a | s -> a where class Stream s <= Write s a | s -> a where
isWritable :: s -> Effect Boolean isWritable :: s -> Effect Boolean
needsDrain :: s -> Effect Boolean
isWritableEnded :: s -> Effect Boolean isWritableEnded :: s -> Effect Boolean
write :: s -> a -> Effect WriteResult write :: s -> a -> Effect WriteResult
end :: s -> Effect Unit end :: s -> Effect Unit
instance Read (Readable a) a where instance Read (Readable a) a where
readableLength = readableLengthImpl
isReadable = isReadableImpl isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI read = readImpl readResultFFI
else instance Read (Transform a b) b where else instance Read (Transform a b) b where
readableLength = readableLengthImpl
isReadable = isReadableImpl isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI read = readImpl readResultFFI
else instance (Read s a) => Read s a where else instance (Read s a) => Read s a where
readableLength = readableLengthImpl
isReadable = isReadableImpl isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl isReadableEnded = isReadableEndedImpl
read s = read s read s = read s
@@ -109,16 +116,19 @@ instance Write (Writable a) a where
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
write s = writeImpl writeResultFFI s write s = writeImpl writeResultFFI s
end = endImpl end = endImpl
needsDrain = needsDrainImpl
else instance Write (Transform a b) a where else instance Write (Transform a b) a where
isWritable = isWritableImpl isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
write s = writeImpl writeResultFFI s write s = writeImpl writeResultFFI s
end = endImpl end = endImpl
needsDrain = needsDrainImpl
else instance (Write s a) => Write s a where else instance (Write s a) => Write s a where
isWritable = isWritableImpl isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl isWritableEnded = isWritableEndedImpl
write s a = write s a write s a = write s a
end s = end s end s = end s
needsDrain = needsDrainImpl
withErrorST :: forall s. Stream s => s -> Effect { cancel :: Effect Unit, error :: STRef Global (Maybe Error) } withErrorST :: forall s. Stream s => s -> Effect { cancel :: Effect Unit, error :: STRef Global (Maybe Error) }
withErrorST s = do withErrorST s = do
@@ -146,18 +156,24 @@ fromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do awaitReadableOrClosed s = do
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s readable <- liftEffect $ isReadable s
when (not ended && not closed && not readable) length <- liftEffect $ readableLength s
$ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] when (readable && length == 0)
$ liftEither
=<< parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = onceAff0 finishH s
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do awaitWritableOrClosed s = do
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s writable <- liftEffect $ isWritable s
when (not ended && not closed && not writable) needsDrain <- liftEffect $ needsDrain s
when (writable && needsDrain)
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] $ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
@@ -184,3 +200,6 @@ errorH = EventHandle "error" mkEffectFn1
endH :: forall s a. Write s a => EventHandle0 s endH :: forall s a. Write s a => EventHandle0 s
endH = EventHandle "end" identity endH = EventHandle "end" identity
finishH :: forall s a. Write s a => EventHandle0 s
finishH = EventHandle "finish" identity

View File

@@ -1 +0,0 @@
module Pipes.CSV.Parse where

View File

@@ -2,12 +2,9 @@ module Pipes.Collect where
import Prelude import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array.ST as Array.ST import Data.Array.ST as Array.ST
import Data.Either (hush)
import Data.HashMap (HashMap) import Data.HashMap (HashMap)
import Data.HashMap as HashMap import Data.HashMap as HashMap
import Data.Hashable (class Hashable) import Data.Hashable (class Hashable)
@@ -15,45 +12,59 @@ import Data.List (List)
import Data.List as List import Data.List as List
import Data.Map (Map) import Data.Map (Map)
import Data.Map as Map import Data.Map as Map
import Data.Maybe (fromMaybe) import Data.Maybe (Maybe(..), maybe)
import Data.Tuple.Nested (type (/\), (/\)) import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Foreign.Object (Object) import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Pipes (next) as Pipes import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Pipes.Core (Producer) import Pipes.Core (Producer)
import Pipes.Internal (Proxy(..))
-- | Fold every value produced
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b
fold f b p =
let
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> Pipes.next p'
pure $ Loop $ f b' a /\ p''
in
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
-- | Fold every value produced with a monadic action -- | Fold every value produced with a monadic action
-- | -- |
-- | Uses `MonadRec`, supporting producers of arbitrary length. -- | Uses `MonadRec`, supporting producers of arbitrary length.
traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Unit -> m b traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Unit -> m b
traverse f b p = traverse f b0 p0 =
let flip tailRecM (p0 /\ b0) \(p /\ b) ->
insertNext b' p' = runMaybeT do case p of
a /\ p'' <- MaybeT $ hush <$> Pipes.next p' Respond a m -> do
b'' <- lift $ f b' a b' <- f b a
pure $ Loop $ b'' /\ p'' pure $ Loop $ m unit /\ b'
in M m -> do
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p' n <- m
pure $ Loop $ (n /\ b)
Request _ _ -> pure $ Done b
Pure _ -> pure $ Done b
-- | Fold every value produced
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b
fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0
-- | Execute a monadic action on every item in a producer. -- | Execute a monadic action on every item in a producer.
-- | -- |
-- | Uses `MonadRec`, supporting producers of arbitrary length. -- | Uses `MonadRec`, supporting producers of arbitrary length.
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f = traverse (const f) unit foreach f p0 = traverse (\_ a -> f a) unit p0
-- | Concatenate all produced buffers
toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer
toBuffer p =
(liftEffect <<< maybe (Buffer.alloc 0) pure)
=<< traverse
( flip \b ->
case _ of
Just acc -> do
new <- liftEffect $ Buffer.concat [ acc, b ]
pure $ Just new
_ -> pure $ Just b
)
Nothing
p
-- | Collect all values from a `Producer` into an array. -- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a) toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)

View File

@@ -1 +1,64 @@
module Pipes.Construct where module Pipes.Construct where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map.Internal as Map.Internal
import Data.Maybe (fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (yield, (>->))
import Pipes.Core (Producer)
import Pipes.Prelude as Pipe
import Pipes.Util as Pipe.Util
-- Producer that will emit monotonically increasing integers
-- ex `monotonic 0 -> 0 1 2 3 4 5 6 7 ..`
monotonic :: forall m. MonadRec m => Int -> Producer Int m Unit
monotonic start = flip tailRecM start \n -> yield n $> Loop (n + 1)
-- Producer that will emit integers from `start` (inclusive) to `end` (exclusive)
range :: forall m. MonadRec m => Int -> Int -> Producer Int m Unit
range start end = monotonic start >-> Pipe.take end
-- | Stack-safe producer that yields every value in an Array
eachArray :: forall a m. MonadRec m => Array a -> Producer a m Unit
eachArray as = monotonic 0 >-> Pipe.map (Array.index as) >-> Pipe.Util.whileJust
-- | Stack-safe producer that yields every value in a List
eachList :: forall a m. MonadRec m => List a -> Producer a m Unit
eachList init =
flip tailRecM init \as -> fromMaybe (Done unit) <$> runMaybeT do
head <- MaybeT $ pure $ List.head as
tail <- MaybeT $ pure $ List.tail as
lift $ yield head
pure $ Loop tail
-- | Stack-safe producer that yields every value in a Map
eachMap :: forall k v m. MonadEffect m => MonadRec m => Map k v -> Producer (k /\ v) m Unit
eachMap init = do
stack <- liftEffect $ liftST $ Array.ST.new
let
push a = void $ liftEffect $ liftST $ Array.ST.push a stack
pop = liftEffect $ liftST $ Array.ST.pop stack
flip tailRecM init case _ of
Map.Internal.Leaf -> fromMaybe (Done unit) <$> runMaybeT do
a <- MaybeT pop
pure $ Loop a
Map.Internal.Node _ _ k v Map.Internal.Leaf Map.Internal.Leaf -> do
yield $ k /\ v
pure $ Loop Map.Internal.Leaf
Map.Internal.Node _ _ k v Map.Internal.Leaf r -> do
yield $ k /\ v
pure $ Loop r
Map.Internal.Node a b k v l r -> do
push $ Map.Internal.Node a b k v Map.Internal.Leaf r
pure $ Loop l

View File

@@ -8,7 +8,7 @@ import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error) import Effect.Exception (Error)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.FS.Stream (WriteStreamOptions) import Node.FS.Stream (WriteStreamOptions, ReadStreamOptions)
import Node.FS.Stream as FS.Stream import Node.FS.Stream as FS.Stream
import Node.Path (FilePath) import Node.Path (FilePath)
import Node.Stream.Object as O import Node.Stream.Object as O
@@ -61,3 +61,19 @@ read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Mayb
read p = do read p = do
r <- liftEffect $ FS.Stream.createReadStream p r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r fromReadable $ O.fromBufferReadable r
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
-- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read'
:: forall r trash m
. Union r trash ReadStreamOptions
=> MonadAff m
=> MonadThrow Error m
=> Record r
-> FilePath
-> Producer (Maybe Buffer) m Unit
read' opts p = do
r <- liftEffect $ FS.Stream.createReadStream' p opts
fromReadable $ O.fromBufferReadable r

View File

@@ -1,6 +1,6 @@
module Pipes.Node.Stream where module Pipes.Node.Stream where
import Prelude import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError) import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
@@ -34,7 +34,6 @@ fromReadable r =
pure $ Done unit pure $ Done unit
go { error, cancel } = do go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error err <- liftEffect $ liftST $ STRef.read error
for_ err throwError for_ err throwError
@@ -58,24 +57,23 @@ fromWritable w =
cleanup rmErrorListener = do cleanup rmErrorListener = do
liftEffect rmErrorListener liftEffect rmErrorListener
liftEffect $ O.end w liftEffect $ O.end w
liftAff $ O.awaitFinished w
pure $ Done unit pure $ Done unit
go { error, cancel } = do go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error err <- liftEffect $ liftST $ STRef.read error
for_ err throwError for_ err throwError
needsDrain <- liftEffect $ O.needsDrain w
when needsDrain $ liftAff $ O.awaitWritableOrClosed w
ma <- await ma <- await
case ma of case ma of
Nothing -> cleanup cancel Nothing -> cleanup cancel
Just a -> do Just a -> do
res <- liftEffect $ O.write w a res <- liftEffect $ O.write w a
case res of case res of
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop { error, cancel }
O.WriteClosed -> cleanup cancel O.WriteClosed -> cleanup cancel
_ -> pure $ Loop { error, cancel }
in in
do do
r <- liftEffect $ O.withErrorST w r <- liftEffect $ O.withErrorST w
@@ -93,29 +91,43 @@ fromTransform t =
liftEffect $ removeErrorListener liftEffect $ removeErrorListener
fromReadable t fromReadable t
pure $ Done unit pure $ Done unit
yieldFromReadableHalf = do
res <- liftEffect (O.read t) yieldWhileReadable = do
flip tailRecM unit \_ -> do
res <- liftEffect $ O.read t
case res of
O.ReadJust a -> yield (Just a) $> Loop unit
_ -> pure $ Done unit
maybeYield1 = do
res <- liftEffect $ O.read t
case res of case res of
O.ReadJust a -> yield (Just a) O.ReadJust a -> yield $ Just a
O.ReadWouldBlock -> pure unit _ -> pure unit
O.ReadClosed -> yield Nothing *> pure unit
go { error, cancel } = do go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error err <- liftEffect $ liftST $ STRef.read error
for_ err throwError for_ err throwError
ma <- await needsDrain <- liftEffect $ O.needsDrain t
case ma of if needsDrain then do
Nothing -> cleanup cancel liftAff $ delay $ wrap 0.0
Just a' -> do yieldWhileReadable
res <- liftEffect $ O.write t a' pure $ Loop { error, cancel }
yieldFromReadableHalf else do
case res of ma <- await
O.WriteClosed -> cleanup cancel case ma of
O.WriteOk -> pure $ Loop { error, cancel } Nothing -> cleanup cancel
O.WriteWouldBlock -> do Just a' -> do
liftAff $ O.awaitWritableOrClosed t res <- liftEffect $ O.write t a'
pure $ Loop { error, cancel } case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> do
maybeYield1
pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
yieldWhileReadable
pure $ Loop { error, cancel }
in in
do do
r <- liftEffect $ O.withErrorST t r <- liftEffect $ O.withErrorST t

View File

@@ -3,23 +3,35 @@ module Pipes.Util where
import Prelude import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, forever, whileJust) import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM)
import Control.Monad.Rec.Class as Rec
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Data.Array.ST (STArray) import Data.Array.ST (STArray)
import Data.Array.ST as Array.ST import Data.Array.ST as Array.ST
import Data.Either (hush)
import Data.HashSet as HashSet import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash) import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList) import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..), fromMaybe)
import Data.Tuple.Nested (type (/\), (/\)) import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield) import Pipes (await, yield)
import Pipes.Core (Pipe) import Pipes as Pipes
import Pipes.Core (Pipe, Producer)
import Pipes.Internal (Proxy(..)) import Pipes.Internal (Proxy(..))
-- | Re-yield all `Just`s, and close when `Nothing` is encountered
whileJust :: forall m a. MonadRec m => Pipe (Maybe a) a m Unit
whileJust = do
first <- await
flip tailRecM first $ \ma -> fromMaybe (Done unit) <$> runMaybeT do
a <- MaybeT $ pure ma
lift $ yield a
lift $ Loop <$> await
-- | Yields a separator value `sep` between received values -- | Yields a separator value `sep` between received values
-- | -- |
-- | ```purescript -- | ```purescript
@@ -33,7 +45,7 @@ intersperse sep = do
getIsFirst = liftEffect $ liftST $ STRef.read isFirstST getIsFirst = liftEffect $ liftST $ STRef.read isFirstST
markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST
whileJust $ runMaybeT do Rec.whileJust $ runMaybeT do
a <- MaybeT await a <- MaybeT await
isFirst <- getIsFirst isFirst <- getIsFirst
if isFirst then markNotFirst else lift $ yield $ Just sep if isFirst then markNotFirst else lift $ yield $ Just sep
@@ -41,6 +53,16 @@ intersperse sep = do
yield Nothing yield Nothing
-- Pair every emitted value from 2 producers together, exiting when either exits.
zip :: forall a b m. MonadRec m => Producer a m Unit -> Producer b m Unit -> Producer (a /\ b) m Unit
zip as bs =
flip tailRecM (as /\ bs) \(as' /\ bs') ->
fromMaybe (Done unit) <$> runMaybeT do
a /\ as'' <- MaybeT $ lift $ hush <$> Pipes.next as'
b /\ bs'' <- MaybeT $ lift $ hush <$> Pipes.next bs'
lift $ yield $ a /\ b
pure $ Loop $ as'' /\ bs''
-- | Accumulate values in chunks of a given size. -- | Accumulate values in chunks of a given size.
-- | -- |
-- | If the pipe closes without yielding a multiple of `size` elements, -- | If the pipe closes without yielding a multiple of `size` elements,
@@ -60,12 +82,13 @@ chunked size = do
void $ flip STRef.write chunkST =<< Array.ST.new void $ flip STRef.write chunkST =<< Array.ST.new
Array.ST.unsafeFreeze chunkArray Array.ST.unsafeFreeze chunkArray
whileJust $ runMaybeT do Rec.whileJust $ runMaybeT do
a <- MaybeT await a <- MaybeT await
chunkPut a chunkPut a
len <- chunkLength len <- lift chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake len <- chunkLength
when (len > 0) $ yield =<< Just <$> chunkTake
yield Nothing yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it. -- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.

View File

@@ -9,12 +9,14 @@ import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Collect as Test.Pipes.Collect import Test.Pipes.Collect as Test.Pipes.Collect
import Test.Pipes.Construct as Test.Pipes.Construct
import Test.Spec.Reporter (specReporter) import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec') import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit main :: Effect Unit
main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do main = launchAff_ $ runSpec' (defaultConfig { exit = false, timeout = Nothing }) [ specReporter ] do
Test.Pipes.Node.Stream.spec Test.Pipes.Node.Stream.spec
Test.Pipes.Node.Buffer.spec Test.Pipes.Node.Buffer.spec
Test.Pipes.Node.FS.spec Test.Pipes.Node.FS.spec
Test.Pipes.Collect.spec Test.Pipes.Collect.spec
Test.Pipes.Construct.spec

View File

@@ -0,0 +1,58 @@
module Test.Pipes.Construct where
import Prelude
import Data.Array as Array
import Data.List as List
import Data.Map as Map
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (liftEffect)
import Pipes.Collect as Pipes.Collect
import Pipes.Construct as Pipes.Construct
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
spec :: Spec Unit
spec =
describe "Test.Pipes.Construct" do
describe "eachMap" do
it "empty map" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachMap Map.empty
kvs `shouldEqual` ([] :: Array (Int /\ Int))
it "nonempty map" do
let
exp = (\n -> n /\ n) <$> Array.range 0 99999
map = Map.fromFoldable exp
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachMap
$ map
kvs `shouldEqual` exp
describe "eachArray" do
it "empty array" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachArray []
kvs `shouldEqual` ([] :: Array Int)
it "nonempty array" do
let
inp = (\n -> n /\ n) <$> Array.range 0 99999
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachArray
$ inp
kvs `shouldEqual` inp
describe "eachList" do
it "empty list" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachList List.Nil
kvs `shouldEqual` ([] :: Array Int)
it "nonempty list" do
let
inp = (\n -> n /\ n) <$> Array.range 0 99999
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachList
$ List.fromFoldable
$ inp
kvs `shouldEqual` inp

View File

@@ -34,7 +34,7 @@ spec = describe "Pipes.Node.FS" do
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8) s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo" s `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p liftEffect $ FS.writeTextFile UTF8 p "foo"
flip catchError (const $ pure unit) do flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown" fail "should have thrown"
@@ -44,7 +44,7 @@ spec = describe "Pipes.Node.FS" do
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo" contents `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p liftEffect $ FS.writeTextFile UTF8 p "foo"
flip catchError (const $ pure unit) do flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown" fail "should have thrown"