17 Commits

Author SHA1 Message Date
a3625ab1b7 chore: prepare v1.2.2 2024-05-03 10:54:17 -05:00
87f42c3bfe fix: more efficiently / less blockingly read? 2024-05-03 10:54:12 -05:00
5f8a82c8d8 fix: replace unneeded parTraverse with traverse 2024-05-03 10:46:48 -05:00
cda17f8d3c chore: prepare v1.2.1 2024-05-03 10:44:42 -05:00
616ceabd9f fix: do not busy loop 2024-05-03 10:44:37 -05:00
eab713cd4e chore: prepare v1.2.0 2024-05-03 10:42:51 -05:00
e21260cd2c feat: Parser.foreach should concurrently process records as they are read
User-provided callback no longer blocks subsequent reads
2024-05-03 10:41:33 -05:00
02090c3129 chore: prepare v1.1.14 2024-05-02 13:17:17 -05:00
bb25b8f562 fix: streaming mode bug 2024-05-02 13:17:14 -05:00
e3c89adaed chore: prepare v1.1.13 2024-05-02 13:07:40 -05:00
854ceacba3 fix: race condition? 2024-05-02 13:07:33 -05:00
a29203ce14 chore: prepare v1.1.12 2024-05-02 13:06:14 -05:00
a7b46d632a fix: race condition? 2024-05-02 13:06:08 -05:00
ee7619b93d chore: prepare v1.1.11 2024-05-02 13:02:54 -05:00
3adbc63df1 fix: race condition? 2024-05-02 13:02:51 -05:00
4f0ddbf75c chore: prepare v1.1.10 2024-05-02 12:56:24 -05:00
1ee358a55b fix: canceler in foreach 2024-05-02 12:56:14 -05:00
10 changed files with 128 additions and 78 deletions

View File

@@ -1,27 +1,27 @@
/** @type {(parser: string, ps: string[]) => import("bun").Subprocess} */
const prettier = (parser, ps) =>
Bun.spawn(['bun', 'x', 'prettier', '--write', '--parser', parser, ...ps], {
stdout: 'inherit',
stderr: 'inherit',
})
Bun.spawn(["bun", "x", "prettier", "--write", "--parser", parser, ...ps], {
stdout: "inherit",
stderr: "inherit",
});
const procs = [
prettier('babel', ['./src/**/*.js', './bun/**/*.js', './.prettierrc.cjs']),
prettier('json', ['./package.json', './jsconfig.json']),
prettier("babel", ["./src/**/*.js", "./bun/**/*.js", "./.prettierrc.cjs"]),
prettier("json", ["./package.json", "./jsconfig.json"]),
Bun.spawn(
[
'bun',
'x',
'purs-tidy',
'format-in-place',
'src/**/*.purs',
'test/**/*.purs',
"bun",
"x",
"purs-tidy",
"format-in-place",
"src/**/*.purs",
"test/**/*.purs",
],
{
stdout: 'inherit',
stderr: 'inherit',
stdout: "inherit",
stderr: "inherit",
},
),
]
];
await Promise.all(procs.map(p => p.exited))
await Promise.all(procs.map((p) => p.exited));

View File

@@ -1,34 +1,34 @@
import { readFile, writeFile } from 'fs/promises'
import { execSync } from 'child_process'
import { readFile, writeFile } from "fs/promises";
import { execSync } from "child_process";
let ver = process.argv[2]
let ver = process.argv[2];
if (!ver) {
console.error(`tag required: bun bun/prepare.js v1.0.0`)
process.exit(1)
console.error(`tag required: bun bun/prepare.js v1.0.0`);
process.exit(1);
} else if (!/v\d+\.\d+\.\d+/.test(ver)) {
console.error(`invalid tag: ${ver}`)
process.exit(1)
console.error(`invalid tag: ${ver}`);
process.exit(1);
}
ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || ''
ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || "";
const pkg = await readFile('./package.json', 'utf8')
const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`)
await writeFile('./package.json', pkgnew)
const pkg = await readFile("./package.json", "utf8");
const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`);
await writeFile("./package.json", pkgnew);
const spago = await readFile('./spago.yaml', 'utf8')
const spagonew = spago.replace(/version: .+/, `version: '${ver}'`)
await writeFile('./spago.yaml', spagonew)
const spago = await readFile("./spago.yaml", "utf8");
const spagonew = spago.replace(/version: .+/, `version: '${ver}'`);
await writeFile("./spago.yaml", spagonew);
const readme = await readFile('./README.md', 'utf8')
const readme = await readFile("./README.md", "utf8");
const readmenew = readme.replace(
/packages\/purescript-csv-stream\/.+?\//g,
`/packages/purescript-csv-stream/${ver}/`,
)
await writeFile('./README.md', readmenew)
);
await writeFile("./README.md", readmenew);
execSync(`git add spago.yaml package.json README.md`)
execSync(`git commit -m 'chore: prepare v${ver}'`)
execSync(`git tag v${ver}`)
execSync(`git push --tags`)
execSync(`git push --mirror github-mirror`)
execSync(`git add spago.yaml package.json README.md`);
execSync(`git commit -m 'chore: prepare v${ver}'`);
execSync(`git tag v${ver}`);
execSync(`git push --tags`);
execSync(`git push --mirror github-mirror`);

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.1.9",
"version": "v1.2.2",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -25,6 +25,7 @@ workspace:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"

View File

@@ -1,7 +1,7 @@
package:
name: csv-stream
publish:
version: '1.1.9'
version: '1.2.2'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -32,6 +32,7 @@ package:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"

View File

@@ -22,11 +22,12 @@ class RowToList r rl <= WriteCSVRecord r rl | rl -> r where
writeCSVRecord :: { | r } -> Array String
instance (RowToList r (Cons k v tailrl), IsSymbol k, WriteCSV v, Lacks k tail, Cons k v tail r, WriteCSVRecord tail tailrl) => WriteCSVRecord r (Cons k v tailrl) where
writeCSVRecord r = let
val = writeCSV $ Record.get (Proxy @k) r
tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r
in
[val] <> tail
writeCSVRecord r =
let
val = writeCSV $ Record.get (Proxy @k) r
tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r
in
[ val ] <> tail
instance WriteCSVRecord () Nil where
writeCSVRecord _ = []

View File

@@ -1,29 +1,29 @@
import {parse, Parser} from 'csv-parse'
import { parse, Parser } from "csv-parse";
class ParserWithColumns extends Parser {
/** @type {Array<string>} */
columns = []
columns = [];
/** @type {Map<string, number> | null} */
columnsMap = null
columnsMap = null;
}
/** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */
export const makeImpl = c => () => {
const parser = new ParserWithColumns(c)
parser.once('readable', () => {
export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c);
parser.once("readable", () => {
parser.columns = parser.read();
})
return parser
}
});
return parser;
};
/** @type {(s: ParserWithColumns) => () => Array<string> | null} */
export const readImpl = p => () => p.read();
export const readImpl = (p) => () => p.read();
/** @type {(s: ParserWithColumns) => () => Array<string>} */
export const columnsArrayImpl = p => () => p.columns
export const columnsArrayImpl = (p) => () => p.columns;
/** @type {(s: ParserWithColumns) => () => Map<string, number> | null} */
export const columnsMapImpl = p => () => p.columnsMap
export const columnsMapImpl = (p) => () => p.columnsMap;
/** @type {(s: ParserWithColumns) => (m: Map<string, number>) => () => void} */
export const setColumnsMapImpl = p => m => () => p.columnsMap = m
export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m);

View File

@@ -1,6 +1,6 @@
module Node.Stream.CSV.Parse where
import Prelude
import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Monad.Error.Class (liftEither)
@@ -9,6 +9,8 @@ import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parTraverse_)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
@@ -17,12 +19,13 @@ import Data.Either (Either(..))
import Data.Filterable (filter)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Maybe (Maybe(..), isNothing)
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Data.Traversable (for_)
import Data.Traversable (for)
import Effect (Effect)
import Effect.Aff (makeAff)
import Effect as Effect
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
@@ -84,10 +87,21 @@ type Config r =
-- | Create a CSVParser
make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: false, cast: false, cast_date: false}) <<< recordToForeign
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
-- | Synchronously parse a CSV string
parse :: forall @r rl @config missing extra m. MonadAff m => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> m (Array { | r })
parse
:: forall @r rl @config missing extra m p
. Alternative p
=> Parallel p m
=> MonadAff m
=> MonadRec m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
=> { | config }
-> String
-> m (Array { | r })
parse config csv = do
stream <- liftEffect $ make @r @config @missing @extra config
void $ liftEffect $ Stream.writeString stream UTF8 csv
@@ -95,14 +109,31 @@ parse config csv = do
readAll stream
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach :: forall @r rl x m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit
foreach
:: forall @r rl x m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) $ makeAff \res -> mempty <* flip (Event.once Stream.readableH) stream $ res $ Right unit
whileJust do
r <- liftEffect $ read @r stream
for_ r cb
pure $ void r
liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
pure $ isNothing r
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
parTraverse_ cb records
isClosed <- liftEffect $ Stream.closed stream
pure $ if isClosed then Nothing else Just unit
@@ -111,14 +142,28 @@ foreach stream cb = whileJust do
-- | Returns `Nothing` when either:
-- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`)
-- | - All records have been processed (`Node.Stream.closed` is `true`)
read :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Effect (Maybe { | r })
read
:: forall @r rl a
. RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> Effect (Maybe { | r })
read stream = runMaybeT do
cols <- MaybeT $ getOrInitColumnsMap stream
raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
-- | Collect all parsed records into an array
readAll :: forall @r rl a m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> m (Array { | r })
readAll
:: forall @r rl a m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> m (Array { | r })
readAll stream = do
records <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records

View File

@@ -1,7 +1,7 @@
import {stringify} from 'csv-stringify'
import { stringify } from "csv-stringify";
/** @type {(c: import('csv-stringify').Options) => () => import('csv-stringify').Stringifier} */
export const makeImpl = c => () => stringify(c)
export const makeImpl = (c) => () => stringify(c);
/** @type {(s: import('csv-stringify').Stringifier) => (vals: Array<string>) => () => void} */
export const writeImpl = s => vals => () => s.write(vals)
export const writeImpl = (s) => (vals) => () => s.write(vals);

View File

@@ -13,7 +13,7 @@ import Data.Maybe (Maybe(..))
import Data.String.Regex (Regex)
import Data.Traversable (for_)
import Effect (Effect)
import Effect.Aff (makeAff)
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Foreign (Foreign, unsafeToForeign)
@@ -65,7 +65,7 @@ recordToForeign = unsafeCoerce
-- | Create a CSVStringifier
make :: forall @r rl @config @missing @extra. Keys rl => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: Array.fromFoldable $ keys (Proxy @r)}) <<< recordToForeign
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: Array.fromFoldable $ keys (Proxy @r) }) <<< recordToForeign
-- | Synchronously stringify a collection of records
stringify :: forall @r rl f m @config missing extra. MonadAff m => MonadRec m => Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> m String
@@ -86,7 +86,9 @@ write s = writeImpl s <<< writeCSVRecord @r @rl
foreach :: forall m r x. MonadAff m => MonadRec m => CSVStringifier r x -> (String -> m Unit) -> m Unit
foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) $ makeAff \res -> mempty <* flip (Event.once Stream.readableH) stream $ res $ Right unit
liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
whileJust do
s <- liftEffect $ (join <<< map blush) <$> Stream.readEither stream
for_ s cb