{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}

module Control.Distributed.Dataset.Internal.Process where

import Conduit
import Data.Binary
import qualified Data.ByteString as BS
import Data.IORef
import GHC.Generics
import System.Clock

data ExecutorResponse a
  = ExecutorResponse
      { ExecutorResponse a -> ExecutorStats
erStats :: ExecutorStats,
        ExecutorResponse a -> a
erResponse :: a
      }
  deriving (Int -> ExecutorResponse a -> ShowS
[ExecutorResponse a] -> ShowS
ExecutorResponse a -> String
(Int -> ExecutorResponse a -> ShowS)
-> (ExecutorResponse a -> String)
-> ([ExecutorResponse a] -> ShowS)
-> Show (ExecutorResponse a)
forall a. Show a => Int -> ExecutorResponse a -> ShowS
forall a. Show a => [ExecutorResponse a] -> ShowS
forall a. Show a => ExecutorResponse a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ExecutorResponse a] -> ShowS
$cshowList :: forall a. Show a => [ExecutorResponse a] -> ShowS
show :: ExecutorResponse a -> String
$cshow :: forall a. Show a => ExecutorResponse a -> String
showsPrec :: Int -> ExecutorResponse a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> ExecutorResponse a -> ShowS
Show, (forall x. ExecutorResponse a -> Rep (ExecutorResponse a) x)
-> (forall x. Rep (ExecutorResponse a) x -> ExecutorResponse a)
-> Generic (ExecutorResponse a)
forall x. Rep (ExecutorResponse a) x -> ExecutorResponse a
forall x. ExecutorResponse a -> Rep (ExecutorResponse a) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall a x. Rep (ExecutorResponse a) x -> ExecutorResponse a
forall a x. ExecutorResponse a -> Rep (ExecutorResponse a) x
$cto :: forall a x. Rep (ExecutorResponse a) x -> ExecutorResponse a
$cfrom :: forall a x. ExecutorResponse a -> Rep (ExecutorResponse a) x
Generic, Get (ExecutorResponse a)
[ExecutorResponse a] -> Put
ExecutorResponse a -> Put
(ExecutorResponse a -> Put)
-> Get (ExecutorResponse a)
-> ([ExecutorResponse a] -> Put)
-> Binary (ExecutorResponse a)
forall a. Binary a => Get (ExecutorResponse a)
forall a. Binary a => [ExecutorResponse a] -> Put
forall a. Binary a => ExecutorResponse a -> Put
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [ExecutorResponse a] -> Put
$cputList :: forall a. Binary a => [ExecutorResponse a] -> Put
get :: Get (ExecutorResponse a)
$cget :: forall a. Binary a => Get (ExecutorResponse a)
put :: ExecutorResponse a -> Put
$cput :: forall a. Binary a => ExecutorResponse a -> Put
Binary)

data ExecutorStats
  = ExecutorStats
      { ExecutorStats -> Integer
esDownloadBytes :: Integer,
        ExecutorStats -> Integer
esUploadBytes :: Integer,
        ExecutorStats -> Integer
esInputItems :: Integer,
        ExecutorStats -> Integer
esOutputItems :: Integer,
        ExecutorStats -> Integer
esElapsedMillis :: Integer
      }
  deriving (Int -> ExecutorStats -> ShowS
[ExecutorStats] -> ShowS
ExecutorStats -> String
(Int -> ExecutorStats -> ShowS)
-> (ExecutorStats -> String)
-> ([ExecutorStats] -> ShowS)
-> Show ExecutorStats
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ExecutorStats] -> ShowS
$cshowList :: [ExecutorStats] -> ShowS
show :: ExecutorStats -> String
$cshow :: ExecutorStats -> String
showsPrec :: Int -> ExecutorStats -> ShowS
$cshowsPrec :: Int -> ExecutorStats -> ShowS
Show, (forall x. ExecutorStats -> Rep ExecutorStats x)
-> (forall x. Rep ExecutorStats x -> ExecutorStats)
-> Generic ExecutorStats
forall x. Rep ExecutorStats x -> ExecutorStats
forall x. ExecutorStats -> Rep ExecutorStats x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ExecutorStats x -> ExecutorStats
$cfrom :: forall x. ExecutorStats -> Rep ExecutorStats x
Generic, Get ExecutorStats
[ExecutorStats] -> Put
ExecutorStats -> Put
(ExecutorStats -> Put)
-> Get ExecutorStats
-> ([ExecutorStats] -> Put)
-> Binary ExecutorStats
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [ExecutorStats] -> Put
$cputList :: [ExecutorStats] -> Put
get :: Get ExecutorStats
$cget :: Get ExecutorStats
put :: ExecutorStats -> Put
$cput :: ExecutorStats -> Put
Binary)

instance Semigroup ExecutorStats where
  ExecutorStats Integer
a Integer
b Integer
c Integer
d Integer
e <> :: ExecutorStats -> ExecutorStats -> ExecutorStats
<> ExecutorStats Integer
a' Integer
b' Integer
c' Integer
d' Integer
e' =
    Integer
-> Integer -> Integer -> Integer -> Integer -> ExecutorStats
ExecutorStats (Integer
a Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
a') (Integer
b Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
b') (Integer
c Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
c') (Integer
d Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
d') (Integer
e Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
e')

instance Monoid ExecutorStats where
  mempty :: ExecutorStats
mempty = Integer
-> Integer -> Integer -> Integer -> Integer -> ExecutorStats
ExecutorStats Integer
0 Integer
0 Integer
0 Integer
0 Integer
0

data ExecutorStatsHooks
  = ExecutorStatsHooks
      { ExecutorStatsHooks
-> forall (m :: * -> *).
   MonadIO m =>
   ConduitT ByteString ByteString m ()
eshDownload :: forall m. MonadIO m => ConduitT BS.ByteString BS.ByteString m (),
        ExecutorStatsHooks
-> forall (m :: * -> *).
   MonadIO m =>
   ConduitT ByteString ByteString m ()
eshUpload :: forall m. MonadIO m => ConduitT BS.ByteString BS.ByteString m (),
        ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput :: forall a m. MonadIO m => ConduitT a a m (),
        ExecutorStatsHooks
-> forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshOutput :: forall a m. MonadIO m => ConduitT a a m ()
      }

withExecutorStats :: (ExecutorStatsHooks -> IO a) -> IO (ExecutorResponse a)
withExecutorStats :: (ExecutorStatsHooks -> IO a) -> IO (ExecutorResponse a)
withExecutorStats ExecutorStatsHooks -> IO a
act = do
  IORef Integer
downloadRef <- Integer -> IO (IORef Integer)
forall a. a -> IO (IORef a)
newIORef Integer
0
  IORef Integer
uploadRef <- Integer -> IO (IORef Integer)
forall a. a -> IO (IORef a)
newIORef Integer
0
  IORef Integer
inputRef <- Integer -> IO (IORef Integer)
forall a. a -> IO (IORef a)
newIORef Integer
0
  IORef Integer
outputRef <- Integer -> IO (IORef Integer)
forall a. a -> IO (IORef a)
newIORef Integer
0
  let countBs :: IORef a -> ConduitT ByteString ByteString m ()
countBs IORef a
ref =
        (ByteString -> m ()) -> ConduitT ByteString ByteString m ()
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
iterMC ((ByteString -> m ()) -> ConduitT ByteString ByteString m ())
-> (ByteString -> m ()) -> ConduitT ByteString ByteString m ()
forall a b. (a -> b) -> a -> b
$ \ByteString
bs ->
          IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef a -> (a -> a) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef a
ref (a -> a -> a
forall a. Num a => a -> a -> a
+ Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteString -> Int
BS.length ByteString
bs))
      countIt :: IORef a -> ConduitT a a m ()
countIt IORef a
ref =
        (a -> m ()) -> ConduitT a a m ()
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
iterMC ((a -> m ()) -> ConduitT a a m ())
-> (a -> m ()) -> ConduitT a a m ()
forall a b. (a -> b) -> a -> b
$ \a
_ ->
          IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef a -> (a -> a) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef a
ref (a -> a -> a
forall a. Num a => a -> a -> a
+ a
1)
      hooks :: ExecutorStatsHooks
hooks =
        ExecutorStatsHooks :: (forall (m :: * -> *).
 MonadIO m =>
 ConduitT ByteString ByteString m ())
-> (forall (m :: * -> *).
    MonadIO m =>
    ConduitT ByteString ByteString m ())
-> (forall a (m :: * -> *). MonadIO m => ConduitT a a m ())
-> (forall a (m :: * -> *). MonadIO m => ConduitT a a m ())
-> ExecutorStatsHooks
ExecutorStatsHooks
          { eshDownload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshDownload = IORef Integer -> ConduitT ByteString ByteString m ()
forall (m :: * -> *) a.
(MonadIO m, Num a) =>
IORef a -> ConduitT ByteString ByteString m ()
countBs IORef Integer
downloadRef,
            eshUpload :: forall (m :: * -> *).
MonadIO m =>
ConduitT ByteString ByteString m ()
eshUpload = IORef Integer -> ConduitT ByteString ByteString m ()
forall (m :: * -> *) a.
(MonadIO m, Num a) =>
IORef a -> ConduitT ByteString ByteString m ()
countBs IORef Integer
uploadRef,
            eshInput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshInput = IORef Integer -> ConduitT a a m ()
forall (m :: * -> *) a a.
(MonadIO m, Num a) =>
IORef a -> ConduitT a a m ()
countIt IORef Integer
inputRef,
            eshOutput :: forall a (m :: * -> *). MonadIO m => ConduitT a a m ()
eshOutput = IORef Integer -> ConduitT a a m ()
forall (m :: * -> *) a a.
(MonadIO m, Num a) =>
IORef a -> ConduitT a a m ()
countIt IORef Integer
outputRef
          }
  TimeSpec
start <- Clock -> IO TimeSpec
getTime Clock
Monotonic
  a
ret <- ExecutorStatsHooks -> IO a
act ExecutorStatsHooks
hooks
  TimeSpec
end <- Clock -> IO TimeSpec
getTime Clock
Monotonic
  let elapsed :: Integer
elapsed = TimeSpec -> Integer
toNanoSecs (TimeSpec
end TimeSpec -> TimeSpec -> TimeSpec
forall a. Num a => a -> a -> a
- TimeSpec
start) Integer -> Integer -> Integer
forall a. Integral a => a -> a -> a
`div` Integer
1000000
  Integer
downloadBytes <- IORef Integer -> IO Integer
forall a. IORef a -> IO a
readIORef IORef Integer
downloadRef
  Integer
uploadBytes <- IORef Integer -> IO Integer
forall a. IORef a -> IO a
readIORef IORef Integer
uploadRef
  Integer
inputCount <- IORef Integer -> IO Integer
forall a. IORef a -> IO a
readIORef IORef Integer
inputRef
  Integer
outputCount <- IORef Integer -> IO Integer
forall a. IORef a -> IO a
readIORef IORef Integer
outputRef
  let stats :: ExecutorStats
stats =
        ExecutorStats :: Integer
-> Integer -> Integer -> Integer -> Integer -> ExecutorStats
ExecutorStats
          { esDownloadBytes :: Integer
esDownloadBytes = Integer
downloadBytes,
            esUploadBytes :: Integer
esUploadBytes = Integer
uploadBytes,
            esInputItems :: Integer
esInputItems = Integer
inputCount,
            esOutputItems :: Integer
esOutputItems = Integer
outputCount,
            esElapsedMillis :: Integer
esElapsedMillis = Integer
elapsed
          }
  ExecutorResponse a -> IO (ExecutorResponse a)
forall (m :: * -> *) a. Monad m => a -> m a
return (ExecutorResponse a -> IO (ExecutorResponse a))
-> ExecutorResponse a -> IO (ExecutorResponse a)
forall a b. (a -> b) -> a -> b
$ ExecutorStats -> a -> ExecutorResponse a
forall a. ExecutorStats -> a -> ExecutorResponse a
ExecutorResponse ExecutorStats
stats a
ret