10 Commits

Author SHA1 Message Date
3db5cc44a9 chore: prepare v1.3.1 2024-05-13 13:27:28 -05:00
1a5ca66e83 fix: Pipes.Node.FS.read' 2024-05-13 13:27:18 -05:00
54d9d57927 chore: prepare v1.3.0 2024-05-13 11:21:23 -05:00
a5c535fb1e feat: Pipes.Construct 2024-05-13 11:21:06 -05:00
7e6c6af3dd chore: prepare v1.2.3 2024-05-11 22:11:44 -05:00
faf49fafd5 chore: lock 2024-05-11 22:11:40 -05:00
04815f66a4 chore: prepare v1.2.2 2024-05-11 22:10:57 -05:00
fd895de148 fix: ensure-ranges 2024-05-11 22:09:45 -05:00
b618ef1819 chore: prepare v1.2.1 2024-05-11 22:08:38 -05:00
407491f055 fix: generalize all Affs to MonadAffs 2024-05-11 22:08:27 -05:00
13 changed files with 239 additions and 73 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v1.2.0",
"version": "v1.3.1",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",

View File

@@ -9,8 +9,8 @@ workspace:
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object
- lists
- foreign-object: ">=4.1.0 <5.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
@@ -20,7 +20,7 @@ workspace:
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
@@ -28,8 +28,8 @@ workspace:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples
- unordered-collections
- tuples: ">=7.0.0 <8.0.0"
- unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.2.0'
version: '1.3.1'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -10,17 +10,14 @@ package:
strict: true
pedanticPackages: true
dependencies:
- foreign-object
- lists
- ordered-collections
- tuples
- unordered-collections
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
- lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
@@ -30,6 +27,7 @@ package:
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
@@ -37,9 +35,13 @@ package:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unordered-collections: ">=3.1.0 <4.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main
build:
strict: true
dependencies:
- console
- gen

View File

@@ -152,6 +152,9 @@ awaitReadableOrClosed s = do
when (not ended && not closed && not readable)
$ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = onceAff0 finishH s
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
closed <- liftEffect $ isClosed s
@@ -184,3 +187,6 @@ errorH = EventHandle "error" mkEffectFn1
endH :: forall s a. Write s a => EventHandle0 s
endH = EventHandle "end" identity
finishH :: forall s a. Write s a => EventHandle0 s
finishH = EventHandle "finish" identity

View File

@@ -2,12 +2,9 @@ module Pipes.Collect where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array.ST as Array.ST
import Data.Either (hush)
import Data.HashMap (HashMap)
import Data.HashMap as HashMap
import Data.Hashable (class Hashable)
@@ -15,45 +12,37 @@ import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Pipes (next) as Pipes
import Pipes.Core (Producer)
-- | Fold every value produced
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b
fold f b p =
let
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> Pipes.next p'
pure $ Loop $ f b' a /\ p''
in
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
import Pipes.Internal (Proxy(..))
-- | Fold every value produced with a monadic action
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Unit -> m b
traverse f b p =
let
insertNext b' p' = runMaybeT do
a /\ p'' <- MaybeT $ hush <$> Pipes.next p'
b'' <- lift $ f b' a
pure $ Loop $ b'' /\ p''
in
flip tailRecM (b /\ p) \(b' /\ p') -> fromMaybe (Done b') <$> insertNext b' p'
traverse f b0 p0 =
flip tailRecM (p0 /\ b0) \(p /\ b) ->
case p of
Respond a m -> Loop <$> (m unit /\ _) <$> f b a
M m -> Loop <$> (_ /\ b) <$> m
Request _ _ -> pure $ Done b
Pure _ -> pure $ Done b
-- | Fold every value produced
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
fold :: forall a b m. MonadRec m => (b -> a -> b) -> b -> Producer a m Unit -> m b
fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0
-- | Execute a monadic action on every item in a producer.
-- |
-- | Uses `MonadRec`, supporting producers of arbitrary length.
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f = traverse (const f) unit
foreach f p0 = traverse (\_ a -> f a) unit p0
-- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)

View File

@@ -1 +1,64 @@
module Pipes.Construct where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map.Internal as Map.Internal
import Data.Maybe (fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (yield, (>->))
import Pipes.Core (Producer)
import Pipes.Prelude as Pipe
import Pipes.Util as Pipe.Util
-- Producer that will emit monotonically increasing integers
-- ex `monotonic 0 -> 0 1 2 3 4 5 6 7 ..`
monotonic :: forall m. MonadRec m => Int -> Producer Int m Unit
monotonic start = flip tailRecM start \n -> yield n $> Loop (n + 1)
-- Producer that will emit integers from `start` (inclusive) to `end` (exclusive)
range :: forall m. MonadRec m => Int -> Int -> Producer Int m Unit
range start end = monotonic start >-> Pipe.take end
-- | Stack-safe producer that yields every value in an Array
eachArray :: forall a m. MonadRec m => Array a -> Producer a m Unit
eachArray as = monotonic 0 >-> Pipe.map (Array.index as) >-> Pipe.Util.whileJust
-- | Stack-safe producer that yields every value in a List
eachList :: forall a m. MonadRec m => List a -> Producer a m Unit
eachList init =
flip tailRecM init \as -> fromMaybe (Done unit) <$> runMaybeT do
head <- MaybeT $ pure $ List.head as
tail <- MaybeT $ pure $ List.tail as
lift $ yield head
pure $ Loop tail
-- | Stack-safe producer that yields every value in a Map
eachMap :: forall k v m. MonadEffect m => MonadRec m => Map k v -> Producer (k /\ v) m Unit
eachMap init = do
stack <- liftEffect $ liftST $ Array.ST.new
let
push a = void $ liftEffect $ liftST $ Array.ST.push a stack
pop = liftEffect $ liftST $ Array.ST.pop stack
flip tailRecM init case _ of
Map.Internal.Leaf -> fromMaybe (Done unit) <$> runMaybeT do
a <- MaybeT pop
pure $ Loop a
Map.Internal.Node _ _ k v Map.Internal.Leaf Map.Internal.Leaf -> do
yield $ k /\ v
pure $ Loop Map.Internal.Leaf
Map.Internal.Node _ _ k v Map.Internal.Leaf r -> do
yield $ k /\ v
pure $ Loop r
Map.Internal.Node a b k v l r -> do
push $ Map.Internal.Node a b k v Map.Internal.Leaf r
pure $ Loop l

View File

@@ -2,11 +2,13 @@ module Pipes.Node.FS where
import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe)
import Effect.Aff (Aff)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer)
import Node.FS.Stream (WriteStreamOptions)
import Node.FS.Stream (WriteStreamOptions, ReadStreamOptions)
import Node.FS.Stream as FS.Stream
import Node.Path (FilePath)
import Node.Stream.Object as O
@@ -22,11 +24,13 @@ import Prim.Row (class Union)
-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a`
-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting.
write
:: forall r trash
:: forall r trash m
. Union r trash WriteStreamOptions
=> MonadAff m
=> MonadThrow Error m
=> Record r
-> FilePath
-> Consumer (Maybe Buffer) Aff Unit
-> Consumer (Maybe Buffer) m Unit
write o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w
@@ -34,26 +38,42 @@ write o p = do
-- | Open a file in write mode, failing if the file already exists.
-- |
-- | `write {flags: "wx"}`
create :: FilePath -> Consumer (Maybe Buffer) Aff Unit
create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
create = write { flags: "wx" }
-- | Open a file in write mode, truncating it if the file already exists.
-- |
-- | `write {flags: "w"}`
truncate :: FilePath -> Consumer (Maybe Buffer) Aff Unit
truncate :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
truncate = write { flags: "w" }
-- | Open a file in write mode, appending written contents if the file already exists.
-- |
-- | `write {flags: "a"}`
append :: FilePath -> Consumer (Maybe Buffer) Aff Unit
append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
append = write { flags: "a" }
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
-- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read :: FilePath -> Producer (Maybe Buffer) Aff Unit
read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit
read p = do
r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
-- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read'
:: forall r trash m
. Union r trash ReadStreamOptions
=> MonadAff m
=> MonadThrow Error m
=> Record r
-> FilePath
-> Producer (Maybe Buffer) m Unit
read' opts p = do
r <- liftEffect $ FS.Stream.createReadStream' p opts
fromReadable $ O.fromBufferReadable r

View File

@@ -2,7 +2,7 @@ module Pipes.Node.Stream where
import Prelude
import Control.Monad.Error.Class (throwError)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
@@ -11,9 +11,10 @@ import Data.Maybe (Maybe(..))
import Data.Newtype (wrap)
import Data.Traversable (for_)
import Data.Tuple.Nested ((/\))
import Effect.Aff (Aff, delay)
import Effect.Aff.Class (liftAff)
import Effect.Aff (delay)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Stream.Object as O
import Pipes (await, yield)
import Pipes (for) as P
@@ -25,7 +26,7 @@ import Pipes.Util (InvokeResult(..), invoke)
-- |
-- | This will yield `Nothing` before exiting, signaling
-- | End-of-stream.
fromReadable :: forall s a. O.Read s a => s -> Producer_ (Maybe a) Aff Unit
fromReadable :: forall s a m. MonadThrow Error m => MonadAff m => O.Read s a => s -> Producer_ (Maybe a) m Unit
fromReadable r =
let
cleanup rmErrorListener = do
@@ -40,7 +41,7 @@ fromReadable r =
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadClosed -> yield Nothing *> cleanup cancel
in
do
@@ -51,12 +52,13 @@ fromReadable r =
-- |
-- | When `Nothing` is piped to this, the stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromWritable :: forall s a. O.Write s a => s -> Consumer (Maybe a) Aff Unit
fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit
fromWritable w =
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
liftEffect $ O.end w
liftAff $ O.awaitFinished w
pure $ Done unit
go { error, cancel } = do
@@ -84,7 +86,7 @@ fromWritable w =
-- |
-- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b. O.Transform a b -> Pipe (Maybe a) (Maybe b) Aff Unit
fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit
fromTransform t =
let
cleanup removeErrorListener = do
@@ -113,7 +115,7 @@ fromTransform t =
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
lift (O.awaitWritableOrClosed t)
liftAff $ O.awaitWritableOrClosed t
pure $ Loop { error, cancel }
in
do
@@ -123,13 +125,13 @@ fromTransform t =
-- | Given a `Producer` of values, wrap them in `Just`.
-- |
-- | Before the `Producer` exits, emits `Nothing` as an End-of-stream signal.
withEOS :: forall a. Producer a Aff Unit -> Producer (Maybe a) Aff Unit
withEOS :: forall a m. Monad m => Producer a m Unit -> Producer (Maybe a) m Unit
withEOS a = do
P.for a (yield <<< Just)
yield Nothing
-- | Strip a pipeline of the EOS signal
unEOS :: forall a. Pipe (Maybe a) a Aff Unit
unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = P.mapFoldable identity
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.

View File

@@ -2,10 +2,12 @@ module Pipes.Node.Zlib where
import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe)
import Effect (Effect)
import Effect.Aff (Aff)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer)
import Node.Stream.Object as O
import Node.Zlib as Zlib
@@ -13,28 +15,28 @@ import Node.Zlib.Types (ZlibStream)
import Pipes.Core (Pipe)
import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r. Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.fromBufferTransform raw
gzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip
gunzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gunzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip = fromZlib Zlib.createGunzip
unzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
unzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip = fromZlib Zlib.createUnzip
inflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
inflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate = fromZlib Zlib.createInflate
deflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
deflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate = fromZlib Zlib.createDeflate
brotliCompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress

View File

@@ -3,23 +3,35 @@ module Pipes.Util where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, forever, whileJust)
import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM)
import Control.Monad.Rec.Class as Rec
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Array.ST (STArray)
import Data.Array.ST as Array.ST
import Data.Either (hush)
import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..))
import Data.Maybe (Maybe(..), fromMaybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield)
import Pipes.Core (Pipe)
import Pipes as Pipes
import Pipes.Core (Pipe, Producer)
import Pipes.Internal (Proxy(..))
-- | Re-yield all `Just`s, and close when `Nothing` is encountered
whileJust :: forall m a. MonadRec m => Pipe (Maybe a) a m Unit
whileJust = do
first <- await
flip tailRecM first $ \ma -> fromMaybe (Done unit) <$> runMaybeT do
a <- MaybeT $ pure ma
lift $ yield a
lift $ Loop <$> await
-- | Yields a separator value `sep` between received values
-- |
-- | ```purescript
@@ -33,7 +45,7 @@ intersperse sep = do
getIsFirst = liftEffect $ liftST $ STRef.read isFirstST
markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST
whileJust $ runMaybeT do
Rec.whileJust $ runMaybeT do
a <- MaybeT await
isFirst <- getIsFirst
if isFirst then markNotFirst else lift $ yield $ Just sep
@@ -41,6 +53,16 @@ intersperse sep = do
yield Nothing
-- Pair every emitted value from 2 producers together, exiting when either exits.
zip :: forall a b m. MonadRec m => Producer a m Unit -> Producer b m Unit -> Producer (a /\ b) m Unit
zip as bs =
flip tailRecM (as /\ bs) \(as' /\ bs') ->
fromMaybe (Done unit) <$> runMaybeT do
a /\ as'' <- MaybeT $ lift $ hush <$> Pipes.next as'
b /\ bs'' <- MaybeT $ lift $ hush <$> Pipes.next bs'
lift $ yield $ a /\ b
pure $ Loop $ as'' /\ bs''
-- | Accumulate values in chunks of a given size.
-- |
-- | If the pipe closes without yielding a multiple of `size` elements,
@@ -60,7 +82,7 @@ chunked size = do
void $ flip STRef.write chunkST =<< Array.ST.new
Array.ST.unsafeFreeze chunkArray
whileJust $ runMaybeT do
Rec.whileJust $ runMaybeT do
a <- MaybeT await
chunkPut a
len <- chunkLength

View File

@@ -9,12 +9,14 @@ import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Pipes.Collect as Test.Pipes.Collect
import Test.Pipes.Construct as Test.Pipes.Construct
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit
main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do
main = launchAff_ $ runSpec' (defaultConfig { exit = false, timeout = Nothing }) [ specReporter ] do
Test.Pipes.Node.Stream.spec
Test.Pipes.Node.Buffer.spec
Test.Pipes.Node.FS.spec
Test.Pipes.Collect.spec
Test.Pipes.Construct.spec

View File

@@ -0,0 +1,58 @@
module Test.Pipes.Construct where
import Prelude
import Data.Array as Array
import Data.List as List
import Data.Map as Map
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (liftEffect)
import Pipes.Collect as Pipes.Collect
import Pipes.Construct as Pipes.Construct
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
spec :: Spec Unit
spec =
describe "Test.Pipes.Construct" do
describe "eachMap" do
it "empty map" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachMap Map.empty
kvs `shouldEqual` ([] :: Array (Int /\ Int))
it "nonempty map" do
let
exp = (\n -> n /\ n) <$> Array.range 0 99999
map = Map.fromFoldable exp
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachMap
$ map
kvs `shouldEqual` exp
describe "eachArray" do
it "empty array" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachArray []
kvs `shouldEqual` ([] :: Array Int)
it "nonempty array" do
let
inp = (\n -> n /\ n) <$> Array.range 0 99999
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachArray
$ inp
kvs `shouldEqual` inp
describe "eachList" do
it "empty list" do
kvs <- Pipes.Collect.toArray $ Pipes.Construct.eachList List.Nil
kvs `shouldEqual` ([] :: Array Int)
it "nonempty list" do
let
inp = (\n -> n /\ n) <$> Array.range 0 99999
kvs <-
liftEffect
$ Pipes.Collect.toArray
$ Pipes.Construct.eachList
$ List.fromFoldable
$ inp
kvs `shouldEqual` inp

View File

@@ -34,7 +34,7 @@ spec = describe "Pipes.Node.FS" do
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
liftEffect $ FS.writeTextFile UTF8 p "foo"
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
@@ -44,7 +44,7 @@ spec = describe "Pipes.Node.FS" do
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
liftEffect $ FS.writeTextFile UTF8 p "foo"
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"