22 Commits

Author SHA1 Message Date
1b6e6423b1 chore: prepare v1.4.1 2024-05-14 15:02:39 -05:00
657af14bb6 fix: await finished before exiting 2024-05-14 14:07:20 -05:00
33d42034fc fix: continue to read after readableEnded 2024-05-14 13:51:57 -05:00
c8822aeffe fix: I WAS INCORRECT HHEHE 2024-05-14 13:44:33 -05:00
7076b13df4 fix: catch incorrect stream implementations 2024-05-14 13:42:39 -05:00
3f4bc12d36 chore: prepare v1.4.0 2024-05-14 13:12:56 -05:00
fd53b6520f feat: Collect.toBuffer 2024-05-14 13:12:41 -05:00
0ef7240d61 wip: explore removing delays(10) 2024-05-14 12:44:31 -05:00
de22f44f86 wip: explore removing delays(9) 2024-05-14 12:43:38 -05:00
f9c0e20777 wip: explore removing delays(8) 2024-05-14 11:08:46 -05:00
edc7d40dbc wip: explore removing delays(7) 2024-05-14 11:04:50 -05:00
ba8d90038d wip: explore removing delays(6) 2024-05-14 10:55:14 -05:00
dfdca9f5e9 wip: explore removing delays(5) 2024-05-14 10:39:06 -05:00
67ae171532 wip: explore removing delays(4) 2024-05-13 21:18:27 -05:00
a347c05062 wip: explore removing delays(3) 2024-05-13 21:17:28 -05:00
f9446c97a0 wip: explore removing delays(2) 2024-05-13 21:15:45 -05:00
d3b8d1792d wip: explore removing delays 2024-05-13 21:06:41 -05:00
e05c74f42f fix: minor fixes 2024-05-13 15:04:34 -05:00
e1c2481e70 chore: prepare v1.3.3 2024-05-13 14:42:48 -05:00
820351f800 fix: more yields 2024-05-13 14:42:23 -05:00
9d8b500b8d chore: prepare v1.3.2 2024-05-13 14:35:55 -05:00
b7bead090e fix: transform should read more than just 1 chunk after writing 2024-05-13 14:35:44 -05:00
8 changed files with 118 additions and 67 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v1.3.1",
"version": "v1.4.1",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.3.1'
version: '1.4.1'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'

View File

@@ -3,12 +3,18 @@ import Stream from "stream";
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableImpl = (s) => () => s.readable;
/** @type {(s: Stream.Readable | Stream.Transform) => () => number} */
export const readableLengthImpl = (s) => () => s.readableLength;
/** @type {(s: Stream.Writable | Stream.Readable) => () => boolean} */
export const isClosedImpl = (s) => () => s.closed;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const isWritableImpl = (s) => () => s.writable;
/** @type {(s: Stream.Writable | Stream.Transform) => () => boolean} */
export const needsDrainImpl = (s) => () => s.writableNeedDrain;
/** @type {(s: Stream.Readable | Stream.Transform) => () => boolean} */
export const isReadableEndedImpl = (s) => () => s.readableEnded;
@@ -18,16 +24,12 @@ export const isWritableEndedImpl = (s) => () => s.writableEnded;
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
export const endImpl = (s) => () => s.end();
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
export const writeImpl =
({ ok, wouldBlock, closed }) =>
({ ok, wouldBlock }) =>
(s) =>
(a) =>
() => {
if (s.closed || s.writableEnded) {
return closed;
}
if (s.write(a)) {
return ok;
} else {
@@ -35,15 +37,11 @@ export const writeImpl =
}
};
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
export const readImpl =
({ just, closed, wouldBlock }) =>
({ just, wouldBlock }) =>
(s) =>
() => {
if (s.closed || s.readableEnded) {
return closed;
}
const a = s.read();
if (a === null) {
return wouldBlock;

View File

@@ -26,7 +26,6 @@ import Unsafe.Coerce (unsafeCoerce)
data ReadResult a
= ReadWouldBlock
| ReadClosed
| ReadJust a
derive instance Generic (ReadResult a) _
@@ -37,7 +36,6 @@ instance Show (ReadResult a) where
data WriteResult
= WriteWouldBlock
| WriteClosed
| WriteOk
derive instance Generic WriteResult _
@@ -45,8 +43,8 @@ derive instance Eq WriteResult
instance Show WriteResult where
show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }
type ReadResultFFI a = { wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { wouldBlock :: WriteResult, ok :: WriteResult }
foreign import data Writable :: Type -> Type
foreign import data Readable :: Type -> Type
@@ -60,12 +58,14 @@ foreign import isWritableImpl :: forall s. s -> Effect Boolean
foreign import isReadableEndedImpl :: forall s. s -> Effect Boolean
foreign import isWritableEndedImpl :: forall s. s -> Effect Boolean
foreign import isClosedImpl :: forall s. s -> Effect Boolean
foreign import needsDrainImpl :: forall s. s -> Effect Boolean
foreign import readableLengthImpl :: forall s. s -> Effect Int
readResultFFI :: forall a. ReadResultFFI a
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust }
readResultFFI = { wouldBlock: ReadWouldBlock, just: ReadJust }
writeResultFFI :: WriteResultFFI
writeResultFFI = { closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk }
writeResultFFI = { wouldBlock: WriteWouldBlock, ok: WriteOk }
class Stream :: Type -> Constraint
class Stream s where
@@ -81,25 +81,30 @@ else instance Stream s => Stream s where
isClosed s = isClosed s
class Stream s <= Read s a | s -> a where
readableLength :: s -> Effect Int
isReadable :: s -> Effect Boolean
isReadableEnded :: s -> Effect Boolean
read :: s -> Effect (ReadResult a)
class Stream s <= Write s a | s -> a where
isWritable :: s -> Effect Boolean
needsDrain :: s -> Effect Boolean
isWritableEnded :: s -> Effect Boolean
write :: s -> a -> Effect WriteResult
end :: s -> Effect Unit
instance Read (Readable a) a where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI
else instance Read (Transform a b) b where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read = readImpl readResultFFI
else instance (Read s a) => Read s a where
readableLength = readableLengthImpl
isReadable = isReadableImpl
isReadableEnded = isReadableEndedImpl
read s = read s
@@ -109,16 +114,19 @@ instance Write (Writable a) a where
isWritableEnded = isWritableEndedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance Write (Transform a b) a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
write s = writeImpl writeResultFFI s
end = endImpl
needsDrain = needsDrainImpl
else instance (Write s a) => Write s a where
isWritable = isWritableImpl
isWritableEnded = isWritableEndedImpl
write s a = write s a
end s = end s
needsDrain = needsDrainImpl
withErrorST :: forall s. Stream s => s -> Effect { cancel :: Effect Unit, error :: STRef Global (Maybe Error) }
withErrorST s = do
@@ -146,21 +154,24 @@ fromStringWritable = unsafeCoerce
awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit
awaitReadableOrClosed s = do
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s
when (not ended && not closed && not readable)
$ liftEither =<< parOneOf [ onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
length <- liftEffect $ readableLength s
when (readable && length == 0)
$ liftEither
=<< parOneOf
[ onceAff0 readableH s $> Right unit
, onceAff0 closeH s $> Right unit
, Left <$> onceAff1 errorH s
]
awaitFinished :: forall s a. Write s a => s -> Aff Unit
awaitFinished s = onceAff0 finishH s
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do
closed <- liftEffect $ isClosed s
ended <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s
when (not ended && not closed && not writable)
needsDrain <- liftEffect $ needsDrain s
when (writable && needsDrain)
$ liftEither =<< parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ]
onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit

View File

@@ -1 +0,0 @@
module Pipes.CSV.Parse where

View File

@@ -12,11 +12,14 @@ import Data.List (List)
import Data.List as List
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), maybe)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Foreign.Object (Object)
import Foreign.Object.ST as Object.ST
import Foreign.Object.ST.Unsafe as Object.ST.Unsafe
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Pipes.Core (Producer)
import Pipes.Internal (Proxy(..))
@@ -27,8 +30,12 @@ traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Uni
traverse f b0 p0 =
flip tailRecM (p0 /\ b0) \(p /\ b) ->
case p of
Respond a m -> Loop <$> (m unit /\ _) <$> f b a
M m -> Loop <$> (_ /\ b) <$> m
Respond a m -> do
b' <- f b a
pure $ Loop $ m unit /\ b'
M m -> do
n <- m
pure $ Loop $ (n /\ b)
Request _ _ -> pure $ Done b
Pure _ -> pure $ Done b
@@ -44,6 +51,21 @@ fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0
foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit
foreach f p0 = traverse (\_ a -> f a) unit p0
-- | Concatenate all produced buffers
toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer
toBuffer p =
(liftEffect <<< maybe (Buffer.alloc 0) pure)
=<< traverse
( flip \b ->
case _ of
Just acc -> do
new <- liftEffect $ Buffer.concat [ acc, b ]
pure $ Just new
_ -> pure $ Just b
)
Nothing
p
-- | Collect all values from a `Producer` into an array.
toArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
toArray p = do

View File

@@ -1,6 +1,6 @@
module Pipes.Node.Stream where
import Prelude
import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
@@ -34,15 +34,19 @@ fromReadable r =
pure $ Done unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadClosed -> yield Nothing *> cleanup cancel
O.ReadWouldBlock -> do
ended <- liftEffect $ O.isReadableEnded r
if ended then do
yield Nothing
cleanup cancel
else
liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
in
do
e <- liftEffect $ O.withErrorST r
@@ -62,21 +66,21 @@ fromWritable w =
pure $ Done unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
ma <- await
case ma of
Nothing -> cleanup cancel
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop { error, cancel }
O.WriteClosed -> cleanup cancel
needsDrain <- liftEffect $ O.needsDrain w
when needsDrain $ liftAff $ O.awaitWritableOrClosed w
ended <- liftEffect $ O.isWritableEnded w
if ended then
cleanup cancel
else
await >>= case _ of
Nothing -> cleanup cancel
Just a -> do
void $ liftEffect $ O.write w a
pure $ Loop { error, cancel }
in
do
r <- liftEffect $ O.withErrorST w
@@ -91,32 +95,48 @@ fromTransform t =
let
cleanup removeErrorListener = do
liftEffect $ O.end t
liftEffect $ removeErrorListener
liftAff $ O.awaitFinished t
fromReadable t
liftEffect $ removeErrorListener
pure $ Done unit
yieldFromReadableHalf = do
res <- liftEffect (O.read t)
yieldWhileReadable = do
flip tailRecM unit \_ -> do
res <- liftEffect $ O.read t
case res of
O.ReadJust a -> yield (Just a) $> Loop unit
_ -> pure $ Done unit
maybeYield1 = do
res <- liftEffect $ O.read t
case res of
O.ReadJust a -> yield (Just a)
O.ReadWouldBlock -> pure unit
O.ReadClosed -> yield Nothing *> pure unit
O.ReadJust a -> yield $ Just a
_ -> pure unit
go { error, cancel } = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
ma <- await
case ma of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
yieldFromReadableHalf
case res of
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
liftAff $ O.awaitWritableOrClosed t
pure $ Loop { error, cancel }
needsDrain <- liftEffect $ O.needsDrain t
ended <- liftEffect $ O.isWritableEnded t
if needsDrain then do
liftAff $ delay $ wrap 0.0
yieldWhileReadable
pure $ Loop { error, cancel }
else if ended then
cleanup cancel
else
await >>= case _ of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
case res of
O.WriteOk -> do
maybeYield1
pure $ Loop { error, cancel }
O.WriteWouldBlock -> do
yieldWhileReadable
pure $ Loop { error, cancel }
in
do
r <- liftEffect $ O.withErrorST t

View File

@@ -85,9 +85,10 @@ chunked size = do
Rec.whileJust $ runMaybeT do
a <- MaybeT await
chunkPut a
len <- chunkLength
len <- lift chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake
len <- chunkLength
when (len > 0) $ yield =<< Just <$> chunkTake
yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.