{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StaticPointers #-}
module Control.Distributed.Dataset.Local
( withLocalTmpShuffleStore,
localProcessBackend,
)
where
import Conduit
import Control.Distributed.Closure
import Control.Distributed.Dataset.ShuffleStore
import Control.Distributed.Fork.Local
import Control.Exception (bracket)
import qualified Data.ByteString as BS
import Data.Conduit.Binary
import qualified Data.Conduit.Combinators as C
import System.Directory
import System.FilePath
import System.Posix.Temp
import Prelude hiding (rem)
-- | Run an action with a 'ShuffleStore' backed by a fresh temporary directory.
--
-- The directory is created with 'mkdtemp' using the template prefix
-- @"/tmp/dd-"@ and is removed with 'removeDirectoryRecursive' once the action
-- is done; 'bracket' ensures the removal also happens when the action throws.
--
-- Every item of the store lives in its own file inside that directory; the
-- file name is the 'show' of the item's numeric key.
withLocalTmpShuffleStore :: (ShuffleStore -> IO a) -> IO a
withLocalTmpShuffleStore act =
  bracket
    (mkdtemp "/tmp/dd-")
    removeDirectoryRecursive
    (act . mk)
  where
    -- Build the store for the given directory. The directory path is passed
    -- into each static closure with 'cpure' and applied with 'cap', so the
    -- lambdas under @static@ do not capture any local variable.
    mk :: String -> ShuffleStore
    mk tmp' =
      ShuffleStore
        { -- Read the file for a key, restricted to the requested range.
          ssGet =
            static (\tmp int range -> streamFile range (tmp </> show int))
              `cap` cpure (static Dict) tmp',
          -- Write the incoming byte stream to the file for a key.
          ssPut =
            static (\tmp int -> C.sinkFile (tmp </> show int))
              `cap` cpure (static Dict) tmp'
        }
-- | Stream the contents of a file, either completely or restricted to a range.
--
-- * 'RangeAll' streams the whole file with 'C.sourceFile'.
--
-- * @'RangeOnly' start end@ streams with 'sourceFileRange', using @start@ as
--   the offset and @end - start + 1@ as the count, i.e. @end@ is treated as an
--   inclusive bound.
streamFile :: Range -> FilePath -> ConduitT t BS.ByteString (ResourceT IO) ()
streamFile RangeAll fp = C.sourceFile fp
streamFile (RangeOnly start end) fp =
  sourceFileRange fp (Just start) (Just $ end - start + 1)