{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
module Control.Distributed.Dataset.OpenDatasets.GHArchive
( ghArchive,
module Control.Distributed.Dataset.OpenDatasets.GHArchive.Types,
fromGregorian,
)
where
import Conduit
( (.|),
ConduitT,
ResourceT,
handleC,
throwM,
)
import Control.Distributed.Dataset
import Control.Distributed.Dataset.OpenDatasets.GHArchive.Types
import Control.Exception
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.JSON.NewlineDelimited as NDJ
import Data.Conduit.Zlib (ungzip)
import qualified Data.Text as T
import Data.Time.Calendar
( Day,
fromGregorian,
showGregorian,
)
import Data.Typeable
import Network.HTTP.Simple
( getResponseBody,
getResponseStatusCode,
httpSource,
parseRequest,
)
import Text.Printf
-- | A 'Dataset' of every GH Archive event between two days, both inclusive.
--
-- Each archive URL produced by 'allUrls' becomes its own external partition,
-- which is fetched and decoded by 'processUrl' when evaluated. The resulting
-- partitions are then coalesced down to six per day in the range.
ghArchive :: (Day, Day) -> Dataset GHEvent
ghArchive (start, end) =
  dCoalesce partitionCount (dExternal (urlPartition <$> allUrls start end))
  where
    -- Ship the URL as a serialisable closure; the static pointer to
    -- 'processUrl' lets a remote worker rebuild the conduit for it.
    urlPartition url =
      mkPartition (static processUrl `cap` cpure (static Dict) url)
    -- Inclusive count of days in the requested range.
    dayCount = fromEnum end - fromEnum start + 1
    -- Six coalesced partitions per day.
    partitionCount = dayCount * 6
-- | Every GH Archive download URL for the inclusive day range
-- @[start .. end]@: one file per hour, hours @0@ through @23@, ordered by day
-- and then by hour.
--
-- GH Archive names its hourly files without zero-padding the hour
-- (e.g. @2015-01-01-0.json.gz@, @2015-01-01-15.json.gz@), so the hour is
-- rendered with 'show'. A zero-padded name such as @2015-01-01-00.json.gz@
-- does not exist, and because 'processUrl' treats a 404 as an empty
-- partition, padding would silently drop hours 0-9 of every day.
--
-- Returns an empty list when @end@ is before @start@.
allUrls :: Day -> Day -> [T.Text]
allUrls start end =
  [ "http://data.gharchive.org/"
      <> T.pack (showGregorian day)
      <> "-"
      <> T.pack (show hour)
      <> ".json.gz"
    | day <- [start .. end],
      hour <- [0 .. 23 :: Int]
  ]
-- | Stream the 'GHEvent's contained in a single GH Archive file.
--
-- The response body is gunzipped and parsed as newline-delimited JSON; a
-- line that fails to decode aborts the stream via 'fail'. A 404 response
-- yields no events, and any other non-200 status fails with
-- @"Unexpected status code: <code>"@. Every exception raised while fetching
-- or decoding is rethrown as a 'GHArchiveException' tagged with the URL.
processUrl :: T.Text -> ConduitT () GHEvent (ResourceT IO) ()
processUrl url = handleC rethrowTagged fetch
  where
    -- Build the request from the URL and hand the streaming response over.
    fetch = do
      request <- parseRequest (T.unpack url)
      httpSource request handleResponse

    -- Dispatch on the HTTP status of the streaming response.
    handleResponse response
      | status == 200 =
          getResponseBody response
            .| ungzip
            .| NDJ.eitherParser @_ @GHEvent
            .| C.mapM (either fail pure)
      | status == 404 = pure ()
      | otherwise = fail ("Unexpected status code: " ++ show status)
      where
        status = getResponseStatusCode response

    -- Attach the failing URL to whatever went wrong, then rethrow.
    rethrowTagged :: SomeException -> ConduitT () GHEvent (ResourceT IO) a
    rethrowTagged err = throwM (GHArchiveException url err)
-- | Thrown by 'processUrl' to wrap any exception raised while fetching or
-- decoding a GH Archive file. Carries the URL being processed and the
-- underlying error.
--
-- 'Typeable' is not listed in the deriving clause: every type has been
-- 'Typeable' automatically since GHC 7.10, and deriving it explicitly is a
-- no-op that only triggers @-Wderiving-typeable@.
data GHArchiveException e = GHArchiveException T.Text e
  deriving (Show)

-- | 'displayException' names the failing URL and shows the wrapped error,
-- instead of the default (the derived 'Show' constructor dump).
instance (Typeable e, Show e) => Exception (GHArchiveException e) where
  displayException (GHArchiveException url err) =
    "GHArchive: failed to process " ++ T.unpack url ++ ": " ++ show err