feat(users/Profpatsch/whatcd-resolver): INSERT red search results
Change-Id: Ice7fdb2e265cfb99734ed41d17b62ac98f7a4869 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8840 Reviewed-by: Profpatsch <mail@profpatsch.de> Tested-by: BuildkiteCI
This commit is contained in:
parent
98e38339f2
commit
70da4318f5
2 changed files with 231 additions and 52 deletions
|
@ -3,6 +3,7 @@
|
|||
{-# LANGUAGE QuasiQuotes #-}
|
||||
{-# LANGUAGE TypeFamilyDependencies #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
{-# OPTIONS_GHC -Wno-orphans #-}
|
||||
|
||||
module Postgres.MonadPostgres where
|
||||
|
||||
|
@ -18,6 +19,10 @@ import Data.Typeable (Typeable)
|
|||
import Database.PostgreSQL.Simple (Connection, FormatError, FromRow, Query, QueryError, ResultError, SqlError, ToRow)
|
||||
import Database.PostgreSQL.Simple qualified as PG
|
||||
import Database.PostgreSQL.Simple.FromRow qualified as PG
|
||||
import Database.PostgreSQL.Simple.ToField (ToField)
|
||||
import Database.PostgreSQL.Simple.ToRow (ToRow (toRow))
|
||||
import Database.PostgreSQL.Simple.Types (fromQuery)
|
||||
import GHC.Records (HasField (..))
|
||||
import Label
|
||||
import PossehlAnalyticsPrelude
|
||||
import Postgres.Decoder
|
||||
|
@ -33,10 +38,15 @@ import UnliftIO.Process qualified as Process
|
|||
-- and will behave the same unless othewise documented.
|
||||
class Monad m => MonadPostgres (m :: Type -> Type) where
|
||||
-- | Execute an INSERT, UPDATE, or other SQL query that is not expected to return results.
|
||||
|
||||
--
|
||||
-- Returns the number of rows affected.
|
||||
execute :: (ToRow params, Typeable params) => Query -> params -> Transaction m (Label "numberOfRowsAffected" Natural)
|
||||
|
||||
-- | Execute an INSERT, UPDATE, or other SQL query that is not expected to return results. Does not perform parameter substitution.
|
||||
--
|
||||
-- Returns the number of rows affected.
|
||||
execute_ :: Query -> Transaction m (Label "numberOfRowsAffected" Natural)
|
||||
|
||||
-- | Execute a multi-row INSERT, UPDATE, or other SQL query that is not expected to return results.
|
||||
--
|
||||
-- Returns the number of rows affected. If the list of parameters is empty, this function will simply return 0 without issuing the query to the backend. If this is not desired, consider using the 'PG.Values' constructor instead.
|
||||
|
@ -45,7 +55,7 @@ class Monad m => MonadPostgres (m :: Type -> Type) where
|
|||
-- | Execute INSERT ... RETURNING, UPDATE ... RETURNING, or other SQL query that accepts multi-row input and is expected to return results. Note that it is possible to write query conn "INSERT ... RETURNING ..." ... in cases where you are only inserting a single row, and do not need functionality analogous to 'executeMany'.
|
||||
--
|
||||
-- If the list of parameters is empty, this function will simply return [] without issuing the query to the backend. If this is not desired, consider using the 'PG.Values' constructor instead.
|
||||
executeManyReturning :: (ToRow q, FromRow r) => Query -> [q] -> Transaction m [r]
|
||||
executeManyReturningWith :: (ToRow q) => Query -> [q] -> Decoder r -> Transaction m [r]
|
||||
|
||||
-- | Run a query, passing parameters and result row parser.
|
||||
queryWith :: (PG.ToRow params, Typeable params, Typeable r) => PG.Query -> params -> Decoder r -> Transaction m [r]
|
||||
|
@ -145,27 +155,6 @@ ensureSingleRow = \case
|
|||
List.length more
|
||||
}
|
||||
|
||||
-- | A better `query`
|
||||
--
|
||||
-- Parameters are passed first,
|
||||
-- then a Proxy which you should annotate with the return type of the query.
|
||||
-- This way it’s right before the @SELECT@,
|
||||
-- meaning it’s easy to see whether the two correspond.
|
||||
--
|
||||
-- TODO: maybe replace the query function in the class with this?
|
||||
queryBetter ::
|
||||
( MonadPostgres m,
|
||||
ToRow params,
|
||||
FromRow res,
|
||||
Typeable params,
|
||||
Typeable res
|
||||
) =>
|
||||
params ->
|
||||
Proxy res ->
|
||||
Query ->
|
||||
Transaction m [res]
|
||||
queryBetter params Proxy q = query q params
|
||||
|
||||
newtype Transaction m a = Transaction {unTransaction :: (ReaderT Connection m a)}
|
||||
deriving newtype
|
||||
( Functor,
|
||||
|
@ -251,11 +240,11 @@ toNumberOfRowsAffected functionName i64 =
|
|||
& liftIO
|
||||
<&> label @"numberOfRowsAffected"
|
||||
|
||||
pgExecuteManyReturning :: (ToRow params, FromRow r, MonadUnliftIO m, MonadLogger m, MonadTools m) => Query -> [params] -> Transaction m [r]
|
||||
pgExecuteManyReturning qry params =
|
||||
pgExecuteManyReturningWith :: (ToRow params, MonadUnliftIO m, MonadLogger m, MonadTools m) => Query -> [params] -> Decoder r -> Transaction m [r]
|
||||
pgExecuteManyReturningWith qry params (Decoder fromRow) =
|
||||
do
|
||||
conn <- Transaction ask
|
||||
PG.returning conn qry params
|
||||
PG.returningWith fromRow conn qry params
|
||||
& handlePGException "executeManyReturning" qry (Right params)
|
||||
|
||||
pgFold ::
|
||||
|
@ -324,6 +313,10 @@ data SingleRowError = SingleRowError
|
|||
instance Exception SingleRowError where
|
||||
displayException (SingleRowError {..}) = [fmt|Single row expected from SQL query result, {numberOfRowsReturned} rows were returned instead."|]
|
||||
|
||||
pgFormatQueryNoParams' :: (MonadIO m, MonadLogger m, MonadTools m) => Query -> Transaction m Text
|
||||
pgFormatQueryNoParams' q =
|
||||
lift $ pgFormatQueryByteString q.fromQuery
|
||||
|
||||
pgFormatQuery' :: (MonadIO m, ToRow params, MonadLogger m, MonadTools m) => Query -> params -> Transaction m Text
|
||||
pgFormatQuery' q p =
|
||||
pgFormatQuery q p
|
||||
|
@ -375,3 +368,9 @@ pgFormatQueryByteString queryBytes = do
|
|||
)
|
||||
logDebug [fmt|pg_format stdout: stderr|]
|
||||
pure (queryBytes & bytesToTextUtf8Lenient)
|
||||
|
||||
instance (ToField t1, ToField t2) => ToRow (T2 l1 t1 l2 t2) where
|
||||
toRow t2 = toRow (getField @l1 t2, getField @l2 t2)
|
||||
|
||||
instance (ToField t1, ToField t2, ToField t3) => ToRow (T3 l1 t1 l2 t2 l3 t3) where
|
||||
toRow t3 = toRow (getField @l1 t3, getField @l2 t3, getField @l3 t3)
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
module WhatcdResolver where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Monad.Logger qualified as Logger
|
||||
import Control.Monad.Logger.CallStack
|
||||
import Control.Monad.Reader
|
||||
|
@ -16,11 +17,14 @@ import Data.Map.Strict qualified as Map
|
|||
import Data.Pool (Pool)
|
||||
import Data.Pool qualified as Pool
|
||||
import Data.Text qualified as Text
|
||||
import Data.Text.IO qualified as Text
|
||||
import Database.PostgreSQL.Simple (Only (..))
|
||||
import Database.PostgreSQL.Simple qualified as Postgres
|
||||
import Database.PostgreSQL.Simple.SqlQQ (sql)
|
||||
import Database.PostgreSQL.Simple.Types (PGArray (PGArray))
|
||||
import Database.PostgreSQL.Simple.Types qualified as Postgres
|
||||
import Database.Postgres.Temp qualified as TmpPg
|
||||
import FieldParser qualified as Field
|
||||
import GHC.Records (HasField (..))
|
||||
import Json qualified
|
||||
import Json.Enc (Enc)
|
||||
import Json.Enc qualified as Enc
|
||||
|
@ -46,6 +50,7 @@ data TransmissionRequest = TransmissionRequest
|
|||
}
|
||||
deriving stock (Show)
|
||||
|
||||
requestListAllTorrents :: TransmissionRequest
|
||||
requestListAllTorrents =
|
||||
TransmissionRequest
|
||||
{ method = "torrent-get",
|
||||
|
@ -132,23 +137,163 @@ doTransmissionRequest dat req = do
|
|||
Left err -> appThrowTree err
|
||||
_ -> liftIO $ unwrapIOError $ Left [fmt|Non-200 response: {showPretty resp}|]
|
||||
|
||||
redactedSearch advanced = redactedApiRequest (T2 (label @"action" "browse") (label @"actionArgs" ((advanced <&> second Just))))
|
||||
redactedSearch ::
|
||||
(MonadLogger m, MonadIO m, MonadThrow m) =>
|
||||
[(ByteString, ByteString)] ->
|
||||
Json.Parse ErrorTree a ->
|
||||
m a
|
||||
redactedSearch advanced =
|
||||
redactedApiRequest
|
||||
( T2
|
||||
(label @"action" "browse")
|
||||
(label @"actionArgs" ((advanced <&> second Just)))
|
||||
)
|
||||
|
||||
test :: IO (Either TmpPg.StartError a)
|
||||
test =
|
||||
runAppWith $
|
||||
redactedSearch
|
||||
[ ("artistname", "michael jackson"),
|
||||
("year", "1982"),
|
||||
("format", "MP3"),
|
||||
("releasetype", "album"),
|
||||
("order_by", "year")
|
||||
]
|
||||
<&> (fmap $ fromMaybe undefined)
|
||||
<&> (Http.getResponseBody)
|
||||
<&> showPrettyJson
|
||||
>>= liftIO . Text.putStrLn
|
||||
runAppWith $ do
|
||||
_ <- runTransaction migrate
|
||||
transaction <- bla
|
||||
runTransaction transaction
|
||||
fix
|
||||
( \io -> do
|
||||
logInfo "delay"
|
||||
liftIO $ threadDelay 10_000_000
|
||||
io
|
||||
)
|
||||
|
||||
redactedApiRequest dat = do
|
||||
bla :: (MonadThrow m, MonadIO m, MonadLogger m, MonadPostgres m) => m (Transaction m [Label "numberOfRowsAffected" Natural])
|
||||
bla =
|
||||
redactedSearch
|
||||
[ ("searchstr", "cherish"),
|
||||
("artistname", "kirinji"),
|
||||
-- ("year", "1982"),
|
||||
-- ("format", "MP3"),
|
||||
-- ("releasetype", "album"),
|
||||
("order_by", "year")
|
||||
]
|
||||
( do
|
||||
status <- Json.key "status" Json.asText
|
||||
when (status /= "success") $ do
|
||||
Json.throwCustomError [fmt|Status was not "success", but {status}|]
|
||||
Json.key "response" $ do
|
||||
Json.key "results" $
|
||||
sequence
|
||||
<$> ( Json.eachInArray $ do
|
||||
groupId <- Json.key "groupId" (Json.asIntegral @_ @Int)
|
||||
groupName <- Json.key "groupName" Json.asText
|
||||
fullJsonResult <- Json.asValue
|
||||
let insertTourGroup = do
|
||||
_ <-
|
||||
execute
|
||||
[fmt|
|
||||
DELETE FROM redacted.torrent_groups
|
||||
WHERE group_id = ?::integer
|
||||
|]
|
||||
(Only groupId)
|
||||
executeManyReturningWith
|
||||
[fmt|
|
||||
INSERT INTO redacted.torrent_groups (
|
||||
group_id, group_name, full_json_result
|
||||
) VALUES
|
||||
( ?, ? , ? )
|
||||
RETURNING (id)
|
||||
|]
|
||||
[ ( groupId,
|
||||
groupName,
|
||||
fullJsonResult
|
||||
)
|
||||
]
|
||||
(label @"tourGroupIdPg" <$> Dec.fromField @Int)
|
||||
>>= ensureSingleRow
|
||||
insertTorrents <- Json.key "torrents" $ do
|
||||
torrents <- Json.eachInArray $ do
|
||||
torrentId <- Json.keyLabel @"torrentId" "torrentId" (Json.asIntegral @_ @Int)
|
||||
fullJsonResultT <- label @"fullJsonResult" <$> Json.asValue
|
||||
pure $ T2 torrentId fullJsonResultT
|
||||
pure $ \dat -> do
|
||||
_ <-
|
||||
execute
|
||||
[sql|
|
||||
DELETE FROM redacted.torrents
|
||||
WHERE torrent_id = ANY (?::integer[])
|
||||
|]
|
||||
(Only $ torrents & unzipT2 & (.torrentId) & PGArray)
|
||||
execute
|
||||
[sql|
|
||||
INSERT INTO redacted.torrents
|
||||
(torrent_id, torrent_group, full_json_result)
|
||||
SELECT inputs.torrent_id, static.torrent_group, inputs.full_json_result FROM
|
||||
(SELECT * FROM UNNEST(?::integer[], ?::jsonb[])) AS inputs(torrent_id, full_json_result)
|
||||
CROSS JOIN (VALUES(?::integer)) as static(torrent_group)
|
||||
|]
|
||||
( torrents
|
||||
& unzipT2
|
||||
& \t ->
|
||||
( t.torrentId & PGArray,
|
||||
t.fullJsonResult & PGArray,
|
||||
dat.tourGroupIdPg
|
||||
)
|
||||
)
|
||||
pure (insertTourGroup >>= insertTorrents)
|
||||
)
|
||||
)
|
||||
|
||||
hush :: Either a1 a2 -> Maybe a2
|
||||
hush (Left _) = Nothing
|
||||
hush (Right a) = Just a
|
||||
|
||||
unzipT2 :: forall l1 t1 l2 t2. [T2 l1 t1 l2 t2] -> T2 l1 [t1] l2 [t2]
|
||||
unzipT2 xs = xs <&> toTup & unzip & fromTup
|
||||
where
|
||||
toTup :: forall a b. T2 a t1 b t2 -> (t1, t2)
|
||||
toTup (T2 a b) = (getField @a a, getField @b b)
|
||||
fromTup :: (a, b) -> T2 l1 a l2 b
|
||||
fromTup (t1, t2) = T2 (label @l1 t1) (label @l2 t2)
|
||||
|
||||
unzipT3 :: forall l1 t1 l2 t2 l3 t3. [T3 l1 t1 l2 t2 l3 t3] -> T3 l1 [t1] l2 [t2] l3 [t3]
|
||||
unzipT3 xs = xs <&> toTup & unzip3 & fromTup
|
||||
where
|
||||
toTup :: forall a b c. T3 a t1 b t2 c t3 -> (t1, t2, t3)
|
||||
toTup (T3 a b c) = (getField @a a, getField @b b, getField @c c)
|
||||
fromTup :: (a, b, c) -> T3 l1 a l2 b l3 c
|
||||
fromTup (t1, t2, t3) = T3 (label @l1 t1) (label @l2 t2) (label @l3 t3)
|
||||
|
||||
migrate :: MonadPostgres m => Transaction m (Label "numberOfRowsAffected" Natural)
|
||||
migrate = do
|
||||
execute_
|
||||
[sql|
|
||||
CREATE SCHEMA IF NOT EXISTS redacted;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS redacted.torrent_groups (
|
||||
id SERIAL PRIMARY KEY,
|
||||
group_id INTEGER,
|
||||
group_name TEXT,
|
||||
full_json_result JSONB,
|
||||
UNIQUE(group_id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS redacted.torrents (
|
||||
id SERIAL PRIMARY KEY,
|
||||
torrent_id INTEGER,
|
||||
torrent_group SERIAL NOT NULL REFERENCES redacted.torrent_groups(id),
|
||||
full_json_result JSONB,
|
||||
UNIQUE(torrent_id)
|
||||
);
|
||||
|
||||
|]
|
||||
|
||||
redactedApiRequest ::
|
||||
( MonadThrow m,
|
||||
MonadIO m,
|
||||
MonadLogger m,
|
||||
HasField "action" p ByteString,
|
||||
HasField "actionArgs" p [(ByteString, Maybe ByteString)]
|
||||
) =>
|
||||
p ->
|
||||
Json.Parse ErrorTree a ->
|
||||
m a
|
||||
redactedApiRequest dat parse = do
|
||||
authKey <- runCommandExpect0 "pass" ["internet/redacted/api-keys/whatcd-resolver"]
|
||||
let req =
|
||||
[fmt|https://redacted.ch/ajax.php|]
|
||||
|
@ -156,7 +301,20 @@ redactedApiRequest dat = do
|
|||
& Http.setQueryString (("action", Just dat.action) : dat.actionArgs)
|
||||
& Http.setRequestHeader "Authorization" [authKey]
|
||||
Http.httpBS req
|
||||
<&> fmap (Json.decodeStrict' @Json.Value)
|
||||
>>= assertM
|
||||
( \resp -> case resp & Http.responseStatus & (.statusCode) of
|
||||
200 -> Right $ resp & Http.responseBody
|
||||
_ -> Left [fmt|Redacted returned an non-200 error code: {resp & showPretty}|]
|
||||
)
|
||||
>>= ( Json.parseStrict parse
|
||||
>>> first (Json.parseErrorTree "could not parse redacted response")
|
||||
>>> assertM id
|
||||
)
|
||||
|
||||
assertM :: MonadThrow f => (t -> Either ErrorTree a) -> t -> f a
|
||||
assertM f v = case f v of
|
||||
Right a -> pure a
|
||||
Left err -> appThrowTree err
|
||||
|
||||
runAppWith :: AppT IO a -> IO (Either TmpPg.StartError a)
|
||||
runAppWith appT = withDb $ \db -> do
|
||||
|
@ -170,12 +328,18 @@ runAppWith appT = withDb $ \db -> do
|
|||
{- unusedResourceOpenTime -} 10
|
||||
{- max resources per stripe -} 10
|
||||
transmissionSessionId <- newEmptyMVar
|
||||
runReaderT appT.unAppT Context {..}
|
||||
let newAppT = do
|
||||
logInfo [fmt|Running with config: {showPretty config}|]
|
||||
logInfo [fmt|Connected to database at {db & TmpPg.toDataDirectory} on socket {db & TmpPg.toConnectionString}|]
|
||||
appT
|
||||
runReaderT newAppT.unAppT Context {..}
|
||||
|
||||
withDb :: (TmpPg.DB -> IO a) -> IO (Either TmpPg.StartError a)
|
||||
withDb act = do
|
||||
dataDir <- Xdg.getXdgDirectory Xdg.XdgData "whatcd-resolver"
|
||||
let databaseDir = dataDir </> "database"
|
||||
let socketDir = dataDir </> "database-socket"
|
||||
Dir.createDirectoryIfMissing True socketDir
|
||||
initDbConfig <-
|
||||
Dir.doesDirectoryExist databaseDir >>= \case
|
||||
True -> pure TmpPg.Zlich
|
||||
|
@ -186,6 +350,8 @@ withDb act = do
|
|||
let cfg =
|
||||
mempty
|
||||
{ TmpPg.dataDirectory = TmpPg.Permanent (databaseDir),
|
||||
TmpPg.socketDirectory = TmpPg.Permanent socketDir,
|
||||
TmpPg.port = pure $ Just 5432,
|
||||
TmpPg.initDbConfig
|
||||
}
|
||||
TmpPg.withConfig cfg $ \db -> do
|
||||
|
@ -203,6 +369,8 @@ data Context = Context
|
|||
newtype AppT m a = AppT {unAppT :: ReaderT Context m a}
|
||||
deriving newtype (Functor, Applicative, Monad, MonadIO, MonadUnliftIO, MonadThrow)
|
||||
|
||||
type App a = AppT IO a
|
||||
|
||||
data AppException = AppException Text
|
||||
deriving stock (Show)
|
||||
deriving anyclass (Exception)
|
||||
|
@ -229,20 +397,24 @@ instance (MonadIO m) => MonadTransmission (AppT m) where
|
|||
instance (MonadThrow m, MonadUnliftIO m) => MonadPostgres (AppT m) where
|
||||
execute qry params = do
|
||||
conf <- lift $ AppT (asks (.config))
|
||||
logQueryIfEnabled conf qry (Left params)
|
||||
logQueryIfEnabled conf qry (HasSingleParam params)
|
||||
pgExecute qry params
|
||||
execute_ qry = do
|
||||
conf <- lift $ AppT (asks (.config))
|
||||
logQueryIfEnabled @(Only Text) conf qry HasNoParams
|
||||
pgExecute_ qry
|
||||
executeMany qry params = do
|
||||
conf <- lift $ AppT (asks (.config))
|
||||
logQueryIfEnabled conf qry (Right params)
|
||||
logQueryIfEnabled conf qry (HasMultiParams params)
|
||||
pgExecuteMany qry params
|
||||
executeManyReturning qry params = do
|
||||
executeManyReturningWith qry params dec = do
|
||||
conf <- lift $ AppT (asks (.config))
|
||||
logQueryIfEnabled conf qry (Right params)
|
||||
pgExecuteManyReturning qry params
|
||||
logQueryIfEnabled conf qry (HasMultiParams params)
|
||||
pgExecuteManyReturningWith qry params dec
|
||||
|
||||
queryWith qry params decoder = do
|
||||
conf <- lift $ AppT (asks (.config))
|
||||
logQueryIfEnabled conf qry (Left params)
|
||||
logQueryIfEnabled conf qry (HasSingleParam params)
|
||||
pgQueryWith qry params decoder
|
||||
|
||||
-- TODO: log these queries as well with `logQueryIfEnabled`, but test out whether it works with query_ and foldRows first.
|
||||
|
@ -271,8 +443,14 @@ withPGTransaction connPool f =
|
|||
connPool
|
||||
(\conn -> Postgres.withTransaction conn (f conn))
|
||||
|
||||
data HasQueryParams param
|
||||
= HasNoParams
|
||||
| HasSingleParam param
|
||||
| HasMultiParams [param]
|
||||
|
||||
-- | Log the postgres query depending on the setting of @config.debugInfo.logDatabaseQueries@.
|
||||
logQueryIfEnabled ::
|
||||
forall params config m.
|
||||
( Postgres.ToRow params,
|
||||
MonadUnliftIO m,
|
||||
MonadLogger m,
|
||||
|
@ -281,13 +459,14 @@ logQueryIfEnabled ::
|
|||
) =>
|
||||
config ->
|
||||
Postgres.Query ->
|
||||
Either params [params] ->
|
||||
HasQueryParams params ->
|
||||
Transaction m ()
|
||||
logQueryIfEnabled config qry params = do
|
||||
-- In case we have query logging enabled, we want to do that
|
||||
let formattedQuery = case params of
|
||||
Left p -> pgFormatQuery' qry p
|
||||
Right ps -> pgFormatQueryMany' qry ps
|
||||
HasNoParams -> pgFormatQueryNoParams' qry
|
||||
HasSingleParam p -> pgFormatQuery' qry p
|
||||
HasMultiParams ps -> pgFormatQueryMany' qry ps
|
||||
|
||||
let doLog errs =
|
||||
errs
|
||||
|
@ -330,3 +509,4 @@ data DatabaseLogging
|
|||
= DontLogDatabaseQueries
|
||||
| LogDatabaseQueries
|
||||
| LogDatabaseQueriesAndExplain
|
||||
deriving stock (Show)
|
||||
|
|
Loading…
Reference in a new issue