17 Commits

Author SHA1 Message Date
05b61d84f0 chore: prepare v1.2.4 2024-05-03 12:47:28 -05:00
d6638ead1d fix: maybe this is faster? 2024-05-03 12:47:12 -05:00
f3d9ea8c11 chore: prepare v1.2.3 2024-05-03 10:58:52 -05:00
07c86f096f fix: timeslice shenanigans 2024-05-03 10:58:47 -05:00
a3625ab1b7 chore: prepare v1.2.2 2024-05-03 10:54:17 -05:00
87f42c3bfe fix: more efficiently / less blockingly read? 2024-05-03 10:54:12 -05:00
5f8a82c8d8 fix: replace unneeded parTraverse with traverse 2024-05-03 10:46:48 -05:00
cda17f8d3c chore: prepare v1.2.1 2024-05-03 10:44:42 -05:00
616ceabd9f fix: do not busy loop 2024-05-03 10:44:37 -05:00
eab713cd4e chore: prepare v1.2.0 2024-05-03 10:42:51 -05:00
e21260cd2c feat: Parser.foreach should concurrently process records as they are read
User-provided callback no longer blocks subsequent reads
2024-05-03 10:41:33 -05:00
02090c3129 chore: prepare v1.1.14 2024-05-02 13:17:17 -05:00
bb25b8f562 fix: streaming mode bug 2024-05-02 13:17:14 -05:00
e3c89adaed chore: prepare v1.1.13 2024-05-02 13:07:40 -05:00
854ceacba3 fix: race condition? 2024-05-02 13:07:33 -05:00
a29203ce14 chore: prepare v1.1.12 2024-05-02 13:06:14 -05:00
a7b46d632a fix: race condition? 2024-05-02 13:06:08 -05:00
10 changed files with 167 additions and 88 deletions

View File

@@ -1,27 +1,27 @@
/** @type {(parser: string, ps: string[]) => import("bun").Subprocess} */
const prettier = (parser, ps) =>
Bun.spawn(['bun', 'x', 'prettier', '--write', '--parser', parser, ...ps], {
stdout: 'inherit',
stderr: 'inherit',
})
Bun.spawn(["bun", "x", "prettier", "--write", "--parser", parser, ...ps], {
stdout: "inherit",
stderr: "inherit",
});
const procs = [
prettier('babel', ['./src/**/*.js', './bun/**/*.js', './.prettierrc.cjs']),
prettier('json', ['./package.json', './jsconfig.json']),
prettier("babel", ["./src/**/*.js", "./bun/**/*.js", "./.prettierrc.cjs"]),
prettier("json", ["./package.json", "./jsconfig.json"]),
Bun.spawn(
[
'bun',
'x',
'purs-tidy',
'format-in-place',
'src/**/*.purs',
'test/**/*.purs',
"bun",
"x",
"purs-tidy",
"format-in-place",
"src/**/*.purs",
"test/**/*.purs",
],
{
stdout: 'inherit',
stderr: 'inherit',
stdout: "inherit",
stderr: "inherit",
},
),
]
];
await Promise.all(procs.map(p => p.exited))
await Promise.all(procs.map((p) => p.exited));

View File

@@ -1,34 +1,34 @@
import { readFile, writeFile } from 'fs/promises'
import { execSync } from 'child_process'
import { readFile, writeFile } from "fs/promises";
import { execSync } from "child_process";
let ver = process.argv[2]
let ver = process.argv[2];
if (!ver) {
console.error(`tag required: bun bun/prepare.js v1.0.0`)
process.exit(1)
console.error(`tag required: bun bun/prepare.js v1.0.0`);
process.exit(1);
} else if (!/v\d+\.\d+\.\d+/.test(ver)) {
console.error(`invalid tag: ${ver}`)
process.exit(1)
console.error(`invalid tag: ${ver}`);
process.exit(1);
}
ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || ''
ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || "";
const pkg = await readFile('./package.json', 'utf8')
const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`)
await writeFile('./package.json', pkgnew)
const pkg = await readFile("./package.json", "utf8");
const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`);
await writeFile("./package.json", pkgnew);
const spago = await readFile('./spago.yaml', 'utf8')
const spagonew = spago.replace(/version: .+/, `version: '${ver}'`)
await writeFile('./spago.yaml', spagonew)
const spago = await readFile("./spago.yaml", "utf8");
const spagonew = spago.replace(/version: .+/, `version: '${ver}'`);
await writeFile("./spago.yaml", spagonew);
const readme = await readFile('./README.md', 'utf8')
const readme = await readFile("./README.md", "utf8");
const readmenew = readme.replace(
/packages\/purescript-csv-stream\/.+?\//g,
`/packages/purescript-csv-stream/${ver}/`,
)
await writeFile('./README.md', readmenew)
);
await writeFile("./README.md", readmenew);
execSync(`git add spago.yaml package.json README.md`)
execSync(`git commit -m 'chore: prepare v${ver}'`)
execSync(`git tag v${ver}`)
execSync(`git push --tags`)
execSync(`git push --mirror github-mirror`)
execSync(`git add spago.yaml package.json README.md`);
execSync(`git commit -m 'chore: prepare v${ver}'`);
execSync(`git tag v${ver}`);
execSync(`git push --tags`);
execSync(`git push --mirror github-mirror`);

View File

@@ -1,6 +1,6 @@
{
"name": "purescript-csv-stream",
"version": "v1.1.11",
"version": "v1.2.4",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"

View File

@@ -25,6 +25,7 @@ workspace:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"

View File

@@ -1,7 +1,7 @@
package:
name: csv-stream
publish:
version: '1.1.11'
version: '1.2.4'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -32,6 +32,7 @@ package:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"

View File

@@ -22,11 +22,12 @@ class RowToList r rl <= WriteCSVRecord r rl | rl -> r where
writeCSVRecord :: { | r } -> Array String
instance (RowToList r (Cons k v tailrl), IsSymbol k, WriteCSV v, Lacks k tail, Cons k v tail r, WriteCSVRecord tail tailrl) => WriteCSVRecord r (Cons k v tailrl) where
writeCSVRecord r = let
val = writeCSV $ Record.get (Proxy @k) r
tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r
in
[val] <> tail
writeCSVRecord r =
let
val = writeCSV $ Record.get (Proxy @k) r
tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r
in
[ val ] <> tail
instance WriteCSVRecord () Nil where
writeCSVRecord _ = []

View File

@@ -1,29 +1,29 @@
import {parse, Parser} from 'csv-parse'
import { parse, Parser } from "csv-parse";
class ParserWithColumns extends Parser {
/** @type {Array<string>} */
columns = []
columns = [];
/** @type {Map<string, number> | null} */
columnsMap = null
columnsMap = null;
}
/** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */
export const makeImpl = c => () => {
const parser = new ParserWithColumns(c)
while (parser.columns.length === 0) {
parser.columns = parser.read() || [];
}
return parser
}
export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c);
parser.once("readable", () => {
parser.columns = parser.read();
});
return parser;
};
/** @type {(s: ParserWithColumns) => () => Array<string> | null} */
export const readImpl = p => () => p.read();
export const readImpl = (p) => () => p.read();
/** @type {(s: ParserWithColumns) => () => Array<string>} */
export const columnsArrayImpl = p => () => p.columns
export const columnsArrayImpl = (p) => () => p.columns;
/** @type {(s: ParserWithColumns) => () => Map<string, number> | null} */
export const columnsMapImpl = p => () => p.columnsMap
export const columnsMapImpl = (p) => () => p.columnsMap;
/** @type {(s: ParserWithColumns) => (m: Map<string, number>) => () => void} */
export const setColumnsMapImpl = p => m => () => p.columnsMap = m
export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m);

View File

@@ -1,14 +1,17 @@
module Node.Stream.CSV.Parse where
import Prelude
import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Alternative (guard)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST
import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parSequence_)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
@@ -17,12 +20,14 @@ import Data.Either (Either(..))
import Data.Filterable (filter)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Data.Traversable (for_)
import Data.Traversable (for)
import Effect (Effect)
import Effect.Aff (Canceler(..), makeAff)
import Effect as Effect
import Effect.Aff (Canceler(..), delay, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
@@ -84,10 +89,21 @@ type Config r =
-- | Create a CSVParser
make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: false, cast: false, cast_date: false}) <<< recordToForeign
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
-- | Synchronously parse a CSV string
parse :: forall @r rl @config missing extra m. MonadAff m => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> m (Array { | r })
parse
:: forall @r rl @config missing extra m p
. Alternative p
=> Parallel p m
=> MonadAff m
=> MonadRec m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
=> { | config }
-> String
-> m (Array { | r })
parse config csv = do
stream <- liftEffect $ make @r @config @missing @extra config
void $ liftEffect $ Stream.writeString stream UTF8 csv
@@ -95,36 +111,96 @@ parse config csv = do
readAll stream
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach :: forall @r rl x m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit
foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
whileJust do
r <- liftEffect $ read @r stream
for_ r cb
pure $ void r
isClosed <- liftEffect $ Stream.closed stream
pure $ if isClosed then Nothing else Just unit
foreach
:: forall @r rl x m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb =
do
q <- liftEffect $ liftST $ Array.ST.new
let
deque = liftEffect $ liftST $ Array.ST.shift q
enque a = liftEffect $ liftST $ Array.ST.push a q
waitReadable =
makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
processQ =
untilJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
r <- deque
isClosed <- liftEffect $ Stream.closed stream
if isClosed && isNothing r then
pure unit
else if isNothing r then
liftAff $ delay $ wrap 10.0
else do
r' <- MaybeT $ pure r
lift $ cb r'
guard $ isClosed
pure unit
readToQ =
whileJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
guard =<< not <$> liftEffect (Stream.closed stream)
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) waitReadable
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r enque
pure $ isNothing r
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
parSequence_ [readToQ, processQ]
-- | Reads a parsed record from the stream.
-- |
-- | Returns `Nothing` when either:
-- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`)
-- | - All records have been processed (`Node.Stream.closed` is `true`)
read :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Effect (Maybe { | r })
read
:: forall @r rl a
. RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> Effect (Maybe { | r })
read stream = runMaybeT do
cols <- MaybeT $ getOrInitColumnsMap stream
raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
-- | Collect all parsed records into an array
readAll :: forall @r rl a m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> m (Array { | r })
readAll
:: forall @r rl a m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> m (Array { | r })
readAll stream = do
records <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records
liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze records
records <- liftEffect $ liftST $ Array.ST.new
foreach stream $ void <<< liftEffect <<< liftST <<< flip Array.ST.push records
liftEffect $ liftST $ Array.ST.unsafeFreeze records
-- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall r a. EventHandle1 (CSVParser r a) { | r }

View File

@@ -1,7 +1,7 @@
import {stringify} from 'csv-stringify'
import { stringify } from "csv-stringify";
/** @type {(c: import('csv-stringify').Options) => () => import('csv-stringify').Stringifier} */
export const makeImpl = c => () => stringify(c)
export const makeImpl = (c) => () => stringify(c);
/** @type {(s: import('csv-stringify').Stringifier) => (vals: Array<string>) => () => void} */
export const writeImpl = s => vals => () => s.write(vals)
export const writeImpl = (s) => (vals) => () => s.write(vals);

View File

@@ -65,7 +65,7 @@ recordToForeign = unsafeCoerce
-- | Create a CSVStringifier
make :: forall @r rl @config @missing @extra. Keys rl => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: Array.fromFoldable $ keys (Proxy @r)}) <<< recordToForeign
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: Array.fromFoldable $ keys (Proxy @r) }) <<< recordToForeign
-- | Synchronously stringify a collection of records
stringify :: forall @r rl f m @config missing extra. MonadAff m => MonadRec m => Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> m String