{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}

module Control.Distributed.Dataset.Aggr
  ( Aggr,
    aggrConst,
    aggrCount,
    aggrSum,
    aggrMean,
    aggrMax,
    aggrMin,
    aggrCollect,
    aggrDistinct,
    aggrTopK,
    aggrBottomK,
    aggrFiltered,

    -- * Creating Aggr's
    aggrFromMonoid,
    aggrFromReduce,
    aggrFromFold,
  )
where

import Control.Applicative.Static
import Control.Distributed.Closure
import Control.Distributed.Dataset.Internal.Aggr
import Control.Distributed.Dataset.Internal.Class
import qualified Control.Foldl as F
import Data.Functor.Static
import Data.HashSet (HashSet)
import qualified Data.Heap as H
import Data.List
import Data.Monoid
import Data.Ord
import Data.Profunctor.Static
import Data.Typeable

-- |
-- Returns the sum of the inputs.
--
-- The 'Num' dictionary arrives as a 'Closure'; it is turned (via 'cap') into
-- the 'Monoid' dictionary for 'Sum' that 'aggrFromMonoid' requires, and the
-- values are wrapped/unwrapped with 'Sum' / 'getSum' around that aggregation.
aggrSum :: StaticSerialise a => Closure (Dict (Num a)) -> Aggr a a
aggrSum numDict =
  staticDimap (static Sum) (static getSum) . aggrFromMonoid $
    static (\Dict -> Dict) `cap` numDict

-- |
-- Returns the number of inputs.
--
-- Each row is replaced by @1@ and the ones are added with 'aggrSum'.
aggrCount :: Typeable a => Aggr a Integer
aggrCount = staticLmap (static (const 1)) (aggrSum (static Dict))

-- |
-- Calculates the arithmetic mean of the inputs: their sum divided by their
-- count (the count converted to 'Double' with 'realToFrac').
--
-- NOTE(review): with no input rows this presumably evaluates @0 / 0@, i.e.
-- NaN, rather than signalling emptiness — confirm against 'aggrFromMonoid'.
aggrMean :: Aggr Double Double
aggrMean =
  staticApply
    (staticApply (aggrConst (static (/))) (aggrSum (static Dict)))
    (staticMap (static realToFrac) aggrCount)

-- |
-- Return the maximum of the inputs.
--
-- Returns 'Nothing' on empty 'Dataset's.
aggrMax :: StaticSerialise a => Closure (Dict (Ord a)) -> Aggr a (Maybe a)
aggrMax dict = aggrFromReduce (static (\Dict -> max) `cap` dict)

-- |
-- Return the minimum of the inputs.
--
-- Returns 'Nothing' on empty 'Dataset's.
aggrMin :: StaticSerialise a => Closure (Dict (Ord a)) -> Aggr a (Maybe a)
aggrMin dict = aggrFromReduce (static (\Dict -> min) `cap` dict)

-- |
-- Returns a new Aggr which only aggregates rows matching the predicate,
-- discarding others.
--
-- Only the first (per-row) fold is wrapped with 'F.prefilter'; the second
-- fold, which combines the intermediate results, is reused unchanged.
aggrFiltered :: Closure (a -> Bool) -> Aggr a b -> Aggr a b
aggrFiltered keep (Aggr rowStage combineStage) =
  let filteredRows = static F.prefilter `cap` keep `cap` rowStage
   in Aggr filteredRows combineStage

-- |
-- Collects the inputs as a list.
--
-- Rows are first gathered into lists with 'F.list'; those lists are then
-- gathered again and flattened with 'concat'.
--
-- Warning: Ordering of the resulting list is non-deterministic.
aggrCollect :: StaticSerialise a => Aggr a [a]
aggrCollect =
  aggrFromFold
    (static F.list)
    (static (fmap concat F.list))

-- |
-- Collects the inputs to a 'HashSet'.
--
-- Rows are first gathered into 'HashSet's with 'F.hashSet'; those sets are
-- then collected and unioned with 'mconcat'. Both steps unpack the
-- 'Eq' / 'Hashable' evidence from 'staticHashable'.
aggrDistinct :: forall a. (StaticSerialise a, StaticHashable a) => Aggr a (HashSet a)
aggrDistinct =
  let hashDict = staticHashable @a
   in aggrFromFold
        (static (\Dict -> F.hashSet) `cap` hashDict)
        (static (\Dict -> fmap mconcat F.list) `cap` hashDict)

-- * Top K

-- | Intermediate state of 'aggrTopK': a capacity together with a heap of the
-- entries retained so far.
--
-- Both fields are strict. The state is threaded through a left fold one row
-- at a time, and 'F.foldMap' only forces it to WHNF; with lazy fields every
-- step's trimmed heap (and the 'min' of the capacities) would pile up as a
-- chain of unevaluated thunks until the final result is demanded.
--
-- No explicit @deriving Typeable@: GHC makes every type 'Typeable'
-- automatically.
data TopK a = TopK !Int !(H.Heap a)

-- | Merge two partial top-k states, keeping the smaller capacity and only
-- that many of the greatest entries of their union.
instance Semigroup (TopK a) where
  TopK c1 h1 <> TopK c2 h2 =
    -- Force the result so the fold accumulator cannot build a thunk chain.
    limit `seq` kept `seq` TopK limit kept
    where
      limit = min c1 c2
      merged = H.union h1 h2
      kept = dropLeast (H.size merged - limit) merged

      -- Remove the @n@ least elements one 'H.deleteMin' at a time:
      -- O(n log size). 'H.drop' is documented as O(size log size) because it
      -- rebuilds the whole heap, which would otherwise happen on every row of
      -- the map phase even though @n@ is then usually 0 or 1.
      -- A non-positive @n@ leaves the heap untouched, as 'H.drop' does.
      dropLeast :: Int -> H.Heap b -> H.Heap b
      dropLeast n h
        | n <= 0 = h
        | otherwise = dropLeast (n - 1) $! H.deleteMin h

instance Monoid (TopK a) where
  -- An empty heap whose capacity never lowers the other operand's:
  -- '<>' keeps @min c1 c2@, and 'maxBound' never wins that 'min'.
  mempty = TopK (maxBound :: Int) H.empty

-- |
-- Returns the 'n' greatest elements according to a key function. Similar to:
-- @take n . sortOn (Down . f)@
--
-- Each partition keeps a bounded heap ('TopK') of its greatest entries and
-- emits them in descending key order; the partial lists are then merged
-- pairwise (still descending) and truncated to @n@.
--
-- Warning: Ordering of the repeated elements is non-deterministic.
aggrTopK ::
  (StaticSerialise a, Typeable k) =>
  Closure (Dict (Ord k)) ->
  -- | Number of rows to return
  Int ->
  -- | Sorting key
  Closure (a -> k) ->
  Aggr a [a]
aggrTopK dict count fc =
  aggrFromFold
    (static partialTop `cap` dict `cap` countC `cap` fc)
    (static combineTop `cap` dict `cap` countC `cap` fc)
  where
    countC = cpure (static Dict) count

    -- Per-partition fold: bounded heap of entries, emitted greatest first.
    partialTop :: Dict (Ord c) -> Int -> (b -> c) -> F.Fold b [b]
    partialTop Dict limit key =
      F.foldMap
        (\row -> TopK limit (H.singleton (H.Entry (key row) row)))
        (\(TopK _ heap) -> map H.payload (sortOn Down (H.toUnsortedList heap)))

    -- Combining fold: merge descending partial lists, keeping @limit@ rows.
    combineTop :: Dict (Ord c) -> Int -> (b -> c) -> F.Fold [b] [b]
    combineTop Dict limit key =
      F.Fold (\acc next -> take limit (mergeDesc key acc next)) [] id

    -- Merge two lists already sorted by descending key; on equal keys the
    -- element from the second list comes first.
    mergeDesc :: Ord c => (b -> c) -> [b] -> [b] -> [b]
    mergeDesc _ ls [] = ls
    mergeDesc _ [] rs = rs
    mergeDesc key ls@(l : ls') rs@(r : rs')
      | key l > key r = l : mergeDesc key ls' rs
      | otherwise = r : mergeDesc key ls rs'

-- |
-- Returns the 'n' least elements according to a key function. Similar to:
-- @take n . sortOn f@
--
-- Implemented as 'aggrTopK' on the key wrapped in 'Down', which reverses its
-- ordering so that the greatest 'Down' keys are the least original keys.
--
-- Warning: Ordering of the repeated elements is non-deterministic.
aggrBottomK ::
  (StaticSerialise a, Typeable k) =>
  Closure (Dict (Ord k)) ->
  -- | Number of rows to return
  Int ->
  -- | Sorting key
  Closure (a -> k) ->
  Aggr a [a]
aggrBottomK d count fc =
  aggrTopK
    -- 'Ord k' is all that is needed to build 'Ord (Down k)'.
    (static (\Dict -> Dict) `cap` d)
    count
    (static (Down .) `cap` fc)