-- |
-- You only need this module if you want to create a new backend for distributed-dataset.
--
-- See 'Control.Distributed.Fork.Local.localProcessBackend' for a minimal example.
module Control.Distributed.Fork.Backend
  ( -- * Writing a 'Backend'.
    Backend (..),
    BackendM,
    argExecutorMode,

    -- * Reporting status
    ExecutorFinalStatus (..),
    ExecutorStatus (..),
    ExecutorPendingStatus (..),
    waiting,
    waitingDesc,
    submitted,
    submittedDesc,
    started,
    startedDesc,

    -- * Utils
    throttledBackend,
    runBackend,
    toBackendStdin,
    fromBackendStdout,

    -- * Re-exports
    liftIO,
    getExecutablePath,
  )
where

import Control.Concurrent.Throttled
import Control.Distributed.Fork.Internal
import Control.Monad.IO.Class
import Control.Monad.Trans.Reader
import Data.Text (Text)
import System.Environment

-- | Report a pending status from inside a 'BackendM' computation.
--
-- The 'BackendM' reader environment carries a callback of type
-- @ExecutorPendingStatus -> IO ()@; this fetches that callback with 'ask'
-- and runs it on the given status via 'liftIO'.
pendingStatus :: ExecutorPendingStatus -> BackendM ()
pendingStatus s = BackendM ask >>= liftIO . ($ s)

-- | Report the 'ExecutorWaiting' status without a description.
waiting :: BackendM ()
waiting = pendingStatus $ ExecutorWaiting Nothing

-- | Report the 'ExecutorWaiting' status, attaching the given description.
waitingDesc :: Text -> BackendM ()
waitingDesc = pendingStatus . ExecutorWaiting . Just

-- | Report the 'ExecutorSubmitted' status without a description.
submitted :: BackendM ()
submitted = pendingStatus $ ExecutorSubmitted Nothing

-- | Report the 'ExecutorSubmitted' status, attaching the given description.
submittedDesc :: Text -> BackendM ()
submittedDesc = pendingStatus . ExecutorSubmitted . Just

-- | Report the 'ExecutorStarted' status without a description.
started :: BackendM ()
started = pendingStatus $ ExecutorStarted Nothing

-- | Report the 'ExecutorStarted' status, attaching the given description.
startedDesc :: Text -> BackendM ()
startedDesc = pendingStatus . ExecutorStarted . Just

-- | Wrap a 'Backend' so that every call to its underlying function is run
-- through a single shared 'Throttle'.
--
-- The throttle is created once, from the given limit, when this action runs;
-- the returned 'Backend' applies 'throttled' with that throttle around each
-- invocation of the original backend function.
--
-- NOTE(review): presumably this caps the number of concurrently running
-- invocations at the given limit -- confirm against
-- "Control.Concurrent.Throttled".
throttledBackend :: MonadIO m => Int -> Backend -> m Backend
throttledBackend limit (Backend b) = do
  t <- newThrottle limit
  return $ Backend (throttled t . b)