Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
1b6e6423b1
|
|||
|
657af14bb6
|
|||
|
33d42034fc
|
|||
|
c8822aeffe
|
|||
|
7076b13df4
|
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "purescript-node-stream-pipes",
|
||||
"version": "v1.4.0",
|
||||
"version": "v1.4.1",
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"csv-parse": "^5.5.5",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package:
|
||||
name: node-stream-pipes
|
||||
publish:
|
||||
version: '1.4.0'
|
||||
version: '1.4.1'
|
||||
license: 'GPL-3.0-or-later'
|
||||
location:
|
||||
githubOwner: 'cakekindel'
|
||||
|
||||
@@ -24,16 +24,12 @@ export const isWritableEndedImpl = (s) => () => s.writableEnded;
|
||||
/** @type {(s: Stream.Writable | Stream.Transform) => () => void} */
|
||||
export const endImpl = (s) => () => s.end();
|
||||
|
||||
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult, closed: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
|
||||
/** @type {<WriteResult>(o: {ok: WriteResult, wouldBlock: WriteResult}) => (s: Stream.Writable | Stream.Transform) => (a: unknown) => () => WriteResult} */
|
||||
export const writeImpl =
|
||||
({ ok, wouldBlock, closed }) =>
|
||||
({ ok, wouldBlock }) =>
|
||||
(s) =>
|
||||
(a) =>
|
||||
() => {
|
||||
if (s.closed || s.writableEnded) {
|
||||
return closed;
|
||||
}
|
||||
|
||||
if (s.write(a)) {
|
||||
return ok;
|
||||
} else {
|
||||
@@ -41,15 +37,11 @@ export const writeImpl =
|
||||
}
|
||||
};
|
||||
|
||||
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult, closed: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
|
||||
/** @type {<ReadResult>(o: {just: (_a: unknown) => ReadResult, wouldBlock: ReadResult}) => (s: Stream.Readable | Stream.Transform) => () => ReadResult} */
|
||||
export const readImpl =
|
||||
({ just, closed, wouldBlock }) =>
|
||||
({ just, wouldBlock }) =>
|
||||
(s) =>
|
||||
() => {
|
||||
if (s.closed || s.readableEnded) {
|
||||
return closed;
|
||||
}
|
||||
|
||||
const a = s.read();
|
||||
if (a === null) {
|
||||
return wouldBlock;
|
||||
|
||||
@@ -26,7 +26,6 @@ import Unsafe.Coerce (unsafeCoerce)
|
||||
|
||||
data ReadResult a
|
||||
= ReadWouldBlock
|
||||
| ReadClosed
|
||||
| ReadJust a
|
||||
|
||||
derive instance Generic (ReadResult a) _
|
||||
@@ -37,7 +36,6 @@ instance Show (ReadResult a) where
|
||||
|
||||
data WriteResult
|
||||
= WriteWouldBlock
|
||||
| WriteClosed
|
||||
| WriteOk
|
||||
|
||||
derive instance Generic WriteResult _
|
||||
@@ -45,8 +43,8 @@ derive instance Eq WriteResult
|
||||
instance Show WriteResult where
|
||||
show = genericShow
|
||||
|
||||
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
|
||||
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }
|
||||
type ReadResultFFI a = { wouldBlock :: ReadResult a, just :: a -> ReadResult a }
|
||||
type WriteResultFFI = { wouldBlock :: WriteResult, ok :: WriteResult }
|
||||
|
||||
foreign import data Writable :: Type -> Type
|
||||
foreign import data Readable :: Type -> Type
|
||||
@@ -64,10 +62,10 @@ foreign import needsDrainImpl :: forall s. s -> Effect Boolean
|
||||
foreign import readableLengthImpl :: forall s. s -> Effect Int
|
||||
|
||||
readResultFFI :: forall a. ReadResultFFI a
|
||||
readResultFFI = { closed: ReadClosed, wouldBlock: ReadWouldBlock, just: ReadJust }
|
||||
readResultFFI = { wouldBlock: ReadWouldBlock, just: ReadJust }
|
||||
|
||||
writeResultFFI :: WriteResultFFI
|
||||
writeResultFFI = { closed: WriteClosed, wouldBlock: WriteWouldBlock, ok: WriteOk }
|
||||
writeResultFFI = { wouldBlock: WriteWouldBlock, ok: WriteOk }
|
||||
|
||||
class Stream :: Type -> Constraint
|
||||
class Stream s where
|
||||
|
||||
@@ -40,8 +40,13 @@ fromReadable r =
|
||||
res <- liftEffect $ O.read r
|
||||
case res of
|
||||
O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
|
||||
O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
|
||||
O.ReadClosed -> yield Nothing *> cleanup cancel
|
||||
O.ReadWouldBlock -> do
|
||||
ended <- liftEffect $ O.isReadableEnded r
|
||||
if ended then do
|
||||
yield Nothing
|
||||
cleanup cancel
|
||||
else
|
||||
liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
|
||||
in
|
||||
do
|
||||
e <- liftEffect $ O.withErrorST r
|
||||
@@ -66,14 +71,16 @@ fromWritable w =
|
||||
|
||||
needsDrain <- liftEffect $ O.needsDrain w
|
||||
when needsDrain $ liftAff $ O.awaitWritableOrClosed w
|
||||
ma <- await
|
||||
case ma of
|
||||
Nothing -> cleanup cancel
|
||||
Just a -> do
|
||||
res <- liftEffect $ O.write w a
|
||||
case res of
|
||||
O.WriteClosed -> cleanup cancel
|
||||
_ -> pure $ Loop { error, cancel }
|
||||
|
||||
ended <- liftEffect $ O.isWritableEnded w
|
||||
if ended then
|
||||
cleanup cancel
|
||||
else
|
||||
await >>= case _ of
|
||||
Nothing -> cleanup cancel
|
||||
Just a -> do
|
||||
void $ liftEffect $ O.write w a
|
||||
pure $ Loop { error, cancel }
|
||||
in
|
||||
do
|
||||
r <- liftEffect $ O.withErrorST w
|
||||
@@ -88,8 +95,9 @@ fromTransform t =
|
||||
let
|
||||
cleanup removeErrorListener = do
|
||||
liftEffect $ O.end t
|
||||
liftEffect $ removeErrorListener
|
||||
liftAff $ O.awaitFinished t
|
||||
fromReadable t
|
||||
liftEffect $ removeErrorListener
|
||||
pure $ Done unit
|
||||
|
||||
yieldWhileReadable = do
|
||||
@@ -110,18 +118,19 @@ fromTransform t =
|
||||
for_ err throwError
|
||||
|
||||
needsDrain <- liftEffect $ O.needsDrain t
|
||||
ended <- liftEffect $ O.isWritableEnded t
|
||||
if needsDrain then do
|
||||
liftAff $ delay $ wrap 0.0
|
||||
yieldWhileReadable
|
||||
pure $ Loop { error, cancel }
|
||||
else do
|
||||
ma <- await
|
||||
case ma of
|
||||
else if ended then
|
||||
cleanup cancel
|
||||
else
|
||||
await >>= case _ of
|
||||
Nothing -> cleanup cancel
|
||||
Just a' -> do
|
||||
res <- liftEffect $ O.write t a'
|
||||
case res of
|
||||
O.WriteClosed -> cleanup cancel
|
||||
O.WriteOk -> do
|
||||
maybeYield1
|
||||
pure $ Loop { error, cancel }
|
||||
|
||||
Reference in New Issue
Block a user