{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Control.Distributed.Dataset
( Dataset,
dMap,
dFilter,
dConcatMap,
dGroupedAggr,
dDistinct,
dDistinctBy,
dCoalesce,
dPipe,
dPartition,
module Control.Distributed.Dataset.Aggr,
dAggr,
dFetch,
dToList,
DD,
runDD,
runDDWith,
Backend,
ShuffleStore,
Partition,
mkPartition,
dExternal,
StaticSerialise (..),
StaticHashable (..),
initDistributedFork,
liftIO,
(&),
Closure,
cap,
cpure,
Dict (Dict),
)
where
import Conduit hiding
( Consumer,
Producer,
await,
)
import qualified Conduit as C
import Control.Distributed.Closure
import Control.Distributed.Dataset.Aggr
import Control.Distributed.Dataset.Internal.Aggr
import Control.Distributed.Dataset.Internal.Class
import Control.Distributed.Dataset.Internal.Dataset
import Control.Distributed.Dataset.ShuffleStore
import Control.Distributed.Fork
import qualified Control.Foldl as F
import Control.Lens
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import Data.Hashable
import Data.Typeable
mkPartition :: Typeable a => Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
mkPartition :: Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
mkPartition = Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
forall a. Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
PSimple
dPipe ::
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) ->
Dataset a ->
Dataset b
dPipe :: Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe = Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
DPipe
dExternal :: StaticSerialise a => [Partition a] -> Dataset a
dExternal :: [Partition a] -> Dataset a
dExternal = [Partition a] -> Dataset a
forall a. StaticSerialise a => [Partition a] -> Dataset a
DExternal
dPartition :: (StaticSerialise a, StaticHashable k) => Int -> Closure (a -> k) -> Dataset a -> Dataset a
dPartition :: Int -> Closure (a -> k) -> Dataset a -> Dataset a
dPartition = Int -> Closure (a -> k) -> Dataset a -> Dataset a
forall k a.
(StaticHashable k, StaticSerialise a) =>
Int -> Closure (a -> k) -> Dataset a -> Dataset a
DPartition
dCoalesce :: Typeable a => Int -> Dataset a -> Dataset a
dCoalesce :: Int -> Dataset a -> Dataset a
dCoalesce = Int -> Dataset a -> Dataset a
forall a. Int -> Dataset a -> Dataset a
DCoalesce
dConcatMap :: (StaticSerialise a, StaticSerialise b) => Closure (a -> [b]) -> Dataset a -> Dataset b
dConcatMap :: Closure (a -> [b]) -> Dataset a -> Dataset b
dConcatMap Closure (a -> [b])
f = Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe (Closure (ConduitT a b (ResourceT IO) ())
-> Dataset a -> Dataset b)
-> Closure (ConduitT a b (ResourceT IO) ())
-> Dataset a
-> Dataset b
forall a b. (a -> b) -> a -> b
$ static (forall mono a.
(Monad (ResourceT IO), MonoFoldable mono) =>
(a -> mono) -> ConduitT a (Element mono) (ResourceT IO) ()
forall (m :: * -> *) mono a.
(Monad m, MonoFoldable mono) =>
(a -> mono) -> ConduitT a (Element mono) m ()
C.concatMapC @(ResourceT IO)) Closure ((a -> [b]) -> ConduitT a b (ResourceT IO) ())
-> Closure (a -> [b]) -> Closure (ConduitT a b (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> [b])
f
dMap :: (StaticSerialise a, StaticSerialise b) => Closure (a -> b) -> Dataset a -> Dataset b
dMap :: Closure (a -> b) -> Dataset a -> Dataset b
dMap Closure (a -> b)
f = Closure (a -> [b]) -> Dataset a -> Dataset b
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (a -> [b]) -> Dataset a -> Dataset b
dConcatMap (Closure (a -> [b]) -> Dataset a -> Dataset b)
-> Closure (a -> [b]) -> Dataset a -> Dataset b
forall a b. (a -> b) -> a -> b
$ static (b -> [b]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> [b]) -> (a -> b) -> a -> [b]
forall b c a. (b -> c) -> (a -> b) -> a -> c
.) Closure ((a -> b) -> a -> [b])
-> Closure (a -> b) -> Closure (a -> [b])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> b)
f
dFilter :: (StaticSerialise a) => Closure (a -> Bool) -> Dataset a -> Dataset a
dFilter :: Closure (a -> Bool) -> Dataset a -> Dataset a
dFilter Closure (a -> Bool)
f = Closure (a -> [a]) -> Dataset a -> Dataset a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (a -> [b]) -> Dataset a -> Dataset b
dConcatMap (Closure (a -> [a]) -> Dataset a -> Dataset a)
-> Closure (a -> [a]) -> Dataset a -> Dataset a
forall a b. (a -> b) -> a -> b
$ static (\a -> Bool
f_ a
a -> if a -> Bool
f_ a
a then [a
a] else []) Closure ((a -> Bool) -> a -> [a])
-> Closure (a -> Bool) -> Closure (a -> [a])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> Bool)
f
dAggr :: (StaticSerialise a, StaticSerialise b) => Aggr a b -> Dataset a -> DD b
dAggr :: Aggr a b -> Dataset a -> DD b
dAggr aggr :: Aggr a b
aggr@(Aggr Closure (Fold a t)
_ Closure (Fold t b)
fc) Dataset a
ds = do
ConduitT () ((), b) (ResourceT IO) ()
c <-
Dataset a
ds
Dataset a -> (Dataset a -> Dataset ((), b)) -> Dataset ((), b)
forall a b. a -> (a -> b) -> b
& Int
-> Closure (a -> ()) -> Aggr a b -> Dataset a -> Dataset ((), b)
forall k b a.
(StaticHashable k, StaticSerialise k, StaticSerialise b) =>
Int -> Closure (a -> k) -> Aggr a b -> Dataset a -> Dataset (k, b)
dGroupedAggr Int
1 (static (() -> a -> ()
forall a b. a -> b -> a
const ())) Aggr a b
aggr
Dataset ((), b)
-> (Dataset ((), b) -> DD (ConduitT () ((), b) (ResourceT IO) ()))
-> DD (ConduitT () ((), b) (ResourceT IO) ())
forall a b. a -> (a -> b) -> b
& Dataset ((), b) -> DD (ConduitT () ((), b) (ResourceT IO) ())
forall a.
StaticSerialise a =>
Dataset a -> DD (ConduitT () a (ResourceT IO) ())
dFetch
IO (Maybe ((), b)) -> DD (Maybe ((), b))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ConduitT () Void (ResourceT IO) (Maybe ((), b))
-> IO (Maybe ((), b))
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
runConduitRes (ConduitT () Void (ResourceT IO) (Maybe ((), b))
-> IO (Maybe ((), b)))
-> ConduitT () Void (ResourceT IO) (Maybe ((), b))
-> IO (Maybe ((), b))
forall a b. (a -> b) -> a -> b
$ ConduitT () ((), b) (ResourceT IO) ()
c ConduitT () ((), b) (ResourceT IO) ()
-> ConduitM ((), b) Void (ResourceT IO) (Maybe ((), b))
-> ConduitT () Void (ResourceT IO) (Maybe ((), b))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM ((), b) Void (ResourceT IO) (Maybe ((), b))
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await) DD (Maybe ((), b)) -> (Maybe ((), b) -> DD b) -> DD b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= b -> DD b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> DD b) -> (Maybe ((), b) -> b) -> Maybe ((), b) -> DD b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. \case
Just ((), b
r) -> b
r
Maybe ((), b)
Nothing -> Fold t b -> [t] -> b
forall (f :: * -> *) a b. Foldable f => Fold a b -> f a -> b
F.fold (Closure (Fold t b) -> Fold t b
forall a. Closure a -> a
unclosure Closure (Fold t b)
fc) []
dDistinct :: StaticHashable a => Int -> Dataset a -> Dataset a
dDistinct :: Int -> Dataset a -> Dataset a
dDistinct Int
partitionCount = Int -> Closure (a -> a) -> Dataset a -> Dataset a
forall b a.
StaticHashable b =>
Int -> Closure (a -> b) -> Dataset a -> Dataset a
dDistinctBy Int
partitionCount (static a -> a
forall a. a -> a
id)
dDistinctBy ::
StaticHashable b =>
Int ->
Closure (a -> b) ->
Dataset a ->
Dataset a
dDistinctBy :: Int -> Closure (a -> b) -> Dataset a -> Dataset a
dDistinctBy Int
partitionCount (Closure (a -> b)
key :: Closure (a -> b)) Dataset a
ds =
case Dataset a -> Dict (StaticSerialise a)
forall a. Dataset a -> Dict (StaticSerialise a)
dStaticSerialise Dataset a
ds of
Dict (StaticSerialise a)
Dict ->
Dataset a
ds
Dataset a -> (Dataset a -> Dataset a) -> Dataset a
forall a b. a -> (a -> b) -> b
& Closure (ConduitT a a (ResourceT IO) ()) -> Dataset a -> Dataset a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe (static (\Dict (Typeable b, Eq b, Hashable b)
Dict -> HashSet b -> (a -> b) -> ConduitT a a (ResourceT IO) ()
forall (m :: * -> *) a i.
(Monad m, Eq a, Hashable a) =>
HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe HashSet b
forall a. HashSet a
HS.empty) Closure
(Dict (Typeable b, Eq b, Hashable b)
-> (a -> b) -> ConduitT a a (ResourceT IO) ())
-> Closure (Dict (Typeable b, Eq b, Hashable b))
-> Closure ((a -> b) -> ConduitT a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable b => Closure (Dict (Typeable b, Eq b, Hashable b))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @b Closure ((a -> b) -> ConduitT a a (ResourceT IO) ())
-> Closure (a -> b) -> Closure (ConduitT a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> b)
key)
Dataset a -> (Dataset a -> Dataset a) -> Dataset a
forall a b. a -> (a -> b) -> b
& Int -> Closure (a -> b) -> Dataset a -> Dataset a
forall a k.
(StaticSerialise a, StaticHashable k) =>
Int -> Closure (a -> k) -> Dataset a -> Dataset a
dPartition Int
partitionCount Closure (a -> b)
key
Dataset a -> (Dataset a -> Dataset a) -> Dataset a
forall a b. a -> (a -> b) -> b
& Closure (ConduitT a a (ResourceT IO) ()) -> Dataset a -> Dataset a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe (static (\Dict (Typeable b, Eq b, Hashable b)
Dict -> HashSet b -> (a -> b) -> ConduitT a a (ResourceT IO) ()
forall (m :: * -> *) a i.
(Monad m, Eq a, Hashable a) =>
HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe HashSet b
forall a. HashSet a
HS.empty) Closure
(Dict (Typeable b, Eq b, Hashable b)
-> (a -> b) -> ConduitT a a (ResourceT IO) ())
-> Closure (Dict (Typeable b, Eq b, Hashable b))
-> Closure ((a -> b) -> ConduitT a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable b => Closure (Dict (Typeable b, Eq b, Hashable b))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @b Closure ((a -> b) -> ConduitT a a (ResourceT IO) ())
-> Closure (a -> b) -> Closure (ConduitT a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> b)
key)
where
dedupe :: HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe HashSet a
acc i -> a
f =
ConduitT i i m (Maybe i)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await ConduitT i i m (Maybe i)
-> (Maybe i -> ConduitT i i m ()) -> ConduitT i i m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe i
Nothing -> () -> ConduitT i i m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just i
a ->
let b :: a
b = i -> a
f i
a
in if a
b a -> HashSet a -> Bool
forall a. (Eq a, Hashable a) => a -> HashSet a -> Bool
`HS.member` HashSet a
acc
then HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe HashSet a
acc i -> a
f
else i -> ConduitT i i m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield i
a ConduitT i i m () -> ConduitT i i m () -> ConduitT i i m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe (a
b a -> HashSet a -> HashSet a
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
`HS.insert` HashSet a
acc) i -> a
f
dGroupedAggr ::
(StaticHashable k, StaticSerialise k, StaticSerialise b) =>
Int ->
Closure (a -> k) ->
Aggr a b ->
Dataset a ->
Dataset (k, b)
dGroupedAggr :: Int -> Closure (a -> k) -> Aggr a b -> Dataset a -> Dataset (k, b)
dGroupedAggr
Int
partitionCount
(Closure (a -> k)
kc :: Closure (a -> k))
( Aggr
(Closure (Fold a t)
f1c :: Closure (F.Fold a t))
(Closure (Fold t b)
f2c :: Closure (F.Fold t b))
)
Dataset a
ds =
case Dataset a -> Dict (StaticSerialise a)
forall a. Dataset a -> Dict (StaticSerialise a)
dStaticSerialise Dataset a
ds of
Dict (StaticSerialise a)
Dict ->
Dataset a
ds
Dataset a -> (Dataset a -> Dataset (k, a)) -> Dataset (k, a)
forall a b. a -> (a -> b) -> b
& Closure (a -> (k, a)) -> Dataset a -> Dataset (k, a)
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (a -> b) -> Dataset a -> Dataset b
dMap (static (\a -> k
k a
a -> (a -> k
k a
a, a
a)) Closure ((a -> k) -> a -> (k, a))
-> Closure (a -> k) -> Closure (a -> (k, a))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
kc)
Dataset (k, a)
-> (Dataset (k, a) -> Dataset (k, t)) -> Dataset (k, t)
forall a b. a -> (a -> b) -> b
& Closure (ConduitT (k, a) (k, t) (ResourceT IO) ())
-> Dataset (k, a) -> Dataset (k, t)
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe
( static (\Dict (Typeable k, Eq k, Hashable k)
Dict -> (Eq k, Hashable k) =>
Fold a t -> ConduitT (k, a) (k, t) (ResourceT IO) ()
forall a' b' k'.
(Eq k', Hashable k') =>
Fold a' b' -> ConduitT (k', a') (k', b') (ResourceT IO) ()
aggrC @a @t @k)
Closure
(Dict (Typeable k, Eq k, Hashable k)
-> Fold a t -> ConduitT (k, a) (k, t) (ResourceT IO) ())
-> Closure (Dict (Typeable k, Eq k, Hashable k))
-> Closure (Fold a t -> ConduitT (k, a) (k, t) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable k => Closure (Dict (Typeable k, Eq k, Hashable k))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @k
Closure (Fold a t -> ConduitT (k, a) (k, t) (ResourceT IO) ())
-> Closure (Fold a t)
-> Closure (ConduitT (k, a) (k, t) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold a t)
f1c
)
Dataset (k, t)
-> (Dataset (k, t) -> Dataset (k, t)) -> Dataset (k, t)
forall a b. a -> (a -> b) -> b
& Int -> Closure ((k, t) -> k) -> Dataset (k, t) -> Dataset (k, t)
forall a k.
(StaticSerialise a, StaticHashable k) =>
Int -> Closure (a -> k) -> Dataset a -> Dataset a
dPartition Int
partitionCount (static ((k, t) -> k
forall a b. (a, b) -> a
fst @k @t))
Dataset (k, t)
-> (Dataset (k, t) -> Dataset (k, b)) -> Dataset (k, b)
forall a b. a -> (a -> b) -> b
& Closure (ConduitT (k, t) (k, b) (ResourceT IO) ())
-> Dataset (k, t) -> Dataset (k, b)
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe
( static (\Dict (Typeable k, Eq k, Hashable k)
Dict -> (Eq k, Hashable k) =>
Fold t b -> ConduitT (k, t) (k, b) (ResourceT IO) ()
forall a' b' k'.
(Eq k', Hashable k') =>
Fold a' b' -> ConduitT (k', a') (k', b') (ResourceT IO) ()
aggrC @t @b @k)
Closure
(Dict (Typeable k, Eq k, Hashable k)
-> Fold t b -> ConduitT (k, t) (k, b) (ResourceT IO) ())
-> Closure (Dict (Typeable k, Eq k, Hashable k))
-> Closure (Fold t b -> ConduitT (k, t) (k, b) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable k => Closure (Dict (Typeable k, Eq k, Hashable k))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @k
Closure (Fold t b -> ConduitT (k, t) (k, b) (ResourceT IO) ())
-> Closure (Fold t b)
-> Closure (ConduitT (k, t) (k, b) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold t b)
f2c
)
where
aggrC ::
forall a' b' k'.
(Eq k', Hashable k') =>
F.Fold a' b' ->
ConduitT (k', a') (k', b') (ResourceT IO) ()
aggrC :: Fold a' b' -> ConduitT (k', a') (k', b') (ResourceT IO) ()
aggrC (F.Fold x -> a' -> x
step x
init' x -> b'
extract) =
(x -> a' -> x)
-> x
-> (x -> b')
-> HashMap k' x
-> ConduitT (k', a') (k', b') (ResourceT IO) ()
forall a' b' x' k'.
(Eq k', Hashable k') =>
(b' -> a' -> b')
-> b'
-> (b' -> x')
-> HashMap k' b'
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
go x -> a' -> x
step x
init' x -> b'
extract HashMap k' x
forall k v. HashMap k v
HM.empty
go ::
forall a' b' x' k'.
(Eq k', Hashable k') =>
(b' -> a' -> b') ->
b' ->
(b' -> x') ->
HM.HashMap k' b' ->
ConduitT (k', a') (k', x') (ResourceT IO) ()
go :: (b' -> a' -> b')
-> b'
-> (b' -> x')
-> HashMap k' b'
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
go b' -> a' -> b'
step b'
init' b' -> x'
extract HashMap k' b'
hm =
ConduitT (k', a') (k', x') (ResourceT IO) (Maybe (k', a'))
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await ConduitT (k', a') (k', x') (ResourceT IO) (Maybe (k', a'))
-> (Maybe (k', a') -> ConduitT (k', a') (k', x') (ResourceT IO) ())
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (k', a')
Nothing ->
((k', b') -> ConduitT (k', a') (k', x') (ResourceT IO) ())
-> [(k', b')] -> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_
(\(k'
k, b'
b) -> (k', x') -> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield (k'
k, b' -> x'
extract b'
b))
(HashMap k' b' -> [(k', b')]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap k' b'
hm)
Just (k'
k, a'
a) ->
(b' -> a' -> b')
-> b'
-> (b' -> x')
-> HashMap k' b'
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall a' b' x' k'.
(Eq k', Hashable k') =>
(b' -> a' -> b')
-> b'
-> (b' -> x')
-> HashMap k' b'
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
go b' -> a' -> b'
step b'
init' b' -> x'
extract (HashMap k' b' -> ConduitT (k', a') (k', x') (ResourceT IO) ())
-> HashMap k' b' -> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$
(Maybe b' -> Maybe b') -> k' -> HashMap k' b' -> HashMap k' b'
forall k v.
(Eq k, Hashable k) =>
(Maybe v -> Maybe v) -> k -> HashMap k v -> HashMap k v
HM.alter
( b' -> Maybe b'
forall a. a -> Maybe a
Just (b' -> Maybe b') -> (Maybe b' -> b') -> Maybe b' -> Maybe b'
forall b c a. (b -> c) -> (a -> b) -> a -> c
. \case
Maybe b'
Nothing -> b' -> a' -> b'
step b'
init' a'
a
Just b'
st -> b' -> a' -> b'
step b'
st a'
a
)
k'
k
HashMap k' b'
hm