refactor(tvix/store): use read_all_and_chunk in gRPC blobservice

This was the last piece of code using BlobWriter.

We can also use `read_all_and_chunk` here; it just requires a bit more
plumbing:

 - The data coming from the client (stream) needs to be mapped (we
   extract the .data field).
 - The stream needs to be turned into an (async) reader.
 - The reader needs to be made sync, and the code using the sync reader
   needs to run inside a `task::spawn_blocking`.

Change-Id: I4e374e1a9f47d5a0933f59a8f5c121185a5f3e95
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8260
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
Florian Klink 2023-03-11 21:21:40 +01:00 committed by flokli
parent 7ffb2676ee
commit c8bbddd5e5
4 changed files with 49 additions and 39 deletions

View file

@ -1,14 +1,15 @@
use std::collections::VecDeque;
use crate::{
blobservice::BlobService,
chunkservice::{update_hasher, ChunkService},
chunkservice::{read_all_and_chunk, update_hasher, ChunkService},
Error,
};
use data_encoding::BASE64;
use std::io::{BufWriter, Write};
use tokio::{sync::mpsc::channel, task};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{debug, error, instrument, warn};
use tracing::{debug, instrument, warn};
pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
blob_service: BS,
@ -163,38 +164,40 @@ impl<
&self,
request: Request<Streaming<super::BlobChunk>>,
) -> Result<Response<super::PutBlobResponse>, Status> {
let mut req_inner = request.into_inner();
let req_inner = request.into_inner();
// instantiate a [BlobWriter] to write all data received with a client,
// but wrap it in a pretty large (1MiB) [BufWriter] to prevent
// excessive useless chunk attempts.
let mut blob_writer = crate::BlobWriter::new(&self.chunk_service);
let data_stream = req_inner.map(|x| {
x.map(|x| VecDeque::from(x.data))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
});
let data_reader = tokio_util::io::StreamReader::new(data_stream);
// TODO: can we get rid of this clone?
let chunk_service = self.chunk_service.clone();
let (blob_digest, blob_meta) =
task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> {
// feed read_all_and_chunk a (sync) reader to the data retrieved from the stream.
read_all_and_chunk(
&chunk_service,
tokio_util::io::SyncIoBridge::new(data_reader),
)
})
.await
.map_err(|e| Status::internal(e.to_string()))??;
// upload blobmeta if not there yet
if self
.blob_service
.stat(&super::StatBlobRequest {
digest: blob_digest.to_vec(),
include_chunks: false,
include_bao: false,
})?
.is_none()
{
let mut blob_writer_buffered = BufWriter::with_capacity(1024 * 1024, &mut blob_writer);
// receive data from the client, and write them all to the blob_writer.
while let Some(blob_chunk) = req_inner.message().await? {
if let Err(e) = blob_writer_buffered.write_all(&blob_chunk.data) {
error!(e=%e,"unable to write blob data");
return Err(Status::internal("unable to write blob data"));
}
}
blob_writer_buffered.flush()?;
}
// run finalize
let (blob_digest, blob_meta) = blob_writer
.finalize()
.map_err(|_| Status::internal("unable to finalize blob"))?;
// check if we have the received blob in the [BlobService] already.
let resp = self.blob_service.stat(&super::StatBlobRequest {
digest: blob_digest.to_vec(),
..Default::default()
})?;
// if not, store.
if resp.is_none() {
// upload blobmeta
self.blob_service.put(&blob_digest, blob_meta)?;
}