poc(batch_operation): some rewrite to avoid various conflict (when an instructeur try to create a job with an incompatible dossier regarding the current task). also soome cleanup to isole spec in least involved model

This commit is contained in:
Martin 2022-11-25 15:43:00 +01:00 committed by mfo
parent 4266ab93c5
commit eaf72162da
8 changed files with 240 additions and 129 deletions

View file

@ -4,10 +4,7 @@ module Instructeurs
before_action :ensure_ownership! before_action :ensure_ownership!
def create def create
ActiveRecord::Base.transaction do BatchOperation.safe_create!(batch_operation_params.merge(instructeur: current_instructeur))
batch_operation = BatchOperation.create!(batch_operation_params.merge(instructeur: current_instructeur))
BatchOperationEnqueueAllJob.perform_later(batch_operation)
end
redirect_back(fallback_location: instructeur_procedure_url(@procedure.id)) redirect_back(fallback_location: instructeur_procedure_url(@procedure.id))
end end
@ -15,9 +12,7 @@ module Instructeurs
def batch_operation_params def batch_operation_params
params.require(:batch_operation) params.require(:batch_operation)
.permit(:operation, dossier_ids: []).tap do |params| .permit(:operation, dossier_ids: [])
# TODO: filter dossiers_ids out of instructeurs.dossiers.ids
end
end end
def set_procedure def set_procedure

View file

@ -1,15 +1,16 @@
class BatchOperationProcessOneJob < ApplicationJob class BatchOperationProcessOneJob < ApplicationJob
# what about wrapping all of that in a transaction retry_on StandardError, attempts: 1
# but, what about nested transaction because batch_operation.process_one(dossier) can run transaction
def perform(batch_operation, dossier) def perform(batch_operation, dossier)
success = true dossier = batch_operation.dossiers_safe_scope.find(dossier.id)
begin begin
batch_operation.process_one(dossier) batch_operation.process_one(dossier)
batch_operation.track_processed_dossier(true, dossier)
rescue => error rescue => error
success = false batch_operation.track_processed_dossier(false, dossier)
raise error raise error
ensure
batch_operation.track_dossier_processed(success, dossier)
end end
rescue ActiveRecord::RecordNotFound
dossier.update(batch_operation_id: nil)
end end
end end

View file

@ -24,14 +24,33 @@ class BatchOperation < ApplicationRecord
validates :operation, presence: true validates :operation, presence: true
def enqueue_all def dossiers_safe_scope(dossier_ids = self.dossier_ids)
Dossier.joins(:procedure) query = Dossier.joins(:procedure)
.where(procedure: { id: instructeur.procedures.ids }) .where(procedure: { id: instructeur.procedures.ids })
.where(id: dossiers.ids) .where(id: dossier_ids)
.visible_by_administration
case operation
when BatchOperation.operations.fetch(:archiver) then
query.not_archived.state_termine
end
end
def enqueue_all
dossiers_safe_scope # later in batch .
.map { |dossier| BatchOperationProcessOneJob.perform_later(self, dossier) } .map { |dossier| BatchOperationProcessOneJob.perform_later(self, dossier) }
end end
def track_dossier_processed(success, dossier) def process_one(dossier)
case operation
when BatchOperation.operations.fetch(:archiver)
dossier.archiver!(instructeur)
end
end
# use Arel::UpdateManager for array_append/array_remove (inspired by atomic_append)
# see: https://www.rubydoc.info/gems/arel/Arel/UpdateManager
# we use this approach to ensure atomicity
def track_processed_dossier(success, dossier)
transaction do transaction do
dossier.update(batch_operation: nil) dossier.update(batch_operation: nil)
reload reload
@ -40,7 +59,7 @@ class BatchOperation < ApplicationRecord
values.push([arel_table[:run_at], Time.zone.now]) if called_for_first_time? values.push([arel_table[:run_at], Time.zone.now]) if called_for_first_time?
values.push([arel_table[:finished_at], Time.zone.now]) if called_for_last_time? values.push([arel_table[:finished_at], Time.zone.now]) if called_for_last_time?
if success if success
values.push([arel_table[:success_dossier_ids],Arel::Nodes::NamedFunction.new('array_append', [arel_table[:success_dossier_ids], dossier.id])]) values.push([arel_table[:success_dossier_ids], Arel::Nodes::NamedFunction.new('array_append', [arel_table[:success_dossier_ids], dossier.id])])
values.push([arel_table[:failed_dossier_ids], Arel::Nodes::NamedFunction.new('array_remove', [arel_table[:failed_dossier_ids], dossier.id])]) values.push([arel_table[:failed_dossier_ids], Arel::Nodes::NamedFunction.new('array_remove', [arel_table[:failed_dossier_ids], dossier.id])])
else else
values.push([arel_table[:failed_dossier_ids], Arel::Nodes::NamedFunction.new('array_append', [arel_table[:failed_dossier_ids], dossier.id])]) values.push([arel_table[:failed_dossier_ids], Arel::Nodes::NamedFunction.new('array_append', [arel_table[:failed_dossier_ids], dossier.id])])
@ -50,39 +69,32 @@ class BatchOperation < ApplicationRecord
end end
end end
def arel_table # when an instructeur want to create a batch from his interface,
BatchOperation.arel_table # another one might have run something on one of the dossier
end # we use this approach to create a batch with given dossiers safely
def self.safe_create!(params)
def process_one(dossier) transaction do
case operation instance = new(params)
when BatchOperation.operations.fetch(:archiver) instance.dossiers = instance.dossiers_safe_scope(params[:dossier_ids])
dossier.archiver!(instructeur) .not_having_batch_operation
instance.save!
BatchOperationEnqueueAllJob.perform_later(instance)
instance
end end
true
end end
def called_for_first_time? def called_for_first_time?
run_at.nil? run_at.nil?
end end
def called_for_last_time? # beware, must be reloaded first # beware, must be reloaded first
def called_for_last_time?
dossiers.count.zero? dossiers.count.zero?
end end
private private
# safer enqueue, in case instructeur kept the page for some time and their is a Dossier.id which does not fit current transaction def arel_table
def dossiers_safe_scope BatchOperation.arel_table
query = Dossier.joins(:procedure)
.where(procedure: { id: instructeur.procedures.ids })
.where(id: dossiers.ids)
.visible_by_administration
# case operation
# when BatchOperation.operations.fetch(:archiver) then
# query.not_archived
# when BatchOperation.operations.fetch(:accepter) then
# query.state_en_instruction
# end
end end
end end

View file

@ -415,6 +415,7 @@ class Dossier < ApplicationRecord
end end
end end
scope :not_having_batch_operation, -> { where(batch_operation_id: nil) }
accepts_nested_attributes_for :individual accepts_nested_attributes_for :individual
delegate :siret, :siren, to: :etablissement, allow_nil: true delegate :siret, :siren, to: :etablissement, allow_nil: true

View file

@ -2,35 +2,35 @@
describe Instructeurs::BatchOperationsController, type: :controller do describe Instructeurs::BatchOperationsController, type: :controller do
let(:instructeur) { create(:instructeur) } let(:instructeur) { create(:instructeur) }
let(:procedure) { create(:procedure, :published, :for_individual, instructeurs: [instructeur]) } let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) }
let!(:dossier) { create(:dossier, :en_construction, :with_individual, procedure: procedure) } let(:dossier) { create(:dossier, :accepte, :with_individual, procedure: procedure) }
let(:params) do
{
procedure_id: procedure.id,
batch_operation: {
operation: BatchOperation.operations.fetch(:archiver),
dossier_ids: [dossier.id]
}
}
end
describe '#POST create' do describe '#POST create' do
before { sign_in(instructeur.user) } before { sign_in(instructeur.user) }
subject { post :create, params: params }
context 'ACL' do context 'ACL' do
subject { post :create, params: { procedure_id: create(:procedure).id } } let(:params) do
before { sign_in(instructeur.user) } { procedure_id: create(:procedure).id }
end
it 'fails when procedure does not belongs to instructeur' do it 'fails when procedure does not belongs to instructeur' do
expect(subject).to have_http_status(302) expect(subject).to have_http_status(302)
end end
end end
context 'success' do context 'success with valid dossier_ids' do
let(:params) do
{
procedure_id: procedure.id,
batch_operation: {
operation: BatchOperation.operations.fetch(:archiver),
dossier_ids: [dossier.id]
}
}
end
subject { post :create, params: params }
before { sign_in(instructeur.user) }
it 'creates a batch operation for our signed in instructeur' do it 'creates a batch operation for our signed in instructeur' do
expect { subject }.to change { instructeur.batch_operations.count }.by(1) expect { subject }.to change { instructeur.batch_operations.count }.by(1)
expect(BatchOperation.first.dossiers).to include(dossier)
end end
it 'created a batch operation contains dossiers' do it 'created a batch operation contains dossiers' do
subject subject

View file

@ -3,14 +3,15 @@ FactoryBot.define do
transient do transient do
invalid_instructeur { nil } invalid_instructeur { nil }
end end
trait :archiver do trait :archiver do
operation { BatchOperation.operations.fetch(:archiver) } operation { BatchOperation.operations.fetch(:archiver) }
after(:build) do |batch_operation, _evaluator| after(:build) do |batch_operation, _evaluator|
procedure = create(:procedure, instructeurs: [_evaluator.invalid_instructeur.presence || batch_operation.instructeur]) procedure = create(:simple_procedure, :published, instructeurs: [_evaluator.invalid_instructeur.presence || batch_operation.instructeur], administrateurs: [create(:administrateur)])
batch_operation.dossiers = [ batch_operation.dossiers = [
build(:dossier, :accepte, procedure: procedure), create(:dossier, :with_individual, :accepte, procedure: procedure),
build(:dossier, :refuse, procedure: procedure), create(:dossier, :with_individual, :refuse, procedure: procedure),
build(:dossier, :sans_suite, procedure: procedure) create(:dossier, :with_individual, :sans_suite, procedure: procedure)
] ]
end end
end end

View file

@ -8,71 +8,41 @@ describe BatchOperationProcessOneJob, type: :job do
subject { BatchOperationProcessOneJob.new(batch_operation, dossier_job) } subject { BatchOperationProcessOneJob.new(batch_operation, dossier_job) }
let(:options) { {} } let(:options) { {} }
it 'just call the process one' do it 'when it works' do
allow_any_instance_of(BatchOperation).to receive(:process_one).with(dossier_job).and_return(true)
expect { subject.perform_now } expect { subject.perform_now }
.to change { dossier_job.reload.archived } .to change { batch_operation.reload.success_dossier_ids }
.from(false) .from([])
.to(true) .to([dossier_job.id])
end end
it 'unlock the dossier' do it 'when it fails for an "unknown" reason' do
expect { subject.perform_now } allow_any_instance_of(BatchOperation).to receive(:process_one).with(dossier_job).and_raise("boom")
.to change { dossier_job.reload.batch_operation } expect { subject.perform_now }.to raise_error('boom')
.from(batch_operation)
.to(nil) expect(batch_operation.reload.failed_dossier_ids).to eq([dossier_job.id])
end end
context 'when it succeed' do context 'when the dossier is out of sync (ie: someone applied a transition somewhere we do not know)' do
it 'pushes dossier_job id to batch_operation.success_dossier_ids' do let(:instructeur) { create(:instructeur) }
expect { subject.perform_now } let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) }
.to change { batch_operation.reload.success_dossier_ids } let(:dossier) { create(:dossier, :accepte, :with_individual, archived: true, procedure: procedure) }
.from([]) let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier]) }
.to([dossier_job.id])
it 'does run process_one' do
allow(batch_operation).to receive(:process_one).and_raise("should have been prevented")
subject.perform_now
end end
end
context 'when it fails' do it 'when it fails from dossiers_safe_scope.find' do
it 'pushes dossier_job id to batch_operation.failed_dossier_ids' do scope = double
expect(batch_operation).to receive(:process_one).with(dossier_job).and_raise("KO") expect(scope).to receive(:find).with(dossier_job.id).and_raise(ActiveRecord::RecordNotFound)
expect { subject.perform_now }.to raise_error("KO") expect_any_instance_of(BatchOperation).to receive(:dossiers_safe_scope).and_return(scope)
expect(batch_operation.reload.failed_dossier_ids).to eq([dossier_job.id])
end
end
context 'when it is the first job' do subject.perform_now
it 'sets run_at at first' do
run_at = 2.minutes.ago
Timecop.freeze(run_at) do
expect { subject.perform_now }
.to change { batch_operation.reload.run_at }
.from(nil)
.to(run_at)
end
end
end
context 'when it is the second job (meaning run_at was already set) but not the last' do expect(batch_operation.reload.failed_dossier_ids).to eq([])
let(:preview_run_at) { 2.days.ago } expect(batch_operation.dossiers).not_to include(dossier_job)
let(:options) { { run_at: preview_run_at } }
it 'does not change run_at' do
expect { subject.perform_now }.not_to change { batch_operation.reload.run_at }
end
end
context 'when it is the last job' do
before do
batch_operation.dossiers
.where.not(id: dossier_job.id)
.update_all(batch_operation_id: nil)
end
it 'sets finished_at' do
finished_at = Time.zone.now
Timecop.freeze(finished_at) do
expect { subject.perform_now }
.to change { batch_operation.reload.finished_at }
.from(nil)
.to(finished_at)
end
end end
end end
end end

View file

@ -18,18 +18,7 @@ describe BatchOperation, type: :model do
it { is_expected.to validate_presence_of(:operation) } it { is_expected.to validate_presence_of(:operation) }
end end
describe 'enqueue_all' do describe '#enqueue_all' do
context 'given dossier_ids not in instructeur procedures' do
subject do
create(:batch_operation, :archiver, instructeur: create(:instructeur), invalid_instructeur: create(:instructeur))
end
it 'does not enqueues any BatchOperationProcessOneJob' do
expect { subject.enqueue_all() }
.not_to have_enqueued_job(BatchOperationProcessOneJob)
end
end
context 'given dossier_ids in instructeur procedures' do context 'given dossier_ids in instructeur procedures' do
subject do subject do
create(:batch_operation, :archiver, instructeur: create(:instructeur)) create(:batch_operation, :archiver, instructeur: create(:instructeur))
@ -42,6 +31,148 @@ describe BatchOperation, type: :model do
.with(subject, subject.dossiers.second) .with(subject, subject.dossiers.second)
.with(subject, subject.dossiers.third) .with(subject, subject.dossiers.third)
end end
it 'pass through dossiers_safe_scope' do
expect(subject).to receive(:dossiers_safe_scope).and_return(subject.dossiers)
subject.enqueue_all
end
end
end
describe '#track_processed_dossier' do
let(:instructeur) { create(:instructeur) }
let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) }
let(:dossier) { create(:dossier, :accepte, :with_individual, archived: true, procedure: procedure) }
let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier]) }
it 'unlock the dossier' do
expect { batch_operation.track_processed_dossier(true, dossier) }
.to change { dossier.reload.batch_operation }
.from(batch_operation)
.to(nil)
end
context 'when it succeed' do
it 'pushes dossier_job id to batch_operation.success_dossier_ids' do
expect { batch_operation.track_processed_dossier(true, dossier) }
.to change { batch_operation.reload.success_dossier_ids }
.from([])
.to([dossier.id])
end
end
context 'when it succeed after a failure' do
let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier], failed_dossier_ids: [dossier.id]) }
it 'remove former dossier id from failed_dossier_ids' do
expect { batch_operation.track_processed_dossier(true, dossier) }
.to change { batch_operation.reload.failed_dossier_ids }
.from([dossier.id])
.to([])
end
end
context 'when it fails' do
it 'pushes dossier_job id to batch_operation.failed_dossier_ids' do
expect { batch_operation.track_processed_dossier(false, dossier) }
.to change { batch_operation.reload.failed_dossier_ids }
.from([])
.to([dossier.id])
end
end
context 'when it is the first job' do
it 'sets run_at at first' do
run_at = 2.minutes.ago
Timecop.freeze(run_at) do
expect { batch_operation.track_processed_dossier(false, dossier) }
.to change { batch_operation.reload.run_at }
.from(nil)
.to(run_at)
end
end
end
context 'when it is the second job (meaning run_at was already set) but not the last' do
let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier], run_at: 2.days.ago) }
it 'does not change run_at' do
expect { batch_operation.track_processed_dossier(true, dossier) }
.not_to change { batch_operation.reload.run_at }
end
end
context 'when it is the last job' do
it 'sets finished_at' do
finished_at = Time.zone.now
Timecop.freeze(finished_at) do
expect { batch_operation.track_processed_dossier(true, dossier) }
.to change { batch_operation.reload.finished_at }
.from(nil)
.to(finished_at)
end
end
end
end
describe '#dossiers_safe_scope (with archiver)' do
let(:instructeur) { create(:instructeur) }
let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) }
let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier]) }
context 'when dossier is valid' do
let(:dossier) { create(:dossier, :accepte, :with_individual, procedure: procedure) }
it 'find dosssier' do
expect(batch_operation.dossiers_safe_scope).to include(dossier)
end
end
context 'when dossier is already arcvhied' do
let(:dossier) { create(:dossier, :accepte, :with_individual, archived: true, procedure: procedure) }
it 'skips dosssier is already archived' do
expect(batch_operation.dossiers_safe_scope).not_to include(dossier)
end
end
context 'when dossier is not in state termine' do
let(:dossier) { create(:dossier, :en_instruction, :with_individual, procedure: procedure) }
it 'does not enqueue any job' do
expect(batch_operation.dossiers_safe_scope).not_to include(dossier)
end
end
context 'when dossier is not in instructeur procedures' do
let(:dossier) { create(:dossier, :accepte, :with_individual, procedure: create(:simple_procedure)) }
it 'does not enqueues any BatchOperationProcessOneJob' do
expect(batch_operation.dossiers_safe_scope).not_to include(dossier)
end
end
end
describe '#safe_create!' do
let(:instructeur) { create(:instructeur) }
let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) }
subject { BatchOperation.safe_create!(instructeur: instructeur, operation: :archiver, dossier_ids: [dossier.id]) }
context 'success with divergent list of dossier_ids' do
let(:dossier) { create(:dossier, :accepte, :with_individual, archived: true, procedure: procedure) }
it 'does not keep archived dossier within batch_operation.dossiers' do
expect(subject.dossiers).not_to include(dossier)
end
it 'enqueue a BatchOperationEnqueueAllJob' do
expect { subject }.to have_enqueued_job(BatchOperationEnqueueAllJob)
end
end
context 'with dossier already in a batch batch_operation' do
let(:dossier) { create(:dossier, :accepte, :with_individual, batch_operation: create(:batch_operation, :archiver, instructeur: instructeur), procedure: procedure) }
it 'does not keep dossier in batch_operation' do
expect(subject.dossiers).not_to include(dossier)
end
end end
end end
end end