fix(tvix/store/proto/grpc_blobservice_wrapper): buffer recv data
While we don't want to keep all of the data in memory, we want to feed a reasonably-enough buffer to the chunking function, to prevent unnecessarily trying to chunk over and over again. Change-Id: I5bbe2d55e8c1c63f8f7ce343889d374b528b559e Reviewed-on: https://cl.tvl.fyi/c/depot/+/8160 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
parent
b29d1ae372
commit
535e1b15ab
1 changed files with 13 additions and 6 deletions
|
@ -1,6 +1,6 @@
|
|||
use crate::{blobservice::BlobService, chunkservice::ChunkService, Error};
|
||||
use data_encoding::BASE64;
|
||||
use std::io::Write;
|
||||
use std::io::{BufWriter, Write};
|
||||
use tokio::{sync::mpsc::channel, task};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::{async_trait, Request, Response, Status, Streaming};
|
||||
|
@ -165,14 +165,21 @@ impl<
|
|||
) -> Result<Response<super::PutBlobResponse>, Status> {
|
||||
let mut req_inner = request.into_inner();
|
||||
|
||||
// instantiate a [BlobWriter] to write all data received with a client,
|
||||
// but wrap it in a pretty large (1MiB) [BufWriter] to prevent
|
||||
// excessive useless chunk attempts.
|
||||
let mut blob_writer = crate::BlobWriter::new(&self.chunk_service);
|
||||
{
|
||||
let mut blob_writer_buffered = BufWriter::with_capacity(1024 * 1024, &mut blob_writer);
|
||||
|
||||
// receive data from the client, and keep writing it to the blob writer.
|
||||
while let Some(blob_chunk) = req_inner.message().await? {
|
||||
if let Err(e) = blob_writer.write_all(&blob_chunk.data) {
|
||||
error!(e=%e,"unable to write blob data");
|
||||
return Err(Status::internal("unable to write blob data"));
|
||||
// receive data from the client, and write them all to the blob_writer.
|
||||
while let Some(blob_chunk) = req_inner.message().await? {
|
||||
if let Err(e) = blob_writer_buffered.write_all(&blob_chunk.data) {
|
||||
error!(e=%e,"unable to write blob data");
|
||||
return Err(Status::internal("unable to write blob data"));
|
||||
}
|
||||
}
|
||||
blob_writer_buffered.flush()?;
|
||||
}
|
||||
|
||||
// run finalize
|
||||
|
|
Loading…
Reference in a new issue