|
|
|
|
@@ -3,15 +3,14 @@ module Node.Stream.CSV.Parse where
|
|
|
|
|
import Prelude hiding (join)
|
|
|
|
|
|
|
|
|
|
import Control.Alt ((<|>))
|
|
|
|
|
import Control.Alternative (class Alternative)
|
|
|
|
|
import Control.Monad.Error.Class (liftEither)
|
|
|
|
|
import Control.Monad.Except (runExcept)
|
|
|
|
|
import Control.Monad.Fork.Class (class MonadFork, fork, join)
|
|
|
|
|
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
|
|
|
|
|
import Control.Monad.Rec.Class (class MonadRec, tailRecM, Step(..), whileJust)
|
|
|
|
|
import Control.Monad.Rec.Class (class MonadRec, whileJust)
|
|
|
|
|
import Control.Monad.ST.Global as ST
|
|
|
|
|
import Control.Monad.Trans.Class (lift)
|
|
|
|
|
import Control.Parallel (class Parallel, parTraverse)
|
|
|
|
|
import Control.MonadPlus (class Alternative)
|
|
|
|
|
import Control.Parallel (class Parallel, parTraverse_)
|
|
|
|
|
import Data.Array as Array
|
|
|
|
|
import Data.Array.ST as Array.ST
|
|
|
|
|
import Data.Bifunctor (lmap)
|
|
|
|
|
@@ -20,12 +19,13 @@ import Data.Either (Either(..))
|
|
|
|
|
import Data.Filterable (filter)
|
|
|
|
|
import Data.Map (Map)
|
|
|
|
|
import Data.Map as Map
|
|
|
|
|
import Data.Maybe (Maybe(..))
|
|
|
|
|
import Data.Newtype (wrap)
|
|
|
|
|
import Data.Maybe (Maybe(..), isNothing)
|
|
|
|
|
import Data.Nullable (Nullable)
|
|
|
|
|
import Data.Nullable as Nullable
|
|
|
|
|
import Data.Traversable (for)
|
|
|
|
|
import Effect (Effect)
|
|
|
|
|
import Effect.Aff (Canceler(..), delay, makeAff)
|
|
|
|
|
import Effect as Effect
|
|
|
|
|
import Effect.Aff (Canceler(..), makeAff)
|
|
|
|
|
import Effect.Aff.Class (class MonadAff, liftAff)
|
|
|
|
|
import Effect.Class (liftEffect)
|
|
|
|
|
import Effect.Exception (error)
|
|
|
|
|
@@ -90,7 +90,18 @@ make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r
|
|
|
|
|
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
|
|
|
|
|
|
|
|
|
|
-- | Synchronously parse a CSV string
|
|
|
|
|
parse :: forall @r rl @config missing extra m p f. Parallel p m => MonadFork f m => Alternative p => MonadAff m => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> m (Array { | r })
|
|
|
|
|
parse
|
|
|
|
|
:: forall @r rl @config missing extra m p
|
|
|
|
|
. Alternative p
|
|
|
|
|
=> Parallel p m
|
|
|
|
|
=> MonadAff m
|
|
|
|
|
=> MonadRec m
|
|
|
|
|
=> RowToList r rl
|
|
|
|
|
=> ReadCSVRecord r rl
|
|
|
|
|
=> Union config missing (Config extra)
|
|
|
|
|
=> { | config }
|
|
|
|
|
-> String
|
|
|
|
|
-> m (Array { | r })
|
|
|
|
|
parse config csv = do
|
|
|
|
|
stream <- liftEffect $ make @r @config @missing @extra config
|
|
|
|
|
void $ liftEffect $ Stream.writeString stream UTF8 csv
|
|
|
|
|
@@ -98,21 +109,31 @@ parse config csv = do
|
|
|
|
|
readAll stream
|
|
|
|
|
|
|
|
|
|
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
|
|
|
|
|
foreach :: forall @r rl x m f p. Parallel p m => Alternative p => MonadFork f m => MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit
|
|
|
|
|
foreach
|
|
|
|
|
:: forall @r rl x m p
|
|
|
|
|
. Alternative p
|
|
|
|
|
=> Parallel p m
|
|
|
|
|
=> MonadRec m
|
|
|
|
|
=> MonadAff m
|
|
|
|
|
=> RowToList r rl
|
|
|
|
|
=> ReadCSVRecord r rl
|
|
|
|
|
=> CSVParser r x
|
|
|
|
|
-> ({ | r } -> m Unit)
|
|
|
|
|
-> m Unit
|
|
|
|
|
foreach stream cb = whileJust do
|
|
|
|
|
isReadable <- liftEffect $ Stream.readable stream
|
|
|
|
|
liftAff $ when (not isReadable) $ makeAff \res -> do
|
|
|
|
|
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
|
|
|
|
|
pure $ Canceler $ const $ liftEffect stop
|
|
|
|
|
fibers <- flip tailRecM [] \fibers -> do
|
|
|
|
|
liftAff $ delay $ wrap 0.0
|
|
|
|
|
r <- liftEffect $ read @r stream
|
|
|
|
|
case r of
|
|
|
|
|
Just r' -> do
|
|
|
|
|
f <- fork (cb r')
|
|
|
|
|
pure $ Loop $ fibers <> [ f ]
|
|
|
|
|
Nothing -> pure $ Done fibers
|
|
|
|
|
void $ parTraverse join fibers
|
|
|
|
|
|
|
|
|
|
recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
|
|
|
|
|
liftEffect $ Effect.untilE do
|
|
|
|
|
r <- read @r stream
|
|
|
|
|
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
|
|
|
|
|
pure $ isNothing r
|
|
|
|
|
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
|
|
|
|
|
|
|
|
|
|
parTraverse_ cb records
|
|
|
|
|
isClosed <- liftEffect $ Stream.closed stream
|
|
|
|
|
pure $ if isClosed then Nothing else Just unit
|
|
|
|
|
|
|
|
|
|
@@ -121,14 +142,28 @@ foreach stream cb = whileJust do
|
|
|
|
|
-- | Returns `Nothing` when either:
|
|
|
|
|
-- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`)
|
|
|
|
|
-- | - All records have been processed (`Node.Stream.closed` is `true`)
|
|
|
|
|
read :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Effect (Maybe { | r })
|
|
|
|
|
read
|
|
|
|
|
:: forall @r rl a
|
|
|
|
|
. RowToList r rl
|
|
|
|
|
=> ReadCSVRecord r rl
|
|
|
|
|
=> CSVParser r a
|
|
|
|
|
-> Effect (Maybe { | r })
|
|
|
|
|
read stream = runMaybeT do
|
|
|
|
|
cols <- MaybeT $ getOrInitColumnsMap stream
|
|
|
|
|
raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream
|
|
|
|
|
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
|
|
|
|
|
|
|
|
|
|
-- | Collect all parsed records into an array
|
|
|
|
|
readAll :: forall @r rl a m p f. Parallel p m => MonadFork f m => Alternative p => MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> m (Array { | r })
|
|
|
|
|
readAll
|
|
|
|
|
:: forall @r rl a m p
|
|
|
|
|
. Alternative p
|
|
|
|
|
=> Parallel p m
|
|
|
|
|
=> MonadRec m
|
|
|
|
|
=> MonadAff m
|
|
|
|
|
=> RowToList r rl
|
|
|
|
|
=> ReadCSVRecord r rl
|
|
|
|
|
=> CSVParser r a
|
|
|
|
|
-> m (Array { | r })
|
|
|
|
|
readAll stream = do
|
|
|
|
|
records <- liftEffect $ ST.toEffect $ Array.ST.new
|
|
|
|
|
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records
|
|
|
|
|
|