From be47152096bd58002af6e38e32ebe4a5ca826f8e Mon Sep 17 00:00:00 2001 From: Martin Date: Tue, 26 Mar 2024 09:25:44 +0100 Subject: [PATCH] tech(export_job): sometimes ExportJob are OOMed, in those cases jobs are stuck and never retried. release lock and increase attemps --- Gemfile | 1 + Gemfile.lock | 3 ++ app/jobs/cron/release_crashed_export_job.rb | 44 +++++++++++++++ .../cron/release_crashed_export_job_spec.rb | 53 +++++++++++++++++++ 4 files changed, 101 insertions(+) create mode 100644 app/jobs/cron/release_crashed_export_job.rb create mode 100644 spec/jobs/cron/release_crashed_export_job_spec.rb diff --git a/Gemfile b/Gemfile index e2c2022b1..438b36719 100644 --- a/Gemfile +++ b/Gemfile @@ -91,6 +91,7 @@ gem 'sidekiq' gem 'skylight' gem 'spreadsheet_architect' gem 'strong_migrations' # lint database migrations +gem 'sys-proctable' gem 'turbo-rails' gem 'typhoeus' gem 'ulid-ruby', require: 'ulid' diff --git a/Gemfile.lock b/Gemfile.lock index 4a2910f40..cb824354b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -734,6 +734,8 @@ GEM attr_required (>= 0.0.5) faraday (~> 2.0) faraday-follow_redirects + sys-proctable (1.3.0) + ffi (~> 1.1) sysexits (1.2.0) temple (0.8.2) terminal-table (3.0.2) @@ -940,6 +942,7 @@ DEPENDENCIES spring-commands-rspec stackprof strong_migrations + sys-proctable timecop turbo-rails typhoeus diff --git a/app/jobs/cron/release_crashed_export_job.rb b/app/jobs/cron/release_crashed_export_job.rb new file mode 100644 index 000000000..6a36237a1 --- /dev/null +++ b/app/jobs/cron/release_crashed_export_job.rb @@ -0,0 +1,44 @@ +class Cron::ReleaseCrashedExportJob < Cron::CronJob + self.schedule_expression = "every 10 minute" + SECSCAN_LIMIT = 20_000 + + def perform(*args) + return if !performable? + export_jobs = jobs_for_current_host + + return if export_jobs.empty? + + host_pids = Sys::ProcTable.ps.map(&:pid) + export_jobs.each do |job| + _, pid = hostname_and_pid(job.locked_by) + + reset(job:) if host_pids.exclude?(pid.to_i) + end + end + + def reset(job:) + job.locked_by = nil + job.locked_at = nil + job.attempts += 1 + job.save! + end + + def hostname_and_pid(worker_name) + matches = /host:(?.*) pid:(?\d+)/.match(worker_name) + [matches[:host], matches[:pid]] + end + + def jobs_for_current_host + Delayed::Job.where("locked_by like ?", "%#{whoami}%") + .where(queue: ExportJob.queue_name) + end + + def whoami + me, _ = hostname_and_pid(Delayed::Worker.new.name) + me + end + + def performable? + Delayed::Job.count < SECSCAN_LIMIT + end +end diff --git a/spec/jobs/cron/release_crashed_export_job_spec.rb b/spec/jobs/cron/release_crashed_export_job_spec.rb new file mode 100644 index 000000000..43de73576 --- /dev/null +++ b/spec/jobs/cron/release_crashed_export_job_spec.rb @@ -0,0 +1,53 @@ +describe Cron::ReleaseCrashedExportJob do + let(:handler) { "whocares" } + + def locked_by(hostname) + "delayed_job.33 host:#{hostname} pid:1252488" + end + + describe '.perform' do + subject { described_class.new.perform } + let!(:job) { Delayed::Job.create!(handler:, queue: ExportJob.queue_name, locked_by: locked_by(Socket.gethostname)) } + + it 'releases lock' do + expect { subject }.to change { job.reload.locked_by }.from(anything).to(nil) + end + it 'increases attempts' do + expect { subject }.to change { job.reload.attempts }.by(1) + end + end + + describe '.hostname_and_pid' do + subject { described_class.new.hostname_and_pid(Delayed::Worker.new.name) } + it 'extract hostname and pid from worker.name' do + hostname, pid = subject + + expect(hostname).to eq(Socket.gethostname) + expect(pid).to eq(Process.pid.to_s) + end + end + + describe 'whoami' do + subject { described_class.new.whoami } + it { is_expected.to eq(Socket.gethostname) } + end + + describe 'jobs_for_current_host' do + subject { described_class.new.jobs_for_current_host } + + context 'when jobs run an another host' do + let!(:job) { Delayed::Job.create!(handler:, queue: :default, locked_by: locked_by('spec1.prod')) } + it { is_expected.to be_empty } + end + + context 'when jobs run an same host with default queue' do + let!(:job) { Delayed::Job.create!(handler:, queue: :default, locked_by: locked_by(Socket.gethostname)) } + it { is_expected.to be_empty } + end + + context 'when jobs run an same host with exports queue' do + let!(:job) { Delayed::Job.create!(handler:, queue: ExportJob.queue_name, locked_by: locked_by(Socket.gethostname)) } + it { is_expected.to include(job) } + end + end +end