{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

module Control.Distributed.Dataset
  ( Dataset,

    -- * Transformations
    dMap,
    dFilter,
    dConcatMap,
    dGroupedAggr,
    dDistinct,
    dDistinctBy,

    -- ** Low-level transformations
    dCoalesce,
    dPipe,
    dPartition,

    -- * Aggregations
    module Control.Distributed.Dataset.Aggr,

    -- * Execution
    dAggr,
    dFetch,
    dToList,
    DD,
    runDD,
    runDDWith,
    Backend,
    ShuffleStore,

    -- * Creating datasets
    Partition,
    mkPartition,
    dExternal,

    -- * Class
    StaticSerialise (..),
    StaticHashable (..),

    -- * Re-exports
    initDistributedFork,
    liftIO,
    (&),

    -- ** Closure
    Closure,
    cap,
    cpure,
    Dict (Dict),
  )
where

import Conduit hiding
  ( Consumer,
    Producer,
    await,
  )
import qualified Conduit as C
import Control.Distributed.Closure
import Control.Distributed.Dataset.Aggr
import Control.Distributed.Dataset.Internal.Aggr
import Control.Distributed.Dataset.Internal.Class
import Control.Distributed.Dataset.Internal.Dataset
import Control.Distributed.Dataset.ShuffleStore
import Control.Distributed.Fork
import qualified Control.Foldl as F
import Control.Lens
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import Data.Hashable
import Data.Typeable

-- |
-- Create a 'Partition' given a Source conduit.
mkPartition :: Typeable a => Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
mkPartition :: Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
mkPartition = Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
forall a. Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
PSimple

-- * Dataset

-- |
-- Transforms a 'Dataset' by passing every partition through the given Conduit.
dPipe ::
  (StaticSerialise a, StaticSerialise b) =>
  Closure (ConduitT a b (ResourceT IO) ()) ->
  Dataset a ->
  Dataset b
dPipe :: Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe = Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
DPipe

-- |
-- Create a dataset from given `Partition`'s.

-- This is how every 'Dataset' is created initially.
dExternal :: StaticSerialise a => [Partition a] -> Dataset a
dExternal :: [Partition a] -> Dataset a
dExternal = [Partition a] -> Dataset a
forall a. StaticSerialise a => [Partition a] -> Dataset a
DExternal

-- |
-- Re-partition the dataset using the given function so that the items with the same 'k' will
-- end up in the same partition.
dPartition :: (StaticSerialise a, StaticHashable k) => Int -> Closure (a -> k) -> Dataset a -> Dataset a
dPartition :: Int -> Closure (a -> k) -> Dataset a -> Dataset a
dPartition = Int -> Closure (a -> k) -> Dataset a -> Dataset a
forall k a.
(StaticHashable k, StaticSerialise a) =>
Int -> Closure (a -> k) -> Dataset a -> Dataset a
DPartition

-- |
-- Coalesce partitions together to get the specified number of partitions.
dCoalesce :: Typeable a => Int -> Dataset a -> Dataset a
dCoalesce :: Int -> Dataset a -> Dataset a
dCoalesce = Int -> Dataset a -> Dataset a
forall a. Int -> Dataset a -> Dataset a
DCoalesce

-- * Dataset API

-- |
-- Returns a new Dataset by first applying a function to all elements of this Dataset, and then
-- flattening the results.
dConcatMap :: (StaticSerialise a, StaticSerialise b) => Closure (a -> [b]) -> Dataset a -> Dataset b
dConcatMap :: Closure (a -> [b]) -> Dataset a -> Dataset b
dConcatMap Closure (a -> [b])
f = Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe (Closure (ConduitT a b (ResourceT IO) ())
 -> Dataset a -> Dataset b)
-> Closure (ConduitT a b (ResourceT IO) ())
-> Dataset a
-> Dataset b
forall a b. (a -> b) -> a -> b
$ static (forall mono a.
(Monad (ResourceT IO), MonoFoldable mono) =>
(a -> mono) -> ConduitT a (Element mono) (ResourceT IO) ()
forall (m :: * -> *) mono a.
(Monad m, MonoFoldable mono) =>
(a -> mono) -> ConduitT a (Element mono) m ()
C.concatMapC @(ResourceT IO)) Closure ((a -> [b]) -> ConduitT a b (ResourceT IO) ())
-> Closure (a -> [b]) -> Closure (ConduitT a b (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> [b])
f

-- |
-- Returns a new Dataset that contains the result of applying the given function to each element.
dMap :: (StaticSerialise a, StaticSerialise b) => Closure (a -> b) -> Dataset a -> Dataset b
dMap :: Closure (a -> b) -> Dataset a -> Dataset b
dMap Closure (a -> b)
f = Closure (a -> [b]) -> Dataset a -> Dataset b
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (a -> [b]) -> Dataset a -> Dataset b
dConcatMap (Closure (a -> [b]) -> Dataset a -> Dataset b)
-> Closure (a -> [b]) -> Dataset a -> Dataset b
forall a b. (a -> b) -> a -> b
$ static (b -> [b]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> [b]) -> (a -> b) -> a -> [b]
forall b c a. (b -> c) -> (a -> b) -> a -> c
.) Closure ((a -> b) -> a -> [b])
-> Closure (a -> b) -> Closure (a -> [b])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> b)
f

-- |
-- Returns a new Dataset that only contains elements where the given function returns true.
dFilter :: (StaticSerialise a) => Closure (a -> Bool) -> Dataset a -> Dataset a
dFilter :: Closure (a -> Bool) -> Dataset a -> Dataset a
dFilter Closure (a -> Bool)
f = Closure (a -> [a]) -> Dataset a -> Dataset a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (a -> [b]) -> Dataset a -> Dataset b
dConcatMap (Closure (a -> [a]) -> Dataset a -> Dataset a)
-> Closure (a -> [a]) -> Dataset a -> Dataset a
forall a b. (a -> b) -> a -> b
$ static (\a -> Bool
f_ a
a -> if a -> Bool
f_ a
a then [a
a] else []) Closure ((a -> Bool) -> a -> [a])
-> Closure (a -> Bool) -> Closure (a -> [a])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> Bool)
f

-- |
-- Apply an aggregation to all items on a Dataset, and fetch the result.
dAggr :: (StaticSerialise a, StaticSerialise b) => Aggr a b -> Dataset a -> DD b
dAggr :: Aggr a b -> Dataset a -> DD b
dAggr aggr :: Aggr a b
aggr@(Aggr Closure (Fold a t)
_ Closure (Fold t b)
fc) Dataset a
ds = do
  ConduitT () ((), b) (ResourceT IO) ()
c <-
    Dataset a
ds
      Dataset a -> (Dataset a -> Dataset ((), b)) -> Dataset ((), b)
forall a b. a -> (a -> b) -> b
& Int
-> Closure (a -> ()) -> Aggr a b -> Dataset a -> Dataset ((), b)
forall k b a.
(StaticHashable k, StaticSerialise k, StaticSerialise b) =>
Int -> Closure (a -> k) -> Aggr a b -> Dataset a -> Dataset (k, b)
dGroupedAggr Int
1 (static (() -> a -> ()
forall a b. a -> b -> a
const ())) Aggr a b
aggr
      Dataset ((), b)
-> (Dataset ((), b) -> DD (ConduitT () ((), b) (ResourceT IO) ()))
-> DD (ConduitT () ((), b) (ResourceT IO) ())
forall a b. a -> (a -> b) -> b
& Dataset ((), b) -> DD (ConduitT () ((), b) (ResourceT IO) ())
forall a.
StaticSerialise a =>
Dataset a -> DD (ConduitT () a (ResourceT IO) ())
dFetch
  IO (Maybe ((), b)) -> DD (Maybe ((), b))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ConduitT () Void (ResourceT IO) (Maybe ((), b))
-> IO (Maybe ((), b))
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
runConduitRes (ConduitT () Void (ResourceT IO) (Maybe ((), b))
 -> IO (Maybe ((), b)))
-> ConduitT () Void (ResourceT IO) (Maybe ((), b))
-> IO (Maybe ((), b))
forall a b. (a -> b) -> a -> b
$ ConduitT () ((), b) (ResourceT IO) ()
c ConduitT () ((), b) (ResourceT IO) ()
-> ConduitM ((), b) Void (ResourceT IO) (Maybe ((), b))
-> ConduitT () Void (ResourceT IO) (Maybe ((), b))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM ((), b) Void (ResourceT IO) (Maybe ((), b))
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await) DD (Maybe ((), b)) -> (Maybe ((), b) -> DD b) -> DD b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= b -> DD b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> DD b) -> (Maybe ((), b) -> b) -> Maybe ((), b) -> DD b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. \case
    Just ((), b
r) -> b
r
    Maybe ((), b)
Nothing -> Fold t b -> [t] -> b
forall (f :: * -> *) a b. Foldable f => Fold a b -> f a -> b
F.fold (Closure (Fold t b) -> Fold t b
forall a. Closure a -> a
unclosure Closure (Fold t b)
fc) []

-- |
-- Removes a new dataset with duplicate rows removed.
dDistinct :: StaticHashable a => Int -> Dataset a -> Dataset a
dDistinct :: Int -> Dataset a -> Dataset a
dDistinct Int
partitionCount = Int -> Closure (a -> a) -> Dataset a -> Dataset a
forall b a.
StaticHashable b =>
Int -> Closure (a -> b) -> Dataset a -> Dataset a
dDistinctBy Int
partitionCount (static a -> a
forall a. a -> a
id)

-- |
-- Removes a new dataset with rows with the duplicate keys removed.
dDistinctBy ::
  StaticHashable b =>
  -- | Target number of partitions
  Int ->
  Closure (a -> b) ->
  Dataset a ->
  Dataset a
dDistinctBy :: Int -> Closure (a -> b) -> Dataset a -> Dataset a
dDistinctBy Int
partitionCount (Closure (a -> b)
key :: Closure (a -> b)) Dataset a
ds =
  case Dataset a -> Dict (StaticSerialise a)
forall a. Dataset a -> Dict (StaticSerialise a)
dStaticSerialise Dataset a
ds of
    Dict (StaticSerialise a)
Dict ->
      Dataset a
ds
        Dataset a -> (Dataset a -> Dataset a) -> Dataset a
forall a b. a -> (a -> b) -> b
& Closure (ConduitT a a (ResourceT IO) ()) -> Dataset a -> Dataset a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe (static (\Dict (Typeable b, Eq b, Hashable b)
Dict -> HashSet b -> (a -> b) -> ConduitT a a (ResourceT IO) ()
forall (m :: * -> *) a i.
(Monad m, Eq a, Hashable a) =>
HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe HashSet b
forall a. HashSet a
HS.empty) Closure
  (Dict (Typeable b, Eq b, Hashable b)
   -> (a -> b) -> ConduitT a a (ResourceT IO) ())
-> Closure (Dict (Typeable b, Eq b, Hashable b))
-> Closure ((a -> b) -> ConduitT a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable b => Closure (Dict (Typeable b, Eq b, Hashable b))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @b Closure ((a -> b) -> ConduitT a a (ResourceT IO) ())
-> Closure (a -> b) -> Closure (ConduitT a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> b)
key)
        Dataset a -> (Dataset a -> Dataset a) -> Dataset a
forall a b. a -> (a -> b) -> b
& Int -> Closure (a -> b) -> Dataset a -> Dataset a
forall a k.
(StaticSerialise a, StaticHashable k) =>
Int -> Closure (a -> k) -> Dataset a -> Dataset a
dPartition Int
partitionCount Closure (a -> b)
key
        Dataset a -> (Dataset a -> Dataset a) -> Dataset a
forall a b. a -> (a -> b) -> b
& Closure (ConduitT a a (ResourceT IO) ()) -> Dataset a -> Dataset a
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe (static (\Dict (Typeable b, Eq b, Hashable b)
Dict -> HashSet b -> (a -> b) -> ConduitT a a (ResourceT IO) ()
forall (m :: * -> *) a i.
(Monad m, Eq a, Hashable a) =>
HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe HashSet b
forall a. HashSet a
HS.empty) Closure
  (Dict (Typeable b, Eq b, Hashable b)
   -> (a -> b) -> ConduitT a a (ResourceT IO) ())
-> Closure (Dict (Typeable b, Eq b, Hashable b))
-> Closure ((a -> b) -> ConduitT a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable b => Closure (Dict (Typeable b, Eq b, Hashable b))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @b Closure ((a -> b) -> ConduitT a a (ResourceT IO) ())
-> Closure (a -> b) -> Closure (ConduitT a a (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> b)
key)
  where
    dedupe :: HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe HashSet a
acc i -> a
f =
      ConduitT i i m (Maybe i)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await ConduitT i i m (Maybe i)
-> (Maybe i -> ConduitT i i m ()) -> ConduitT i i m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Maybe i
Nothing -> () -> ConduitT i i m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Just i
a ->
          let b :: a
b = i -> a
f i
a
           in if a
b a -> HashSet a -> Bool
forall a. (Eq a, Hashable a) => a -> HashSet a -> Bool
`HS.member` HashSet a
acc
                then HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe HashSet a
acc i -> a
f
                else i -> ConduitT i i m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield i
a ConduitT i i m () -> ConduitT i i m () -> ConduitT i i m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> HashSet a -> (i -> a) -> ConduitT i i m ()
dedupe (a
b a -> HashSet a -> HashSet a
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
`HS.insert` HashSet a
acc) i -> a
f

-- |
-- Apply an aggregation to all rows sharing the same key.
dGroupedAggr ::
  (StaticHashable k, StaticSerialise k, StaticSerialise b) =>
  -- | Target number of partitions
  Int ->
  -- | Grouping key
  Closure (a -> k) ->
  Aggr a b ->
  Dataset a ->
  Dataset (k, b)
dGroupedAggr :: Int -> Closure (a -> k) -> Aggr a b -> Dataset a -> Dataset (k, b)
dGroupedAggr
  Int
partitionCount
  (Closure (a -> k)
kc :: Closure (a -> k))
  ( Aggr
      (Closure (Fold a t)
f1c :: Closure (F.Fold a t))
      (Closure (Fold t b)
f2c :: Closure (F.Fold t b))
    )
  Dataset a
ds =
    case Dataset a -> Dict (StaticSerialise a)
forall a. Dataset a -> Dict (StaticSerialise a)
dStaticSerialise Dataset a
ds of
      Dict (StaticSerialise a)
Dict ->
        Dataset a
ds
          Dataset a -> (Dataset a -> Dataset (k, a)) -> Dataset (k, a)
forall a b. a -> (a -> b) -> b
& Closure (a -> (k, a)) -> Dataset a -> Dataset (k, a)
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (a -> b) -> Dataset a -> Dataset b
dMap (static (\a -> k
k a
a -> (a -> k
k a
a, a
a)) Closure ((a -> k) -> a -> (k, a))
-> Closure (a -> k) -> Closure (a -> (k, a))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
kc)
          Dataset (k, a)
-> (Dataset (k, a) -> Dataset (k, t)) -> Dataset (k, t)
forall a b. a -> (a -> b) -> b
& Closure (ConduitT (k, a) (k, t) (ResourceT IO) ())
-> Dataset (k, a) -> Dataset (k, t)
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe
            ( static (\Dict (Typeable k, Eq k, Hashable k)
Dict -> (Eq k, Hashable k) =>
Fold a t -> ConduitT (k, a) (k, t) (ResourceT IO) ()
forall a' b' k'.
(Eq k', Hashable k') =>
Fold a' b' -> ConduitT (k', a') (k', b') (ResourceT IO) ()
aggrC @a @t @k)
                Closure
  (Dict (Typeable k, Eq k, Hashable k)
   -> Fold a t -> ConduitT (k, a) (k, t) (ResourceT IO) ())
-> Closure (Dict (Typeable k, Eq k, Hashable k))
-> Closure (Fold a t -> ConduitT (k, a) (k, t) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable k => Closure (Dict (Typeable k, Eq k, Hashable k))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @k
                Closure (Fold a t -> ConduitT (k, a) (k, t) (ResourceT IO) ())
-> Closure (Fold a t)
-> Closure (ConduitT (k, a) (k, t) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold a t)
f1c
            )
          Dataset (k, t)
-> (Dataset (k, t) -> Dataset (k, t)) -> Dataset (k, t)
forall a b. a -> (a -> b) -> b
& Int -> Closure ((k, t) -> k) -> Dataset (k, t) -> Dataset (k, t)
forall a k.
(StaticSerialise a, StaticHashable k) =>
Int -> Closure (a -> k) -> Dataset a -> Dataset a
dPartition Int
partitionCount (static ((k, t) -> k
forall a b. (a, b) -> a
fst @k @t))
          Dataset (k, t)
-> (Dataset (k, t) -> Dataset (k, b)) -> Dataset (k, b)
forall a b. a -> (a -> b) -> b
& Closure (ConduitT (k, t) (k, b) (ResourceT IO) ())
-> Dataset (k, t) -> Dataset (k, b)
forall a b.
(StaticSerialise a, StaticSerialise b) =>
Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b
dPipe
            ( static (\Dict (Typeable k, Eq k, Hashable k)
Dict -> (Eq k, Hashable k) =>
Fold t b -> ConduitT (k, t) (k, b) (ResourceT IO) ()
forall a' b' k'.
(Eq k', Hashable k') =>
Fold a' b' -> ConduitT (k', a') (k', b') (ResourceT IO) ()
aggrC @t @b @k)
                Closure
  (Dict (Typeable k, Eq k, Hashable k)
   -> Fold t b -> ConduitT (k, t) (k, b) (ResourceT IO) ())
-> Closure (Dict (Typeable k, Eq k, Hashable k))
-> Closure (Fold t b -> ConduitT (k, t) (k, b) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable k => Closure (Dict (Typeable k, Eq k, Hashable k))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @k
                Closure (Fold t b -> ConduitT (k, t) (k, b) (ResourceT IO) ())
-> Closure (Fold t b)
-> Closure (ConduitT (k, t) (k, b) (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold t b)
f2c
            )
    where
      aggrC ::
        forall a' b' k'.
        (Eq k', Hashable k') =>
        F.Fold a' b' ->
        ConduitT (k', a') (k', b') (ResourceT IO) ()
      aggrC :: Fold a' b' -> ConduitT (k', a') (k', b') (ResourceT IO) ()
aggrC (F.Fold x -> a' -> x
step x
init' x -> b'
extract) =
        (x -> a' -> x)
-> x
-> (x -> b')
-> HashMap k' x
-> ConduitT (k', a') (k', b') (ResourceT IO) ()
forall a' b' x' k'.
(Eq k', Hashable k') =>
(b' -> a' -> b')
-> b'
-> (b' -> x')
-> HashMap k' b'
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
go x -> a' -> x
step x
init' x -> b'
extract HashMap k' x
forall k v. HashMap k v
HM.empty
      go ::
        forall a' b' x' k'.
        (Eq k', Hashable k') =>
        (b' -> a' -> b') ->
        b' ->
        (b' -> x') ->
        HM.HashMap k' b' ->
        ConduitT (k', a') (k', x') (ResourceT IO) ()
      go :: (b' -> a' -> b')
-> b'
-> (b' -> x')
-> HashMap k' b'
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
go b' -> a' -> b'
step b'
init' b' -> x'
extract HashMap k' b'
hm =
        ConduitT (k', a') (k', x') (ResourceT IO) (Maybe (k', a'))
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await ConduitT (k', a') (k', x') (ResourceT IO) (Maybe (k', a'))
-> (Maybe (k', a') -> ConduitT (k', a') (k', x') (ResourceT IO) ())
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Maybe (k', a')
Nothing ->
            ((k', b') -> ConduitT (k', a') (k', x') (ResourceT IO) ())
-> [(k', b')] -> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_
              (\(k'
k, b'
b) -> (k', x') -> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield (k'
k, b' -> x'
extract b'
b))
              (HashMap k' b' -> [(k', b')]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap k' b'
hm)
          Just (k'
k, a'
a) ->
            (b' -> a' -> b')
-> b'
-> (b' -> x')
-> HashMap k' b'
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall a' b' x' k'.
(Eq k', Hashable k') =>
(b' -> a' -> b')
-> b'
-> (b' -> x')
-> HashMap k' b'
-> ConduitT (k', a') (k', x') (ResourceT IO) ()
go b' -> a' -> b'
step b'
init' b' -> x'
extract (HashMap k' b' -> ConduitT (k', a') (k', x') (ResourceT IO) ())
-> HashMap k' b' -> ConduitT (k', a') (k', x') (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$
              (Maybe b' -> Maybe b') -> k' -> HashMap k' b' -> HashMap k' b'
forall k v.
(Eq k, Hashable k) =>
(Maybe v -> Maybe v) -> k -> HashMap k v -> HashMap k v
HM.alter
                ( b' -> Maybe b'
forall a. a -> Maybe a
Just (b' -> Maybe b') -> (Maybe b' -> b') -> Maybe b' -> Maybe b'
forall b c a. (b -> c) -> (a -> b) -> a -> c
. \case
                    Maybe b'
Nothing -> b' -> a' -> b'
step b'
init' a'
a
                    Just b'
st -> b' -> a' -> b'
step b'
st a'
a
                )
                k'
k
                HashMap k' b'
hm