{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
module Control.Distributed.Fork.Utils
( forkConcurrently,
Options (..),
defaultOptions,
)
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, cancel, wait, waitBoth)
import Control.Concurrent.STM
import Control.Distributed.Fork
import Control.Exception (finally, throwIO)
import Control.Monad (forM, when)
import Control.Monad.IO.Class (liftIO)
import Control.Retry
import Data.Function (fix)
import Data.Functor
import Data.Group
import qualified System.Console.Terminal.Size as TS
import System.IO (hFlush, stdout)
-- | Settings for 'forkConcurrently'.
data Options
  = Options
      { -- | Retry limit handed to 'limitRetries' for every task.
        oRetries :: Int,
        -- | Whether a progress bar is drawn on stdout while tasks run.
        oShowProgress :: Bool
      }
-- | Default settings: a retry limit of 2 and the progress bar enabled.
defaultOptions :: Options
defaultOptions = Options 2 True
-- | Per-state task counters used to draw the progress bar.
--
-- The fields are strict: values of this type are merged repeatedly inside a
-- shared 'TVar', and lazy counters would accumulate chains of unevaluated
-- additions there.
data Progress
  = Progress
      { -- | Tasks whose executor reports 'ExecutorWaiting'.
        waiting :: !Int,
        -- | Tasks whose executor reports 'ExecutorSubmitted'.
        submitted :: !Int,
        -- | Tasks whose executor reports 'ExecutorStarted'.
        started :: !Int,
        -- | Tasks whose executor reports 'ExecutorFinished'.
        finished :: !Int,
        -- | Sum of the retry iteration numbers of the tracked tasks.
        retried :: !Int
      }
  deriving (Eq)
-- | Combines two 'Progress' values by adding their counters field by field.
instance Semigroup Progress where
  Progress a1 b1 c1 d1 e1 <> Progress a2 b2 c2 d2 e2 =
    Progress (a1 + a2) (b1 + b2) (c1 + c2) (d1 + d2) (e1 + e2)
-- | The identity element has every counter set to zero.
instance Monoid Progress where
  mempty = Progress 0 0 0 0 0
-- | The inverse negates every counter, so @p <> invert p == mempty@.
instance Group Progress where
  invert (Progress a b c d e) =
    Progress (negate a) (negate b) (negate c) (negate d) (negate e)
-- | Run every closure on the given backend concurrently and return the
-- results in the order of the input list.
--
-- Each closure is submitted with 'fork' inside 'recoverAll', using
-- @'limitRetries' ('oRetries' options)@ as the retry policy. When
-- 'oShowProgress' is set, a one-line progress bar is drawn on stdout.
--
-- If a task still fails once its retries are used up, an
-- 'ExecutorFailedException' is rethrown to the caller. Whenever this function
-- exits, normally or by exception, the worker threads and the progress thread
-- are cancelled so none of them is left running in the background.
forkConcurrently ::
  Options ->
  Backend ->
  Closure (Dict (Serializable a)) ->
  [Closure (IO a)] ->
  IO [a]
forkConcurrently options backend dict xs = do
  -- The Bool marks "changed since the bar was last drawn"; it starts as True
  -- so that the first frame is drawn immediately.
  st <- newTVarIO (True, mempty)
  asyncs <- forM xs $ updateThread st
  result <- async $ mapM wait asyncs
  pbar <-
    if oShowProgress options
      then progressThread st (length xs)
      else async $ return ()
  -- Cancelling an already finished thread is a no-op, so the cleanup only
  -- has an effect on failure or interruption.
  (fst <$> waitBoth result pbar)
    `finally` (cancel result >> mapM_ cancel asyncs >> cancel pbar)
  where
    -- Spawn a thread that submits one closure and mirrors the status reported
    -- by its handle into the shared progress state until it finishes.
    updateThread st act =
      async $ do
        -- What this task last contributed to the shared counters; it lives
        -- outside the retry loop so a retry replaces the old contribution.
        progress <- newTVarIO mempty
        recoverAll (limitRetries $ oRetries options) $ \retryStatus -> do
          handle <- fork backend dict act
          fix $ \recurse -> do
            outcome <-
              liftIO $ atomically $ do
                old <- readTVar progress
                let base = mempty {retried = rsIterNumber retryStatus}
                -- NOTE(review): a failed attempt is counted as finished until
                -- the retry reports a new status, so the bar may reach the
                -- total before the last retry completes -- confirm intended.
                (new, final) <-
                  pollHandle handle <&> \case
                    ExecutorPending (ExecutorWaiting _) ->
                      (base {waiting = 1}, Nothing)
                    ExecutorPending (ExecutorSubmitted _) ->
                      (base {submitted = 1}, Nothing)
                    ExecutorPending (ExecutorStarted _) ->
                      (base {started = 1}, Nothing)
                    ExecutorFinished fr ->
                      (base {finished = 1}, Just fr)
                -- Block until the status differs from what was last published.
                when (old == new) retry
                writeTVar progress new
                -- Swap this task's previous contribution for the new one and
                -- flag the state as changed. The merged record is forced so
                -- pending merges do not pile up when nothing reads the state.
                modifyTVar' st $ \(_, acc) ->
                  let acc' = acc <> invert old <> new
                   in acc' `seq` (True, acc')
                return final
            case outcome of
              Just (ExecutorFailed err) ->
                liftIO . throwIO $ ExecutorFailedException err
              Just (ExecutorSucceeded x) -> return x
              Nothing -> recurse
    -- Spawn the thread that redraws the bar for @total@ tasks each time the
    -- shared state is flagged as changed, until all tasks count as finished.
    progressThread st total = do
      -- Fall back to 40 columns when the terminal size is not available.
      termWidth <- maybe 40 TS.width <$> TS.size :: IO Int
      -- Tasks per bar character; two columns are kept for the brackets.
      let ratio = fromIntegral total / fromIntegral (termWidth - 2) :: Double
          -- A run of @c@ scaled to @n@ tasks. A non-positive or NaN ratio
          -- (no tasks, or a terminal narrower than the brackets) yields an
          -- empty run instead of truncating a meaningless quotient.
          segment c n
            | ratio > 0 = replicate (truncate (fromIntegral n / ratio)) c
            | otherwise = []
      async . fix $ \recurse -> do
        progress <-
          atomically $
            readTVar st >>= \case
              (False, _) -> retry
              (True, p) -> do
                writeTVar st (False, p)
                return p
        putStr . concat $
          [ "\r",
            "[",
            segment '#' (finished progress),
            segment ':' (started progress),
            segment '.' (submitted progress),
            segment ' ' (waiting progress),
            "]"
          ]
        -- The frame has no newline, so a line-buffered stdout would hold it
        -- back until the bar completes unless it is flushed here.
        hFlush stdout
        if finished progress < total
          then threadDelay 10000 >> recurse
          else putStrLn ""