{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Control.Distributed.Dataset.AWS
( s3ShuffleStore,
module Control.Distributed.Fork.AWS,
)
where
import Conduit
import Control.Distributed.Closure
import Control.Distributed.Dataset.ShuffleStore
import Control.Distributed.Fork.AWS
import Control.Lens
import Control.Monad
import Control.Monad.Trans.AWS (AWST)
import qualified Data.Text as T
import Network.AWS
import Network.AWS.Data.Body (RsBody (_streamBody))
import qualified Network.AWS.S3 as S3
import qualified Network.AWS.S3.StreamingUpload as S3
import System.IO.Unsafe
-- | A 'ShuffleStore' backed by an S3 bucket.
--
-- Partition @num@ is stored under the object key @prefix <> show num@ in the
-- given bucket. Both closures talk to S3 through 'globalAWSEnv'.
--
-- * 'ssGet' issues a @GetObject@ request and streams the response body. For
--   'RangeOnly' @lo hi@ the request carries the range @bytes=lo-hi@; for
--   'RangeAll' no range is set.
--
-- * 'ssPut' streams its input to S3 as a multipart upload, passing 'Nothing'
--   as the chunk size. If the upload is aborted, the exception that caused
--   the abort is rethrown rather than being silently discarded, so a failed
--   write cannot be mistaken for a successful one.
--
-- The bucket and prefix are captured into the closures with 'cpure', which is
-- why the lambdas take them as explicit leading arguments: the body of a
-- @static@ expression may only mention top-level names.
s3ShuffleStore :: T.Text -> T.Text -> ShuffleStore
s3ShuffleStore bucket' prefix' =
  ShuffleStore
    { ssGet =
        static
          ( \bucket prefix num range -> do
              let rangeHeader = case range of
                    RangeAll -> Nothing
                    RangeOnly lo hi ->
                      Just . T.pack $ "bytes=" <> show lo <> "-" <> show hi
              ret <-
                runAWS globalAWSEnv
                  $ send
                  $ S3.getObject
                    (S3.BucketName bucket)
                    (S3.ObjectKey $ prefix <> T.pack (show num))
                    & S3.goRange .~ rangeHeader
              _streamBody $ ret ^. S3.gorsBody
          )
          `cap` cpure (static Dict) bucket'
          `cap` cpure (static Dict) prefix',
      ssPut =
        static
          ( \bucket prefix num -> do
              result <-
                transPipe @(AWST (ResourceT IO)) (runAWS globalAWSEnv)
                  $ S3.streamUpload Nothing
                  $ S3.createMultipartUpload
                    (S3.BucketName bucket)
                    (S3.ObjectKey $ prefix <> T.pack (show num))
              -- 'Left' carries the abort response and the exception that
              -- triggered it; surface the exception. On success the
              -- completion response is not needed.
              void $ either (throwM . snd) pure result
          )
          `cap` cpure (static Dict) bucket'
          `cap` cpure (static Dict) prefix'
    }

-- | Orphan instance (hence @-fno-warn-orphans@ at the top of the module):
-- 'fail' is delegated to the underlying monad via 'lift'.
--
-- 'S3.streamUpload' carries a 'MonadFail' constraint on its monad, and this
-- module runs it in @'AWST' ('ResourceT' 'IO')@.
instance (MonadFail m, MonadIO m) => MonadFail (AWST m) where
  fail = lift . fail

-- | Process-wide AWS environment used by the closures in 's3ShuffleStore'.
--
-- The bodies of @static@ expressions can only refer to top-level names, so
-- the environment cannot be passed in as an ordinary argument; it is instead
-- built with 'unsafePerformIO' from @'newEnv' 'Discover'@ when the value is
-- first demanded.
--
-- The @NOINLINE@ pragma is required: it keeps GHC from inlining the
-- definition at use sites, which could otherwise run 'newEnv' more than once.
globalAWSEnv :: Env
globalAWSEnv = unsafePerformIO $ newEnv Discover
{-# NOINLINE globalAWSEnv #-}