|
|
|
|
@@ -3,15 +3,15 @@ module Node.Stream.CSV.Parse where
|
|
|
|
|
import Prelude hiding (join)
|
|
|
|
|
|
|
|
|
|
import Control.Alt ((<|>))
|
|
|
|
|
import Control.Alternative (guard)
|
|
|
|
|
import Control.Alternative (guard, empty)
|
|
|
|
|
import Control.Monad.Error.Class (liftEither)
|
|
|
|
|
import Control.Monad.Except (runExcept)
|
|
|
|
|
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
|
|
|
|
|
import Control.Monad.Rec.Class (class MonadRec, whileJust)
|
|
|
|
|
import Control.Monad.ST.Global as ST
|
|
|
|
|
import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust)
|
|
|
|
|
import Control.Monad.ST.Class (liftST)
|
|
|
|
|
import Control.Monad.Trans.Class (lift)
|
|
|
|
|
import Control.MonadPlus (class Alternative)
|
|
|
|
|
import Control.Parallel (class Parallel, parTraverse_)
|
|
|
|
|
import Control.Parallel (class Parallel, parSequence_)
|
|
|
|
|
import Data.Array as Array
|
|
|
|
|
import Data.Array.ST as Array.ST
|
|
|
|
|
import Data.Bifunctor (lmap)
|
|
|
|
|
@@ -123,29 +123,53 @@ foreach
|
|
|
|
|
-> ({ | r } -> m Unit)
|
|
|
|
|
-> m Unit
|
|
|
|
|
foreach stream cb =
|
|
|
|
|
whileJust
|
|
|
|
|
$ runMaybeT
|
|
|
|
|
$ do
|
|
|
|
|
liftAff $ delay $ wrap 0.0
|
|
|
|
|
do
|
|
|
|
|
q <- liftEffect $ liftST $ Array.ST.new
|
|
|
|
|
|
|
|
|
|
guard =<< not <$> liftEffect (Stream.closed stream)
|
|
|
|
|
let
|
|
|
|
|
deque = liftEffect $ liftST $ Array.ST.shift q
|
|
|
|
|
enque a = liftEffect $ liftST $ Array.ST.push a q
|
|
|
|
|
|
|
|
|
|
isReadable <- liftEffect $ Stream.readable stream
|
|
|
|
|
liftAff $ when (not isReadable) $ makeAff \res -> do
|
|
|
|
|
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
|
|
|
|
|
pure $ Canceler $ const $ liftEffect stop
|
|
|
|
|
waitReadable =
|
|
|
|
|
makeAff \res -> do
|
|
|
|
|
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
|
|
|
|
|
pure $ Canceler $ const $ liftEffect stop
|
|
|
|
|
|
|
|
|
|
recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
|
|
|
|
|
liftEffect $ Effect.untilE do
|
|
|
|
|
r <- read @r stream
|
|
|
|
|
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
|
|
|
|
|
pure $ isNothing r
|
|
|
|
|
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
|
|
|
|
|
processQ =
|
|
|
|
|
untilJust
|
|
|
|
|
$ runMaybeT
|
|
|
|
|
$ do
|
|
|
|
|
liftAff $ delay $ wrap 0.0
|
|
|
|
|
r <- deque
|
|
|
|
|
isClosed <- liftEffect $ Stream.closed stream
|
|
|
|
|
if isClosed && isNothing r then
|
|
|
|
|
pure unit
|
|
|
|
|
else if isNothing r then do
|
|
|
|
|
liftAff $ delay $ wrap 10.0
|
|
|
|
|
empty
|
|
|
|
|
else do
|
|
|
|
|
r' <- MaybeT $ pure r
|
|
|
|
|
lift $ cb r'
|
|
|
|
|
empty
|
|
|
|
|
|
|
|
|
|
lift $ parTraverse_ cb records
|
|
|
|
|
guard =<< not <$> liftEffect (Stream.closed stream)
|
|
|
|
|
pure unit
|
|
|
|
|
readToQ =
|
|
|
|
|
whileJust
|
|
|
|
|
$ runMaybeT
|
|
|
|
|
$ do
|
|
|
|
|
liftAff $ delay $ wrap 0.0
|
|
|
|
|
guard =<< not <$> liftEffect (Stream.closed stream)
|
|
|
|
|
isReadable <- liftEffect $ Stream.readable stream
|
|
|
|
|
liftAff $ when (not isReadable) waitReadable
|
|
|
|
|
|
|
|
|
|
liftEffect $ Effect.untilE do
|
|
|
|
|
r <- read @r stream
|
|
|
|
|
void $ for r enque
|
|
|
|
|
pure $ isNothing r
|
|
|
|
|
guard =<< not <$> liftEffect (Stream.closed stream)
|
|
|
|
|
pure unit
|
|
|
|
|
|
|
|
|
|
parSequence_ [readToQ, processQ]
|
|
|
|
|
|
|
|
|
|
-- | Reads a parsed record from the stream.
|
|
|
|
|
-- |
|
|
|
|
|
-- | Returns `Nothing` when either:
|
|
|
|
|
@@ -174,9 +198,9 @@ readAll
|
|
|
|
|
=> CSVParser r a
|
|
|
|
|
-> m (Array { | r })
|
|
|
|
|
readAll stream = do
|
|
|
|
|
records <- liftEffect $ ST.toEffect $ Array.ST.new
|
|
|
|
|
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records
|
|
|
|
|
liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze records
|
|
|
|
|
records <- liftEffect $ liftST $ Array.ST.new
|
|
|
|
|
foreach stream $ void <<< liftEffect <<< liftST <<< flip Array.ST.push records
|
|
|
|
|
liftEffect $ liftST $ Array.ST.unsafeFreeze records
|
|
|
|
|
|
|
|
|
|
-- | `data` event. Emitted when a CSV record has been parsed.
|
|
|
|
|
dataH :: forall r a. EventHandle1 (CSVParser r a) { | r }
|
|
|
|
|
|