fix(users/Profpatsch/MonadPostgres): take old formatter process

The pool library would always take out the most recently used perl
resource again, and since that is the one that we just spawned, we’d
be back at square one. Instead, we try to find an older one (or up to
200ms old) and use that instead, because that should be the one with
the fastest response time.

Okay, that was enough bullshit lol.

Change-Id: I6b999e682d02ab03206a9d1b707edf16daa04a0d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11657
Tested-by: BuildkiteCI
Reviewed-by: Profpatsch <mail@profpatsch.de>
Autosubmit: Profpatsch <mail@profpatsch.de>
This commit is contained in:
Profpatsch 2024-05-13 16:02:25 +02:00 committed by clbot
parent 5ea5dff597
commit 14353ce751

View file

@ -5,6 +5,7 @@
module Postgres.MonadPostgres where module Postgres.MonadPostgres where
import Arg
import AtLeast (AtLeast) import AtLeast (AtLeast)
import Control.Exception import Control.Exception
( Exception (displayException), ( Exception (displayException),
@ -26,7 +27,6 @@ import Data.List qualified as List
import Data.Pool (Pool) import Data.Pool (Pool)
import Data.Pool qualified as Pool import Data.Pool qualified as Pool
import Data.Text qualified as Text import Data.Text qualified as Text
import Data.Time (getCurrentTime)
import Data.Typeable (Typeable) import Data.Typeable (Typeable)
import Database.PostgreSQL.Simple (Connection, FormatError, FromRow, Query, QueryError, ResultError, SqlError, ToRow) import Database.PostgreSQL.Simple (Connection, FormatError, FromRow, Query, QueryError, ResultError, SqlError, ToRow)
import Database.PostgreSQL.Simple qualified as PG import Database.PostgreSQL.Simple qualified as PG
@ -48,7 +48,7 @@ import Pretty (showPretty)
import Seconds import Seconds
import System.Exit (ExitCode (..)) import System.Exit (ExitCode (..))
import Tool import Tool
import UnliftIO (MonadUnliftIO (withRunInIO), bracket, hClose) import UnliftIO (MonadUnliftIO (withRunInIO), bracket, hClose, mask_)
import UnliftIO.Concurrent (forkIO) import UnliftIO.Concurrent (forkIO)
import UnliftIO.Process (ProcessHandle) import UnliftIO.Process (ProcessHandle)
import UnliftIO.Process qualified as Process import UnliftIO.Process qualified as Process
@ -429,7 +429,8 @@ data PgFormatProcess = PgFormatProcess
{ stdinHdl :: Handle, { stdinHdl :: Handle,
stdoutHdl :: Handle, stdoutHdl :: Handle,
stderrHdl :: Handle, stderrHdl :: Handle,
procHdl :: ProcessHandle procHdl :: ProcessHandle,
startedAt :: Otel.Timestamp
} }
initPgFormatPool :: (HasField "pgFormat" tools Tool) => tools -> IO PgFormatPool initPgFormatPool :: (HasField "pgFormat" tools Tool) => tools -> IO PgFormatPool
@ -463,11 +464,34 @@ initPgFormatPool tools = do
destroyPgFormatPool :: PgFormatPool -> IO () destroyPgFormatPool :: PgFormatPool -> IO ()
destroyPgFormatPool pool = Pool.destroyAllResources pool.pool destroyPgFormatPool pool = Pool.destroyAllResources pool.pool
-- | Get the oldest resource from the pool, or stop if you find a resource thats older than `cutoffPointMs`.
takeOldestResource :: PgFormatPool -> Arg "cutoffPointMs" Integer -> IO (PgFormatProcess, Pool.LocalPool PgFormatProcess)
takeOldestResource pool cutoffPointMs = do
now <- Otel.getTimestamp
mask_ $ do
a <- Pool.takeResource pool.pool
(putBack, res) <- go now [] a
-- make sure we dont leak any resources we didnt use in the end
for_ putBack $ \(x, xLocal) -> Pool.putResource xLocal x
pure res
where
mkMs ts = (ts & Otel.timestampNanoseconds & toInteger) `div` 1000_000
go now putBack a@(a', _) =
if abs (mkMs now - mkMs a'.startedAt) > cutoffPointMs.unArg
then pure (putBack, a)
else
Pool.tryTakeResource pool.pool >>= \case
Nothing -> pure (putBack, a)
Just b@(b', _) -> do
if a'.startedAt < b'.startedAt
then go now (b : putBack) a
else go now (a : putBack) b
-- | Format the given SQL with pg_formatter. Will use the pool of already running formatters to speed up execution. -- | Format the given SQL with pg_formatter. Will use the pool of already running formatters to speed up execution.
runPgFormat :: PgFormatPool -> ByteString -> IO (T3 "exitCode" ExitCode "formatted" ByteString "stderr" ByteString) runPgFormat :: PgFormatPool -> ByteString -> IO (T3 "exitCode" ExitCode "formatted" ByteString "stderr" ByteString)
runPgFormat pool sqlStatement = do runPgFormat pool sqlStatement = do
bracket bracket
(Pool.takeResource pool.pool) (takeOldestResource pool 200)
( \(a, localPool) -> do ( \(a, localPool) -> do
-- we always destroy the resource, because the process exited -- we always destroy the resource, because the process exited
Pool.destroyResource pool.pool localPool a Pool.destroyResource pool.pool localPool a
@ -772,6 +796,7 @@ pgFormatStartCommandWaitForInput ::
m PgFormatProcess m PgFormatProcess
pgFormatStartCommandWaitForInput tools = do pgFormatStartCommandWaitForInput tools = do
do do
startedAt <- Otel.getTimestamp
(Just stdinHdl, Just stdoutHdl, Just stderrHdl, procHdl) <- (Just stdinHdl, Just stdoutHdl, Just stderrHdl, procHdl) <-
Process.createProcess Process.createProcess
( ( Process.proc ( ( Process.proc