4 Commits

Author SHA1 Message Date
f2f18c3c13 chore: prepare v1.0.5 2024-05-10 18:30:36 -05:00
76958b63ef feat: Pipes.Util.chunked 2024-05-10 18:30:27 -05:00
821a47229c chore: prepare v1.0.4 2024-05-10 18:16:01 -05:00
f373334f77 feat: Pipes.Collect 2024-05-10 18:15:58 -05:00
5 changed files with 64 additions and 16 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.0.3",
"version": "v1.0.5",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -25,7 +25,6 @@ workspace:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '1.0.3'
version: '1.0.5'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -32,7 +32,6 @@ package:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main

18
src/Pipes.Collect.purs Normal file
View File

@@ -0,0 +1,18 @@
module Pipes.Collect where
import Prelude
import Control.Monad.Rec.Class (class MonadRec)
import Control.Monad.ST.Class (liftST)
import Data.Array.ST as Array.ST
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (for) as Pipes
import Pipes.Core (Producer)
import Pipes.Core (runEffect) as Pipes
-- | Traverse a pipe, collecting into a mutable array with constant stack usage
collectArray :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (Array a)
collectArray p = do
st <- liftEffect $ liftST $ Array.ST.new
Pipes.runEffect $ Pipes.for p \a -> void $ liftEffect $ liftST $ Array.ST.push a st
liftEffect $ liftST $ Array.ST.unsafeFreeze st

View File

@@ -2,9 +2,14 @@ module Pipes.Util where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Array.ST (STArray)
import Data.Array.ST as Array.ST
import Data.Maybe (Maybe(..))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield)
@@ -18,15 +23,42 @@ import Pipes.Core (Pipe)
-- | ```
intersperse :: forall m a. MonadEffect m => a -> Pipe (Maybe a) (Maybe a) m Unit
intersperse sep = do
isFirst <- liftEffect $ liftST $ STRef.new true
whileJust do
ma <- await
isFirst' <- liftEffect $ liftST $ STRef.read isFirst
case ma of
Just a
| isFirst' -> do
void $ liftEffect $ liftST $ STRef.write false isFirst
yield $ Just a
| otherwise -> yield (Just sep) *> yield (Just a)
Nothing -> yield Nothing
pure $ void ma
isFirstST <- liftEffect $ liftST $ STRef.new true
let
getIsFirst = liftEffect $ liftST $ STRef.read isFirstST
markNotFirst = void $ liftEffect $ liftST $ STRef.write false isFirstST
whileJust $ runMaybeT do
a <- MaybeT await
isFirst <- getIsFirst
if isFirst then markNotFirst else lift $ yield $ Just sep
lift $ yield $ Just a
yield Nothing
-- | Accumulate values in chunks of a given size.
-- |
-- | If the pipe closes without yielding a multiple of `size` elements,
-- | the remaining elements are yielded at the end.
chunked :: forall m a. MonadEffect m => Int -> Pipe (Maybe a) (Maybe (Array a)) m Unit
chunked size = do
chunkST :: STRef _ (STArray _ a) <- liftEffect $ liftST $ STRef.new =<< Array.ST.new
let
chunkPut a = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
void $ Array.ST.push a chunkArray
chunkLength = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
Array.ST.length chunkArray
chunkTake = liftEffect $ liftST do
chunkArray <- STRef.read chunkST
void $ flip STRef.write chunkST =<< Array.ST.new
Array.ST.unsafeFreeze chunkArray
whileJust $ runMaybeT do
a <- MaybeT await
chunkPut a
len <- chunkLength
when (len >= size) $ lift $ yield =<< Just <$> chunkTake
yield =<< Just <$> chunkTake
yield Nothing