5 Commits

Author SHA1 Message Date
7d03850623 chore: prepare v2.1.5 2024-07-08 12:43:58 -05:00
dc7a2d3387 feat: Pipe.Util.buffered 2024-07-08 12:40:25 -05:00
43ff92a4ad chore: prepare v2.1.4 2024-06-25 14:26:40 -05:00
bb9790b2f4 chore: prepare v2.1.34 2024-06-25 14:24:50 -05:00
6b64956034 chore: prepare v2.1.3 2024-06-25 14:24:46 -05:00
4 changed files with 37 additions and 11 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v2.1.3",
"version": "v2.1.5",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.6",

View File

@@ -20,7 +20,7 @@ workspace:
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-fs: ">=9.1.0 <9.2.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
@@ -302,8 +302,8 @@ packages:
- unfoldable
exceptions:
type: registry
version: 6.0.0
integrity: sha256-y/xTAEIZIARCE+50/u1di0ncebJ+CIwNOLswyOWzMTw=
version: 6.1.0
integrity: sha256-K0T89IHtF3vBY7eSAO7eDOqSb2J9kZGAcDN5+IKsF8E=
dependencies:
- effect
- either
@@ -867,8 +867,8 @@ packages:
- refs
transformers:
type: registry
version: 6.0.0
integrity: sha256-Pzw40HjthX77tdPAYzjx43LK3X5Bb7ZspYAp27wksFA=
version: 6.1.0
integrity: sha256-3Bm+Z6tsC/paG888XkywDngJ2JMos+JfOhRlkVfb7gI=
dependencies:
- control
- distributive
@@ -881,6 +881,7 @@ packages:
- maybe
- newtype
- prelude
- st
- tailrec
- tuples
- unfoldable

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '2.1.3'
version: '2.1.5'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -27,7 +27,7 @@ package:
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-fs: ">=9.1.0 <9.2.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"

View File

@@ -15,9 +15,12 @@ import Data.Either (hush)
import Data.HashSet as HashSet
import Data.Hashable (class Hashable, hash)
import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..), fromMaybe)
import Data.Maybe (Maybe(..), fromMaybe, maybe)
import Data.Traversable (traverse_)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Class (class MonadEffect, liftEffect)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Pipes (await, yield)
import Pipes as Pipes
import Pipes.Core (Pipe, Producer)
@@ -96,6 +99,30 @@ chunked size = do
yield Nothing
-- | Buffers input to the given size before passing to subsequent pipes
buffered :: forall m. MonadEffect m => Int -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
buffered size = do
chunkST :: STRef _ (Maybe Buffer) <- liftEffect $ liftST $ STRef.new Nothing
let
chunkClear = liftEffect $ liftST $ STRef.write Nothing chunkST
chunkPeek = liftEffect $ liftST $ STRef.read chunkST
chunkLen = maybe (pure 0) (liftEffect <<< Buffer.size) =<< chunkPeek
chunkPut b = liftEffect do
new <- liftST (STRef.read chunkST) >>= maybe (pure b) (\a -> Buffer.concat [a, b])
void $ liftST $ STRef.write (Just new) chunkST
pure new
Rec.whileJust $ runMaybeT do
a <- MaybeT await
buf <- chunkPut a
len <- lift chunkLen
when (len > size) $ chunkClear *> lift (yield $ Just buf)
len <- chunkLen
chunkPeek >>= traverse_ (when (len > 0) <<< yield <<< Just)
yield Nothing
-- | Equivalent of unix `uniq`, filtering out duplicate values passed to it.
-- |
-- | Uses a `HashSet` of hashes of `a`; for `n` elements `awaited`, this pipe
@@ -149,5 +176,3 @@ invoke m a =
Pure _ -> pure Exited
in
go (IDidNotAwait m)