35 Commits

Author SHA1 Message Date
a3625ab1b7 chore: prepare v1.2.2 2024-05-03 10:54:17 -05:00
87f42c3bfe fix: more efficiently / less blockingly read? 2024-05-03 10:54:12 -05:00
5f8a82c8d8 fix: replace unneeded parTraverse with traverse 2024-05-03 10:46:48 -05:00
cda17f8d3c chore: prepare v1.2.1 2024-05-03 10:44:42 -05:00
616ceabd9f fix: do not busy loop 2024-05-03 10:44:37 -05:00
eab713cd4e chore: prepare v1.2.0 2024-05-03 10:42:51 -05:00
e21260cd2c feat: Parser.foreach should concurrently process records as they are read
User-provided callback no longer blocks subsequent reads
2024-05-03 10:41:33 -05:00
02090c3129 chore: prepare v1.1.14 2024-05-02 13:17:17 -05:00
bb25b8f562 fix: streaming mode bug 2024-05-02 13:17:14 -05:00
e3c89adaed chore: prepare v1.1.13 2024-05-02 13:07:40 -05:00
854ceacba3 fix: race condition? 2024-05-02 13:07:33 -05:00
a29203ce14 chore: prepare v1.1.12 2024-05-02 13:06:14 -05:00
a7b46d632a fix: race condition? 2024-05-02 13:06:08 -05:00
ee7619b93d chore: prepare v1.1.11 2024-05-02 13:02:54 -05:00
3adbc63df1 fix: race condition? 2024-05-02 13:02:51 -05:00
4f0ddbf75c chore: prepare v1.1.10 2024-05-02 12:56:24 -05:00
1ee358a55b fix: canceler in foreach 2024-05-02 12:56:14 -05:00
30f127788b chore: prepare v1.1.9 2024-05-02 12:00:04 -05:00
1eb6f2242f fix: generalize parser/stringifier to MonadAff 2024-05-02 11:59:50 -05:00
03cc9eba28 chore: prepare v1.1.8 2024-05-01 16:45:30 -05:00
488ea405ff fix: parse numbers properly 2024-05-01 16:45:20 -05:00
bb2274bf19 chore: prepare v1.1.7 2024-05-01 11:05:46 -05:00
8eaad8a39c fix: more bugs 2024-05-01 10:35:29 -05:00
c80bcaa4d6 chore: prepare v1.1.6 2024-05-01 10:12:54 -05:00
cae11ace61 fix: more bugs 2024-05-01 10:12:51 -05:00
70d6ed44f6 chore: prepare v1.1.5 2024-05-01 10:02:50 -05:00
a17f0774f6 fix: more bugs 2024-05-01 10:02:45 -05:00
f2e2b4b9c8 chore: prepare v1.1.4 2024-05-01 10:01:25 -05:00
0a9beb46ea fix: more bugs 2024-05-01 10:01:22 -05:00
60346c9c10 chore: prepare v1.1.3 2024-05-01 09:55:28 -05:00
10fe682cc9 fix: use column order from first row 2024-05-01 09:55:21 -05:00
db425ea4d0 chore: prepare v1.1.2 2024-05-01 09:11:48 -05:00
7c64e66119 fix: readCSVRecord bug 2024-05-01 09:11:34 -05:00
8d2705c53e chore: prepare v1.1.1 2024-05-01 08:54:07 -05:00
3bc01c5afa fix: stringify options 2024-05-01 08:52:13 -05:00
11 changed files with 233 additions and 87 deletions

View File

@@ -1,27 +1,27 @@
/** @type {(parser: string, ps: string[]) => import("bun").Subprocess} */ /** @type {(parser: string, ps: string[]) => import("bun").Subprocess} */
const prettier = (parser, ps) => const prettier = (parser, ps) =>
Bun.spawn(['bun', 'x', 'prettier', '--write', '--parser', parser, ...ps], { Bun.spawn(["bun", "x", "prettier", "--write", "--parser", parser, ...ps], {
stdout: 'inherit', stdout: "inherit",
stderr: 'inherit', stderr: "inherit",
}) });
const procs = [ const procs = [
prettier('babel', ['./src/**/*.js', './bun/**/*.js', './.prettierrc.cjs']), prettier("babel", ["./src/**/*.js", "./bun/**/*.js", "./.prettierrc.cjs"]),
prettier('json', ['./package.json', './jsconfig.json']), prettier("json", ["./package.json", "./jsconfig.json"]),
Bun.spawn( Bun.spawn(
[ [
'bun', "bun",
'x', "x",
'purs-tidy', "purs-tidy",
'format-in-place', "format-in-place",
'src/**/*.purs', "src/**/*.purs",
'test/**/*.purs', "test/**/*.purs",
], ],
{ {
stdout: 'inherit', stdout: "inherit",
stderr: 'inherit', stderr: "inherit",
}, },
), ),
] ];
await Promise.all(procs.map(p => p.exited)) await Promise.all(procs.map((p) => p.exited));

View File

@@ -1,34 +1,34 @@
import { readFile, writeFile } from 'fs/promises' import { readFile, writeFile } from "fs/promises";
import { execSync } from 'child_process' import { execSync } from "child_process";
let ver = process.argv[2] let ver = process.argv[2];
if (!ver) { if (!ver) {
console.error(`tag required: bun bun/prepare.js v1.0.0`) console.error(`tag required: bun bun/prepare.js v1.0.0`);
process.exit(1) process.exit(1);
} else if (!/v\d+\.\d+\.\d+/.test(ver)) { } else if (!/v\d+\.\d+\.\d+/.test(ver)) {
console.error(`invalid tag: ${ver}`) console.error(`invalid tag: ${ver}`);
process.exit(1) process.exit(1);
} }
ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || '' ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || "";
const pkg = await readFile('./package.json', 'utf8') const pkg = await readFile("./package.json", "utf8");
const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`) const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`);
await writeFile('./package.json', pkgnew) await writeFile("./package.json", pkgnew);
const spago = await readFile('./spago.yaml', 'utf8') const spago = await readFile("./spago.yaml", "utf8");
const spagonew = spago.replace(/version: .+/, `version: '${ver}'`) const spagonew = spago.replace(/version: .+/, `version: '${ver}'`);
await writeFile('./spago.yaml', spagonew) await writeFile("./spago.yaml", spagonew);
const readme = await readFile('./README.md', 'utf8') const readme = await readFile("./README.md", "utf8");
const readmenew = readme.replace( const readmenew = readme.replace(
/packages\/purescript-csv-stream\/.+?\//g, /packages\/purescript-csv-stream\/.+?\//g,
`/packages/purescript-csv-stream/${ver}/`, `/packages/purescript-csv-stream/${ver}/`,
) );
await writeFile('./README.md', readmenew) await writeFile("./README.md", readmenew);
execSync(`git add spago.yaml package.json README.md`) execSync(`git add spago.yaml package.json README.md`);
execSync(`git commit -m 'chore: prepare v${ver}'`) execSync(`git commit -m 'chore: prepare v${ver}'`);
execSync(`git tag v${ver}`) execSync(`git tag v${ver}`);
execSync(`git push --tags`) execSync(`git push --tags`);
execSync(`git push --mirror github-mirror`) execSync(`git push --mirror github-mirror`);

View File

@@ -1,6 +1,6 @@
{ {
"name": "purescript-csv-stream", "name": "purescript-csv-stream",
"version": "v1.1.0", "version": "v1.2.2",
"dependencies": { "dependencies": {
"csv-parse": "^5.5.5", "csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6" "csv-stringify": "^6.4.6"

View File

@@ -6,10 +6,12 @@ workspace:
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0" - bifunctors: ">=6.0.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0" - datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0" - either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- filterable: ">=5.0.0 <6.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0" - foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
@@ -22,9 +24,12 @@ workspace:
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"
@@ -50,6 +55,7 @@ workspace:
- enums - enums
- exceptions - exceptions
- exists - exists
- filterable
- fixed-points - fixed-points
- foldable-traversable - foldable-traversable
- foreign - foreign
@@ -82,6 +88,7 @@ workspace:
- prelude - prelude
- profunctor - profunctor
- record - record
- record-extra
- refs - refs
- safe-coerce - safe-coerce
- st - st
@@ -266,6 +273,17 @@ packages:
integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8= integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8=
dependencies: dependencies:
- unsafe-coerce - unsafe-coerce
filterable:
type: registry
version: 5.0.0
integrity: sha256-cCojJHRnTmpY1j1kegI4CFwghdQ2Fm/8dzM8IlC+lng=
dependencies:
- arrays
- either
- foldable-traversable
- identity
- lists
- ordered-collections
fixed-points: fixed-points:
type: registry type: registry
version: 7.0.0 version: 7.0.0
@@ -641,6 +659,18 @@ packages:
- functions - functions
- prelude - prelude
- unsafe-coerce - unsafe-coerce
record-extra:
type: registry
version: 5.0.1
integrity: sha256-7vnREK2fpGJ7exswSeA9UpZFuU+UXRt3SA7AFUldT/Y=
dependencies:
- arrays
- functions
- lists
- prelude
- record
- tuples
- typelevel-prelude
refs: refs:
type: registry type: registry
version: 6.0.0 version: 6.0.0

View File

@@ -1,7 +1,7 @@
package: package:
name: csv-stream name: csv-stream
publish: publish:
version: '1.1.0' version: '1.2.2'
license: 'GPL-3.0-or-later' license: 'GPL-3.0-or-later'
location: location:
githubOwner: 'cakekindel' githubOwner: 'cakekindel'
@@ -13,10 +13,12 @@ package:
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0" - bifunctors: ">=6.0.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0" - datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0" - either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- filterable: ">=5.0.0 <6.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0" - foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
@@ -29,9 +31,12 @@ package:
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"

View File

@@ -7,8 +7,10 @@ import Control.Monad.Except (Except)
import Data.Array as Array import Data.Array as Array
import Data.CSV (class ReadCSV, class WriteCSV, readCSV, writeCSV) import Data.CSV (class ReadCSV, class WriteCSV, readCSV, writeCSV)
import Data.List.NonEmpty (NonEmptyList) import Data.List.NonEmpty (NonEmptyList)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe)
import Data.Symbol (class IsSymbol) import Data.Symbol (class IsSymbol, reflectSymbol)
import Foreign (ForeignError(..)) import Foreign (ForeignError(..))
import Prim.Row (class Cons, class Lacks) import Prim.Row (class Cons, class Lacks)
import Prim.RowList (class RowToList, Cons, Nil, RowList) import Prim.RowList (class RowToList, Cons, Nil, RowList)
@@ -20,25 +22,29 @@ class RowToList r rl <= WriteCSVRecord r rl | rl -> r where
writeCSVRecord :: { | r } -> Array String writeCSVRecord :: { | r } -> Array String
instance (RowToList r (Cons k v tailrl), IsSymbol k, WriteCSV v, Lacks k tail, Cons k v tail r, WriteCSVRecord tail tailrl) => WriteCSVRecord r (Cons k v tailrl) where instance (RowToList r (Cons k v tailrl), IsSymbol k, WriteCSV v, Lacks k tail, Cons k v tail r, WriteCSVRecord tail tailrl) => WriteCSVRecord r (Cons k v tailrl) where
writeCSVRecord r = let writeCSVRecord r =
val = writeCSV $ Record.get (Proxy @k) r let
tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r val = writeCSV $ Record.get (Proxy @k) r
in tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r
[val] <> tail in
[ val ] <> tail
instance WriteCSVRecord () Nil where instance WriteCSVRecord () Nil where
writeCSVRecord _ = [] writeCSVRecord _ = []
class ReadCSVRecord :: Row Type -> RowList Type -> Constraint class ReadCSVRecord :: Row Type -> RowList Type -> Constraint
class RowToList r rl <= ReadCSVRecord r rl | rl -> r where class RowToList r rl <= ReadCSVRecord r rl | rl -> r where
readCSVRecord :: Array String -> Except (NonEmptyList ForeignError) { | r } readCSVRecord :: Map String Int -> Array String -> Except (NonEmptyList ForeignError) { | r }
instance (RowToList r (Cons k v tailrl), IsSymbol k, ReadCSV v, Lacks k tail, Cons k v tail r, ReadCSVRecord tail tailrl) => ReadCSVRecord r (Cons k v tailrl) where instance (RowToList r (Cons k v tailrl), IsSymbol k, ReadCSV v, Lacks k tail, Cons k v tail r, ReadCSVRecord tail tailrl) => ReadCSVRecord r (Cons k v tailrl) where
readCSVRecord vals = do readCSVRecord cols vals = do
valraw <- liftMaybe (pure $ ForeignError "unexpected end of record") $ Array.head vals let
k = reflectSymbol (Proxy @k)
pos <- liftMaybe (pure $ ForeignError $ "row too long; did not expect value " <> k) $ Map.lookup k cols
let valraw = fromMaybe "" $ Array.index vals pos
val <- readCSV @v valraw val <- readCSV @v valraw
tail <- readCSVRecord @tail @tailrl (fromMaybe [] $ Array.tail vals) tail <- readCSVRecord @tail @tailrl cols vals
pure $ Record.insert (Proxy @k) val tail pure $ Record.insert (Proxy @k) val tail
instance ReadCSVRecord () Nil where instance ReadCSVRecord () Nil where
readCSVRecord _ = pure {} readCSVRecord _ _ = pure {}

View File

@@ -9,11 +9,12 @@ import Data.Int as Int
import Data.List.NonEmpty (NonEmptyList) import Data.List.NonEmpty (NonEmptyList)
import Data.Maybe (Maybe(..), maybe) import Data.Maybe (Maybe(..), maybe)
import Data.Newtype (unwrap) import Data.Newtype (unwrap)
import Data.Number (fromString) as Number
import Data.Number.Format (toString) as Number import Data.Number.Format (toString) as Number
import Data.PreciseDateTime (fromDateTime, fromRFC3339String, toDateTimeLossy, toRFC3339String) import Data.PreciseDateTime (fromDateTime, fromRFC3339String, toDateTimeLossy, toRFC3339String)
import Data.RFC3339String (RFC3339String(..)) import Data.RFC3339String (RFC3339String(..))
import Data.String as String import Data.String as String
import Foreign (ForeignError(..), readInt, readNumber, unsafeToForeign) import Foreign (ForeignError(..))
class ReadCSV a where class ReadCSV a where
readCSV :: String -> Except (NonEmptyList ForeignError) a readCSV :: String -> Except (NonEmptyList ForeignError) a
@@ -22,10 +23,10 @@ class WriteCSV a where
writeCSV :: a -> String writeCSV :: a -> String
instance ReadCSV Int where instance ReadCSV Int where
readCSV = readInt <<< unsafeToForeign readCSV s = liftMaybe (pure $ ForeignError $ "invalid integer: " <> s) $ Int.fromString s
instance ReadCSV Number where instance ReadCSV Number where
readCSV = readNumber <<< unsafeToForeign readCSV s = liftMaybe (pure $ ForeignError $ "invalid number: " <> s) $ Number.fromString s
instance ReadCSV String where instance ReadCSV String where
readCSV = pure readCSV = pure

View File

@@ -1,7 +1,29 @@
import {parse} from 'csv-parse' import { parse, Parser } from "csv-parse";
/** @type {(s: import('csv-parse').Options) => () => import('csv-parse').Parser} */ class ParserWithColumns extends Parser {
export const makeImpl = c => () => parse(c) /** @type {Array<string>} */
columns = [];
/** @type {Map<string, number> | null} */
columnsMap = null;
}
/** @type {(s: import('stream').Duplex) => () => string[] | null} */ /** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */
export const readImpl = s => () => s.read(); export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c);
parser.once("readable", () => {
parser.columns = parser.read();
});
return parser;
};
/** @type {(s: ParserWithColumns) => () => Array<string> | null} */
export const readImpl = (p) => () => p.read();
/** @type {(s: ParserWithColumns) => () => Array<string>} */
export const columnsArrayImpl = (p) => () => p.columns;
/** @type {(s: ParserWithColumns) => () => Map<string, number> | null} */
export const columnsMapImpl = (p) => () => p.columnsMap;
/** @type {(s: ParserWithColumns) => (m: Map<string, number>) => () => void} */
export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m);

View File

@@ -1,22 +1,32 @@
module Node.Stream.CSV.Parse where module Node.Stream.CSV.Parse where
import Prelude import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Monad.Error.Class (liftEither) import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept) import Control.Monad.Except (runExcept)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust) import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST import Control.Monad.ST.Global as ST
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parTraverse_)
import Data.Array as Array
import Data.Array.ST as Array.ST import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap) import Data.Bifunctor (lmap)
import Data.CSV.Record (class ReadCSVRecord, readCSVRecord) import Data.CSV.Record (class ReadCSVRecord, readCSVRecord)
import Data.Either (Either(..)) import Data.Either (Either(..))
import Data.Maybe (Maybe(..)) import Data.Filterable (filter)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), isNothing)
import Data.Nullable (Nullable) import Data.Nullable (Nullable)
import Data.Nullable as Nullable import Data.Nullable as Nullable
import Data.Traversable (for_) import Data.Traversable (for)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff, makeAff) import Effect as Effect
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (error) import Effect.Exception (error)
import Effect.Uncurried (mkEffectFn1) import Effect.Uncurried (mkEffectFn1)
@@ -77,10 +87,21 @@ type Config r =
-- | Create a CSVParser -- | Create a CSVParser
make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ()) make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: true, cast: false, cast_date: false}) <<< recordToForeign make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
-- | Synchronously parse a CSV string -- | Synchronously parse a CSV string
parse :: forall @r rl @config missing extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> Aff (Array { | r }) parse
:: forall @r rl @config missing extra m p
. Alternative p
=> Parallel p m
=> MonadAff m
=> MonadRec m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
=> { | config }
-> String
-> m (Array { | r })
parse config csv = do parse config csv = do
stream <- liftEffect $ make @r @config @missing @extra config stream <- liftEffect $ make @r @config @missing @extra config
void $ liftEffect $ Stream.writeString stream UTF8 csv void $ liftEffect $ Stream.writeString stream UTF8 csv
@@ -88,14 +109,31 @@ parse config csv = do
readAll stream readAll stream
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed. -- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach :: forall @r rl x. RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> Aff Unit) -> Aff Unit foreach
:: forall @r rl x m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb = whileJust do foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream isReadable <- liftEffect $ Stream.readable stream
when (not isReadable) $ makeAff \res -> mempty <* flip (Event.once Stream.readableH) stream $ res $ Right unit liftAff $ when (not isReadable) $ makeAff \res -> do
whileJust do stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
r <- liftEffect $ read @r stream pure $ Canceler $ const $ liftEffect stop
for_ r cb
pure $ void r recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
pure $ isNothing r
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
parTraverse_ cb records
isClosed <- liftEffect $ Stream.closed stream isClosed <- liftEffect $ Stream.closed stream
pure $ if isClosed then Nothing else Just unit pure $ if isClosed then Nothing else Just unit
@@ -104,13 +142,28 @@ foreach stream cb = whileJust do
-- | Returns `Nothing` when either: -- | Returns `Nothing` when either:
-- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`) -- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`)
-- | - All records have been processed (`Node.Stream.closed` is `true`) -- | - All records have been processed (`Node.Stream.closed` is `true`)
read :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Effect (Maybe { | r }) read
:: forall @r rl a
. RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> Effect (Maybe { | r })
read stream = runMaybeT do read stream = runMaybeT do
cols <- MaybeT $ getOrInitColumnsMap stream
raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl raw liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
-- | Collect all parsed records into an array -- | Collect all parsed records into an array
readAll :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Aff (Array { | r }) readAll
:: forall @r rl a m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> m (Array { | r })
readAll stream = do readAll stream = do
records <- liftEffect $ ST.toEffect $ Array.ST.new records <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records
@@ -126,6 +179,30 @@ foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)
-- | FFI -- | FFI
foreign import readImpl :: forall r. Stream r -> Effect (Nullable (Array String)) foreign import readImpl :: forall r. Stream r -> Effect (Nullable (Array String))
-- | FFI
foreign import columnsArrayImpl :: forall r. Stream r -> Effect (Array String)
-- | FFI
foreign import columnsMapImpl :: forall r. Stream r -> Effect (Nullable (Map String Int))
-- | FFI
foreign import setColumnsMapImpl :: forall r. Stream r -> Map String Int -> Effect Unit
-- | FFI
getOrInitColumnsMap :: forall r x. CSVParser r x -> Effect (Maybe (Map String Int))
getOrInitColumnsMap s = runMaybeT do
cols :: Array String <- MaybeT $ filter (not <<< Array.null) <$> Just <$> columnsArrayImpl s
let
get = MaybeT $ Nullable.toMaybe <$> columnsMapImpl s
init = do
let
ixs = Array.range 0 (Array.length cols - 1)
assoc = Array.zip cols ixs
map = Map.fromFoldable assoc
lift $ setColumnsMapImpl s map
pure map
get <|> init
-- | FFI -- | FFI
recordToForeign :: forall r. Record r -> Object Foreign recordToForeign :: forall r. Record r -> Object Foreign
recordToForeign = unsafeCoerce recordToForeign = unsafeCoerce

View File

@@ -1,7 +1,7 @@
import {stringify} from 'csv-stringify' import { stringify } from "csv-stringify";
/** @type {(c: import('csv-stringify').Options) => () => import('csv-stringify').Stringifier} */ /** @type {(c: import('csv-stringify').Options) => () => import('csv-stringify').Stringifier} */
export const makeImpl = c => () => stringify(c) export const makeImpl = (c) => () => stringify(c);
/** @type {(s: import('csv-stringify').Stringifier) => (vals: Array<string>) => () => void} */ /** @type {(s: import('csv-stringify').Stringifier) => (vals: Array<string>) => () => void} */
export const writeImpl = s => vals => () => s.write(vals) export const writeImpl = (s) => (vals) => () => s.write(vals);

View File

@@ -2,8 +2,9 @@ module Node.Stream.CSV.Stringify where
import Prelude import Prelude
import Control.Monad.Rec.Class (whileJust) import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST import Control.Monad.ST.Global as ST
import Data.Array as Array
import Data.Array.ST as Array.ST import Data.Array.ST as Array.ST
import Data.CSV.Record (class WriteCSVRecord, writeCSVRecord) import Data.CSV.Record (class WriteCSVRecord, writeCSVRecord)
import Data.Either (Either(..), blush) import Data.Either (Either(..), blush)
@@ -12,7 +13,8 @@ import Data.Maybe (Maybe(..))
import Data.String.Regex (Regex) import Data.String.Regex (Regex)
import Data.Traversable (for_) import Data.Traversable (for_)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff, makeAff) import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Foreign (Foreign, unsafeToForeign) import Foreign (Foreign, unsafeToForeign)
import Foreign.Object (Object) import Foreign.Object (Object)
@@ -22,6 +24,8 @@ import Node.Stream (Read, Stream, Write)
import Node.Stream as Stream import Node.Stream as Stream
import Prim.Row (class Union) import Prim.Row (class Union)
import Prim.RowList (class RowToList) import Prim.RowList (class RowToList)
import Record.Extra (class Keys, keys)
import Type.Prelude (Proxy(..))
import Unsafe.Coerce (unsafeCoerce) import Unsafe.Coerce (unsafeCoerce)
data CSVWrite data CSVWrite
@@ -40,7 +44,6 @@ type CSVStringifier a r = Stream (read :: Read, write :: Write, csv :: CSVWrite
-- | https://csv.js.org/stringify/options/ -- | https://csv.js.org/stringify/options/
type Config r = type Config r =
( bom :: Boolean ( bom :: Boolean
, group_columns_by_name :: Boolean
, delimiter :: String , delimiter :: String
, record_delimiter :: String , record_delimiter :: String
, escape :: String , escape :: String
@@ -61,11 +64,11 @@ recordToForeign :: forall r. Record r -> Object Foreign
recordToForeign = unsafeCoerce recordToForeign = unsafeCoerce
-- | Create a CSVStringifier -- | Create a CSVStringifier
make :: forall @r rl @config @missing @extra. RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ()) make :: forall @r rl @config @missing @extra. Keys rl => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: true, cast: false, cast_date: false}) <<< recordToForeign make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: Array.fromFoldable $ keys (Proxy @r) }) <<< recordToForeign
-- | Synchronously stringify a collection of records -- | Synchronously stringify a collection of records
stringify :: forall @r rl f @config missing extra. Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> Aff String stringify :: forall @r rl f m @config missing extra. MonadAff m => MonadRec m => Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> m String
stringify config records = do stringify config records = do
stream <- liftEffect $ make @r @config @missing @extra config stream <- liftEffect $ make @r @config @missing @extra config
liftEffect $ for_ records \r -> write stream r liftEffect $ for_ records \r -> write stream r
@@ -80,10 +83,12 @@ write :: forall @r rl a. RowToList r rl => WriteCSVRecord r rl => CSVStringifier
write s = writeImpl s <<< writeCSVRecord @r @rl write s = writeImpl s <<< writeCSVRecord @r @rl
-- | Loop until the stream is closed, invoking the callback with each chunk of stringified CSV text. -- | Loop until the stream is closed, invoking the callback with each chunk of stringified CSV text.
foreach :: forall r x. CSVStringifier r x -> (String -> Aff Unit) -> Aff Unit foreach :: forall m r x. MonadAff m => MonadRec m => CSVStringifier r x -> (String -> m Unit) -> m Unit
foreach stream cb = whileJust do foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream isReadable <- liftEffect $ Stream.readable stream
when (not isReadable) $ makeAff \res -> mempty <* flip (Event.once Stream.readableH) stream $ res $ Right unit liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
whileJust do whileJust do
s <- liftEffect $ (join <<< map blush) <$> Stream.readEither stream s <- liftEffect $ (join <<< map blush) <$> Stream.readEither stream
for_ s cb for_ s cb
@@ -92,7 +97,7 @@ foreach stream cb = whileJust do
pure $ if isClosed then Nothing else Just unit pure $ if isClosed then Nothing else Just unit
-- | Read the stringified chunks until end-of-stream, returning the entire CSV string. -- | Read the stringified chunks until end-of-stream, returning the entire CSV string.
readAll :: forall r a. CSVStringifier r a -> Aff String readAll :: forall r a m. MonadAff m => MonadRec m => CSVStringifier r a -> m String
readAll stream = do readAll stream = do
chunks <- liftEffect $ ST.toEffect $ Array.ST.new chunks <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push chunks foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push chunks