{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
module Control.Distributed.Dataset.Aggr
( Aggr,
aggrConst,
aggrCount,
aggrSum,
aggrMean,
aggrMax,
aggrMin,
aggrCollect,
aggrDistinct,
aggrTopK,
aggrBottomK,
aggrFiltered,
aggrFromMonoid,
aggrFromReduce,
aggrFromFold,
)
where
import Control.Applicative.Static
import Control.Distributed.Closure
import Control.Distributed.Dataset.Internal.Aggr
import Control.Distributed.Dataset.Internal.Class
import qualified Control.Foldl as F
import Data.Functor.Static
import Data.HashSet (HashSet)
import qualified Data.Heap as H
import Data.List
import Data.Monoid
import Data.Ord
import Data.Profunctor.Static
import Data.Typeable
aggrSum :: StaticSerialise a => Closure (Dict (Num a)) -> Aggr a a
aggrSum :: Closure (Dict (Num a)) -> Aggr a a
aggrSum Closure (Dict (Num a))
d =
Closure (a -> Sum a)
-> Closure (Sum a -> a) -> Aggr (Sum a) (Sum a) -> Aggr a a
forall (p :: * -> * -> *) a b c d.
(StaticProfunctor p, Typeable a, Typeable b, Typeable c,
Typeable d) =>
Closure (a -> b) -> Closure (c -> d) -> p b c -> p a d
staticDimap
(static a -> Sum a
forall a. a -> Sum a
Sum)
(static Sum a -> a
forall a. Sum a -> a
getSum)
(Closure (Dict (Monoid (Sum a))) -> Aggr (Sum a) (Sum a)
forall a.
StaticSerialise a =>
Closure (Dict (Monoid a)) -> Aggr a a
aggrFromMonoid (Closure (Dict (Monoid (Sum a))) -> Aggr (Sum a) (Sum a))
-> Closure (Dict (Monoid (Sum a))) -> Aggr (Sum a) (Sum a)
forall a b. (a -> b) -> a -> b
$ static (\Dict (Num a)
Dict -> Dict (Monoid (Sum a))
forall (a :: Constraint). a => Dict a
Dict) Closure (Dict (Num a) -> Dict (Monoid (Sum a)))
-> Closure (Dict (Num a)) -> Closure (Dict (Monoid (Sum a)))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Num a))
d)
aggrCount :: Typeable a => Aggr a Integer
aggrCount :: Aggr a Integer
aggrCount =
static (Integer -> a -> Integer
forall a b. a -> b -> a
const Integer
1) Closure (a -> Integer) -> Aggr Integer Integer -> Aggr a Integer
forall (p :: * -> * -> *) a b c.
(StaticProfunctor p, Typeable a, Typeable b, Typeable c) =>
Closure (a -> b) -> p b c -> p a c
`staticLmap` Closure (Dict (Num Integer)) -> Aggr Integer Integer
forall a. StaticSerialise a => Closure (Dict (Num a)) -> Aggr a a
aggrSum (static Dict (Num Integer)
forall (a :: Constraint). a => Dict a
Dict)
aggrMean :: Aggr Double Double
aggrMean :: Aggr Double Double
aggrMean =
Closure (Double -> Double -> Double)
-> Aggr Double (Double -> Double -> Double)
forall a t. (Typeable a, Typeable t) => Closure a -> Aggr t a
aggrConst (static Double -> Double -> Double
forall a. Fractional a => a -> a -> a
(/))
Aggr Double (Double -> Double -> Double)
-> Aggr Double Double -> Aggr Double (Double -> Double)
forall (f :: * -> *) a b.
(StaticApply f, Typeable a, Typeable b) =>
f (a -> b) -> f a -> f b
`staticApply` Closure (Dict (Num Double)) -> Aggr Double Double
forall a. StaticSerialise a => Closure (Dict (Num a)) -> Aggr a a
aggrSum (static Dict (Num Double)
forall (a :: Constraint). a => Dict a
Dict)
Aggr Double (Double -> Double)
-> Aggr Double Double -> Aggr Double Double
forall (f :: * -> *) a b.
(StaticApply f, Typeable a, Typeable b) =>
f (a -> b) -> f a -> f b
`staticApply` Closure (Integer -> Double)
-> Aggr Double Integer -> Aggr Double Double
forall (f :: * -> *) a b.
(StaticFunctor f, Typeable a, Typeable b) =>
Closure (a -> b) -> f a -> f b
staticMap (static Integer -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac) Aggr Double Integer
forall a. Typeable a => Aggr a Integer
aggrCount
aggrMax :: StaticSerialise a => Closure (Dict (Ord a)) -> Aggr a (Maybe a)
aggrMax :: Closure (Dict (Ord a)) -> Aggr a (Maybe a)
aggrMax Closure (Dict (Ord a))
dict = Closure (a -> a -> a) -> Aggr a (Maybe a)
forall a.
StaticSerialise a =>
Closure (a -> a -> a) -> Aggr a (Maybe a)
aggrFromReduce (Closure (a -> a -> a) -> Aggr a (Maybe a))
-> Closure (a -> a -> a) -> Aggr a (Maybe a)
forall a b. (a -> b) -> a -> b
$ static (\Dict (Ord a)
Dict -> a -> a -> a
forall a. Ord a => a -> a -> a
max) Closure (Dict (Ord a) -> a -> a -> a)
-> Closure (Dict (Ord a)) -> Closure (a -> a -> a)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Ord a))
dict
aggrMin :: StaticSerialise a => Closure (Dict (Ord a)) -> Aggr a (Maybe a)
aggrMin :: Closure (Dict (Ord a)) -> Aggr a (Maybe a)
aggrMin Closure (Dict (Ord a))
dict = Closure (a -> a -> a) -> Aggr a (Maybe a)
forall a.
StaticSerialise a =>
Closure (a -> a -> a) -> Aggr a (Maybe a)
aggrFromReduce (Closure (a -> a -> a) -> Aggr a (Maybe a))
-> Closure (a -> a -> a) -> Aggr a (Maybe a)
forall a b. (a -> b) -> a -> b
$ static (\Dict (Ord a)
Dict -> a -> a -> a
forall a. Ord a => a -> a -> a
min) Closure (Dict (Ord a) -> a -> a -> a)
-> Closure (Dict (Ord a)) -> Closure (a -> a -> a)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Ord a))
dict
aggrFiltered :: Closure (a -> Bool) -> Aggr a b -> Aggr a b
aggrFiltered :: Closure (a -> Bool) -> Aggr a b -> Aggr a b
aggrFiltered Closure (a -> Bool)
predc (Aggr Closure (Fold a t)
f1 Closure (Fold t b)
f2) =
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
forall a b t.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
Aggr
(static (a -> Bool) -> Fold a t -> Fold a t
forall a r. (a -> Bool) -> Fold a r -> Fold a r
F.prefilter Closure ((a -> Bool) -> Fold a t -> Fold a t)
-> Closure (a -> Bool) -> Closure (Fold a t -> Fold a t)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> Bool)
predc Closure (Fold a t -> Fold a t)
-> Closure (Fold a t) -> Closure (Fold a t)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Fold a t)
f1)
Closure (Fold t b)
f2
aggrCollect :: StaticSerialise a => Aggr a [a]
aggrCollect :: Aggr a [a]
aggrCollect =
Closure (Fold a [a]) -> Closure (Fold [a] [a]) -> Aggr a [a]
forall t a b.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
aggrFromFold
(static Fold a [a]
forall a. Fold a [a]
F.list)
(static ([[a]] -> [a]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat ([[a]] -> [a]) -> Fold [a] [[a]] -> Fold [a] [a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Fold [a] [[a]]
forall a. Fold a [a]
F.list))
aggrDistinct :: forall a. (StaticSerialise a, StaticHashable a) => Aggr a (HashSet a)
aggrDistinct :: Aggr a (HashSet a)
aggrDistinct =
Closure (Fold a (HashSet a))
-> Closure (Fold (HashSet a) (HashSet a)) -> Aggr a (HashSet a)
forall t a b.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
aggrFromFold
(static (\Dict (Typeable a, Eq a, Hashable a)
Dict -> Fold a (HashSet a)
forall a. (Eq a, Hashable a) => Fold a (HashSet a)
F.hashSet) Closure (Dict (Typeable a, Eq a, Hashable a) -> Fold a (HashSet a))
-> Closure (Dict (Typeable a, Eq a, Hashable a))
-> Closure (Fold a (HashSet a))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable a => Closure (Dict (Typeable a, Eq a, Hashable a))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @a)
(static (\Dict (Typeable a, Eq a, Hashable a)
Dict -> [HashSet a] -> HashSet a
forall a. Monoid a => [a] -> a
mconcat ([HashSet a] -> HashSet a)
-> Fold (HashSet a) [HashSet a] -> Fold (HashSet a) (HashSet a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Fold (HashSet a) [HashSet a]
forall a. Fold a [a]
F.list) Closure
(Dict (Typeable a, Eq a, Hashable a)
-> Fold (HashSet a) (HashSet a))
-> Closure (Dict (Typeable a, Eq a, Hashable a))
-> Closure (Fold (HashSet a) (HashSet a))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` StaticHashable a => Closure (Dict (Typeable a, Eq a, Hashable a))
forall a.
StaticHashable a =>
Closure (Dict (Typeable a, Eq a, Hashable a))
staticHashable @a)
data TopK a = TopK Int (H.Heap a)
deriving (Typeable)
instance Semigroup (TopK a) where
TopK Int
c1 Heap a
h1 <> :: TopK a -> TopK a -> TopK a
<> TopK Int
c2 Heap a
h2 =
let m :: Int
m = Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
c1 Int
c2
in Int -> Heap a -> TopK a
forall a. Int -> Heap a -> TopK a
TopK Int
m (Int -> Heap a -> Heap a
forall a. Int -> Heap a -> Heap a
H.drop (Heap a -> Int
forall a. Heap a -> Int
H.size Heap a
h1 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Heap a -> Int
forall a. Heap a -> Int
H.size Heap a
h2 Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
m) (Heap a -> Heap a) -> Heap a -> Heap a
forall a b. (a -> b) -> a -> b
$ Heap a -> Heap a -> Heap a
forall a. Heap a -> Heap a -> Heap a
H.union Heap a
h1 Heap a
h2)
instance Monoid (TopK a) where
mempty :: TopK a
mempty = Int -> Heap a -> TopK a
forall a. Int -> Heap a -> TopK a
TopK Int
forall a. Bounded a => a
maxBound Heap a
forall a. Heap a
H.empty
aggrTopK ::
(StaticSerialise a, Typeable k) =>
Closure (Dict (Ord k)) ->
Int ->
Closure (a -> k) ->
Aggr a [a]
aggrTopK :: Closure (Dict (Ord k)) -> Int -> Closure (a -> k) -> Aggr a [a]
aggrTopK Closure (Dict (Ord k))
dict Int
count Closure (a -> k)
fc =
Closure (Fold a [a]) -> Closure (Fold [a] [a]) -> Aggr a [a]
forall t a b.
(StaticSerialise t, Typeable a, Typeable b) =>
Closure (Fold a t) -> Closure (Fold t b) -> Aggr a b
aggrFromFold
( static
( \Dict (Ord k)
Dict Int
c a -> k
f ->
(a -> TopK (Entry k a)) -> (TopK (Entry k a) -> [a]) -> Fold a [a]
forall w a b. Monoid w => (a -> w) -> (w -> b) -> Fold a b
F.foldMap
(\a
a -> Int -> Heap (Entry k a) -> TopK (Entry k a)
forall a. Int -> Heap a -> TopK a
TopK Int
c (Heap (Entry k a) -> TopK (Entry k a))
-> (Entry k a -> Heap (Entry k a)) -> Entry k a -> TopK (Entry k a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Entry k a -> Heap (Entry k a)
forall a. Ord a => a -> Heap a
H.singleton (Entry k a -> TopK (Entry k a)) -> Entry k a -> TopK (Entry k a)
forall a b. (a -> b) -> a -> b
$ k -> a -> Entry k a
forall p a. p -> a -> Entry p a
H.Entry (a -> k
f a
a) a
a)
(\(TopK Int
_ Heap (Entry k a)
h) -> (Entry k a -> a) -> [Entry k a] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map Entry k a -> a
forall p a. Entry p a -> a
H.payload ([Entry k a] -> [a])
-> ([Entry k a] -> [Entry k a]) -> [Entry k a] -> [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Entry k a -> Down (Entry k a)) -> [Entry k a] -> [Entry k a]
forall b a. Ord b => (a -> b) -> [a] -> [a]
sortOn Entry k a -> Down (Entry k a)
forall a. a -> Down a
Down ([Entry k a] -> [a]) -> [Entry k a] -> [a]
forall a b. (a -> b) -> a -> b
$ Heap (Entry k a) -> [Entry k a]
forall a. Heap a -> [a]
H.toUnsortedList Heap (Entry k a)
h)
)
Closure (Dict (Ord k) -> Int -> (a -> k) -> Fold a [a])
-> Closure (Dict (Ord k))
-> Closure (Int -> (a -> k) -> Fold a [a])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Ord k))
dict
Closure (Int -> (a -> k) -> Fold a [a])
-> Closure Int -> Closure ((a -> k) -> Fold a [a])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int)) -> Int -> Closure Int
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int)
forall (a :: Constraint). a => Dict a
Dict) Int
count
Closure ((a -> k) -> Fold a [a])
-> Closure (a -> k) -> Closure (Fold a [a])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
fc
)
( static
( \Dict (Ord k)
Dict Int
c a -> k
f ->
([a] -> [a] -> [a]) -> [a] -> ([a] -> [a]) -> Fold [a] [a]
forall a b x. (x -> a -> x) -> x -> (x -> b) -> Fold a b
F.Fold (\[a]
a [a]
b -> Int -> [a] -> [a]
forall a. Int -> [a] -> [a]
take Int
c ([a] -> [a]) -> [a] -> [a]
forall a b. (a -> b) -> a -> b
$ (a -> k) -> [a] -> [a] -> [a]
forall a a. Ord a => (a -> a) -> [a] -> [a] -> [a]
merge a -> k
f [a]
a [a]
b) [] [a] -> [a]
forall a. a -> a
id
)
Closure (Dict (Ord k) -> Int -> (a -> k) -> Fold [a] [a])
-> Closure (Dict (Ord k))
-> Closure (Int -> (a -> k) -> Fold [a] [a])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Ord k))
dict
Closure (Int -> (a -> k) -> Fold [a] [a])
-> Closure Int -> Closure ((a -> k) -> Fold [a] [a])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Int)) -> Int -> Closure Int
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Int)
forall (a :: Constraint). a => Dict a
Dict) Int
count
Closure ((a -> k) -> Fold [a] [a])
-> Closure (a -> k) -> Closure (Fold [a] [a])
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
fc
)
where
merge :: (a -> a) -> [a] -> [a] -> [a]
merge a -> a
_ [a]
xs [] = [a]
xs
merge a -> a
_ [] [a]
ys = [a]
ys
merge a -> a
f xss :: [a]
xss@(a
x : [a]
xs) yss :: [a]
yss@(a
y : [a]
ys) =
if a -> a
f a
x a -> a -> Bool
forall a. Ord a => a -> a -> Bool
> a -> a
f a
y
then a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: (a -> a) -> [a] -> [a] -> [a]
merge a -> a
f [a]
xs [a]
yss
else a
y a -> [a] -> [a]
forall a. a -> [a] -> [a]
: (a -> a) -> [a] -> [a] -> [a]
merge a -> a
f [a]
xss [a]
ys
aggrBottomK ::
(StaticSerialise a, Typeable k) =>
Closure (Dict (Ord k)) ->
Int ->
Closure (a -> k) ->
Aggr a [a]
aggrBottomK :: Closure (Dict (Ord k)) -> Int -> Closure (a -> k) -> Aggr a [a]
aggrBottomK Closure (Dict (Ord k))
d Int
count Closure (a -> k)
fc =
Closure (Dict (Ord (Down k)))
-> Int -> Closure (a -> Down k) -> Aggr a [a]
forall a k.
(StaticSerialise a, Typeable k) =>
Closure (Dict (Ord k)) -> Int -> Closure (a -> k) -> Aggr a [a]
aggrTopK
(static (\Dict (Ord k)
Dict -> Dict (Ord (Down k))
forall (a :: Constraint). a => Dict a
Dict) Closure (Dict (Ord k) -> Dict (Ord (Down k)))
-> Closure (Dict (Ord k)) -> Closure (Dict (Ord (Down k)))
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Ord k))
d)
Int
count
(static (k -> Down k
forall a. a -> Down a
Down (k -> Down k) -> (a -> k) -> a -> Down k
forall b c a. (b -> c) -> (a -> b) -> a -> c
.) Closure ((a -> k) -> a -> Down k)
-> Closure (a -> k) -> Closure (a -> Down k)
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (a -> k)
fc)