4 Commits

Author SHA1 Message Date
7f11c303fb chore: prepare v2.0.2 2024-06-22 19:33:56 -05:00
2e0be4ac62 fix: loop bug 2024-06-22 19:33:45 -05:00
0ba315ede0 chore: prepare v2.0.1 2024-06-22 19:12:56 -05:00
08bd9a817a fix: asyncpipe is profunctor 2024-06-22 19:12:31 -05:00
6 changed files with 37 additions and 22 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-node-stream-pipes",
"version": "v2.0.0",
"version": "v2.0.2",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.6",

View File

@@ -29,6 +29,7 @@ workspace:
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"

View File

@@ -1,7 +1,7 @@
package:
name: node-stream-pipes
publish:
version: '2.0.0'
version: '2.0.2'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -36,6 +36,7 @@ package:
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"

View File

@@ -8,7 +8,7 @@ import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (runMaybeT)
import Control.Monad.Morph (hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref
@@ -21,6 +21,7 @@ import Data.Either (Either(..), either)
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (unwrap)
import Data.Profunctor (class Profunctor)
import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds)
import Data.Traversable (traverse_)
@@ -92,7 +93,7 @@ instance Show a => Show (ReadResult a) where show = genericShow
-- | - Attempt to read a chunk
-- | - `x -> m ReadSignal`
-- | - Block until the pipe is readable again (or reading must stop)
data AsyncPipe x a b m =
data AsyncPipe x m a b =
AsyncPipe
(m x)
(x -> a -> m WriteResult)
@@ -100,9 +101,22 @@ data AsyncPipe x a b m =
(x -> m (ReadResult b))
(x -> m ReadSignal)
instance Monad m => Functor (AsyncPipe x m a) where
map f (AsyncPipe init w aw r ar) = AsyncPipe init w aw (map (map f) <<< r) ar
instance Monad m => Profunctor (AsyncPipe x m) where
dimap :: forall a b c d. (a -> b) -> (c -> d) -> _ b c -> _ a d
dimap ab cd (AsyncPipe init w aw r ar) =
AsyncPipe
init
(\x -> w x <<< ab)
aw
(map (map cd) <<< r)
ar
-- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing.
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x a b m -> AsyncPipe x a b m
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe x m (Maybe a) (Maybe b)
debug c (AsyncPipe init write awaitWrite read awaitRead) =
let
logL m = liftEffect $ log $ "[" <> c <> "] " <> m
@@ -130,7 +144,7 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) =
read' x = do
logR "read >"
elapsed /\ r <- time $ read x
logR $ "< read " <> show (r $> unit) <> " (" <> show (unwrap elapsed) <> "ms)"
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r
awaitWrite' x = do
@@ -188,7 +202,7 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) =
-- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x (Maybe a) (Maybe b) m -> Pipe (Maybe a) (Maybe b) m Unit
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x m (Maybe a) (Maybe b) -> Pipe (Maybe a) (Maybe b) m Unit
sync (AsyncPipe init write awaitWrite read awaitRead) =
let
liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r
@@ -243,7 +257,7 @@ pipeAsync
=> MonadAff m
=> MonadBracket Error f m
=> Producer (Maybe a) m Unit
-> AsyncPipe x (Maybe a) (Maybe b) m
-> AsyncPipe x m (Maybe a) (Maybe b)
-> Producer (Maybe b) m Unit
pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
do
@@ -270,12 +284,12 @@ pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
x <- lift init
_thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod)
forever do
flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError
rb <- lift $ read x
case rb of
ReadOk (Just b) -> yield $ Just b
ReadOk Nothing -> killThread *> yield Nothing
ReadWouldBlock -> void $ lift (awaitRead x)
ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void (lift (awaitRead x)) $> Loop unit
infixl 7 pipeAsync as >-/->

View File

@@ -113,7 +113,7 @@ fromTransform
. MonadThrow Error m
=> MonadAff m
=> Effect (O.Transform a b)
-> AsyncPipe (TransformContext a b) (Maybe a) (Maybe b) m
-> AsyncPipe (TransformContext a b) m (Maybe a) (Maybe b)
fromTransform t =
let
init = do
@@ -147,7 +147,6 @@ fromTransform t =
let s = transformStream x
readEnded <- liftEffect $ O.isReadableEnded s
if readEnded then do
transformCleanup x
pure $ AsyncPipe.ReadOk Nothing
else
maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s)

View File

@@ -17,29 +17,29 @@ import Pipes.Node.Stream (TransformContext, fromTransform)
type X = TransformContext Buffer Buffer
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
fromZlib z =
fromTransform do
raw <- liftEffect $ Zlib.toDuplex <$> z
pure $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
gzip = fromZlib Zlib.createGzip
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
gunzip = fromZlib Zlib.createGunzip
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
unzip = fromZlib Zlib.createUnzip
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
inflate = fromZlib Zlib.createInflate
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
deflate = fromZlib Zlib.createDeflate
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
brotliDecompress = fromZlib Zlib.createBrotliDecompress