{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}

module Control.Distributed.Dataset.Internal.Aggr
  ( Aggr (..),
    aggrFromMonoid,
    aggrFromReduce,
    aggrFromFold,
    aggrConst,
  )
where

import Control.Applicative.Static
import Control.Distributed.Closure
import Control.Distributed.Dataset.Internal.Class
import qualified Control.Foldl as F
import Control.Lens
import Data.Functor.Static
import Data.Profunctor.Static
import Data.Typeable

-- |
-- Represent an aggregation which takes many 'a's and returns a single 'b'.

-- Use 'Control.Distributed.Fork.dAggr' and 'Control.Distributed.Fork.dGroupedAggr'
-- functions to use them on 'Dataset's.

-- You can use the 'StaticApply' and 'StaticProfunctor' instances to compose
-- 'Aggr's together. Example:

-- @
-- dAvg :: Aggr Double Double
-- dAvg =
--   aggrConst (static (/))
--     \`staticApply\` aggrSum (static Dict)
--     \`staticApply\` staticMap (static realToFrac) aggrCount
-- @

-- Alternatively, you can use aggrFrom* functions to create 'Aggr's.
data Aggr a b
  = forall t.
    (StaticSerialise t, Typeable a, Typeable b) =>
    Aggr
      (Closure (F.Fold a t))
      (Closure (F.Fold t b))

instance Typeable m => StaticFunctor (Aggr m) where
  staticMap :: Closure (a -> b) -> Aggr m a -> Aggr m b
staticMap Closure (a -> b)
f (Aggr Closure (Fold m t)
f1c Closure (Fold t a)
f2c) =
    Closure (Fold m t) -> Closure (Fold t b) -> Aggr m b
forall a b t.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
Aggr Closure (Fold m t)
f1c (static (a -> b) -> Fold t a -> Fold t b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Closure ((a -> b) -> Fold t a -> Fold t b)
-> Closure (a -> b) -> Closure (Fold t a -> Fold t b)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> b)
f Closure (Fold t a -> Fold t b)
-> Closure (Fold t a) -> Closure (Fold t b)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold t a)
f2c)

instance Typeable m => StaticApply (Aggr m) where
  staticApply :: Aggr m (a -> b) -> Aggr m a -> Aggr m b
staticApply (Aggr Closure (Fold m t)
f1c Closure (Fold t (a -> b))
f2c) (Aggr Closure (Fold m t)
f1c' Closure (Fold t a)
f2c') =
    Closure (Fold m (t, t)) -> Closure (Fold (t, t) b) -> Aggr m b
forall a b t.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
Aggr
      (static (\Fold m t
f1 Fold m t
f1' -> (,) (t -> t -> (t, t)) -> Fold m t -> Fold m (t -> (t, t))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Fold m t
f1 Fold m (t -> (t, t)) -> Fold m t -> Fold m (t, t)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Fold m t
f1') Closure (Fold m t -> Fold m t -> Fold m (t, t))
-> Closure (Fold m t) -> Closure (Fold m t -> Fold m (t, t))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold m t)
f1c Closure (Fold m t -> Fold m (t, t))
-> Closure (Fold m t) -> Closure (Fold m (t, t))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold m t)
f1c')
      (static (\Fold t (a -> b)
f2 Fold t a
f2' -> (a -> b) -> a -> b
forall a b. (a -> b) -> a -> b
($) ((a -> b) -> a -> b)
-> Fold (t, t) (a -> b) -> Fold (t, t) (a -> b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ((t, t) -> t) -> Fold t (a -> b) -> Fold (t, t) (a -> b)
forall (p :: * -> * -> *) a b c.
Profunctor p =>
(a -> b) -> p b c -> p a c
lmap (t, t) -> t
forall a b. (a, b) -> a
fst Fold t (a -> b)
f2 Fold (t, t) (a -> b) -> Fold (t, t) a -> Fold (t, t) b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ((t, t) -> t) -> Fold t a -> Fold (t, t) a
forall (p :: * -> * -> *) a b c.
Profunctor p =>
(a -> b) -> p b c -> p a c
lmap (t, t) -> t
forall a b. (a, b) -> b
snd Fold t a
f2') Closure (Fold t (a -> b) -> Fold t a -> Fold (t, t) b)
-> Closure (Fold t (a -> b)) -> Closure (Fold t a -> Fold (t, t) b)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold t (a -> b))
f2c Closure (Fold t a -> Fold (t, t) b)
-> Closure (Fold t a) -> Closure (Fold (t, t) b)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold t a)
f2c')

instance StaticProfunctor Aggr where
  staticDimap :: Closure (a -> b) -> Closure (c -> d) -> Aggr b c -> Aggr a d
staticDimap Closure (a -> b)
l Closure (c -> d)
r (Aggr Closure (Fold b t)
f1 Closure (Fold t c)
f2) =
    Closure (Fold a t) -> Closure (Fold t d) -> Aggr a d
forall a b t.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
Aggr
      (static (a -> b) -> Fold b t -> Fold a t
forall (p :: * -> * -> *) a b c.
Profunctor p =>
(a -> b) -> p b c -> p a c
lmap Closure ((a -> b) -> Fold b t -> Fold a t)
-> Closure (a -> b) -> Closure (Fold b t -> Fold a t)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> b)
l Closure (Fold b t -> Fold a t)
-> Closure (Fold b t) -> Closure (Fold a t)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold b t)
f1)
      (static (c -> d) -> Fold t c -> Fold t d
forall (p :: * -> * -> *) b c a.
Profunctor p =>
(b -> c) -> p a b -> p a c
rmap Closure ((c -> d) -> Fold t c -> Fold t d)
-> Closure (c -> d) -> Closure (Fold t c -> Fold t d)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (c -> d)
r Closure (Fold t c -> Fold t d)
-> Closure (Fold t c) -> Closure (Fold t d)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold t c)
f2)

-- |
-- Create an aggregation given a 'Monoid' instance.
aggrFromMonoid ::
  StaticSerialise a =>
  Closure (Dict (Monoid a)) ->
  Aggr a a
aggrFromMonoid :: Closure (Dict (Monoid a)) -> Aggr a a
aggrFromMonoid Closure (Dict (Monoid a))
d =
  Closure (Fold a a) -> Closure (Fold a a) -> Aggr a a
forall t a b.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
aggrFromFold Closure (Fold a a)
go Closure (Fold a a)
go
  where
    go :: Closure (Fold a a)
go = static (\Dict (Monoid a)
Dict -> (a -> a) -> (a -> a) -> Fold a a
forall w a b. Monoid w => (a -> w) -> (w -> b) -> Fold a b
F.foldMap a -> a
forall a. a -> a
id a -> a
forall a. a -> a
id) Closure (Dict (Monoid a) -> Fold a a)
-> Closure (Dict (Monoid a)) -> Closure (Fold a a)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Monoid a))
d

-- |
-- Create an aggregation given a reduce function.

-- Returns 'Nothing' on empty 'Dataset's.
aggrFromReduce ::
  StaticSerialise a =>
  Closure (a -> a -> a) ->
  Aggr a (Maybe a)
aggrFromReduce :: Closure (a -> a -> a) -> Aggr a (Maybe a)
aggrFromReduce Closure (a -> a -> a)
dc =
  Closure (Fold a (Maybe a))
-> Closure (Fold (Maybe a) (Maybe a)) -> Aggr a (Maybe a)
forall t a b.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
aggrFromFold
    (static (a -> a -> a) -> Fold a (Maybe a)
forall a. (a -> a -> a) -> Fold a (Maybe a)
F._Fold1 Closure ((a -> a -> a) -> Fold a (Maybe a))
-> Closure (a -> a -> a) -> Closure (Fold a (Maybe a))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> a -> a)
dc)
    (static (Handler (Maybe a) a -> Fold a (Maybe a) -> Fold (Maybe a) (Maybe a)
forall a b r. Handler a b -> Fold b r -> Fold a r
F.handles Handler (Maybe a) a
forall a b. Prism (Maybe a) (Maybe b) a b
_Just (Fold a (Maybe a) -> Fold (Maybe a) (Maybe a))
-> ((a -> a -> a) -> Fold a (Maybe a))
-> (a -> a -> a)
-> Fold (Maybe a) (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> a -> a) -> Fold a (Maybe a)
forall a. (a -> a -> a) -> Fold a (Maybe a)
F._Fold1) Closure ((a -> a -> a) -> Fold (Maybe a) (Maybe a))
-> Closure (a -> a -> a) -> Closure (Fold (Maybe a) (Maybe a))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> a -> a)
dc)

-- |
-- Create an aggregation given two 'Fold's.

-- This is the most primitive way to create an aggregation, use other
-- methods if possible.

-- The first 'Fold' will be applied on each partition, and the results will
-- be shuffled and fed to the second 'Fold'.
aggrFromFold ::
  (StaticSerialise t, Typeable a, Typeable b) =>
  -- | Fold to run before the shuffle
  Closure (F.Fold a t) ->
  -- | Fold to run after the shuffle
  Closure (F.Fold t b) ->
  Aggr a b
aggrFromFold :: Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
aggrFromFold = Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
forall a b t.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
Aggr

-- |
-- An aggregation which ignores the input data and always yields the given value.
aggrConst :: forall a t. (Typeable a, Typeable t) => Closure a -> Aggr t a
aggrConst :: Closure a -> Aggr t a
aggrConst Closure a
ac =
  Closure (t -> ()) -> Closure (() -> a) -> Aggr () () -> Aggr t a
forall (p :: * -> * -> *) a b c d.
(StaticProfunctor p, Typeable a, Typeable b, Typeable c,
 Typeable d) =>
Closure (a -> b) -> Closure (c -> d) -> p b c -> p a d
staticDimap
    (static (() -> t -> ()
forall a b. a -> b -> a
const ()))
    (static a -> () -> a
forall a b. a -> b -> a
const Closure (a -> () -> a) -> Closure a -> Closure (() -> a)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure a
ac)
    (Closure (Dict (Monoid ())) -> Aggr () ()
forall a.
StaticSerialise a =>
Closure (Dict (Monoid a)) -> Aggr a a
aggrFromMonoid (static Dict (Monoid ())
forall (a :: Constraint). a => Dict a
Dict))