feat(tvix): Implement AddToStoreNar

Implement both the client and server sides of AddToStoreNar, using a
templated generalization of the sources and sinks we were using for
AddToStore on both ends.

Change-Id: I73d0ed34118c711b125851dff99a7518ced4af35
Reviewed-on: https://cl.tvl.fyi/c/depot/+/1686
Tested-by: BuildkiteCI
Reviewed-by: kanepyork <rikingcoding@gmail.com>
This commit is contained in:
Griffin Smith 2020-08-05 22:46:21 -04:00 committed by glittershark
parent cc01059d40
commit 7db734afad
4 changed files with 129 additions and 33 deletions

View file

@ -56,29 +56,32 @@ T FillFrom(const U& src) {
return result; return result;
} }
class AddToStorePathWriterSink : public BufferedSink { template <typename Request>
class RPCSink : public BufferedSink {
public: public:
explicit AddToStorePathWriterSink( using Writer = grpc::ClientWriter<Request>;
std::unique_ptr< explicit RPCSink(std::unique_ptr<Writer>&& writer)
grpc_impl::ClientWriter<class nix::proto::AddToStoreRequest>>&&
writer)
: writer_(std::move(writer)), good_(true) {} : writer_(std::move(writer)), good_(true) {}
bool good() override { return good_; } bool good() override { return good_; }
void write(const unsigned char* data, size_t len) override { void write(const unsigned char* data, size_t len) override {
proto::AddToStoreRequest req; Request req;
req.set_data(data, len); req.set_data(data, len);
if (!writer_->Write(req)) { if (!writer_->Write(req)) {
good_ = false; good_ = false;
} }
} }
grpc::Status Finish() { return writer_->Finish(); } ~RPCSink() override { flush(); }
grpc::Status Finish() {
flush();
return writer_->Finish();
}
private: private:
std::unique_ptr<grpc_impl::ClientWriter<class nix::proto::AddToStoreRequest>> std::unique_ptr<Writer> writer_;
writer_;
bool good_; bool good_;
}; };
@ -319,14 +322,33 @@ void RpcStore::querySubstitutablePathInfos(const PathSet& paths,
void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource, void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource,
RepairFlag repair, CheckSigsFlag checkSigs, RepairFlag repair, CheckSigsFlag checkSigs,
std::shared_ptr<FSAccessor> accessor) { std::shared_ptr<FSAccessor> accessor) {
throw Unsupported(absl::StrCat("Not implemented ", __func__)); ClientContext ctx;
} google::protobuf::Empty response;
auto writer = stub_->AddToStoreNar(&ctx, &response);
void RpcStore::addToStore(const ValidPathInfo& info, proto::AddToStoreNarRequest path_info_req;
const ref<std::string>& nar, RepairFlag repair, path_info_req.mutable_path_info()->mutable_path()->set_path(info.path);
CheckSigsFlag checkSigs, path_info_req.mutable_path_info()->mutable_deriver()->set_path(info.deriver);
std::shared_ptr<FSAccessor> accessor) { path_info_req.mutable_path_info()->set_nar_hash(
throw Unsupported(absl::StrCat("Not implemented ", __func__)); info.narHash.to_string(Base16, false));
for (const auto& ref : info.references) {
path_info_req.mutable_path_info()->add_references(ref);
}
*path_info_req.mutable_path_info()->mutable_registration_time() =
TimeUtil::TimeTToTimestamp(info.registrationTime);
path_info_req.mutable_path_info()->set_nar_size(info.narSize);
path_info_req.mutable_path_info()->set_ultimate(info.ultimate);
for (const auto& sig : info.sigs) {
path_info_req.mutable_path_info()->add_sigs(sig);
}
path_info_req.mutable_path_info()->set_ca(info.ca);
path_info_req.mutable_path_info()->set_repair(repair);
path_info_req.mutable_path_info()->set_check_sigs(checkSigs);
writer->Write(path_info_req);
RPCSink sink(std::move(writer));
copyNAR(narSource, sink);
} }
Path RpcStore::addToStore(const std::string& name, const Path& srcPath, Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
@ -349,7 +371,7 @@ Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo)); metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo));
writer->Write(metadata_req); writer->Write(metadata_req);
AddToStorePathWriterSink sink(std::move(writer)); RPCSink sink(std::move(writer));
dumpPath(std::filesystem::absolute(srcPath), sink); dumpPath(std::filesystem::absolute(srcPath), sink);
sink.flush(); sink.flush();
SuccessOrThrow(sink.Finish()); SuccessOrThrow(sink.Finish());
@ -402,7 +424,9 @@ void RpcStore::ensurePath(const Path& path) {
} }
void RpcStore::addTempRoot(const Path& path) { void RpcStore::addTempRoot(const Path& path) {
throw Unsupported(absl::StrCat("Not implemented ", __func__)); ClientContext ctx;
google::protobuf::Empty response;
SuccessOrThrow(stub_->AddTempRoot(&ctx, StorePath(path), &response));
} }
void RpcStore::addIndirectRoot(const Path& path) { void RpcStore::addIndirectRoot(const Path& path) {

View file

@ -56,12 +56,6 @@ class RpcStore : public LocalFSStore, public virtual Store {
CheckSigsFlag checkSigs = CheckSigs, CheckSigsFlag checkSigs = CheckSigs,
std::shared_ptr<FSAccessor> accessor = 0) override; std::shared_ptr<FSAccessor> accessor = 0) override;
virtual void addToStore(const ValidPathInfo& info,
const ref<std::string>& nar,
RepairFlag repair = NoRepair,
CheckSigsFlag checkSigs = CheckSigs,
std::shared_ptr<FSAccessor> accessor = 0) override;
virtual Path addToStore(const std::string& name, const Path& srcPath, virtual Path addToStore(const std::string& name, const Path& srcPath,
bool recursive = true, HashType hashAlgo = htSHA256, bool recursive = true, HashType hashAlgo = htSHA256,
PathFilter& filter = defaultPathFilter, PathFilter& filter = defaultPathFilter,

View file

@ -2,6 +2,7 @@
#include <filesystem> #include <filesystem>
#include <sstream> #include <sstream>
#include <string>
#include <absl/strings/str_cat.h> #include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h> #include <absl/strings/str_format.h>
@ -10,6 +11,7 @@
#include <grpcpp/impl/codegen/server_context.h> #include <grpcpp/impl/codegen/server_context.h>
#include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/status_code_enum.h> #include <grpcpp/impl/codegen/status_code_enum.h>
#include <grpcpp/impl/codegen/sync_stream.h>
#include "libmain/shared.hh" #include "libmain/shared.hh"
#include "libproto/worker.grpc.pb.h" #include "libproto/worker.grpc.pb.h"
@ -24,6 +26,7 @@
namespace nix::daemon { namespace nix::daemon {
using ::google::protobuf::util::TimeUtil;
using ::grpc::Status; using ::grpc::Status;
using ::nix::proto::BuildStatus; using ::nix::proto::BuildStatus;
using ::nix::proto::PathInfo; using ::nix::proto::PathInfo;
@ -31,20 +34,20 @@ using ::nix::proto::StorePath;
using ::nix::proto::StorePaths; using ::nix::proto::StorePaths;
using ::nix::proto::WorkerService; using ::nix::proto::WorkerService;
class AddToStoreRequestSource final : public Source { template <typename Request>
using Reader = grpc::ServerReader<nix::proto::AddToStoreRequest>; class RPCSource final : public Source {
public: public:
explicit AddToStoreRequestSource(Reader* reader) : reader_(reader) {} using Reader = grpc::ServerReader<Request>;
explicit RPCSource(Reader* reader) : reader_(reader) {}
size_t read(unsigned char* data, size_t len) override { size_t read(unsigned char* data, size_t len) override {
auto got = buffer_.sgetn(reinterpret_cast<char*>(data), len); auto got = buffer_.sgetn(reinterpret_cast<char*>(data), len);
if (got < len) { if (got < len) {
proto::AddToStoreRequest msg; Request msg;
if (!reader_->Read(&msg)) { if (!reader_->Read(&msg)) {
return got; return got;
} }
if (msg.add_oneof_case() != proto::AddToStoreRequest::kData) { if (msg.add_oneof_case() != Request::kData) {
// TODO(grfn): Make Source::read return a StatusOr and get rid of this // TODO(grfn): Make Source::read return a StatusOr and get rid of this
// throw // throw
throw Error( throw Error(
@ -152,7 +155,7 @@ class WorkerServiceImpl final : public WorkerService::Service {
} }
auto meta = metadata_request.meta(); auto meta = metadata_request.meta();
AddToStoreRequestSource source(reader); RPCSource source(reader);
auto opt_hash_type = hash_type_from(meta.hash_type()); auto opt_hash_type = hash_type_from(meta.hash_type());
if (!opt_hash_type) { if (!opt_hash_type) {
return Status(grpc::StatusCode::INVALID_ARGUMENT, return Status(grpc::StatusCode::INVALID_ARGUMENT,
@ -194,6 +197,62 @@ class WorkerServiceImpl final : public WorkerService::Service {
__FUNCTION__); __FUNCTION__);
} }
Status AddToStoreNar(
grpc::ServerContext* context,
grpc::ServerReader<nix::proto::AddToStoreNarRequest>* reader,
google::protobuf::Empty*) override {
return HandleExceptions(
[&]() -> Status {
proto::AddToStoreNarRequest path_info_request;
auto has_path_info = reader->Read(&path_info_request);
if (!has_path_info || !path_info_request.has_path_info()) {
return Status(grpc::StatusCode::INVALID_ARGUMENT,
"Path info must be set before sending nar content");
}
auto path_info = path_info_request.path_info();
ValidPathInfo info;
info.path = path_info.path().path();
info.deriver = path_info.deriver().path();
if (!info.deriver.empty()) {
ASSERT_INPUT_STORE_PATH(info.deriver);
}
auto nar_hash = Hash::deserialize(path_info.nar_hash(), htSHA256);
if (!nar_hash.ok()) {
return Status(grpc::StatusCode::INVALID_ARGUMENT,
std::string(nar_hash.status().message()));
}
info.narHash = nar_hash.ConsumeValueOrDie();
for (const auto& ref : path_info.references()) {
info.references.insert(ref);
}
info.registrationTime =
TimeUtil::TimestampToTimeT(path_info.registration_time());
info.narSize = path_info.nar_size();
info.ultimate = path_info.ultimate();
for (const auto& sig : path_info.sigs()) {
info.sigs.insert(sig);
}
info.ca = path_info.ca();
auto repair = path_info.repair();
auto check_sigs = path_info.check_sigs();
std::string saved;
RPCSource source(reader);
store_->addToStore(info, source, repair ? Repair : NoRepair,
check_sigs ? CheckSigs : NoCheckSigs, nullptr);
return Status::OK;
},
__FUNCTION__);
}
Status AddTextToStore(grpc::ServerContext*, Status AddTextToStore(grpc::ServerContext*,
const nix::proto::AddTextToStoreRequest* request, const nix::proto::AddTextToStoreRequest* request,
nix::proto::StorePath* response) override { nix::proto::StorePath* response) override {
@ -228,6 +287,19 @@ class WorkerServiceImpl final : public WorkerService::Service {
return Status::OK; return Status::OK;
} }
Status AddTempRoot(grpc::ServerContext*, const nix::proto::StorePath* request,
google::protobuf::Empty*) override {
auto path = request->path();
ASSERT_INPUT_STORE_PATH(path);
return HandleExceptions(
[&]() -> Status {
store_->addTempRoot(path);
return Status::OK;
},
__FUNCTION__);
}
Status AddIndirectRoot(grpc::ServerContext*, Status AddIndirectRoot(grpc::ServerContext*,
const nix::proto::StorePath* request, const nix::proto::StorePath* request,
google::protobuf::Empty*) override { google::protobuf::Empty*) override {

View file

@ -33,6 +33,8 @@ service WorkerService {
rpc EnsurePath(StorePath) returns (google.protobuf.Empty); rpc EnsurePath(StorePath) returns (google.protobuf.Empty);
// TODO: What does this do? // TODO: What does this do?
// TODO(grfn): This should not actually take a StorePath, as it's not a
// StorePath
rpc AddTempRoot(StorePath) returns (google.protobuf.Empty); rpc AddTempRoot(StorePath) returns (google.protobuf.Empty);
// TODO: What does this do? // TODO: What does this do?
@ -260,6 +262,10 @@ message PathInfo {
repeated string sigs = 7; repeated string sigs = 7;
// If non-empty, an assertion that the path is content-addressed // If non-empty, an assertion that the path is content-addressed
string ca = 8; string ca = 8;
// Only used for AddToStoreNarRequest
bool repair = 12;
bool check_sigs = 13;
} }
message SubstitutablePathInfos { message SubstitutablePathInfos {
@ -318,9 +324,9 @@ message AddSignaturesRequest {
} }
message AddToStoreNarRequest { message AddToStoreNarRequest {
oneof add_oneoff { oneof add_oneof {
PathInfo path_info = 1; PathInfo path_info = 1;
bytes chunk = 2; bytes data = 2;
} }
} }