{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}

module Control.Distributed.Dataset.AWS
  ( s3ShuffleStore,
    -- Re-exports
    module Control.Distributed.Fork.AWS,
  )
where

import Conduit
import Control.Distributed.Closure
import Control.Distributed.Dataset.ShuffleStore
import Control.Distributed.Fork.AWS
import Control.Exception (throwIO)
import Control.Lens
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.AWS (AWST)
import qualified Data.Text as T
import Network.AWS
import Network.AWS.Data.Body (RsBody (_streamBody))
import qualified Network.AWS.S3 as S3
import qualified Network.AWS.S3.StreamingUpload as S3
import System.IO.Unsafe

-- | A 'ShuffleStore' that keeps shuffle partitions as objects in an S3 bucket.
--
-- Partition @num@ is stored under the key @prefix <> show num@ (see
-- 's3ShuffleKey'); 'ssPut' writes that object and 'ssGet' reads it back.
-- Every request is issued through 'globalAWSEnv'.
--
-- TODO: Cleanup
-- TODO: Use a temporary bucket created by CloudFormation
s3ShuffleStore
  :: T.Text -- ^ Name of the S3 bucket holding the shuffle data.
  -> T.Text -- ^ Key prefix prepended to every partition number.
  -> ShuffleStore
s3ShuffleStore bucket' prefix' =
  ShuffleStore
    { ssGet =
        static
          ( \bucket prefix num range -> do
              ret <-
                runAWS globalAWSEnv
                  $ send
                  $ S3.getObject (S3.BucketName bucket) (s3ShuffleKey prefix num)
                    & S3.goRange .~ s3RangeHeader range
              -- Stream the object body straight into the downstream conduit.
              _streamBody $ ret ^. S3.gorsBody
          )
          `cap` cpure (static Dict) bucket'
          `cap` cpure (static Dict) prefix',
      ssPut =
        static
          ( \bucket prefix num ->
              void $
                transPipe @(AWST (ResourceT IO)) (runAWS globalAWSEnv)
                  ( S3.streamUpload Nothing $ -- Nothing: library-default chunk size
                      S3.createMultipartUpload
                        (S3.BucketName bucket)
                        (s3ShuffleKey prefix num)
                  )
                  -- 'S3.streamUpload' reports a failed (and already aborted)
                  -- upload as 'Left'. Rethrow the carried exception instead
                  -- of discarding it, so a lost partition is not mistaken
                  -- for a successful write.
                  >>= either (liftIO . throwIO . snd) pure
          )
          `cap` cpure (static Dict) bucket'
          `cap` cpure (static Dict) prefix'
    }

-- | Object key under which shuffle partition @num@ is stored: the prefix
-- followed by the 'show'n partition number. Shared by 'ssGet' and 'ssPut' so
-- reads always target exactly the key that writes produced.
s3ShuffleKey :: Show a => T.Text -> a -> S3.ObjectKey
s3ShuffleKey prefix num = S3.ObjectKey (prefix <> T.pack (show num))

-- | Value of the HTTP @Range@ header for an 'S3.getObject' request.
-- 'RangeAll' sends no header; @'RangeOnly' lo hi@ sends @bytes=lo-hi@.
--
-- NOTE(review): HTTP byte ranges are inclusive at both ends; this assumes
-- 'RangeOnly' follows the same convention — TODO confirm against 'Range'.
s3RangeHeader :: Range -> Maybe T.Text
s3RangeHeader RangeAll = Nothing
s3RangeHeader (RangeOnly lo hi) =
  Just . T.pack $ "bytes=" <> show lo <> "-" <> show hi

-- | Forwards 'fail' through 'AWST' to the underlying monad.
--
-- Orphan instance: 'S3.streamUpload' requires 'MonadFail' of the monad it
-- runs in, and this module runs it in @AWST (ResourceT IO)@.
instance (MonadFail m, MonadIO m) => MonadFail (AWST m) where
  fail message = lift (fail message)

-- | Process-wide AWS environment used by every request in this module.
--
-- FIXME: 'ShuffleStore' currently has no way to keep state between
-- successive 'ssGet' calls. Building an 'Env' with 'newEnv' is costly, so
-- we resort to this hack: a single top-level value created with
-- 'unsafePerformIO'. It is evaluated on first use, and the @NOINLINE@
-- pragma stops GHC from inlining it (which could run 'newEnv' repeatedly).
--
-- Keeping state across 'get's (database connections, authentication, ...)
-- is a common need, so 'ShuffleStore' should grow a proper mechanism for it.
globalAWSEnv :: Env
globalAWSEnv = unsafePerformIO (newEnv Discover)
{-# NOINLINE globalAWSEnv #-}