refactor(DownloadManager): extract parallel download in dedicated class. move error management in custom class for procedure exports using the didicated class
This commit is contained in:
parent
2ed9cccba0
commit
ce1b189dcd
7 changed files with 3879 additions and 104 deletions
|
@ -1,87 +0,0 @@
|
|||
require 'async'
|
||||
require 'async/barrier'
|
||||
require 'async/http/internet'
|
||||
|
||||
class ActiveStorage::DownloadManager
|
||||
include Utils::Retryable
|
||||
DOWNLOAD_MAX_PARALLEL = ENV.fetch('DOWNLOAD_MAX_PARALLEL') { 10 }
|
||||
|
||||
attr_reader :download_to_dir, :errors
|
||||
|
||||
def download_all(attachments:, on_failure:)
|
||||
Async do
|
||||
internet = Async::HTTP::Internet.new
|
||||
barrier = Async::Barrier.new
|
||||
semaphore = Async::Semaphore.new(DOWNLOAD_MAX_PARALLEL, parent: barrier)
|
||||
|
||||
attachments.map do |attachment, path|
|
||||
semaphore.async do
|
||||
begin
|
||||
with_retry(max_attempt: 1) do
|
||||
download_one(attachment: attachment,
|
||||
path_in_download_dir: path,
|
||||
async_internet: internet)
|
||||
end
|
||||
rescue => e
|
||||
on_failure.call(attachment, path, e)
|
||||
end
|
||||
end
|
||||
end
|
||||
barrier.wait
|
||||
write_error_manifest if !errors.empty?
|
||||
ensure
|
||||
internet&.close
|
||||
end
|
||||
end
|
||||
|
||||
# beware, must be re-entrant because retryable
|
||||
def download_one(attachment:, path_in_download_dir:, async_internet:)
|
||||
byte_written = 0
|
||||
attachment_path = File.join(download_to_dir, path_in_download_dir)
|
||||
attachment_dir = File.dirname(attachment_path)
|
||||
|
||||
FileUtils.mkdir_p(attachment_dir) if !Dir.exist?(attachment_dir) # defensive, do not write in undefined dir
|
||||
if attachment.is_a?(PiecesJustificativesService::FakeAttachment)
|
||||
byte_written = File.write(attachment_path, attachment.file.read, mode: 'wb')
|
||||
else
|
||||
response = async_internet.get(attachment.url)
|
||||
File.open(attachment_path, mode: 'wb') do |fd|
|
||||
response.body.each do |chunk|
|
||||
byte_written = byte_written + fd.write(chunk)
|
||||
end
|
||||
response.body.close
|
||||
end
|
||||
end
|
||||
track_retryable_download_state(attachment_path: attachment_path, state: true) # -> fail once, success after -> no failure
|
||||
byte_written
|
||||
rescue
|
||||
track_retryable_download_state(attachment_path: attachment_path, state: false) #
|
||||
File.delete(attachment_path) if File.exist?(attachment_path) # -> case of retries failed, must cleanup partialy downloaded file
|
||||
raise
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def initialize(download_to_dir:)
|
||||
@download_to_dir = download_to_dir
|
||||
@errors = {}
|
||||
end
|
||||
|
||||
def track_retryable_download_state(attachment_path:, state:)
|
||||
key = File.basename(attachment_path)
|
||||
if state
|
||||
errors.delete(key) # do not keep track of success, otherwise errors map grows
|
||||
else
|
||||
errors[key] = state
|
||||
end
|
||||
end
|
||||
|
||||
def write_error_manifest
|
||||
manifest_path = File.join(download_to_dir, 'LISEZMOI.txt')
|
||||
manifest_content = errors.map do |file_basename, _failed|
|
||||
"Impossible de récupérer le fichier #{file_basename}"
|
||||
end
|
||||
.join("\n")
|
||||
File.write(manifest_path, manifest_content)
|
||||
end
|
||||
end
|
67
app/lib/download_manager/parallel_download_queue.rb
Normal file
67
app/lib/download_manager/parallel_download_queue.rb
Normal file
|
@ -0,0 +1,67 @@
|
|||
require 'async'
|
||||
require 'async/barrier'
|
||||
require 'async/http/internet'
|
||||
|
||||
module DownloadManager
|
||||
class ParallelDownloadQueue
|
||||
include Utils::Retryable
|
||||
DOWNLOAD_MAX_PARALLEL = ENV.fetch('DOWNLOAD_MAX_PARALLEL') { 10 }
|
||||
|
||||
attr_accessor :attachments,
|
||||
:destination,
|
||||
:on_error
|
||||
|
||||
def initialize(attachments, destination)
|
||||
@attachments = attachments
|
||||
@destination = destination
|
||||
end
|
||||
|
||||
def download_all
|
||||
Async do
|
||||
http_client = Async::HTTP::Internet.new
|
||||
barrier = Async::Barrier.new
|
||||
semaphore = Async::Semaphore.new(DOWNLOAD_MAX_PARALLEL, parent: barrier)
|
||||
|
||||
attachments.map do |attachment, path|
|
||||
semaphore.async do
|
||||
begin
|
||||
with_retry(max_attempt: 1) do
|
||||
download_one(attachment: attachment,
|
||||
path_in_download_dir: path,
|
||||
http_client: http_client)
|
||||
end
|
||||
rescue => e
|
||||
on_error.call(attachment, path, e)
|
||||
end
|
||||
end
|
||||
end
|
||||
barrier.wait
|
||||
ensure
|
||||
http_client&.close
|
||||
end
|
||||
end
|
||||
|
||||
def download_one(attachment:, path_in_download_dir:, http_client:)
|
||||
byte_written = 0
|
||||
attachment_path = File.join(destination, path_in_download_dir)
|
||||
attachment_dir = File.dirname(attachment_path)
|
||||
|
||||
FileUtils.mkdir_p(attachment_dir) if !Dir.exist?(attachment_dir) # defensive, do not write in undefined dir
|
||||
if attachment.is_a?(PiecesJustificativesService::FakeAttachment)
|
||||
byte_written = File.write(attachment_path, attachment.file.read, mode: 'wb')
|
||||
else
|
||||
response = http_client.get(attachment.url)
|
||||
File.open(attachment_path, mode: 'wb') do |fd|
|
||||
response.body.each do |chunk|
|
||||
byte_written = byte_written + fd.write(chunk)
|
||||
end
|
||||
response.body.close
|
||||
end
|
||||
end
|
||||
byte_written
|
||||
rescue
|
||||
File.delete(attachment_path) if File.exist?(attachment_path) # -> case of retries failed, must cleanup partialy downloaded file
|
||||
raise
|
||||
end
|
||||
end
|
||||
end
|
34
app/lib/download_manager/procedure_attachments_export.rb
Normal file
34
app/lib/download_manager/procedure_attachments_export.rb
Normal file
|
@ -0,0 +1,34 @@
|
|||
module DownloadManager
|
||||
class ProcedureAttachmentsExport
|
||||
delegate :destination, to: :@queue
|
||||
|
||||
attr_reader :queue
|
||||
attr_accessor :errors
|
||||
|
||||
def initialize(procedure, attachments, destination)
|
||||
@procedure = procedure
|
||||
@errors = {}
|
||||
@queue = ParallelDownloadQueue.new(attachments, destination)
|
||||
@queue.on_error = proc do |_attachment, path, error|
|
||||
errors[path] = true
|
||||
Rails.logger.error("Fail to download filename #{path} in procedure##{@procedure.id}, reason: #{error}")
|
||||
end
|
||||
end
|
||||
|
||||
def download_all
|
||||
@queue.download_all
|
||||
write_report if !errors.empty?
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def write_report
|
||||
manifest_path = File.join(destination, 'LISEZMOI.txt')
|
||||
manifest_content = errors.map do |file_basename, _failed|
|
||||
"Impossible de récupérer le fichier #{file_basename}"
|
||||
end
|
||||
.join("\n")
|
||||
File.write(manifest_path, manifest_content)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -102,12 +102,8 @@ class ProcedureArchiveService
|
|||
FileUtils.remove_entry_secure(archive_dir) if Dir.exist?(archive_dir)
|
||||
Dir.mkdir(archive_dir)
|
||||
|
||||
ActiveStorage::DownloadManager
|
||||
.new(download_to_dir: archive_dir)
|
||||
.download_all(attachments: attachments,
|
||||
on_failure: proc { |_attachment, path, error|
|
||||
Rails.logger.error("Fail to download filename #{path} in procedure##{@procedure.id}, reason: #{error}")
|
||||
})
|
||||
download_manager = DownloadManager::ProcedureAttachmentsExport.new(@procedure, attachments, archive_dir)
|
||||
download_manager.download_all
|
||||
|
||||
Dir.chdir(tmp_dir) do
|
||||
File.delete(zip_path) if File.exist?(zip_path)
|
||||
|
|
3763
spec/fixtures/cassettes/archive/file_to_get.yml
vendored
3763
spec/fixtures/cassettes/archive/file_to_get.yml
vendored
File diff suppressed because it is too large
Load diff
|
@ -1,14 +1,17 @@
|
|||
describe ActiveStorage::DownloadManager do
|
||||
describe DownloadManager::ParallelDownloadQueue do
|
||||
let(:test_dir) { Dir.mktmpdir(nil, Dir.tmpdir) }
|
||||
let(:download_to_dir) { test_dir }
|
||||
before do
|
||||
downloadable_manager.on_error = proc { |_, _, _| }
|
||||
end
|
||||
|
||||
after { FileUtils.remove_entry_secure(test_dir) if Dir.exist?(test_dir) }
|
||||
|
||||
let(:downloadable_manager) { ActiveStorage::DownloadManager.new(download_to_dir: download_to_dir) }
|
||||
|
||||
let(:downloadable_manager) { DownloadManager::ParallelDownloadQueue.new([attachment], download_to_dir) }
|
||||
describe '#download_one' do
|
||||
subject { downloadable_manager.download_one(attachment: attachment, path_in_download_dir: path_in_download_dir, async_internet: double) }
|
||||
subject { downloadable_manager.download_one(attachment: attachment, path_in_download_dir: destination, http_client: double) }
|
||||
|
||||
let(:path_in_download_dir) { 'lol.png' }
|
||||
let(:destination) { 'lol.png' }
|
||||
let(:attachment) do
|
||||
PiecesJustificativesService::FakeAttachment.new(
|
||||
file: StringIO.new('coucou'),
|
||||
|
@ -21,21 +24,20 @@ describe ActiveStorage::DownloadManager do
|
|||
|
||||
context 'with a PiecesJustificativesService::FakeAttachment and it works' do
|
||||
it 'write attachment.file to disk' do
|
||||
target = File.join(download_to_dir, path_in_download_dir)
|
||||
target = File.join(download_to_dir, destination)
|
||||
expect { subject }.to change { File.exist?(target) }
|
||||
attachment.file.rewind
|
||||
expect(attachment.file.read).to eq(File.read(target))
|
||||
expect(downloadable_manager.errors).not_to have_key(path_in_download_dir)
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a PiecesJustificativesService::FakeAttachment and it fails' do
|
||||
it 'write attachment.file to disk' do
|
||||
expect(attachment.file).to receive(:read).and_raise("boom")
|
||||
target = File.join(download_to_dir, path_in_download_dir)
|
||||
target = File.join(download_to_dir, destination)
|
||||
expect { subject }.to raise_error(StandardError)
|
||||
expect(File.exist?(target)).to be_falsey
|
||||
expect(downloadable_manager.errors).to have_key(path_in_download_dir)
|
||||
# expect(downloadable_manager.errors).to have_key(destination)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -133,7 +133,7 @@ describe ProcedureArchiveService do
|
|||
end
|
||||
it 'collect files' do
|
||||
expect(InstructeurMailer).to receive(:send_archive).and_return(mailer)
|
||||
VCR.use_cassette('archive/file.to.get') do
|
||||
VCR.use_cassette('archive/file_to_get') do
|
||||
service.collect_files_archive(archive, instructeur)
|
||||
end
|
||||
|
||||
|
@ -218,7 +218,7 @@ describe ProcedureArchiveService do
|
|||
it 'collect files' do
|
||||
expect(InstructeurMailer).to receive(:send_archive).and_return(mailer)
|
||||
|
||||
VCR.use_cassette('archive/file.to.get') do
|
||||
VCR.use_cassette('archive/file_to_get') do
|
||||
service.collect_files_archive(archive, instructeur)
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in a new issue