6 Commits

Author SHA1 Message Date
821a47229c chore: prepare v1.0.4 2024-05-10 18:16:01 -05:00
f373334f77 feat: Pipes.Collect 2024-05-10 18:15:58 -05:00
30fbce3a2d chore: prepare v1.0.3 2024-05-10 18:03:48 -05:00
3c8e497fa2 fix: explicit tailRecM 2024-05-10 18:03:34 -05:00
7a18a7182c chore: prepare v1.0.2 2024-05-10 15:05:20 -05:00
93ef037344 fix: lockfile 2024-05-10 15:05:14 -05:00
6 changed files with 51 additions and 49 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.0.1",
"version": "v1.0.4",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -4,28 +4,28 @@ workspace:
path: ./
dependencies:
- aff: ">=7.1.0 <8.0.0"
- arrays
- arrays: ">=7.3.0 <8.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable
- foldable-traversable: ">=6.0.0 <7.0.0"
- maybe: ">=6.0.0 <7.0.0"
- mmorph
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs
- node-path
- node-fs: ">=9.1.0 <10.0.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib
- parallel
- node-zlib: ">=0.4.0 <0.5.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
- st
- strings
- tailrec
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples
- tuples: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.0.1'
version: '1.0.4'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -32,7 +32,6 @@ package:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main

View File

@@ -1,24 +0,0 @@
module Control.Monad.Cleanup where
import Prelude
import Control.Monad.Error.Class (class MonadError, liftEither, try)
import Control.Monad.State (StateT, modify_, runStateT)
import Data.Tuple.Nested ((/\))
type CleanupT m = StateT (m Unit) m
finally :: forall m. Monad m => (m Unit) -> CleanupT m Unit
finally m = modify_ (_ *> m)
runCleanup :: forall m a. Monad m => CleanupT m a -> m a
runCleanup m = do
a /\ final <- runStateT m (pure unit)
final
pure a
runCleanupE :: forall e m a. MonadError e m => CleanupT m a -> m a
runCleanupE m = do
ea /\ final <- runStateT (try m) (pure unit)
final
liftEither ea

18
src/Pipes.Collect.purs Normal file
View File

@@ -0,0 +1,18 @@
module Pipes.Collect where
import Prelude
import Control.Monad.Rec.Class (class MonadRec)
import Control.Monad.ST.Class (liftST)
import Data.Array.ST as Array.ST
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (for) as Pipes
import Pipes.Core (Producer)
import Pipes.Core (runEffect) as Pipes
-- | Traverse a pipe, collecting into a mutable array with constant stack usage
collectArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
collectArray p = do
st <- liftEffect $ liftST $ Array.ST.new
Pipes.runEffect $ Pipes.for p \a -> void $ liftEffect $ liftST $ Array.ST.push a st
liftEffect $ liftST $ Array.ST.unsafeFreeze st

View File

@@ -3,7 +3,7 @@ module Pipes.Node.Stream where
import Prelude
import Control.Monad.Error.Class (throwError)
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.Rec.Class (Step(..), tailRecM, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
@@ -28,6 +28,8 @@ fromReadable r =
let
cleanup rmErrorListener = do
liftEffect rmErrorListener
pure $ Done unit
go {error, cancel} = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
@@ -35,12 +37,12 @@ fromReadable r =
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) *> go {error, cancel}
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) *> go {error, cancel}
O.ReadJust a -> yield (Just a) $> Loop {error, cancel}
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop {error, cancel}
O.ReadClosed -> yield Nothing *> cleanup cancel
in do
e <- liftEffect $ O.withErrorST r
go e
tailRecM go e
-- | Convert a `Writable` stream to a `Pipe`.
-- |
@@ -52,6 +54,8 @@ fromWritable w =
cleanup rmErrorListener = do
liftEffect rmErrorListener
liftEffect $ O.end w
pure $ Done unit
go {error, cancel} = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
@@ -63,12 +67,14 @@ fromWritable w =
Just a -> do
res <- liftEffect $ O.write w a
case res of
O.WriteOk -> go {error, cancel}
O.WriteWouldBlock -> liftAff (O.awaitWritableOrClosed w) *> go {error, cancel}
O.WriteClosed -> pure unit
O.WriteOk -> pure $ Loop {error, cancel}
O.WriteWouldBlock -> do
liftAff (O.awaitWritableOrClosed w)
pure $ Loop {error, cancel}
O.WriteClosed -> cleanup cancel
in do
r <- liftEffect $ O.withErrorST w
go r
tailRecM go r
-- | Convert a `Transform` stream to a `Pipe`.
-- |
@@ -81,6 +87,7 @@ fromTransform t =
liftEffect $ O.end t
liftEffect $ removeErrorListener
fromReadable t
pure $ Done unit
yieldFromReadableHalf = do
res <- liftEffect (O.read t)
case res of
@@ -99,12 +106,14 @@ fromTransform t =
res <- liftEffect $ O.write t a'
yieldFromReadableHalf
case res of
O.WriteOk -> go {error, cancel}
O.WriteWouldBlock -> lift (O.awaitWritableOrClosed t) *> go {error, cancel}
O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop {error, cancel}
O.WriteWouldBlock -> do
lift (O.awaitWritableOrClosed t)
pure $ Loop {error, cancel}
in do
r <- liftEffect $ O.withErrorST t
go r
tailRecM go r
-- | Given a `Producer` of values, wrap them in `Just`.
-- |