21 Commits

Author SHA1 Message Date
a3625ab1b7 chore: prepare v1.2.2 2024-05-03 10:54:17 -05:00
87f42c3bfe fix: more efficiently / less blockingly read? 2024-05-03 10:54:12 -05:00
5f8a82c8d8 fix: replace unneeded parTraverse with traverse 2024-05-03 10:46:48 -05:00
cda17f8d3c chore: prepare v1.2.1 2024-05-03 10:44:42 -05:00
616ceabd9f fix: do not busy loop 2024-05-03 10:44:37 -05:00
eab713cd4e chore: prepare v1.2.0 2024-05-03 10:42:51 -05:00
e21260cd2c feat: Parser.foreach should concurrently process records as they are read
User-provided callback no longer blocks subsequent reads
2024-05-03 10:41:33 -05:00
02090c3129 chore: prepare v1.1.14 2024-05-02 13:17:17 -05:00
bb25b8f562 fix: streaming mode bug 2024-05-02 13:17:14 -05:00
e3c89adaed chore: prepare v1.1.13 2024-05-02 13:07:40 -05:00
854ceacba3 fix: race condition? 2024-05-02 13:07:33 -05:00
a29203ce14 chore: prepare v1.1.12 2024-05-02 13:06:14 -05:00
a7b46d632a fix: race condition? 2024-05-02 13:06:08 -05:00
ee7619b93d chore: prepare v1.1.11 2024-05-02 13:02:54 -05:00
3adbc63df1 fix: race condition? 2024-05-02 13:02:51 -05:00
4f0ddbf75c chore: prepare v1.1.10 2024-05-02 12:56:24 -05:00
1ee358a55b fix: canceler in foreach 2024-05-02 12:56:14 -05:00
30f127788b chore: prepare v1.1.9 2024-05-02 12:00:04 -05:00
1eb6f2242f fix: generalize parser/stringifier to MonadAff 2024-05-02 11:59:50 -05:00
03cc9eba28 chore: prepare v1.1.8 2024-05-01 16:45:30 -05:00
488ea405ff fix: parse numbers properly 2024-05-01 16:45:20 -05:00
11 changed files with 140 additions and 87 deletions

View File

@@ -1,27 +1,27 @@
/** @type {(parser: string, ps: string[]) => import("bun").Subprocess} */
const prettier = (parser, ps) =>
Bun.spawn(['bun', 'x', 'prettier', '--write', '--parser', parser, ...ps], {
stdout: 'inherit',
stderr: 'inherit',
})
Bun.spawn(["bun", "x", "prettier", "--write", "--parser", parser, ...ps], {
stdout: "inherit",
stderr: "inherit",
});
const procs = [
prettier('babel', ['./src/**/*.js', './bun/**/*.js', './.prettierrc.cjs']),
prettier('json', ['./package.json', './jsconfig.json']),
prettier("babel", ["./src/**/*.js", "./bun/**/*.js", "./.prettierrc.cjs"]),
prettier("json", ["./package.json", "./jsconfig.json"]),
Bun.spawn(
[
'bun',
'x',
'purs-tidy',
'format-in-place',
'src/**/*.purs',
'test/**/*.purs',
"bun",
"x",
"purs-tidy",
"format-in-place",
"src/**/*.purs",
"test/**/*.purs",
],
{
stdout: 'inherit',
stderr: 'inherit',
stdout: "inherit",
stderr: "inherit",
},
),
]
];
await Promise.all(procs.map(p => p.exited))
await Promise.all(procs.map((p) => p.exited));

View File

@@ -1,34 +1,34 @@
import { readFile, writeFile } from 'fs/promises'
import { execSync } from 'child_process'
import { readFile, writeFile } from "fs/promises";
import { execSync } from "child_process";
let ver = process.argv[2]
let ver = process.argv[2];
if (!ver) {
console.error(`tag required: bun bun/prepare.js v1.0.0`)
process.exit(1)
console.error(`tag required: bun bun/prepare.js v1.0.0`);
process.exit(1);
} else if (!/v\d+\.\d+\.\d+/.test(ver)) {
console.error(`invalid tag: ${ver}`)
process.exit(1)
console.error(`invalid tag: ${ver}`);
process.exit(1);
}
ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || ''
ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || "";
const pkg = await readFile('./package.json', 'utf8')
const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`)
await writeFile('./package.json', pkgnew)
const pkg = await readFile("./package.json", "utf8");
const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`);
await writeFile("./package.json", pkgnew);
const spago = await readFile('./spago.yaml', 'utf8')
const spagonew = spago.replace(/version: .+/, `version: '${ver}'`)
await writeFile('./spago.yaml', spagonew)
const spago = await readFile("./spago.yaml", "utf8");
const spagonew = spago.replace(/version: .+/, `version: '${ver}'`);
await writeFile("./spago.yaml", spagonew);
const readme = await readFile('./README.md', 'utf8')
const readme = await readFile("./README.md", "utf8");
const readmenew = readme.replace(
/packages\/purescript-csv-stream\/.+?\//g,
`/packages/purescript-csv-stream/${ver}/`,
)
await writeFile('./README.md', readmenew)
);
await writeFile("./README.md", readmenew);
execSync(`git add spago.yaml package.json README.md`)
execSync(`git commit -m 'chore: prepare v${ver}'`)
execSync(`git tag v${ver}`)
execSync(`git push --tags`)
execSync(`git push --mirror github-mirror`)
execSync(`git add spago.yaml package.json README.md`);
execSync(`git commit -m 'chore: prepare v${ver}'`);
execSync(`git tag v${ver}`);
execSync(`git push --tags`);
execSync(`git push --mirror github-mirror`);

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.1.7",
"version": "v1.2.2",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -25,6 +25,7 @@ workspace:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"

View File

@@ -1,7 +1,7 @@
package:
name: csv-stream
publish:
version: '1.1.7'
version: '1.2.2'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -32,6 +32,7 @@ package:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"

View File

@@ -22,11 +22,12 @@ class RowToList r rl <= WriteCSVRecord r rl | rl -> r where
writeCSVRecord :: { | r } -> Array String
instance (RowToList r (Cons k v tailrl), IsSymbol k, WriteCSV v, Lacks k tail, Cons k v tail r, WriteCSVRecord tail tailrl) => WriteCSVRecord r (Cons k v tailrl) where
writeCSVRecord r = let
val = writeCSV $ Record.get (Proxy @k) r
tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r
in
[val] <> tail
writeCSVRecord r =
let
val = writeCSV $ Record.get (Proxy @k) r
tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r
in
[ val ] <> tail
instance WriteCSVRecord () Nil where
writeCSVRecord _ = []

View File

@@ -9,11 +9,12 @@ import Data.Int as Int
import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..), maybe)
import Data.Newtype (unwrap)
import Data.Number (fromString) as Number
import Data.Number.Format (toString) as Number
import Data.PreciseDateTime (fromDateTime, fromRFC3339String, toDateTimeLossy, toRFC3339String)
import Data.RFC3339String (RFC3339String(..))
import Data.String as String
import Foreign (ForeignError(..), readInt, readNumber, unsafeToForeign)
import Foreign (ForeignError(..))
class ReadCSV a where
readCSV :: String -> Except (NonEmptyList ForeignError) a
@@ -22,10 +23,10 @@ class WriteCSV a where
writeCSV :: a -> String
instance ReadCSV Int where
readCSV = readInt <<< unsafeToForeign
readCSV s = liftMaybe (pure $ ForeignError $ "invalid integer: " <> s) $ Int.fromString s
instance ReadCSV Number where
readCSV = readNumber <<< unsafeToForeign
readCSV s = liftMaybe (pure $ ForeignError $ "invalid number: " <> s) $ Number.fromString s
instance ReadCSV String where
readCSV = pure

View File

@@ -1,29 +1,29 @@
import {parse, Parser} from 'csv-parse'
import { parse, Parser } from "csv-parse";
class ParserWithColumns extends Parser {
/** @type {Array<string>} */
columns = []
columns = [];
/** @type {Map<string, number> | null} */
columnsMap = null
columnsMap = null;
}
/** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */
export const makeImpl = c => () => {
const parser = new ParserWithColumns(c)
parser.once('readable', () => {
export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c);
parser.once("readable", () => {
parser.columns = parser.read();
})
return parser
}
});
return parser;
};
/** @type {(s: ParserWithColumns) => () => Array<string> | null} */
export const readImpl = p => () => p.read();
export const readImpl = (p) => () => p.read();
/** @type {(s: ParserWithColumns) => () => Array<string>} */
export const columnsArrayImpl = p => () => p.columns
export const columnsArrayImpl = (p) => () => p.columns;
/** @type {(s: ParserWithColumns) => () => Map<string, number> | null} */
export const columnsMapImpl = p => () => p.columnsMap
export const columnsMapImpl = (p) => () => p.columnsMap;
/** @type {(s: ParserWithColumns) => (m: Map<string, number>) => () => void} */
export const setColumnsMapImpl = p => m => () => p.columnsMap = m
export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m);

View File

@@ -1,14 +1,16 @@
module Node.Stream.CSV.Parse where
import Prelude
import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parTraverse_)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
@@ -16,18 +18,20 @@ import Data.CSV.Record (class ReadCSVRecord, readCSVRecord)
import Data.Either (Either(..))
import Data.Filterable (filter)
import Data.Map (Map)
import Data.Maybe (Maybe(..))
import Data.Map as Map
import Data.Maybe (Maybe(..), isNothing)
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Data.Traversable (for_)
import Data.Traversable (for)
import Effect (Effect)
import Effect.Aff (Aff, makeAff)
import Effect as Effect
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Uncurried (mkEffectFn1)
import Foreign (Foreign, unsafeToForeign)
import Foreign.Object (Object)
import Data.Map as Map
import Foreign.Object as Object
import Node.Encoding (Encoding(..))
import Node.EventEmitter (EventHandle(..))
@@ -83,10 +87,21 @@ type Config r =
-- | Create a CSVParser
make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: false, cast: false, cast_date: false}) <<< recordToForeign
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
-- | Synchronously parse a CSV string
parse :: forall @r rl @config missing extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> Aff (Array { | r })
parse
:: forall @r rl @config missing extra m p
. Alternative p
=> Parallel p m
=> MonadAff m
=> MonadRec m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
=> { | config }
-> String
-> m (Array { | r })
parse config csv = do
stream <- liftEffect $ make @r @config @missing @extra config
void $ liftEffect $ Stream.writeString stream UTF8 csv
@@ -94,14 +109,31 @@ parse config csv = do
readAll stream
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach :: forall @r rl x. RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> Aff Unit) -> Aff Unit
foreach
:: forall @r rl x m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream
when (not isReadable) $ makeAff \res -> mempty <* flip (Event.once Stream.readableH) stream $ res $ Right unit
whileJust do
r <- liftEffect $ read @r stream
for_ r cb
pure $ void r
liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
pure $ isNothing r
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
parTraverse_ cb records
isClosed <- liftEffect $ Stream.closed stream
pure $ if isClosed then Nothing else Just unit
@@ -110,14 +142,28 @@ foreach stream cb = whileJust do
-- | Returns `Nothing` when either:
-- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`)
-- | - All records have been processed (`Node.Stream.closed` is `true`)
read :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Effect (Maybe { | r })
read
:: forall @r rl a
. RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> Effect (Maybe { | r })
read stream = runMaybeT do
cols <- MaybeT $ getOrInitColumnsMap stream
raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
-- | Collect all parsed records into an array
readAll :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Aff (Array { | r })
readAll
:: forall @r rl a m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> m (Array { | r })
readAll stream = do
records <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records

View File

@@ -1,7 +1,7 @@
import {stringify} from 'csv-stringify'
import { stringify } from "csv-stringify";
/** @type {(c: import('csv-stringify').Options) => () => import('csv-stringify').Stringifier} */
export const makeImpl = c => () => stringify(c)
export const makeImpl = (c) => () => stringify(c);
/** @type {(s: import('csv-stringify').Stringifier) => (vals: Array<string>) => () => void} */
export const writeImpl = s => vals => () => s.write(vals)
export const writeImpl = (s) => (vals) => () => s.write(vals);

View File

@@ -2,7 +2,7 @@ module Node.Stream.CSV.Stringify where
import Prelude
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST
import Data.Array as Array
import Data.Array.ST as Array.ST
@@ -13,7 +13,8 @@ import Data.Maybe (Maybe(..))
import Data.String.Regex (Regex)
import Data.Traversable (for_)
import Effect (Effect)
import Effect.Aff (Aff, makeAff)
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Foreign (Foreign, unsafeToForeign)
import Foreign.Object (Object)
@@ -64,10 +65,10 @@ recordToForeign = unsafeCoerce
-- | Create a CSVStringifier
make :: forall @r rl @config @missing @extra. Keys rl => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: Array.fromFoldable $ keys (Proxy @r)}) <<< recordToForeign
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: Array.fromFoldable $ keys (Proxy @r) }) <<< recordToForeign
-- | Synchronously stringify a collection of records
stringify :: forall @r rl f @config missing extra. Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> Aff String
stringify :: forall @r rl f m @config missing extra. MonadAff m => MonadRec m => Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> m String
stringify config records = do
stream <- liftEffect $ make @r @config @missing @extra config
liftEffect $ for_ records \r -> write stream r
@@ -82,10 +83,12 @@ write :: forall @r rl a. RowToList r rl => WriteCSVRecord r rl => CSVStringifier
write s = writeImpl s <<< writeCSVRecord @r @rl
-- | Loop until the stream is closed, invoking the callback with each chunk of stringified CSV text.
foreach :: forall r x. CSVStringifier r x -> (String -> Aff Unit) -> Aff Unit
foreach :: forall m r x. MonadAff m => MonadRec m => CSVStringifier r x -> (String -> m Unit) -> m Unit
foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream
when (not isReadable) $ makeAff \res -> mempty <* flip (Event.once Stream.readableH) stream $ res $ Right unit
liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
whileJust do
s <- liftEffect $ (join <<< map blush) <$> Stream.readEither stream
for_ s cb
@@ -94,7 +97,7 @@ foreach stream cb = whileJust do
pure $ if isClosed then Nothing else Just unit
-- | Read the stringified chunks until end-of-stream, returning the entire CSV string.
readAll :: forall r a. CSVStringifier r a -> Aff String
readAll :: forall r a m. MonadAff m => MonadRec m => CSVStringifier r a -> m String
readAll stream = do
chunks <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push chunks