20 Commits

SHA1 Message Date
2b16a38a66 fix: improve performance 2024-05-13 15:06:23 -05:00
e87d81cc1d fix: generalize to MonadAff 2024-05-13 11:52:09 -05:00
b4b6dfdebd chore: prepare v2.0.0 2024-05-10 18:41:10 -05:00
d8b0039678 feat: rework API, use node-stream-pipes 2024-05-10 18:40:47 -05:00
d355d3b91c chore: prepare v1.2.19 2024-05-09 11:02:22 -05:00
eea77a3000 fix: maybe a race condition 2024-05-09 11:02:14 -05:00
3d8d722871 chore: prepare v1.2.18 2024-05-07 08:35:57 -05:00
aa7fb66b74 fix: prematurely ending 2024-05-07 08:35:48 -05:00
092276ed4c chore: prepare v1.2.17 2024-05-05 14:19:43 -05:00
bee63db2e7 fix: race condition or something 2024-05-05 14:19:34 -05:00
d6c6130279 chore: prepare v1.2.16 2024-05-03 15:51:53 -05:00
bbb78e2d95 fix: timeslice shenanigans 2024-05-03 15:51:45 -05:00
bf7404fe59 chore: prepare v1.2.15 2024-05-03 15:40:21 -05:00
6d41852361 fix: effect cb 2024-05-03 15:40:15 -05:00
35bd75fadf chore: prepare v1.2.14 2024-05-03 14:34:23 -05:00
c94adacc1a fix: timeslice shenanigans 2024-05-03 14:34:08 -05:00
a94781834d chore: prepare v1.2.13 2024-05-03 14:28:26 -05:00
499eb82986 fix: ?? 2024-05-03 14:28:23 -05:00
8e842feeef chore: prepare v1.2.12 2024-05-03 14:23:52 -05:00
52dd297c9c fix: what 2024-05-03 14:22:07 -05:00
11 changed files with 512 additions and 287 deletions

.gitignore (vendored, 2 changed lines)

@@ -1,4 +1,3 @@
bower_components/
node_modules/
.pulp-cache/
@@ -10,3 +9,4 @@ generated-docs/
.purs*
.psa*
.spago
.tmp/

bun.lockb (binary file not shown)

package.json

@@ -1,9 +1,11 @@
{
"name": "purescript-csv-stream",
"version": "v1.2.11",
"version": "v2.0.0",
"type": "module",
"dependencies": {
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6"
"csv-stringify": "^6.4.6",
"decimal.js": "^10.4.3"
},
"devDependencies": {
"typescript": "^5.4.5"

spago.lock

@@ -6,12 +6,9 @@ workspace:
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- filterable: ">=5.0.0 <6.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
@@ -21,10 +18,12 @@ workspace:
- newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=1.3.0 <2.0.0"
- node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- pipes: ">=8.0.0 <9.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
@@ -33,15 +32,23 @@ workspace:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unlift: ">=1.0.1 <2.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console
- gen
- node-fs
- node-zlib
- quickcheck
- simple-json
- spec
build_plan:
- aff
- ansi
- arraybuffer-types
- arrays
- avar
- bifunctors
- catenable-lists
- console
@@ -56,14 +63,13 @@ workspace:
- enums
- exceptions
- exists
- filterable
- fixed-points
- foldable-traversable
- foreign
- foreign-object
- fork
- formatters
- free
- freet
- functions
- functors
- gen
@@ -72,13 +78,18 @@ workspace:
- invariant
- js-date
- lazy
- lcg
- lists
- maybe
- monad-control
- mmorph
- newtype
- node-buffer
- node-event-emitter
- node-fs
- node-path
- node-stream-pipes
- node-streams
- node-zlib
- nonempty
- now
- nullable
@@ -88,13 +99,18 @@ workspace:
- parallel
- parsing
- partial
- pipes
- precise-datetime
- prelude
- profunctor
- quickcheck
- random
- record
- record-extra
- refs
- safe-coerce
- simple-json
- spec
- st
- strings
- tailrec
@@ -104,8 +120,9 @@ workspace:
- typelevel-prelude
- unfoldable
- unicode
- unlift
- unordered-collections
- unsafe-coerce
- variant
extra_packages: {}
packages:
aff:
@@ -130,6 +147,14 @@ packages:
- tailrec
- transformers
- unsafe-coerce
ansi:
type: registry
version: 7.0.0
integrity: sha256-ZMB6HD+q9CXvn9fRCmJ8dvuDrOVHcjombL3oNOerVnE=
dependencies:
- foldable-traversable
- lists
- strings
arraybuffer-types:
type: registry
version: 3.0.2
@@ -154,6 +179,17 @@ packages:
- tuples
- unfoldable
- unsafe-coerce
avar:
type: registry
version: 5.0.0
integrity: sha256-e7hf0x4hEpcygXP0LtvfvAQ49Bbj2aWtZT3gqM///0A=
dependencies:
- aff
- effect
- either
- exceptions
- functions
- maybe
bifunctors:
type: registry
version: 6.0.0
@@ -290,17 +326,6 @@ packages:
integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8=
dependencies:
- unsafe-coerce
filterable:
type: registry
version: 5.0.0
integrity: sha256-cCojJHRnTmpY1j1kegI4CFwghdQ2Fm/8dzM8IlC+lng=
dependencies:
- arrays
- either
- foldable-traversable
- identity
- lists
- ordered-collections
fixed-points:
type: registry
version: 7.0.0
@@ -357,6 +382,12 @@ packages:
- tuples
- typelevel-prelude
- unfoldable
fork:
type: registry
version: 6.0.0
integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E=
dependencies:
- aff
formatters:
type: registry
version: 7.0.0
@@ -388,21 +419,6 @@ packages:
- transformers
- tuples
- unsafe-coerce
freet:
type: registry
version: 7.0.0
integrity: sha256-zkL6wU4ZPq8xz1kGFxoliWqyhBksepMJTyA68VEBaJo=
dependencies:
- aff
- bifunctors
- effect
- either
- exists
- free
- prelude
- tailrec
- transformers
- tuples
functions:
type: registry
version: 6.0.0
@@ -486,6 +502,17 @@ packages:
- foldable-traversable
- invariant
- prelude
lcg:
type: registry
version: 4.0.0
integrity: sha256-h7ME5cthLfbgJOJdsZcSfFpwXsx4rf8YmhebU+3iSYg=
dependencies:
- effect
- integers
- maybe
- partial
- prelude
- random
lists:
type: registry
version: 7.0.0
@@ -512,15 +539,14 @@ packages:
- invariant
- newtype
- prelude
monad-control:
mmorph:
type: registry
version: 5.0.0
integrity: sha256-bgfDW30wbIm70NR1Tvvh9P+VFQMDh1wK2sSJXCj/dZc=
version: 7.0.0
integrity: sha256-urZlZNNqGeQFe5D/ClHlR8QgGBNHTMFPtJ5S5IpflTQ=
dependencies:
- aff
- freet
- identity
- lists
- free
- functors
- transformers
newtype:
type: registry
version: 5.0.0
@@ -551,6 +577,67 @@ packages:
- nullable
- prelude
- unsafe-coerce
node-fs:
type: registry
version: 9.1.0
integrity: sha256-TzhvGdrwcM0bazDvrWSqh+M/H8GKYf1Na6aGm2Qg4+c=
dependencies:
- datetime
- effect
- either
- enums
- exceptions
- functions
- integers
- js-date
- maybe
- node-buffer
- node-path
- node-streams
- nullable
- partial
- prelude
- strings
- unsafe-coerce
node-path:
type: registry
version: 5.0.0
integrity: sha256-pd82nQ+2l5UThzaxPdKttgDt7xlsgIDLpPG0yxDEdyE=
dependencies:
- effect
node-stream-pipes:
type: registry
version: 1.3.0
integrity: sha256-5Jpf0BLn0ExQWYxbTTewai4M8quEmEVHxihc9CM1Juo=
dependencies:
- aff
- arrays
- effect
- either
- exceptions
- foldable-traversable
- foreign-object
- lists
- maybe
- mmorph
- newtype
- node-buffer
- node-event-emitter
- node-fs
- node-path
- node-streams
- node-zlib
- ordered-collections
- parallel
- pipes
- prelude
- st
- strings
- tailrec
- transformers
- tuples
- unordered-collections
- unsafe-coerce
node-streams:
type: registry
version: 9.0.0
@@ -564,6 +651,19 @@ packages:
- node-event-emitter
- nullable
- prelude
node-zlib:
type: registry
version: 0.4.0
integrity: sha256-kYSajFQFzWVg71l5/y4w4kXdTr5EJoqyV3D2RqmAjQ4=
dependencies:
- aff
- effect
- either
- functions
- node-buffer
- node-streams
- prelude
- unsafe-coerce
nonempty:
type: registry
version: 7.0.0
@@ -671,6 +771,18 @@ packages:
version: 4.0.0
integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4=
dependencies: []
pipes:
type: registry
version: 8.0.0
integrity: sha256-kvfqGM4cPA/wCcBHbp5psouFw5dZGvku2462x7ZBwSY=
dependencies:
- aff
- lists
- mmorph
- prelude
- tailrec
- transformers
- tuples
precise-datetime:
type: registry
version: 7.0.0
@@ -711,6 +823,45 @@ packages:
- newtype
- prelude
- tuples
quickcheck:
type: registry
version: 8.0.1
integrity: sha256-ZvpccKQCvgslTXZCNmpYW4bUsFzhZd/kQUr2WmxFTGY=
dependencies:
- arrays
- console
- control
- effect
- either
- enums
- exceptions
- foldable-traversable
- gen
- identity
- integers
- lazy
- lcg
- lists
- maybe
- newtype
- nonempty
- numbers
- partial
- prelude
- record
- st
- strings
- tailrec
- transformers
- tuples
- unfoldable
random:
type: registry
version: 6.0.0
integrity: sha256-CJ611a35MPCE7XQMp0rdC6MCn76znlhisiCRgboAG+Q=
dependencies:
- effect
- integers
record:
type: registry
version: 4.0.0
@@ -744,6 +895,52 @@ packages:
integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU=
dependencies:
- unsafe-coerce
simple-json:
type: registry
version: 9.0.0
integrity: sha256-K3RJaThqsszTd+TEklzZmAdDqvIHWgXIfKqlsoykU1c=
dependencies:
- arrays
- exceptions
- foreign
- foreign-object
- nullable
- prelude
- record
- typelevel-prelude
- variant
spec:
type: registry
version: 7.6.0
integrity: sha256-+merGdQbL9zWONbnt8S8J9afGJ59MQqGtS0qSd3yu4I=
dependencies:
- aff
- ansi
- arrays
- avar
- bifunctors
- control
- datetime
- effect
- either
- exceptions
- foldable-traversable
- fork
- identity
- integers
- lists
- maybe
- newtype
- now
- ordered-collections
- parallel
- pipes
- prelude
- refs
- strings
- tailrec
- transformers
- tuples
st:
type: registry
version: 6.2.0
@@ -844,25 +1041,36 @@ packages:
- foldable-traversable
- maybe
- strings
unlift:
unordered-collections:
type: registry
version: 1.0.1
integrity: sha256-nbBCVV0fZz/3UHKoW11dcTwBYmQOIgK31ht2BN47RPw=
version: 3.1.0
integrity: sha256-H2eQR+ylI+cljz4XzWfEbdF7ee+pnw2IZCeq69AuJ+Q=
dependencies:
- aff
- effect
- either
- freet
- identity
- arrays
- enums
- functions
- integers
- lists
- maybe
- monad-control
- prelude
- st
- transformers
- record
- tuples
- typelevel-prelude
- unfoldable
unsafe-coerce:
type: registry
version: 6.0.0
integrity: sha256-IqIYW4Vkevn8sI+6aUwRGvd87tVL36BBeOr0cGAE7t0=
dependencies: []
variant:
type: registry
version: 8.0.0
integrity: sha256-SR//zQDg2dnbB8ZHslcxieUkCeNlbMToapvmh9onTtw=
dependencies:
- enums
- lists
- maybe
- partial
- prelude
- record
- tuples
- unsafe-coerce

spago.yaml

@@ -1,7 +1,7 @@
package:
name: csv-stream
publish:
version: '1.2.11'
version: '2.0.0'
license: 'GPL-3.0-or-later'
location:
githubOwner: 'cakekindel'
@@ -10,15 +10,13 @@ package:
strict: true
pedanticPackages: true
dependencies:
- node-stream-pipes: ">=1.3.0 <2.0.0"
- aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0"
- filterable: ">=5.0.0 <6.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0"
@@ -32,6 +30,7 @@ package:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- pipes: ">=8.0.0 <9.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
@@ -40,12 +39,18 @@ package:
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unlift: ">=1.0.1 <2.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main
dependencies:
- console
- gen
- node-fs
- node-zlib
- quickcheck
- simple-json
- spec
workspace:
extraPackages: {}

Node.Stream.CSV.Parse (FFI JavaScript)

@@ -1,30 +1,7 @@
import { parse, Parser } from "csv-parse";
import { Parser } from "csv-parse";
class ParserWithColumns extends Parser {
/** @type {Array<string>} */
columns = [];
/** @type {Map<string, number> | null} */
columnsMap = null;
}
/** @type {(s: import('csv-parse').Options) => () => Parser} */
export const makeImpl = (c) => () => new Parser(c);
/** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */
export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c);
parser.once("readable", () => {
parser.columns = parser.read();
parser.emit('columns', parser.columns)
});
return parser;
};
/** @type {(s: ParserWithColumns) => () => Array<string> | null} */
/** @type {(s: Parser) => () => Array<string> | null} */
export const readImpl = (p) => () => p.read();
/** @type {(s: ParserWithColumns) => () => Array<string>} */
export const columnsArrayImpl = (p) => () => p.columns;
/** @type {(s: ParserWithColumns) => () => Map<string, number> | null} */
export const columnsMapImpl = (p) => () => p.columnsMap;
/** @type {(s: ParserWithColumns) => (m: Map<string, number>) => () => void} */
export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m);

Node.Stream.CSV.Parse (PureScript source)

@@ -2,43 +2,18 @@ module Node.Stream.CSV.Parse where
import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Monad.Except (runExcept)
import Control.Monad.Except.Trans (catchError)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
import Data.CSV.Record (class ReadCSVRecord, readCSVRecord)
import Data.Either (Either(..))
import Data.Filterable (filter)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Effect (Effect)
import Effect.Aff (Canceler(..), delay, launchAff_, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Aff.Unlift (class MonadUnliftAff, UnliftAff(..), askUnliftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Uncurried (mkEffectFn1)
import Foreign (Foreign, unsafeToForeign)
import Foreign.Object (Object)
import Foreign.Object as Object
import Node.Encoding (Encoding(..))
import Foreign.Object (union) as Object
import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..))
import Node.EventEmitter as Event
import Node.EventEmitter.UtilTypes (EventHandle1)
import Node.Stream (Read, Stream, Write)
import Node.Stream as Stream
import Node.Stream.Object (Transform) as Object
import Prim.Row (class Union)
import Prim.RowList (class RowToList)
import Unsafe.Coerce (unsafeCoerce)
data CSVRead
@@ -47,12 +22,9 @@ data CSVRead
-- | into parsed purescript objects.
-- |
-- | The CSV contents may be piped into this stream
-- | as Buffer or String encoded chunks.
-- |
-- | Records can be read with `read` when `Node.Stream.readable`
-- | is true.
type CSVParser :: Row Type -> Row Type -> Type
type CSVParser a r = Stream (read :: Read, write :: Write, csv :: CSVRead | r)
-- | as Buffer or String chunks.
type CSVParser :: Row Type -> Type
type CSVParser r = Stream (read :: Read, write :: Write, csv :: CSVRead | r)
-- | https://csv.js.org/parse/options/
type Config r =
@@ -84,127 +56,22 @@ type Config r =
)
-- | Create a CSVParser
make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ())
make :: forall @config @missing @extra. Union config missing (Config extra) => { | config } -> Effect (CSVParser ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
-- | Synchronously parse a CSV string
parse
:: forall @r rl @config missing extra m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
=> { | config }
-> String
-> m (Array { | r })
parse config csv = do
stream <- liftEffect $ make @r @config @missing @extra config
void $ liftEffect $ Stream.writeString stream UTF8 csv
liftEffect $ Stream.end stream
readAll stream
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach
:: forall @r rl x m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb = do
UnliftAff unlift <- askUnliftAff
alreadyHaveCols <- liftEffect $ getOrInitColumnsMap stream
when (isNothing alreadyHaveCols)
$ liftAff
$ makeAff \res -> pure mempty <* flip (Event.once columnsH) stream $ const do
void $ getOrInitColumnsMap stream
res $ Right unit
liftAff $ makeAff \res -> do
removeDataListener <- flip (Event.on dataH) stream \row -> launchAff_ $ delay (wrap 0.0) <* liftEffect do
cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row
launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record)
removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit)
removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left)
pure $ Canceler $ const $ liftEffect do
removeDataListener
removeEndListener
removeErrorListener
-- | Reads a parsed record from the stream.
-- |
-- | Returns `Nothing` when either:
-- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`)
-- | - All records have been processed (`Node.Stream.closed` is `true`)
read
:: forall @r rl a
. RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> Effect (Maybe { | r })
read stream = runMaybeT do
cols <- MaybeT $ getOrInitColumnsMap stream
raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
-- | Collect all parsed records into an array
readAll
:: forall @r rl a m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> m (Array { | r })
readAll stream = do
records <- liftEffect $ liftST $ Array.ST.new
foreach stream $ void <<< liftEffect <<< liftST <<< flip Array.ST.push records
liftEffect $ liftST $ Array.ST.unsafeFreeze records
toObjectStream :: CSVParser () -> Object.Transform Buffer (Array String)
toObjectStream = unsafeCoerce
-- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
dataH :: forall a. EventHandle1 (CSVParser a) (Array String)
dataH = EventHandle "data" mkEffectFn1
-- | `columns` event. Emitted when the header row has been parsed.
columnsH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
columnsH = EventHandle "columns" mkEffectFn1
-- | FFI
foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)
-- | FFI
foreign import readImpl :: forall r. Stream r -> Effect (Nullable (Array String))
-- | FFI
foreign import columnsArrayImpl :: forall r. Stream r -> Effect (Array String)
-- | FFI
foreign import columnsMapImpl :: forall r. Stream r -> Effect (Nullable (Map String Int))
-- | FFI
foreign import setColumnsMapImpl :: forall r. Stream r -> Map String Int -> Effect Unit
-- | FFI
getOrInitColumnsMap :: forall r x. CSVParser r x -> Effect (Maybe (Map String Int))
getOrInitColumnsMap s = runMaybeT do
cols :: Array String <- MaybeT $ filter (not <<< Array.null) <$> Just <$> columnsArrayImpl s
let
get = MaybeT $ Nullable.toMaybe <$> columnsMapImpl s
init = do
let
ixs = Array.range 0 (Array.length cols - 1)
assoc = Array.zip cols ixs
map = Map.fromFoldable assoc
lift $ setColumnsMapImpl s map
pure map
get <|> init
-- | FFI
recordToForeign :: forall r. Record r -> Object Foreign
recordToForeign = unsafeCoerce
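
Example (not part of the diff): after this rework, the Parse module exposes only the raw stream plus toObjectStream; typed row decoding moves to the new Pipes.CSV module below. A minimal sketch of the raw surface, reusing the node-stream-pipes helpers that this changeset's tests use. Since make sets columns: false by default, the header row comes through like any other row at this layer.

module Example.ParseRaw where

import Prelude

import Data.Array as Array
import Effect (Effect)
import Effect.Aff (launchAff_)
import Effect.Class (liftEffect)
import Effect.Class.Console (logShow)
import Node.Encoding (Encoding(..))
import Node.Stream.CSV.Parse as CSV.Parse
import Pipes (yield, (>->))
import Pipes.Node.Buffer as Pipes.Buffer
import Pipes.Node.Stream as Pipes.Stream
import Pipes.Prelude (toListM) as Pipes

-- Parse an in-memory CSV string into untyped rows (Array String).
main :: Effect Unit
main = launchAff_ do
  raw <- liftEffect $ CSV.Parse.make {}
  rows <- map Array.fromFoldable
    $ Pipes.toListM
    $ Pipes.Stream.withEOS (yield "id,foo\n1,hi\n2,bye\n")
    >-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8)
    >-> Pipes.Stream.fromTransform (CSV.Parse.toObjectStream raw)
    >-> Pipes.Stream.unEOS
  logShow rows -- expected: [["id","foo"],["1","hi"],["2","bye"]]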

Node.Stream.CSV.Stringify (PureScript source)

@@ -2,30 +2,16 @@ module Node.Stream.CSV.Stringify where
import Prelude
import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.CSV.Record (class WriteCSVRecord, writeCSVRecord)
import Data.Either (Either(..), blush)
import Data.Foldable (class Foldable, fold)
import Data.Maybe (Maybe(..))
import Data.String.Regex (Regex)
import Data.Traversable (for_)
import Effect (Effect)
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Foreign (Foreign, unsafeToForeign)
import Foreign.Object (Object)
import Foreign.Object as Object
import Node.EventEmitter as Event
import Foreign.Object (union) as Object
import Node.Stream (Read, Stream, Write)
import Node.Stream as Stream
import Node.Stream.Object (Transform) as Object
import Prim.Row (class Union)
import Prim.RowList (class RowToList)
import Record.Extra (class Keys, keys)
import Type.Prelude (Proxy(..))
import Unsafe.Coerce (unsafeCoerce)
data CSVWrite
@@ -38,8 +24,8 @@ data CSVWrite
-- | Stringified rows are emitted on the `Readable` end as string
-- | chunks, meaning it can be treated as a `Node.Stream.Readable`
-- | that has had `setEncoding UTF8` invoked on it.
type CSVStringifier :: Row Type -> Row Type -> Type
type CSVStringifier a r = Stream (read :: Read, write :: Write, csv :: CSVWrite | r)
type CSVStringifier :: Row Type -> Type
type CSVStringifier r = Stream (read :: Read, write :: Write, csv :: CSVWrite | r)
-- | https://csv.js.org/stringify/options/
type Config r =
@@ -48,7 +34,6 @@ type Config r =
, record_delimiter :: String
, escape :: String
, escape_formulas :: Boolean
, header :: Boolean
, quote :: String
, quoted :: Boolean
, quoted_empty :: Boolean
@@ -63,43 +48,36 @@ foreign import writeImpl :: forall r. Stream r -> Array String -> Effect Unit
recordToForeign :: forall r. Record r -> Object Foreign
recordToForeign = unsafeCoerce
-- | Create a CSVStringifier
make :: forall @r rl @config @missing @extra. Keys rl => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: Array.fromFoldable $ keys (Proxy @r) }) <<< recordToForeign
-- | Create a raw Transform stream that accepts chunks of `Array String`,
-- | and transforms them into string CSV rows.
-- |
-- | Requires an ordered array of column names.
make
:: forall @config @missing @extra
. Union config missing (Config extra)
=> Array String
-> { | config }
-> Effect (CSVStringifier ())
make columns =
makeImpl
<<< unsafeToForeign
<<< Object.union (recordToForeign { columns, header: true })
<<< recordToForeign
-- | Synchronously stringify a collection of records
stringify :: forall @r rl f m @config missing extra. MonadAff m => MonadRec m => Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> m String
stringify config records = do
stream <- liftEffect $ make @r @config @missing @extra config
liftEffect $ for_ records \r -> write stream r
liftEffect $ Stream.end stream
readAll stream
-- | Convert the raw stream to a typed ObjectStream
toObjectStream :: CSVStringifier () -> Object.Transform (Array String) String
toObjectStream = unsafeCoerce
-- | Write a record to a CSVStringifier.
-- |
-- | The record will be emitted on the `Readable` end
-- | of the stream as a string chunk.
write :: forall @r rl a. RowToList r rl => WriteCSVRecord r rl => CSVStringifier r a -> { | r } -> Effect Unit
write :: forall @r rl a. RowToList r rl => WriteCSVRecord r rl => CSVStringifier a -> { | r } -> Effect Unit
write s = writeImpl s <<< writeCSVRecord @r @rl
-- | Loop until the stream is closed, invoking the callback with each chunk of stringified CSV text.
foreach :: forall m r x. MonadAff m => MonadRec m => CSVStringifier r x -> (String -> m Unit) -> m Unit
foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
whileJust do
s <- liftEffect $ (join <<< map blush) <$> Stream.readEither stream
for_ s cb
pure $ void s
isClosed <- liftEffect $ Stream.closed stream
pure $ if isClosed then Nothing else Just unit
-- | Read the stringified chunks until end-of-stream, returning the entire CSV string.
readAll :: forall r a m. MonadAff m => MonadRec m => CSVStringifier r a -> m String
readAll stream = do
chunks <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push chunks
chunks' <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze chunks
pure $ fold chunks'
-- | Write a record to a CSVStringifier.
-- |
-- | The record will be emitted on the `Readable` end
-- | of the stream as a string chunk.
writeRaw :: forall a. CSVStringifier a -> Array String -> Effect Unit
writeRaw = writeImpl
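
Example (not part of the diff): the reworked stringifier takes its ordered column names up front, and writeRaw feeds it untyped rows. A sketch assuming readAll keeps the MonadAff/MonadRec shape shown above:

module Example.StringifyRaw where

import Prelude

import Effect (Effect)
import Effect.Aff (launchAff_)
import Effect.Class (liftEffect)
import Effect.Class.Console (log)
import Node.Stream as Stream
import Node.Stream.CSV.Stringify as CSV.Stringify

main :: Effect Unit
main = launchAff_ do
  -- column order is explicit now, not derived from a row type
  stream <- liftEffect $ CSV.Stringify.make [ "id", "name" ] {}
  liftEffect $ CSV.Stringify.writeRaw stream [ "1", "alice" ]
  liftEffect $ CSV.Stringify.writeRaw stream [ "2", "bob" ]
  liftEffect $ Stream.end stream
  csv <- CSV.Stringify.readAll stream
  log csv -- expected: "id,name\n1,alice\n2,bob\n" (make defaults header to true)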

src/Pipes.CSV.purs (new file, 98 lines)

@@ -0,0 +1,98 @@
module Pipes.CSV where
import Prelude
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Control.Monad.Rec.Class (forever)
import Control.Monad.ST.Global as ST
import Control.Monad.ST.Ref as STRef
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord)
import Data.FunctorWithIndex (mapWithIndex)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Tuple.Nested ((/\))
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Node.Buffer (Buffer)
import Node.Stream.CSV.Parse as CSV.Parse
import Node.Stream.CSV.Stringify as CSV.Stringify
import Pipes (await, yield, (>->))
import Pipes.Core (Pipe)
import Pipes.Node.Stream as Pipes.Stream
import Prim.RowList (class RowToList)
import Record.Extra (class Keys, keys)
import Type.Prelude (Proxy(..))
-- | Transforms buffer chunks of a CSV file to parsed
-- | records of `r`.
-- |
-- | ```
-- | -- == my-data.csv.gz ==
-- | -- id,foo,is_deleted
-- | -- 1,hi,f
-- | -- 2,bye,t
-- |
-- | rows
-- | :: Array {id :: Int, foo :: String, is_deleted :: Boolean}
-- | <- map Array.fromFoldable
-- | $ Pipes.toListM
-- | $ Pipes.Node.Stream.unEOS
-- | $ Pipes.Node.FS.read "my-data.csv.gz"
-- | >-> Pipes.Node.Zlib.gunzip
-- | >-> Pipes.CSV.parse
-- | rows `shouldEqual` [{id: 1, foo: "hi", is_deleted: false}, {id: 2, foo: "bye", is_deleted: true}]
-- | ```
parse
:: forall @r rl
. RowToList r rl
=> ReadCSVRecord r rl
=> Pipe (Maybe Buffer) (Maybe { | r }) Aff Unit
parse = do
raw <- liftEffect $ CSV.Parse.make {}
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
let
readCols = liftEffect $ ST.toEffect $ STRef.read colsST
putCols a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) colsST
parse' a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a
firstRow a = putCols $ Map.fromFoldable $ mapWithIndex (flip (/\)) a
row a cols' = yield =<< parse' a cols'
unmarshal = forever do
r <- await
cols <- readCols
case cols of
Just cols' -> row r cols'
Nothing -> firstRow r
parser = Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream raw
parser >-> Pipes.Stream.inEOS unmarshal
-- | Transforms buffer chunks of a CSV file to parsed
-- | arrays of CSV values.
parseRaw :: Pipe (Maybe Buffer) (Maybe (Array String)) Aff Unit
parseRaw = do
s <- liftEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
Pipes.Stream.fromTransform s
-- | Transforms CSV rows into stringified CSV records
-- | using the given ordered array of column names.
stringifyRaw :: Array String -> Pipe (Maybe (Array String)) (Maybe String) Aff Unit
stringifyRaw columns = do
s <- liftEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
Pipes.Stream.fromTransform s
-- | Transforms purescript records into stringified CSV records.
-- |
-- | Columns are inferred from the record's keys, ordered alphabetically.
stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => Pipe (Maybe { | r }) (Maybe String) Aff Unit
stringify = do
raw <- liftEffect $ CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
let
printer = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream raw
marshal = forever $ yield =<< (writeCSVRecord @r @rl <$> await)
Pipes.Stream.inEOS marshal >-> printer
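
Example (not part of the diff): a typed stringify run mirroring the tests below; columns are inferred from the record's keys and ordered alphabetically:

module Example.PipesCSV where

import Prelude

import Data.Foldable (fold)
import Effect (Effect)
import Effect.Aff (launchAff_)
import Effect.Class.Console (log)
import Pipes ((>->))
import Pipes.CSV as Pipes.CSV
import Pipes.Collect as Pipes.Collect
import Pipes.Construct as Pipes.Construct
import Pipes.Node.Stream as Pipes.Stream

main :: Effect Unit
main = launchAff_ do
  let rows = [ { id: 1, name: "alice" }, { id: 2, name: "bob" } ]
  -- records in, one CSV string chunk out per row; fold concatenates them
  csv <- map fold
    $ Pipes.Collect.toArray
    $ Pipes.Stream.withEOS (Pipes.Construct.eachArray rows)
    >-> Pipes.CSV.stringify
    >-> Pipes.Stream.unEOS
  log csv -- expected: "id,name\n1,alice\n2,bob\n"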

Test.Main (PureScript source)

@@ -2,11 +2,13 @@ module Test.Main where
import Prelude
import Data.Maybe (Maybe(..))
import Effect (Effect)
import Effect.Class.Console (log)
import Effect.Aff (launchAff_)
import Test.Pipes.CSV as Test.Pipes.CSV
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit
main = do
log "🍕"
log "You should add some tests."
main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do
Test.Pipes.CSV.spec

test/Test/Pipes.CSV.purs (new file, 88 lines)

@@ -0,0 +1,88 @@
module Test.Pipes.CSV where
import Prelude
import Control.Monad.Gen (chooseInt)
import Control.Monad.Rec.Class (Step(..), tailRecM)
import Data.Array as Array
import Data.DateTime (DateTime)
import Data.Foldable (fold, sum)
import Data.Maybe (Maybe(..), fromJust)
import Data.Newtype (wrap)
import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy)
import Data.String.CodePoints as String.CodePoints
import Data.Tuple.Nested ((/\))
import Effect.Class (liftEffect)
import Effect.Console (log)
import Node.Encoding (Encoding(..))
import Partial.Unsafe (unsafePartial)
import Pipes (yield, (>->))
import Pipes.CSV as Pipes.CSV
import Pipes.Collect as Pipes.Collect
import Pipes.Construct as Pipes.Construct
import Pipes.Node.Buffer as Pipes.Buffer
import Pipes.Node.Stream as Pipes.Stream
import Pipes.Prelude (chain, map, toListM) as Pipes
import Pipes.Util as Pipes.Util
import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, before, describe, it)
import Test.Spec.Assertions (shouldEqual)
csv :: String
csv =
"""created,flag,foo,id
2020-01-01T00:00:00.0Z,true,a,1
2024-02-02T08:00:00.0Z,false,apple,2
1970-01-01T00:00:00.0Z,true,hello,3
"""
dt :: String -> DateTime
dt = toDateTimeLossy <<< unsafePartial fromJust <<< fromRFC3339String <<< wrap
spec :: Spec Unit
spec =
describe "Pipes.CSV" do
it "stringify" do
let
objs =
[ { id: 1, foo: "a", flag: true, created: dt "2020-01-01T00:00:00Z" }
, { id: 2, foo: "apple", flag: false, created: dt "2024-02-02T08:00:00Z" }
, { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" }
]
csv' <- map fold $ Pipes.Collect.toArray $ Pipes.Stream.withEOS (Pipes.Construct.eachArray objs) >-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS
csv' `shouldEqual` csv
describe "parse" do
it "parses csv" do
rows <- map Array.fromFoldable
$ Pipes.toListM
$ Pipes.Stream.withEOS (yield csv)
>-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8)
>-> Pipes.CSV.parse
>-> Pipes.Stream.unEOS
rows `shouldEqual`
[ { id: 1, foo: "a", flag: true, created: dt "2020-01-01T00:00:00Z" }
, { id: 2, foo: "apple", flag: false, created: dt "2024-02-02T08:00:00Z" }
, { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" }
]
before
(do
nums <- liftEffect $ randomSample' 100000 (chooseInt 0 9)
let
chars = [ "i","d","\n" ] <> join ((\n -> [show n, "\n"]) <$> nums)
bufs <- Pipes.Collect.toArray
$ Pipes.Stream.withEOS (Pipes.Construct.eachArray chars)
>-> Pipes.Util.chunked 1000
>-> Pipes.Stream.inEOS (Pipes.map fold >-> Pipes.Buffer.fromString UTF8)
>-> Pipes.Stream.unEOS
pure $ nums /\ bufs
)
$ it "parses large csv" \(nums /\ bufs) -> do
rows <-
Pipes.Collect.toArray
$ Pipes.Stream.withEOS (Pipes.Construct.eachArray bufs)
>-> Pipes.CSV.parse @(id :: Int)
>-> Pipes.Stream.unEOS
rows `shouldEqual` ((\id -> { id }) <$> nums)