feat(users/Profpatsch/MonadPostgres): add PgFormatPool
Change-Id: Id65ee6184ef536fe6a46637005bea903b37f6ffd Reviewed-on: https://cl.tvl.fyi/c/depot/+/11653 Autosubmit: Profpatsch <mail@profpatsch.de> Tested-by: BuildkiteCI Reviewed-by: Profpatsch <mail@profpatsch.de>
This commit is contained in:
parent
e2a52407f8
commit
aa85a18723
1 changed files with 127 additions and 19 deletions
|
@ -7,11 +7,17 @@ module Postgres.MonadPostgres where
|
|||
|
||||
import AtLeast (AtLeast)
|
||||
import Control.Exception
|
||||
( Exception (displayException),
|
||||
Handler (Handler),
|
||||
catches,
|
||||
try,
|
||||
)
|
||||
import Control.Foldl qualified as Fold
|
||||
import Control.Monad.Logger.CallStack (MonadLogger, logDebug, logWarn)
|
||||
import Control.Monad.Reader (MonadReader (ask), ReaderT (..))
|
||||
import Control.Monad.Trans.Resource
|
||||
import Data.Aeson (FromJSON)
|
||||
import Data.ByteString qualified as ByteString
|
||||
import Data.Error.Tree
|
||||
import Data.HashMap.Strict qualified as HashMap
|
||||
import Data.Int (Int64)
|
||||
|
@ -28,6 +34,7 @@ import Database.PostgreSQL.Simple.FromRow qualified as PG
|
|||
import Database.PostgreSQL.Simple.ToField (ToField)
|
||||
import Database.PostgreSQL.Simple.ToRow (ToRow (toRow))
|
||||
import Database.PostgreSQL.Simple.Types (Query (..))
|
||||
import GHC.IO.Handle (Handle)
|
||||
import GHC.Records (getField)
|
||||
import Label
|
||||
import OpenTelemetry.Trace.Core qualified as Otel hiding (inSpan, inSpan')
|
||||
|
@ -39,7 +46,9 @@ import Pretty (showPretty)
|
|||
import Seconds
|
||||
import System.Exit (ExitCode (..))
|
||||
import Tool
|
||||
import UnliftIO (MonadUnliftIO (withRunInIO))
|
||||
import UnliftIO (MonadUnliftIO (withRunInIO), bracket, hClose)
|
||||
import UnliftIO.Concurrent (forkIO)
|
||||
import UnliftIO.Process (ProcessHandle)
|
||||
import UnliftIO.Process qualified as Process
|
||||
import UnliftIO.Resource qualified as Resource
|
||||
import Prelude hiding (init, span)
|
||||
|
@ -405,6 +414,74 @@ withPGTransaction connPool f =
|
|||
connPool
|
||||
(\conn -> Postgres.withTransaction conn (f conn))
|
||||
|
||||
-- | `pg_formatter` is a perl script that does not support any kind of streaming.
|
||||
-- Thus we initialize a pool with a bunch of these scripts running, waiting for input. This way we can have somewhat fast SQL formatting.
|
||||
--
|
||||
-- Call `initPgFormatPool` to initialize, then use `withPgFormat` to format some sql.
|
||||
data PgFormatPool = PgFormatPool
|
||||
{ pool :: Pool PgFormatProcess,
|
||||
pgFormat :: Tool
|
||||
}
|
||||
|
||||
data PgFormatProcess = PgFormatProcess
|
||||
{ stdinHdl :: Handle,
|
||||
stdoutHdl :: Handle,
|
||||
procHdl :: ProcessHandle
|
||||
}
|
||||
|
||||
initPgFormatPool :: (HasField "pgFormat" tools Tool) => tools -> IO PgFormatPool
|
||||
initPgFormatPool tools = do
|
||||
pool <-
|
||||
Pool.newPool
|
||||
( Pool.defaultPoolConfig
|
||||
(pgFormatStartCommandWaitForInput tools)
|
||||
( \pgFmt -> do
|
||||
Process.terminateProcess pgFmt.procHdl
|
||||
-- make sure we don’t leave any zombies
|
||||
_ <- forkIO $ do
|
||||
_ <- Process.waitForProcess pgFmt.procHdl
|
||||
pure ()
|
||||
pure ()
|
||||
)
|
||||
-- unused resource time
|
||||
100
|
||||
-- number of resources
|
||||
3
|
||||
)
|
||||
|
||||
-- fill the pool with resources
|
||||
let go =
|
||||
Pool.tryWithResource pool (\_ -> go) >>= \case
|
||||
Nothing -> pure ()
|
||||
Just () -> pure ()
|
||||
_ <- go
|
||||
pure (PgFormatPool {pool, pgFormat = tools.pgFormat})
|
||||
|
||||
destroyPgFormatPool :: PgFormatPool -> IO ()
|
||||
destroyPgFormatPool pool = Pool.destroyAllResources pool.pool
|
||||
|
||||
-- | Format the given SQL with pg_formatter. Will use the pool of already running formatters to speed up execution.
|
||||
withPgFormat :: PgFormatPool -> ByteString -> IO (ExitCode, ByteString)
|
||||
withPgFormat pool sqlStatement = do
|
||||
bracket
|
||||
(Pool.takeResource pool.pool)
|
||||
( \(a, localPool) -> do
|
||||
-- we always destroy the resource, because the process exited
|
||||
Pool.destroyResource pool.pool localPool a
|
||||
-- create a new process to keep the pool “warm”
|
||||
new <- pgFormatStartCommandWaitForInput pool
|
||||
Pool.putResource localPool new
|
||||
)
|
||||
( \(pgFmt, _localPool) -> do
|
||||
ByteString.hPut pgFmt.stdinHdl sqlStatement
|
||||
-- close stdin to make pg_formatter format (it exits …)
|
||||
-- issue: https://github.com/darold/pgFormatter/issues/333
|
||||
hClose pgFmt.stdinHdl
|
||||
formatted <- ByteString.hGetContents pgFmt.stdoutHdl
|
||||
exitCode <- Process.waitForProcess pgFmt.procHdl
|
||||
pure (exitCode, formatted)
|
||||
)
|
||||
|
||||
runPGTransactionImpl ::
|
||||
(MonadUnliftIO m) =>
|
||||
m (Pool Postgres.Connection) ->
|
||||
|
@ -664,21 +741,50 @@ pgFormatQueryByteString tools queryBytes = do
|
|||
"-"
|
||||
]
|
||||
(queryBytes & bytesToTextUtf8Lenient & textToString)
|
||||
case exitCode of
|
||||
ExitSuccess -> pure (stdout & stringToText)
|
||||
ExitFailure status -> do
|
||||
logWarn [fmt|pg_format failed with status {status} while formatting the query, using original query string. Is there a syntax error?|]
|
||||
logDebug
|
||||
( prettyErrorTree
|
||||
( nestedMultiError
|
||||
"pg_format output"
|
||||
( nestedError "stdout" (singleError (stdout & stringToText & newError))
|
||||
:| [(nestedError "stderr" (singleError (stderr & stringToText & newError)))]
|
||||
)
|
||||
)
|
||||
handlePgFormatExitCode exitCode stdout stderr queryBytes
|
||||
|
||||
pgFormatStartCommandWaitForInput ::
|
||||
( MonadIO m,
|
||||
HasField "pgFormat" tools Tool,
|
||||
MonadFail m
|
||||
) =>
|
||||
tools ->
|
||||
m PgFormatProcess
|
||||
pgFormatStartCommandWaitForInput tools = do
|
||||
do
|
||||
(Just stdinHdl, Just stdoutHdl, Nothing, procHdl) <-
|
||||
Process.createProcess
|
||||
( ( Process.proc
|
||||
tools.pgFormat.toolPath
|
||||
[ "--no-rcfile",
|
||||
"-"
|
||||
]
|
||||
)
|
||||
logDebug [fmt|pg_format stdout: stderr|]
|
||||
pure (queryBytes & bytesToTextUtf8Lenient)
|
||||
{ Process.std_in = Process.CreatePipe,
|
||||
Process.std_out = Process.CreatePipe,
|
||||
Process.std_err = Process.Inherit
|
||||
}
|
||||
)
|
||||
|
||||
pure PgFormatProcess {..}
|
||||
|
||||
handlePgFormatExitCode :: (MonadLogger m) => ExitCode -> String -> String -> ByteString -> m Text
|
||||
handlePgFormatExitCode exitCode stdout stderr queryBytes =
|
||||
case exitCode of
|
||||
ExitSuccess -> pure (stdout & stringToText)
|
||||
ExitFailure status -> do
|
||||
logWarn [fmt|pg_format failed with status {status} while formatting the query, using original query string. Is there a syntax error?|]
|
||||
logDebug
|
||||
( prettyErrorTree
|
||||
( nestedMultiError
|
||||
"pg_format output"
|
||||
( nestedError "stdout" (singleError (stdout & stringToText & newError))
|
||||
:| [(nestedError "stderr" (singleError (stderr & stringToText & newError)))]
|
||||
)
|
||||
)
|
||||
)
|
||||
logDebug [fmt|pg_format stdout: stderr|]
|
||||
pure (queryBytes & bytesToTextUtf8Lenient)
|
||||
|
||||
data DebugLogDatabaseQueries
|
||||
= -- | Do not log the database queries
|
||||
|
@ -710,10 +816,12 @@ traceQueryIfEnabled ::
|
|||
Transaction m ()
|
||||
traceQueryIfEnabled tools span logDatabaseQueries qry params = do
|
||||
-- In case we have query logging enabled, we want to do that
|
||||
let formattedQuery = case params of
|
||||
HasNoParams -> pgFormatQueryNoParams' tools qry
|
||||
HasSingleParam p -> pgFormatQuery' tools qry p
|
||||
HasMultiParams ps -> pgFormatQueryMany' tools qry ps
|
||||
let formattedQuery =
|
||||
Otel.inSpan "Postgres Query Formatting" Otel.defaultSpanArguments $
|
||||
case params of
|
||||
HasNoParams -> pgFormatQueryNoParams' tools qry
|
||||
HasSingleParam p -> pgFormatQuery' tools qry p
|
||||
HasMultiParams ps -> pgFormatQueryMany' tools qry ps
|
||||
let doLog errs =
|
||||
Otel.addAttributes
|
||||
span
|
||||
|
|
Loading…
Reference in a new issue