14 Commits

Author SHA1 Message Date
8e842feeef chore: prepare v1.2.12 2024-05-03 14:23:52 -05:00
52dd297c9c fix: what 2024-05-03 14:22:07 -05:00
39276c2546 chore: prepare v1.2.11 2024-05-03 14:05:54 -05:00
8ba33e88cf fix: maybe this has already been emitted 2024-05-03 14:05:51 -05:00
6b28c7fdf7 chore: prepare v1.2.10 2024-05-03 14:00:39 -05:00
093fff058d fix: add columns event 2024-05-03 14:00:35 -05:00
c8d83e8cf3 chore: prepare v1.2.9 2024-05-03 13:55:26 -05:00
559967d7a7 fix: scheduler nonsense 2024-05-03 13:55:16 -05:00
ebf048b431 chore: prepare v1.2.8 2024-05-03 13:45:33 -05:00
8201ede7c4 fix: reading the columns emits data 2024-05-03 13:45:27 -05:00
3a5553fd29 chore: prepare v1.2.7 2024-05-03 13:40:14 -05:00
42e779a2a7 fix: rework to just use streaming mode 2024-05-03 13:40:06 -05:00
2be8960836 chore: prepare v1.2.6 2024-05-03 13:06:39 -05:00
4c72c8f3b7 fix: bugs 2024-05-03 13:06:36 -05:00
5 changed files with 118 additions and 69 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.2.5",
"version": "v1.2.12",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -25,7 +25,6 @@ workspace:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
@@ -35,6 +34,7 @@ workspace:
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unlift: ">=1.0.1 <2.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console
@@ -43,6 +43,7 @@ workspace:
- arraybuffer-types
- arrays
- bifunctors
- catenable-lists
- console
- const
- contravariant
@@ -61,6 +62,8 @@ workspace:
- foreign
- foreign-object
- formatters
- free
- freet
- functions
- functors
- gen
@@ -71,6 +74,7 @@ workspace:
- lazy
- lists
- maybe
- monad-control
- newtype
- node-buffer
- node-event-emitter
@@ -100,6 +104,7 @@ workspace:
- typelevel-prelude
- unfoldable
- unicode
- unlift
- unsafe-coerce
extra_packages: {}
packages:
@@ -159,6 +164,18 @@ packages:
- newtype
- prelude
- tuples
catenable-lists:
type: registry
version: 7.0.0
integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA=
dependencies:
- control
- foldable-traversable
- lists
- maybe
- prelude
- tuples
- unfoldable
console:
type: registry
version: 6.1.0
@@ -352,6 +369,40 @@ packages:
- parsing
- prelude
- transformers
free:
type: registry
version: 7.1.0
integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k=
dependencies:
- catenable-lists
- control
- distributive
- either
- exists
- foldable-traversable
- invariant
- lazy
- maybe
- prelude
- tailrec
- transformers
- tuples
- unsafe-coerce
freet:
type: registry
version: 7.0.0
integrity: sha256-zkL6wU4ZPq8xz1kGFxoliWqyhBksepMJTyA68VEBaJo=
dependencies:
- aff
- bifunctors
- effect
- either
- exists
- free
- prelude
- tailrec
- transformers
- tuples
functions:
type: registry
version: 6.0.0
@@ -461,6 +512,15 @@ packages:
- invariant
- newtype
- prelude
monad-control:
type: registry
version: 5.0.0
integrity: sha256-bgfDW30wbIm70NR1Tvvh9P+VFQMDh1wK2sSJXCj/dZc=
dependencies:
- aff
- freet
- identity
- lists
newtype:
type: registry
version: 5.0.0
@@ -784,6 +844,23 @@ packages:
- foldable-traversable
- maybe
- strings
unlift:
type: registry
version: 1.0.1
integrity: sha256-nbBCVV0fZz/3UHKoW11dcTwBYmQOIgK31ht2BN47RPw=
dependencies:
- aff
- effect
- either
- freet
- identity
- lists
- maybe
- monad-control
- prelude
- st
- transformers
- tuples
unsafe-coerce:
type: registry
version: 6.0.0

View File

@@ -1,7 +1,7 @@
package:
name: csv-stream
publish:
version: '1.2.5'
version: '1.2.12'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -32,7 +32,6 @@ package:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
@@ -42,6 +41,7 @@ package:
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unlift: ">=1.0.1 <2.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main

View File

@@ -12,6 +12,7 @@ export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c);
parser.once("readable", () => {
parser.columns = parser.read();
parser.emit('columns', parser.columns)
});
return parser;
};

View File

@@ -3,15 +3,12 @@ module Node.Stream.CSV.Parse where
import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Alternative (guard, empty)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Monad.Except (runExcept)
import Control.Monad.Except.Trans (catchError)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parSequence_)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
@@ -24,11 +21,10 @@ import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Data.Traversable (for)
import Effect (Effect)
import Effect as Effect
import Effect.Aff (Canceler(..), delay, makeAff)
import Effect.Aff (Canceler(..), delay, launchAff_, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Aff.Unlift (class MonadUnliftAff, UnliftAff(..), askUnliftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Uncurried (mkEffectFn1)
@@ -93,11 +89,9 @@ make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns:
-- | Synchronously parse a CSV string
parse
:: forall @r rl @config missing extra m p
. Alternative p
=> Parallel p m
:: forall @r rl @config missing extra m
. MonadUnliftAff m
=> MonadAff m
=> MonadRec m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
@@ -112,64 +106,39 @@ parse config csv = do
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach
:: forall @r rl x m p
. Alternative p
=> Parallel p m
=> MonadRec m
:: forall @r rl x m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb =
do
q <- liftEffect $ liftST $ Array.ST.new
foreach stream cb = do
UnliftAff unlift <- askUnliftAff
let
deque = liftEffect $ liftST $ Array.ST.shift q
enque a = liftEffect $ liftST $ Array.ST.push a q
alreadyHaveCols <- liftEffect $ getOrInitColumnsMap stream
when (isNothing alreadyHaveCols)
$ liftAff
$ makeAff \res -> do
stop <- flip (Event.once columnsH) stream $ const do
void $ getOrInitColumnsMap stream
res $ Right unit
pure $ Canceler $ const $ liftEffect stop
waitReadable =
makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
liftAff $ makeAff \res -> do
removeDataListener <- flip (Event.on dataH) stream \row -> launchAff_ $ delay (wrap 0.0) <* liftEffect do
cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row
launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record)
removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit)
removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left)
processQ =
untilJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
r <- deque
isClosed <- liftEffect $ Stream.closed stream
if isClosed && isNothing r then
pure unit
else if isNothing r then do
liftAff $ delay $ wrap 10.0
empty
else do
r' <- MaybeT $ pure r
lift $ cb r'
pure unit
pure $ Canceler $ const $ liftEffect do
removeDataListener
removeEndListener
removeErrorListener
readToQ =
whileJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
guard =<< not <$> liftEffect (Stream.closed stream)
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) waitReadable
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r enque
pure $ isNothing r
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
parSequence_ [readToQ, processQ]
-- | Reads a parsed record from the stream.
-- |
-- | Returns `Nothing` when either:
@@ -188,10 +157,8 @@ read stream = runMaybeT do
-- | Collect all parsed records into an array
readAll
:: forall @r rl a m p
. Alternative p
=> Parallel p m
=> MonadRec m
:: forall @r rl a m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
@@ -203,9 +170,13 @@ readAll stream = do
liftEffect $ liftST $ Array.ST.unsafeFreeze records
-- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall r a. EventHandle1 (CSVParser r a) { | r }
dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
dataH = EventHandle "data" mkEffectFn1
-- | `columns` event. Emitted when the header row has been parsed.
columnsH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
columnsH = EventHandle "columns" mkEffectFn1
-- | FFI
foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)