tvl-depot/third_party/nix/src/libstore/remote-store.cc

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

686 lines
20 KiB
C++
Raw Normal View History

#include "libstore/remote-store.hh"
#include <cerrno>
#include <cstring>
#include <absl/status/status.h>
#include <absl/strings/ascii.h>
#include <fcntl.h>
#include <glog/logging.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include "libstore/derivations.hh"
#include "libstore/globals.hh"
#include "libstore/worker-protocol.hh"
#include "libutil/affinity.hh"
#include "libutil/archive.hh"
#include "libutil/finally.hh"
#include "libutil/pool.hh"
#include "libutil/serialise.hh"
#include "libutil/util.hh"
2006-11-30 19:35:50 +01:00
namespace nix {
Path readStorePath(Store& store, Source& from) {
Path path = readString(from);
store.assertStorePath(path);
return path;
}
template <class T>
T readStorePaths(Store& store, Source& from) {
T paths = readStrings<T>(from);
for (auto& i : paths) {
store.assertStorePath(i);
}
return paths;
}
template PathSet readStorePaths(Store& store, Source& from);
template Paths readStorePaths(Store& store, Source& from);
/* TODO: Separate these store impls into different files, give them better names
*/
RemoteStore::RemoteStore(const Params& params)
: Store(params),
connections(make_ref<Pool<Connection>>(
std::max(1, (int)maxConnections),
[this]() { return openConnectionWrapper(); },
[this](const ref<Connection>& r) {
return r->to.good() && r->from.good() &&
std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - r->startTime)
.count() < maxConnectionAge;
})) {}
ref<RemoteStore::Connection> RemoteStore::openConnectionWrapper() {
if (failed) {
throw Error("opening a connection to remote store '%s' previously failed",
getUri());
}
try {
return openConnection();
} catch (...) {
2017-03-03 19:35:34 +01:00
failed = true;
throw;
}
}
2017-03-03 19:35:34 +01:00
void RemoteStore::initConnection(Connection& conn) {
/* Send the magic greeting, check for the reply. */
try {
conn.to << WORKER_MAGIC_1;
conn.to.flush();
unsigned int magic = readInt(conn.from);
if (magic != WORKER_MAGIC_2) {
throw Error("protocol mismatch");
}
conn.from >> conn.daemonVersion;
if (GET_PROTOCOL_MAJOR(conn.daemonVersion) !=
GET_PROTOCOL_MAJOR(PROTOCOL_VERSION)) {
throw Error("Nix daemon protocol version not supported");
}
2017-03-03 19:35:34 +01:00
if (GET_PROTOCOL_MINOR(conn.daemonVersion) < 10) {
throw Error("the Nix daemon version is too old");
}
conn.to << PROTOCOL_VERSION;
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 14) {
2017-03-03 19:35:34 +01:00
int cpu = sameMachine() && settings.lockCPU ? lockToCurrentCPU() : -1;
if (cpu != -1) {
conn.to << 1 << cpu;
} else {
conn.to << 0;
}
2017-03-03 19:35:34 +01:00
}
2012-07-30 23:13:25 +02:00
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 11) {
conn.to << 0u;
}
2012-08-01 00:19:44 +02:00
auto ex = conn.processStderr();
if (ex) {
std::rethrow_exception(ex);
}
} catch (Error& e) {
throw Error("cannot open connection to remote store '%s': %s", getUri(),
e.what());
}
setOptions(conn);
}
void RemoteStore::setOptions(Connection& conn) {
conn.to << wopSetOptions << static_cast<uint64_t>(settings.keepFailed)
<< static_cast<uint64_t>(settings.keepGoing)
<< static_cast<uint64_t>(settings.tryFallback)
<< /* previously: verbosity = */ 0 << settings.maxBuildJobs
<< settings.maxSilentTime << 1u
<< /* previously: remote verbosity = */ 0 << 0 // obsolete log type
<< 0 /* obsolete print build trace */
<< settings.buildCores
<< static_cast<uint64_t>(settings.useSubstitutes);
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) {
std::map<std::string, Config::SettingInfo> overrides;
globalConfig.getSettings(overrides, true);
overrides.erase(settings.keepFailed.name);
overrides.erase(settings.keepGoing.name);
overrides.erase(settings.tryFallback.name);
overrides.erase(settings.maxBuildJobs.name);
overrides.erase(settings.maxSilentTime.name);
overrides.erase(settings.buildCores.name);
overrides.erase(settings.useSubstitutes.name);
overrides.erase(settings.showTrace.name);
conn.to << overrides.size();
for (auto& i : overrides) {
conn.to << i.first << i.second.value;
}
}
auto ex = conn.processStderr();
if (ex) {
std::rethrow_exception(ex);
}
}
/* A wrapper around Pool<RemoteStore::Connection>::Handle that marks
the connection as bad (causing it to be closed) if a non-daemon
exception is thrown before the handle is closed. Such an exception
causes a deviation from the expected protocol and therefore a
desynchronization between the client and daemon. */
struct ConnectionHandle {
Pool<RemoteStore::Connection>::Handle handle;
bool daemonException = false;
explicit ConnectionHandle(Pool<RemoteStore::Connection>::Handle&& handle)
: handle(std::move(handle)) {}
ConnectionHandle(ConnectionHandle&& h) : handle(std::move(h.handle)) {}
~ConnectionHandle() {
if (!daemonException && (std::uncaught_exceptions() != 0)) {
handle.markBad();
// TODO(tazjin): are these types of things supposed to be DEBUG?
DLOG(INFO) << "closing daemon connection because of an exception";
}
}
RemoteStore::Connection* operator->() { return &*handle; }
void processStderr(Sink* sink = nullptr, Source* source = nullptr) {
auto ex = handle->processStderr(sink, source);
if (ex) {
daemonException = true;
std::rethrow_exception(ex);
}
}
};
ConnectionHandle RemoteStore::getConnection() {
return ConnectionHandle(connections->get());
}
bool RemoteStore::isValidPathUncached(const Path& path) {
auto conn(getConnection());
conn->to << wopIsValidPath << path;
conn.processStderr();
return readInt(conn->from) != 0u;
}
PathSet RemoteStore::queryValidPaths(const PathSet& paths,
SubstituteFlag maybeSubstitute) {
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
2015-07-17 19:24:28 +02:00
for (auto& i : paths) {
if (isValidPath(i)) {
res.insert(i);
}
}
return res;
}
conn->to << wopQueryValidPaths << paths;
conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from);
}
PathSet RemoteStore::queryAllValidPaths() {
auto conn(getConnection());
conn->to << wopQueryAllValidPaths;
conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from);
2006-11-30 19:35:50 +01:00
}
download-from-binary-cache: parallelise fetching of NAR info files Getting substitute information using the binary cache substituter has non-trivial latency overhead. A package or NixOS system configuration can have hundreds of dependencies, and in the worst case (when the local info cache is empty) we have to do a separate HTTP request for each of these. If the ping time to the server is t, getting N info files will take tN seconds; e.g., with a ping time of 0.1s to nixos.org, sequentially downloading 1000 info files (a typical NixOS config) will take at least 100 seconds. To fix this problem, the binary cache substituter can now perform requests in parallel. This required changing the substituter interface to support a function querySubstitutablePathInfos() that queries multiple paths at the same time, and rewriting queryMissing() to take advantage of parallelism. (Due to local caching, parallelising queryMissing() is sufficient for most use cases, since it's almost always called before building a derivation and thus fills the local info cache.) For example, parallelism speeds up querying all 1056 paths in a particular NixOS system configuration from 116s to 2.6s. It works so well because the eccentricity of the top-level derivation in the dependency graph is only 9. So we only need 10 round-trips (when using an unlimited number of parallel connections) to get everything. Currently we do a maximum of 150 parallel connections to the server. Thus it's important that the binary cache server (e.g. nixos.org) has a high connection limit. Alternatively we could use HTTP pipelining, but WWW::Curl doesn't support it and libcurl has a hard-coded limit of 5 requests per pipeline.
2012-07-07 01:08:20 +02:00
PathSet RemoteStore::querySubstitutablePaths(const PathSet& paths) {
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto& i : paths) {
conn->to << wopHasSubstitutes << i;
conn.processStderr();
if (readInt(conn->from) != 0u) {
res.insert(i);
}
download-from-binary-cache: parallelise fetching of NAR info files Getting substitute information using the binary cache substituter has non-trivial latency overhead. A package or NixOS system configuration can have hundreds of dependencies, and in the worst case (when the local info cache is empty) we have to do a separate HTTP request for each of these. If the ping time to the server is t, getting N info files will take tN seconds; e.g., with a ping time of 0.1s to nixos.org, sequentially downloading 1000 info files (a typical NixOS config) will take at least 100 seconds. To fix this problem, the binary cache substituter can now perform requests in parallel. This required changing the substituter interface to support a function querySubstitutablePathInfos() that queries multiple paths at the same time, and rewriting queryMissing() to take advantage of parallelism. (Due to local caching, parallelising queryMissing() is sufficient for most use cases, since it's almost always called before building a derivation and thus fills the local info cache.) For example, parallelism speeds up querying all 1056 paths in a particular NixOS system configuration from 116s to 2.6s. It works so well because the eccentricity of the top-level derivation in the dependency graph is only 9. So we only need 10 round-trips (when using an unlimited number of parallel connections) to get everything. Currently we do a maximum of 150 parallel connections to the server. Thus it's important that the binary cache server (e.g. nixos.org) has a high connection limit. Alternatively we could use HTTP pipelining, but WWW::Curl doesn't support it and libcurl has a hard-coded limit of 5 requests per pipeline.
2012-07-07 01:08:20 +02:00
}
return res;
}
conn->to << wopQuerySubstitutablePaths << paths;
conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from);
}
void RemoteStore::querySubstitutablePathInfos(const PathSet& paths,
SubstitutablePathInfos& infos) {
if (paths.empty()) {
return;
}
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
for (auto& i : paths) {
SubstitutablePathInfo info;
conn->to << wopQuerySubstitutablePathInfo << i;
conn.processStderr();
unsigned int reply = readInt(conn->from);
if (reply == 0) {
continue;
}
info.deriver = readString(conn->from);
if (!info.deriver.empty()) {
assertStorePath(info.deriver);
}
info.references = readStorePaths<PathSet>(*this, conn->from);
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
2015-07-17 19:24:28 +02:00
infos[i] = info;
}
} else {
conn->to << wopQuerySubstitutablePathInfos << paths;
conn.processStderr();
auto count = readNum<size_t>(conn->from);
for (size_t n = 0; n < count; n++) {
Path path = readStorePath(*this, conn->from);
SubstitutablePathInfo& info(infos[path]);
info.deriver = readString(conn->from);
if (!info.deriver.empty()) {
assertStorePath(info.deriver);
}
info.references = readStorePaths<PathSet>(*this, conn->from);
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
}
}
}
void RemoteStore::queryPathInfoUncached(
const Path& path,
Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept {
try {
std::shared_ptr<ValidPathInfo> info;
{
auto conn(getConnection());
conn->to << wopQueryPathInfo << path;
try {
conn.processStderr();
} catch (Error& e) {
// Ugly backwards compatibility hack.
if (e.msg().find("is not valid") != std::string::npos) {
throw InvalidPath(e.what());
}
throw;
}
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) {
bool valid;
conn->from >> valid;
if (!valid) {
throw InvalidPath(format("path '%s' is not valid") % path);
}
}
info = std::make_shared<ValidPathInfo>();
info->path = path;
info->deriver = readString(conn->from);
if (!info->deriver.empty()) {
assertStorePath(info->deriver);
}
auto hash_ = Hash::deserialize(readString(conn->from), htSHA256);
info->narHash = Hash::unwrap_throw(hash_);
info->references = readStorePaths<PathSet>(*this, conn->from);
conn->from >> info->registrationTime >> info->narSize;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
conn->from >> info->ultimate;
info->sigs = readStrings<StringSet>(conn->from);
conn->from >> info->ca;
}
}
2018-03-27 22:16:01 +02:00
callback(std::move(info));
2016-02-23 16:40:16 +01:00
} catch (...) {
2018-03-27 22:16:01 +02:00
callback.rethrow();
}
}
void RemoteStore::queryReferrers(const Path& path, PathSet& referrers) {
auto conn(getConnection());
conn->to << wopQueryReferrers << path;
conn.processStderr();
auto referrers2 = readStorePaths<PathSet>(*this, conn->from);
referrers.insert(referrers2.begin(), referrers2.end());
}
PathSet RemoteStore::queryValidDerivers(const Path& path) {
auto conn(getConnection());
conn->to << wopQueryValidDerivers << path;
conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from);
2006-11-30 19:35:50 +01:00
}
PathSet RemoteStore::queryDerivationOutputs(const Path& path) {
auto conn(getConnection());
conn->to << wopQueryDerivationOutputs << path;
conn.processStderr();
return readStorePaths<PathSet>(*this, conn->from);
2006-11-30 19:35:50 +01:00
}
PathSet RemoteStore::queryDerivationOutputNames(const Path& path) {
auto conn(getConnection());
conn->to << wopQueryDerivationOutputNames << path;
conn.processStderr();
return readStrings<PathSet>(conn->from);
}
Path RemoteStore::queryPathFromHashPart(const std::string& hashPart) {
auto conn(getConnection());
conn->to << wopQueryPathFromHashPart << hashPart;
conn.processStderr();
Path path = readString(conn->from);
if (!path.empty()) {
assertStorePath(path);
}
return path;
}
void RemoteStore::addToStore(const ValidPathInfo& info, Source& source,
RepairFlag repair, CheckSigsFlag checkSigs,
std::shared_ptr<FSAccessor> accessor) {
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) {
conn->to << wopImportPaths;
auto source2 = sinkToSource([&](Sink& sink) {
sink << 1 // == path follows
;
copyNAR(source, sink);
sink << exportMagic << info.path << info.references << info.deriver
<< 0 // == no legacy signature
<< 0 // == no path follows
;
});
conn.processStderr(nullptr, source2.get());
auto importedPaths = readStorePaths<PathSet>(*this, conn->from);
assert(importedPaths.size() <= 1);
}
else {
conn->to << wopAddToStoreNar << info.path << info.deriver
<< info.narHash.to_string(Base16, false) << info.references
<< info.registrationTime << info.narSize << info.ultimate
<< info.sigs << info.ca << repair << !checkSigs;
bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
if (!tunnel) {
copyNAR(source, conn->to);
}
conn.processStderr(nullptr, tunnel ? &source : nullptr);
}
}
Path RemoteStore::addToStore(const std::string& name, const Path& _srcPath,
bool recursive, HashType hashAlgo,
PathFilter& filter, RepairFlag repair) {
if (repair != 0u) {
throw Error(
"repairing is not supported when building through the Nix daemon");
}
auto conn(getConnection());
Path srcPath(absPath(_srcPath));
conn->to << wopAddToStore << name
2015-07-20 01:16:16 +02:00
<< ((hashAlgo == htSHA256 && recursive)
? 0
: 1) /* backwards compatibility hack */
2015-07-17 19:24:28 +02:00
<< (recursive ? 1 : 0) << printHashType(hashAlgo);
2015-07-17 19:24:28 +02:00
try {
conn->to.written = 0;
conn->to.warn = true;
connections->incCapacity();
{
Finally cleanup([&]() { connections->decCapacity(); });
dumpPath(srcPath, conn->to, filter);
}
conn->to.warn = false;
conn.processStderr();
} catch (SysError& e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE) {
try {
conn.processStderr();
} catch (EndOfFile& e) {
}
}
throw;
}
return readStorePath(*this, conn->from);
}
Path RemoteStore::addTextToStore(const std::string& name, const std::string& s,
const PathSet& references, RepairFlag repair) {
if (repair != 0u) {
throw Error(
"repairing is not supported when building through the Nix daemon");
}
auto conn(getConnection());
conn->to << wopAddTextToStore << name << s << references;
conn.processStderr();
return readStorePath(*this, conn->from);
}
absl::Status RemoteStore::buildPaths(std::ostream& /* log_sink */,
const PathSet& drvPaths,
BuildMode build_mode) {
auto conn(getConnection());
conn->to << wopBuildPaths;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
conn->to << drvPaths;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) {
conn->to << build_mode;
} else if (build_mode != bmNormal) {
/* Old daemons did not take a 'buildMode' parameter, so we
need to validate it here on the client side. */
return absl::Status(
absl::StatusCode::kInvalidArgument,
"repairing or checking is not supported when building through the "
"Nix daemon");
}
} else {
/* For backwards compatibility with old daemons, strip output
identifiers. */
PathSet drvPaths2;
2015-07-17 19:24:28 +02:00
for (auto& i : drvPaths) {
drvPaths2.insert(std::string(i, 0, i.find('!')));
}
conn->to << drvPaths2;
}
conn.processStderr();
readInt(conn->from);
return absl::OkStatus();
2006-11-30 19:35:50 +01:00
}
BuildResult RemoteStore::buildDerivation(const Path& drvPath,
const BasicDerivation& drv,
BuildMode buildMode) {
auto conn(getConnection());
conn->to << wopBuildDerivation << drvPath << drv << buildMode;
conn.processStderr();
BuildResult res;
unsigned int status;
conn->from >> status >> res.errorMsg;
res.status = static_cast<BuildResult::Status>(status);
return res;
}
void RemoteStore::ensurePath(const Path& path) {
auto conn(getConnection());
conn->to << wopEnsurePath << path;
conn.processStderr();
readInt(conn->from);
}
void RemoteStore::addTempRoot(const Path& path) {
auto conn(getConnection());
conn->to << wopAddTempRoot << path;
conn.processStderr();
readInt(conn->from);
}
void RemoteStore::addIndirectRoot(const Path& path) {
auto conn(getConnection());
conn->to << wopAddIndirectRoot << path;
conn.processStderr();
readInt(conn->from);
}
void RemoteStore::syncWithGC() {
auto conn(getConnection());
conn->to << wopSyncWithGC;
conn.processStderr();
readInt(conn->from);
}
Roots RemoteStore::findRoots(bool censor) {
auto conn(getConnection());
conn->to << wopFindRoots;
conn.processStderr();
auto count = readNum<size_t>(conn->from);
Roots result;
while ((count--) != 0u) {
Path link = readString(conn->from);
Path target = readStorePath(*this, conn->from);
result[target].emplace(link);
}
return result;
}
void RemoteStore::collectGarbage(const GCOptions& options, GCResults& results) {
auto conn(getConnection());
conn->to << wopCollectGarbage << options.action << options.pathsToDelete
<< static_cast<uint64_t>(options.ignoreLiveness)
<< options.maxFreed
/* removed options */
<< 0 << 0 << 0;
conn.processStderr();
results.paths = readStrings<PathSet>(conn->from);
results.bytesFreed = readLongLong(conn->from);
readLongLong(conn->from); // obsolete
{
auto state_(Store::state.lock());
state_->pathInfoCache.clear();
}
}
void RemoteStore::optimiseStore() {
auto conn(getConnection());
conn->to << wopOptimiseStore;
conn.processStderr();
readInt(conn->from);
}
bool RemoteStore::verifyStore(bool checkContents, RepairFlag repair) {
auto conn(getConnection());
conn->to << wopVerifyStore << static_cast<uint64_t>(checkContents) << repair;
conn.processStderr();
return readInt(conn->from) != 0u;
}
void RemoteStore::addSignatures(const Path& storePath, const StringSet& sigs) {
auto conn(getConnection());
conn->to << wopAddSignatures << storePath << sigs;
conn.processStderr();
readInt(conn->from);
}
void RemoteStore::queryMissing(const PathSet& targets, PathSet& willBuild,
PathSet& willSubstitute, PathSet& unknown,
unsigned long long& downloadSize,
unsigned long long& narSize) {
2006-11-30 19:35:50 +01:00
{
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19) {
// Don't hold the connection handle in the fallback case
// to prevent a deadlock.
goto fallback;
}
conn->to << wopQueryMissing << targets;
conn.processStderr();
willBuild = readStorePaths<PathSet>(*this, conn->from);
willSubstitute = readStorePaths<PathSet>(*this, conn->from);
unknown = readStorePaths<PathSet>(*this, conn->from);
conn->from >> downloadSize >> narSize;
return;
2006-11-30 19:35:50 +01:00
}
fallback:
return Store::queryMissing(targets, willBuild, willSubstitute, unknown,
downloadSize, narSize);
}
void RemoteStore::connect() { auto conn(getConnection()); }
unsigned int RemoteStore::getProtocol() {
auto conn(connections->get());
return conn->daemonVersion;
}
void RemoteStore::flushBadConnections() { connections->flushBad(); }
RemoteStore::Connection::~Connection() {
try {
to.flush();
} catch (...) {
ignoreException();
}
}
std::exception_ptr RemoteStore::Connection::processStderr(Sink* sink,
Source* source) {
2016-02-23 16:40:16 +01:00
to.flush();
while (true) {
auto msg = readNum<uint64_t>(from);
if (msg == STDERR_WRITE) {
std::string s = readString(from);
if (sink == nullptr) {
throw Error("no sink");
}
(*sink)(s);
}
else if (msg == STDERR_READ) {
if (source == nullptr) {
throw Error("no source");
}
auto len = readNum<size_t>(from);
auto buf = std::make_unique<unsigned char[]>(len);
writeString(buf.get(), source->read(buf.get(), len), to);
2016-02-23 16:40:16 +01:00
to.flush();
}
2016-02-23 16:40:16 +01:00
else if (msg == STDERR_ERROR) {
std::string error = readString(from);
unsigned int status = readInt(from);
return std::make_exception_ptr(Error(status, error));
}
else if (msg == STDERR_NEXT) {
LOG(ERROR) << absl::StripTrailingAsciiWhitespace(readString(from));
}
else if (msg == STDERR_START_ACTIVITY) {
LOG(INFO) << readString(from);
}
else if (msg == STDERR_LAST) {
break;
}
else {
throw Error("got unknown message type %x from Nix daemon", msg);
}
}
return nullptr;
}
2006-11-30 19:35:50 +01:00
} // namespace nix