{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module Control.Distributed.Fork.AWS.Lambda.Internal.Invoke
  ( withInvoke,
  )
where

import Conduit
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.Throttled
import Control.Distributed.Fork.AWS.Lambda.Internal.Stack (StackInfo (..))
import Control.Distributed.Fork.AWS.Lambda.Internal.Types
import Control.Distributed.Fork.Backend
import Control.Exception.Safe
import Control.Lens
import Control.Monad
import qualified Data.Aeson as A
import Data.Aeson.Lens
import qualified Data.ByteString as BS
import Data.ByteString.Base64 as B64
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Network.AWS
import Network.AWS.Lambda
import Network.AWS.S3 as S3
import Network.AWS.SQS

{-
Since we're going to get our answers asynchronously, we maintain a state with
callbacks for individual invocations.

Every individual invocation has an incrementing id, so we can tell the
responses apart.
-}
-- | Bookkeeping for the invocations that are still waiting for an outcome.
data LambdaState
  = LambdaState
      { -- | Callbacks of the pending invocations, keyed by invocation id.
        -- A callback is handed an action that either returns the response
        -- payload or throws.
        lsInvocations :: M.Map Int (IO ResponsePayload -> IO ()),
        -- | Id that the next invocation will receive.
        lsNextId :: Int
      }

-- | Everything the functions in this module need to talk to a deployed stack.
data LambdaEnv
  = LambdaEnv
      { -- | Mutable invocation bookkeeping, shared between threads.
        leState :: MVar LambdaState,
        -- | Identifiers of the stack's resources (function, queues, answer
        -- bucket).
        leStack :: StackInfo,
        -- | AWS environment every request is run with.
        leEnv :: Env
      }

-- | Build a 'LambdaEnv' for the given AWS environment and stack. The state
-- starts with no pending invocations and with the next invocation id at 0.
newLambdaEnv :: Env -> StackInfo -> IO LambdaEnv
newLambdaEnv env st = do
  state <- newMVar (LambdaState M.empty 0)
  pure
    LambdaEnv
      { leState = state,
        leStack = st,
        leEnv = env
      }

{-
When invoking a function, we insert a new id to the state and then call Lambda.
-}
-- | Run one serialized computation on Lambda and return its serialized
-- result.
--
-- A callback is registered under a fresh invocation id, the function is
-- invoked with the @Event@ invocation type (payload: base64-encoded input
-- under @"d"@, the id under @"i"@), and the call then blocks until the
-- callback is fired. Inline answers are base64-decoded; answers stored in
-- S3 are downloaded from the answer bucket and the object is deleted
-- afterwards.
--
-- The three throttles wrap, in order: the @Invoke@ request, the whole
-- invoke-and-wait step, and the S3 download.
--
-- Throws 'InvokeException' when the invoke, download or delete request
-- returns a non-2xx status, or when an inline answer is not valid base64.
-- An exception carried by the delivered action is rethrown unchanged.
execute ::
  LambdaEnv ->
  (Throttle, Throttle, Throttle) ->
  BS.ByteString ->
  BackendM BS.ByteString
execute LambdaEnv {..} (invocationThrottle, executionThrottle, downloadThrottle) input = do
  -- Register a one-shot callback under a fresh invocation id.
  mvar <- liftIO $ newEmptyMVar @(IO ResponsePayload)
  id' <-
    liftIO . modifyMVar leState $ \st ->
      let thisId = lsNextId st
       in return
            ( st
                { lsNextId = thisId + 1,
                  lsInvocations =
                    M.insert thisId (void . tryPutMVar mvar) (lsInvocations st)
                },
              thisId
            )
  -- If invoking or waiting fails (or is cancelled), drop the callback so it
  -- does not stay in the state forever. When the outcome was delivered the
  -- entry is already gone and the removal is a no-op.
  answer <-
    throttled executionThrottle (invokeAndWait id' mvar)
      `onException` liftIO (forgetInvocation id')
  case answer of
    ResponsePayloadInline p ->
      case B64.decode (T.encodeUtf8 p) of
        Left err ->
          throwIO . InvokeException $ "Error decoding answer: " <> T.pack err
        Right x -> return x
    ResponsePayloadS3 p ->
      throttled downloadThrottle . runResourceT . runAWS leEnv $
        downloadAnswer (ObjectKey p)
  where
    -- Remove the callback of the given invocation, if still registered.
    forgetInvocation :: Int -> IO ()
    forgetInvocation i =
      modifyMVar_ leState $ \st ->
        return st {lsInvocations = M.delete i (lsInvocations st)}

    -- JSON request body: base64-encoded input under "d", invocation id
    -- under "i".
    requestBody :: Int -> BS.ByteString
    requestBody i =
      BL.toStrict . A.encode $
        A.object
          [ (T.pack "d", A.toJSON . T.decodeUtf8 $ B64.encode input),
            (T.pack "i", A.toJSON i)
          ]

    -- Send the asynchronous invocation, then block until the outcome has
    -- been put into the given box.
    invokeAndWait :: Int -> MVar (IO ResponsePayload) -> BackendM ResponsePayload
    invokeAndWait i box = do
      -- invoke the lambda function
      irs <-
        throttled invocationThrottle . runResourceT . runAWS leEnv . send $
          invoke (siFunc leStack) (requestBody i) & iInvocationType ?~ Event
      submitted
      let status = irs ^. irsStatusCode
      unless (status `div` 100 == 2) $
        throwIO . InvokeException $
          "Invoke failed. Status code: " <> T.pack (show status)
      -- wait for the answer; running the delivered action may itself throw
      liftIO . join $ readMVar box

    -- Fetch the answer object from the answer bucket, then delete it.
    downloadAnswer :: ObjectKey -> AWS BS.ByteString
    downloadAnswer objKey = do
      let bucketName = S3.BucketName $ siAnswerBucket leStack
      gors <- send $ getObject bucketName objKey
      let getStatus = gors ^. gorsResponseStatus
      unless (getStatus `div` 100 == 2) $
        throwIO . InvokeException $
          "Downloading result failed. Status code: " <> T.pack (show getStatus)
      bs <- mconcat <$> sinkBody (gors ^. gorsBody) sinkList
      dors <- send $ deleteObject bucketName objKey
      let delStatus = dors ^. dorsResponseStatus
      unless (delStatus `div` 100 == 2) $
        throwIO . InvokeException $
          "Deleting result failed. Status code: " <> T.pack (show delStatus)
      return bs

{-
We then poll the answer queue for the responses.
-}
-- | Poll the answer queue forever and hand each response to the callback
-- registered for its invocation id, removing that callback from the state.
-- Responses whose id has no registered callback are ignored.
--
-- Throws 'InvokeException', which ends the loop, when a message has no body
-- or when its body cannot be decoded as a response.
answerThread :: LambdaEnv -> IO ()
answerThread LambdaEnv {..} =
  runResourceT . runAWS leEnv . forever $ do
    msgs <- sqsReceiveSome $ siAnswerQueue leStack
    forM_ msgs $ \msg -> do
      Response id' payload <-
        case msg ^. mBody of
          Nothing ->
            throwIO $ InvokeException "Error decoding answer: no body."
          Just body ->
            case A.decodeStrict (T.encodeUtf8 body) of
              Nothing ->
                throwIO . InvokeException $
                  "Error decoding answer: invalid json: " <> T.pack (show msg)
              Just r -> return r
      liftIO $ deliver id' payload
  where
    -- Fire the callback for the given id (if any) with the payload and
    -- unregister it; the callback runs while the state is held.
    deliver :: Int -> ResponsePayload -> IO ()
    deliver i payload =
      modifyMVar_ leState $ \st ->
        case M.lookup i (lsInvocations st) of
          Nothing -> return st
          Just callback -> do
            callback (return payload)
            return st {lsInvocations = M.delete i (lsInvocations st)}

{-
https://docs.aws.amazon.com/lambda/latest/dg/dlq.html
-}
-- | Poll the dead letter queue forever. For each message, the invocation id
-- is read from the @"i"@ field of the JSON body, and the callback registered
-- for that id (if any) is fired with an action that throws
-- 'InvokeException', then unregistered. The exception text uses the
-- message's @ErrorMessage@ attribute when present.
--
-- Throws 'InvokeException', which ends the loop, when no id can be read
-- from a message.
deadLetterThread :: LambdaEnv -> IO ()
deadLetterThread LambdaEnv {..} =
  runResourceT . runAWS leEnv . forever $ do
    msgs <- sqsReceiveSome $ siDeadLetterQueue leStack
    forM_ msgs $ \msg -> do
      id' <- liftIO $ decodeId msg
      liftIO $ failInvocation id' msg
  where
    -- Fire the callback for the given id (if any) with a throwing action
    -- and unregister it; the callback runs while the state is held.
    failInvocation :: Int -> Message -> IO ()
    failInvocation i msg =
      modifyMVar_ leState $ \st ->
        case M.lookup i (lsInvocations st) of
          Nothing -> return st
          Just callback -> do
            callback (throwIO (InvokeException (errorText msg)))
            return st {lsInvocations = M.delete i (lsInvocations st)}

    -- Text of the exception handed to the waiting invocation.
    errorText :: Message -> T.Text
    errorText msg =
      "Lambda function failed: "
        <> fromMaybe "error message not found." reported
      where
        reported =
          msg
            ^. mMessageAttributes
              . at "ErrorMessage"
              . _Just
              . mavStringValue

    -- Invocation id from the "i" field of the body; the number is truncated
    -- to an 'Int'.
    decodeId :: Message -> IO Int
    decodeId msg =
      maybe notFound (return . truncate) $
        msg ^? mBody . _Just . key "i" . _Number
      where
        notFound :: IO Int
        notFound =
          throwIO . InvokeException $
            "Can not find Id: " <> T.pack (show msg)

{-
A helper function to read from SQS queues.

Issues a single ReceiveMessage call against the given queue (wait time 10,
visibility timeout 10, at most 10 messages, all message attributes
requested). Every message that was received is then removed from the queue
with one DeleteMessageBatch call before the messages are returned.

Throws 'InvokeException' when:
  * the receive call answers with a status other than 200,
  * a received message has no receipt handle (it could not be deleted),
  * the delete call answers with a status other than 200.

NOTE(review): only the HTTP status of the batch delete is inspected; per-entry
failures reported inside the response are not examined here.
-}
sqsReceiveSome :: T.Text -> AWS [Message]
sqsReceiveSome queue = do
  rmrs <-
    send $
      receiveMessage queue
        & rmVisibilityTimeout ?~ 10
        & rmWaitTimeSeconds ?~ 10
        & rmMaxNumberOfMessages ?~ 10
        & rmMessageAttributeNames .~ ["All"]
  unless (rmrs ^. rmrsResponseStatus == 200) $
    invokeError $
      "Error receiving messages: "
        <> T.pack (show $ rmrs ^. rmrsResponseStatus)
  let msgs = rmrs ^. rmrsMessages
  unless (null msgs) $ do
    -- Batch entry ids only need to be unique within the request, so the
    -- position of the message in the received list is used.
    entries <- forM (zip [(0 :: Integer) ..] msgs) $ \(i, msg) ->
      case msg ^. mReceiptHandle of
        Just receipt ->
          return $ deleteMessageBatchRequestEntry (T.pack $ show i) receipt
        -- Previously a 'fromJust'; fail with a descriptive error instead.
        Nothing ->
          invokeError $
            "Received message without a receipt handle: "
              <> T.pack (show msg)
    dmbrs <- send $ deleteMessageBatch queue & dmbEntries .~ entries
    -- Report the status of the delete call itself (the original reported
    -- the status of the earlier, successful receive call).
    unless (dmbrs ^. dmbrsResponseStatus == 200) $
      invokeError $
        "Error deleting received messages: "
          <> T.pack (show $ dmbrs ^. dmbrsResponseStatus)
  return msgs
  where
    -- Abort the AWS action with an 'InvokeException' carrying the text.
    invokeError :: T.Text -> AWS a
    invokeError = liftIO . throwIO . InvokeException

{-
Run an action with an invoke function backed by the given stack.

A fresh 'LambdaEnv' is created, then 4 threads looping over 'answerThread'
and 2 threads looping over 'deadLetterThread' are started. Synchronous
exceptions raised inside a loop iteration are printed and the loop continues.
The callback receives 'execute' partially applied to the environment and the
throttles; when the callback finishes (normally or exceptionally) all worker
threads are cancelled.

The workers are acquired with 'bracket' so that an asynchronous exception
arriving right after they are spawned can not leak them. They are started
with 'asyncWithUnmask' so that they do not inherit the masked state that
'bracket' imposes on its acquire step.
-}
withInvoke ::
  Env ->
  (Throttle, Throttle, Throttle) ->
  StackInfo ->
  ((BS.ByteString -> BackendM BS.ByteString) -> IO a) ->
  IO a
withInvoke env throttles stack f = do
  le <- newLambdaEnv env stack
  let -- Run the given step forever on its own thread, printing (and
      -- surviving) any synchronous exception it raises.
      worker :: IO () -> IO (Async ())
      worker step =
        asyncWithUnmask (\unmask -> unmask (forever (catchAny step print)))
      spawnWorkers :: IO [Async ()]
      spawnWorkers =
        (++)
          <$> replicateM 4 (worker (answerThread le))
          <*> replicateM 2 (worker (deadLetterThread le))
  bracket spawnWorkers (mapM_ cancel) $ \_ ->
    f (execute le throttles)

{-
Exception thrown by this module when an invocation-related step fails.
The wrapped text is a human readable description of the failure.
-}
newtype InvokeException = InvokeException T.Text
  deriving (Show)

-- Uses the default 'Exception' methods, which rely on the derived 'Show'.
instance Exception InvokeException