feat(3p/nix): Start implementing RPC store client
Add a stub class for wrapping a gRPC client to the new, proto-backed nix store protocol, along with several methods implemented but several left throwing a not implemented exception. Paired-With: Vincent Ambo <mail@tazj.in> Paired-With: Perry Lorier <isomer@tvl.fyi> Change-Id: Id943d4f6d75084b8498786d580e6c9f7c92c104d Reviewed-on: https://cl.tvl.fyi/c/depot/+/1436 Tested-by: BuildkiteCI Reviewed-by: kanepyork <rikingcoding@gmail.com>
This commit is contained in:
parent
dcaba9de64
commit
b10970a66f
5 changed files with 450 additions and 16 deletions
2
third_party/nix/src/libstore/CMakeLists.txt
vendored
2
third_party/nix/src/libstore/CMakeLists.txt
vendored
|
@ -38,6 +38,7 @@ set(HEADER_FILES
|
|||
references.hh
|
||||
remote-fs-accessor.hh
|
||||
remote-store.hh
|
||||
rpc-store.hh
|
||||
s3-binary-cache-store.hh
|
||||
s3.hh
|
||||
serve-protocol.hh
|
||||
|
@ -78,6 +79,7 @@ target_sources(nixstore
|
|||
references.cc
|
||||
remote-fs-accessor.cc
|
||||
remote-store.cc
|
||||
rpc-store.cc
|
||||
s3-binary-cache-store.cc
|
||||
sqlite.cc
|
||||
ssh.cc
|
||||
|
|
282
third_party/nix/src/libstore/rpc-store.cc
vendored
Normal file
282
third_party/nix/src/libstore/rpc-store.cc
vendored
Normal file
|
@ -0,0 +1,282 @@
|
|||
#include "rpc-store.hh"
|
||||
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
|
||||
#include <absl/strings/str_cat.h>
|
||||
#include <absl/strings/str_format.h>
|
||||
#include <google/protobuf/empty.pb.h>
|
||||
#include <google/protobuf/util/time_util.h>
|
||||
#include <grpcpp/create_channel.h>
|
||||
#include <grpcpp/impl/codegen/async_unary_call.h>
|
||||
#include <grpcpp/impl/codegen/client_context.h>
|
||||
#include <grpcpp/impl/codegen/completion_queue.h>
|
||||
#include <grpcpp/impl/codegen/status.h>
|
||||
#include <grpcpp/impl/codegen/sync_stream.h>
|
||||
#include <grpcpp/security/credentials.h>
|
||||
|
||||
#include "libproto/worker.grpc.pb.h"
|
||||
#include "libproto/worker.pb.h"
|
||||
#include "libstore/store-api.hh"
|
||||
#include "libutil/hash.hh"
|
||||
#include "libutil/types.hh"
|
||||
|
||||
namespace nix {
|
||||
|
||||
namespace store {
|
||||
|
||||
using google::protobuf::util::TimeUtil;
|
||||
using grpc::ClientContext;
|
||||
using nix::proto::WorkerService;
|
||||
|
||||
static google::protobuf::Empty kEmpty;
|
||||
static ClientContext ctx;
|
||||
|
||||
proto::StorePath StorePath(const Path& path) {
|
||||
proto::StorePath store_path;
|
||||
store_path.set_path(path);
|
||||
return store_path;
|
||||
}
|
||||
|
||||
template <typename T, typename U>
|
||||
T FillFrom(const U& src) {
|
||||
T result;
|
||||
result.insert(src.begin(), src.end());
|
||||
return result;
|
||||
}
|
||||
|
||||
// TODO(grfn): Obviously this should go away and be replaced by StatusOr... but
|
||||
// that would require refactoring the entire store api, which we don't feel like
|
||||
// doing right now. We should at some point though
|
||||
void SuccessOrThrow(const grpc::Status& status) {
|
||||
if (!status.ok()) {
|
||||
throw Error(absl::StrFormat("Rpc call failed (%d): %s ",
|
||||
status.error_code(), status.error_message()));
|
||||
}
|
||||
}
|
||||
|
||||
bool RpcStore::isValidPathUncached(const Path& path) {
|
||||
proto::IsValidPathResponse resp;
|
||||
SuccessOrThrow(stub_->IsValidPath(&ctx, StorePath(path), &resp));
|
||||
return resp.is_valid();
|
||||
}
|
||||
|
||||
PathSet RpcStore::queryAllValidPaths() {
|
||||
proto::StorePaths paths;
|
||||
SuccessOrThrow(stub_->QueryAllValidPaths(&ctx, kEmpty, &paths));
|
||||
return FillFrom<PathSet>(paths.paths());
|
||||
}
|
||||
|
||||
PathSet RpcStore::queryValidPaths(const PathSet& paths,
|
||||
SubstituteFlag maybeSubstitute) {
|
||||
proto::StorePaths store_paths;
|
||||
for (const auto& path : paths) {
|
||||
store_paths.add_paths(path);
|
||||
}
|
||||
proto::StorePaths result_paths;
|
||||
SuccessOrThrow(stub_->QueryValidPaths(&ctx, store_paths, &result_paths));
|
||||
return FillFrom<PathSet>(result_paths.paths());
|
||||
}
|
||||
|
||||
void RpcStore::queryPathInfoUncached(
|
||||
const Path& path,
|
||||
Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept {
|
||||
proto::StorePath store_path;
|
||||
store_path.set_path(path);
|
||||
|
||||
try {
|
||||
proto::PathInfo path_info;
|
||||
SuccessOrThrow(stub_->QueryPathInfo(&ctx, store_path, &path_info));
|
||||
|
||||
std::shared_ptr<ValidPathInfo> info;
|
||||
|
||||
if (!path_info.is_valid()) {
|
||||
throw InvalidPath(absl::StrFormat("path '%s' is not valid", path));
|
||||
}
|
||||
|
||||
info = std::make_shared<ValidPathInfo>();
|
||||
info->path = path;
|
||||
info->deriver = path_info.deriver().path();
|
||||
if (!info->deriver.empty()) {
|
||||
assertStorePath(info->deriver);
|
||||
}
|
||||
info->narHash = Hash(path_info.nar_hash(), htSHA256);
|
||||
info->references.insert(path_info.references().begin(),
|
||||
path_info.references().end());
|
||||
info->registrationTime =
|
||||
TimeUtil::TimestampToTimeT(path_info.registration_time());
|
||||
info->narSize = path_info.nar_size();
|
||||
info->ultimate = path_info.ultimate();
|
||||
info->sigs.insert(path_info.sigs().begin(), path_info.sigs().end());
|
||||
info->ca = path_info.ca();
|
||||
|
||||
callback(std::move(info));
|
||||
} catch (...) {
|
||||
callback.rethrow();
|
||||
}
|
||||
}
|
||||
|
||||
void RpcStore::queryReferrers(const Path& path, PathSet& referrers) {
|
||||
proto::StorePaths paths;
|
||||
SuccessOrThrow(stub_->QueryReferrers(&ctx, StorePath(path), &paths));
|
||||
referrers.insert(paths.paths().begin(), paths.paths().end());
|
||||
}
|
||||
|
||||
PathSet RpcStore::queryValidDerivers(const Path& path) {
|
||||
proto::StorePaths paths;
|
||||
SuccessOrThrow(stub_->QueryValidDerivers(&ctx, StorePath(path), &paths));
|
||||
return FillFrom<PathSet>(paths.paths());
|
||||
}
|
||||
|
||||
PathSet RpcStore::queryDerivationOutputs(const Path& path) {
|
||||
proto::StorePaths paths;
|
||||
SuccessOrThrow(stub_->QueryDerivationOutputs(&ctx, StorePath(path), &paths));
|
||||
return FillFrom<PathSet>(paths.paths());
|
||||
}
|
||||
|
||||
StringSet RpcStore::queryDerivationOutputNames(const Path& path) {
|
||||
proto::DerivationOutputNames output_names;
|
||||
SuccessOrThrow(
|
||||
stub_->QueryDerivationOutputNames(&ctx, StorePath(path), &output_names));
|
||||
return FillFrom<StringSet>(output_names.names());
|
||||
}
|
||||
|
||||
Path RpcStore::queryPathFromHashPart(const std::string& hashPart) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
PathSet RpcStore::querySubstitutablePaths(const PathSet& paths) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::querySubstitutablePathInfos(const PathSet& paths,
|
||||
SubstitutablePathInfos& infos) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource,
|
||||
RepairFlag repair, CheckSigsFlag checkSigs,
|
||||
std::shared_ptr<FSAccessor> accessor) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::addToStore(const ValidPathInfo& info,
|
||||
const ref<std::string>& nar, RepairFlag repair,
|
||||
CheckSigsFlag checkSigs,
|
||||
std::shared_ptr<FSAccessor> accessor) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
|
||||
bool recursive, HashType hashAlgo, PathFilter& filter,
|
||||
RepairFlag repair) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
Path RpcStore::addTextToStore(const std::string& name, const std::string& s,
|
||||
const PathSet& references, RepairFlag repair) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::narFromPath(const Path& path, Sink& sink) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::buildPaths(const PathSet& paths, BuildMode buildMode) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
BuildResult RpcStore::buildDerivation(const Path& drvPath,
|
||||
const BasicDerivation& drv,
|
||||
BuildMode buildMode) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::ensurePath(const Path& path) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::addTempRoot(const Path& path) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::addIndirectRoot(const Path& path) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::syncWithGC() {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
Roots RpcStore::findRoots(bool censor) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::collectGarbage(const GCOptions& options, GCResults& results) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::optimiseStore() {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
bool RpcStore::verifyStore(bool checkContents, RepairFlag repair) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::addSignatures(const Path& storePath, const StringSet& sigs) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::computeFSClosure(const PathSet& paths, PathSet& paths_,
|
||||
bool flipDirection, bool includeOutputs,
|
||||
bool includeDerivers) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::queryMissing(const PathSet& targets, PathSet& willBuild,
|
||||
PathSet& willSubstitute, PathSet& unknown,
|
||||
unsigned long long& downloadSize,
|
||||
unsigned long long& narSize) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
std::shared_ptr<std::string> RpcStore::getBuildLog(const Path& path) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::connect() { throw absl::StrCat("Not implemented ", __func__); }
|
||||
|
||||
unsigned int RpcStore::getProtocol() {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
int RpcStore::getPriority() {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
Path RpcStore::toRealPath(const Path& storePath) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
void RpcStore::createUser(const std::string& userName, uid_t userId) {
|
||||
throw absl::StrCat("Not implemented ", __func__);
|
||||
}
|
||||
|
||||
} // namespace store
|
||||
|
||||
static std::string uriScheme = "unix://";
|
||||
|
||||
// TODO(grfn): Make this a function that we call from main rather than... this
|
||||
static RegisterStoreImplementation regStore([](const std::string& uri,
|
||||
const Store::Params& params)
|
||||
-> std::shared_ptr<Store> {
|
||||
if (std::string(uri, 0, uriScheme.size()) != uriScheme) {
|
||||
return nullptr;
|
||||
}
|
||||
auto channel = grpc::CreateChannel(uri, grpc::InsecureChannelCredentials());
|
||||
return std::make_shared<store::RpcStore>(
|
||||
uri, params, proto::WorkerService::NewStub(channel));
|
||||
});
|
||||
|
||||
} // namespace nix
|
142
third_party/nix/src/libstore/rpc-store.hh
vendored
Normal file
142
third_party/nix/src/libstore/rpc-store.hh
vendored
Normal file
|
@ -0,0 +1,142 @@
|
|||
#pragma once
|
||||
|
||||
#include "libproto/worker.grpc.pb.h"
|
||||
#include "libproto/worker.pb.h"
|
||||
#include "libstore/remote-store.hh"
|
||||
#include "libstore/store-api.hh"
|
||||
|
||||
namespace nix::store {
|
||||
|
||||
// TODO(grfn): Currently, since the RPCStore is only used for the connection to
|
||||
// the nix daemon over a unix socket, it inherits from the LocalFSStore since it
|
||||
// shares a filesystem with the daemon. This will not always be the case, at
|
||||
// which point we should tease these two things apart.
|
||||
class RpcStore : public LocalFSStore, public virtual Store {
|
||||
public:
|
||||
RpcStore(const Params& params,
|
||||
std::unique_ptr<nix::proto::WorkerService::Stub> stub)
|
||||
: Store(params), LocalFSStore(params), stub_(std::move(stub)) {}
|
||||
|
||||
RpcStore(std::string uri, const Params& params,
|
||||
std::unique_ptr<nix::proto::WorkerService::Stub> stub)
|
||||
: Store(params),
|
||||
LocalFSStore(params),
|
||||
uri_(uri),
|
||||
stub_(std::move(stub)) {}
|
||||
|
||||
std::string getUri() override {
|
||||
if (uri_.has_value()) {
|
||||
return uri_.value();
|
||||
} else {
|
||||
return "daemon";
|
||||
}
|
||||
};
|
||||
|
||||
virtual PathSet queryAllValidPaths() override;
|
||||
|
||||
virtual void queryReferrers(const Path& path, PathSet& referrers) override;
|
||||
|
||||
virtual PathSet queryValidDerivers(const Path& path) override;
|
||||
|
||||
virtual PathSet queryDerivationOutputs(const Path& path) override;
|
||||
|
||||
virtual StringSet queryDerivationOutputNames(const Path& path) override;
|
||||
|
||||
virtual Path queryPathFromHashPart(const std::string& hashPart) override;
|
||||
|
||||
virtual PathSet querySubstitutablePaths(const PathSet& paths) override;
|
||||
|
||||
virtual void querySubstitutablePathInfos(
|
||||
const PathSet& paths, SubstitutablePathInfos& infos) override;
|
||||
|
||||
virtual bool wantMassQuery() override { return true; }
|
||||
|
||||
virtual void addToStore(const ValidPathInfo& info, Source& narSource,
|
||||
RepairFlag repair = NoRepair,
|
||||
CheckSigsFlag checkSigs = CheckSigs,
|
||||
std::shared_ptr<FSAccessor> accessor = 0) override;
|
||||
|
||||
virtual void addToStore(const ValidPathInfo& info,
|
||||
const ref<std::string>& nar,
|
||||
RepairFlag repair = NoRepair,
|
||||
CheckSigsFlag checkSigs = CheckSigs,
|
||||
std::shared_ptr<FSAccessor> accessor = 0) override;
|
||||
|
||||
virtual Path addToStore(const std::string& name, const Path& srcPath,
|
||||
bool recursive = true, HashType hashAlgo = htSHA256,
|
||||
PathFilter& filter = defaultPathFilter,
|
||||
RepairFlag repair = NoRepair) override;
|
||||
|
||||
virtual Path addTextToStore(const std::string& name, const std::string& s,
|
||||
const PathSet& references,
|
||||
RepairFlag repair = NoRepair) override;
|
||||
|
||||
virtual void narFromPath(const Path& path, Sink& sink) override;
|
||||
|
||||
virtual void buildPaths(const PathSet& paths,
|
||||
BuildMode buildMode = bmNormal) override;
|
||||
|
||||
virtual BuildResult buildDerivation(const Path& drvPath,
|
||||
const BasicDerivation& drv,
|
||||
BuildMode buildMode = bmNormal) override;
|
||||
|
||||
virtual void ensurePath(const Path& path) override;
|
||||
|
||||
virtual void addTempRoot(const Path& path) override;
|
||||
|
||||
virtual void addIndirectRoot(const Path& path) override;
|
||||
|
||||
virtual void syncWithGC() override;
|
||||
|
||||
virtual Roots findRoots(bool censor) override;
|
||||
|
||||
virtual void collectGarbage(const GCOptions& options,
|
||||
GCResults& results) override;
|
||||
|
||||
virtual void optimiseStore() override;
|
||||
|
||||
virtual bool verifyStore(bool checkContents,
|
||||
RepairFlag repair = NoRepair) override;
|
||||
|
||||
virtual void addSignatures(const Path& storePath,
|
||||
const StringSet& sigs) override;
|
||||
|
||||
virtual void computeFSClosure(const PathSet& paths, PathSet& paths_,
|
||||
bool flipDirection = false,
|
||||
bool includeOutputs = false,
|
||||
bool includeDerivers = false) override;
|
||||
|
||||
virtual void queryMissing(const PathSet& targets, PathSet& willBuild,
|
||||
PathSet& willSubstitute, PathSet& unknown,
|
||||
unsigned long long& downloadSize,
|
||||
unsigned long long& narSize) override;
|
||||
|
||||
virtual std::shared_ptr<std::string> getBuildLog(const Path& path) override;
|
||||
|
||||
virtual void connect() override;
|
||||
|
||||
virtual unsigned int getProtocol() override;
|
||||
|
||||
virtual int getPriority() override;
|
||||
|
||||
virtual Path toRealPath(const Path& storePath) override;
|
||||
|
||||
virtual void createUser(const std::string& userName, uid_t userId) override;
|
||||
|
||||
protected:
|
||||
virtual bool isValidPathUncached(const Path& path) override;
|
||||
|
||||
virtual PathSet queryValidPaths(
|
||||
const PathSet& paths,
|
||||
SubstituteFlag maybeSubstitute = NoSubstitute) override;
|
||||
|
||||
virtual void queryPathInfoUncached(
|
||||
const Path& path,
|
||||
Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept override;
|
||||
|
||||
private:
|
||||
std::optional<std::string> uri_;
|
||||
std::unique_ptr<nix::proto::WorkerService::Stub> stub_;
|
||||
};
|
||||
|
||||
} // namespace nix::store
|
39
third_party/nix/src/libstore/store-api.cc
vendored
39
third_party/nix/src/libstore/store-api.cc
vendored
|
@ -7,11 +7,13 @@
|
|||
#include <absl/strings/numbers.h>
|
||||
#include <absl/strings/str_split.h>
|
||||
#include <glog/logging.h>
|
||||
#include <grpcpp/create_channel.h>
|
||||
|
||||
#include "libstore/crypto.hh"
|
||||
#include "libstore/derivations.hh"
|
||||
#include "libstore/globals.hh"
|
||||
#include "libstore/nar-info-disk-cache.hh"
|
||||
#include "libstore/rpc-store.hh"
|
||||
#include "libutil/json.hh"
|
||||
#include "libutil/thread-pool.hh"
|
||||
#include "libutil/util.hh"
|
||||
|
@ -978,23 +980,28 @@ StoreType getStoreType(const std::string& uri, const std::string& stateDir) {
|
|||
}
|
||||
}
|
||||
|
||||
static RegisterStoreImplementation regStore([](const std::string& uri,
|
||||
const Store::Params& params)
|
||||
-> std::shared_ptr<Store> {
|
||||
switch (getStoreType(uri, get(params, "state", settings.nixStateDir))) {
|
||||
case tDaemon:
|
||||
return std::shared_ptr<Store>(std::make_shared<UDSRemoteStore>(params));
|
||||
case tLocal: {
|
||||
Store::Params params2 = params;
|
||||
if (absl::StartsWith(uri, "/")) {
|
||||
params2["root"] = uri;
|
||||
static RegisterStoreImplementation regStore(
|
||||
[](const std::string& uri,
|
||||
const Store::Params& params) -> std::shared_ptr<Store> {
|
||||
switch (getStoreType(uri, get(params, "state", settings.nixStateDir))) {
|
||||
case tDaemon: {
|
||||
auto daemon_socket_uri = settings.nixDaemonSocketFile;
|
||||
auto channel = grpc::CreateChannel(
|
||||
daemon_socket_uri, grpc::InsecureChannelCredentials());
|
||||
return std::shared_ptr<Store>(std::make_shared<nix::store::RpcStore>(
|
||||
params, proto::WorkerService::NewStub(channel)));
|
||||
}
|
||||
case tLocal: {
|
||||
Store::Params params2 = params;
|
||||
if (absl::StartsWith(uri, "/")) {
|
||||
params2["root"] = uri;
|
||||
}
|
||||
return std::shared_ptr<Store>(std::make_shared<LocalStore>(params2));
|
||||
}
|
||||
default:
|
||||
return nullptr;
|
||||
}
|
||||
return std::shared_ptr<Store>(std::make_shared<LocalStore>(params2));
|
||||
}
|
||||
default:
|
||||
return nullptr;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
std::list<ref<Store>> getDefaultSubstituters() {
|
||||
static auto stores([]() {
|
||||
|
|
1
third_party/nix/src/proto/worker.proto
vendored
1
third_party/nix/src/proto/worker.proto
vendored
|
@ -251,6 +251,7 @@ message CollectGarbageResponse {
|
|||
}
|
||||
|
||||
message PathInfo {
|
||||
bool is_valid = 9;
|
||||
StorePath deriver = 1;
|
||||
bytes nar_hash = 2;
|
||||
repeated string references = 3;
|
||||
|
|
Loading…
Reference in a new issue