Merge pull request #10215 from demarches-simplifiees/US/catch-oomed-jobs
tech(export_job): sometimes ExportJob are OOMed, in those cases jobs are stuck and never retried. release lock and increase attempts
This commit is contained in:
commit
465de75e6b
4 changed files with 101 additions and 0 deletions
1
Gemfile
1
Gemfile
|
@ -93,6 +93,7 @@ gem 'sidekiq'
|
|||
gem 'skylight'
|
||||
gem 'spreadsheet_architect'
|
||||
gem 'strong_migrations' # lint database migrations
|
||||
gem 'sys-proctable'
|
||||
gem 'turbo-rails'
|
||||
gem 'typhoeus'
|
||||
gem 'ulid-ruby', require: 'ulid'
|
||||
|
|
|
@ -748,6 +748,8 @@ GEM
|
|||
attr_required (>= 0.0.5)
|
||||
faraday (~> 2.0)
|
||||
faraday-follow_redirects
|
||||
sys-proctable (1.3.0)
|
||||
ffi (~> 1.1)
|
||||
sysexits (1.2.0)
|
||||
temple (0.8.2)
|
||||
terminal-table (3.0.2)
|
||||
|
@ -956,6 +958,7 @@ DEPENDENCIES
|
|||
spring-commands-rspec
|
||||
stackprof
|
||||
strong_migrations
|
||||
sys-proctable
|
||||
timecop
|
||||
turbo-rails
|
||||
typhoeus
|
||||
|
|
44
app/jobs/cron/release_crashed_export_job.rb
Normal file
44
app/jobs/cron/release_crashed_export_job.rb
Normal file
|
@ -0,0 +1,44 @@
|
|||
class Cron::ReleaseCrashedExportJob < Cron::CronJob
|
||||
self.schedule_expression = "every 10 minute"
|
||||
SECSCAN_LIMIT = 20_000
|
||||
|
||||
def perform(*args)
|
||||
return if !performable?
|
||||
export_jobs = jobs_for_current_host
|
||||
|
||||
return if export_jobs.empty?
|
||||
|
||||
host_pids = Sys::ProcTable.ps.map(&:pid)
|
||||
export_jobs.each do |job|
|
||||
_, pid = hostname_and_pid(job.locked_by)
|
||||
|
||||
reset(job:) if host_pids.exclude?(pid.to_i)
|
||||
end
|
||||
end
|
||||
|
||||
def reset(job:)
|
||||
job.locked_by = nil
|
||||
job.locked_at = nil
|
||||
job.attempts += 1
|
||||
job.save!
|
||||
end
|
||||
|
||||
def hostname_and_pid(worker_name)
|
||||
matches = /host:(?<host>.*) pid:(?<pid>\d+)/.match(worker_name)
|
||||
[matches[:host], matches[:pid]]
|
||||
end
|
||||
|
||||
def jobs_for_current_host
|
||||
Delayed::Job.where("locked_by like ?", "%#{whoami}%")
|
||||
.where(queue: ExportJob.queue_name)
|
||||
end
|
||||
|
||||
def whoami
|
||||
me, _ = hostname_and_pid(Delayed::Worker.new.name)
|
||||
me
|
||||
end
|
||||
|
||||
def performable?
|
||||
Delayed::Job.count < SECSCAN_LIMIT
|
||||
end
|
||||
end
|
53
spec/jobs/cron/release_crashed_export_job_spec.rb
Normal file
53
spec/jobs/cron/release_crashed_export_job_spec.rb
Normal file
|
@ -0,0 +1,53 @@
|
|||
describe Cron::ReleaseCrashedExportJob do
|
||||
let(:handler) { "whocares" }
|
||||
|
||||
def locked_by(hostname)
|
||||
"delayed_job.33 host:#{hostname} pid:1252488"
|
||||
end
|
||||
|
||||
describe '.perform' do
|
||||
subject { described_class.new.perform }
|
||||
let!(:job) { Delayed::Job.create!(handler:, queue: ExportJob.queue_name, locked_by: locked_by(Socket.gethostname)) }
|
||||
|
||||
it 'releases lock' do
|
||||
expect { subject }.to change { job.reload.locked_by }.from(anything).to(nil)
|
||||
end
|
||||
it 'increases attempts' do
|
||||
expect { subject }.to change { job.reload.attempts }.by(1)
|
||||
end
|
||||
end
|
||||
|
||||
describe '.hostname_and_pid' do
|
||||
subject { described_class.new.hostname_and_pid(Delayed::Worker.new.name) }
|
||||
it 'extract hostname and pid from worker.name' do
|
||||
hostname, pid = subject
|
||||
|
||||
expect(hostname).to eq(Socket.gethostname)
|
||||
expect(pid).to eq(Process.pid.to_s)
|
||||
end
|
||||
end
|
||||
|
||||
describe 'whoami' do
|
||||
subject { described_class.new.whoami }
|
||||
it { is_expected.to eq(Socket.gethostname) }
|
||||
end
|
||||
|
||||
describe 'jobs_for_current_host' do
|
||||
subject { described_class.new.jobs_for_current_host }
|
||||
|
||||
context 'when jobs run an another host' do
|
||||
let!(:job) { Delayed::Job.create!(handler:, queue: :default, locked_by: locked_by('spec1.prod')) }
|
||||
it { is_expected.to be_empty }
|
||||
end
|
||||
|
||||
context 'when jobs run an same host with default queue' do
|
||||
let!(:job) { Delayed::Job.create!(handler:, queue: :default, locked_by: locked_by(Socket.gethostname)) }
|
||||
it { is_expected.to be_empty }
|
||||
end
|
||||
|
||||
context 'when jobs run an same host with exports queue' do
|
||||
let!(:job) { Delayed::Job.create!(handler:, queue: ExportJob.queue_name, locked_by: locked_by(Socket.gethostname)) }
|
||||
it { is_expected.to include(job) }
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue