tech(export_job): sometimes ExportJob are OOMed, in those cases jobs are stuck and never retried. release lock and increase attemps

This commit is contained in:
Martin 2024-03-26 09:25:44 +01:00
parent 536a03bbb4
commit be47152096
4 changed files with 101 additions and 0 deletions

View file

@ -91,6 +91,7 @@ gem 'sidekiq'
gem 'skylight' gem 'skylight'
gem 'spreadsheet_architect' gem 'spreadsheet_architect'
gem 'strong_migrations' # lint database migrations gem 'strong_migrations' # lint database migrations
gem 'sys-proctable'
gem 'turbo-rails' gem 'turbo-rails'
gem 'typhoeus' gem 'typhoeus'
gem 'ulid-ruby', require: 'ulid' gem 'ulid-ruby', require: 'ulid'

View file

@ -734,6 +734,8 @@ GEM
attr_required (>= 0.0.5) attr_required (>= 0.0.5)
faraday (~> 2.0) faraday (~> 2.0)
faraday-follow_redirects faraday-follow_redirects
sys-proctable (1.3.0)
ffi (~> 1.1)
sysexits (1.2.0) sysexits (1.2.0)
temple (0.8.2) temple (0.8.2)
terminal-table (3.0.2) terminal-table (3.0.2)
@ -940,6 +942,7 @@ DEPENDENCIES
spring-commands-rspec spring-commands-rspec
stackprof stackprof
strong_migrations strong_migrations
sys-proctable
timecop timecop
turbo-rails turbo-rails
typhoeus typhoeus

View file

@ -0,0 +1,44 @@
class Cron::ReleaseCrashedExportJob < Cron::CronJob
self.schedule_expression = "every 10 minute"
SECSCAN_LIMIT = 20_000
def perform(*args)
return if !performable?
export_jobs = jobs_for_current_host
return if export_jobs.empty?
host_pids = Sys::ProcTable.ps.map(&:pid)
export_jobs.each do |job|
_, pid = hostname_and_pid(job.locked_by)
reset(job:) if host_pids.exclude?(pid.to_i)
end
end
def reset(job:)
job.locked_by = nil
job.locked_at = nil
job.attempts += 1
job.save!
end
def hostname_and_pid(worker_name)
matches = /host:(?<host>.*) pid:(?<pid>\d+)/.match(worker_name)
[matches[:host], matches[:pid]]
end
def jobs_for_current_host
Delayed::Job.where("locked_by like ?", "%#{whoami}%")
.where(queue: ExportJob.queue_name)
end
def whoami
me, _ = hostname_and_pid(Delayed::Worker.new.name)
me
end
def performable?
Delayed::Job.count < SECSCAN_LIMIT
end
end

View file

@ -0,0 +1,53 @@
describe Cron::ReleaseCrashedExportJob do
let(:handler) { "whocares" }
def locked_by(hostname)
"delayed_job.33 host:#{hostname} pid:1252488"
end
describe '.perform' do
subject { described_class.new.perform }
let!(:job) { Delayed::Job.create!(handler:, queue: ExportJob.queue_name, locked_by: locked_by(Socket.gethostname)) }
it 'releases lock' do
expect { subject }.to change { job.reload.locked_by }.from(anything).to(nil)
end
it 'increases attempts' do
expect { subject }.to change { job.reload.attempts }.by(1)
end
end
describe '.hostname_and_pid' do
subject { described_class.new.hostname_and_pid(Delayed::Worker.new.name) }
it 'extract hostname and pid from worker.name' do
hostname, pid = subject
expect(hostname).to eq(Socket.gethostname)
expect(pid).to eq(Process.pid.to_s)
end
end
describe 'whoami' do
subject { described_class.new.whoami }
it { is_expected.to eq(Socket.gethostname) }
end
describe 'jobs_for_current_host' do
subject { described_class.new.jobs_for_current_host }
context 'when jobs run an another host' do
let!(:job) { Delayed::Job.create!(handler:, queue: :default, locked_by: locked_by('spec1.prod')) }
it { is_expected.to be_empty }
end
context 'when jobs run an same host with default queue' do
let!(:job) { Delayed::Job.create!(handler:, queue: :default, locked_by: locked_by(Socket.gethostname)) }
it { is_expected.to be_empty }
end
context 'when jobs run an same host with exports queue' do
let!(:job) { Delayed::Job.create!(handler:, queue: ExportJob.queue_name, locked_by: locked_by(Socket.gethostname)) }
it { is_expected.to include(job) }
end
end
end