{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Control.Distributed.Dataset.Internal.Dataset where
import Conduit hiding
( Consumer,
Producer,
await,
)
import qualified Conduit as C
import Control.Distributed.Closure
import Control.Distributed.Dataset.Internal.Class
import Control.Distributed.Dataset.Internal.Process
import Control.Distributed.Dataset.ShuffleStore
import qualified Control.Distributed.Fork.Utils as D
import Control.Lens
import Control.Monad
import Control.Monad.Logger
import Data.Conduit.Serialise
import Data.Hashable
import Data.IORef
import qualified Data.IntMap as M
import qualified Data.IntMap.Merge.Strict as M
import Data.List
( foldl',
sortOn,
transpose,
)
import Data.List.Split
import qualified Data.Text as T
import Data.Typeable
import System.Random
data Partition a where
PEmpty :: Partition a
PSimple :: Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
PCombined :: Partition a -> Partition a -> Partition a
instance Semigroup (Partition a) where
Partition a
a <> :: Partition a -> Partition a -> Partition a
<> Partition a
b = Partition a -> Partition a -> Partition a
forall a. Partition a -> Partition a -> Partition a
PCombined Partition a
a Partition a
b
instance Monoid (Partition a) where
mempty :: Partition a
mempty = Partition a
forall a. Partition a
PEmpty
partitionProducer :: Typeable a => Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer :: Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
PEmpty = static (() -> ConduitT () a (ResourceT IO) ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
partitionProducer (PSimple Closure (ConduitT () a (ResourceT IO) ())
p) = Closure (ConduitT () a (ResourceT IO) ())
p
partitionProducer (PCombined Partition a
p1 Partition a
p2) =
static ConduitT () a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
(>>) Closure
(ConduitT () a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
-> Closure
(ConduitT () a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
p1 Closure
(ConduitT () a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
p2
data Dataset a where
DExternal :: StaticSerialise a => [Partition a] -> Dataset a
DPipe ::
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) ->
Dataset a ->
Dataset b
DPartition ::
(StaticHashable k, StaticSerialise a) =>
Int ->
Closure (a -> k) ->
Dataset a ->
Dataset a
DCoalesce ::
Int ->
Dataset a ->
Dataset a
dStaticSerialise :: Dataset a -> Dict (StaticSerialise a)
dStaticSerialise :: Dataset a -> Dict (StaticSerialise a)
dStaticSerialise DExternal {} = Dict (StaticSerialise a)
forall (a :: Constraint). a => Dict a
Dict
dStaticSerialise DPipe {} = Dict (StaticSerialise a)
forall (a :: Constraint). a => Dict a
Dict
dStaticSerialise DPartition {} = Dict (StaticSerialise a)
forall (a :: Constraint). a => Dict a
Dict
dStaticSerialise (DCoalesce Int
_ Dataset a
d) = Dataset a -> Dict (StaticSerialise a)
forall a. Dataset a -> Dict (StaticSerialise a)
dStaticSerialise Dataset a
d
data Stage a where
SInit :: Typeable a => [Partition a] -> Stage a
SNarrow ::
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitM a b (ResourceT IO) ()) ->
Stage a ->
Stage b
SWide ::
(StaticSerialise a, StaticSerialise b) =>
Int ->
Closure (ConduitM a (Int, b) (ResourceT IO) ()) ->
Stage a ->
Stage b
SCoalesce :: Int -> Stage a -> Stage a
instance Show (Stage a) where
show :: Stage a -> String
show s :: Stage a
s@(SInit [Partition a]
_) = Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
s
show s :: Stage a
s@(SNarrow Closure (ConduitM a a (ResourceT IO) ())
_ Stage a
r) = [String] -> String
forall a. Monoid a => [a] -> a
mconcat [Stage a -> String
forall a. Show a => a -> String
show Stage a
r, String
" -> ", Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
s]
show s :: Stage a
s@(SWide Int
_ Closure (ConduitM a (Int, a) (ResourceT IO) ())
_ Stage a
r) = [String] -> String
forall a. Monoid a => [a] -> a
mconcat [Stage a -> String
forall a. Show a => a -> String
show Stage a
r, String
" -> ", Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
s]
show s :: Stage a
s@(SCoalesce Int
_ Stage a
r) = [String] -> String
forall a. Monoid a => [a] -> a
mconcat [Stage a -> String
forall a. Show a => a -> String
show Stage a
r, String
" -> ", Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
s]
showTopStage :: forall a. Stage a -> String
showTopStage :: Stage a -> String
showTopStage (SInit [Partition a]
p) =
[String] -> String
forall a. Monoid a => [a] -> a
mconcat
[ String
"SInit",
String
" @",
TypeRep -> String
forall a. Show a => a -> String
show (Proxy a -> TypeRep
forall k (proxy :: k -> *) (a :: k).
Typeable a =>
proxy a -> TypeRep
typeRep (Proxy a -> TypeRep) -> Proxy a -> TypeRep
forall a b. (a -> b) -> a -> b
$ Proxy a
forall k (t :: k). Proxy t
Proxy @a),
String
" ",
Int -> String
forall a. Show a => a -> String
show ([Partition a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Partition a]
p)
]
showTopStage (SNarrow Closure (ConduitM a a (ResourceT IO) ())
_ Stage a
_) =
[String] -> String
forall a. Monoid a => [a] -> a
mconcat
[ String
"* SNarrow",
String
" @",
TypeRep -> String
forall a. Show a => a -> String
show (Proxy a -> TypeRep
forall k (proxy :: k -> *) (a :: k).
Typeable a =>
proxy a -> TypeRep
typeRep (Proxy a -> TypeRep) -> Proxy a -> TypeRep
forall a b. (a -> b) -> a -> b
$ Proxy a
forall k (t :: k). Proxy t
Proxy @a)
]
showTopStage (SWide Int
i Closure (ConduitM a (Int, a) (ResourceT IO) ())
_ Stage a
_) =
[String] -> String
forall a. Monoid a => [a] -> a
mconcat
[ String
"* SWide",
String
" @",
TypeRep -> String
forall a. Show a => a -> String
show (Proxy a -> TypeRep
forall k (proxy :: k -> *) (a :: k).
Typeable a =>
proxy a -> TypeRep
typeRep (Proxy a -> TypeRep) -> Proxy a -> TypeRep
forall a b. (a -> b) -> a -> b
$ Proxy a
forall k (t :: k). Proxy t
Proxy @a),
String
" ",
Int -> String
forall a. Show a => a -> String
show Int
i
]
showTopStage (SCoalesce Int
i Stage a
_) =
[String] -> String
forall a. Monoid a => [a] -> a
mconcat
[ String
"SCoalesce",
String
" ",
Int -> String
forall a. Show a => a -> String
show Int
i
]
mkStages :: Dataset a -> Stage a
mkStages :: Dataset a -> Stage a
mkStages (DExternal [Partition a]
a) = [Partition a] -> Stage a
forall a. Typeable a => [Partition a] -> Stage a
SInit [Partition a]
a
mkStages (DPipe Closure (ConduitT a a (ResourceT IO) ())
p Dataset a
rest) =
case Dataset a -> Stage a
forall a. Dataset a -> Stage a
mkStages Dataset a
rest of
SNarrow Closure (ConduitM a a (ResourceT IO) ())
prev Stage a
r ->
Closure (ConduitM a a (ResourceT IO) ()) -> Stage a -> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitM a b (ResourceT IO) ()) -> Stage a -> Stage b
SNarrow (static ConduitM a a (ResourceT IO) ()
-> ConduitT a a (ResourceT IO) () -> ConduitM a a (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
(.|) Closure
(ConduitM a a (ResourceT IO) ()
-> ConduitT a a (ResourceT IO) ()
-> ConduitM a a (ResourceT IO) ())
-> Closure (ConduitM a a (ResourceT IO) ())
-> Closure
(ConduitT a a (ResourceT IO) () -> ConduitM a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitM a a (ResourceT IO) ())
prev Closure
(ConduitT a a (ResourceT IO) () -> ConduitM a a (ResourceT IO) ())
-> Closure (ConduitT a a (ResourceT IO) ())
-> Closure (ConduitM a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitT a a (ResourceT IO) ())
p) Stage a
r
Stage a
other ->
Closure (ConduitT a a (ResourceT IO) ()) -> Stage a -> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitM a b (ResourceT IO) ()) -> Stage a -> Stage b
SNarrow Closure (ConduitT a a (ResourceT IO) ())
p Stage a
other
mkStages (DPartition Int
count (Closure (a -> k)
cf :: Closure (a -> k)) Dataset a
rest) =
case Dataset a -> Stage a
forall a. Dataset a -> Stage a
mkStages Dataset a
rest of
SNarrow Closure (ConduitM a a (ResourceT IO) ())
cp Stage a
rest' ->
Int
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
-> Stage a
-> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Int
-> Closure (ConduitM a (Int, b) (ResourceT IO) ())
-> Stage a
-> Stage b
SWide
Int
count
( static (\Dict (Typeable k, Eq k, Hashable k)
Dict ConduitM a a (ResourceT IO) ()
p a -> k
f -> ConduitM a a (ResourceT IO) ()
p ConduitM a a (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (a -> k) -> ConduitM a (Int, a) (ResourceT IO) ()
forall t e (m :: * -> *).
(Hashable e, Monad m) =>
(t -> e) -> ConduitT t (Int, t) m ()
partition @a @k a -> k
f)
Closure
(Dict (Typeable k, Eq k, Hashable k)
-> ConduitM a a (ResourceT IO) ()
-> (a -> k)
-> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (Dict (Typeable k, Eq k, Hashable k))
-> Closure
(ConduitM a a (ResourceT IO) ()
-> (a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable k => Closure (Dict (Typeable k, Eq k, Hashable k))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @k
Closure
(ConduitM a a (ResourceT IO) ()
-> (a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (ConduitM a a (ResourceT IO) ())
-> Closure ((a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitM a a (ResourceT IO) ())
cp
Closure ((a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (a -> k)
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
cf
)
Stage a
rest'
Stage a
other ->
Int
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
-> Stage a
-> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Int
-> Closure (ConduitM a (Int, b) (ResourceT IO) ())
-> Stage a
-> Stage b
SWide
Int
count
( static (\Dict (Typeable k, Eq k, Hashable k)
Dict -> forall t e (m :: * -> *).
(Hashable e, Monad m) =>
(t -> e) -> ConduitT t (Int, t) m ()
forall (m :: * -> *).
(Hashable k, Monad m) =>
(a -> k) -> ConduitT a (Int, a) m ()
partition @a @k)
Closure
(Dict (Typeable k, Eq k, Hashable k)
-> (a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (Dict (Typeable k, Eq k, Hashable k))
-> Closure ((a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable k => Closure (Dict (Typeable k, Eq k, Hashable k))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @k
Closure ((a -> k) -> ConduitM a (Int, a) (ResourceT IO) ())
-> Closure (a -> k)
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
cf
)
Stage a
other
where
partition :: forall t e m. (Hashable e, Monad m) => (t -> e) -> ConduitT t (Int, t) m ()
partition :: (t -> e) -> ConduitT t (Int, t) m ()
partition t -> e
f =
ConduitT t (Int, t) m (Maybe t)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await ConduitT t (Int, t) m (Maybe t)
-> (Maybe t -> ConduitT t (Int, t) m ())
-> ConduitT t (Int, t) m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe t
Nothing -> () -> ConduitT t (Int, t) m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just t
a -> (Int, t) -> ConduitT t (Int, t) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield (e -> Int
forall a. Hashable a => a -> Int
hash (t -> e
f t
a), t
a) ConduitT t (Int, t) m ()
-> ConduitT t (Int, t) m () -> ConduitT t (Int, t) m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (t -> e) -> ConduitT t (Int, t) m ()
forall t e (m :: * -> *).
(Hashable e, Monad m) =>
(t -> e) -> ConduitT t (Int, t) m ()
partition t -> e
f
mkStages (DCoalesce Int
count Dataset a
rest) =
case Dataset a -> Stage a
forall a. Dataset a -> Stage a
mkStages Dataset a
rest of
SNarrow Closure (ConduitM a a (ResourceT IO) ())
cp Stage a
rest' ->
Closure (ConduitM a a (ResourceT IO) ()) -> Stage a -> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitM a b (ResourceT IO) ()) -> Stage a -> Stage b
SNarrow Closure (ConduitM a a (ResourceT IO) ())
cp (Int -> Stage a -> Stage a
forall a. Int -> Stage a -> Stage a
SCoalesce Int
count Stage a
rest')
SWide Int
_ Closure (ConduitM a (Int, a) (ResourceT IO) ())
cp Stage a
rest' ->
Int
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
-> Stage a
-> Stage a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Int
-> Closure (ConduitM a (Int, b) (ResourceT IO) ())
-> Stage a
-> Stage b
SWide Int
count Closure (ConduitM a (Int, a) (ResourceT IO) ())
cp Stage a
rest'
SCoalesce Int
_ Stage a
rest' ->
Int -> Stage a -> Stage a
forall a. Int -> Stage a -> Stage a
SCoalesce Int
count Stage a
rest'
Stage a
other ->
Int -> Stage a -> Stage a
forall a. Int -> Stage a -> Stage a
SCoalesce Int
count Stage a
other
runStages :: forall a. Stage a -> DD [Partition a]
runStages :: Stage a -> DD [Partition a]
runStages stage :: Stage a
stage@(SInit [Partition a]
ps) = do
Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Running: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
stage)
[Partition a] -> DD [Partition a]
forall (m :: * -> *) a. Monad m => a -> m a
return [Partition a]
ps
runStages stage :: Stage a
stage@(SNarrow Closure (ConduitM a a (ResourceT IO) ())
cpipe Stage a
rest) = do
[Partition a]
inputs <- Stage a -> DD [Partition a]
forall a. Stage a -> DD [Partition a]
runStages Stage a
rest
Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Running: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
stage)
ShuffleStore
shuffleStore <- Getting ShuffleStore DDEnv ShuffleStore -> DD ShuffleStore
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting ShuffleStore DDEnv ShuffleStore
Lens' DDEnv ShuffleStore
ddShuffleStore
[(Closure (IO (ExecutorResponse ())), Partition a)]
tasks <-
[Partition a]
-> (Partition a
-> DD (Closure (IO (ExecutorResponse ())), Partition a))
-> DD [(Closure (IO (ExecutorResponse ())), Partition a)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Partition a]
inputs ((Partition a
-> DD (Closure (IO (ExecutorResponse ())), Partition a))
-> DD [(Closure (IO (ExecutorResponse ())), Partition a)])
-> (Partition a
-> DD (Closure (IO (ExecutorResponse ())), Partition a))
-> DD [(Closure (IO (ExecutorResponse ())), Partition a)]
forall a b. (a -> b) -> a -> b
$ \Partition a
input -> do
Int64
num <- IO Int64 -> DD Int64
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Int64
forall a. Random a => IO a
randomIO
let coutput :: Closure (ConduitT ByteString Void (ResourceT IO) ())
coutput = ShuffleStore
-> Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
ssPut ShuffleStore
shuffleStore Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
-> Closure Int64
-> Closure (ConduitT ByteString Void (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int64)) -> Int64 -> Closure Int64
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int64)
forall (a :: Constraint). a => Dict a
Dict) Int64
num
cinput :: Closure (ConduitT () ByteString (ResourceT IO) ())
cinput = ShuffleStore
-> Closure
(Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
ssGet ShuffleStore
shuffleStore Closure
(Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure Int64
-> Closure (Range -> ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int64)) -> Int64 -> Closure Int64
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int64)
forall (a :: Constraint). a => Dict a
Dict) Int64
num Closure (Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure Range
-> Closure (ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Range)) -> Range -> Closure Range
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Range)
forall (a :: Constraint). a => Dict a
Dict) Range
RangeAll
crun :: Closure (IO (ExecutorResponse ()))
crun =
static
( \Dict (Typeable a, Serialise a)
Dict ConduitT () a (ResourceT IO) ()
producer ConduitM a a (ResourceT IO) ()
pipe ConduitT ByteString Void (ResourceT IO) ()
output ->
(ExecutorStatsHooks -> IO ()) -> IO (ExecutorResponse ())
forall a. (ExecutorStatsHooks -> IO a) -> IO (ExecutorResponse a)
withExecutorStats ((ExecutorStatsHooks -> IO ()) -> IO (ExecutorResponse ()))
-> (ExecutorStatsHooks -> IO ()) -> IO (ExecutorResponse ())
forall a b. (a -> b) -> a -> b
$ \ExecutorStatsHooks {forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshOutput :: ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshUpload :: ExecutorStatsHooks
-> forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshDownload :: ExecutorStatsHooks
-> forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshOutput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshUpload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshDownload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
..} ->
ConduitT () Void (ResourceT IO) () -> IO ()
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
C.runConduitRes (ConduitT () Void (ResourceT IO) () -> IO ())
-> ConduitT () Void (ResourceT IO) () -> IO ()
forall a b. (a -> b) -> a -> b
$
ConduitT () a (ResourceT IO) ()
producer
ConduitT () a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitT () Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT a a (ResourceT IO) ()
forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput
ConduitT a a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM a a (ResourceT IO) ()
pipe
ConduitM a a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT a a (ResourceT IO) ()
forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshOutput
ConduitT a a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| forall a (m :: * -> *).
(Serialise a, Monad m) =>
ConduitM a ByteString m ()
forall (m :: * -> *).
(Serialise a, Monad m) =>
ConduitM a ByteString m ()
serialiseC @a
ConduitM a ByteString (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshUpload
ConduitT ByteString ByteString (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString Void (ResourceT IO) ()
output
)
Closure
(Dict (Typeable a, Serialise a)
-> ConduitT () a (ResourceT IO) ()
-> ConduitM a a (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse ()))
-> Closure (Dict (Typeable a, Serialise a))
-> Closure
(ConduitT () a (ResourceT IO) ()
-> ConduitM a a (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse ()))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticSerialise a => Closure (Dict (Typeable a, Serialise a))
forall a.
StaticSerialise a =>
Closure (Dict (Typeable a, Serialise a))
staticSerialise @a
Closure
(ConduitT () a (ResourceT IO) ()
-> ConduitM a a (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse ()))
-> Closure (ConduitT () a (ResourceT IO) ())
-> Closure
(ConduitM a a (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse ()))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
input
Closure
(ConduitM a a (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse ()))
-> Closure (ConduitM a a (ResourceT IO) ())
-> Closure
(ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse ()))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitM a a (ResourceT IO) ())
cpipe
Closure
(ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse ()))
-> Closure (ConduitT ByteString Void (ResourceT IO) ())
-> Closure (IO (ExecutorResponse ()))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitT ByteString Void (ResourceT IO) ())
coutput
newPartition :: Partition a
newPartition =
Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
forall a. Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
PSimple @a
( static (\Dict (Typeable a, Serialise a)
Dict ConduitT () ByteString (ResourceT IO) ()
input' -> ConduitT () ByteString (ResourceT IO) ()
input' ConduitT () ByteString (ResourceT IO) ()
-> ConduitM ByteString a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM ByteString a (ResourceT IO) ()
forall a (m :: * -> *).
(Serialise a, MonadIO m) =>
ConduitM ByteString a m ()
deserialiseC)
Closure
(Dict (Typeable a, Serialise a)
-> ConduitT () ByteString (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
-> Closure (Dict (Typeable a, Serialise a))
-> Closure
(ConduitT () ByteString (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticSerialise a => Closure (Dict (Typeable a, Serialise a))
forall a.
StaticSerialise a =>
Closure (Dict (Typeable a, Serialise a))
staticSerialise @a
Closure
(ConduitT () ByteString (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () ByteString (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitT () ByteString (ResourceT IO) ())
cinput
)
(Closure (IO (ExecutorResponse ())), Partition a)
-> DD (Closure (IO (ExecutorResponse ())), Partition a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Closure (IO (ExecutorResponse ()))
crun, Partition a
newPartition)
Backend
backend <- Getting Backend DDEnv Backend -> DD Backend
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting Backend DDEnv Backend
Lens' DDEnv Backend
ddBackend
LogLevel
level <- Getting LogLevel DDEnv LogLevel -> DD LogLevel
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting LogLevel DDEnv LogLevel
Lens' DDEnv LogLevel
ddLogLevel
let f :: Backend
-> Closure (Dict (Serializable (ExecutorResponse ())))
-> [Closure (IO (ExecutorResponse ()))]
-> IO [ExecutorResponse ()]
f =
Options
-> Backend
-> Closure (Dict (Serializable (ExecutorResponse ())))
-> [Closure (IO (ExecutorResponse ()))]
-> IO [ExecutorResponse ()]
forall a.
Options
-> Backend
-> Closure (Dict (Serializable a))
-> [Closure (IO a)]
-> IO [a]
D.forkConcurrently
( Options
D.defaultOptions
{ oShowProgress :: Bool
D.oShowProgress = LogLevel
level LogLevel -> LogLevel -> Bool
forall a. Ord a => a -> a -> Bool
<= LogLevel
LevelInfo,
oRetries :: Int
D.oRetries = Int
2
}
)
[ExecutorResponse ()]
ret <- IO [ExecutorResponse ()] -> DD [ExecutorResponse ()]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ExecutorResponse ()] -> DD [ExecutorResponse ()])
-> IO [ExecutorResponse ()] -> DD [ExecutorResponse ()]
forall a b. (a -> b) -> a -> b
$ Backend
-> Closure (Dict (Serializable (ExecutorResponse ())))
-> [Closure (IO (ExecutorResponse ()))]
-> IO [ExecutorResponse ()]
f Backend
backend (static Dict (Serializable (ExecutorResponse ()))
forall (a :: Constraint). a => Dict a
Dict) (((Closure (IO (ExecutorResponse ())), Partition a)
-> Closure (IO (ExecutorResponse ())))
-> [(Closure (IO (ExecutorResponse ())), Partition a)]
-> [Closure (IO (ExecutorResponse ()))]
forall a b. (a -> b) -> [a] -> [b]
map (Closure (IO (ExecutorResponse ())), Partition a)
-> Closure (IO (ExecutorResponse ()))
forall a b. (a, b) -> a
fst [(Closure (IO (ExecutorResponse ())), Partition a)]
tasks)
Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logDebugN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Stats: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (ExecutorStats -> String
forall a. Show a => a -> String
show (ExecutorStats -> String) -> ExecutorStats -> String
forall a b. (a -> b) -> a -> b
$ (ExecutorResponse () -> ExecutorStats)
-> [ExecutorResponse ()] -> ExecutorStats
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ExecutorResponse () -> ExecutorStats
forall a. ExecutorResponse a -> ExecutorStats
erStats [ExecutorResponse ()]
ret)
[Partition a] -> DD [Partition a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Partition a] -> DD [Partition a])
-> [Partition a] -> DD [Partition a]
forall a b. (a -> b) -> a -> b
$ ((Closure (IO (ExecutorResponse ())), Partition a) -> Partition a)
-> [(Closure (IO (ExecutorResponse ())), Partition a)]
-> [Partition a]
forall a b. (a -> b) -> [a] -> [b]
map (Closure (IO (ExecutorResponse ())), Partition a) -> Partition a
forall a b. (a, b) -> b
snd [(Closure (IO (ExecutorResponse ())), Partition a)]
tasks
runStages stage :: Stage a
stage@(SWide Int
count Closure (ConduitM a (Int, a) (ResourceT IO) ())
cpipe Stage a
rest) = do
[Partition a]
inputs <- Stage a -> DD [Partition a]
forall a. Stage a -> DD [Partition a]
runStages Stage a
rest
Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Running: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
stage)
ShuffleStore
shuffleStore <- Getting ShuffleStore DDEnv ShuffleStore -> DD ShuffleStore
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting ShuffleStore DDEnv ShuffleStore
Lens' DDEnv ShuffleStore
ddShuffleStore
[(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)]
tasks <-
[Partition a]
-> (Partition a
-> DD
(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64))
-> DD
[(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Partition a]
inputs ((Partition a
-> DD
(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64))
-> DD
[(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)])
-> (Partition a
-> DD
(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64))
-> DD
[(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)]
forall a b. (a -> b) -> a -> b
$ \Partition a
partition -> do
Int64
num <- IO Int64 -> DD Int64
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Int64
forall a. Random a => IO a
randomIO
let coutput :: Closure (ConduitT ByteString Void (ResourceT IO) ())
coutput = ShuffleStore
-> Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
ssPut ShuffleStore
shuffleStore Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
-> Closure Int64
-> Closure (ConduitT ByteString Void (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int64)) -> Int64 -> Closure Int64
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int64)
forall (a :: Constraint). a => Dict a
Dict) Int64
num
crun :: Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))
crun =
static
( \Dict (Typeable a, Serialise a)
Dict Int
count' ConduitT () a (ResourceT IO) ()
input ConduitM a (Int, a) (ResourceT IO) ()
pipe ConduitT ByteString Void (ResourceT IO) ()
output ->
(ExecutorStatsHooks -> IO [(Int, (Integer, Integer))])
-> IO (ExecutorResponse [(Int, (Integer, Integer))])
forall a. (ExecutorStatsHooks -> IO a) -> IO (ExecutorResponse a)
withExecutorStats ((ExecutorStatsHooks -> IO [(Int, (Integer, Integer))])
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> (ExecutorStatsHooks -> IO [(Int, (Integer, Integer))])
-> IO (ExecutorResponse [(Int, (Integer, Integer))])
forall a b. (a -> b) -> a -> b
$ \ExecutorStatsHooks {forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshOutput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshUpload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshDownload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshOutput :: ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshUpload :: ExecutorStatsHooks
-> forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshDownload :: ExecutorStatsHooks
-> forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
..} -> do
IORef [(Int, (Integer, Integer))]
ref <- [(Int, (Integer, Integer))]
-> IO (IORef [(Int, (Integer, Integer))])
forall a. a -> IO (IORef a)
newIORef @[(Int, (Integer, Integer))] []
ConduitT () Void (ResourceT IO) () -> IO ()
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
C.runConduitRes (ConduitT () Void (ResourceT IO) () -> IO ())
-> ConduitT () Void (ResourceT IO) () -> IO ()
forall a b. (a -> b) -> a -> b
$
ConduitT () a (ResourceT IO) ()
input
ConduitT () a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitT () Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT a a (ResourceT IO) ()
forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput
ConduitT a a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM a (Int, a) (ResourceT IO) ()
pipe
ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ((Int, a) -> (Int, a))
-> ConduitT (Int, a) (Int, a) (ResourceT IO) ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC (\(Int
k, a
v) -> (Int
k Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
count', a
v))
ConduitT (Int, a) (Int, a) (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT (Int, a) (Int, a) (ResourceT IO) ()
forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshOutput
ConduitT (Int, a) (Int, a) (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| forall t.
Monad (ResourceT IO) =>
ConduitT (Int, t) (Int, t) (ResourceT IO) ()
forall (m :: * -> *) t. Monad m => ConduitT (Int, t) (Int, t) m ()
sort @(ResourceT IO)
ConduitT (Int, a) (Int, a) (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ((Eq Int, Serialise a, Monad (ResourceT IO)) =>
ConduitM
(Int, a) ByteString (ResourceT IO) [(Int, (Integer, Integer))]
forall a k (m :: * -> *).
(Eq k, Serialise a, Monad m) =>
ConduitM (k, a) ByteString m [(k, (Integer, Integer))]
serialiseWithLocC @a @Int @(ResourceT IO) ConduitM
(Int, a) ByteString (ResourceT IO) [(Int, (Integer, Integer))]
-> ([(Int, (Integer, Integer))]
-> ConduitT (Int, a) ByteString (ResourceT IO) ())
-> ConduitT (Int, a) ByteString (ResourceT IO) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO () -> ConduitT (Int, a) ByteString (ResourceT IO) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT (Int, a) ByteString (ResourceT IO) ())
-> ([(Int, (Integer, Integer))] -> IO ())
-> [(Int, (Integer, Integer))]
-> ConduitT (Int, a) ByteString (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IORef [(Int, (Integer, Integer))]
-> [(Int, (Integer, Integer))] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [(Int, (Integer, Integer))]
ref)
ConduitT (Int, a) ByteString (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> ConduitM (Int, a) Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshUpload
ConduitT ByteString ByteString (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString Void (ResourceT IO) ()
output
IORef [(Int, (Integer, Integer))] -> IO [(Int, (Integer, Integer))]
forall a. IORef a -> IO a
readIORef IORef [(Int, (Integer, Integer))]
ref
)
Closure
(Dict (Typeable a, Serialise a)
-> Int
-> ConduitT () a (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure (Dict (Typeable a, Serialise a))
-> Closure
(Int
-> ConduitT () a (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticSerialise a => Closure (Dict (Typeable a, Serialise a))
forall a.
StaticSerialise a =>
Closure (Dict (Typeable a, Serialise a))
staticSerialise @a
Closure
(Int
-> ConduitT () a (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure Int
-> Closure
(ConduitT () a (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int)) -> Int -> Closure Int
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int)
forall (a :: Constraint). a => Dict a
Dict) Int
count
Closure
(ConduitT () a (ResourceT IO) ()
-> ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure (ConduitT () a (ResourceT IO) ())
-> Closure
(ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer Partition a
partition
Closure
(ConduitM a (Int, a) (ResourceT IO) ()
-> ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure (ConduitM a (Int, a) (ResourceT IO) ())
-> Closure
(ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitM a (Int, a) (ResourceT IO) ())
cpipe
Closure
(ConduitT ByteString Void (ResourceT IO) ()
-> IO (ExecutorResponse [(Int, (Integer, Integer))]))
-> Closure (ConduitT ByteString Void (ResourceT IO) ())
-> Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (ConduitT ByteString Void (ResourceT IO) ())
coutput
(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)
-> DD
(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))
crun, Int64
num)
Backend
backend <- Getting Backend DDEnv Backend -> DD Backend
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting Backend DDEnv Backend
Lens' DDEnv Backend
ddBackend
LogLevel
level <- Getting LogLevel DDEnv LogLevel -> DD LogLevel
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting LogLevel DDEnv LogLevel
Lens' DDEnv LogLevel
ddLogLevel
let f :: Backend
-> Closure
(Dict
(Serializable (ExecutorResponse [(Int, (Integer, Integer))])))
-> [Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))]
-> IO [ExecutorResponse [(Int, (Integer, Integer))]]
f =
Options
-> Backend
-> Closure
(Dict
(Serializable (ExecutorResponse [(Int, (Integer, Integer))])))
-> [Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))]
-> IO [ExecutorResponse [(Int, (Integer, Integer))]]
forall a.
Options
-> Backend
-> Closure (Dict (Serializable a))
-> [Closure (IO a)]
-> IO [a]
D.forkConcurrently
( Options
D.defaultOptions
{ oShowProgress :: Bool
D.oShowProgress = LogLevel
level LogLevel -> LogLevel -> Bool
forall a. Ord a => a -> a -> Bool
<= LogLevel
LevelInfo,
oRetries :: Int
D.oRetries = Int
2
}
)
[ExecutorResponse [(Int, (Integer, Integer))]]
ret <- IO [ExecutorResponse [(Int, (Integer, Integer))]]
-> DD [ExecutorResponse [(Int, (Integer, Integer))]]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ExecutorResponse [(Int, (Integer, Integer))]]
-> DD [ExecutorResponse [(Int, (Integer, Integer))]])
-> IO [ExecutorResponse [(Int, (Integer, Integer))]]
-> DD [ExecutorResponse [(Int, (Integer, Integer))]]
forall a b. (a -> b) -> a -> b
$ Backend
-> Closure
(Dict
(Serializable (ExecutorResponse [(Int, (Integer, Integer))])))
-> [Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))]
-> IO [ExecutorResponse [(Int, (Integer, Integer))]]
f Backend
backend (static Dict (Serializable (ExecutorResponse [(Int, (Integer, Integer))]))
forall (a :: Constraint). a => Dict a
Dict) (((Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)
-> Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])))
-> [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)]
-> [Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))]
forall a b. (a -> b) -> [a] -> [b]
map (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)
-> Closure (IO (ExecutorResponse [(Int, (Integer, Integer))]))
forall a b. (a, b) -> a
fst [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)]
tasks)
Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logDebugN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Stats: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (ExecutorStats -> String
forall a. Show a => a -> String
show (ExecutorStats -> String) -> ExecutorStats -> String
forall a b. (a -> b) -> a -> b
$ (ExecutorResponse [(Int, (Integer, Integer))] -> ExecutorStats)
-> [ExecutorResponse [(Int, (Integer, Integer))]] -> ExecutorStats
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ExecutorResponse [(Int, (Integer, Integer))] -> ExecutorStats
forall a. ExecutorResponse a -> ExecutorStats
erStats [ExecutorResponse [(Int, (Integer, Integer))]]
ret)
let ret' :: [([(Int, (Integer, Integer))], Int64)]
ret' = [[(Int, (Integer, Integer))]]
-> [Int64] -> [([(Int, (Integer, Integer))], Int64)]
forall a b. [a] -> [b] -> [(a, b)]
zip ((ExecutorResponse [(Int, (Integer, Integer))]
-> [(Int, (Integer, Integer))])
-> [ExecutorResponse [(Int, (Integer, Integer))]]
-> [[(Int, (Integer, Integer))]]
forall a b. (a -> b) -> [a] -> [b]
map ExecutorResponse [(Int, (Integer, Integer))]
-> [(Int, (Integer, Integer))]
forall a. ExecutorResponse a -> a
erResponse [ExecutorResponse [(Int, (Integer, Integer))]]
ret) (((Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)
-> Int64)
-> [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)]
-> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map (Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)
-> Int64
forall a b. (a, b) -> b
snd [(Closure (IO (ExecutorResponse [(Int, (Integer, Integer))])),
Int64)]
tasks)
[[(Int, Partition a)]]
partitions <-
[([(Int, (Integer, Integer))], Int64)]
-> (([(Int, (Integer, Integer))], Int64)
-> DD [(Int, Partition a)])
-> DD [[(Int, Partition a)]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [([(Int, (Integer, Integer))], Int64)]
ret' ((([(Int, (Integer, Integer))], Int64) -> DD [(Int, Partition a)])
-> DD [[(Int, Partition a)]])
-> (([(Int, (Integer, Integer))], Int64)
-> DD [(Int, Partition a)])
-> DD [[(Int, Partition a)]]
forall a b. (a -> b) -> a -> b
$ \([(Int, (Integer, Integer))]
res, Int64
num) ->
[(Int, (Integer, Integer))]
-> ((Int, (Integer, Integer)) -> DD (Int, Partition a))
-> DD [(Int, Partition a)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [(Int, (Integer, Integer))]
res (((Int, (Integer, Integer)) -> DD (Int, Partition a))
-> DD [(Int, Partition a)])
-> ((Int, (Integer, Integer)) -> DD (Int, Partition a))
-> DD [(Int, Partition a)]
forall a b. (a -> b) -> a -> b
$ \(Int
partition, (Integer
start, Integer
end)) ->
(Int, Partition a) -> DD (Int, Partition a)
forall (m :: * -> *) a. Monad m => a -> m a
return
( Int
partition,
Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
forall a. Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
PSimple @a
( static (\Dict (Typeable a, Serialise a)
Dict ConduitT () ByteString (ResourceT IO) ()
input' -> ConduitT () ByteString (ResourceT IO) ()
input' ConduitT () ByteString (ResourceT IO) ()
-> ConduitM ByteString a (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM ByteString a (ResourceT IO) ()
forall a (m :: * -> *).
(Serialise a, MonadIO m) =>
ConduitM ByteString a m ()
deserialiseC)
Closure
(Dict (Typeable a, Serialise a)
-> ConduitT () ByteString (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
-> Closure (Dict (Typeable a, Serialise a))
-> Closure
(ConduitT () ByteString (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticSerialise a => Closure (Dict (Typeable a, Serialise a))
forall a.
StaticSerialise a =>
Closure (Dict (Typeable a, Serialise a))
staticSerialise @a
Closure
(ConduitT () ByteString (ResourceT IO) ()
-> ConduitT () a (ResourceT IO) ())
-> Closure (ConduitT () ByteString (ResourceT IO) ())
-> Closure (ConduitT () a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` ( ShuffleStore
-> Closure
(Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
ssGet ShuffleStore
shuffleStore
Closure
(Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure Int64
-> Closure (Range -> ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int64)) -> Int64 -> Closure Int64
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int64)
forall (a :: Constraint). a => Dict a
Dict) Int64
num
Closure (Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure Range
-> Closure (ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Range)) -> Range -> Closure Range
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Range)
forall (a :: Constraint). a => Dict a
Dict) (Integer -> Integer -> Range
RangeOnly Integer
start Integer
end)
)
)
)
([(Int, Partition a)] -> IntMap (Partition a))
-> [[(Int, Partition a)]] -> [IntMap (Partition a)]
forall a b. (a -> b) -> [a] -> [b]
map [(Int, Partition a)] -> IntMap (Partition a)
forall a. [(Int, a)] -> IntMap a
M.fromList [[(Int, Partition a)]]
partitions
[IntMap (Partition a)]
-> ([IntMap (Partition a)] -> IntMap (Partition a))
-> IntMap (Partition a)
forall a b. a -> (a -> b) -> b
& (IntMap (Partition a)
-> IntMap (Partition a) -> IntMap (Partition a))
-> IntMap (Partition a)
-> [IntMap (Partition a)]
-> IntMap (Partition a)
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (SimpleWhenMissing (Partition a) (Partition a)
-> SimpleWhenMissing (Partition a) (Partition a)
-> SimpleWhenMatched (Partition a) (Partition a) (Partition a)
-> IntMap (Partition a)
-> IntMap (Partition a)
-> IntMap (Partition a)
forall a c b.
SimpleWhenMissing a c
-> SimpleWhenMissing b c
-> SimpleWhenMatched a b c
-> IntMap a
-> IntMap b
-> IntMap c
M.merge SimpleWhenMissing (Partition a) (Partition a)
forall (f :: * -> *) x. Applicative f => WhenMissing f x x
M.preserveMissing SimpleWhenMissing (Partition a) (Partition a)
forall (f :: * -> *) x. Applicative f => WhenMissing f x x
M.preserveMissing ((Int -> Partition a -> Partition a -> Partition a)
-> SimpleWhenMatched (Partition a) (Partition a) (Partition a)
forall (f :: * -> *) x y z.
Applicative f =>
(Int -> x -> y -> z) -> WhenMatched f x y z
M.zipWithMatched ((Int -> Partition a -> Partition a -> Partition a)
-> SimpleWhenMatched (Partition a) (Partition a) (Partition a))
-> (Int -> Partition a -> Partition a -> Partition a)
-> SimpleWhenMatched (Partition a) (Partition a) (Partition a)
forall a b. (a -> b) -> a -> b
$ (Partition a -> Partition a -> Partition a)
-> Int -> Partition a -> Partition a -> Partition a
forall a b. a -> b -> a
const Partition a -> Partition a -> Partition a
forall a. Monoid a => a -> a -> a
mappend)) IntMap (Partition a)
forall a. IntMap a
M.empty
IntMap (Partition a)
-> (IntMap (Partition a) -> [(Int, Partition a)])
-> [(Int, Partition a)]
forall a b. a -> (a -> b) -> b
& IntMap (Partition a) -> [(Int, Partition a)]
forall a. IntMap a -> [(Int, a)]
M.toList
[(Int, Partition a)]
-> ([(Int, Partition a)] -> [Partition a]) -> [Partition a]
forall a b. a -> (a -> b) -> b
& ((Int, Partition a) -> Partition a)
-> [(Int, Partition a)] -> [Partition a]
forall a b. (a -> b) -> [a] -> [b]
map (Int, Partition a) -> Partition a
forall a b. (a, b) -> b
snd
[Partition a]
-> ([Partition a] -> DD [Partition a]) -> DD [Partition a]
forall a b. a -> (a -> b) -> b
& [Partition a] -> DD [Partition a]
forall (m :: * -> *) a. Monad m => a -> m a
return
where
sort :: Monad m => ConduitT (Int, t) (Int, t) m ()
sort :: ConduitT (Int, t) (Int, t) m ()
sort = ((Int, t) -> ConduitT (Int, t) (Int, t) m ())
-> [(Int, t)] -> ConduitT (Int, t) (Int, t) m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Int, t) -> ConduitT (Int, t) (Int, t) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield ([(Int, t)] -> ConduitT (Int, t) (Int, t) m ())
-> ([(Int, t)] -> [(Int, t)])
-> [(Int, t)]
-> ConduitT (Int, t) (Int, t) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Int, t) -> Int) -> [(Int, t)] -> [(Int, t)]
forall b a. Ord b => (a -> b) -> [a] -> [a]
sortOn (Int, t) -> Int
forall a b. (a, b) -> a
fst ([(Int, t)] -> ConduitT (Int, t) (Int, t) m ())
-> ConduitT (Int, t) (Int, t) m [(Int, t)]
-> ConduitT (Int, t) (Int, t) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ConduitT (Int, t) (Int, t) m [(Int, t)]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
C.sinkList
runStages stage :: Stage a
stage@(SCoalesce Int
count Stage a
rest) = do
[Partition a]
inputs <- Stage a -> DD [Partition a]
forall a. Stage a -> DD [Partition a]
runStages Stage a
rest
Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Running: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Stage a -> String
showTopStage Stage a
stage)
[Partition a] -> DD [Partition a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Partition a] -> DD [Partition a])
-> [Partition a] -> DD [Partition a]
forall a b. (a -> b) -> a -> b
$ ([Partition a] -> Partition a) -> [[Partition a]] -> [Partition a]
forall a b. (a -> b) -> [a] -> [b]
map [Partition a] -> Partition a
forall a. Monoid a => [a] -> a
mconcat ([[Partition a]] -> [Partition a])
-> [[Partition a]] -> [Partition a]
forall a b. (a -> b) -> a -> b
$ [[Partition a]] -> [[Partition a]]
forall a. [[a]] -> [[a]]
transpose (Int -> [Partition a] -> [[Partition a]]
forall e. Int -> [e] -> [[e]]
chunksOf Int
count [Partition a]
inputs)
dFetch ::
StaticSerialise a =>
Dataset a ->
DD (ConduitT () a (ResourceT IO) ())
dFetch :: Dataset a -> DD (ConduitT () a (ResourceT IO) ())
dFetch Dataset a
ds = do
let stages :: Stage a
stages = Dataset a -> Stage a
forall a. Dataset a -> Stage a
mkStages Dataset a
ds
Text -> DD ()
forall (m :: * -> *). MonadLogger m => Text -> m ()
logInfoN (Text -> DD ()) -> Text -> DD ()
forall a b. (a -> b) -> a -> b
$ Text
"Stages: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Stage a -> String
forall a. Show a => a -> String
show Stage a
stages)
[Partition a]
out <- Stage a -> DD [Partition a]
forall a. Stage a -> DD [Partition a]
runStages Stage a
stages
ConduitT () a (ResourceT IO) ()
-> DD (ConduitT () a (ResourceT IO) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitT () a (ResourceT IO) ()
-> DD (ConduitT () a (ResourceT IO) ()))
-> ConduitT () a (ResourceT IO) ()
-> DD (ConduitT () a (ResourceT IO) ())
forall a b. (a -> b) -> a -> b
$ (Partition a -> ConduitT () a (ResourceT IO) ())
-> [Partition a] -> ConduitT () a (ResourceT IO) ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Closure (ConduitT () a (ResourceT IO) ())
-> ConduitT () a (ResourceT IO) ()
forall a. Closure a -> a
unclosure (Closure (ConduitT () a (ResourceT IO) ())
-> ConduitT () a (ResourceT IO) ())
-> (Partition a -> Closure (ConduitT () a (ResourceT IO) ()))
-> Partition a
-> ConduitT () a (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Partition a -> Closure (ConduitT () a (ResourceT IO) ())
forall a.
Typeable a =>
Partition a -> Closure (ConduitT () a (ResourceT IO) ())
partitionProducer) [Partition a]
out
dToList ::
StaticSerialise a =>
Dataset a ->
DD [a]
dToList :: Dataset a -> DD [a]
dToList Dataset a
ds = do
ConduitT () a (ResourceT IO) ()
c <- Dataset a -> DD (ConduitT () a (ResourceT IO) ())
forall a.
StaticSerialise a =>
Dataset a -> DD (ConduitT () a (ResourceT IO) ())
dFetch Dataset a
ds
IO [a] -> DD [a]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [a] -> DD [a]) -> IO [a] -> DD [a]
forall a b. (a -> b) -> a -> b
$ ConduitT () Void (ResourceT IO) [a] -> IO [a]
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
runConduitRes (ConduitT () Void (ResourceT IO) [a] -> IO [a])
-> ConduitT () Void (ResourceT IO) [a] -> IO [a]
forall a b. (a -> b) -> a -> b
$ ConduitT () a (ResourceT IO) ()
c ConduitT () a (ResourceT IO) ()
-> ConduitM a Void (ResourceT IO) [a]
-> ConduitT () Void (ResourceT IO) [a]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM a Void (ResourceT IO) [a]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
sinkList