{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StaticPointers #-}

module Control.Distributed.Dataset.Local
  ( withLocalTmpShuffleStore,

    -- * Re-exports
    localProcessBackend,
  )
where

import Conduit
import Control.Distributed.Closure
import Control.Distributed.Dataset.ShuffleStore
import Control.Distributed.Fork.Local
import Control.Exception (bracket)
import qualified Data.ByteString as BS
import Data.Conduit.Binary
import qualified Data.Conduit.Combinators as C
import System.Directory
import System.FilePath
import System.Posix.Temp
import Prelude hiding (rem)

-- |
-- Uses a temporary directory in the local disk as a 'ShuffleStore'.

-- Useful for testing purposes.
withLocalTmpShuffleStore :: (ShuffleStore -> IO a) -> IO a
withLocalTmpShuffleStore :: (ShuffleStore -> IO a) -> IO a
withLocalTmpShuffleStore ShuffleStore -> IO a
act =
  IO FilePath -> (FilePath -> IO ()) -> (FilePath -> IO a) -> IO a
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
    (FilePath -> IO FilePath
mkdtemp FilePath
"/tmp/dd-")
    FilePath -> IO ()
removeDirectoryRecursive
    (ShuffleStore -> IO a
act (ShuffleStore -> IO a)
-> (FilePath -> ShuffleStore) -> FilePath -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> ShuffleStore
mk)
  where
    mk :: String -> ShuffleStore
    mk :: FilePath -> ShuffleStore
mk FilePath
tmp' =
      ShuffleStore :: Closure
  (Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
-> ShuffleStore
ShuffleStore
        { ssGet :: Closure
  (Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
ssGet = static (\FilePath
tmp Int64
int Range
range -> Range -> FilePath -> ConduitT () ByteString (ResourceT IO) ()
forall t.
Range -> FilePath -> ConduitT t ByteString (ResourceT IO) ()
streamFile Range
range (FilePath
tmp FilePath -> FilePath -> FilePath
</> Int64 -> FilePath
forall a. Show a => a -> FilePath
show Int64
int)) Closure
  (FilePath
   -> Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
-> Closure FilePath
-> Closure
     (Int64 -> Range -> ConduitT () ByteString (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable FilePath))
-> FilePath -> Closure FilePath
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable FilePath)
forall (a :: Constraint). a => Dict a
Dict) FilePath
tmp',
          ssPut :: Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
ssPut = static (\FilePath
tmp Int64
int -> FilePath -> ConduitT ByteString Void (ResourceT IO) ()
forall (m :: * -> *) o.
MonadResource m =>
FilePath -> ConduitT ByteString o m ()
C.sinkFile (FilePath
tmp FilePath -> FilePath -> FilePath
</> Int64 -> FilePath
forall a. Show a => a -> FilePath
show Int64
int)) Closure
  (FilePath -> Int64 -> ConduitT ByteString Void (ResourceT IO) ())
-> Closure FilePath
-> Closure (Int64 -> ConduitT ByteString Void (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable FilePath))
-> FilePath -> Closure FilePath
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable FilePath)
forall (a :: Constraint). a => Dict a
Dict) FilePath
tmp'
        }

streamFile :: Range -> FilePath -> ConduitT t BS.ByteString (ResourceT IO) ()
streamFile :: Range -> FilePath -> ConduitT t ByteString (ResourceT IO) ()
streamFile Range
RangeAll FilePath
fp = FilePath -> ConduitT t ByteString (ResourceT IO) ()
forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
C.sourceFile FilePath
fp
streamFile (RangeOnly Integer
start Integer
end) FilePath
fp = FilePath
-> Maybe Integer
-> Maybe Integer
-> ConduitT t ByteString (ResourceT IO) ()
forall (m :: * -> *) i.
MonadResource m =>
FilePath
-> Maybe Integer -> Maybe Integer -> ConduitT i ByteString m ()
sourceFileRange FilePath
fp (Integer -> Maybe Integer
forall a. a -> Maybe a
Just Integer
start) (Integer -> Maybe Integer
forall a. a -> Maybe a
Just (Integer -> Maybe Integer) -> Integer -> Maybe Integer
forall a b. (a -> b) -> a -> b
$ Integer
end Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
start Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
1)