{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Control.Distributed.Fork.AWS.Lambda.Internal.Invoke
( withInvoke,
)
where
import Conduit
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.Throttled
import Control.Distributed.Fork.AWS.Lambda.Internal.Stack (StackInfo (..))
import Control.Distributed.Fork.AWS.Lambda.Internal.Types
import Control.Distributed.Fork.Backend
import Control.Exception.Safe
import Control.Lens
import Control.Monad
import qualified Data.Aeson as A
import Data.Aeson.Lens
import qualified Data.ByteString as BS
import Data.ByteString.Base64 as B64
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Network.AWS
import Network.AWS.Lambda
import Network.AWS.S3 as S3
import Network.AWS.SQS
-- | Mutable bookkeeping shared between 'execute' and the queue-polling
-- threads.
data LambdaState = LambdaState
  { -- | Completion callbacks of the invocations that are still in flight,
    -- keyed by invocation id. A callback receives an action that either
    -- yields the response payload or throws.
    lsInvocations :: M.Map Int (IO ResponsePayload -> IO ()),
    -- | Id that will be assigned to the next invocation.
    lsNextId :: Int
  }
-- | Everything an invocation or a polling thread needs in order to talk to
-- the deployed stack.
data LambdaEnv = LambdaEnv
  { -- | Shared, lock-protected invocation bookkeeping.
    leState :: MVar LambdaState,
    -- | Description of the deployed stack (function name, answer bucket and
    -- queue names are read from it).
    leStack :: StackInfo,
    -- | AWS environment handed to 'runAWS' for every request.
    leEnv :: Env
  }
-- | Build a 'LambdaEnv' for the given AWS environment and stack, starting
-- with no in-flight invocations and with invocation ids counting from 0.
newLambdaEnv :: Env -> StackInfo -> IO LambdaEnv
newLambdaEnv env st = do
  state <- newMVar LambdaState {lsInvocations = M.empty, lsNextId = 0}
  pure LambdaEnv {leState = state, leStack = st, leEnv = env}
-- | Run one serialized computation on the Lambda function and return its
-- serialized result.
--
-- The call registers a completion callback under a fresh invocation id,
-- sends an asynchronous (@Event@) invocation whose JSON payload carries the
-- base64-encoded input (key @"d"@) and the id (key @"i"@), and then blocks
-- until the callback has been completed by another thread. Inline answers
-- are base64-decoded; S3 answers are downloaded from the answer bucket and
-- the object is deleted afterwards.
--
-- The throttles bound, in tuple order: the invoke API call, the whole
-- submit-and-wait phase, and the S3 download.
--
-- Throws 'InvokeException' when the invoke call, the S3 download or the S3
-- delete answers with a non-2xx status, or when an inline answer is not
-- valid base64. Whatever the completing thread put into the callback is
-- re-thrown here as well.
--
-- The callback is always unregistered when the waiting phase ends, so a
-- failed or cancelled invocation does not leave an entry behind in
-- 'lsInvocations'.
execute ::
  LambdaEnv ->
  (Throttle, Throttle, Throttle) ->
  BS.ByteString ->
  BackendM BS.ByteString
execute LambdaEnv {..} (invocationThrottle, executionThrottle, downloadThrottle) input = do
  mvar <- liftIO $ newEmptyMVar @(IO ResponsePayload)
  -- Allocate an id and register the callback in a single atomic step.
  id' <-
    liftIO . modifyMVar leState $ \LambdaState {..} ->
      return
        ( LambdaState
            { lsNextId = lsNextId + 1,
              lsInvocations = M.insert lsNextId (void . tryPutMVar mvar) lsInvocations
            },
          lsNextId
        )
  -- On the success path the entry is already gone, so the delete is a no-op;
  -- on every failure path it prevents the callback from leaking.
  answer <- awaitAnswer mvar id' `finally` liftIO (unregister id')
  case answer of
    ResponsePayloadInline p -> decodeInline p
    ResponsePayloadS3 p ->
      throttled downloadThrottle . runResourceT . runAWS leEnv $ downloadAnswer p
  where
    -- Drop the callback of the given invocation, if it is still registered.
    unregister :: Int -> IO ()
    unregister invocationId =
      modifyMVar_ leState $ \s ->
        return s {lsInvocations = M.delete invocationId (lsInvocations s)}

    -- Submit the invocation and block until its callback has been completed.
    awaitAnswer :: MVar (IO ResponsePayload) -> Int -> BackendM ResponsePayload
    awaitAnswer mvar invocationId =
      throttled executionThrottle $ do
        let payload =
              BL.toStrict . A.encode $
                A.object
                  [ (T.pack "d", A.toJSON . T.decodeUtf8 $ B64.encode input),
                    (T.pack "i", A.toJSON invocationId)
                  ]
            request = invoke (siFunc leStack) payload & iInvocationType ?~ Event
        irs <- throttled invocationThrottle . runResourceT . runAWS leEnv $ send request
        -- Backend hook from Control.Distributed.Fork.Backend; presumably it
        -- reports the task as submitted (TODO confirm against that module).
        submitted
        let status = irs ^. irsStatusCode
        unless (status `div` 100 == 2)
          $ throwIO . InvokeException
          $ "Invoke failed. Status code: " <> T.pack (show status)
        -- The stored action either yields the payload or throws.
        liftIO . join $ readMVar mvar

    -- Decode an answer that was delivered inline as base64 text.
    decodeInline :: T.Text -> BackendM BS.ByteString
    decodeInline p =
      case B64.decode (T.encodeUtf8 p) of
        Left err -> throwIO . InvokeException $ "Error decoding answer: " <> T.pack err
        Right x -> return x

    -- Fetch an answer stored under the given key in the answer bucket, then
    -- delete the object.
    downloadAnswer :: T.Text -> AWS BS.ByteString
    downloadAnswer p = do
      let bucketName = S3.BucketName $ siAnswerBucket leStack
      gors <- send $ getObject bucketName (ObjectKey p)
      unless (gors ^. gorsResponseStatus `div` 100 == 2)
        $ throwIO . InvokeException
        $ "Downloading result failed. Status code: "
          <> T.pack (show $ gors ^. gorsResponseStatus)
      bs <- mconcat <$> sinkBody (gors ^. gorsBody) sinkList
      dors <- send $ deleteObject bucketName (ObjectKey p)
      unless (dors ^. dorsResponseStatus `div` 100 == 2)
        $ throwIO . InvokeException
        $ "Deleting result failed. Status code: "
          <> T.pack (show $ dors ^. dorsResponseStatus)
      return bs
-- | Poll the answer queue forever and hand every received response to the
-- callback registered for its invocation id.
--
-- A message without a body, or whose body does not decode as a response,
-- throws 'InvokeException' (which ends the loop). Responses for ids that are
-- not registered are ignored.
answerThread :: LambdaEnv -> IO ()
answerThread LambdaEnv {..} =
  runResourceT . runAWS leEnv . forever $
    sqsReceiveSome (siAnswerQueue leStack) >>= mapM_ handleMessage
  where
    -- Decode one message and complete the matching invocation.
    handleMessage msg = do
      Response invocationId payload <- decodeResponse msg
      liftIO $ modifyMVar_ leState (deliver invocationId payload)

    -- Parse the JSON body of a queue message.
    decodeResponse msg =
      case msg ^. mBody of
        Nothing ->
          throwIO (InvokeException "Error decoding answer: no body.")
        Just body ->
          case A.decodeStrict (T.encodeUtf8 body) of
            Nothing ->
              throwIO . InvokeException $
                "Error decoding answer: invalid json: " <> T.pack (show msg)
            Just r -> return r

    -- Run the callback (if any) and drop it from the state.
    deliver invocationId payload s =
      case M.lookup invocationId (lsInvocations s) of
        Nothing -> return s
        Just callback -> do
          callback (return payload)
          return s {lsInvocations = M.delete invocationId (lsInvocations s)}
-- | Poll the dead letter queue forever and fail the invocation each message
-- belongs to.
--
-- The invocation id is read from the @"i"@ key of the message body; the
-- registered callback is completed with an action that throws
-- 'InvokeException', using the message's @ErrorMessage@ attribute as the
-- description when it is present. Messages for ids that are not registered
-- are ignored. A message without a readable id throws 'InvokeException'
-- (which ends the loop).
deadLetterThread :: LambdaEnv -> IO ()
deadLetterThread LambdaEnv {..} =
  runResourceT . runAWS leEnv . forever $
    sqsReceiveSome (siDeadLetterQueue leStack) >>= mapM_ reportFailure
  where
    -- Fail the invocation named by one dead-letter message.
    reportFailure msg = do
      failedId <- liftIO $ decodeId msg
      liftIO . modifyMVar_ leState $ \s ->
        case M.lookup failedId (lsInvocations s) of
          Nothing -> return s
          Just callback -> do
            callback (throwIO (failureOf msg))
            return s {lsInvocations = M.delete failedId (lsInvocations s)}

    -- Exception describing why the function failed, as far as the message
    -- attributes tell.
    failureOf msg =
      InvokeException $
        "Lambda function failed: "
          <> fromMaybe
            "error message not found."
            (msg ^? mMessageAttributes . at "ErrorMessage" . _Just . mavStringValue . _Just)

    -- Read the numeric invocation id out of the JSON message body.
    decodeId :: Message -> IO Int
    decodeId msg =
      maybe
        (throwIO . InvokeException $ "Can not find Id: " <> T.pack (show msg))
        (return . truncate)
        (msg ^? mBody . _Just . key "i" . _Number)
sqsReceiveSome :: T.Text -> AWS [Message]
sqsReceiveSome :: Text -> AWS [Message]
sqsReceiveSome Text
queue = do
ReceiveMessageResponse
rmrs <-
ReceiveMessage -> AWST' Env (ResourceT IO) (Rs ReceiveMessage)
forall (m :: * -> *) a. (MonadAWS m, AWSRequest a) => a -> m (Rs a)
send (ReceiveMessage -> AWST' Env (ResourceT IO) (Rs ReceiveMessage))
-> ReceiveMessage -> AWST' Env (ResourceT IO) (Rs ReceiveMessage)
forall a b. (a -> b) -> a -> b
$
Text -> ReceiveMessage
receiveMessage Text
queue
ReceiveMessage
-> (ReceiveMessage -> ReceiveMessage) -> ReceiveMessage
forall a b. a -> (a -> b) -> b
& (Maybe Int -> Identity (Maybe Int))
-> ReceiveMessage -> Identity ReceiveMessage
Lens' ReceiveMessage (Maybe Int)
rmVisibilityTimeout
((Maybe Int -> Identity (Maybe Int))
-> ReceiveMessage -> Identity ReceiveMessage)
-> Int -> ReceiveMessage -> ReceiveMessage
forall s t a b. ASetter s t a (Maybe b) -> b -> s -> t
?~ Int
10
ReceiveMessage
-> (ReceiveMessage -> ReceiveMessage) -> ReceiveMessage
forall a b. a -> (a -> b) -> b
& (Maybe Int -> Identity (Maybe Int))
-> ReceiveMessage -> Identity ReceiveMessage
Lens' ReceiveMessage (Maybe Int)
rmWaitTimeSeconds
((Maybe Int -> Identity (Maybe Int))
-> ReceiveMessage -> Identity ReceiveMessage)
-> Int -> ReceiveMessage -> ReceiveMessage
forall s t a b. ASetter s t a (Maybe b) -> b -> s -> t
?~ Int
10
ReceiveMessage
-> (ReceiveMessage -> ReceiveMessage) -> ReceiveMessage
forall a b. a -> (a -> b) -> b
& (Maybe Int -> Identity (Maybe Int))
-> ReceiveMessage -> Identity ReceiveMessage
Lens' ReceiveMessage (Maybe Int)
rmMaxNumberOfMessages
((Maybe Int -> Identity (Maybe Int))
-> ReceiveMessage -> Identity ReceiveMessage)
-> Int -> ReceiveMessage -> ReceiveMessage
forall s t a b. ASetter s t a (Maybe b) -> b -> s -> t
?~ Int
10
ReceiveMessage
-> (ReceiveMessage -> ReceiveMessage) -> ReceiveMessage
forall a b. a -> (a -> b) -> b
& ([Text] -> Identity [Text])
-> ReceiveMessage -> Identity ReceiveMessage
Lens' ReceiveMessage [Text]
rmMessageAttributeNames
(([Text] -> Identity [Text])
-> ReceiveMessage -> Identity ReceiveMessage)
-> [Text] -> ReceiveMessage -> ReceiveMessage
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Text
"All"]
Bool -> AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ReceiveMessageResponse
rmrs ReceiveMessageResponse
-> Getting Int ReceiveMessageResponse Int -> Int
forall s a. s -> Getting a s a -> a
^. Getting Int ReceiveMessageResponse Int
Lens' ReceiveMessageResponse Int
rmrsResponseStatus Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
200)
(AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ())
-> AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ IO () -> AWST' Env (ResourceT IO) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> AWST' Env (ResourceT IO) ())
-> (Text -> IO ()) -> Text -> AWST' Env (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InvokeException -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
(InvokeException -> IO ())
-> (Text -> InvokeException) -> Text -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> InvokeException
InvokeException
(Text -> AWST' Env (ResourceT IO) ())
-> Text -> AWST' Env (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Text
"Error receiving messages: "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show (Int -> String) -> Int -> String
forall a b. (a -> b) -> a -> b
$ ReceiveMessageResponse
rmrs ReceiveMessageResponse
-> Getting Int ReceiveMessageResponse Int -> Int
forall s a. s -> Getting a s a -> a
^. Getting Int ReceiveMessageResponse Int
Lens' ReceiveMessageResponse Int
rmrsResponseStatus)
let msgs :: [Message]
msgs = ReceiveMessageResponse
rmrs ReceiveMessageResponse
-> Getting [Message] ReceiveMessageResponse [Message] -> [Message]
forall s a. s -> Getting a s a -> a
^. Getting [Message] ReceiveMessageResponse [Message]
Lens' ReceiveMessageResponse [Message]
rmrsMessages
Bool -> AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([Message] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Message]
msgs) (AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ())
-> AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ do
DeleteMessageBatchResponse
dmbrs <-
DeleteMessageBatch
-> AWST' Env (ResourceT IO) (Rs DeleteMessageBatch)
forall (m :: * -> *) a. (MonadAWS m, AWSRequest a) => a -> m (Rs a)
send (DeleteMessageBatch
-> AWST' Env (ResourceT IO) (Rs DeleteMessageBatch))
-> DeleteMessageBatch
-> AWST' Env (ResourceT IO) (Rs DeleteMessageBatch)
forall a b. (a -> b) -> a -> b
$
Text -> DeleteMessageBatch
deleteMessageBatch Text
queue
DeleteMessageBatch
-> (DeleteMessageBatch -> DeleteMessageBatch) -> DeleteMessageBatch
forall a b. a -> (a -> b) -> b
& ([DeleteMessageBatchRequestEntry]
-> Identity [DeleteMessageBatchRequestEntry])
-> DeleteMessageBatch -> Identity DeleteMessageBatch
Lens' DeleteMessageBatch [DeleteMessageBatchRequestEntry]
dmbEntries
(([DeleteMessageBatchRequestEntry]
-> Identity [DeleteMessageBatchRequestEntry])
-> DeleteMessageBatch -> Identity DeleteMessageBatch)
-> [DeleteMessageBatchRequestEntry]
-> DeleteMessageBatch
-> DeleteMessageBatch
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [ Text -> Text -> DeleteMessageBatchRequestEntry
deleteMessageBatchRequestEntry
(String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Integer -> String
forall a. Show a => a -> String
show Integer
i)
(Maybe Text -> Text
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe Text -> Text) -> Maybe Text -> Text
forall a b. (a -> b) -> a -> b
$ Message
msg Message -> Getting (Maybe Text) Message (Maybe Text) -> Maybe Text
forall s a. s -> Getting a s a -> a
^. Getting (Maybe Text) Message (Maybe Text)
Lens' Message (Maybe Text)
mReceiptHandle)
| (Integer
i, Message
msg) <- [Integer] -> [Message] -> [(Integer, Message)]
forall a b. [a] -> [b] -> [(a, b)]
zip [(Integer
0 :: Integer) ..] [Message]
msgs
]
Bool -> AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (DeleteMessageBatchResponse
dmbrs DeleteMessageBatchResponse
-> Getting Int DeleteMessageBatchResponse Int -> Int
forall s a. s -> Getting a s a -> a
^. Getting Int DeleteMessageBatchResponse Int
Lens' DeleteMessageBatchResponse Int
dmbrsResponseStatus Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
200)
(AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ())
-> AWST' Env (ResourceT IO) () -> AWST' Env (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ IO () -> AWST' Env (ResourceT IO) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> AWST' Env (ResourceT IO) ())
-> (Text -> IO ()) -> Text -> AWST' Env (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InvokeException -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
(InvokeException -> IO ())
-> (Text -> InvokeException) -> Text -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> InvokeException
InvokeException
(Text -> AWST' Env (ResourceT IO) ())
-> Text -> AWST' Env (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Text
"Error deleting received messages: "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show (Int -> String) -> Int -> String
forall a b. (a -> b) -> a -> b
$ ReceiveMessageResponse
rmrs ReceiveMessageResponse
-> Getting Int ReceiveMessageResponse Int -> Int
forall s a. s -> Getting a s a -> a
^. Getting Int ReceiveMessageResponse Int
Lens' ReceiveMessageResponse Int
rmrsResponseStatus)
[Message] -> AWS [Message]
forall (m :: * -> *) a. Monad m => a -> m a
return [Message]
msgs
-- | Run an action that is handed an invoke function backed by this stack.
--
-- A fresh 'LambdaEnv' is built from the given 'Env' and 'StackInfo'. While
-- the callback runs, four 'answerThread' workers and two 'deadLetterThread'
-- workers loop in the background; every worker is cancelled once the
-- callback returns or throws.
--
-- The workers are acquired with 'bracket', so an asynchronous exception that
-- arrives between spawning them and entering the callback can no longer leak
-- the (never-terminating) worker threads.
withInvoke ::
  Env ->
  (Throttle, Throttle, Throttle) ->
  StackInfo ->
  ((BS.ByteString -> BackendM BS.ByteString) -> IO a) ->
  IO a
withInvoke env throttles stack f = do
  le <- newLambdaEnv env stack
  bracket (spawnWorkers le) (mapM_ cancel) $ \_ ->
    f (execute le throttles)
  where
    -- Start all background workers: answer pollers first, then the
    -- dead-letter pollers.
    spawnWorkers :: LambdaEnv -> IO [Async ()]
    spawnWorkers le =
      (++)
        <$> replicateM 4 (worker (answerThread le))
        <*> replicateM 2 (worker (deadLetterThread le))
    -- Repeat one polling step forever. A synchronous failure of a step is
    -- printed and the loop carries on. The acquire phase of 'bracket' runs
    -- with asynchronous exceptions masked, and a forked thread inherits that
    -- state, so the loop is explicitly unmasked to stay cancellable.
    worker :: IO () -> IO (Async ())
    worker step =
      asyncWithUnmask
        (\unmask -> unmask (forever (catchAny step print)))
-- | Exception carrying a human-readable description of a failed step while
-- talking to the backing AWS services (for example a non-200 response
-- status when receiving or deleting queue messages).
newtype InvokeException = InvokeException T.Text
  deriving (Show)

-- | Uses the default 'Exception' methods, so the value can be thrown with
-- 'throwIO' and caught like any other exception.
instance Exception InvokeException