15 Commits

8 changed files with 91 additions and 39 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v1.3.2",
"version": "v1.4.0",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.3.2'
version: '1.4.0'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'

View File

@@ -3,12 +3,18 @@ import Stream from "stream";
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableImpl = (s) => () => s.readable;
/** @type {(s: Stream.Readable | Stream.Transform) => () => number} */
export const readableLengthImpl = (s) => () => s.readableLength;
/** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */
export const isClosedImpl = (s) => () => s.closed;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableImpl = (s) => () => s.writable;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const needsDrainImpl = (s) => () => s.writableNeedDrain;
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableEndedImpl = (s) => () => s.readableEnded;

View File

@@ -60,6 +60,8 @@ foreign import isWritableImpl :: forall s. s -> Effect Boolean
foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean
foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int
readResultFFI :: forall a. ReadResultFFI a
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust }
@@ -81,25 +83,30 @@ else instance Stream s => Stream s where
isClosed s = isClosed s
class Stream s <= Read s a | s -> a where
readableLength :: s -> Effect Int
isReadable :: s -> Effect Boolean
isReadableEnded :: s -> Effect Boolean
read :: s -> Effect (ReadResult a)
class Stream s <= Write s a | s -> a where
isWritable :: s -> Effect Boolean
needsDrain :: s -> Effect Boolean
isWritableEnded :: s -> Effect Boolean
write :: s -> a -> Effect WriteResult
end :: s -> Effect Unit
instance Read (Readable a) a where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI
else instance Read (Transform a b) b where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI
else instance (Read s a) => Read s a where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read s = read s
@@ -109,16 +116,19 @@ instance Write (Writable a) a where
isWritableEnded = isWritableEndedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance Write (Transform a b) a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance (Write s a) => Write s a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
write s a = write s a
end s = end s
needsDrain = needsDrainImpl
withErrorST :: forall s. Stream s => s -> Effect { cancel :: Effect Unit, error :: STRef Global (Maybe Error) }
withErrorST s = do
@@ -146,21 +156,24 @@ fromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s
when (not ended && not closed && not readable)
$ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
length <- liftEffect $ readableLength s
when (readable && length == 0)
$ liftEither
=<< parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = onceAff0 finishH s
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s
when (not ended && not closed && not writable)
needsDrain <- liftEffect $ needsDrain s
when (writable && needsDrain)
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit

View File

@@ -1 +0,0 @@
module Pipes.CSV.Parse where

View File

@@ -12,11 +12,14 @@ import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), maybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Pipes.Core (Producer)
import Pipes.Internal (Proxy(..))
@@ -27,8 +30,12 @@ traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Uni
traverse f b0 p0 =
flip tailRecM (p0 /\ b0) \(p /\ b) ->
case p of
Respond a m -> Loop <$> (m unit /\ _) <$> f b a
M m -> Loop <$> (_ /\ b) <$> m
Respond a m -> do
b' <- f b a
pure $ Loop $ m unit /\ b'
M m -> do
n <- m
pure $ Loop $ (n /\ b)
Request _ _ -> pure $ Done b
Pure _ -> pure $ Done b
@@ -44,6 +51,21 @@ fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f p0 = traverse (\_ a -> f a) unit p0
-- | Concatenate all produced buffers
toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer
toBuffer p =
(liftEffect <<< maybe (Buffer.alloc 0) pure)
=<< traverse
( flip \b ->
case _ of
Just acc -> do
new <- liftEffect $ Buffer.concat [ acc, b ]
pure $ Just new
_ -> pure $ Just b
)
Nothing
p
-- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
toArray p = do

View File

@@ -1,6 +1,6 @@
module Pipes.Node.Stream where
import Prelude
import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
@@ -34,7 +34,6 @@ fromReadable r =
pure $ Done unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
@@ -62,21 +61,19 @@ fromWritable w =
pure $ Done unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
needsDrain <- liftEffect $ O.needsDrain w
when needsDrain $ liftAff $ O.awaitWritableOrClosed w
ma <- await
case ma of
Nothing -> cleanup cancel
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop { error, cancel }
O.WriteClosed -> cleanup cancel
_ -> pure $ Loop { error, cancel }
in
do
r <- liftEffect $ O.withErrorST w
@@ -94,29 +91,43 @@ fromTransform t =
liftEffect $ removeErrorListener
fromReadable t
pure $ Done unit
yieldFromReadableHalf = do
res <- liftEffect (O.read t)
yieldWhileReadable = do
flip tailRecM unit \_ -> do
res <- liftEffect $ O.read t
case res of
O.ReadJust a -> yield (Just a) $> Loop unit
_ -> pure $ Done unit
maybeYield1 = do
res <- liftEffect $ O.read t
case res of
O.ReadJust a -> yield (Just a) *> yieldFromReadableHalf
O.ReadWouldBlock -> pure unit
O.ReadClosed -> yield Nothing *> pure unit
O.ReadJust a -> yield $ Just a
_ -> pure unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
ma <- await
case ma of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
yieldFromReadableHalf
case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff $ O.awaitWritableOrClosed t
pure $ Loop { error, cancel }
needsDrain <- liftEffect $ O.needsDrain t
if needsDrain then do
liftAff $ delay $ wrap 0.0
yieldWhileReadable
pure $ Loop { error, cancel }
else do
ma <- await
case ma of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> do
maybeYield1
pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
yieldWhileReadable
pure $ Loop { error, cancel }
in
do
r <- liftEffect $ O.withErrorST t

View File

@@ -85,9 +85,10 @@ chunked size = do
Rec.whileJust $ runMaybeT do
a <- MaybeT await
chunkPut a
len <- chunkLength
len <- lift chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake
len <- chunkLength
when (len > 0) $ yield =<< Just <$> chunkTake
yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.