From eaf72162da799a94f09b2f80c0e2aa47b074c839 Mon Sep 17 00:00:00 2001 From: Martin Date: Fri, 25 Nov 2022 15:43:00 +0100 Subject: [PATCH] poc(batch_operation): some rewrite to avoid various conflict (when an instructeur try to create a job with an incompatible dossier regarding the current task). also soome cleanup to isole spec in least involved model --- .../batch_operations_controller.rb | 9 +- app/jobs/batch_operation_process_one_job.rb | 13 +- app/models/batch_operation.rb | 66 +++++--- app/models/dossier.rb | 1 + .../batch_operations_controller_spec.rb | 34 ++-- spec/factories/batch_operation.rb | 9 +- .../batch_operation_process_one_job_spec.rb | 82 +++------ spec/models/batch_operation_spec.rb | 155 ++++++++++++++++-- 8 files changed, 240 insertions(+), 129 deletions(-) diff --git a/app/controllers/instructeurs/batch_operations_controller.rb b/app/controllers/instructeurs/batch_operations_controller.rb index 722bdacdb..b70e51def 100644 --- a/app/controllers/instructeurs/batch_operations_controller.rb +++ b/app/controllers/instructeurs/batch_operations_controller.rb @@ -4,10 +4,7 @@ module Instructeurs before_action :ensure_ownership! def create - ActiveRecord::Base.transaction do - batch_operation = BatchOperation.create!(batch_operation_params.merge(instructeur: current_instructeur)) - BatchOperationEnqueueAllJob.perform_later(batch_operation) - end + BatchOperation.safe_create!(batch_operation_params.merge(instructeur: current_instructeur)) redirect_back(fallback_location: instructeur_procedure_url(@procedure.id)) end @@ -15,9 +12,7 @@ module Instructeurs def batch_operation_params params.require(:batch_operation) - .permit(:operation, dossier_ids: []).tap do |params| - # TODO: filter dossiers_ids out of instructeurs.dossiers.ids - end + .permit(:operation, dossier_ids: []) end def set_procedure diff --git a/app/jobs/batch_operation_process_one_job.rb b/app/jobs/batch_operation_process_one_job.rb index 528358097..10f3f2547 100644 --- a/app/jobs/batch_operation_process_one_job.rb +++ b/app/jobs/batch_operation_process_one_job.rb @@ -1,15 +1,16 @@ class BatchOperationProcessOneJob < ApplicationJob - # what about wrapping all of that in a transaction - # but, what about nested transaction because batch_operation.process_one(dossier) can run transaction + retry_on StandardError, attempts: 1 + def perform(batch_operation, dossier) - success = true + dossier = batch_operation.dossiers_safe_scope.find(dossier.id) begin batch_operation.process_one(dossier) + batch_operation.track_processed_dossier(true, dossier) rescue => error - success = false + batch_operation.track_processed_dossier(false, dossier) raise error - ensure - batch_operation.track_dossier_processed(success, dossier) end + rescue ActiveRecord::RecordNotFound + dossier.update(batch_operation_id: nil) end end diff --git a/app/models/batch_operation.rb b/app/models/batch_operation.rb index 53e63de8f..3c37233cd 100644 --- a/app/models/batch_operation.rb +++ b/app/models/batch_operation.rb @@ -24,14 +24,33 @@ class BatchOperation < ApplicationRecord validates :operation, presence: true - def enqueue_all - Dossier.joins(:procedure) + def dossiers_safe_scope(dossier_ids = self.dossier_ids) + query = Dossier.joins(:procedure) .where(procedure: { id: instructeur.procedures.ids }) - .where(id: dossiers.ids) + .where(id: dossier_ids) + .visible_by_administration + case operation + when BatchOperation.operations.fetch(:archiver) then + query.not_archived.state_termine + end + end + + def enqueue_all + dossiers_safe_scope # later in batch . .map { |dossier| BatchOperationProcessOneJob.perform_later(self, dossier) } end - def track_dossier_processed(success, dossier) + def process_one(dossier) + case operation + when BatchOperation.operations.fetch(:archiver) + dossier.archiver!(instructeur) + end + end + + # use Arel::UpdateManager for array_append/array_remove (inspired by atomic_append) + # see: https://www.rubydoc.info/gems/arel/Arel/UpdateManager + # we use this approach to ensure atomicity + def track_processed_dossier(success, dossier) transaction do dossier.update(batch_operation: nil) reload @@ -40,7 +59,7 @@ class BatchOperation < ApplicationRecord values.push([arel_table[:run_at], Time.zone.now]) if called_for_first_time? values.push([arel_table[:finished_at], Time.zone.now]) if called_for_last_time? if success - values.push([arel_table[:success_dossier_ids],Arel::Nodes::NamedFunction.new('array_append', [arel_table[:success_dossier_ids], dossier.id])]) + values.push([arel_table[:success_dossier_ids], Arel::Nodes::NamedFunction.new('array_append', [arel_table[:success_dossier_ids], dossier.id])]) values.push([arel_table[:failed_dossier_ids], Arel::Nodes::NamedFunction.new('array_remove', [arel_table[:failed_dossier_ids], dossier.id])]) else values.push([arel_table[:failed_dossier_ids], Arel::Nodes::NamedFunction.new('array_append', [arel_table[:failed_dossier_ids], dossier.id])]) @@ -50,39 +69,32 @@ class BatchOperation < ApplicationRecord end end - def arel_table - BatchOperation.arel_table - end - - def process_one(dossier) - case operation - when BatchOperation.operations.fetch(:archiver) - dossier.archiver!(instructeur) + # when an instructeur want to create a batch from his interface, + # another one might have run something on one of the dossier + # we use this approach to create a batch with given dossiers safely + def self.safe_create!(params) + transaction do + instance = new(params) + instance.dossiers = instance.dossiers_safe_scope(params[:dossier_ids]) + .not_having_batch_operation + instance.save! + BatchOperationEnqueueAllJob.perform_later(instance) + instance end - true end def called_for_first_time? run_at.nil? end - def called_for_last_time? # beware, must be reloaded first + # beware, must be reloaded first + def called_for_last_time? dossiers.count.zero? end private - # safer enqueue, in case instructeur kept the page for some time and their is a Dossier.id which does not fit current transaction - def dossiers_safe_scope - query = Dossier.joins(:procedure) - .where(procedure: { id: instructeur.procedures.ids }) - .where(id: dossiers.ids) - .visible_by_administration - # case operation - # when BatchOperation.operations.fetch(:archiver) then - # query.not_archived - # when BatchOperation.operations.fetch(:accepter) then - # query.state_en_instruction - # end + def arel_table + BatchOperation.arel_table end end diff --git a/app/models/dossier.rb b/app/models/dossier.rb index 95117be0f..8ebaef95f 100644 --- a/app/models/dossier.rb +++ b/app/models/dossier.rb @@ -415,6 +415,7 @@ class Dossier < ApplicationRecord end end + scope :not_having_batch_operation, -> { where(batch_operation_id: nil) } accepts_nested_attributes_for :individual delegate :siret, :siren, to: :etablissement, allow_nil: true diff --git a/spec/controllers/instructeurs/batch_operations_controller_spec.rb b/spec/controllers/instructeurs/batch_operations_controller_spec.rb index 1ed5c7d38..6c739d443 100644 --- a/spec/controllers/instructeurs/batch_operations_controller_spec.rb +++ b/spec/controllers/instructeurs/batch_operations_controller_spec.rb @@ -2,35 +2,35 @@ describe Instructeurs::BatchOperationsController, type: :controller do let(:instructeur) { create(:instructeur) } - let(:procedure) { create(:procedure, :published, :for_individual, instructeurs: [instructeur]) } - let!(:dossier) { create(:dossier, :en_construction, :with_individual, procedure: procedure) } + let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) } + let(:dossier) { create(:dossier, :accepte, :with_individual, procedure: procedure) } + let(:params) do + { + procedure_id: procedure.id, + batch_operation: { + operation: BatchOperation.operations.fetch(:archiver), + dossier_ids: [dossier.id] + } + } + end describe '#POST create' do before { sign_in(instructeur.user) } + subject { post :create, params: params } context 'ACL' do - subject { post :create, params: { procedure_id: create(:procedure).id } } - before { sign_in(instructeur.user) } + let(:params) do + { procedure_id: create(:procedure).id } + end + it 'fails when procedure does not belongs to instructeur' do expect(subject).to have_http_status(302) end end - context 'success' do - let(:params) do - { - procedure_id: procedure.id, - batch_operation: { - operation: BatchOperation.operations.fetch(:archiver), - dossier_ids: [dossier.id] - } - } - end - subject { post :create, params: params } - before { sign_in(instructeur.user) } + context 'success with valid dossier_ids' do it 'creates a batch operation for our signed in instructeur' do expect { subject }.to change { instructeur.batch_operations.count }.by(1) - expect(BatchOperation.first.dossiers).to include(dossier) end it 'created a batch operation contains dossiers' do subject diff --git a/spec/factories/batch_operation.rb b/spec/factories/batch_operation.rb index 3c5bcb068..a59101aa5 100644 --- a/spec/factories/batch_operation.rb +++ b/spec/factories/batch_operation.rb @@ -3,14 +3,15 @@ FactoryBot.define do transient do invalid_instructeur { nil } end + trait :archiver do operation { BatchOperation.operations.fetch(:archiver) } after(:build) do |batch_operation, _evaluator| - procedure = create(:procedure, instructeurs: [_evaluator.invalid_instructeur.presence || batch_operation.instructeur]) + procedure = create(:simple_procedure, :published, instructeurs: [_evaluator.invalid_instructeur.presence || batch_operation.instructeur], administrateurs: [create(:administrateur)]) batch_operation.dossiers = [ - build(:dossier, :accepte, procedure: procedure), - build(:dossier, :refuse, procedure: procedure), - build(:dossier, :sans_suite, procedure: procedure) + create(:dossier, :with_individual, :accepte, procedure: procedure), + create(:dossier, :with_individual, :refuse, procedure: procedure), + create(:dossier, :with_individual, :sans_suite, procedure: procedure) ] end end diff --git a/spec/jobs/batch_operation_process_one_job_spec.rb b/spec/jobs/batch_operation_process_one_job_spec.rb index 569e9212c..3ad411175 100644 --- a/spec/jobs/batch_operation_process_one_job_spec.rb +++ b/spec/jobs/batch_operation_process_one_job_spec.rb @@ -8,71 +8,41 @@ describe BatchOperationProcessOneJob, type: :job do subject { BatchOperationProcessOneJob.new(batch_operation, dossier_job) } let(:options) { {} } - it 'just call the process one' do + it 'when it works' do + allow_any_instance_of(BatchOperation).to receive(:process_one).with(dossier_job).and_return(true) expect { subject.perform_now } - .to change { dossier_job.reload.archived } - .from(false) - .to(true) + .to change { batch_operation.reload.success_dossier_ids } + .from([]) + .to([dossier_job.id]) end - it 'unlock the dossier' do - expect { subject.perform_now } - .to change { dossier_job.reload.batch_operation } - .from(batch_operation) - .to(nil) + it 'when it fails for an "unknown" reason' do + allow_any_instance_of(BatchOperation).to receive(:process_one).with(dossier_job).and_raise("boom") + expect { subject.perform_now }.to raise_error('boom') + + expect(batch_operation.reload.failed_dossier_ids).to eq([dossier_job.id]) end - context 'when it succeed' do - it 'pushes dossier_job id to batch_operation.success_dossier_ids' do - expect { subject.perform_now } - .to change { batch_operation.reload.success_dossier_ids } - .from([]) - .to([dossier_job.id]) + context 'when the dossier is out of sync (ie: someone applied a transition somewhere we do not know)' do + let(:instructeur) { create(:instructeur) } + let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) } + let(:dossier) { create(:dossier, :accepte, :with_individual, archived: true, procedure: procedure) } + let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier]) } + + it 'does run process_one' do + allow(batch_operation).to receive(:process_one).and_raise("should have been prevented") + subject.perform_now end - end - context 'when it fails' do - it 'pushes dossier_job id to batch_operation.failed_dossier_ids' do - expect(batch_operation).to receive(:process_one).with(dossier_job).and_raise("KO") - expect { subject.perform_now }.to raise_error("KO") - expect(batch_operation.reload.failed_dossier_ids).to eq([dossier_job.id]) - end - end + it 'when it fails from dossiers_safe_scope.find' do + scope = double + expect(scope).to receive(:find).with(dossier_job.id).and_raise(ActiveRecord::RecordNotFound) + expect_any_instance_of(BatchOperation).to receive(:dossiers_safe_scope).and_return(scope) - context 'when it is the first job' do - it 'sets run_at at first' do - run_at = 2.minutes.ago - Timecop.freeze(run_at) do - expect { subject.perform_now } - .to change { batch_operation.reload.run_at } - .from(nil) - .to(run_at) - end - end - end + subject.perform_now - context 'when it is the second job (meaning run_at was already set) but not the last' do - let(:preview_run_at) { 2.days.ago } - let(:options) { { run_at: preview_run_at } } - it 'does not change run_at' do - expect { subject.perform_now }.not_to change { batch_operation.reload.run_at } - end - end - - context 'when it is the last job' do - before do - batch_operation.dossiers - .where.not(id: dossier_job.id) - .update_all(batch_operation_id: nil) - end - it 'sets finished_at' do - finished_at = Time.zone.now - Timecop.freeze(finished_at) do - expect { subject.perform_now } - .to change { batch_operation.reload.finished_at } - .from(nil) - .to(finished_at) - end + expect(batch_operation.reload.failed_dossier_ids).to eq([]) + expect(batch_operation.dossiers).not_to include(dossier_job) end end end diff --git a/spec/models/batch_operation_spec.rb b/spec/models/batch_operation_spec.rb index bbc64c1c3..48e47ed9f 100644 --- a/spec/models/batch_operation_spec.rb +++ b/spec/models/batch_operation_spec.rb @@ -18,18 +18,7 @@ describe BatchOperation, type: :model do it { is_expected.to validate_presence_of(:operation) } end - describe 'enqueue_all' do - context 'given dossier_ids not in instructeur procedures' do - subject do - create(:batch_operation, :archiver, instructeur: create(:instructeur), invalid_instructeur: create(:instructeur)) - end - - it 'does not enqueues any BatchOperationProcessOneJob' do - expect { subject.enqueue_all() } - .not_to have_enqueued_job(BatchOperationProcessOneJob) - end - end - + describe '#enqueue_all' do context 'given dossier_ids in instructeur procedures' do subject do create(:batch_operation, :archiver, instructeur: create(:instructeur)) @@ -42,6 +31,148 @@ describe BatchOperation, type: :model do .with(subject, subject.dossiers.second) .with(subject, subject.dossiers.third) end + + it 'pass through dossiers_safe_scope' do + expect(subject).to receive(:dossiers_safe_scope).and_return(subject.dossiers) + subject.enqueue_all + end + end + end + + describe '#track_processed_dossier' do + let(:instructeur) { create(:instructeur) } + let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) } + let(:dossier) { create(:dossier, :accepte, :with_individual, archived: true, procedure: procedure) } + let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier]) } + + it 'unlock the dossier' do + expect { batch_operation.track_processed_dossier(true, dossier) } + .to change { dossier.reload.batch_operation } + .from(batch_operation) + .to(nil) + end + + context 'when it succeed' do + it 'pushes dossier_job id to batch_operation.success_dossier_ids' do + expect { batch_operation.track_processed_dossier(true, dossier) } + .to change { batch_operation.reload.success_dossier_ids } + .from([]) + .to([dossier.id]) + end + end + + context 'when it succeed after a failure' do + let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier], failed_dossier_ids: [dossier.id]) } + it 'remove former dossier id from failed_dossier_ids' do + expect { batch_operation.track_processed_dossier(true, dossier) } + .to change { batch_operation.reload.failed_dossier_ids } + .from([dossier.id]) + .to([]) + end + end + + context 'when it fails' do + it 'pushes dossier_job id to batch_operation.failed_dossier_ids' do + expect { batch_operation.track_processed_dossier(false, dossier) } + .to change { batch_operation.reload.failed_dossier_ids } + .from([]) + .to([dossier.id]) + end + end + + context 'when it is the first job' do + it 'sets run_at at first' do + run_at = 2.minutes.ago + Timecop.freeze(run_at) do + expect { batch_operation.track_processed_dossier(false, dossier) } + .to change { batch_operation.reload.run_at } + .from(nil) + .to(run_at) + end + end + end + + context 'when it is the second job (meaning run_at was already set) but not the last' do + let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier], run_at: 2.days.ago) } + it 'does not change run_at' do + expect { batch_operation.track_processed_dossier(true, dossier) } + .not_to change { batch_operation.reload.run_at } + end + end + + context 'when it is the last job' do + it 'sets finished_at' do + finished_at = Time.zone.now + Timecop.freeze(finished_at) do + expect { batch_operation.track_processed_dossier(true, dossier) } + .to change { batch_operation.reload.finished_at } + .from(nil) + .to(finished_at) + end + end + end + end + + describe '#dossiers_safe_scope (with archiver)' do + let(:instructeur) { create(:instructeur) } + let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) } + let(:batch_operation) { create(:batch_operation, operation: :archiver, instructeur: instructeur, dossiers: [dossier]) } + + context 'when dossier is valid' do + let(:dossier) { create(:dossier, :accepte, :with_individual, procedure: procedure) } + + it 'find dosssier' do + expect(batch_operation.dossiers_safe_scope).to include(dossier) + end + end + context 'when dossier is already arcvhied' do + let(:dossier) { create(:dossier, :accepte, :with_individual, archived: true, procedure: procedure) } + + it 'skips dosssier is already archived' do + expect(batch_operation.dossiers_safe_scope).not_to include(dossier) + end + end + + context 'when dossier is not in state termine' do + let(:dossier) { create(:dossier, :en_instruction, :with_individual, procedure: procedure) } + + it 'does not enqueue any job' do + expect(batch_operation.dossiers_safe_scope).not_to include(dossier) + end + end + + context 'when dossier is not in instructeur procedures' do + let(:dossier) { create(:dossier, :accepte, :with_individual, procedure: create(:simple_procedure)) } + + it 'does not enqueues any BatchOperationProcessOneJob' do + expect(batch_operation.dossiers_safe_scope).not_to include(dossier) + end + end + end + + describe '#safe_create!' do + let(:instructeur) { create(:instructeur) } + let(:procedure) { create(:simple_procedure, instructeurs: [instructeur]) } + subject { BatchOperation.safe_create!(instructeur: instructeur, operation: :archiver, dossier_ids: [dossier.id]) } + + context 'success with divergent list of dossier_ids' do + let(:dossier) { create(:dossier, :accepte, :with_individual, archived: true, procedure: procedure) } + + it 'does not keep archived dossier within batch_operation.dossiers' do + expect(subject.dossiers).not_to include(dossier) + end + + it 'enqueue a BatchOperationEnqueueAllJob' do + expect { subject }.to have_enqueued_job(BatchOperationEnqueueAllJob) + end + end + + context 'with dossier already in a batch batch_operation' do + let(:dossier) { create(:dossier, :accepte, :with_individual, batch_operation: create(:batch_operation, :archiver, instructeur: instructeur), procedure: procedure) } + + it 'does not keep dossier in batch_operation' do + expect(subject.dossiers).not_to include(dossier) + end end end end