feat(tvix): Implement all remaining RPC calls

Implement all remaining RPC calls on the RpcSstore client, remove a few
stub methods we had added that weren't actually present in the old
RemoteStore implementation, and add one more RPC call for getBuildLog
that is present in the store API, but that we hadn't added as a
stub *or* to the proto.

Fixes: #29
Change-Id: Id827f51a393ece4bc7bbecaf38aee9eb4b329770
Reviewed-on: https://cl.tvl.fyi/c/depot/+/1692
Reviewed-by: kanepyork <rikingcoding@gmail.com>
Tested-by: BuildkiteCI
This commit is contained in:
Griffin Smith 2020-08-08 16:44:50 -04:00 committed by glittershark
parent 747dc65154
commit e440f60b6c
8 changed files with 159 additions and 50 deletions

View file

@ -5,6 +5,7 @@
#include <absl/strings/string_view.h>
#include <glog/logging.h>
#include "libproto/worker.pb.h"
#include "libstore/fs-accessor.hh"
#include "libstore/globals.hh"
#include "libstore/store-api.hh"
@ -33,6 +34,14 @@ void DerivationOutput::parseHashInfo(bool& recursive, Hash& hash) const {
hash = Hash::unwrap_throw(hash_);
}
nix::proto::Derivation_DerivationOutput DerivationOutput::to_proto() const {
nix::proto::Derivation_DerivationOutput result;
result.mutable_path()->set_path(path);
result.set_hash_algo(hashAlgo);
result.set_hash(hash);
return result;
}
BasicDerivation BasicDerivation::from_proto(
const nix::proto::Derivation* proto_derivation, const nix::Store& store) {
BasicDerivation result;
@ -57,6 +66,27 @@ BasicDerivation BasicDerivation::from_proto(
return result;
}
nix::proto::Derivation BasicDerivation::to_proto() const {
nix::proto::Derivation result;
for (const auto& [key, output] : outputs) {
result.mutable_outputs()->insert({key, output.to_proto()});
}
for (const auto& input_src : inputSrcs) {
result.mutable_input_sources()->add_paths(input_src);
}
result.set_platform(platform);
result.mutable_builder()->set_path(builder);
for (const auto& arg : args) {
result.add_args(arg);
}
for (const auto& [key, value] : env) {
result.mutable_env()->insert({key, value});
}
return result;
}
Path BasicDerivation::findOutput(const std::string& id) const {
auto i = outputs.find(id);
if (i == outputs.end()) {

View file

@ -35,6 +35,8 @@ struct DerivationOutput {
hash(proto_derivation_output.hash()) {}
void parseHashInfo(bool& recursive, Hash& hash) const;
[[nodiscard]] nix::proto::Derivation_DerivationOutput to_proto() const;
};
// TODO(tazjin): Determine whether this actually needs to be ordered.
@ -61,6 +63,8 @@ struct BasicDerivation {
static BasicDerivation from_proto(
const nix::proto::Derivation* proto_derivation, const nix::Store& store);
[[nodiscard]] nix::proto::Derivation to_proto() const;
virtual ~BasicDerivation(){};
/* Return the path corresponding to the output identifier `id' in

View file

@ -22,7 +22,9 @@
#include "libproto/worker.grpc.pb.h"
#include "libproto/worker.pb.h"
#include "libstore/derivations.hh"
#include "libstore/store-api.hh"
#include "libstore/worker-protocol.hh"
#include "libutil/archive.hh"
#include "libutil/hash.hh"
#include "libutil/proto.hh"
@ -315,10 +317,6 @@ Path RpcStore::addTextToStore(const std::string& name,
return result.path();
}
void RpcStore::narFromPath(const Path& path, Sink& sink) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::buildPaths(const PathSet& paths, BuildMode buildMode) {
ClientContext ctx;
proto::BuildPathsRequest request;
@ -333,11 +331,29 @@ void RpcStore::buildPaths(const PathSet& paths, BuildMode buildMode) {
BuildResult RpcStore::buildDerivation(const Path& drvPath,
const BasicDerivation& drv,
BuildMode buildMode) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
ClientContext ctx;
proto::BuildDerivationRequest request;
request.mutable_drv_path()->set_path(drvPath);
auto proto_drv = drv.to_proto();
request.set_allocated_derivation(&proto_drv);
request.set_build_mode(BuildModeToProto(buildMode));
proto::BuildDerivationResponse response;
SuccessOrThrow(stub_->BuildDerivation(&ctx, request, &response),
__FUNCTION__);
const auto result = BuildResult::FromProto(response);
if (!result.has_value()) {
throw Error("Invalid response from daemon for buildDerivation");
}
return result.value();
}
void RpcStore::ensurePath(const Path& path) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
ClientContext ctx;
google::protobuf::Empty response;
SuccessOrThrow(
stub_->EnsurePath(&ctx, util::proto::StorePath(path), &response),
__FUNCTION__);
}
void RpcStore::addTempRoot(const Path& path) {
@ -397,53 +413,64 @@ void RpcStore::collectGarbage(const GCOptions& options, GCResults& results) {
}
void RpcStore::optimiseStore() {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
ClientContext ctx;
google::protobuf::Empty response;
SuccessOrThrow(stub_->OptimiseStore(&ctx, kEmpty, &response), __FUNCTION__);
}
bool RpcStore::verifyStore(bool checkContents, RepairFlag repair) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
ClientContext ctx;
proto::VerifyStoreRequest request;
request.set_check_contents(checkContents);
request.set_repair(repair);
proto::VerifyStoreResponse response;
SuccessOrThrow(stub_->VerifyStore(&ctx, request, &response), __FUNCTION__);
return response.errors();
}
void RpcStore::addSignatures(const Path& storePath, const StringSet& sigs) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::computeFSClosure(const PathSet& paths, PathSet& paths_,
bool flipDirection, bool includeOutputs,
bool includeDerivers) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
ClientContext ctx;
proto::AddSignaturesRequest request;
request.mutable_path()->set_path(storePath);
for (const auto& sig : sigs) {
request.mutable_sigs()->add_sigs(sig);
}
google::protobuf::Empty response;
SuccessOrThrow(stub_->AddSignatures(&ctx, request, &response), __FUNCTION__);
}
void RpcStore::queryMissing(const PathSet& targets, PathSet& willBuild,
PathSet& willSubstitute, PathSet& unknown,
unsigned long long& downloadSize,
unsigned long long& narSize) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
ClientContext ctx;
proto::QueryMissingResponse response;
SuccessOrThrow(
stub_->QueryMissing(&ctx, util::proto::StorePaths(targets), &response),
__FUNCTION__);
willBuild = util::proto::FillFrom<PathSet>(response.will_build());
willSubstitute = util::proto::FillFrom<PathSet>(response.will_substitute());
unknown = util::proto::FillFrom<PathSet>(response.unknown());
downloadSize = response.download_size();
narSize = response.nar_size();
}
std::shared_ptr<std::string> RpcStore::getBuildLog(const Path& path) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
ClientContext ctx;
proto::BuildLog response;
SuccessOrThrow(
stub_->GetBuildLog(&ctx, util::proto::StorePath(path), &response),
__FUNCTION__);
auto build_log = response.build_log();
if (build_log.empty()) {
return nullptr;
}
return std::make_shared<std::string>(build_log);
}
void RpcStore::connect() {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
unsigned int RpcStore::getProtocol() {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
int RpcStore::getPriority() {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
Path RpcStore::toRealPath(const Path& storePath) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::createUser(const std::string& userName, uid_t userId) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
unsigned int RpcStore::getProtocol() { return PROTOCOL_VERSION; }
} // namespace store

View file

@ -67,8 +67,6 @@ class RpcStore : public LocalFSStore, public virtual Store {
const PathSet& references,
RepairFlag repair = NoRepair) override;
virtual void narFromPath(const Path& path, Sink& sink) override;
virtual void buildPaths(const PathSet& paths,
BuildMode buildMode = bmNormal) override;
@ -97,11 +95,6 @@ class RpcStore : public LocalFSStore, public virtual Store {
virtual void addSignatures(const Path& storePath,
const StringSet& sigs) override;
virtual void computeFSClosure(const PathSet& paths, PathSet& paths_,
bool flipDirection = false,
bool includeOutputs = false,
bool includeDerivers = false) override;
virtual void queryMissing(const PathSet& targets, PathSet& willBuild,
PathSet& willSubstitute, PathSet& unknown,
unsigned long long& downloadSize,
@ -109,16 +102,10 @@ class RpcStore : public LocalFSStore, public virtual Store {
virtual std::shared_ptr<std::string> getBuildLog(const Path& path) override;
virtual void connect() override;
void connect() override{};
virtual unsigned int getProtocol() override;
virtual int getPriority() override;
virtual Path toRealPath(const Path& storePath) override;
virtual void createUser(const std::string& userName, uid_t userId) override;
protected:
virtual bool isValidPathUncached(const Path& path) override;

View file

@ -77,6 +77,44 @@ nix::proto::BuildStatus BuildResult::status_to_proto() {
}
}
std::optional<BuildResult> BuildResult::FromProto(
const nix::proto::BuildDerivationResponse& resp) {
BuildResult result;
switch (resp.status()) {
case proto::BuildStatus::Built:
result.status = BuildResult::Status::Built;
case proto::BuildStatus::Substituted:
result.status = BuildResult::Status::Substituted;
case proto::BuildStatus::AlreadyValid:
result.status = BuildResult::Status::AlreadyValid;
case proto::BuildStatus::PermanentFailure:
result.status = BuildResult::Status::PermanentFailure;
case proto::BuildStatus::InputRejected:
result.status = BuildResult::Status::InputRejected;
case proto::BuildStatus::OutputRejected:
result.status = BuildResult::Status::OutputRejected;
case proto::BuildStatus::TransientFailure:
result.status = BuildResult::Status::TransientFailure;
case proto::BuildStatus::CachedFailure:
result.status = BuildResult::Status::CachedFailure;
case proto::BuildStatus::TimedOut:
result.status = BuildResult::Status::TimedOut;
case proto::BuildStatus::MiscFailure:
result.status = BuildResult::Status::MiscFailure;
case proto::BuildStatus::DependencyFailed:
result.status = BuildResult::Status::DependencyFailed;
case proto::BuildStatus::LogLimitExceeded:
result.status = BuildResult::Status::LogLimitExceeded;
case proto::BuildStatus::NotDeterministic:
result.status = BuildResult::Status::NotDeterministic;
default:
return {};
}
result.errorMsg = resp.error_message();
return result;
}
std::optional<GCOptions::GCAction> GCActionFromProto(
nix::proto::GCAction gc_action) {
switch (gc_action) {

View file

@ -235,6 +235,9 @@ struct BuildResult {
// Convert the status of this `BuildResult` to its corresponding
// `nix::proto::BuildStatus`
nix::proto::BuildStatus status_to_proto();
static std::optional<BuildResult> FromProto(
const nix::proto::BuildDerivationResponse& resp);
};
class Store : public std::enable_shared_from_this<Store>, public Config {

View file

@ -690,6 +690,19 @@ class WorkerServiceImpl final : public WorkerService::Service {
__FUNCTION__);
};
Status GetBuildLog(grpc::ServerContext* context, const StorePath* request,
proto::BuildLog* response) override {
return HandleExceptions(
[&]() -> Status {
const auto log = store_->getBuildLog(request->path());
if (log) {
response->set_build_log(*log);
}
return Status::OK;
},
__FUNCTION__);
}
private:
Status HandleExceptions(std::function<Status(void)> fn,
absl::string_view methodName) {

View file

@ -107,6 +107,9 @@ service WorkerService {
// derivations that will be built, and the set of output paths that
// will be substituted.
rpc QueryMissing(StorePaths) returns (QueryMissingResponse);
// Return the build log of the specified store path, if available
rpc GetBuildLog(StorePath) returns (BuildLog);
}
enum HashType {
@ -337,3 +340,7 @@ message QueryMissingResponse {
uint64 download_size = 4;
uint64 nar_size = 5;
}
message BuildLog {
string build_log = 1;
}