{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}

module Control.Distributed.Fork.Utils
  ( forkConcurrently,
    Options (..),
    defaultOptions,
  )
where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, cancel, wait, waitBoth)
import Control.Concurrent.STM
import Control.Distributed.Fork
import Control.Exception (onException, throwIO)
import Control.Monad (forM, when)
import Control.Monad.IO.Class (liftIO)
import Control.Retry
import Data.Function (fix)
import Data.Functor
import Data.Group
import qualified System.Console.Terminal.Size as TS
import System.IO (hFlush, stdout)

-- | Settings accepted by 'forkConcurrently'.
data Options = Options
  { -- | Retry limit applied to each closure (passed to 'limitRetries').
    oRetries :: Int,
    -- | Whether a progress bar is drawn on the terminal while waiting.
    oShowProgress :: Bool
  }

-- | Default settings: a retry limit of 2 and the progress bar enabled.
defaultOptions :: Options
defaultOptions =
  Options
    { oRetries = 2,
      oShowProgress = True
    }

-- | Task counters. A value describes either a single task (exactly one of
-- the four status fields set to 1) or the sum over all tasks.
--
-- The fields are strict: values of this type are combined with '<>' over and
-- over inside long-lived 'TVar's, and strict fields make each sum get
-- computed when the value is built instead of lingering as a thunk.
data Progress = Progress
  { -- | Tasks whose last polled status was 'ExecutorWaiting'.
    waiting :: !Int,
    -- | Tasks whose last polled status was 'ExecutorSubmitted'.
    submitted :: !Int,
    -- | Tasks whose last polled status was 'ExecutorStarted'.
    started :: !Int,
    -- | Tasks whose last polled status was 'ExecutorFinished'.
    finished :: !Int,
    -- | Retry iteration numbers (as reported by 'rsIterNumber').
    retried :: !Int
  }
  deriving (Eq)

-- | Two 'Progress' values are combined by adding their counters field by
-- field.
instance Semigroup Progress where
  (<>) (Progress w1 s1 r1 f1 t1) (Progress w2 s2 r2 f2 t2) =
    Progress
      { waiting = w1 + w2,
        submitted = s1 + s2,
        started = r1 + r2,
        finished = f1 + f2,
        retried = t1 + t2
      }

-- | The identity element has every counter at zero.
instance Monoid Progress where
  mempty =
    Progress
      { waiting = 0,
        submitted = 0,
        started = 0,
        finished = 0,
        retried = 0
      }

-- | The inverse negates every counter, which lets a previously added
-- contribution be subtracted again with '<>'.
instance Group Progress where
  invert (Progress w s r f t) =
    Progress
      { waiting = negate w,
        submitted = negate s,
        started = negate r,
        finished = negate f,
        retried = negate t
      }

-- |
-- Runs the given closures concurrently using the 'Backend', optionally
-- showing a progress bar (see 'oShowProgress').
--
-- Every closure is forked on the backend and polled from its own local
-- thread. An attempt that raises an exception is retried through
-- 'recoverAll' with a limit of 'oRetries'. The results are returned in the
-- same order as the input closures.
--
-- Throws 'ExecutorFailedException' if a closure reports failure and its
-- retries are exhausted. Closures that are still in flight at that point are
-- not cancelled; only the progress bar thread is stopped.
forkConcurrently ::
  Options ->
  Backend ->
  Closure (Dict (Serializable a)) ->
  [Closure (IO a)] ->
  IO [a]
forkConcurrently options backend dict xs = do
  -- Shared state: a "dirty" flag telling the progress thread that there is
  -- something new to draw, paired with the counters summed over all tasks.
  st <- newTVarIO (True, mempty)
  asyncs <- forM xs $ updateThread st
  result <- async $ mapM wait asyncs
  pbar <-
    if oShowProgress options
      then progressThread st (length xs)
      else async $ return ()
  -- When collecting the results fails, the progress thread can no longer
  -- observe every task finishing; stop it instead of leaving it behind.
  (fst <$> waitBoth result pbar) `onException` cancel pbar
  where
    -- Forks one closure and polls it from a local thread until it reaches a
    -- final status, mirroring every status change into the shared state.
    updateThread st act =
      async $ do
        -- This task's current contribution to the aggregate. It lives outside
        -- the retry loop so that a new attempt replaces the contribution of
        -- the previous one instead of being counted on top of it.
        progress <- newTVarIO mempty
        recoverAll (limitRetries $ oRetries options) $ \retryStatus -> do
          handle <- fork backend dict act
          fix $ \recurse -> do
            outcome <-
              liftIO $ atomically $ do
                old <- readTVar progress
                let base = mempty {retried = rsIterNumber retryStatus}
                (new, final) <-
                  pollHandle handle <&> \case
                    ExecutorPending (ExecutorWaiting _) -> (base {waiting = 1}, Nothing)
                    ExecutorPending (ExecutorSubmitted _) -> (base {submitted = 1}, Nothing)
                    ExecutorPending (ExecutorStarted _) -> (base {started = 1}, Nothing)
                    ExecutorFinished fr -> (base {finished = 1}, Just fr)
                -- Nothing changed since the last poll: block until it does.
                when (old == new) retry
                writeTVar progress new
                -- Swap the old contribution for the new one and mark the
                -- state dirty. The sum is forced (to weak head normal form)
                -- here so that pending '<>' applications do not pile up in
                -- the TVar when no progress thread is reading it.
                modifyTVar' st $ \(_, acc) ->
                  let acc' = acc <> invert old <> new
                   in acc' `seq` (True, acc')
                return final
            case outcome of
              Just (ExecutorFailed err) -> liftIO . throwIO $ ExecutorFailedException err
              Just (ExecutorSucceeded x) -> return x
              Nothing -> recurse
    -- Spawns the thread that redraws the progress bar each time the shared
    -- state is marked dirty, and stops once all @total@ tasks have finished.
    progressThread st total = do
      -- Falls back to 40 columns when the terminal size is unavailable.
      termWidth <- maybe 40 TS.width <$> TS.size :: IO Int
      let -- Number of tasks represented by one character of the bar. Both
          -- operands are clamped to at least 1 so that an empty task list or
          -- a very narrow terminal cannot yield a zero, infinite or negative
          -- ratio.
          ratio :: Double
          ratio = fromIntegral (max 1 total) / fromIntegral (max 1 (termWidth - 2))
          -- A run of @c@ whose length is proportional to the task count @n@.
          segment c n = replicate (truncate (fromIntegral n / ratio)) c
      async . fix $ \recurse -> do
        progress <-
          atomically $
            readTVar st >>= \case
              -- Nothing new to draw: block until a task updates the state.
              (False, _) -> retry
              (True, current) -> do
                writeTVar st (False, current)
                return current
        putStr . concat $
          [ "\r",
            "[",
            segment '#' (finished progress),
            segment ':' (started progress),
            segment '.' (submitted progress),
            segment ' ' (waiting progress),
            "]"
          ]
        -- The bar contains no newline, so a line-buffered stdout would hold
        -- it back; flush to make the update visible.
        hFlush stdout
        if finished progress < total
          then threadDelay 10000 >> recurse
          else putStrLn ""