fix(tvix): Chunk the AddTextToStore request

Rather than sending the entire AddTextToStore request along in a single
message, send it in a stream of chunks using the same metadata-first
approach we've been using for the other store gRPC requests. This fixes
a bug where certain builds could send more data than the maximum gRPC
request size (4194304 bytes, it would appear), resulting in a
RESOURCE_EXHAUSTED error.

The initial chunk size, which is currently constant but should be made
dynamic at some point in the future, has been chosen based on the IPC
bandwidth delay product for tazjin's desktop, rounded up.

Change-Id: I6f0232cdbc98653484816b39855126873fc59a03
Reviewed-on: https://cl.tvl.fyi/c/depot/+/1835
Tested-by: BuildkiteCI
Reviewed-by: tazjin <mail@tazj.in>
Reviewed-by: kanepyork <rikingcoding@gmail.com>
This commit is contained in:
Griffin Smith 2020-08-21 18:58:58 -04:00 committed by glittershark
parent 059d90dd6d
commit 74a8c3d359
3 changed files with 66 additions and 17 deletions

View file

@ -5,6 +5,7 @@
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <ostream> #include <ostream>
#include <string_view>
#include <absl/status/status.h> #include <absl/status/status.h>
#include <absl/strings/str_cat.h> #include <absl/strings/str_cat.h>
@ -36,6 +37,11 @@ namespace nix {
namespace store { namespace store {
// Should be set to the bandwidth delay product between the client and the
// daemon. The current value, which should eventually be determined dynamically,
// has currently been set to a developer's deskop computer, rounded up
constexpr size_t kChunkSize = 1024 * 64;
using google::protobuf::util::TimeUtil; using google::protobuf::util::TimeUtil;
using grpc::ClientContext; using grpc::ClientContext;
using nix::proto::WorkerService; using nix::proto::WorkerService;
@ -308,14 +314,29 @@ Path RpcStore::addTextToStore(const std::string& name,
"repairing is not supported when building through the Nix daemon"); "repairing is not supported when building through the Nix daemon");
} }
ClientContext ctx; ClientContext ctx;
proto::AddTextToStoreRequest request;
request.set_name(name);
request.set_content(content);
for (const auto& ref : references) {
request.add_references(ref);
}
proto::StorePath result; proto::StorePath result;
SuccessOrThrow(stub_->AddTextToStore(&ctx, request, &result), __FUNCTION__); auto writer = stub_->AddTextToStore(&ctx, &result);
proto::AddTextToStoreRequest meta;
meta.mutable_meta()->set_name(name);
meta.mutable_meta()->set_size(content.size());
for (const auto& ref : references) {
meta.mutable_meta()->add_references(ref);
}
writer->Write(meta);
for (int i = 0; i <= content.size(); i += kChunkSize) {
auto len = std::min(kChunkSize, content.size() - i);
proto::AddTextToStoreRequest data;
data.set_data(content.data() + i, len);
if (!writer->Write(data)) {
// Finish() below will error
break;
}
}
writer->WritesDone();
SuccessOrThrow(writer->Finish(), __FUNCTION__);
return result.path(); return result.path();
} }

View file

@ -271,17 +271,38 @@ class WorkerServiceImpl final : public WorkerService::Service {
__FUNCTION__); __FUNCTION__);
} }
Status AddTextToStore(grpc::ServerContext*, Status AddTextToStore(
const nix::proto::AddTextToStoreRequest* request, grpc::ServerContext*,
grpc::ServerReader<nix::proto::AddTextToStoreRequest>* reader,
nix::proto::StorePath* response) override { nix::proto::StorePath* response) override {
return HandleExceptions( return HandleExceptions(
[&]() -> Status { [&]() -> Status {
proto::AddTextToStoreRequest request;
auto has_metadata = reader->Read(&request);
if (!has_metadata || !request.has_meta()) {
return Status(grpc::StatusCode::INVALID_ARGUMENT,
"Metadata must be set before sending content");
}
proto::AddTextToStoreRequest_Metadata meta = request.meta();
PathSet references; PathSet references;
for (const auto& ref : request->references()) { for (const auto& ref : meta.references()) {
references.insert(ref); references.insert(ref);
} }
auto path = store_->addTextToStore(request->name(),
request->content(), references); std::string content;
content.reserve(meta.size());
while (reader->Read(&request)) {
if (request.add_oneof_case() != request.kData) {
return Status(grpc::StatusCode::INVALID_ARGUMENT,
"All requests except the first must contain data");
}
content.append(request.data());
}
auto path = store_->addTextToStore(meta.name(), content, references);
response->set_path(path); response->set_path(path);
return Status::OK; return Status::OK;
}, },

View file

@ -23,7 +23,7 @@ service WorkerService {
rpc AddToStore(stream AddToStoreRequest) returns (StorePath); rpc AddToStore(stream AddToStoreRequest) returns (StorePath);
// Adds the supplied string to the store, as a text file. // Adds the supplied string to the store, as a text file.
rpc AddTextToStore(AddTextToStoreRequest) returns (StorePath); rpc AddTextToStore(stream AddTextToStoreRequest) returns (StorePath);
// Build the specified derivations in one of the specified build // Build the specified derivations in one of the specified build
// modes, defaulting to a normal build. // modes, defaulting to a normal build.
@ -223,9 +223,16 @@ message AddToStoreRequest {
} }
message AddTextToStoreRequest { message AddTextToStoreRequest {
message Metadata {
string name = 1; string name = 1;
string content = 2; repeated string references = 2;
repeated string references = 3; uint64 size = 3;
}
oneof add_oneof {
Metadata meta = 4;
bytes data = 5;
}
} }
message BuildPathsRequest { message BuildPathsRequest {