{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Control.Distributed.Fork.AWS.Lambda
  ( -- * Usage
    withLambdaBackend,

    -- * Options
    LambdaBackendOptions,
    lambdaBackendOptions,
    lboPrefix,
    lboMemory,
    lboMaxConcurrentInvocations,
    lboMaxConcurrentExecutions,
    lboMaxConcurrentDownloads,
    lboKeepStack,
  )
where

import Control.Concurrent.Throttled
import Control.Distributed.Fork.AWS.Lambda.Internal.Archive
import Control.Distributed.Fork.AWS.Lambda.Internal.Invoke
import Control.Distributed.Fork.AWS.Lambda.Internal.Stack
import Control.Distributed.Fork.AWS.Lambda.Internal.Types
import Control.Distributed.Fork.Backend
import Control.Lens (Lens', (^.), lens)
import Data.Bool (bool)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Network.AWS
  ( Credentials (Discover),
    envRegion,
    newEnv,
    runAWS,
    runResourceT,
  )

-- |
-- Provides a 'Backend' using AWS Lambda functions.
--
-- In order to do that, roughly:
--
--     * It creates a deployment archive containing the program binary and
--     a tiny wrapper written in Python and uploads it to the given S3 bucket.
--     * It creates a CloudFormation stack containing the Lambda function, and
--     both an SQS queue and an S3 bucket to gather the answers.
--     * It starts polling the SQS queue for any answers.
--     * When executing, it invokes the Lambda function using asynchronous
--     invocation mode. After the function finishes, it either puts the result
--     directly into the SQS queue if the result is small enough (< 200 KB) or
--     uploads it into the S3 bucket and sends a pointer to the file via the
--     queue.
--     * A separate thread on the driver continuously polls the queue for
--     answers, and parses and returns them to the caller.
--     * On exit, it deletes the CloudFormation stack (unless 'lboKeepStack'
--     is set).
--
-- Some warts:
--
--     * The same binary should run on AWS Lambda. In practice, this means:
--
--         * You have to build and use this library on a Linux machine.
--
--         * You have to statically link everything. You can use GHC's
--         @-static -optl-static -optl-pthread -fPIC@ parameters for that.
--
--     * On AWS Lambda, the more memory you assign to a function, the more CPU
--     you get. So your function might run faster if you overallocate memory.
--     * When invoked asynchronously, AWS Lambda retries the invocation
--     2 more times, waiting a minute between every retry. This means that when
--     something fails, it will take at least a few minutes until you
--     get an exception.
--
-- Example:
--
-- @
-- {-\# LANGUAGE OverloadedStrings #-}
-- {-\# LANGUAGE StaticPointers #-}
--
-- import Control.Lens
-- import Control.Distributed.Fork
-- import Control.Distributed.Fork.AWS.Lambda
--
-- opts :: LambdaBackendOptions
-- opts = lambdaBackendOptions "my-s3-bucket"
--          & lboMemory .~ 1024
--
-- main :: IO ()
-- main = do
--   'initDistributedFork'
--   'withLambdaBackend' opts $ \\backend -> do
--     handle <- 'fork' backend (static Dict) (static (return "Hello from Lambda!"))
--     await handle >>= putStrLn
-- @
withLambdaBackend :: LambdaBackendOptions -> (Backend -> IO a) -> IO a
withLambdaBackend LambdaBackendOptions {..} f = do
  env <- newEnv Discover
  putStrLn $ "Detected region: " <> show (env ^. envRegion) <> "."
  archive <- mkArchive
  let cksum = archiveChecksum archive
      -- Archive size in (decimal) megabytes; only used in the log message.
      size = fromIntegral (archiveSize archive) / (1000 * 1000) :: Double
      -- The object key embeds the archive checksum, so an archive with the
      -- same checksum maps to the same key and its upload can be skipped.
      s3loc =
        S3Loc (BucketName _lboBucket) (_lboPrefix <> "-" <> cksum <> ".zip")
  putStrLn $ "Checking if deployment archive exists (" <> show s3loc <> ")."
  -- 'bool' runs its first argument when the object is missing (False) and
  -- its second argument when it already exists (True).
  runResourceT . runAWS env $
    awsObjectExists s3loc
      >>= bool
        ( liftIO
            ( putStrLn $
                "Uploading the deployment archive. (" <> show size <> " MB)"
            )
            >> awsUploadObject s3loc (archiveToByteString archive)
        )
        (liftIO $ putStrLn "Found archive, skipping upload.")
  -- Second-resolution timestamp, so successive runs with the same archive
  -- get distinct stack names.
  time <-
    T.pack . formatTime defaultTimeLocale "%Y%m%d%H%M%S" <$> getCurrentTime
  let stackOptions =
        StackOptions
          { soName = StackName (_lboPrefix <> "-" <> time <> "-" <> cksum),
            soLambdaMemory = _lboMemory,
            soLambdaCode = s3loc,
            soKeep = _lboKeepStack
          }
  putStrLn "Creating stack."
  withStack stackOptions env $ \si -> do
    putStrLn $ "Stack created: " <> T.unpack (siId si)
    -- One throttle each for invocations, executions and downloads, in that
    -- order.
    throttles <-
      (,,)
        <$> newThrottle _lboMaxConcurrentInvocations
        <*> newThrottle _lboMaxConcurrentExecutions
        <*> newThrottle _lboMaxConcurrentDownloads
    withInvoke env throttles si $ \invoke ->
      f (Backend invoke)

-- |
-- Options required for creating a Lambda backend.
--
-- Use the 'lambdaBackendOptions' smart constructor to create a value, and the
-- lenses below to set the optional fields.
data LambdaBackendOptions
  = LambdaBackendOptions
      { -- | S3 bucket that the deployment archive is stored in.
        _lboBucket :: T.Text,
        -- | Prefix of the deployment archive's S3 key and of the stack name.
        _lboPrefix :: T.Text,
        -- | Memory setting passed to the Lambda function.
        _lboMemory :: Int,
        -- | Limit for the invocation throttle.
        _lboMaxConcurrentInvocations :: Int,
        -- | Limit for the execution throttle.
        _lboMaxConcurrentExecutions :: Int,
        -- | Limit for the S3 download throttle.
        _lboMaxConcurrentDownloads :: Int,
        -- | Whether the CloudFormation stack is kept after use.
        _lboKeepStack :: Bool
      }

-- |
-- Creates 'LambdaBackendOptions' for the given bucket. Every other field gets
-- a default value:
--
--     * 'lboPrefix': @"distributed-dataset"@
--     * 'lboMemory': 1024
--     * 'lboMaxConcurrentInvocations': 64
--     * 'lboMaxConcurrentExecutions': 0 (no throttling)
--     * 'lboMaxConcurrentDownloads': 16
--     * 'lboKeepStack': False
lambdaBackendOptions ::
  -- | Name of the S3 bucket to store the deployment archive in.
  T.Text ->
  LambdaBackendOptions
lambdaBackendOptions bucket =
  LambdaBackendOptions
    { _lboBucket = bucket,
      _lboPrefix = "distributed-dataset",
      _lboMemory = 1024,
      _lboMaxConcurrentInvocations = 64,
      _lboMaxConcurrentExecutions = 0,
      _lboMaxConcurrentDownloads = 16,
      _lboKeepStack = False
    }

-- |
-- Desired memory for the Lambda functions.
--
-- See <https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-memorysize CloudFormation's AWS::Lambda::Function::MemorySize page> for allowed values.
--
-- Default: 1024
lboMemory :: Lens' LambdaBackendOptions Int
lboMemory = lens _lboMemory (\opts v -> opts {_lboMemory = v})

-- |
-- Prefix of the deployment archive's S3 key and of the CloudFormation stack
-- name.
--
-- Default: @"distributed-dataset"@
lboPrefix :: Lens' LambdaBackendOptions T.Text
lboPrefix = lens _lboPrefix (\opts v -> opts {_lboPrefix = v})

-- |
-- Maximum number of concurrent "invoke" calls to the AWS API to trigger
-- executions.
--
-- Non-positive values disable the throttling.
--
-- Default: 64
lboMaxConcurrentInvocations :: Lens' LambdaBackendOptions Int
lboMaxConcurrentInvocations =
  lens
    _lboMaxConcurrentInvocations
    (\opts v -> opts {_lboMaxConcurrentInvocations = v})

-- |
-- Maximum number of concurrently executing Lambda functions.
--
-- Non-positive values disable the throttling.
--
-- Default: 0 (no throttling)
lboMaxConcurrentExecutions :: Lens' LambdaBackendOptions Int
lboMaxConcurrentExecutions =
  lens
    _lboMaxConcurrentExecutions
    (\opts v -> opts {_lboMaxConcurrentExecutions = v})

-- |
-- If the return value of your function is larger than 200 kilobytes, the
-- result is fetched via S3. This parameter sets the maximum number of
-- concurrent downloads from S3.
--
-- Non-positive values disable the throttling.
--
-- Default: 16
lboMaxConcurrentDownloads :: Lens' LambdaBackendOptions Int
lboMaxConcurrentDownloads =
  lens
    _lboMaxConcurrentDownloads
    (\opts v -> opts {_lboMaxConcurrentDownloads = v})

-- |
-- Whether to keep the CloudFormation stack after the 'withLambdaBackend' call
-- returns. Useful for debugging.
--
-- Default: False
lboKeepStack :: Lens' LambdaBackendOptions Bool
lboKeepStack = lens _lboKeepStack (\opts v -> opts {_lboKeepStack = v})