6 Commits

Author SHA1 Message Date
2be8960836 chore: prepare v1.2.6 2024-05-03 13:06:39 -05:00
4c72c8f3b7 fix: bugs 2024-05-03 13:06:36 -05:00
a3be110749 chore: prepare v1.2.5 2024-05-03 13:00:45 -05:00
874503a300 fix: bug 2024-05-03 12:58:52 -05:00
05b61d84f0 chore: prepare v1.2.4 2024-05-03 12:47:28 -05:00
d6638ead1d fix: maybe this is faster? 2024-05-03 12:47:12 -05:00
3 changed files with 51 additions and 27 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.2.3",
"version": "v1.2.6",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -1,7 +1,7 @@
package:
name: csv-stream
publish:
version: '1.2.3'
version: '1.2.6'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'

View File

@@ -3,15 +3,15 @@ module Node.Stream.CSV.Parse where
import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Alternative (guard)
import Control.Alternative (guard, empty)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST
import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parTraverse_)
import Control.Parallel (class Parallel, parSequence_)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
@@ -123,29 +123,53 @@ foreach
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb =
whileJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
do
q <- liftEffect $ liftST $ Array.ST.new
guard =<< not <$> liftEffect (Stream.closed stream)
let
deque = liftEffect $ liftST $ Array.ST.shift q
enque a = liftEffect $ liftST $ Array.ST.push a q
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
waitReadable =
makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
pure $ isNothing r
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
processQ =
untilJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
r <- deque
isClosed <- liftEffect $ Stream.closed stream
if isClosed && isNothing r then
pure unit
else if isNothing r then do
liftAff $ delay $ wrap 10.0
empty
else do
r' <- MaybeT $ pure r
lift $ cb r'
empty
lift $ parTraverse_ cb records
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
readToQ =
whileJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
guard =<< not <$> liftEffect (Stream.closed stream)
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) waitReadable
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r enque
pure $ isNothing r
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
parSequence_ [readToQ, processQ]
-- | Reads a parsed record from the stream.
-- |
-- | Returns `Nothing` when either:
@@ -174,9 +198,9 @@ readAll
=> CSVParser r a
-> m (Array { | r })
readAll stream = do
records <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records
liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze records
records <- liftEffect $ liftST $ Array.ST.new
foreach stream $ void <<< liftEffect <<< liftST <<< flip Array.ST.push records
liftEffect $ liftST $ Array.ST.unsafeFreeze records
-- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall r a. EventHandle1 (CSVParser r a) { | r }