{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

module Control.Distributed.Dataset.Internal.Dataset where

import Conduit hiding
  ( Consumer,
    Producer,
    await,
  )
import qualified Conduit as C
import Control.Distributed.Closure
import Control.Distributed.Dataset.Internal.Class
import Control.Distributed.Dataset.Internal.Process
import Control.Distributed.Dataset.ShuffleStore
import qualified Control.Distributed.Fork.Utils as D
import Control.Lens
import Control.Monad
import Control.Monad.Logger
import Data.Conduit.Serialise
import Data.Hashable
import Data.IORef
import qualified Data.IntMap as M
import qualified Data.IntMap.Merge.Strict as M
import Data.List
  ( foldl',
    sortOn,
    transpose,
  )
import Data.List.Split
import qualified Data.Text as T
import Data.Typeable
import System.Random

-- * Partition

-- |
-- Represents some amount of data which to be transformed on a single
-- executor.
data Partition a where
  PEmpty :: Partition a
  PSimple :: Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
  PCombined :: Partition a -> Partition a -> Partition a

instance Semigroup (Partition a) where
  Partition a
a <> :: Partition a -> Partition a -> Partition a
<> Partition a
b = Partition a -> Partition a -> Partition a
forall a. Partition a -> Partition a -> Partition a
PCombined Partition a
a Partition a
b

instance Monoid (Partition a) where
  mempty :: Partition a
mempty = Partition a
forall a. Partition a
PEmpty

-- |
-- Streams the elements from a 'Partition'.
partitionProducer :: Typeable a => Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer :: Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
PEmpty = static (() -> ConduitT () a (ResourceT IO) ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
partitionProducer (PSimple Closure (ConduitT () a (ResourceT IO) ())
p) = Closure (ConduitT () a (ResourceT IO) ())
p
partitionProducer (PCombined Partition a
p1 Partition a
p2) =
  static ConduitT () a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
(>>) Closure
  (ConduitT () a (ResourceT IO) ()
   -> ConduitT () a (ResourceT IO) ()
   -> ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
-> Closure
     (ConduitT () a (ResourceT IO) ()
      -> ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
p1 Closure
  (ConduitT () a (ResourceT IO) ()
   -> ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
p2

-- * Dataset

-- |
-- Represents a partitioned multiset that can be transformed in a distributed fashion.
data Dataset a where
  DExternal :: StaticSerialise a => [Partition a] -> Dataset a
  DPipe ::
    (StaticSerialise a, StaticSerialise b) =>
    Closure (ConduitT a b (ResourceT IO) ()) ->
    Dataset a ->
    Dataset b
  DPartition ::
    (StaticHashable k, StaticSerialise a) =>
    Int ->
    Closure (a -> k) ->
    Dataset a ->
    Dataset a
  DCoalesce ::
    Int ->
    Dataset a ->
    Dataset a

dStaticSerialise :: Dataset a -> Dict (StaticSerialise a)
dStaticSerialise :: Dataset a -> Dict (StaticSerialise a)
dStaticSerialise DExternal {} = Dict (StaticSerialise a)
forall (a :: Constraint). a => Dict a
Dict
dStaticSerialise DPipe {} = Dict (StaticSerialise a)
forall (a :: Constraint). a => Dict a
Dict
dStaticSerialise DPartition {} = Dict (StaticSerialise a)
forall (a :: Constraint). a => Dict a
Dict
dStaticSerialise (DCoalesce Int
_ Dataset a
d) = Dataset a -> Dict (StaticSerialise a)
forall a. Dataset a -> Dict (StaticSerialise a)
dStaticSerialise Dataset a
d

-- * Stage

data Stage a where
  SInit :: Typeable a => [Partition a] -> Stage a
  SNarrow ::
    (StaticSerialise a, StaticSerialise b) =>
    Closure (ConduitM a b (ResourceT IO) ()) ->
    Stage a ->
    Stage b
  SWide ::
    (StaticSerialise a, StaticSerialise b) =>
    Int ->
    Closure (ConduitM a (Int, b) (ResourceT IO) ()) ->
    Stage a ->
    Stage b
  SCoalesce :: Int -> Stage a -> Stage a

instance Show (Stage a) where
  show :: Stage a -> String
show s :: Stage a
s@(SInit [Partition a]
_) = Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
s
  show s :: Stage a
s@(SNarrow Closure (ConduitM a a (ResourceT IO) ())
_ Stage a
r) = [String] -> String
forall a. Monoid a => [a] -> a
mconcat [Stage a -> String
forall a. Show a => a -> String
show Stage a
r, String
" -> ", Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
s]
  show s :: Stage a
s@(SWide Int
_ Closure (ConduitM a (Int, a) (ResourceT IO) ())
_ Stage a
r) = [String] -> String
forall a. Monoid a => [a] -> a
mconcat [Stage a -> String
forall a. Show a => a -> String
show Stage a
r, String
" -> ", Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
s]
  show s :: Stage a
s@(SCoalesce Int
_ Stage a
r) = [String] -> String
forall a. Monoid a => [a] -> a
mconcat [Stage a -> String
forall a. Show a => a -> String
show Stage a
r, String
" -> ", Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
s]

showTopStage :: forall a. Stage a -> String
showTopStage :: Stage a -> String
showTopStage (SInit [Partition a]
p) =
  [String] -> String
forall a. Monoid a => [a] -> a
mconcat
    [ String
"SInit",
      String
" @",
      TypeRep -> String
forall a. Show a => a -> String
show (Proxy a -> TypeRep
forall k (proxy :: k -> *) (a :: k).
Typeable a =>
proxy a -> TypeRep
typeRep (Proxy a -> TypeRep) -> Proxy a -> TypeRep
forall a b. (a -> b) -> a -> b
$ Proxy a
forall k (t :: k). Proxy t
Proxy @a),
      String
" ",
      Int -> String
forall a. Show a => a -> String
show ([Partition a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Partition a]
p)
    ]
showTopStage (SNarrow Closure (ConduitM a a (ResourceT IO) ())
_ Stage a
_) =
  [String] -> String
forall a. Monoid a => [a] -> a
mconcat
    [ String
"* SNarrow",
      String
" @",
      TypeRep -> String
forall a. Show a => a -> String
show (Proxy a -> TypeRep
forall k (proxy :: k -> *) (a :: k).
Typeable a =>
proxy a -> TypeRep
typeRep (Proxy a -> TypeRep) -> Proxy a -> TypeRep
forall a b. (a -> b) -> a -> b
$ Proxy a
forall k (t :: k). Proxy t
Proxy @a)
    ]
showTopStage (SWide Int
i Closure (ConduitM a (Int, a) (ResourceT IO) ())
_ Stage a
_) =
  [String] -> String
forall a. Monoid a => [a] -> a
mconcat
    [ String
"* SWide",
      String
" @",
      TypeRep -> String
forall a. Show a => a -> String
show (Proxy a -> TypeRep
forall k (proxy :: k -> *) (a :: k).
Typeable a =>
proxy a -> TypeRep
typeRep (Proxy a -> TypeRep) -> Proxy a -> TypeRep
forall a b. (a -> b) -> a -> b
$ Proxy a
forall k (t :: k). Proxy t
Proxy @a),
      String
" ",
      Int -> String
forall a. Show a => a -> String
show Int
i
    ]
showTopStage (SCoalesce Int
i Stage a
_) =
  [String] -> String
forall a. Monoid a => [a] -> a
mconcat
    [ String
"SCoalesce",
      String
" ",
      Int -> String
forall a. Show a => a -> String
show Int
i
    ]

mkStages :: Dataset a -> Stage a
mkStages :: Dataset a -> Stage a
mkStages (DExternal [Partition a]
a) = [Partition a] -> Stage a
forall a. Typeable a => [Partition a] -> Stage a
SInit [Partition a]
a
mkStages (DPipe Closure (ConduitT a a (ResourceT IO) ())
p Dataset a
rest) =
  case Dataset a -> Stage a
forall a. Dataset a -> Stage a
mkStages Dataset a
rest of
    SNarrow Closure (ConduitM a a (ResourceT IO) ())
prev Stage a
r ->
      Closure (ConduitM a a (ResourceT IO) ()) -> Stage a -> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitM a b (ResourceT IO) ()) -> Stage a -> Stage b
SNarrow (static ConduitM a a (ResourceT IO) ()
-> ConduitT a a (ResourceT IO) () -> ConduitM a a (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
(.|) Closure
  (ConduitM a a (ResourceT IO) ()
   -> ConduitT a a (ResourceT IO) ()
   -> ConduitM a a (ResourceT IO) ())
-> Closure (ConduitM a a (ResourceT IO) ())
-> Closure
     (ConduitT a a (ResourceT IO) () -> ConduitM a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitM a a (ResourceT IO) ())
prev Closure
  (ConduitT a a (ResourceT IO) () -> ConduitM a a (ResourceT IO) ())
-> Closure (ConduitT a a (ResourceT IO) ())
-> Closure (ConduitM a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitT a a (ResourceT IO) ())
p) Stage a
r
    Stage a
other ->
      Closure (ConduitT a a (ResourceT IO) ()) -> Stage a -> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitM a b (ResourceT IO) ()) -> Stage a -> Stage b
SNarrow Closure (ConduitT a a (ResourceT IO) ())
p Stage a
other
mkStages (DPartition Int
count (Closure (a -> k)
cf :: Closure (a -> k)) Dataset a
rest) =
  case Dataset a -> Stage a
forall a. Dataset a -> Stage a
mkStages Dataset a
rest of
    SNarrow Closure (ConduitM a a (ResourceT IO) ())
cp Stage a
rest' ->
      Int
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
-> Stage a
-> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Int
-> Closure (ConduitM a (Int, b) (ResourceT IO) ())
-> Stage a
-> Stage b
SWide
        Int
count
        ( static (\Dict (Typeable k, Eq k, Hashable k)
Dict ConduitM a a (ResourceT IO) ()
p a -> k
f -> ConduitM a a (ResourceT IO) ()
p ConduitM a a (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (a -> k) -> ConduitM a (Int, a) (ResourceT IO) ()
forall t e (m :: * -> *).
(Hashable e, Monad m) =>
(t -> e) -> ConduitT t (Int, t) m ()
partition @a @k a -> k
f)
            Closure
  (Dict (Typeable k, Eq k, Hashable k)
   -> ConduitM a a (ResourceT IO) ()
   -> (a -> k)
   -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (Dict (Typeable k, Eq k, Hashable k))
-> Closure
     (ConduitM a a (ResourceT IO) ()
      -> (a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable k => Closure (Dict (Typeable k, Eq k, Hashable k))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @k
            Closure
  (ConduitM a a (ResourceT IO) ()
   -> (a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (ConduitM a a (ResourceT IO) ())
-> Closure ((a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitM a a (ResourceT IO) ())
cp
            Closure ((a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (a -> k)
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
cf
        )
        Stage a
rest'
    Stage a
other ->
      Int
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
-> Stage a
-> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Int
-> Closure (ConduitM a (Int, b) (ResourceT IO) ())
-> Stage a
-> Stage b
SWide
        Int
count
        ( static (\Dict (Typeable k, Eq k, Hashable k)
Dict -> forall t e (m :: * -> *).
(Hashable e, Monad m) =>
(t -> e) -> ConduitT t (Int, t) m ()
forall (m :: * -> *).
(Hashable k, Monad m) =>
(a -> k) -> ConduitT a (Int, a) m ()
partition @a @k)
            Closure
  (Dict (Typeable k, Eq k, Hashable k)
   -> (a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (Dict (Typeable k, Eq k, Hashable k))
-> Closure ((a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable k => Closure (Dict (Typeable k, Eq k, Hashable k))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @k
            Closure ((a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (a -> k)
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
cf
        )
        Stage a
other
  where
    partition :: forall t e m. (Hashable e, Monad m) => (t -> e) -> ConduitT t (Int, t) m ()
    partition :: (t -> e) -> ConduitT t (Int, t) m ()
partition t -> e
f =
      ConduitT t (Int, t) m (Maybe t)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await ConduitT t (Int, t) m (Maybe t)
-> (Maybe t -> ConduitT t (Int, t) m ())
-> ConduitT t (Int, t) m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Maybe t
Nothing -> () -> ConduitT t (Int, t) m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Just t
a -> (Int, t) -> ConduitT t (Int, t) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield (e -> Int
forall a. Hashable a => a -> Int
hash (t -> e
f t
a), t
a) ConduitT t (Int, t) m ()
-> ConduitT t (Int, t) m () -> ConduitT t (Int, t) m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (t -> e) -> ConduitT t (Int, t) m ()
forall t e (m :: * -> *).
(Hashable e, Monad m) =>
(t -> e) -> ConduitT t (Int, t) m ()
partition t -> e
f
mkStages (DCoalesce Int
count Dataset a
rest) =
  case Dataset a -> Stage a
forall a. Dataset a -> Stage a
mkStages Dataset a
rest of
    SNarrow Closure (ConduitM a a (ResourceT IO) ())
cp Stage a
rest' ->
      Closure (ConduitM a a (ResourceT IO) ()) -> Stage a -> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitM a b (ResourceT IO) ()) -> Stage a -> Stage b
SNarrow Closure (ConduitM a a (ResourceT IO) ())
cp (Int -> Stage a -> Stage a
forall a. Int -> Stage a -> Stage a
SCoalesce Int
count Stage a
rest')
    SWide Int
_ Closure (ConduitM a (Int, a) (ResourceT IO) ())
cp Stage a
rest' ->
      Int
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
-> Stage a
-> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Int
-> Closure (ConduitM a (Int, b) (ResourceT IO) ())
-> Stage a
-> Stage b
SWide Int
count Closure (ConduitM a (Int, a) (ResourceT IO) ())
cp Stage a
rest'
    SCoalesce Int
_ Stage a
rest' ->
      Int -> Stage a -> Stage a
forall a. Int -> Stage a -> Stage a
SCoalesce Int
count Stage a
rest'
    Stage a
other ->
      Int -> Stage a -> Stage a
forall a. Int -> Stage a -> Stage a
SCoalesce Int
count Stage a
other

-- FIXME: This function should not look this horrible.
runStages :: forall a. Stage a -> DD [Partition a]
runStages :: Stage a -> DD [Partition a]
runStages stage :: Stage a
stage@(SInit [Partition a]
ps) = do
  Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Running: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
stage)
  [Partition a] -> DD [Partition a]
forall (m :: * -> *) a. Monad m => a -> m a
return [Partition a]
ps
runStages stage :: Stage a
stage@(SNarrow Closure (ConduitM a a (ResourceT IO) ())
cpipe Stage a
rest) = do
  [Partition a]
inputs <- Stage a -> DD [Partition a]
forall a. Stage a -> DD [Partition a]
runStages Stage a
rest
  Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Running: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
stage)
  ShuffleStore
shuffleStore <- Getting ShuffleStore DDEnv ShuffleStore -> DD ShuffleStore
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting ShuffleStore DDEnv ShuffleStore
Lens' DDEnv ShuffleStore
ddShuffleStore
  [(Closure (IO (ExecutorResponse ())), Partition a)]
tasks <-
    [Partition a]
-> (Partition a
    -> DD (Closure (IO (ExecutorResponse ())), Partition a))
-> DD [(Closure (IO (ExecutorResponse ())), Partition a)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Partition a]
inputs ((Partition a
  -> DD (Closure (IO (ExecutorResponse ())), Partition a))
 -> DD [(Closure (IO (ExecutorResponse ())), Partition a)])
-> (Partition a
    -> DD (Closure (IO (ExecutorResponse ())), Partition a))
-> DD [(Closure (IO (ExecutorResponse ())), Partition a)]
forall a b. (a -> b) -> a -> b
$ \Partition a
input -> do
      Int64
num <- IO Int64 -> DD Int64
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Int64
forall a. Random a => IO a
randomIO
      let coutput :: Closure (ConduitT ByteString Void (ResourceT IO) ())
coutput = ShuffleStore
-> Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
ssPut ShuffleStore
shuffleStore Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
-> Closure Int64
-> Closure (ConduitT ByteString Void (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int64)) -> Int64 -> Closure Int64
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int64)
forall (a :: Constraint). a => Dict a
Dict) Int64
num
          cinput :: Closure (ConduitT () ByteString (ResourceT IO) ())
cinput = ShuffleStore
-> Closure
     (Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
ssGet ShuffleStore
shuffleStore Closure
  (Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure Int64
-> Closure (Range -> ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int64)) -> Int64 -> Closure Int64
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int64)
forall (a :: Constraint). a => Dict a
Dict) Int64
num Closure (Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure Range
-> Closure (ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Range)) -> Range -> Closure Range
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Range)
forall (a :: Constraint). a => Dict a
Dict) Range
RangeAll
          crun :: Closure (IO (ExecutorResponse ()))
crun =
            static
              ( \Dict (Typeable a, Serialise a)
Dict ConduitT () a (ResourceT IO) ()
producer ConduitM a a (ResourceT IO) ()
pipe ConduitT ByteString Void (ResourceT IO) ()
output ->
                  (ExecutorStatsHooks -> IO ()) -> IO (ExecutorResponse ())
forall a. (ExecutorStatsHooks -> IO a) -> IO (ExecutorResponse a)
withExecutorStats ((ExecutorStatsHooks -> IO ()) -> IO (ExecutorResponse ()))
-> (ExecutorStatsHooks -> IO ()) -> IO (ExecutorResponse ())
forall a b. (a -> b) -> a -> b
$ \ExecutorStatsHooks {forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshOutput :: ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshUpload :: ExecutorStatsHooks
-> forall (m :: * -> *).
   MonadIO m =>
   ConduitT ByteString ByteString m ()
eshDownload :: ExecutorStatsHooks
-> forall (m :: * -> *).
   MonadIO m =>
   ConduitT ByteString ByteString m ()
eshOutput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshUpload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshDownload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
..} ->
                    ConduitT () Void (ResourceT IO) () -> IO ()
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
C.runConduitRes (ConduitT () Void (ResourceT IO) () -> IO ())
-> ConduitT () Void (ResourceT IO) () -> IO ()
forall a b. (a -> b) -> a -> b
$
                      ConduitT () a (ResourceT IO) ()
producer
                        ConduitT () a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitT () Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT a a (ResourceT IO) ()
forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput
                        ConduitT a a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM a a (ResourceT IO) ()
pipe
                        ConduitM a a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT a a (ResourceT IO) ()
forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshOutput
                        ConduitT a a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| forall a (m :: * -> *).
(Serialise a, Monad m) =>
ConduitM a ByteString m ()
forall (m :: * -> *).
(Serialise a, Monad m) =>
ConduitM a ByteString m ()
serialiseC @a
                        ConduitM a ByteString (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshUpload
                        ConduitT ByteString ByteString (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString Void (ResourceT IO) ()
output
              )
              Closure
  (Dict (Typeable a, Serialise a)
   -> ConduitT () a (ResourceT IO) ()
   -> ConduitM a a (ResourceT IO) ()
   -> ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse ()))
-> Closure (Dict (Typeable a, Serialise a))
-> Closure
     (ConduitT () a (ResourceT IO) ()
      -> ConduitM a a (ResourceT IO) ()
      -> ConduitT ByteString Void (ResourceT IO) ()
      -> IO (ExecutorResponse ()))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticSerialise a => Closure (Dict (Typeable a, Serialise a))
forall a.
StaticSerialise a =>
Closure (Dict (Typeable a, Serialise a))
staticSerialise @a
              Closure
  (ConduitT () a (ResourceT IO) ()
   -> ConduitM a a (ResourceT IO) ()
   -> ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse ()))
-> Closure (ConduitT () a (ResourceT IO) ())
-> Closure
     (ConduitM a a (ResourceT IO) ()
      -> ConduitT ByteString Void (ResourceT IO) ()
      -> IO (ExecutorResponse ()))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
input
              Closure
  (ConduitM a a (ResourceT IO) ()
   -> ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse ()))
-> Closure (ConduitM a a (ResourceT IO) ())
-> Closure
     (ConduitT ByteString Void (ResourceT IO) ()
      -> IO (ExecutorResponse ()))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitM a a (ResourceT IO) ())
cpipe
              Closure
  (ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse ()))
-> Closure (ConduitT ByteString Void (ResourceT IO) ())
-> Closure (IO (ExecutorResponse ()))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitT ByteString Void (ResourceT IO) ())
coutput
          newPartition :: Partition a
newPartition =
            Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
forall a. Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
PSimple @a
              ( static (\Dict (Typeable a, Serialise a)
Dict ConduitT () ByteString (ResourceT IO) ()
input' -> ConduitT () ByteString (ResourceT IO) ()
input' ConduitT () ByteString (ResourceT IO) ()
-> ConduitM ByteString a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM ByteString a (ResourceT IO) ()
forall a (m :: * -> *).
(Serialise a, MonadIO m) =>
ConduitM ByteString a m ()
deserialiseC)
                  Closure
  (Dict (Typeable a, Serialise a)
   -> ConduitT () ByteString (ResourceT IO) ()
   -> ConduitT () a (ResourceT IO) ())
-> Closure (Dict (Typeable a, Serialise a))
-> Closure
     (ConduitT () ByteString (ResourceT IO) ()
      -> ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticSerialise a => Closure (Dict (Typeable a, Serialise a))
forall a.
StaticSerialise a =>
Closure (Dict (Typeable a, Serialise a))
staticSerialise @a
                  Closure
  (ConduitT () ByteString (ResourceT IO) ()
   -> ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () ByteString (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitT () ByteString (ResourceT IO) ())
cinput
              )
      (Closure (IO (ExecutorResponse ())), Partition a)
-> DD (Closure (IO (ExecutorResponse ())), Partition a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Closure (IO (ExecutorResponse ()))
crun, Partition a
newPartition)
  Backend
backend <- Getting Backend DDEnv Backend -> DD Backend
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting Backend DDEnv Backend
Lens' DDEnv Backend
ddBackend
  LogLevel
level <- Getting LogLevel DDEnv LogLevel -> DD LogLevel
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting LogLevel DDEnv LogLevel
Lens' DDEnv LogLevel
ddLogLevel
  let f :: Backend
-> Closure (Dict (Serializable (ExecutorResponse ())))
-> [Closure (IO (ExecutorResponse ()))]
-> IO [ExecutorResponse ()]
f =
        Options
-> Backend
-> Closure (Dict (Serializable (ExecutorResponse ())))
-> [Closure (IO (ExecutorResponse ()))]
-> IO [ExecutorResponse ()]
forall a.
Options
-> Backend
-> Closure (Dict (Serializable a))
-> [Closure (IO a)]
-> IO [a]
D.forkConcurrently
          ( Options
D.defaultOptions
              { oShowProgress :: Bool
D.oShowProgress = LogLevel
level LogLevel -> LogLevel -> Bool
forall a. Ord a => a -> a -> Bool
<= LogLevel
LevelInfo,
                oRetries :: Int
D.oRetries = Int
2
              }
          )
  [ExecutorResponse ()]
ret <- IO [ExecutorResponse ()] -> DD [ExecutorResponse ()]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ExecutorResponse ()] -> DD [ExecutorResponse ()])
-> IO [ExecutorResponse ()] -> DD [ExecutorResponse ()]
forall a b. (a -> b) -> a -> b
$ Backend
-> Closure (Dict (Serializable (ExecutorResponse ())))
-> [Closure (IO (ExecutorResponse ()))]
-> IO [ExecutorResponse ()]
f Backend
backend (static Dict (Serializable (ExecutorResponse ()))
forall (a :: Constraint). a => Dict a
Dict) (((Closure (IO (ExecutorResponse ())), Partition a)
 -> Closure (IO (ExecutorResponse ())))
-> [(Closure (IO (ExecutorResponse ())), Partition a)]
-> [Closure (IO (ExecutorResponse ()))]
forall a b. (a -> b) -> [a] -> [b]
map (Closure (IO (ExecutorResponse ())), Partition a)
-> Closure (IO (ExecutorResponse ()))
forall a b. (a, b) -> a
fst [(Closure (IO (ExecutorResponse ())), Partition a)]
tasks)
  Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logDebugN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Stats: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (ExecutorStats -> String
forall a. Show a => a -> String
show (ExecutorStats -> String) -> ExecutorStats -> String
forall a b. (a -> b) -> a -> b
$ (ExecutorResponse () -> ExecutorStats)
-> [ExecutorResponse ()] -> ExecutorStats
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ExecutorResponse () -> ExecutorStats
forall a. ExecutorResponse a -> ExecutorStats
erStats [ExecutorResponse ()]
ret)
  [Partition a] -> DD [Partition a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Partition a] -> DD [Partition a])
-> [Partition a] -> DD [Partition a]
forall a b. (a -> b) -> a -> b
$ ((Closure (IO (ExecutorResponse ())), Partition a) -> Partition a)
-> [(Closure (IO (ExecutorResponse ())), Partition a)]
-> [Partition a]
forall a b. (a -> b) -> [a] -> [b]
map (Closure (IO (ExecutorResponse ())), Partition a) -> Partition a
forall a b. (a, b) -> b
snd [(Closure (IO (ExecutorResponse ())), Partition a)]
tasks
runStages stage :: Stage a
stage@(SWide Int
count Closure (ConduitM a (Int, a) (ResourceT IO) ())
cpipe Stage a
rest) = do
  [Partition a]
inputs <- Stage a -> DD [Partition a]
forall a. Stage a -> DD [Partition a]
runStages Stage a
rest
  Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Running: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
stage)
  ShuffleStore
shuffleStore <- Getting ShuffleStore DDEnv ShuffleStore -> DD ShuffleStore
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting ShuffleStore DDEnv ShuffleStore
Lens' DDEnv ShuffleStore
ddShuffleStore
  [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
  Int64)]
tasks <-
    [Partition a]
-> (Partition a
    -> DD
         (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
          Int64))
-> DD
     [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
       Int64)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Partition a]
inputs ((Partition a
  -> DD
       (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
        Int64))
 -> DD
      [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
        Int64)])
-> (Partition a
    -> DD
         (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
          Int64))
-> DD
     [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
       Int64)]
forall a b. (a -> b) -> a -> b
$ \Partition a
partition -> do
      Int64
num <- IO Int64 -> DD Int64
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Int64
forall a. Random a => IO a
randomIO
      let coutput :: Closure (ConduitT ByteString Void (ResourceT IO) ())
coutput = ShuffleStore
-> Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
ssPut ShuffleStore
shuffleStore Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
-> Closure Int64
-> Closure (ConduitT ByteString Void (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int64)) -> Int64 -> Closure Int64
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int64)
forall (a :: Constraint). a => Dict a
Dict) Int64
num
          crun :: Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))
crun =
            static
              ( \Dict (Typeable a, Serialise a)
Dict Int
count' ConduitT () a (ResourceT IO) ()
input ConduitM a (Int, a) (ResourceT IO) ()
pipe ConduitT ByteString Void (ResourceT IO) ()
output ->
                  (ExecutorStatsHooks -> IO [(Int, (Integer, Integer))])
-> IO (ExecutorResponse [(Int, (Integer, Integer))])
forall a. (ExecutorStatsHooks -> IO a) -> IO (ExecutorResponse a)
withExecutorStats ((ExecutorStatsHooks -> IO [(Int, (Integer, Integer))])
 -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> (ExecutorStatsHooks -> IO [(Int, (Integer, Integer))])
-> IO (ExecutorResponse [(Int, (Integer, Integer))])
forall a b. (a -> b) -> a -> b
$ \ExecutorStatsHooks {forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshOutput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshUpload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshDownload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshOutput :: ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshUpload :: ExecutorStatsHooks
-> forall (m :: * -> *).
   MonadIO m =>
   ConduitT ByteString ByteString m ()
eshDownload :: ExecutorStatsHooks
-> forall (m :: * -> *).
   MonadIO m =>
   ConduitT ByteString ByteString m ()
..} -> do
                    IORef [(Int, (Integer, Integer))]
ref <- [(Int, (Integer, Integer))]
-> IO (IORef [(Int, (Integer, Integer))])
forall a. a -> IO (IORef a)
newIORef @[(Int, (Integer, Integer))] []
                    ConduitT () Void (ResourceT IO) () -> IO ()
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
C.runConduitRes (ConduitT () Void (ResourceT IO) () -> IO ())
-> ConduitT () Void (ResourceT IO) () -> IO ()
forall a b. (a -> b) -> a -> b
$
                      ConduitT () a (ResourceT IO) ()
input
                        ConduitT () a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitT () Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT a a (ResourceT IO) ()
forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput
                        ConduitT a a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM a (Int, a) (ResourceT IO) ()
pipe
                        ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ((Int, a) -> (Int, a))
-> ConduitT (Int, a) (Int, a) (ResourceT IO) ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC (\(Int
k, a
v) -> (Int
k Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
count', a
v))
                        ConduitT (Int, a) (Int, a) (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT (Int, a) (Int, a) (ResourceT IO) ()
forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshOutput
                        ConduitT (Int, a) (Int, a) (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| forall t.
Monad (ResourceT IO) =>
ConduitT (Int, t) (Int, t) (ResourceT IO) ()
forall (m :: * -> *) t. Monad m => ConduitT (Int, t) (Int, t) m ()
sort @(ResourceT IO)
                        ConduitT (Int, a) (Int, a) (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ((Eq Int, Serialise a, Monad (ResourceT IO)) =>
ConduitM
  (Int, a) ByteString (ResourceT IO) [(Int, (Integer, Integer))]
forall a k (m :: * -> *).
(Eq k, Serialise a, Monad m) =>
ConduitM (k, a) ByteString m [(k, (Integer, Integer))]
serialiseWithLocC @a @Int @(ResourceT IO) ConduitM
  (Int, a) ByteString (ResourceT IO) [(Int, (Integer, Integer))]
-> ([(Int, (Integer, Integer))]
    -> ConduitT (Int, a) ByteString (ResourceT IO) ())
-> ConduitT (Int, a) ByteString (ResourceT IO) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO () -> ConduitT (Int, a) ByteString (ResourceT IO) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT (Int, a) ByteString (ResourceT IO) ())
-> ([(Int, (Integer, Integer))] -> IO ())
-> [(Int, (Integer, Integer))]
-> ConduitT (Int, a) ByteString (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IORef [(Int, (Integer, Integer))]
-> [(Int, (Integer, Integer))] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [(Int, (Integer, Integer))]
ref)
                        ConduitT (Int, a) ByteString (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshUpload
                        ConduitT ByteString ByteString (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString Void (ResourceT IO) ()
output
                    IORef [(Int, (Integer, Integer))] -> IO [(Int, (Integer, Integer))]
forall a. IORef a -> IO a
readIORef IORef [(Int, (Integer, Integer))]
ref
              )
              Closure
  (Dict (Typeable a, Serialise a)
   -> Int
   -> ConduitT () a (ResourceT IO) ()
   -> ConduitM a (Int, a) (ResourceT IO) ()
   -> ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure (Dict (Typeable a, Serialise a))
-> Closure
     (Int
      -> ConduitT () a (ResourceT IO) ()
      -> ConduitM a (Int, a) (ResourceT IO) ()
      -> ConduitT ByteString Void (ResourceT IO) ()
      -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticSerialise a => Closure (Dict (Typeable a, Serialise a))
forall a.
StaticSerialise a =>
Closure (Dict (Typeable a, Serialise a))
staticSerialise @a
              Closure
  (Int
   -> ConduitT () a (ResourceT IO) ()
   -> ConduitM a (Int, a) (ResourceT IO) ()
   -> ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure Int
-> Closure
     (ConduitT () a (ResourceT IO) ()
      -> ConduitM a (Int, a) (ResourceT IO) ()
      -> ConduitT ByteString Void (ResourceT IO) ()
      -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int)) -> Int -> Closure Int
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int)
forall (a :: Constraint). a => Dict a
Dict) Int
count
              Closure
  (ConduitT () a (ResourceT IO) ()
   -> ConduitM a (Int, a) (ResourceT IO) ()
   -> ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure (ConduitT () a (ResourceT IO) ())
-> Closure
     (ConduitM a (Int, a) (ResourceT IO) ()
      -> ConduitT ByteString Void (ResourceT IO) ()
      -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
partition
              Closure
  (ConduitM a (Int, a) (ResourceT IO) ()
   -> ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
-> Closure
     (ConduitT ByteString Void (ResourceT IO) ()
      -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitM a (Int, a) (ResourceT IO) ())
cpipe
              Closure
  (ConduitT ByteString Void (ResourceT IO) ()
   -> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure (ConduitT ByteString Void (ResourceT IO) ())
-> Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitT ByteString Void (ResourceT IO) ())
coutput
      (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
 Int64)
-> DD
     (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
      Int64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))
crun, Int64
num)
  Backend
backend <- Getting Backend DDEnv Backend -> DD Backend
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting Backend DDEnv Backend
Lens' DDEnv Backend
ddBackend
  LogLevel
level <- Getting LogLevel DDEnv LogLevel -> DD LogLevel
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting LogLevel DDEnv LogLevel
Lens' DDEnv LogLevel
ddLogLevel
  let f :: Backend
-> Closure
     (Dict
        (Serializable (ExecutorResponse [(Int, (Integer, Integer))])))
-> [Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))]
-> IO [ExecutorResponse [(Int, (Integer, Integer))]]
f =
        Options
-> Backend
-> Closure
     (Dict
        (Serializable (ExecutorResponse [(Int, (Integer, Integer))])))
-> [Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))]
-> IO [ExecutorResponse [(Int, (Integer, Integer))]]
forall a.
Options
-> Backend
-> Closure (Dict (Serializable a))
-> [Closure (IO a)]
-> IO [a]
D.forkConcurrently
          ( Options
D.defaultOptions
              { oShowProgress :: Bool
D.oShowProgress = LogLevel
level LogLevel -> LogLevel -> Bool
forall a. Ord a => a -> a -> Bool
<= LogLevel
LevelInfo,
                oRetries :: Int
D.oRetries = Int
2
              }
          )
  [ExecutorResponse [(Int, (Integer, Integer))]]
ret <- IO [ExecutorResponse [(Int, (Integer, Integer))]]
-> DD [ExecutorResponse [(Int, (Integer, Integer))]]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ExecutorResponse [(Int, (Integer, Integer))]]
 -> DD [ExecutorResponse [(Int, (Integer, Integer))]])
-> IO [ExecutorResponse [(Int, (Integer, Integer))]]
-> DD [ExecutorResponse [(Int, (Integer, Integer))]]
forall a b. (a -> b) -> a -> b
$ Backend
-> Closure
     (Dict
        (Serializable (ExecutorResponse [(Int, (Integer, Integer))])))
-> [Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))]
-> IO [ExecutorResponse [(Int, (Integer, Integer))]]
f Backend
backend (static Dict (Serializable (ExecutorResponse [(Int, (Integer, Integer))]))
forall (a :: Constraint). a => Dict a
Dict) (((Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
  Int64)
 -> Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])))
-> [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
     Int64)]
-> [Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))]
forall a b. (a -> b) -> [a] -> [b]
map (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
 Int64)
-> Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b. (a, b) -> a
fst [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
  Int64)]
tasks)
  Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logDebugN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Stats: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (ExecutorStats -> String
forall a. Show a => a -> String
show (ExecutorStats -> String) -> ExecutorStats -> String
forall a b. (a -> b) -> a -> b
$ (ExecutorResponse [(Int, (Integer, Integer))] -> ExecutorStats)
-> [ExecutorResponse [(Int, (Integer, Integer))]] -> ExecutorStats
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ExecutorResponse [(Int, (Integer, Integer))] -> ExecutorStats
forall a. ExecutorResponse a -> ExecutorStats
erStats [ExecutorResponse [(Int, (Integer, Integer))]]
ret)
  let ret' :: [([(Int, (Integer, Integer))], Int64)]
ret' = [[(Int, (Integer, Integer))]]
-> [Int64] -> [([(Int, (Integer, Integer))], Int64)]
forall a b. [a] -> [b] -> [(a, b)]
zip ((ExecutorResponse [(Int, (Integer, Integer))]
 -> [(Int, (Integer, Integer))])
-> [ExecutorResponse [(Int, (Integer, Integer))]]
-> [[(Int, (Integer, Integer))]]
forall a b. (a -> b) -> [a] -> [b]
map ExecutorResponse [(Int, (Integer, Integer))]
-> [(Int, (Integer, Integer))]
forall a. ExecutorResponse a -> a
erResponse [ExecutorResponse [(Int, (Integer, Integer))]]
ret) (((Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
  Int64)
 -> Int64)
-> [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
     Int64)]
-> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
 Int64)
-> Int64
forall a b. (a, b) -> b
snd [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
  Int64)]
tasks)
  [[(Int, Partition a)]]
partitions <-
    [([(Int, (Integer, Integer))], Int64)]
-> (([(Int, (Integer, Integer))], Int64)
    -> DD [(Int, Partition a)])
-> DD [[(Int, Partition a)]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [([(Int, (Integer, Integer))], Int64)]
ret' ((([(Int, (Integer, Integer))], Int64) -> DD [(Int, Partition a)])
 -> DD [[(Int, Partition a)]])
-> (([(Int, (Integer, Integer))], Int64)
    -> DD [(Int, Partition a)])
-> DD [[(Int, Partition a)]]
forall a b. (a -> b) -> a -> b
$ \([(Int, (Integer, Integer))]
res, Int64
num) ->
      [(Int, (Integer, Integer))]
-> ((Int, (Integer, Integer)) -> DD (Int, Partition a))
-> DD [(Int, Partition a)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [(Int, (Integer, Integer))]
res (((Int, (Integer, Integer)) -> DD (Int, Partition a))
 -> DD [(Int, Partition a)])
-> ((Int, (Integer, Integer)) -> DD (Int, Partition a))
-> DD [(Int, Partition a)]
forall a b. (a -> b) -> a -> b
$ \(Int
partition, (Integer
start, Integer
end)) ->
        (Int, Partition a) -> DD (Int, Partition a)
forall (m :: * -> *) a. Monad m => a -> m a
return
          ( Int
partition,
            Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
forall a. Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
PSimple @a
              ( static (\Dict (Typeable a, Serialise a)
Dict ConduitT () ByteString (ResourceT IO) ()
input' -> ConduitT () ByteString (ResourceT IO) ()
input' ConduitT () ByteString (ResourceT IO) ()
-> ConduitM ByteString a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM ByteString a (ResourceT IO) ()
forall a (m :: * -> *).
(Serialise a, MonadIO m) =>
ConduitM ByteString a m ()
deserialiseC)
                  Closure
  (Dict (Typeable a, Serialise a)
   -> ConduitT () ByteString (ResourceT IO) ()
   -> ConduitT () a (ResourceT IO) ())
-> Closure (Dict (Typeable a, Serialise a))
-> Closure
     (ConduitT () ByteString (ResourceT IO) ()
      -> ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticSerialise a => Closure (Dict (Typeable a, Serialise a))
forall a.
StaticSerialise a =>
Closure (Dict (Typeable a, Serialise a))
staticSerialise @a
                  Closure
  (ConduitT () ByteString (ResourceT IO) ()
   -> ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () ByteString (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` ( ShuffleStore
-> Closure
     (Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
ssGet ShuffleStore
shuffleStore
                            Closure
  (Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure Int64
-> Closure (Range -> ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int64)) -> Int64 -> Closure Int64
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int64)
forall (a :: Constraint). a => Dict a
Dict) Int64
num
                            Closure (Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure Range
-> Closure (ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Range)) -> Range -> Closure Range
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Range)
forall (a :: Constraint). a => Dict a
Dict) (Integer -> Integer -> Range
RangeOnly Integer
start Integer
end)
                        )
              )
          )
  ([(Int, Partition a)] -> IntMap (Partition a))
-> [[(Int, Partition a)]] -> [IntMap (Partition a)]
forall a b. (a -> b) -> [a] -> [b]
map [(Int, Partition a)] -> IntMap (Partition a)
forall a. [(Int, a)] -> IntMap a
M.fromList [[(Int, Partition a)]]
partitions
    [IntMap (Partition a)]
-> ([IntMap (Partition a)] -> IntMap (Partition a))
-> IntMap (Partition a)
forall a b. a -> (a -> b) -> b
& (IntMap (Partition a)
 -> IntMap (Partition a) -> IntMap (Partition a))
-> IntMap (Partition a)
-> [IntMap (Partition a)]
-> IntMap (Partition a)
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (SimpleWhenMissing (Partition a) (Partition a)
-> SimpleWhenMissing (Partition a) (Partition a)
-> SimpleWhenMatched (Partition a) (Partition a) (Partition a)
-> IntMap (Partition a)
-> IntMap (Partition a)
-> IntMap (Partition a)
forall a c b.
SimpleWhenMissing a c
-> SimpleWhenMissing b c
-> SimpleWhenMatched a b c
-> IntMap a
-> IntMap b
-> IntMap c
M.merge SimpleWhenMissing (Partition a) (Partition a)
forall (f :: * -> *) x. Applicative f => WhenMissing f x x
M.preserveMissing SimpleWhenMissing (Partition a) (Partition a)
forall (f :: * -> *) x. Applicative f => WhenMissing f x x
M.preserveMissing ((Int -> Partition a -> Partition a -> Partition a)
-> SimpleWhenMatched (Partition a) (Partition a) (Partition a)
forall (f :: * -> *) x y z.
Applicative f =>
(Int -> x -> y -> z) -> WhenMatched f x y z
M.zipWithMatched ((Int -> Partition a -> Partition a -> Partition a)
 -> SimpleWhenMatched (Partition a) (Partition a) (Partition a))
-> (Int -> Partition a -> Partition a -> Partition a)
-> SimpleWhenMatched (Partition a) (Partition a) (Partition a)
forall a b. (a -> b) -> a -> b
$ (Partition a -> Partition a -> Partition a)
-> Int -> Partition a -> Partition a -> Partition a
forall a b. a -> b -> a
const Partition a -> Partition a -> Partition a
forall a. Monoid a => a -> a -> a
mappend)) IntMap (Partition a)
forall a. IntMap a
M.empty
    IntMap (Partition a)
-> (IntMap (Partition a) -> [(Int, Partition a)])
-> [(Int, Partition a)]
forall a b. a -> (a -> b) -> b
& IntMap (Partition a) -> [(Int, Partition a)]
forall a. IntMap a -> [(Int, a)]
M.toList
    [(Int, Partition a)]
-> ([(Int, Partition a)] -> [Partition a]) -> [Partition a]
forall a b. a -> (a -> b) -> b
& ((Int, Partition a) -> Partition a)
-> [(Int, Partition a)] -> [Partition a]
forall a b. (a -> b) -> [a] -> [b]
map (Int, Partition a) -> Partition a
forall a b. (a, b) -> b
snd
    [Partition a]
-> ([Partition a] -> DD [Partition a]) -> DD [Partition a]
forall a b. a -> (a -> b) -> b
& [Partition a] -> DD [Partition a]
forall (m :: * -> *) a. Monad m => a -> m a
return
  where
    sort :: Monad m => ConduitT (Int, t) (Int, t) m ()
    sort :: ConduitT (Int, t) (Int, t) m ()
sort = ((Int, t) -> ConduitT (Int, t) (Int, t) m ())
-> [(Int, t)] -> ConduitT (Int, t) (Int, t) m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Int, t) -> ConduitT (Int, t) (Int, t) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield ([(Int, t)] -> ConduitT (Int, t) (Int, t) m ())
-> ([(Int, t)] -> [(Int, t)])
-> [(Int, t)]
-> ConduitT (Int, t) (Int, t) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Int, t) -> Int) -> [(Int, t)] -> [(Int, t)]
forall b a. Ord b => (a -> b) -> [a] -> [a]
sortOn (Int, t) -> Int
forall a b. (a, b) -> a
fst ([(Int, t)] -> ConduitT (Int, t) (Int, t) m ())
-> ConduitT (Int, t) (Int, t) m [(Int, t)]
-> ConduitT (Int, t) (Int, t) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ConduitT (Int, t) (Int, t) m [(Int, t)]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
C.sinkList
runStages stage :: Stage a
stage@(SCoalesce Int
count Stage a
rest) = do
  [Partition a]
inputs <- Stage a -> DD [Partition a]
forall a. Stage a -> DD [Partition a]
runStages Stage a
rest
  Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Running: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
stage)
  [Partition a] -> DD [Partition a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Partition a] -> DD [Partition a])
-> [Partition a] -> DD [Partition a]
forall a b. (a -> b) -> a -> b
$ ([Partition a] -> Partition a) -> [[Partition a]] -> [Partition a]
forall a b. (a -> b) -> [a] -> [b]
map [Partition a] -> Partition a
forall a. Monoid a => [a] -> a
mconcat ([[Partition a]] -> [Partition a])
-> [[Partition a]] -> [Partition a]
forall a b. (a -> b) -> a -> b
$ [[Partition a]] -> [[Partition a]]
forall a. [[a]] -> [[a]]
transpose (Int -> [Partition a] -> [[Partition a]]
forall e. Int -> [e] -> [[e]]
chunksOf Int
count [Partition a]
inputs)

-- * Dataset API

-- |
-- Returns a Conduit to fetch the results lazily to the driver.
dFetch ::
  StaticSerialise a =>
  Dataset a ->
  DD (ConduitT () a (ResourceT IO) ())
dFetch :: Dataset a -> DD (ConduitT () a (ResourceT IO) ())
dFetch Dataset a
ds = do
  let stages :: Stage a
stages = Dataset a -> Stage a
forall a. Dataset a -> Stage a
mkStages Dataset a
ds
  Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Stages: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Show a => a -> String
show Stage a
stages)
  [Partition a]
out <- Stage a -> DD [Partition a]
forall a. Stage a -> DD [Partition a]
runStages Stage a
stages
  ConduitT () a (ResourceT IO) ()
-> DD (ConduitT () a (ResourceT IO) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitT () a (ResourceT IO) ()
 -> DD (ConduitT () a (ResourceT IO) ()))
-> ConduitT () a (ResourceT IO) ()
-> DD (ConduitT () a (ResourceT IO) ())
forall a b. (a -> b) -> a -> b
$ (Partition a -> ConduitT () a (ResourceT IO) ())
-> [Partition a] -> ConduitT () a (ResourceT IO) ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Closure (ConduitT () a (ResourceT IO) ())
-> ConduitT () a (ResourceT IO) ()
forall a. Closure a -> a
unclosure (Closure (ConduitT () a (ResourceT IO) ())
 -> ConduitT () a (ResourceT IO) ())
-> (Partition a -> Closure (ConduitT () a (ResourceT IO) ()))
-> Partition a
-> ConduitT () a (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer) [Partition a]
out

-- |
-- Fetches the complete dataset as a list.
dToList ::
  StaticSerialise a =>
  Dataset a ->
  DD [a]
dToList :: Dataset a -> DD [a]
dToList Dataset a
ds = do
  ConduitT () a (ResourceT IO) ()
c <- Dataset a -> DD (ConduitT () a (ResourceT IO) ())
forall a.
StaticSerialise a =>
Dataset a -> DD (ConduitT () a (ResourceT IO) ())
dFetch Dataset a
ds
  IO [a] -> DD [a]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [a] -> DD [a]) -> IO [a] -> DD [a]
forall a b. (a -> b) -> a -> b
$ ConduitT () Void (ResourceT IO) [a] -> IO [a]
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
runConduitRes (ConduitT () Void (ResourceT IO) [a] -> IO [a])
-> ConduitT () Void (ResourceT IO) [a] -> IO [a]
forall a b. (a -> b) -> a -> b
$ ConduitT () a (ResourceT IO) ()
c ConduitT () a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) [a]
-> ConduitT () Void (ResourceT IO) [a]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM a Void (ResourceT IO) [a]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
sinkList