S3BinaryCacheStore: Allow disabling multipart uploads
The use of TransferManager has several issues, including that it doesn't allow setting a Content-Encoding without a patch, and it doesn't handle exceptions in worker threads (causing termination on memory allocation failure). Fixes #2493.
This commit is contained in:
parent
0163e8928c
commit
9f99d62480
1 changed files with 56 additions and 30 deletions
|
@ -173,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
|
||||||
const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
|
const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
|
||||||
const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
|
const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
|
||||||
const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
|
const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
|
||||||
|
const Setting<bool> multipartUpload{
|
||||||
|
this, false, "multipart-upload", "whether to use multi-part uploads"};
|
||||||
const Setting<uint64_t> bufferSize{
|
const Setting<uint64_t> bufferSize{
|
||||||
this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"};
|
this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"};
|
||||||
|
|
||||||
|
@ -261,46 +263,70 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
|
||||||
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
|
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
|
||||||
executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
|
executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
|
||||||
|
|
||||||
std::call_once(transferManagerCreated, [&]() {
|
std::call_once(transferManagerCreated, [&]()
|
||||||
|
{
|
||||||
|
if (multipartUpload) {
|
||||||
|
TransferManagerConfiguration transferConfig(executor.get());
|
||||||
|
|
||||||
TransferManagerConfiguration transferConfig(executor.get());
|
transferConfig.s3Client = s3Helper.client;
|
||||||
|
transferConfig.bufferSize = bufferSize;
|
||||||
|
|
||||||
transferConfig.s3Client = s3Helper.client;
|
transferConfig.uploadProgressCallback =
|
||||||
transferConfig.bufferSize = bufferSize;
|
[](const TransferManager *transferManager,
|
||||||
|
const std::shared_ptr<const TransferHandle>
|
||||||
|
&transferHandle)
|
||||||
|
{
|
||||||
|
//FIXME: find a way to properly abort the multipart upload.
|
||||||
|
//checkInterrupt();
|
||||||
|
debug("upload progress ('%s'): '%d' of '%d' bytes",
|
||||||
|
transferHandle->GetKey(),
|
||||||
|
transferHandle->GetBytesTransferred(),
|
||||||
|
transferHandle->GetBytesTotalSize());
|
||||||
|
};
|
||||||
|
|
||||||
transferConfig.uploadProgressCallback =
|
transferManager = TransferManager::Create(transferConfig);
|
||||||
[](const TransferManager *transferManager,
|
}
|
||||||
const std::shared_ptr<const TransferHandle>
|
|
||||||
&transferHandle)
|
|
||||||
{
|
|
||||||
//FIXME: find a way to properly abort the multipart upload.
|
|
||||||
//checkInterrupt();
|
|
||||||
debug("upload progress ('%s'): '%d' of '%d' bytes",
|
|
||||||
transferHandle->GetKey(),
|
|
||||||
transferHandle->GetBytesTransferred(),
|
|
||||||
transferHandle->GetBytesTotalSize());
|
|
||||||
};
|
|
||||||
|
|
||||||
transferManager = TransferManager::Create(transferConfig);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
auto now1 = std::chrono::steady_clock::now();
|
auto now1 = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
std::shared_ptr<TransferHandle> transferHandle =
|
if (transferManager) {
|
||||||
transferManager->UploadFile(
|
|
||||||
stream, bucketName, path, mimeType,
|
|
||||||
Aws::Map<Aws::String, Aws::String>(),
|
|
||||||
nullptr, contentEncoding);
|
|
||||||
|
|
||||||
transferHandle->WaitUntilFinished();
|
std::shared_ptr<TransferHandle> transferHandle =
|
||||||
|
transferManager->UploadFile(
|
||||||
|
stream, bucketName, path, mimeType,
|
||||||
|
Aws::Map<Aws::String, Aws::String>(),
|
||||||
|
nullptr, contentEncoding);
|
||||||
|
|
||||||
if (transferHandle->GetStatus() == TransferStatus::FAILED)
|
transferHandle->WaitUntilFinished();
|
||||||
throw Error("AWS error: failed to upload 's3://%s/%s': %s",
|
|
||||||
bucketName, path, transferHandle->GetLastError().GetMessage());
|
|
||||||
|
|
||||||
if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
|
if (transferHandle->GetStatus() == TransferStatus::FAILED)
|
||||||
throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
|
throw Error("AWS error: failed to upload 's3://%s/%s': %s",
|
||||||
bucketName, path);
|
bucketName, path, transferHandle->GetLastError().GetMessage());
|
||||||
|
|
||||||
|
if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
|
||||||
|
throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
|
||||||
|
bucketName, path);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
|
||||||
|
auto request =
|
||||||
|
Aws::S3::Model::PutObjectRequest()
|
||||||
|
.WithBucket(bucketName)
|
||||||
|
.WithKey(path);
|
||||||
|
|
||||||
|
request.SetContentType(mimeType);
|
||||||
|
|
||||||
|
if (contentEncoding != "")
|
||||||
|
request.SetContentEncoding(contentEncoding);
|
||||||
|
|
||||||
|
auto stream = std::make_shared<istringstream_nocopy>(data);
|
||||||
|
|
||||||
|
request.SetBody(stream);
|
||||||
|
|
||||||
|
auto result = checkAws(fmt("AWS error uploading '%s'", path),
|
||||||
|
s3Helper.client->PutObject(request));
|
||||||
|
}
|
||||||
|
|
||||||
printTalkative("upload of '%s' completed", path);
|
printTalkative("upload of '%s' completed", path);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue