feat(async_backend): switch to typhoeus
This commit is contained in:
parent
ce1b189dcd
commit
3eb1c1a421
6 changed files with 210 additions and 4187 deletions
|
@ -1,7 +1,3 @@
|
|||
require 'async'
|
||||
require 'async/barrier'
|
||||
require 'async/http/internet'
|
||||
|
||||
module DownloadManager
|
||||
class ParallelDownloadQueue
|
||||
include Utils::Retryable
|
||||
|
@ -17,51 +13,49 @@ module DownloadManager
|
|||
end
|
||||
|
||||
def download_all
|
||||
Async do
|
||||
http_client = Async::HTTP::Internet.new
|
||||
barrier = Async::Barrier.new
|
||||
semaphore = Async::Semaphore.new(DOWNLOAD_MAX_PARALLEL, parent: barrier)
|
||||
hydra = Typhoeus::Hydra.new(max_concurrency: DOWNLOAD_MAX_PARALLEL)
|
||||
|
||||
attachments.map do |attachment, path|
|
||||
semaphore.async do
|
||||
begin
|
||||
with_retry(max_attempt: 1) do
|
||||
download_one(attachment: attachment,
|
||||
path_in_download_dir: path,
|
||||
http_client: http_client)
|
||||
end
|
||||
rescue => e
|
||||
on_error.call(attachment, path, e)
|
||||
end
|
||||
attachments.map do |attachment, path|
|
||||
begin
|
||||
with_retry(max_attempt: 1) do
|
||||
download_one(attachment: attachment,
|
||||
path_in_download_dir: path,
|
||||
http_client: hydra)
|
||||
end
|
||||
rescue => e
|
||||
on_error.call(attachment, path, e)
|
||||
end
|
||||
barrier.wait
|
||||
ensure
|
||||
http_client&.close
|
||||
end
|
||||
hydra.run
|
||||
end
|
||||
|
||||
# rubocop:disable Style/AutoResourceCleanup
|
||||
# can't be used with typhoeus, otherwise block is closed before the request is run by hydra
|
||||
def download_one(attachment:, path_in_download_dir:, http_client:)
|
||||
byte_written = 0
|
||||
attachment_path = File.join(destination, path_in_download_dir)
|
||||
attachment_dir = File.dirname(attachment_path)
|
||||
|
||||
FileUtils.mkdir_p(attachment_dir) if !Dir.exist?(attachment_dir) # defensive, do not write in undefined dir
|
||||
if attachment.is_a?(PiecesJustificativesService::FakeAttachment)
|
||||
byte_written = File.write(attachment_path, attachment.file.read, mode: 'wb')
|
||||
File.write(attachment_path, attachment.file.read, mode: 'wb')
|
||||
else
|
||||
response = http_client.get(attachment.url)
|
||||
File.open(attachment_path, mode: 'wb') do |fd|
|
||||
response.body.each do |chunk|
|
||||
byte_written = byte_written + fd.write(chunk)
|
||||
end
|
||||
response.body.close
|
||||
request = Typhoeus::Request.new(attachment.url)
|
||||
fd = File.open(attachment_path, mode: 'wb')
|
||||
request.on_body do |chunk|
|
||||
fd.write(chunk)
|
||||
end
|
||||
request.on_complete do |response|
|
||||
fd.close
|
||||
unless response.success?
|
||||
raise 'ko'
|
||||
end
|
||||
end
|
||||
http_client.queue(request)
|
||||
end
|
||||
byte_written
|
||||
rescue
|
||||
File.delete(attachment_path) if File.exist?(attachment_path) # -> case of retries failed, must cleanup partialy downloaded file
|
||||
raise
|
||||
end
|
||||
# rubocop:enable Style/AutoResourceCleanup
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue