refactor(tvix): Factor-out proto utilities

Factor out the shared utilities for interacting with protobufs/grpc from
libstore to a proto.hh header in libproto.

Change-Id: I1cb8d94867d5d4b63a9994be0b53f8f612eb8e3a
Reviewed-on: https://cl.tvl.fyi/c/depot/+/1691
Tested-by: BuildkiteCI
Reviewed-by: kanepyork <rikingcoding@gmail.com>
This commit is contained in:
Griffin Smith 2020-08-08 15:13:22 -04:00 committed by glittershark
parent ef7894273b
commit 747dc65154
3 changed files with 162 additions and 136 deletions

View file

@ -7,6 +7,8 @@
#include <absl/status/status.h> #include <absl/status/status.h>
#include <absl/strings/str_cat.h> #include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h> #include <absl/strings/str_format.h>
#include <absl/strings/string_view.h>
#include <glog/logging.h>
#include <google/protobuf/empty.pb.h> #include <google/protobuf/empty.pb.h>
#include <google/protobuf/util/time_util.h> #include <google/protobuf/util/time_util.h>
#include <grpcpp/create_channel.h> #include <grpcpp/create_channel.h>
@ -23,6 +25,7 @@
#include "libstore/store-api.hh" #include "libstore/store-api.hh"
#include "libutil/archive.hh" #include "libutil/archive.hh"
#include "libutil/hash.hh" #include "libutil/hash.hh"
#include "libutil/proto.hh"
#include "libutil/types.hh" #include "libutil/types.hh"
namespace nix { namespace nix {
@ -35,27 +38,6 @@ using nix::proto::WorkerService;
static google::protobuf::Empty kEmpty; static google::protobuf::Empty kEmpty;
proto::StorePath StorePath(const Path& path) {
proto::StorePath store_path;
store_path.set_path(path);
return store_path;
}
proto::StorePaths StorePaths(const PathSet& paths) {
proto::StorePaths result;
for (const auto& path : paths) {
result.add_paths(path);
}
return result;
}
template <typename T, typename U>
T FillFrom(const U& src) {
T result;
result.insert(src.begin(), src.end());
return result;
}
template <typename Request> template <typename Request>
class RPCSink : public BufferedSink { class RPCSink : public BufferedSink {
public: public:
@ -85,88 +67,6 @@ class RPCSink : public BufferedSink {
bool good_; bool good_;
}; };
constexpr absl::StatusCode GRPCStatusCodeToAbsl(grpc::StatusCode code) {
switch (code) {
case grpc::StatusCode::OK:
return absl::StatusCode::kOk;
case grpc::StatusCode::CANCELLED:
return absl::StatusCode::kCancelled;
case grpc::StatusCode::UNKNOWN:
return absl::StatusCode::kUnknown;
case grpc::StatusCode::INVALID_ARGUMENT:
return absl::StatusCode::kInvalidArgument;
case grpc::StatusCode::DEADLINE_EXCEEDED:
return absl::StatusCode::kDeadlineExceeded;
case grpc::StatusCode::NOT_FOUND:
return absl::StatusCode::kNotFound;
case grpc::StatusCode::ALREADY_EXISTS:
return absl::StatusCode::kAlreadyExists;
case grpc::StatusCode::PERMISSION_DENIED:
return absl::StatusCode::kPermissionDenied;
case grpc::StatusCode::UNAUTHENTICATED:
return absl::StatusCode::kUnauthenticated;
case grpc::StatusCode::RESOURCE_EXHAUSTED:
return absl::StatusCode::kResourceExhausted;
case grpc::StatusCode::FAILED_PRECONDITION:
return absl::StatusCode::kFailedPrecondition;
case grpc::StatusCode::ABORTED:
return absl::StatusCode::kAborted;
case grpc::StatusCode::OUT_OF_RANGE:
return absl::StatusCode::kOutOfRange;
case grpc::StatusCode::UNIMPLEMENTED:
return absl::StatusCode::kUnimplemented;
case grpc::StatusCode::INTERNAL:
return absl::StatusCode::kInternal;
case grpc::StatusCode::UNAVAILABLE:
return absl::StatusCode::kUnavailable;
case grpc::StatusCode::DATA_LOSS:
return absl::StatusCode::kDataLoss;
default:
return absl::StatusCode::kInternal;
}
}
constexpr absl::string_view GRPCStatusCodeDescription(grpc::StatusCode code) {
switch (code) {
case grpc::StatusCode::OK:
return "OK";
case grpc::StatusCode::CANCELLED:
return "CANCELLED";
case grpc::StatusCode::UNKNOWN:
return "UNKNOWN";
case grpc::StatusCode::INVALID_ARGUMENT:
return "INVALID_ARGUMENT";
case grpc::StatusCode::DEADLINE_EXCEEDED:
return "DEADLINE_EXCEEDED";
case grpc::StatusCode::NOT_FOUND:
return "NOT_FOUND";
case grpc::StatusCode::ALREADY_EXISTS:
return "ALREADY_EXISTS";
case grpc::StatusCode::PERMISSION_DENIED:
return "PERMISSION_DENIED";
case grpc::StatusCode::UNAUTHENTICATED:
return "UNAUTHENTICATED";
case grpc::StatusCode::RESOURCE_EXHAUSTED:
return "RESOURCE_EXHAUSTED";
case grpc::StatusCode::FAILED_PRECONDITION:
return "FAILED_PRECONDITION";
case grpc::StatusCode::ABORTED:
return "ABORTED";
case grpc::StatusCode::OUT_OF_RANGE:
return "OUT_OF_RANGE";
case grpc::StatusCode::UNIMPLEMENTED:
return "UNIMPLEMENTED";
case grpc::StatusCode::INTERNAL:
return "INTERNAL";
case grpc::StatusCode::UNAVAILABLE:
return "UNAVAILABLE";
case grpc::StatusCode::DATA_LOSS:
return "DATA_LOSS";
default:
return "<BAD ERROR CODE>";
};
}
// TODO(grfn): Obviously this should go away and be replaced by StatusOr... but // TODO(grfn): Obviously this should go away and be replaced by StatusOr... but
// that would require refactoring the entire store api, which we don't feel like // that would require refactoring the entire store api, which we don't feel like
// doing right now. We should at some point though // doing right now. We should at some point though
@ -177,12 +77,12 @@ void const RpcStore::SuccessOrThrow(const grpc::Status& status,
switch (status.error_code()) { switch (status.error_code()) {
case grpc::StatusCode::UNIMPLEMENTED: case grpc::StatusCode::UNIMPLEMENTED:
throw Unsupported( throw Unsupported(
absl::StrFormat("operation is not supported by store at %s: %s", absl::StrFormat("operation %s is not supported by store at %s: %s",
uri, status.error_message())); call, uri, status.error_message()));
default: default:
throw Error( throw Error(absl::StrFormat(
absl::StrFormat("Rpc call to %s failed (%s): %s ", uri, "Rpc call %s to %s failed (%s): %s ", call, uri,
GRPCStatusCodeDescription(status.error_code()), util::proto::GRPCStatusCodeDescription(status.error_code()),
status.error_message())); status.error_message()));
} }
} }
@ -191,7 +91,7 @@ void const RpcStore::SuccessOrThrow(const grpc::Status& status,
bool RpcStore::isValidPathUncached(const Path& path) { bool RpcStore::isValidPathUncached(const Path& path) {
ClientContext ctx; ClientContext ctx;
proto::IsValidPathResponse resp; proto::IsValidPathResponse resp;
SuccessOrThrow(stub_->IsValidPath(&ctx, StorePath(path), &resp), SuccessOrThrow(stub_->IsValidPath(&ctx, util::proto::StorePath(path), &resp),
__FUNCTION__); __FUNCTION__);
return resp.is_valid(); return resp.is_valid();
} }
@ -200,7 +100,7 @@ PathSet RpcStore::queryAllValidPaths() {
ClientContext ctx; ClientContext ctx;
proto::StorePaths paths; proto::StorePaths paths;
SuccessOrThrow(stub_->QueryAllValidPaths(&ctx, kEmpty, &paths), __FUNCTION__); SuccessOrThrow(stub_->QueryAllValidPaths(&ctx, kEmpty, &paths), __FUNCTION__);
return FillFrom<PathSet>(paths.paths()); return util::proto::FillFrom<PathSet>(paths.paths());
} }
PathSet RpcStore::queryValidPaths(const PathSet& paths, PathSet RpcStore::queryValidPaths(const PathSet& paths,
@ -213,7 +113,7 @@ PathSet RpcStore::queryValidPaths(const PathSet& paths,
proto::StorePaths result_paths; proto::StorePaths result_paths;
SuccessOrThrow(stub_->QueryValidPaths(&ctx, store_paths, &result_paths), SuccessOrThrow(stub_->QueryValidPaths(&ctx, store_paths, &result_paths),
__FUNCTION__); __FUNCTION__);
return FillFrom<PathSet>(result_paths.paths()); return util::proto::FillFrom<PathSet>(result_paths.paths());
} }
void RpcStore::queryPathInfoUncached( void RpcStore::queryPathInfoUncached(
@ -260,7 +160,8 @@ void RpcStore::queryPathInfoUncached(
void RpcStore::queryReferrers(const Path& path, PathSet& referrers) { void RpcStore::queryReferrers(const Path& path, PathSet& referrers) {
ClientContext ctx; ClientContext ctx;
proto::StorePaths paths; proto::StorePaths paths;
SuccessOrThrow(stub_->QueryReferrers(&ctx, StorePath(path), &paths), SuccessOrThrow(
stub_->QueryReferrers(&ctx, util::proto::StorePath(path), &paths),
__FUNCTION__); __FUNCTION__);
referrers.insert(paths.paths().begin(), paths.paths().end()); referrers.insert(paths.paths().begin(), paths.paths().end());
} }
@ -268,25 +169,27 @@ void RpcStore::queryReferrers(const Path& path, PathSet& referrers) {
PathSet RpcStore::queryValidDerivers(const Path& path) { PathSet RpcStore::queryValidDerivers(const Path& path) {
ClientContext ctx; ClientContext ctx;
proto::StorePaths paths; proto::StorePaths paths;
SuccessOrThrow(stub_->QueryValidDerivers(&ctx, StorePath(path), &paths), SuccessOrThrow(
stub_->QueryValidDerivers(&ctx, util::proto::StorePath(path), &paths),
__FUNCTION__); __FUNCTION__);
return FillFrom<PathSet>(paths.paths()); return util::proto::FillFrom<PathSet>(paths.paths());
} }
PathSet RpcStore::queryDerivationOutputs(const Path& path) { PathSet RpcStore::queryDerivationOutputs(const Path& path) {
ClientContext ctx; ClientContext ctx;
proto::StorePaths paths; proto::StorePaths paths;
SuccessOrThrow(stub_->QueryDerivationOutputs(&ctx, StorePath(path), &paths), SuccessOrThrow(
stub_->QueryDerivationOutputs(&ctx, util::proto::StorePath(path), &paths),
__FUNCTION__); __FUNCTION__);
return FillFrom<PathSet>(paths.paths()); return util::proto::FillFrom<PathSet>(paths.paths());
} }
StringSet RpcStore::queryDerivationOutputNames(const Path& path) { StringSet RpcStore::queryDerivationOutputNames(const Path& path) {
ClientContext ctx; ClientContext ctx;
proto::DerivationOutputNames output_names; proto::DerivationOutputNames output_names;
SuccessOrThrow( SuccessOrThrow(stub_->QueryDerivationOutputNames(
stub_->QueryDerivationOutputNames(&ctx, StorePath(path), &output_names)); &ctx, util::proto::StorePath(path), &output_names));
return FillFrom<StringSet>(output_names.names()); return util::proto::FillFrom<StringSet>(output_names.names());
} }
Path RpcStore::queryPathFromHashPart(const std::string& hashPart) { Path RpcStore::queryPathFromHashPart(const std::string& hashPart) {
@ -302,17 +205,17 @@ Path RpcStore::queryPathFromHashPart(const std::string& hashPart) {
PathSet RpcStore::querySubstitutablePaths(const PathSet& paths) { PathSet RpcStore::querySubstitutablePaths(const PathSet& paths) {
ClientContext ctx; ClientContext ctx;
proto::StorePaths result; proto::StorePaths result;
SuccessOrThrow( SuccessOrThrow(stub_->QuerySubstitutablePaths(
stub_->QuerySubstitutablePaths(&ctx, StorePaths(paths), &result)); &ctx, util::proto::StorePaths(paths), &result));
return FillFrom<PathSet>(result.paths()); return util::proto::FillFrom<PathSet>(result.paths());
} }
void RpcStore::querySubstitutablePathInfos(const PathSet& paths, void RpcStore::querySubstitutablePathInfos(const PathSet& paths,
SubstitutablePathInfos& infos) { SubstitutablePathInfos& infos) {
ClientContext ctx; ClientContext ctx;
proto::SubstitutablePathInfos result; proto::SubstitutablePathInfos result;
SuccessOrThrow( SuccessOrThrow(stub_->QuerySubstitutablePathInfos(
stub_->QuerySubstitutablePathInfos(&ctx, StorePaths(paths), &result)); &ctx, util::proto::StorePaths(paths), &result));
for (const auto& path_info : result.path_infos()) { for (const auto& path_info : result.path_infos()) {
auto path = path_info.path().path(); auto path = path_info.path().path();
@ -321,7 +224,7 @@ void RpcStore::querySubstitutablePathInfos(const PathSet& paths,
if (!info.deriver.empty()) { if (!info.deriver.empty()) {
assertStorePath(info.deriver); assertStorePath(info.deriver);
} }
info.references = FillFrom<PathSet>(path_info.references()); info.references = util::proto::FillFrom<PathSet>(path_info.references());
info.downloadSize = path_info.download_size(); info.downloadSize = path_info.download_size();
info.narSize = path_info.nar_size(); info.narSize = path_info.nar_size();
} }
@ -353,7 +256,9 @@ void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource,
path_info_req.mutable_path_info()->set_repair(repair); path_info_req.mutable_path_info()->set_repair(repair);
path_info_req.mutable_path_info()->set_check_sigs(checkSigs); path_info_req.mutable_path_info()->set_check_sigs(checkSigs);
writer->Write(path_info_req); if (!writer->Write(path_info_req)) {
throw Error("Could not write to nix daemon");
}
RPCSink sink(std::move(writer)); RPCSink sink(std::move(writer));
copyNAR(narSource, sink); copyNAR(narSource, sink);
@ -378,7 +283,10 @@ Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
metadata_req.mutable_meta()->set_fixed(!(hashAlgo == htSHA256 && recursive)); metadata_req.mutable_meta()->set_fixed(!(hashAlgo == htSHA256 && recursive));
metadata_req.mutable_meta()->set_recursive(recursive); metadata_req.mutable_meta()->set_recursive(recursive);
metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo)); metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo));
writer->Write(metadata_req);
if (!writer->Write(metadata_req)) {
throw Error("Could not write to nix daemon");
}
RPCSink sink(std::move(writer)); RPCSink sink(std::move(writer));
dumpPath(std::filesystem::absolute(srcPath), sink); dumpPath(std::filesystem::absolute(srcPath), sink);
@ -435,14 +343,16 @@ void RpcStore::ensurePath(const Path& path) {
void RpcStore::addTempRoot(const Path& path) { void RpcStore::addTempRoot(const Path& path) {
ClientContext ctx; ClientContext ctx;
google::protobuf::Empty response; google::protobuf::Empty response;
SuccessOrThrow(stub_->AddTempRoot(&ctx, StorePath(path), &response), SuccessOrThrow(
stub_->AddTempRoot(&ctx, util::proto::StorePath(path), &response),
__FUNCTION__); __FUNCTION__);
} }
void RpcStore::addIndirectRoot(const Path& path) { void RpcStore::addIndirectRoot(const Path& path) {
ClientContext ctx; ClientContext ctx;
google::protobuf::Empty response; google::protobuf::Empty response;
SuccessOrThrow(stub_->AddIndirectRoot(&ctx, StorePath(path), &response), SuccessOrThrow(
stub_->AddIndirectRoot(&ctx, util::proto::StorePath(path), &response),
__FUNCTION__); __FUNCTION__);
} }
@ -459,7 +369,8 @@ Roots RpcStore::findRoots(bool censor) {
Roots result; Roots result;
for (const auto& [target, links] : response.roots()) { for (const auto& [target, links] : response.roots()) {
auto link_paths = FillFrom<std::unordered_set<std::string>>(links.paths()); auto link_paths =
util::proto::FillFrom<std::unordered_set<std::string>>(links.paths());
result.insert({target, link_paths}); result.insert({target, link_paths});
} }

View file

@ -18,6 +18,7 @@ set(HEADER_FILES
lru-cache.hh lru-cache.hh
monitor-fd.hh monitor-fd.hh
pool.hh pool.hh
proto.hh
ref.hh ref.hh
serialise.hh serialise.hh
sync.hh sync.hh

114
third_party/nix/src/libutil/proto.hh vendored Normal file
View file

@ -0,0 +1,114 @@
#pragma once
#include <absl/status/status.h>
#include <grpcpp/impl/codegen/status.h>
#include "libproto/worker.pb.h"
#include "libutil/types.hh"
namespace nix::util::proto {
::nix::proto::StorePath StorePath(const Path& path) {
::nix::proto::StorePath store_path;
store_path.set_path(path);
return store_path;
}
::nix::proto::StorePaths StorePaths(const PathSet& paths) {
::nix::proto::StorePaths result;
for (const auto& path : paths) {
result.add_paths(path);
}
return result;
}
template <typename T, typename U>
T FillFrom(const U& src) {
T result;
result.insert(src.begin(), src.end());
return result;
}
constexpr absl::StatusCode GRPCStatusCodeToAbsl(grpc::StatusCode code) {
switch (code) {
case grpc::StatusCode::OK:
return absl::StatusCode::kOk;
case grpc::StatusCode::CANCELLED:
return absl::StatusCode::kCancelled;
case grpc::StatusCode::UNKNOWN:
return absl::StatusCode::kUnknown;
case grpc::StatusCode::INVALID_ARGUMENT:
return absl::StatusCode::kInvalidArgument;
case grpc::StatusCode::DEADLINE_EXCEEDED:
return absl::StatusCode::kDeadlineExceeded;
case grpc::StatusCode::NOT_FOUND:
return absl::StatusCode::kNotFound;
case grpc::StatusCode::ALREADY_EXISTS:
return absl::StatusCode::kAlreadyExists;
case grpc::StatusCode::PERMISSION_DENIED:
return absl::StatusCode::kPermissionDenied;
case grpc::StatusCode::UNAUTHENTICATED:
return absl::StatusCode::kUnauthenticated;
case grpc::StatusCode::RESOURCE_EXHAUSTED:
return absl::StatusCode::kResourceExhausted;
case grpc::StatusCode::FAILED_PRECONDITION:
return absl::StatusCode::kFailedPrecondition;
case grpc::StatusCode::ABORTED:
return absl::StatusCode::kAborted;
case grpc::StatusCode::OUT_OF_RANGE:
return absl::StatusCode::kOutOfRange;
case grpc::StatusCode::UNIMPLEMENTED:
return absl::StatusCode::kUnimplemented;
case grpc::StatusCode::INTERNAL:
return absl::StatusCode::kInternal;
case grpc::StatusCode::UNAVAILABLE:
return absl::StatusCode::kUnavailable;
case grpc::StatusCode::DATA_LOSS:
return absl::StatusCode::kDataLoss;
default:
return absl::StatusCode::kInternal;
}
}
constexpr absl::string_view GRPCStatusCodeDescription(grpc::StatusCode code) {
switch (code) {
case grpc::StatusCode::OK:
return "OK";
case grpc::StatusCode::CANCELLED:
return "CANCELLED";
case grpc::StatusCode::UNKNOWN:
return "UNKNOWN";
case grpc::StatusCode::INVALID_ARGUMENT:
return "INVALID_ARGUMENT";
case grpc::StatusCode::DEADLINE_EXCEEDED:
return "DEADLINE_EXCEEDED";
case grpc::StatusCode::NOT_FOUND:
return "NOT_FOUND";
case grpc::StatusCode::ALREADY_EXISTS:
return "ALREADY_EXISTS";
case grpc::StatusCode::PERMISSION_DENIED:
return "PERMISSION_DENIED";
case grpc::StatusCode::UNAUTHENTICATED:
return "UNAUTHENTICATED";
case grpc::StatusCode::RESOURCE_EXHAUSTED:
return "RESOURCE_EXHAUSTED";
case grpc::StatusCode::FAILED_PRECONDITION:
return "FAILED_PRECONDITION";
case grpc::StatusCode::ABORTED:
return "ABORTED";
case grpc::StatusCode::OUT_OF_RANGE:
return "OUT_OF_RANGE";
case grpc::StatusCode::UNIMPLEMENTED:
return "UNIMPLEMENTED";
case grpc::StatusCode::INTERNAL:
return "INTERNAL";
case grpc::StatusCode::UNAVAILABLE:
return "UNAVAILABLE";
case grpc::StatusCode::DATA_LOSS:
return "DATA_LOSS";
default:
return "<BAD ERROR CODE>";
};
}
} // namespace nix::util::proto