{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}

-- | Statistics collected around a single executor action: byte and item
-- counters fed by pass-through conduits, plus the elapsed time of the action.
module Control.Distributed.Dataset.Internal.Process where

import Conduit
import Data.Binary
import qualified Data.ByteString as BS
import Data.IORef
import GHC.Generics
import System.Clock

-- | The result of an executor action paired with the statistics gathered
-- while it ran.
data ExecutorResponse a
  = ExecutorResponse
      { -- | Statistics gathered while the action ran.
        erStats :: ExecutorStats,
        -- | The value the action returned.
        erResponse :: a
      }
  deriving (Show, Generic, Binary)

-- | Counters describing one executor run, as filled in by
-- 'withExecutorStats'.
data ExecutorStats
  = ExecutorStats
      { -- | Total length of the chunks that passed through 'eshDownload'.
        esDownloadBytes :: Integer,
        -- | Total length of the chunks that passed through 'eshUpload'.
        esUploadBytes :: Integer,
        -- | Number of items that passed through 'eshInput'.
        esInputItems :: Integer,
        -- | Number of items that passed through 'eshOutput'.
        esOutputItems :: Integer,
        -- | Monotonic-clock duration of the action, in whole milliseconds.
        esElapsedMillis :: Integer
      }
  deriving (Show, Generic, Binary)

-- | Field-wise addition. Note that 'esElapsedMillis' is summed as well, so a
-- combined value holds the cumulative elapsed time of its parts.
instance Semigroup ExecutorStats where
  ExecutorStats dl ul inp out ms <> ExecutorStats dl' ul' inp' out' ms' =
    ExecutorStats
      { esDownloadBytes = dl + dl',
        esUploadBytes = ul + ul',
        esInputItems = inp + inp',
        esOutputItems = out + out',
        esElapsedMillis = ms + ms'
      }

-- | The identity is the all-zero statistics value.
instance Monoid ExecutorStats where
  mempty = ExecutorStats 0 0 0 0 0

-- | Pass-through conduits handed to the action run by 'withExecutorStats'.
-- Each one forwards its input unchanged and bumps the matching counter as a
-- side effect.
data ExecutorStatsHooks
  = ExecutorStatsHooks
      { -- | Adds the length of every chunk to the download byte counter.
        eshDownload :: forall m. MonadIO m => ConduitT BS.ByteString BS.ByteString m (),
        -- | Adds the length of every chunk to the upload byte counter.
        eshUpload :: forall m. MonadIO m => ConduitT BS.ByteString BS.ByteString m (),
        -- | Adds one to the input item counter for every element.
        eshInput :: forall a m. MonadIO m => ConduitT a a m (),
        -- | Adds one to the output item counter for every element.
        eshOutput :: forall a m. MonadIO m => ConduitT a a m ()
      }

-- | Run an action with a fresh set of 'ExecutorStatsHooks' and return its
-- result together with the collected 'ExecutorStats'.
--
-- The counters start at zero and are read once the action has returned. The
-- elapsed time is the difference of two 'Monotonic' clock readings taken
-- immediately before and after the action, truncated to milliseconds.
--
-- If the action throws, the exception propagates and no statistics are
-- returned.
withExecutorStats :: (ExecutorStatsHooks -> IO a) -> IO (ExecutorResponse a)
withExecutorStats act = do
  downloadRef <- newIORef 0
  uploadRef <- newIORef 0
  inputRef <- newIORef 0
  outputRef <- newIORef 0
  let -- The counters are only read after the action finishes, so they must
      -- be updated strictly: the lazy 'modifyIORef' would pile up one
      -- unevaluated addition per chunk/item for the whole stream.
      bump :: IORef Integer -> Integer -> IO ()
      bump ref n = modifyIORef' ref (+ n)
      countBs :: MonadIO m => IORef Integer -> ConduitT BS.ByteString BS.ByteString m ()
      countBs ref = iterMC $ \bs -> liftIO $ bump ref (fromIntegral (BS.length bs))
      countIt :: MonadIO m => IORef Integer -> ConduitT a a m ()
      countIt ref = iterMC $ \_ -> liftIO $ bump ref 1
      hooks =
        ExecutorStatsHooks
          { eshDownload = countBs downloadRef,
            eshUpload = countBs uploadRef,
            eshInput = countIt inputRef,
            eshOutput = countIt outputRef
          }
  start <- getTime Monotonic
  ret <- act hooks
  end <- getTime Monotonic
  -- Nanoseconds to whole milliseconds.
  let elapsed = toNanoSecs (end - start) `div` 1000000
  downloadBytes <- readIORef downloadRef
  uploadBytes <- readIORef uploadRef
  inputCount <- readIORef inputRef
  outputCount <- readIORef outputRef
  let stats =
        ExecutorStats
          { esDownloadBytes = downloadBytes,
            esUploadBytes = uploadBytes,
            esInputItems = inputCount,
            esOutputItems = outputCount,
            esElapsedMillis = elapsed
          }
  return $ ExecutorResponse stats ret