{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}

module Control.Distributed.Fork.Internal where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Distributed.Closure
import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.IO.Unlift
import Control.Monad.Trans.Reader
import Data.Binary
import Data.Binary.Zlib
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Text (Text)
import qualified Data.Text as T
import Data.Void
import GHC.Generics
import System.Environment
import System.Exit
import System.IO (stdin)

-- |
-- We switch to executor mode only when  @argv[1] == argExecutorMode@.
argExecutorMode :: String
argExecutorMode :: String
argExecutorMode = String
"DISTRIBUTED_FORK_EXECUTOR_MODE"

-- |
-- On distributed-fork, we run the same binary both in your machine (called
-- "driver") and in the remote environment (called "executor"). In order for the
-- program to act according to where it is, you should call this function as the
-- first thing in your @main@:

-- @
-- main = do
--   initDistributedFork
--   ...
-- @
initDistributedFork :: IO ()
initDistributedFork :: IO ()
initDistributedFork =
  IO [String]
getArgs IO [String] -> ([String] -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    [String
x]
      | String
x String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
argExecutorMode -> IO Void -> IO ()
forall (f :: * -> *) a. Functor f => f Void -> f a
vacuous IO Void
runExecutor
    [String]
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- |
-- When on executor mode, we expect a serialised ExecutorClosure from `stdin`,
-- run it and exit.
runExecutor :: IO Void
runExecutor :: IO Void
runExecutor =
  Handle -> IO ByteString
BL.hGetContents Handle
stdin
    IO ByteString -> (ByteString -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Closure (IO ()) -> IO ()
forall a. Closure a -> a
unclosure
    (Closure (IO ()) -> IO ())
-> (ByteString -> Closure (IO ())) -> ByteString -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ZlibWrapper (Closure (IO ())) -> Closure (IO ())
forall a. ZlibWrapper a -> a
unZlibWrapper
    (ZlibWrapper (Closure (IO ())) -> Closure (IO ()))
-> (ByteString -> ZlibWrapper (Closure (IO ())))
-> ByteString
-> Closure (IO ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Binary (ZlibWrapper (Closure (IO ()))) =>
ByteString -> ZlibWrapper (Closure (IO ()))
forall a. Binary a => ByteString -> a
decode @ExecutorClosure
    IO () -> IO Void -> IO Void
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO Void
forall a. IO a
exitSuccess

-- |
-- An ExecutorClosure is a serialisable IO action.
type ExecutorClosure = ZlibWrapper (Closure (IO ()))

-- |
-- 'Backend' is responsible for running your functions in a remote environment.

-- See:

--   * 'Control.Distributed.Fork.Local.localProcessBackend'

--   * <http://hackage.haskell.org/package/distributed-dataset-aws distributed-dataset-aws>
newtype Backend
  = Backend
      { Backend -> ByteString -> BackendM ByteString
bExecute :: BS.ByteString -> BackendM BS.ByteString
      }
-- ^ Should run the current binary in the target environment, put the given
-- string as standard input and return the executables answer on the standard
-- output.
-- |
-- BackendM is essentially `IO`, but also has the ability to report the status of the
-- executor.

newtype BackendM a
  = BackendM (ReaderT (ExecutorPendingStatus -> IO ()) IO a)
  deriving
    ( a -> BackendM b -> BackendM a
(a -> b) -> BackendM a -> BackendM b
(forall a b. (a -> b) -> BackendM a -> BackendM b)
-> (forall a b. a -> BackendM b -> BackendM a) -> Functor BackendM
forall a b. a -> BackendM b -> BackendM a
forall a b. (a -> b) -> BackendM a -> BackendM b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> BackendM b -> BackendM a
$c<$ :: forall a b. a -> BackendM b -> BackendM a
fmap :: (a -> b) -> BackendM a -> BackendM b
$cfmap :: forall a b. (a -> b) -> BackendM a -> BackendM b
Functor,
      Functor BackendM
a -> BackendM a
Functor BackendM
-> (forall a. a -> BackendM a)
-> (forall a b. BackendM (a -> b) -> BackendM a -> BackendM b)
-> (forall a b c.
    (a -> b -> c) -> BackendM a -> BackendM b -> BackendM c)
-> (forall a b. BackendM a -> BackendM b -> BackendM b)
-> (forall a b. BackendM a -> BackendM b -> BackendM a)
-> Applicative BackendM
BackendM a -> BackendM b -> BackendM b
BackendM a -> BackendM b -> BackendM a
BackendM (a -> b) -> BackendM a -> BackendM b
(a -> b -> c) -> BackendM a -> BackendM b -> BackendM c
forall a. a -> BackendM a
forall a b. BackendM a -> BackendM b -> BackendM a
forall a b. BackendM a -> BackendM b -> BackendM b
forall a b. BackendM (a -> b) -> BackendM a -> BackendM b
forall a b c.
(a -> b -> c) -> BackendM a -> BackendM b -> BackendM c
forall (f :: * -> *).
Functor f
-> (forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
<* :: BackendM a -> BackendM b -> BackendM a
$c<* :: forall a b. BackendM a -> BackendM b -> BackendM a
*> :: BackendM a -> BackendM b -> BackendM b
$c*> :: forall a b. BackendM a -> BackendM b -> BackendM b
liftA2 :: (a -> b -> c) -> BackendM a -> BackendM b -> BackendM c
$cliftA2 :: forall a b c.
(a -> b -> c) -> BackendM a -> BackendM b -> BackendM c
<*> :: BackendM (a -> b) -> BackendM a -> BackendM b
$c<*> :: forall a b. BackendM (a -> b) -> BackendM a -> BackendM b
pure :: a -> BackendM a
$cpure :: forall a. a -> BackendM a
$cp1Applicative :: Functor BackendM
Applicative,
      Applicative BackendM
a -> BackendM a
Applicative BackendM
-> (forall a b. BackendM a -> (a -> BackendM b) -> BackendM b)
-> (forall a b. BackendM a -> BackendM b -> BackendM b)
-> (forall a. a -> BackendM a)
-> Monad BackendM
BackendM a -> (a -> BackendM b) -> BackendM b
BackendM a -> BackendM b -> BackendM b
forall a. a -> BackendM a
forall a b. BackendM a -> BackendM b -> BackendM b
forall a b. BackendM a -> (a -> BackendM b) -> BackendM b
forall (m :: * -> *).
Applicative m
-> (forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
return :: a -> BackendM a
$creturn :: forall a. a -> BackendM a
>> :: BackendM a -> BackendM b -> BackendM b
$c>> :: forall a b. BackendM a -> BackendM b -> BackendM b
>>= :: BackendM a -> (a -> BackendM b) -> BackendM b
$c>>= :: forall a b. BackendM a -> (a -> BackendM b) -> BackendM b
$cp1Monad :: Applicative BackendM
Monad,
      Monad BackendM
Monad BackendM
-> (forall a. IO a -> BackendM a) -> MonadIO BackendM
IO a -> BackendM a
forall a. IO a -> BackendM a
forall (m :: * -> *).
Monad m -> (forall a. IO a -> m a) -> MonadIO m
liftIO :: IO a -> BackendM a
$cliftIO :: forall a. IO a -> BackendM a
$cp1MonadIO :: Monad BackendM
MonadIO,
      MonadThrow BackendM
MonadThrow BackendM
-> (forall e a.
    Exception e =>
    BackendM a -> (e -> BackendM a) -> BackendM a)
-> MonadCatch BackendM
BackendM a -> (e -> BackendM a) -> BackendM a
forall e a.
Exception e =>
BackendM a -> (e -> BackendM a) -> BackendM a
forall (m :: * -> *).
MonadThrow m
-> (forall e a. Exception e => m a -> (e -> m a) -> m a)
-> MonadCatch m
catch :: BackendM a -> (e -> BackendM a) -> BackendM a
$ccatch :: forall e a.
Exception e =>
BackendM a -> (e -> BackendM a) -> BackendM a
$cp1MonadCatch :: MonadThrow BackendM
MonadCatch,
      Monad BackendM
e -> BackendM a
Monad BackendM
-> (forall e a. Exception e => e -> BackendM a)
-> MonadThrow BackendM
forall e a. Exception e => e -> BackendM a
forall (m :: * -> *).
Monad m -> (forall e a. Exception e => e -> m a) -> MonadThrow m
throwM :: e -> BackendM a
$cthrowM :: forall e a. Exception e => e -> BackendM a
$cp1MonadThrow :: Monad BackendM
MonadThrow,
      MonadCatch BackendM
MonadCatch BackendM
-> (forall b.
    ((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b)
-> (forall b.
    ((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b)
-> (forall a b c.
    BackendM a
    -> (a -> ExitCase b -> BackendM c)
    -> (a -> BackendM b)
    -> BackendM (b, c))
-> MonadMask BackendM
BackendM a
-> (a -> ExitCase b -> BackendM c)
-> (a -> BackendM b)
-> BackendM (b, c)
((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b
((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b
forall b.
((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b
forall a b c.
BackendM a
-> (a -> ExitCase b -> BackendM c)
-> (a -> BackendM b)
-> BackendM (b, c)
forall (m :: * -> *).
MonadCatch m
-> (forall b. ((forall a. m a -> m a) -> m b) -> m b)
-> (forall b. ((forall a. m a -> m a) -> m b) -> m b)
-> (forall a b c.
    m a -> (a -> ExitCase b -> m c) -> (a -> m b) -> m (b, c))
-> MonadMask m
generalBracket :: BackendM a
-> (a -> ExitCase b -> BackendM c)
-> (a -> BackendM b)
-> BackendM (b, c)
$cgeneralBracket :: forall a b c.
BackendM a
-> (a -> ExitCase b -> BackendM c)
-> (a -> BackendM b)
-> BackendM (b, c)
uninterruptibleMask :: ((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b
$cuninterruptibleMask :: forall b.
((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b
mask :: ((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b
$cmask :: forall b.
((forall a. BackendM a -> BackendM a) -> BackendM b) -> BackendM b
$cp1MonadMask :: MonadCatch BackendM
MonadMask,
      MonadIO BackendM
BackendM (UnliftIO BackendM)
MonadIO BackendM
-> BackendM (UnliftIO BackendM)
-> (forall b.
    ((forall a. BackendM a -> IO a) -> IO b) -> BackendM b)
-> MonadUnliftIO BackendM
((forall a. BackendM a -> IO a) -> IO b) -> BackendM b
forall b. ((forall a. BackendM a -> IO a) -> IO b) -> BackendM b
forall (m :: * -> *).
MonadIO m
-> m (UnliftIO m)
-> (forall b. ((forall a. m a -> IO a) -> IO b) -> m b)
-> MonadUnliftIO m
withRunInIO :: ((forall a. BackendM a -> IO a) -> IO b) -> BackendM b
$cwithRunInIO :: forall b. ((forall a. BackendM a -> IO a) -> IO b) -> BackendM b
askUnliftIO :: BackendM (UnliftIO BackendM)
$caskUnliftIO :: BackendM (UnliftIO BackendM)
$cp1MonadUnliftIO :: MonadIO BackendM
MonadUnliftIO
    )

instance Binary a => Binary (ExecutorFinalStatus a)

-- |
-- Given an IO action and a static proof that the result is 'Serializable', this
-- function runs the action using the Backend in a separate thread and returns a
-- 'TVar' holding the 'ExecutorStatus'.
runBackend ::
  Closure (Dict (Serializable i)) ->
  Closure (IO i) ->
  Backend ->
  IO (Handle i)
runBackend :: Closure (Dict (Serializable i))
-> Closure (IO i) -> Backend -> IO (Handle i)
runBackend Closure (Dict (Serializable i))
dict Closure (IO i)
cls (Backend ByteString -> BackendM ByteString
backend) =
  case Closure (Dict (Serializable i)) -> Dict (Serializable i)
forall a. Closure a -> a
unclosure Closure (Dict (Serializable i))
dict of
    Dict (Serializable i)
Dict -> do
      let BackendM ReaderT (ExecutorPendingStatus -> IO ()) IO ByteString
m =
            ByteString -> BackendM ByteString
backend (ByteString -> BackendM ByteString)
-> ByteString -> BackendM ByteString
forall a b. (a -> b) -> a -> b
$ Closure (Dict (Serializable i)) -> Closure (IO i) -> ByteString
forall a.
Closure (Dict (Serializable a)) -> Closure (IO a) -> ByteString
toBackendStdin Closure (Dict (Serializable i))
dict Closure (IO i)
cls
      TVar (ExecutorStatus i)
t <- STM (TVar (ExecutorStatus i)) -> IO (TVar (ExecutorStatus i))
forall a. STM a -> IO a
atomically (ExecutorStatus i -> STM (TVar (ExecutorStatus i))
forall a. a -> STM (TVar a)
newTVar (ExecutorStatus i -> STM (TVar (ExecutorStatus i)))
-> ExecutorStatus i -> STM (TVar (ExecutorStatus i))
forall a b. (a -> b) -> a -> b
$ ExecutorPendingStatus -> ExecutorStatus i
forall a. ExecutorPendingStatus -> ExecutorStatus a
ExecutorPending (Maybe Text -> ExecutorPendingStatus
ExecutorWaiting Maybe Text
forall a. Maybe a
Nothing))
      ThreadId
_ <-
        IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
          Either SomeException ByteString
answer <- IO ByteString -> IO (Either SomeException ByteString)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (IO ByteString -> IO (Either SomeException ByteString))
-> IO ByteString -> IO (Either SomeException ByteString)
forall a b. (a -> b) -> a -> b
$ ReaderT (ExecutorPendingStatus -> IO ()) IO ByteString
-> (ExecutorPendingStatus -> IO ()) -> IO ByteString
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT ReaderT (ExecutorPendingStatus -> IO ()) IO ByteString
m (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (ExecutorPendingStatus -> STM ())
-> ExecutorPendingStatus
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TVar (ExecutorStatus i) -> ExecutorStatus i -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (ExecutorStatus i)
t (ExecutorStatus i -> STM ())
-> (ExecutorPendingStatus -> ExecutorStatus i)
-> ExecutorPendingStatus
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExecutorPendingStatus -> ExecutorStatus i
forall a. ExecutorPendingStatus -> ExecutorStatus a
ExecutorPending)
          let r :: ExecutorFinalStatus i
r =
                (SomeException -> ExecutorFinalStatus i)
-> (ByteString -> ExecutorFinalStatus i)
-> Either SomeException ByteString
-> ExecutorFinalStatus i
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
                  ( \(SomeException
err :: SomeException) ->
                      Text -> ExecutorFinalStatus i
forall a. Text -> ExecutorFinalStatus a
ExecutorFailed (Text -> ExecutorFinalStatus i) -> Text -> ExecutorFinalStatus i
forall a b. (a -> b) -> a -> b
$
                        Text
"Backend threw an exception: "
                          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (SomeException -> String
forall a. Show a => a -> String
show SomeException
err)
                  )
                  ByteString -> ExecutorFinalStatus i
forall a. Binary a => ByteString -> ExecutorFinalStatus a
fromBackendStdout
                  Either SomeException ByteString
answer
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (ExecutorStatus i) -> ExecutorStatus i -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (ExecutorStatus i)
t (ExecutorFinalStatus i -> ExecutorStatus i
forall a. ExecutorFinalStatus a -> ExecutorStatus a
ExecutorFinished ExecutorFinalStatus i
r)
          () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Handle i -> IO (Handle i)
forall (m :: * -> *) a. Monad m => a -> m a
return (Handle i -> IO (Handle i)) -> Handle i -> IO (Handle i)
forall a b. (a -> b) -> a -> b
$ TVar (ExecutorStatus i) -> Handle i
forall a. TVar (ExecutorStatus a) -> Handle a
Handle TVar (ExecutorStatus i)
t

toBackendStdin :: Closure (Dict (Serializable a)) -> Closure (IO a) -> BS.ByteString
toBackendStdin :: Closure (Dict (Serializable a)) -> Closure (IO a) -> ByteString
toBackendStdin Closure (Dict (Serializable a))
dict Closure (IO a)
cls =
  case Closure (Dict (Serializable a)) -> Dict (Serializable a)
forall a. Closure a -> a
unclosure Closure (Dict (Serializable a))
dict of
    Dict (Serializable a)
Dict -> ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (Closure (IO ()) -> ByteString) -> Closure (IO ()) -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ZlibWrapper (Closure (IO ())) -> ByteString
forall a. Binary a => a -> ByteString
encode (ZlibWrapper (Closure (IO ())) -> ByteString)
-> (Closure (IO ()) -> ZlibWrapper (Closure (IO ())))
-> Closure (IO ())
-> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Closure (IO ()) -> ZlibWrapper (Closure (IO ()))
forall a. a -> ZlibWrapper a
ZlibWrapper (Closure (IO ()) -> ByteString) -> Closure (IO ()) -> ByteString
forall a b. (a -> b) -> a -> b
$ static Dict (Serializable a) -> IO a -> IO ()
forall a. Dict (Serializable a) -> IO a -> IO ()
run Closure (Dict (Serializable a) -> IO a -> IO ())
-> Closure (Dict (Serializable a)) -> Closure (IO a -> IO ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable a))
dict Closure (IO a -> IO ()) -> Closure (IO a) -> Closure (IO ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (IO a)
cls
  where
    run :: forall a. Dict (Serializable a) -> IO a -> IO ()
    run :: Dict (Serializable a) -> IO a -> IO ()
run Dict (Serializable a)
Dict IO a
a =
      (IO a
a IO a -> (a -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteString -> IO ()
BL.putStr (ByteString -> IO ()) -> (a -> ByteString) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExecutorFinalStatus a -> ByteString
forall a. Binary a => a -> ByteString
encode (ExecutorFinalStatus a -> ByteString)
-> (a -> ExecutorFinalStatus a) -> a -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ExecutorFinalStatus a
forall a. a -> ExecutorFinalStatus a
ExecutorSucceeded)
        IO () -> (SomeException -> IO ()) -> IO ()
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` ( \(SomeException
ex :: SomeException) ->
                    ByteString -> IO ()
BL.putStr (ByteString -> IO ()) -> (Text -> ByteString) -> Text -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExecutorFinalStatus a -> ByteString
forall a. Binary a => a -> ByteString
encode (ExecutorFinalStatus a -> ByteString)
-> (Text -> ExecutorFinalStatus a) -> Text -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ExecutorFinalStatus a
forall a. Text -> ExecutorFinalStatus a
ExecutorFailed @a (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$
                      Text
"Exception from executor: "
                        Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (SomeException -> String
forall a. Show a => a -> String
show SomeException
ex)
                )

fromBackendStdout :: Binary a => BS.ByteString -> ExecutorFinalStatus a
fromBackendStdout :: ByteString -> ExecutorFinalStatus a
fromBackendStdout ByteString
bs =
  case ByteString
-> Either
     (ByteString, ByteOffset, String)
     (ByteString, ByteOffset, ExecutorFinalStatus a)
forall a.
Binary a =>
ByteString
-> Either
     (ByteString, ByteOffset, String) (ByteString, ByteOffset, a)
decodeOrFail (ByteString -> ByteString
BL.fromStrict ByteString
bs) of
    Left (ByteString
_, ByteOffset
_, String
err) -> Text -> ExecutorFinalStatus a
forall a. Text -> ExecutorFinalStatus a
ExecutorFailed (Text -> ExecutorFinalStatus a) -> Text -> ExecutorFinalStatus a
forall a b. (a -> b) -> a -> b
$ Text
"Error decoding answer: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack String
err
    Right (ByteString
_, ByteOffset
_, ExecutorFinalStatus a
a) -> ExecutorFinalStatus a
a

data ExecutorStatus a
  = ExecutorPending ExecutorPendingStatus
  | ExecutorFinished (ExecutorFinalStatus a)
  deriving (ExecutorStatus a -> ExecutorStatus a -> Bool
(ExecutorStatus a -> ExecutorStatus a -> Bool)
-> (ExecutorStatus a -> ExecutorStatus a -> Bool)
-> Eq (ExecutorStatus a)
forall a. Eq a => ExecutorStatus a -> ExecutorStatus a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ExecutorStatus a -> ExecutorStatus a -> Bool
$c/= :: forall a. Eq a => ExecutorStatus a -> ExecutorStatus a -> Bool
== :: ExecutorStatus a -> ExecutorStatus a -> Bool
$c== :: forall a. Eq a => ExecutorStatus a -> ExecutorStatus a -> Bool
Eq, a -> ExecutorStatus b -> ExecutorStatus a
(a -> b) -> ExecutorStatus a -> ExecutorStatus b
(forall a b. (a -> b) -> ExecutorStatus a -> ExecutorStatus b)
-> (forall a b. a -> ExecutorStatus b -> ExecutorStatus a)
-> Functor ExecutorStatus
forall a b. a -> ExecutorStatus b -> ExecutorStatus a
forall a b. (a -> b) -> ExecutorStatus a -> ExecutorStatus b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> ExecutorStatus b -> ExecutorStatus a
$c<$ :: forall a b. a -> ExecutorStatus b -> ExecutorStatus a
fmap :: (a -> b) -> ExecutorStatus a -> ExecutorStatus b
$cfmap :: forall a b. (a -> b) -> ExecutorStatus a -> ExecutorStatus b
Functor)

data ExecutorPendingStatus
  = ExecutorWaiting (Maybe Text)
  | ExecutorSubmitted (Maybe Text)
  | ExecutorStarted (Maybe Text)
  deriving (ExecutorPendingStatus -> ExecutorPendingStatus -> Bool
(ExecutorPendingStatus -> ExecutorPendingStatus -> Bool)
-> (ExecutorPendingStatus -> ExecutorPendingStatus -> Bool)
-> Eq ExecutorPendingStatus
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ExecutorPendingStatus -> ExecutorPendingStatus -> Bool
$c/= :: ExecutorPendingStatus -> ExecutorPendingStatus -> Bool
== :: ExecutorPendingStatus -> ExecutorPendingStatus -> Bool
$c== :: ExecutorPendingStatus -> ExecutorPendingStatus -> Bool
Eq)

data ExecutorFinalStatus a
  = ExecutorFailed Text
  | ExecutorSucceeded a
  deriving (ExecutorFinalStatus a -> ExecutorFinalStatus a -> Bool
(ExecutorFinalStatus a -> ExecutorFinalStatus a -> Bool)
-> (ExecutorFinalStatus a -> ExecutorFinalStatus a -> Bool)
-> Eq (ExecutorFinalStatus a)
forall a.
Eq a =>
ExecutorFinalStatus a -> ExecutorFinalStatus a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ExecutorFinalStatus a -> ExecutorFinalStatus a -> Bool
$c/= :: forall a.
Eq a =>
ExecutorFinalStatus a -> ExecutorFinalStatus a -> Bool
== :: ExecutorFinalStatus a -> ExecutorFinalStatus a -> Bool
$c== :: forall a.
Eq a =>
ExecutorFinalStatus a -> ExecutorFinalStatus a -> Bool
Eq, (forall x. ExecutorFinalStatus a -> Rep (ExecutorFinalStatus a) x)
-> (forall x.
    Rep (ExecutorFinalStatus a) x -> ExecutorFinalStatus a)
-> Generic (ExecutorFinalStatus a)
forall x. Rep (ExecutorFinalStatus a) x -> ExecutorFinalStatus a
forall x. ExecutorFinalStatus a -> Rep (ExecutorFinalStatus a) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall a x. Rep (ExecutorFinalStatus a) x -> ExecutorFinalStatus a
forall a x. ExecutorFinalStatus a -> Rep (ExecutorFinalStatus a) x
$cto :: forall a x. Rep (ExecutorFinalStatus a) x -> ExecutorFinalStatus a
$cfrom :: forall a x. ExecutorFinalStatus a -> Rep (ExecutorFinalStatus a) x
Generic, a -> ExecutorFinalStatus b -> ExecutorFinalStatus a
(a -> b) -> ExecutorFinalStatus a -> ExecutorFinalStatus b
(forall a b.
 (a -> b) -> ExecutorFinalStatus a -> ExecutorFinalStatus b)
-> (forall a b.
    a -> ExecutorFinalStatus b -> ExecutorFinalStatus a)
-> Functor ExecutorFinalStatus
forall a b. a -> ExecutorFinalStatus b -> ExecutorFinalStatus a
forall a b.
(a -> b) -> ExecutorFinalStatus a -> ExecutorFinalStatus b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> ExecutorFinalStatus b -> ExecutorFinalStatus a
$c<$ :: forall a b. a -> ExecutorFinalStatus b -> ExecutorFinalStatus a
fmap :: (a -> b) -> ExecutorFinalStatus a -> ExecutorFinalStatus b
$cfmap :: forall a b.
(a -> b) -> ExecutorFinalStatus a -> ExecutorFinalStatus b
Functor)

-- |
-- Result of a 'fork' is an Handle where you can 'await' a result.
newtype Handle a = Handle (TVar (ExecutorStatus a))

-- |
-- Get the current status of given 'Handle'.
pollHandle :: Handle a -> STM (ExecutorStatus a)
pollHandle :: Handle a -> STM (ExecutorStatus a)
pollHandle (Handle TVar (ExecutorStatus a)
t) = TVar (ExecutorStatus a) -> STM (ExecutorStatus a)
forall a. TVar a -> STM a
readTVar TVar (ExecutorStatus a)
t

tryAwait :: Handle a -> IO (Either Text a)
tryAwait :: Handle a -> IO (Either Text a)
tryAwait Handle a
h = do
  ExecutorFinalStatus a
r <-
    IO (ExecutorFinalStatus a) -> IO (ExecutorFinalStatus a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ExecutorFinalStatus a) -> IO (ExecutorFinalStatus a))
-> (STM (ExecutorFinalStatus a) -> IO (ExecutorFinalStatus a))
-> STM (ExecutorFinalStatus a)
-> IO (ExecutorFinalStatus a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (ExecutorFinalStatus a) -> IO (ExecutorFinalStatus a)
forall a. STM a -> IO a
atomically (STM (ExecutorFinalStatus a) -> IO (ExecutorFinalStatus a))
-> STM (ExecutorFinalStatus a) -> IO (ExecutorFinalStatus a)
forall a b. (a -> b) -> a -> b
$
      Handle a -> STM (ExecutorStatus a)
forall a. Handle a -> STM (ExecutorStatus a)
pollHandle Handle a
h STM (ExecutorStatus a)
-> (ExecutorStatus a -> STM (ExecutorFinalStatus a))
-> STM (ExecutorFinalStatus a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        ExecutorPending ExecutorPendingStatus
_ -> STM (ExecutorFinalStatus a)
forall a. STM a
retry
        ExecutorFinished ExecutorFinalStatus a
a -> ExecutorFinalStatus a -> STM (ExecutorFinalStatus a)
forall (m :: * -> *) a. Monad m => a -> m a
return ExecutorFinalStatus a
a
  Either Text a -> IO (Either Text a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either Text a -> IO (Either Text a))
-> Either Text a -> IO (Either Text a)
forall a b. (a -> b) -> a -> b
$ case ExecutorFinalStatus a
r of
    ExecutorFailed Text
err -> Text -> Either Text a
forall a b. a -> Either a b
Left Text
err
    ExecutorSucceeded a
a -> a -> Either Text a
forall a b. b -> Either a b
Right a
a