Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
f2f18c3c13
|
|||
|
76958b63ef
|
|||
|
821a47229c
|
|||
|
f373334f77
|
|||
|
30fbce3a2d
|
|||
|
3c8e497fa2
|
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "purescript-csv-stream",
|
||||
"version": "v1.0.2",
|
||||
"version": "v1.0.5",
|
||||
"dependencies": {
|
||||
"csv-parse": "^5.5.5",
|
||||
"csv-stringify": "^6.4.6"
|
||||
|
||||
@@ -25,7 +25,6 @@ workspace:
|
||||
- strings: ">=6.0.1 <7.0.0"
|
||||
- tailrec: ">=6.1.0 <7.0.0"
|
||||
- transformers: ">=6.0.0 <7.0.0"
|
||||
- tuples: ">=7.0.0 <8.0.0"
|
||||
- unsafe-coerce: ">=6.0.0 <7.0.0"
|
||||
test_dependencies:
|
||||
- console
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package:
|
||||
name: node-stream-pipes
|
||||
publish:
|
||||
version: '1.0.2'
|
||||
version: '1.0.5'
|
||||
license: 'GPL-3.0-or-later'
|
||||
location:
|
||||
githubOwner: 'cakekindel'
|
||||
@@ -32,7 +32,6 @@ package:
|
||||
- strings: ">=6.0.1 <7.0.0"
|
||||
- tailrec: ">=6.1.0 <7.0.0"
|
||||
- transformers: ">=6.0.0 <7.0.0"
|
||||
- tuples: ">=7.0.0 <8.0.0"
|
||||
- unsafe-coerce: ">=6.0.0 <7.0.0"
|
||||
test:
|
||||
main: Test.Main
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
module Control.Monad.Cleanup where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Error.Class (class MonadError, liftEither, try)
|
||||
import Control.Monad.State (StateT, modify_, runStateT)
|
||||
import Data.Tuple.Nested ((/\))
|
||||
|
||||
type CleanupT m = StateT (m Unit) m
|
||||
|
||||
finally :: forall m. Monad m => (m Unit) -> CleanupT m Unit
|
||||
finally m = modify_ (_ *> m)
|
||||
|
||||
runCleanup :: forall m a. Monad m => CleanupT m a -> m a
|
||||
runCleanup m = do
|
||||
a /\ final <- runStateT m (pure unit)
|
||||
final
|
||||
pure a
|
||||
|
||||
runCleanupE :: forall e m a. MonadError e m => CleanupT m a -> m a
|
||||
runCleanupE m = do
|
||||
ea /\ final <- runStateT (try m) (pure unit)
|
||||
final
|
||||
liftEither ea
|
||||
18
src/Pipes.Collect.purs
Normal file
18
src/Pipes.Collect.purs
Normal file
@@ -0,0 +1,18 @@
|
||||
module Pipes.Collect where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Rec.Class (class MonadRec)
|
||||
import Control.Monad.ST.Class (liftST)
|
||||
import Data.Array.ST as Array.ST
|
||||
import Effect.Class (class MonadEffect, liftEffect)
|
||||
import Pipes (for) as Pipes
|
||||
import Pipes.Core (Producer)
|
||||
import Pipes.Core (runEffect) as Pipes
|
||||
|
||||
-- | Traverse a pipe, collecting into a mutable array with constant stack usage
|
||||
collectArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
|
||||
collectArray p = do
|
||||
st <- liftEffect $ liftST $ Array.ST.new
|
||||
Pipes.runEffect $ Pipes.for p \a -> void $ liftEffect $ liftST $ Array.ST.push a st
|
||||
liftEffect $ liftST $ Array.ST.unsafeFreeze st
|
||||
@@ -3,7 +3,7 @@ module Pipes.Node.Stream where
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Error.Class (throwError)
|
||||
import Control.Monad.Rec.Class (whileJust)
|
||||
import Control.Monad.Rec.Class (Step(..), tailRecM, whileJust)
|
||||
import Control.Monad.ST.Class (liftST)
|
||||
import Control.Monad.ST.Ref as STRef
|
||||
import Control.Monad.Trans.Class (lift)
|
||||
@@ -28,6 +28,8 @@ fromReadable r =
|
||||
let
|
||||
cleanup rmErrorListener = do
|
||||
liftEffect rmErrorListener
|
||||
pure $ Done unit
|
||||
|
||||
go {error, cancel} = do
|
||||
liftAff $ delay $ wrap 0.0
|
||||
err <- liftEffect $ liftST $ STRef.read error
|
||||
@@ -35,12 +37,12 @@ fromReadable r =
|
||||
|
||||
res <- liftEffect $ O.read r
|
||||
case res of
|
||||
O.ReadJust a -> yield (Just a) *> go {error, cancel}
|
||||
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) *> go {error, cancel}
|
||||
O.ReadJust a -> yield (Just a) $> Loop {error, cancel}
|
||||
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop {error, cancel}
|
||||
O.ReadClosed -> yield Nothing *> cleanup cancel
|
||||
in do
|
||||
e <- liftEffect $ O.withErrorST r
|
||||
go e
|
||||
tailRecM go e
|
||||
|
||||
-- | Convert a `Writable` stream to a `Pipe`.
|
||||
-- |
|
||||
@@ -52,6 +54,8 @@ fromWritable w =
|
||||
cleanup rmErrorListener = do
|
||||
liftEffect rmErrorListener
|
||||
liftEffect $ O.end w
|
||||
pure $ Done unit
|
||||
|
||||
go {error, cancel} = do
|
||||
liftAff $ delay $ wrap 0.0
|
||||
err <- liftEffect $ liftST $ STRef.read error
|
||||
@@ -63,12 +67,14 @@ fromWritable w =
|
||||
Just a -> do
|
||||
res <- liftEffect $ O.write w a
|
||||
case res of
|
||||
O.WriteOk -> go {error, cancel}
|
||||
O.WriteWouldBlock -> liftAff (O.awaitWritableOrClosed w) *> go {error, cancel}
|
||||
O.WriteClosed -> pure unit
|
||||
O.WriteOk -> pure $ Loop {error, cancel}
|
||||
O.WriteWouldBlock -> do
|
||||
liftAff (O.awaitWritableOrClosed w)
|
||||
pure $ Loop {error, cancel}
|
||||
O.WriteClosed -> cleanup cancel
|
||||
in do
|
||||
r <- liftEffect $ O.withErrorST w
|
||||
go r
|
||||
tailRecM go r
|
||||
|
||||
-- | Convert a `Transform` stream to a `Pipe`.
|
||||
-- |
|
||||
@@ -81,6 +87,7 @@ fromTransform t =
|
||||
liftEffect $ O.end t
|
||||
liftEffect $ removeErrorListener
|
||||
fromReadable t
|
||||
pure $ Done unit
|
||||
yieldFromReadableHalf = do
|
||||
res <- liftEffect (O.read t)
|
||||
case res of
|
||||
@@ -99,12 +106,14 @@ fromTransform t =
|
||||
res <- liftEffect $ O.write t a'
|
||||
yieldFromReadableHalf
|
||||
case res of
|
||||
O.WriteOk -> go {error, cancel}
|
||||
O.WriteWouldBlock -> lift (O.awaitWritableOrClosed t) *> go {error, cancel}
|
||||
O.WriteClosed -> cleanup cancel
|
||||
O.WriteOk -> pure $ Loop {error, cancel}
|
||||
O.WriteWouldBlock -> do
|
||||
lift (O.awaitWritableOrClosed t)
|
||||
pure $ Loop {error, cancel}
|
||||
in do
|
||||
r <- liftEffect $ O.withErrorST t
|
||||
go r
|
||||
tailRecM go r
|
||||
|
||||
-- | Given a `Producer` of values, wrap them in `Just`.
|
||||
-- |
|
||||
|
||||
@@ -2,9 +2,14 @@ module Pipes.Util where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
|
||||
import Control.Monad.Rec.Class (whileJust)
|
||||
import Control.Monad.ST.Class (liftST)
|
||||
import Control.Monad.ST.Ref (STRef)
|
||||
import Control.Monad.ST.Ref as STRef
|
||||
import Control.Monad.Trans.Class (lift)
|
||||
import Data.Array.ST (STArray)
|
||||
import Data.Array.ST as Array.ST
|
||||
import Data.Maybe (Maybe(..))
|
||||
import Effect.Class (class MonadEffect, liftEffect)
|
||||
import Pipes (await, yield)
|
||||
@@ -18,15 +23,42 @@ import Pipes.Core (Pipe)
|
||||
-- | ```
|
||||
intersperse :: forall m a. MonadEffect m => a -> Pipe (Maybe a) (Maybe a) m Unit
|
||||
intersperse sep = do
|
||||
isFirst <- liftEffect $ liftST $ STRef.new true
|
||||
whileJust do
|
||||
ma <- await
|
||||
isFirst' <- liftEffect $ liftST $ STRef.read isFirst
|
||||
case ma of
|
||||
Just a
|
||||
| isFirst' -> do
|
||||
void $ liftEffect $ liftST $ STRef.write false isFirst
|
||||
yield $ Just a
|
||||
| otherwise -> yield (Just sep) *> yield (Just a)
|
||||
Nothing -> yield Nothing
|
||||
pure $ void ma
|
||||
isFirstST <- liftEffect $ liftST $ STRef.new true
|
||||
let
|
||||
getIsFirst = liftEffect $ liftST $ STRef.read isFirstST
|
||||
markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST
|
||||
|
||||
whileJust $ runMaybeT do
|
||||
a <- MaybeT await
|
||||
isFirst <- getIsFirst
|
||||
if isFirst then markNotFirst else lift $ yield $ Just sep
|
||||
lift $ yield $ Just a
|
||||
|
||||
yield Nothing
|
||||
|
||||
-- | Accumulate values in chunks of a given size.
|
||||
-- |
|
||||
-- | If the pipe closes without yielding a multiple of `size` elements,
|
||||
-- | the remaining elements are yielded at the end.
|
||||
chunked :: forall m a. MonadEffect m => Int -> Pipe (Maybe a) (Maybe (Array a)) m Unit
|
||||
chunked size = do
|
||||
chunkST :: STRef _ (STArray _ a) <- liftEffect $ liftST $ STRef.new =<< Array.ST.new
|
||||
let
|
||||
chunkPut a = liftEffect $ liftST do
|
||||
chunkArray <- STRef.read chunkST
|
||||
void $ Array.ST.push a chunkArray
|
||||
chunkLength = liftEffect $ liftST do
|
||||
chunkArray <- STRef.read chunkST
|
||||
Array.ST.length chunkArray
|
||||
chunkTake = liftEffect $ liftST do
|
||||
chunkArray <- STRef.read chunkST
|
||||
void $ flip STRef.write chunkST =<< Array.ST.new
|
||||
Array.ST.unsafeFreeze chunkArray
|
||||
|
||||
whileJust $ runMaybeT do
|
||||
a <- MaybeT await
|
||||
chunkPut a
|
||||
len <- chunkLength
|
||||
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
|
||||
yield =<< Just <$> chunkTake
|
||||
yield Nothing
|
||||
|
||||
Reference in New Issue
Block a user