Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
7f11c303fb
|
|||
|
2e0be4ac62
|
|||
|
0ba315ede0
|
|||
|
08bd9a817a
|
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "purescript-node-stream-pipes",
|
||||
"version": "v2.0.0",
|
||||
"version": "v2.0.2",
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"csv-parse": "^5.5.6",
|
||||
|
||||
@@ -29,6 +29,7 @@ workspace:
|
||||
- parallel: ">=6.0.0 <7.0.0"
|
||||
- pipes: ">=8.0.0 <9.0.0"
|
||||
- prelude: ">=6.0.1 <7.0.0"
|
||||
- profunctor: ">=6.0.1 <7.0.0"
|
||||
- st: ">=6.2.0 <7.0.0"
|
||||
- strings: ">=6.0.1 <7.0.0"
|
||||
- tailrec: ">=6.1.0 <7.0.0"
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package:
|
||||
name: node-stream-pipes
|
||||
publish:
|
||||
version: '2.0.0'
|
||||
version: '2.0.2'
|
||||
license: 'GPL-3.0-or-later'
|
||||
location:
|
||||
githubOwner: 'cakekindel'
|
||||
@@ -36,6 +36,7 @@ package:
|
||||
- parallel: ">=6.0.0 <7.0.0"
|
||||
- pipes: ">=8.0.0 <9.0.0"
|
||||
- prelude: ">=6.0.1 <7.0.0"
|
||||
- profunctor: ">=6.0.1 <7.0.0"
|
||||
- st: ">=6.2.0 <7.0.0"
|
||||
- strings: ">=6.0.1 <7.0.0"
|
||||
- tailrec: ">=6.1.0 <7.0.0"
|
||||
|
||||
@@ -8,7 +8,7 @@ import Control.Monad.Except (ExceptT, runExceptT)
|
||||
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
|
||||
import Control.Monad.Maybe.Trans (runMaybeT)
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM)
|
||||
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
|
||||
import Control.Monad.ST.Class (liftST)
|
||||
import Control.Monad.ST.Ref (STRef)
|
||||
import Control.Monad.ST.Ref as ST.Ref
|
||||
@@ -21,6 +21,7 @@ import Data.Either (Either(..), either)
|
||||
import Data.Generic.Rep (class Generic)
|
||||
import Data.Maybe (Maybe(..), isNothing)
|
||||
import Data.Newtype (unwrap)
|
||||
import Data.Profunctor (class Profunctor)
|
||||
import Data.Show.Generic (genericShow)
|
||||
import Data.Time.Duration (Milliseconds)
|
||||
import Data.Traversable (traverse_)
|
||||
@@ -92,7 +93,7 @@ instance Show a => Show (ReadResult a) where show = genericShow
|
||||
-- | - Attempt to read a chunk
|
||||
-- | - `x -> m ReadSignal`
|
||||
-- | - Block until the pipe is readable again (or reading must stop)
|
||||
data AsyncPipe x a b m =
|
||||
data AsyncPipe x m a b =
|
||||
AsyncPipe
|
||||
(m x)
|
||||
(x -> a -> m WriteResult)
|
||||
@@ -100,9 +101,22 @@ data AsyncPipe x a b m =
|
||||
(x -> m (ReadResult b))
|
||||
(x -> m ReadSignal)
|
||||
|
||||
instance Monad m => Functor (AsyncPipe x m a) where
|
||||
map f (AsyncPipe init w aw r ar) = AsyncPipe init w aw (map (map f) <<< r) ar
|
||||
|
||||
instance Monad m => Profunctor (AsyncPipe x m) where
|
||||
dimap :: forall a b c d. (a -> b) -> (c -> d) -> _ b c -> _ a d
|
||||
dimap ab cd (AsyncPipe init w aw r ar) =
|
||||
AsyncPipe
|
||||
init
|
||||
(\x -> w x <<< ab)
|
||||
aw
|
||||
(map (map cd) <<< r)
|
||||
ar
|
||||
|
||||
-- | Wraps all fields of an `AsyncPipe` with logging to debug
|
||||
-- | behavior and timing.
|
||||
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x a b m -> AsyncPipe x a b m
|
||||
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe x m (Maybe a) (Maybe b)
|
||||
debug c (AsyncPipe init write awaitWrite read awaitRead) =
|
||||
let
|
||||
logL m = liftEffect $ log $ "[" <> c <> "] " <> m
|
||||
@@ -130,7 +144,7 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) =
|
||||
read' x = do
|
||||
logR "read >"
|
||||
elapsed /\ r <- time $ read x
|
||||
logR $ "< read " <> show (r $> unit) <> " (" <> show (unwrap elapsed) <> "ms)"
|
||||
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
|
||||
pure r
|
||||
|
||||
awaitWrite' x = do
|
||||
@@ -188,7 +202,7 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) =
|
||||
-- | * `read` will pass chunks to `parse` as fast as `parse` allows
|
||||
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
|
||||
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
|
||||
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x (Maybe a) (Maybe b) m -> Pipe (Maybe a) (Maybe b) m Unit
|
||||
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x m (Maybe a) (Maybe b) -> Pipe (Maybe a) (Maybe b) m Unit
|
||||
sync (AsyncPipe init write awaitWrite read awaitRead) =
|
||||
let
|
||||
liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r
|
||||
@@ -243,7 +257,7 @@ pipeAsync
|
||||
=> MonadAff m
|
||||
=> MonadBracket Error f m
|
||||
=> Producer (Maybe a) m Unit
|
||||
-> AsyncPipe x (Maybe a) (Maybe b) m
|
||||
-> AsyncPipe x m (Maybe a) (Maybe b)
|
||||
-> Producer (Maybe b) m Unit
|
||||
pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
|
||||
do
|
||||
@@ -270,12 +284,12 @@ pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
|
||||
x <- lift init
|
||||
_thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod)
|
||||
|
||||
forever do
|
||||
flip tailRecM unit $ const do
|
||||
getThreadError >>= traverse_ throwError
|
||||
rb <- lift $ read x
|
||||
case rb of
|
||||
ReadOk (Just b) -> yield $ Just b
|
||||
ReadOk Nothing -> killThread *> yield Nothing
|
||||
ReadWouldBlock -> void $ lift (awaitRead x)
|
||||
ReadOk (Just b) -> yield (Just b) $> Loop unit
|
||||
ReadOk Nothing -> killThread *> yield Nothing $> Done unit
|
||||
ReadWouldBlock -> void (lift (awaitRead x)) $> Loop unit
|
||||
|
||||
infixl 7 pipeAsync as >-/->
|
||||
|
||||
@@ -113,7 +113,7 @@ fromTransform
|
||||
. MonadThrow Error m
|
||||
=> MonadAff m
|
||||
=> Effect (O.Transform a b)
|
||||
-> AsyncPipe (TransformContext a b) (Maybe a) (Maybe b) m
|
||||
-> AsyncPipe (TransformContext a b) m (Maybe a) (Maybe b)
|
||||
fromTransform t =
|
||||
let
|
||||
init = do
|
||||
@@ -147,7 +147,6 @@ fromTransform t =
|
||||
let s = transformStream x
|
||||
readEnded <- liftEffect $ O.isReadableEnded s
|
||||
if readEnded then do
|
||||
transformCleanup x
|
||||
pure $ AsyncPipe.ReadOk Nothing
|
||||
else
|
||||
maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s)
|
||||
|
||||
@@ -17,29 +17,29 @@ import Pipes.Node.Stream (TransformContext, fromTransform)
|
||||
|
||||
type X = TransformContext Buffer Buffer
|
||||
|
||||
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
|
||||
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
|
||||
fromZlib z =
|
||||
fromTransform do
|
||||
raw <- liftEffect $ Zlib.toDuplex <$> z
|
||||
pure $ O.unsafeCoerceTransform raw
|
||||
|
||||
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
|
||||
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
|
||||
gzip = fromZlib Zlib.createGzip
|
||||
|
||||
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
|
||||
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
|
||||
gunzip = fromZlib Zlib.createGunzip
|
||||
|
||||
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
|
||||
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
|
||||
unzip = fromZlib Zlib.createUnzip
|
||||
|
||||
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
|
||||
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
|
||||
inflate = fromZlib Zlib.createInflate
|
||||
|
||||
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
|
||||
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
|
||||
deflate = fromZlib Zlib.createDeflate
|
||||
|
||||
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
|
||||
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
|
||||
brotliCompress = fromZlib Zlib.createBrotliCompress
|
||||
|
||||
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m
|
||||
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
|
||||
brotliDecompress = fromZlib Zlib.createBrotliDecompress
|
||||
|
||||
Reference in New Issue
Block a user