Fix 'error 9 while decompressing xz file'

Once we've started writing data to a Sink, we can't restart a download
request, because then we end up writing duplicate data to the
Sink. Therefore we shouldn't handle retries in Downloader but at a
higher level (in particular, in copyStorePath()).

Fixes #2952.

(cherry picked from commit a67cf5a3585c41dd9f219a2c7aa9cf67fa69520b)
This commit is contained in:
Eelco Dolstra 2019-06-24 21:48:52 +02:00
parent 2fef4dd296
commit 78fa47a7f0
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
7 changed files with 156 additions and 119 deletions

View file

@ -10,6 +10,8 @@
#include "nar-info-disk-cache.hh" #include "nar-info-disk-cache.hh"
#include "nar-accessor.hh" #include "nar-accessor.hh"
#include "json.hh" #include "json.hh"
#include "retry.hh"
#include "download.hh"
#include <chrono> #include <chrono>
@ -79,13 +81,15 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink)
std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
{ {
StringSink sink; return retry<std::shared_ptr<std::string>>(downloadSettings.tries, [&]() -> std::shared_ptr<std::string> {
try { StringSink sink;
getFile(path, sink); try {
} catch (NoSuchBinaryCacheFile &) { getFile(path, sink);
return nullptr; } catch (NoSuchBinaryCacheFile &) {
} return nullptr;
return sink.s; }
return sink.s;
});
} }
Path BinaryCacheStore::narInfoFileFor(const Path & storePath) Path BinaryCacheStore::narInfoFileFor(const Path & storePath)

View file

@ -8,6 +8,7 @@
#include "compression.hh" #include "compression.hh"
#include "pathlocks.hh" #include "pathlocks.hh"
#include "finally.hh" #include "finally.hh"
#include "retry.hh"
#ifdef ENABLE_S3 #ifdef ENABLE_S3
#include <aws/core/client/ClientConfiguration.h> #include <aws/core/client/ClientConfiguration.h>
@ -19,11 +20,9 @@
#include <curl/curl.h> #include <curl/curl.h>
#include <algorithm> #include <algorithm>
#include <cmath>
#include <cstring> #include <cstring>
#include <iostream> #include <iostream>
#include <queue> #include <queue>
#include <random>
#include <thread> #include <thread>
using namespace std::string_literals; using namespace std::string_literals;
@ -62,9 +61,6 @@ struct CurlDownloader : public Downloader
{ {
CURLM * curlm = 0; CURLM * curlm = 0;
std::random_device rd;
std::mt19937 mt19937;
struct DownloadItem : public std::enable_shared_from_this<DownloadItem> struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
{ {
CurlDownloader & downloader; CurlDownloader & downloader;
@ -77,12 +73,6 @@ struct CurlDownloader : public Downloader
bool active = false; // whether the handle has been added to the multi object bool active = false; // whether the handle has been added to the multi object
std::string status; std::string status;
unsigned int attempt = 0;
/* Don't start this download until the specified time point
has been reached. */
std::chrono::steady_clock::time_point embargo;
struct curl_slist * requestHeaders = 0; struct curl_slist * requestHeaders = 0;
std::string encoding; std::string encoding;
@ -401,9 +391,7 @@ struct CurlDownloader : public Downloader
} }
} }
attempt++; fail(
auto exc =
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri)) ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
: httpStatus != 0 : httpStatus != 0
@ -414,31 +402,15 @@ struct CurlDownloader : public Downloader
) )
: DownloadError(err, : DownloadError(err,
fmt("unable to %s '%s': %s (%d)", fmt("unable to %s '%s': %s (%d)",
request.verb(), request.uri, curl_easy_strerror(code), code)); request.verb(), request.uri, curl_easy_strerror(code), code)));
/* If this is a transient error, then maybe retry the
download after a while. */
if (err == Transient && attempt < request.tries) {
int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
downloader.enqueueItem(shared_from_this());
}
else
fail(exc);
} }
} }
}; };
struct State struct State
{ {
struct EmbargoComparator {
bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
return i1->embargo > i2->embargo;
}
};
bool quit = false; bool quit = false;
std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming; std::vector<std::shared_ptr<DownloadItem>> incoming;
}; };
Sync<State> state_; Sync<State> state_;
@ -451,7 +423,6 @@ struct CurlDownloader : public Downloader
std::thread workerThread; std::thread workerThread;
CurlDownloader() CurlDownloader()
: mt19937(rd())
{ {
static std::once_flag globalInit; static std::once_flag globalInit;
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@ -545,9 +516,7 @@ struct CurlDownloader : public Downloader
nextWakeup = std::chrono::steady_clock::time_point(); nextWakeup = std::chrono::steady_clock::time_point();
/* Add new curl requests from the incoming requests queue, /* Add new curl requests from the incoming requests queue. */
except for requests that are embargoed (waiting for a
retry timeout to expire). */
if (extraFDs[0].revents & CURL_WAIT_POLLIN) { if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
char buf[1024]; char buf[1024];
auto res = read(extraFDs[0].fd, buf, sizeof(buf)); auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@ -556,22 +525,9 @@ struct CurlDownloader : public Downloader
} }
std::vector<std::shared_ptr<DownloadItem>> incoming; std::vector<std::shared_ptr<DownloadItem>> incoming;
auto now = std::chrono::steady_clock::now();
{ {
auto state(state_.lock()); auto state(state_.lock());
while (!state->incoming.empty()) { std::swap(state->incoming, incoming);
auto item = state->incoming.top();
if (item->embargo <= now) {
incoming.push_back(item);
state->incoming.pop();
} else {
if (nextWakeup == std::chrono::steady_clock::time_point()
|| item->embargo < nextWakeup)
nextWakeup = item->embargo;
break;
}
}
quit = state->quit; quit = state->quit;
} }
@ -598,7 +554,7 @@ struct CurlDownloader : public Downloader
{ {
auto state(state_.lock()); auto state(state_.lock());
while (!state->incoming.empty()) state->incoming.pop(); state->incoming.clear();
state->quit = true; state->quit = true;
} }
} }
@ -614,7 +570,7 @@ struct CurlDownloader : public Downloader
auto state(state_.lock()); auto state(state_.lock());
if (state->quit) if (state->quit)
throw nix::Error("cannot enqueue download request because the download thread is shutting down"); throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push(item); state->incoming.push_back(item);
} }
writeFull(wakeupPipe.writeSide.get(), " "); writeFull(wakeupPipe.writeSide.get(), " ");
} }
@ -697,7 +653,9 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
DownloadResult Downloader::download(const DownloadRequest & request) DownloadResult Downloader::download(const DownloadRequest & request)
{ {
return enqueueDownload(request).get(); return retry<DownloadResult>(request.tries, [&]() {
return enqueueDownload(request).get();
});
} }
void Downloader::download(DownloadRequest && request, Sink & sink) void Downloader::download(DownloadRequest && request, Sink & sink)
@ -883,7 +841,7 @@ CachedDownloadResult Downloader::downloadCached(ref<Store> store, const string &
writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n"); writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
} catch (DownloadError & e) { } catch (DownloadError & e) {
if (storePath.empty()) throw; if (storePath.empty()) throw;
printError(format("warning: %1%; using cached result") % e.msg()); warn("%s; using cached result", e.msg());
result.etag = expectedETag; result.etag = expectedETag;
} }
} }
@ -933,5 +891,4 @@ bool isUri(const string & s)
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh"; return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
} }
} }

View file

@ -62,11 +62,13 @@ struct Downloader
std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); std::future<DownloadResult> enqueueDownload(const DownloadRequest & request);
/* Synchronously download a file. */ /* Synchronously download a file. The request will be retried in
case of transient failures. */
DownloadResult download(const DownloadRequest & request); DownloadResult download(const DownloadRequest & request);
/* Download a file, writing its data to a sink. The sink will be /* Download a file, writing its data to a sink. The sink will be
invoked on the thread of the caller. */ invoked on the thread of the caller. The request will not be
retried in case of transient failures. */
void download(DownloadRequest && request, Sink & sink); void download(DownloadRequest && request, Sink & sink);
/* Check if the specified file is already in ~/.cache/nix/tarballs /* Check if the specified file is already in ~/.cache/nix/tarballs
@ -95,6 +97,11 @@ public:
DownloadError(Downloader::Error error, const FormatOrString & fs) DownloadError(Downloader::Error error, const FormatOrString & fs)
: Error(fs), error(error) : Error(fs), error(error)
{ } { }
bool isTransient() override
{
return error == Downloader::Error::Transient;
}
}; };
bool isUri(const string & s); bool isUri(const string & s);

View file

@ -2,6 +2,7 @@
#include "download.hh" #include "download.hh"
#include "globals.hh" #include "globals.hh"
#include "nar-info-disk-cache.hh" #include "nar-info-disk-cache.hh"
#include "retry.hh"
namespace nix { namespace nix {
@ -114,7 +115,6 @@ protected:
DownloadRequest makeRequest(const std::string & path) DownloadRequest makeRequest(const std::string & path)
{ {
DownloadRequest request(cacheUri + "/" + path); DownloadRequest request(cacheUri + "/" + path);
request.tries = 8;
return request; return request;
} }
@ -137,21 +137,46 @@ protected:
{ {
checkEnabled(); checkEnabled();
auto request(makeRequest(path)); struct State
{
DownloadRequest request;
std::function<void()> tryDownload;
unsigned int attempt = 0;
State(DownloadRequest && request) : request(request) {}
};
getDownloader()->enqueueDownload(request, auto state = std::make_shared<State>(makeRequest(path));
{[callback, this](std::future<DownloadResult> result) {
try { state->tryDownload = [callback, state, this]() {
callback(result.get().data); getDownloader()->enqueueDownload(state->request,
} catch (DownloadError & e) { {[callback, state, this](std::future<DownloadResult> result) {
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) try {
return callback(std::shared_ptr<std::string>()); callback(result.get().data);
maybeDisable(); } catch (DownloadError & e) {
callback.rethrow(); if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
} catch (...) { return callback(std::shared_ptr<std::string>());
callback.rethrow(); ++state->attempt;
} if (state->attempt < state->request.tries && e.isTransient()) {
}}); auto ms = retrySleepTime(state->attempt);
warn("%s; retrying in %d ms", e.what(), ms);
/* We can't sleep here because that would
block the download thread. So use a
separate thread for sleeping. */
std::thread([state, ms]() {
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
state->tryDownload();
}).detach();
} else {
maybeDisable();
callback.rethrow();
}
} catch (...) {
callback.rethrow();
}
}});
};
state->tryDownload();
} }
}; };

View file

@ -6,10 +6,11 @@
#include "thread-pool.hh" #include "thread-pool.hh"
#include "json.hh" #include "json.hh"
#include "derivations.hh" #include "derivations.hh"
#include "retry.hh"
#include "download.hh"
#include <future> #include <future>
namespace nix { namespace nix {
@ -572,54 +573,57 @@ void Store::buildPaths(const PathSet & paths, BuildMode buildMode)
void copyStorePath(ref<Store> srcStore, ref<Store> dstStore, void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs) const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs)
{ {
auto srcUri = srcStore->getUri(); retry<void>(downloadSettings.tries, [&]() {
auto dstUri = dstStore->getUri();
Activity act(*logger, lvlInfo, actCopyPath, auto srcUri = srcStore->getUri();
srcUri == "local" || srcUri == "daemon" auto dstUri = dstStore->getUri();
? fmt("copying path '%s' to '%s'", storePath, dstUri)
: dstUri == "local" || dstUri == "daemon"
? fmt("copying path '%s' from '%s'", storePath, srcUri)
: fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri),
{storePath, srcUri, dstUri});
PushActivity pact(act.id);
auto info = srcStore->queryPathInfo(storePath); Activity act(*logger, lvlInfo, actCopyPath,
srcUri == "local" || srcUri == "daemon"
? fmt("copying path '%s' to '%s'", storePath, dstUri)
: dstUri == "local" || dstUri == "daemon"
? fmt("copying path '%s' from '%s'", storePath, srcUri)
: fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri),
{storePath, srcUri, dstUri});
PushActivity pact(act.id);
uint64_t total = 0; auto info = srcStore->queryPathInfo(storePath);
if (!info->narHash) { uint64_t total = 0;
StringSink sink;
srcStore->narFromPath({storePath}, sink);
auto info2 = make_ref<ValidPathInfo>(*info);
info2->narHash = hashString(htSHA256, *sink.s);
if (!info->narSize) info2->narSize = sink.s->size();
if (info->ultimate) info2->ultimate = false;
info = info2;
StringSource source(*sink.s); if (!info->narHash) {
dstStore->addToStore(*info, source, repair, checkSigs); StringSink sink;
return; srcStore->narFromPath({storePath}, sink);
} auto info2 = make_ref<ValidPathInfo>(*info);
info2->narHash = hashString(htSHA256, *sink.s);
if (!info->narSize) info2->narSize = sink.s->size();
if (info->ultimate) info2->ultimate = false;
info = info2;
if (info->ultimate) { StringSource source(*sink.s);
auto info2 = make_ref<ValidPathInfo>(*info); dstStore->addToStore(*info, source, repair, checkSigs);
info2->ultimate = false; return;
info = info2; }
}
auto source = sinkToSource([&](Sink & sink) { if (info->ultimate) {
LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { auto info2 = make_ref<ValidPathInfo>(*info);
sink(data, len); info2->ultimate = false;
total += len; info = info2;
act.progress(total, info->narSize); }
auto source = sinkToSource([&](Sink & sink) {
LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
sink(data, len);
total += len;
act.progress(total, info->narSize);
});
srcStore->narFromPath({storePath}, wrapperSink);
}, [&]() {
throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
}); });
srcStore->narFromPath({storePath}, wrapperSink);
}, [&]() {
throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
});
dstStore->addToStore(*info, *source, repair, checkSigs); dstStore->addToStore(*info, *source, repair, checkSigs);
});
} }

38
src/libutil/retry.hh Normal file
View file

@ -0,0 +1,38 @@
#pragma once
#include "logging.hh"
#include <functional>
#include <cmath>
#include <random>
#include <thread>
namespace nix {
inline unsigned int retrySleepTime(unsigned int attempt)
{
std::random_device rd;
std::mt19937 mt19937;
return 250.0 * std::pow(2.0f,
attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(mt19937));
}
template<typename C>
C retry(unsigned int attempts, std::function<C()> && f)
{
unsigned int attempt = 0;
while (true) {
try {
return f();
} catch (BaseError & e) {
++attempt;
if (attempt >= attempts || !e.isTransient())
throw;
auto ms = retrySleepTime(attempt);
warn("%s; retrying in %d ms", e.what(), ms);
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
}
}
}

View file

@ -109,6 +109,8 @@ public:
const string & msg() const { return err; } const string & msg() const { return err; }
const string & prefix() const { return prefix_; } const string & prefix() const { return prefix_; }
BaseError & addPrefix(const FormatOrString & fs); BaseError & addPrefix(const FormatOrString & fs);
virtual bool isTransient() { return false; }
}; };
#define MakeError(newClass, superClass) \ #define MakeError(newClass, superClass) \