feat(3p/nix): Implement AddToStore proto handler

Implement the proto handler for AddToStore, which adds a nix path to the
store. This is implemented by adding a new (probably
soon-to-be-generalized) Source concretion that wraps a grpc ServerReader
for the stream of data we're receiving from the client - this is less
than ideal, as it's perpetuating the source/sink thing that's going on
and storing entire nars in memory, but is at the very worst an
incremental step towards a functioning nix that we can refactor in the
future.

Paired-With: Perry Lorier <isomer@tvl.fyi>
Paired-With: Vincent Ambo <mail@tazj.in>
Change-Id: I48db734e7460a47aee4a85dd5137b690980859e3
Reviewed-on: https://cl.tvl.fyi/c/depot/+/1441
Tested-by: BuildkiteCI
Reviewed-by: kanepyork <rikingcoding@gmail.com>
Reviewed-by: tazjin <mail@tazj.in>
This commit is contained in:
Griffin Smith 2020-07-25 18:44:37 -04:00 committed by glittershark
parent 1fe4a47aa2
commit 05e44c121d
7 changed files with 141 additions and 13 deletions

View file

@ -408,9 +408,10 @@ class Store : public std::enable_shared_from_this<Store>, public Config {
std::shared_ptr<FSAccessor> accessor = 0); std::shared_ptr<FSAccessor> accessor = 0);
/* Copy the contents of a path to the store and register the /* Copy the contents of a path to the store and register the
validity the resulting path. The resulting path is returned. validity of the resulting path. The resulting path is returned.
The function object `filter' can be used to exclude files (see The function object `filter' can be used to exclude files (see
libutil/archive.hh). */ libutil/archive.hh). If recursive is set to true, the path will be treated
as a directory (eg cp -r vs cp) */
virtual Path addToStore(const std::string& name, const Path& srcPath, virtual Path addToStore(const std::string& name, const Path& srcPath,
bool recursive = true, HashType hashAlgo = htSHA256, bool recursive = true, HashType hashAlgo = htSHA256,
PathFilter& filter = defaultPathFilter, PathFilter& filter = defaultPathFilter,

View file

@ -47,6 +47,7 @@ target_sources(nixutil
) )
target_link_libraries(nixutil target_link_libraries(nixutil
nixproto
absl::strings absl::strings
absl::statusor absl::statusor
glog glog

View file

@ -18,6 +18,23 @@
namespace nix { namespace nix {
std::optional<HashType> hash_type_from(nix::proto::HashType hash_type) {
switch (hash_type) {
case nix::proto::HashType::UNKNOWN:
return HashType::htUnknown;
case nix::proto::HashType::MD5:
return HashType::htMD5;
case nix::proto::HashType::SHA1:
return HashType::htSHA1;
case nix::proto::HashType::SHA256:
return HashType::htSHA256;
case nix::proto::HashType::SHA512:
return HashType::htSHA512;
default:
return {};
}
}
void Hash::init() { void Hash::init() {
if (type == htMD5) { if (type == htMD5) {
hashSize = md5HashSize; hashSize = md5HashSize;

View file

@ -2,6 +2,7 @@
#include <absl/status/statusor.h> #include <absl/status/statusor.h>
#include "libproto/worker.grpc.pb.h"
#include "libutil/serialise.hh" #include "libutil/serialise.hh"
#include "libutil/types.hh" #include "libutil/types.hh"
@ -9,8 +10,12 @@ namespace nix {
MakeError(BadHash, Error); MakeError(BadHash, Error);
// TODO(grfn): Replace this with the hash type enum from the daemon proto so we
// don't have to juggle two different types
enum HashType : char { htUnknown, htMD5, htSHA1, htSHA256, htSHA512 }; enum HashType : char { htUnknown, htMD5, htSHA1, htSHA256, htSHA512 };
std::optional<HashType> hash_type_from(nix::proto::HashType hash_type);
const int md5HashSize = 16; const int md5HashSize = 16;
const int sha1HashSize = 20; const int sha1HashSize = 20;
const int sha256HashSize = 32; const int sha256HashSize = 32;

View file

@ -1,5 +1,7 @@
#include "nix-daemon-proto.hh" #include "nix-daemon-proto.hh"
#include <sstream>
#include <google/protobuf/empty.pb.h> #include <google/protobuf/empty.pb.h>
#include <google/protobuf/util/time_util.h> #include <google/protobuf/util/time_util.h>
#include <grpcpp/impl/codegen/server_context.h> #include <grpcpp/impl/codegen/server_context.h>
@ -10,7 +12,11 @@
#include "libproto/worker.grpc.pb.h" #include "libproto/worker.grpc.pb.h"
#include "libproto/worker.pb.h" #include "libproto/worker.pb.h"
#include "libstore/derivations.hh" #include "libstore/derivations.hh"
#include "libstore/local-store.hh"
#include "libstore/store-api.hh" #include "libstore/store-api.hh"
#include "libutil/archive.hh"
#include "libutil/hash.hh"
#include "libutil/serialise.hh"
namespace nix::daemon { namespace nix::daemon {
@ -23,6 +29,58 @@ using ::nix::proto::WorkerService;
static Status INVALID_STORE_PATH = static Status INVALID_STORE_PATH =
Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid store path"); Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid store path");
class AddToStoreRequestSource final : public Source {
using Reader = grpc::ServerReader<nix::proto::AddToStoreRequest>;
public:
explicit AddToStoreRequestSource(Reader* reader) : reader_(reader) {}
size_t read(unsigned char* data, size_t len) override {
auto got = buffer_.sgetn(reinterpret_cast<char*>(data), len);
if (got < len) {
proto::AddToStoreRequest msg;
if (!reader_->Read(&msg)) {
return got;
}
if (msg.add_oneof_case() != proto::AddToStoreRequest::kData) {
// TODO(grfn): Make Source::read return a StatusOr and get rid of this
// throw
throw Error(
"Invalid AddToStoreRequest: all messages except the first must "
"contain data");
}
buffer_.sputn(msg.data().data(), msg.data().length());
return got + read(data + got, len - got);
}
return got;
};
private:
std::stringbuf buffer_;
Reader* reader_;
};
// TODO(grfn): Make this some sort of pipe so we don't have to store data in
// memory
/* If the NAR archive contains a single file at top-level, then save
the contents of the file to `s'. Otherwise barf. */
struct RetrieveRegularNARSink : ParseSink {
bool regular{true};
std::string s;
RetrieveRegularNARSink() {}
void createDirectory(const Path& path) override { regular = false; }
void receiveContents(unsigned char* data, unsigned int len) override {
s.append((const char*)data, len);
}
void createSymlink(const Path& path, const std::string& target) override {
regular = false;
}
};
class WorkerServiceImpl final : public WorkerService::Service { class WorkerServiceImpl final : public WorkerService::Service {
public: public:
explicit WorkerServiceImpl(nix::Store& store) : store_(&store) {} explicit WorkerServiceImpl(nix::Store& store) : store_(&store) {}
@ -61,6 +119,57 @@ class WorkerServiceImpl final : public WorkerService::Service {
return Status::OK; return Status::OK;
} }
Status AddToStore(grpc::ServerContext* context,
grpc::ServerReader<nix::proto::AddToStoreRequest>* reader,
nix::proto::StorePath* response) override {
proto::AddToStoreRequest metadata_request;
auto has_metadata = reader->Read(&metadata_request);
if (!has_metadata || metadata_request.has_meta()) {
return Status(grpc::StatusCode::INVALID_ARGUMENT,
"Metadata must be set before sending file content");
}
auto meta = metadata_request.meta();
AddToStoreRequestSource source(reader);
auto opt_hash_type = hash_type_from(meta.hash_type());
if (!opt_hash_type) {
return Status(grpc::StatusCode::INTERNAL, "Invalid hash type");
}
std::string* data;
RetrieveRegularNARSink nar;
TeeSource saved_nar(source);
if (meta.recursive()) {
// TODO(grfn): Don't store the full data in memory, instead just make
// addToStoreFromDump take a Source
ParseSink sink;
parseDump(sink, saved_nar);
data = &(*saved_nar.data);
} else {
parseDump(nar, source);
if (!nar.regular) {
return Status(grpc::StatusCode::INVALID_ARGUMENT,
"Regular file expected");
}
data = &nar.s;
}
auto local_store = store_.dynamic_pointer_cast<LocalStore>();
if (!local_store) {
return Status(grpc::StatusCode::FAILED_PRECONDITION,
"operation is only supported by LocalStore");
}
auto path = local_store->addToStoreFromDump(
*data, meta.base_name(), meta.recursive(), opt_hash_type.value());
response->set_path(path);
return Status::OK;
}
Status QueryValidDerivers(grpc::ServerContext* context, Status QueryValidDerivers(grpc::ServerContext* context,
const StorePath* request, const StorePath* request,
StorePaths* response) override { StorePaths* response) override {

View file

@ -298,15 +298,15 @@ static void performOp(TunnelLogger* logger, const ref<Store>& store,
case wopAddToStore: { case wopAddToStore: {
bool fixed = 0; bool fixed = 0;
bool recursive = 0; bool recursive = 0;
std::string s; std::string hashType;
std::string baseName; std::string baseName;
from >> baseName >> fixed /* obsolete */ >> recursive >> s; from >> baseName >> fixed /* obsolete */ >> recursive >> hashType;
/* Compatibility hack. */ /* Compatibility hack. */
if (!fixed) { if (!fixed) {
s = "sha256"; hashType = "sha256";
recursive = true; recursive = true;
} }
HashType hashAlgo = parseHashType(s); HashType hashAlgo = parseHashType(hashType);
TeeSource savedNAR(from); TeeSource savedNAR(from);
RetrieveRegularNARSink savedRegular; RetrieveRegularNARSink savedRegular;

View file

@ -17,7 +17,7 @@ service WorkerService {
// Query referrers for a given path. // Query referrers for a given path.
rpc QueryReferrers(StorePath) returns (StorePaths); rpc QueryReferrers(StorePath) returns (StorePaths);
// Add a NAR (I think?) to the store. The first stream request // Add a path to the store. The first stream request
// should be a message indicating metadata, the rest should be file // should be a message indicating metadata, the rest should be file
// chunks. // chunks.
rpc AddToStore(stream AddToStoreRequest) returns (StorePath); rpc AddToStore(stream AddToStoreRequest) returns (StorePath);
@ -185,14 +185,9 @@ message AddToStoreRequest {
string base_name = 4; string base_name = 4;
} }
message Chunk {
bytes content = 1;
bool final = 2;
}
oneof add_oneof { oneof add_oneof {
Metadata meta = 1; Metadata meta = 1;
Chunk chunk = 2; bytes data = 3;
} }
} }