2 Commits

Author SHA1 Message Date
30fbce3a2d chore: prepare v1.0.3 2024-05-10 18:03:48 -05:00
3c8e497fa2 fix: explicit tailRecM 2024-05-10 18:03:34 -05:00
4 changed files with 22 additions and 37 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.0.2",
"version": "v1.0.3",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.0.2'
version: '1.0.3'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'

View File

@@ -1,24 +0,0 @@
module Control.Monad.Cleanup where
import Prelude
import Control.Monad.Error.Class (class MonadError, liftEither, try)
import Control.Monad.State (StateT, modify_, runStateT)
import Data.Tuple.Nested ((/\))
type CleanupT m = StateT (m Unit) m
finally :: forall m. Monad m => (m Unit) -> CleanupT m Unit
finally m = modify_ (_ *> m)
runCleanup :: forall m a. Monad m => CleanupT m a -> m a
runCleanup m = do
a /\ final <- runStateT m (pure unit)
final
pure a
runCleanupE :: forall e m a. MonadError e m => CleanupT m a -> m a
runCleanupE m = do
ea /\ final <- runStateT (try m) (pure unit)
final
liftEither ea

View File

@@ -3,7 +3,7 @@ module Pipes.Node.Stream where
import Prelude
import Control.Monad.Error.Class (throwError)
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.Rec.Class (Step(..), tailRecM, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
@@ -28,6 +28,8 @@ fromReadable r =
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
pure $ Done unit
go {error, cancel} = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
@@ -35,12 +37,12 @@ fromReadable r =
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) *> go {error, cancel}
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) *> go {error, cancel}
O.ReadJust a -> yield (Just a) $> Loop {error, cancel}
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop {error, cancel}
O.ReadClosed -> yield Nothing *> cleanup cancel
in do
e <- liftEffect $ O.withErrorST r
go e
tailRecM go e
-- | Convert a `Writable` stream to a `Pipe`.
-- |
@@ -52,6 +54,8 @@ fromWritable w =
cleanup rmErrorListener = do
liftEffect rmErrorListener
liftEffect $ O.end w
pure $ Done unit
go {error, cancel} = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
@@ -63,12 +67,14 @@ fromWritable w =
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteOk -> go {error, cancel}
O.WriteWouldBlock -> liftAff (O.awaitWritableOrClosed w) *> go {error, cancel}
O.WriteClosed -> pure unit
O.WriteOk -> pure $ Loop {error, cancel}
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop {error, cancel}
O.WriteClosed -> cleanup cancel
in do
r <- liftEffect $ O.withErrorST w
go r
tailRecM go r
-- | Convert a `Transform` stream to a `Pipe`.
-- |
@@ -81,6 +87,7 @@ fromTransform t =
liftEffect $ O.end t
liftEffect $ removeErrorListener
fromReadable t
pure $ Done unit
yieldFromReadableHalf = do
res <- liftEffect (O.read t)
case res of
@@ -99,12 +106,14 @@ fromTransform t =
res <- liftEffect $ O.write t a'
yieldFromReadableHalf
case res of
O.WriteOk -> go {error, cancel}
O.WriteWouldBlock -> lift (O.awaitWritableOrClosed t) *> go {error, cancel}
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop {error, cancel}
O.WriteWouldBlock -> do
lift (O.awaitWritableOrClosed t)
pure $ Loop {error, cancel}
in do
r <- liftEffect $ O.withErrorST t
go r
tailRecM go r
-- | Given a `Producer` of values, wrap them in `Just`.
-- |