|
|
|
|
@@ -3,7 +3,7 @@ module Node.Stream.CSV.Parse where
|
|
|
|
|
import Prelude hiding (join)
|
|
|
|
|
|
|
|
|
|
import Control.Alt ((<|>))
|
|
|
|
|
import Control.Monad.Error.Class (liftEither)
|
|
|
|
|
import Control.Monad.Error.Class (liftEither, liftMaybe)
|
|
|
|
|
import Control.Monad.Except (runExcept)
|
|
|
|
|
import Control.Monad.Except.Trans (catchError)
|
|
|
|
|
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
|
|
|
|
|
@@ -15,10 +15,9 @@ import Data.Bifunctor (lmap)
|
|
|
|
|
import Data.CSV.Record (class ReadCSVRecord, readCSVRecord)
|
|
|
|
|
import Data.Either (Either(..))
|
|
|
|
|
import Data.Filterable (filter)
|
|
|
|
|
import Data.Foldable (for_)
|
|
|
|
|
import Data.Map (Map)
|
|
|
|
|
import Data.Map as Map
|
|
|
|
|
import Data.Maybe (Maybe(..))
|
|
|
|
|
import Data.Maybe (Maybe(..), isNothing)
|
|
|
|
|
import Data.Newtype (wrap)
|
|
|
|
|
import Data.Nullable (Nullable)
|
|
|
|
|
import Data.Nullable as Nullable
|
|
|
|
|
@@ -117,12 +116,19 @@ foreach
|
|
|
|
|
-> m Unit
|
|
|
|
|
foreach stream cb = do
|
|
|
|
|
UnliftAff unlift <- askUnliftAff
|
|
|
|
|
|
|
|
|
|
alreadyHaveCols <- liftEffect $ getOrInitColumnsMap stream
|
|
|
|
|
when (isNothing alreadyHaveCols)
|
|
|
|
|
$ liftAff
|
|
|
|
|
$ makeAff \res -> pure mempty <* flip (Event.once columnsH) stream $ const do
|
|
|
|
|
void $ getOrInitColumnsMap stream
|
|
|
|
|
res $ Right unit
|
|
|
|
|
|
|
|
|
|
liftAff $ makeAff \res -> do
|
|
|
|
|
removeDataListener <- flip (Event.on dataH) stream \row -> launchAff_ $ delay (wrap 0.0) <* liftEffect do
|
|
|
|
|
cols <- getOrInitColumnsMap stream
|
|
|
|
|
for_ cols \cols' -> do
|
|
|
|
|
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' row
|
|
|
|
|
launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record)
|
|
|
|
|
cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream
|
|
|
|
|
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row
|
|
|
|
|
launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record)
|
|
|
|
|
removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit)
|
|
|
|
|
removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left)
|
|
|
|
|
|
|
|
|
|
@@ -165,6 +171,10 @@ readAll stream = do
|
|
|
|
|
dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
|
|
|
|
|
dataH = EventHandle "data" mkEffectFn1
|
|
|
|
|
|
|
|
|
|
-- | `columns` event. Emitted when the header row has been parsed.
|
|
|
|
|
columnsH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
|
|
|
|
|
columnsH = EventHandle "columns" mkEffectFn1
|
|
|
|
|
|
|
|
|
|
-- | FFI
|
|
|
|
|
foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)
|
|
|
|
|
|
|
|
|
|
|