{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}

module Control.Distributed.Dataset.OpenDatasets.GHArchive
  ( ghArchive,
    module Control.Distributed.Dataset.OpenDatasets.GHArchive.Types,

    -- * Re-exports
    fromGregorian,
  )
where

import Conduit
  ( (.|),
    ConduitT,
    ResourceT,
    handleC,
    throwM,
  )
import Control.Distributed.Dataset
import Control.Distributed.Dataset.OpenDatasets.GHArchive.Types
import Control.Exception
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.JSON.NewlineDelimited as NDJ
import Data.Conduit.Zlib (ungzip)
import qualified Data.Text as T
import Data.Time.Calendar
  ( Day,
    fromGregorian,
    showGregorian,
  )
import Data.Typeable
import Network.HTTP.Simple
  ( getResponseBody,
    getResponseStatusCode,
    httpSource,
    parseRequest,
  )
import Text.Printf

ghArchive :: (Day, Day) -> Dataset GHEvent
ghArchive :: (Day, Day) -> Dataset GHEvent
ghArchive (Day
start, Day
end) =
  [Partition GHEvent] -> Dataset GHEvent
forall a. StaticSerialise a => [Partition a] -> Dataset a
dExternal
    ( (Text -> Partition GHEvent) -> [Text] -> [Partition GHEvent]
forall a b. (a -> b) -> [a] -> [b]
map
        (\Text
str -> Closure (ConduitT () GHEvent (ResourceT IO) ())
-> Partition GHEvent
forall a.
Typeable a =>
Closure (ConduitT () a (ResourceT IO) ()) -> Partition a
mkPartition (static Text -> ConduitT () GHEvent (ResourceT IO) ()
processUrl Closure (Text -> ConduitT () GHEvent (ResourceT IO) ())
-> Closure Text -> Closure (ConduitT () GHEvent (ResourceT IO) ())
forall a b.
Typeable a =>
Closure (a -> b) -> Closure a -> Closure b
`cap` Closure (Dict (Serializable Text)) -> Text -> Closure Text
forall a. Closure (Dict (Serializable a)) -> a -> Closure a
cpure (static Dict (Serializable Text)
forall (a :: Constraint). a => Dict a
Dict) Text
str))
        (Day -> Day -> [Text]
allUrls Day
start Day
end)
    )
    Dataset GHEvent
-> (Dataset GHEvent -> Dataset GHEvent) -> Dataset GHEvent
forall a b. a -> (a -> b) -> b
& Int -> Dataset GHEvent -> Dataset GHEvent
forall a. Typeable a => Int -> Dataset a -> Dataset a
dCoalesce ((Day -> Int
forall a. Enum a => a -> Int
fromEnum Day
end Int -> Int -> Int
forall a. Num a => a -> a -> a
- Day -> Int
forall a. Enum a => a -> Int
fromEnum Day
start Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
6)

allUrls :: Day -> Day -> [T.Text]
allUrls :: Day -> Day -> [Text]
allUrls Day
start Day
end = do
  String
date <- Day -> String
showGregorian (Day -> String) -> [Day] -> [String]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Day
start .. Day
end]
  String
time <- String -> Int -> String
forall r. PrintfType r => String -> r
printf String
"%02d" (Int -> String) -> [Int] -> [String]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(Int
0 :: Int) .. Int
23]
  let str :: Text
str = String -> Text
T.pack String
date Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack String
time
  Text -> [Text]
forall (m :: * -> *) a. Monad m => a -> m a
return (Text -> [Text]) -> Text -> [Text]
forall a b. (a -> b) -> a -> b
$ Text
"http://data.gharchive.org/" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
str Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
".json.gz"

processUrl :: T.Text -> ConduitT () GHEvent (ResourceT IO) ()
processUrl :: Text -> ConduitT () GHEvent (ResourceT IO) ()
processUrl Text
url =
  (SomeException -> ConduitT () GHEvent (ResourceT IO) ())
-> ConduitT () GHEvent (ResourceT IO) ()
-> ConduitT () GHEvent (ResourceT IO) ()
forall (m :: * -> *) e i o r.
(MonadUnliftIO m, Exception e) =>
(e -> ConduitT i o m r) -> ConduitT i o m r -> ConduitT i o m r
handleC SomeException -> ConduitT () GHEvent (ResourceT IO) ()
forall a. SomeException -> ConduitT () GHEvent (ResourceT IO) a
wrapEx (ConduitT () GHEvent (ResourceT IO) ()
 -> ConduitT () GHEvent (ResourceT IO) ())
-> ConduitT () GHEvent (ResourceT IO) ()
-> ConduitT () GHEvent (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ do
    Request
req <- String -> ConduitT () GHEvent (ResourceT IO) Request
forall (m :: * -> *). MonadThrow m => String -> m Request
parseRequest (Text -> String
T.unpack Text
url)
    Request
-> (Response (ConduitM () ByteString (ResourceT IO) ())
    -> ConduitT () GHEvent (ResourceT IO) ())
-> ConduitT () GHEvent (ResourceT IO) ()
forall (m :: * -> *) (n :: * -> *) i o r.
(MonadResource m, MonadIO n) =>
Request
-> (Response (ConduitM i ByteString n ()) -> ConduitM i o m r)
-> ConduitM i o m r
httpSource Request
req Response (ConduitM () ByteString (ResourceT IO) ())
-> ConduitT () GHEvent (ResourceT IO) ()
forall (m :: * -> *) a.
(PrimMonad m, MonadThrow m, MonadFail m) =>
Response (ConduitM a ByteString m ()) -> ConduitM a GHEvent m ()
call
  where
    call :: Response (ConduitM a ByteString m ()) -> ConduitM a GHEvent m ()
call Response (ConduitM a ByteString m ())
req =
      case Response (ConduitM a ByteString m ()) -> Int
forall a. Response a -> Int
getResponseStatusCode Response (ConduitM a ByteString m ())
req of
        Int
200 ->
          Response (ConduitM a ByteString m ()) -> ConduitM a ByteString m ()
forall a. Response a -> a
getResponseBody Response (ConduitM a ByteString m ())
req
            ConduitM a ByteString m ()
-> ConduitM ByteString GHEvent m () -> ConduitM a GHEvent m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString ByteString m ()
forall (m :: * -> *).
(PrimMonad m, MonadThrow m) =>
ConduitT ByteString ByteString m ()
ungzip
            ConduitT ByteString ByteString m ()
-> ConduitM ByteString GHEvent m ()
-> ConduitM ByteString GHEvent m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (Monad m, FromJSON GHEvent) =>
Conduit ByteString m (Either String GHEvent)
forall (m :: * -> *) a.
(Monad m, FromJSON a) =>
Conduit ByteString m (Either String a)
NDJ.eitherParser @_ @GHEvent
            Conduit ByteString m (Either String GHEvent)
-> ConduitM (Either String GHEvent) GHEvent m ()
-> ConduitM ByteString GHEvent m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (Either String GHEvent -> m GHEvent)
-> ConduitM (Either String GHEvent) GHEvent m ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
C.mapM ((String -> m GHEvent)
-> (GHEvent -> m GHEvent) -> Either String GHEvent -> m GHEvent
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either String -> m GHEvent
forall (m :: * -> *) a. MonadFail m => String -> m a
fail GHEvent -> m GHEvent
forall (m :: * -> *) a. Monad m => a -> m a
return)
        Int
404 -> () -> ConduitM a GHEvent m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Int
r -> String -> ConduitM a GHEvent m ()
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> ConduitM a GHEvent m ())
-> String -> ConduitM a GHEvent m ()
forall a b. (a -> b) -> a -> b
$ String
"Unexpected status code: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
r
    wrapEx :: SomeException -> ConduitT () GHEvent (ResourceT IO) a
wrapEx =
      GHArchiveException SomeException
-> ConduitT () GHEvent (ResourceT IO) a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (GHArchiveException SomeException
 -> ConduitT () GHEvent (ResourceT IO) a)
-> (SomeException -> GHArchiveException SomeException)
-> SomeException
-> ConduitT () GHEvent (ResourceT IO) a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> SomeException -> GHArchiveException SomeException
forall e. Text -> e -> GHArchiveException e
GHArchiveException Text
url (SomeException -> GHArchiveException SomeException)
-> (SomeException -> SomeException)
-> SomeException
-> GHArchiveException SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> SomeException
forall a. a -> a
id @SomeException

data GHArchiveException e = GHArchiveException T.Text e
  deriving (Typeable, Int -> GHArchiveException e -> String -> String
[GHArchiveException e] -> String -> String
GHArchiveException e -> String
(Int -> GHArchiveException e -> String -> String)
-> (GHArchiveException e -> String)
-> ([GHArchiveException e] -> String -> String)
-> Show (GHArchiveException e)
forall e. Show e => Int -> GHArchiveException e -> String -> String
forall e. Show e => [GHArchiveException e] -> String -> String
forall e. Show e => GHArchiveException e -> String
forall a.
(Int -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
showList :: [GHArchiveException e] -> String -> String
$cshowList :: forall e. Show e => [GHArchiveException e] -> String -> String
show :: GHArchiveException e -> String
$cshow :: forall e. Show e => GHArchiveException e -> String
showsPrec :: Int -> GHArchiveException e -> String -> String
$cshowsPrec :: forall e. Show e => Int -> GHArchiveException e -> String -> String
Show)

instance (Typeable e, Show e) => Exception (GHArchiveException e)