10 Commits

5 changed files with 116 additions and 69 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.2.6",
"version": "v1.2.11",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -25,7 +25,6 @@ workspace:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
@@ -35,6 +34,7 @@ workspace:
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unlift: ">=1.0.1 <2.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console
@@ -43,6 +43,7 @@ workspace:
- arraybuffer-types
- arrays
- bifunctors
- catenable-lists
- console
- const
- contravariant
@@ -61,6 +62,8 @@ workspace:
- foreign
- foreign-object
- formatters
- free
- freet
- functions
- functors
- gen
@@ -71,6 +74,7 @@ workspace:
- lazy
- lists
- maybe
- monad-control
- newtype
- node-buffer
- node-event-emitter
@@ -100,6 +104,7 @@ workspace:
- typelevel-prelude
- unfoldable
- unicode
- unlift
- unsafe-coerce
extra_packages: {}
packages:
@@ -159,6 +164,18 @@ packages:
- newtype
- prelude
- tuples
catenable-lists:
type: registry
version: 7.0.0
integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA=
dependencies:
- control
- foldable-traversable
- lists
- maybe
- prelude
- tuples
- unfoldable
console:
type: registry
version: 6.1.0
@@ -352,6 +369,40 @@ packages:
- parsing
- prelude
- transformers
free:
type: registry
version: 7.1.0
integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k=
dependencies:
- catenable-lists
- control
- distributive
- either
- exists
- foldable-traversable
- invariant
- lazy
- maybe
- prelude
- tailrec
- transformers
- tuples
- unsafe-coerce
freet:
type: registry
version: 7.0.0
integrity: sha256-zkL6wU4ZPq8xz1kGFxoliWqyhBksepMJTyA68VEBaJo=
dependencies:
- aff
- bifunctors
- effect
- either
- exists
- free
- prelude
- tailrec
- transformers
- tuples
functions:
type: registry
version: 6.0.0
@@ -461,6 +512,15 @@ packages:
- invariant
- newtype
- prelude
monad-control:
type: registry
version: 5.0.0
integrity: sha256-bgfDW30wbIm70NR1Tvvh9P+VFQMDh1wK2sSJXCj/dZc=
dependencies:
- aff
- freet
- identity
- lists
newtype:
type: registry
version: 5.0.0
@@ -784,6 +844,23 @@ packages:
- foldable-traversable
- maybe
- strings
unlift:
type: registry
version: 1.0.1
integrity: sha256-nbBCVV0fZz/3UHKoW11dcTwBYmQOIgK31ht2BN47RPw=
dependencies:
- aff
- effect
- either
- freet
- identity
- lists
- maybe
- monad-control
- prelude
- st
- transformers
- tuples
unsafe-coerce:
type: registry
version: 6.0.0

View File

@@ -1,7 +1,7 @@
package:
name: csv-stream
publish:
version: '1.2.6'
version: '1.2.11'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -32,7 +32,6 @@ package:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
@@ -42,6 +41,7 @@ package:
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unlift: ">=1.0.1 <2.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main

View File

@@ -12,6 +12,7 @@ export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c);
parser.once("readable", () => {
parser.columns = parser.read();
parser.emit('columns', parser.columns)
});
return parser;
};

View File

@@ -3,15 +3,12 @@ module Node.Stream.CSV.Parse where
import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Alternative (guard, empty)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Monad.Except (runExcept)
import Control.Monad.Except.Trans (catchError)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parSequence_)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
@@ -24,11 +21,10 @@ import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Data.Traversable (for)
import Effect (Effect)
import Effect as Effect
import Effect.Aff (Canceler(..), delay, makeAff)
import Effect.Aff (Canceler(..), delay, launchAff_, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Aff.Unlift (class MonadUnliftAff, UnliftAff(..), askUnliftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Uncurried (mkEffectFn1)
@@ -93,11 +89,9 @@ make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns:
-- | Synchronously parse a CSV string
parse
:: forall @r rl @config missing extra m p
. Alternative p
=> Parallel p m
:: forall @r rl @config missing extra m
. MonadUnliftAff m
=> MonadAff m
=> MonadRec m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
@@ -112,64 +106,37 @@ parse config csv = do
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach
:: forall @r rl x m p
. Alternative p
=> Parallel p m
=> MonadRec m
:: forall @r rl x m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb =
do
q <- liftEffect $ liftST $ Array.ST.new
foreach stream cb = do
UnliftAff unlift <- askUnliftAff
let
deque = liftEffect $ liftST $ Array.ST.shift q
enque a = liftEffect $ liftST $ Array.ST.push a q
alreadyHaveCols <- liftEffect $ getOrInitColumnsMap stream
when (isNothing alreadyHaveCols)
$ liftAff
$ makeAff \res -> pure mempty <* flip (Event.once columnsH) stream $ const do
void $ getOrInitColumnsMap stream
res $ Right unit
waitReadable =
makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
liftAff $ makeAff \res -> do
removeDataListener <- flip (Event.on dataH) stream \row -> launchAff_ $ delay (wrap 0.0) <* liftEffect do
cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row
launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record)
removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit)
removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left)
processQ =
untilJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
r <- deque
isClosed <- liftEffect $ Stream.closed stream
if isClosed && isNothing r then
pure unit
else if isNothing r then do
liftAff $ delay $ wrap 10.0
empty
else do
r' <- MaybeT $ pure r
lift $ cb r'
empty
pure $ Canceler $ const $ liftEffect do
removeDataListener
removeEndListener
removeErrorListener
readToQ =
whileJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
guard =<< not <$> liftEffect (Stream.closed stream)
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) waitReadable
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r enque
pure $ isNothing r
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
parSequence_ [readToQ, processQ]
-- | Reads a parsed record from the stream.
-- |
-- | Returns `Nothing` when either:
@@ -188,10 +155,8 @@ read stream = runMaybeT do
-- | Collect all parsed records into an array
readAll
:: forall @r rl a m p
. Alternative p
=> Parallel p m
=> MonadRec m
:: forall @r rl a m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
@@ -203,9 +168,13 @@ readAll stream = do
liftEffect $ liftST $ Array.ST.unsafeFreeze records
-- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall r a. EventHandle1 (CSVParser r a) { | r }
dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
dataH = EventHandle "data" mkEffectFn1
-- | `columns` event. Emitted when the header row has been parsed.
columnsH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
columnsH = EventHandle "columns" mkEffectFn1
-- | FFI
foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)