diff --git a/tvix/store/src/blobwriter.rs b/tvix/store/src/blobwriter.rs new file mode 100644 index 000000000..8cb09ecc6 --- /dev/null +++ b/tvix/store/src/blobwriter.rs @@ -0,0 +1,149 @@ +use crate::chunkservice::ChunkService; +use crate::{proto, Error}; +use tracing::{debug, instrument}; + +pub struct BlobWriter<'a, CS: ChunkService> { + chunk_service: &'a CS, + + blob_hasher: blake3::Hasher, + + blob_meta: proto::BlobMeta, + + // filled with data from previous writes that didn't end up producing a full chunk. + buf: Vec, +} + +impl<'a, CS: ChunkService> BlobWriter<'a, CS> { + pub fn new(chunk_service: &'a CS) -> Self { + Self { + chunk_service, + blob_hasher: blake3::Hasher::new(), + blob_meta: proto::BlobMeta::default(), + buf: vec![], + } + } + + // Return the digest of the blob, as well as the blobmeta containing info of all the chunks, + // or an error if there's still bytes left to be flushed. + // In case there was still data to be written a last unfinalized chunk, + // it's written as well. + #[instrument(skip(self))] + pub fn finalize(&mut self) -> Result<(Vec, proto::BlobMeta), Error> { + // If there's something still left in self.buf, upload this as the last + // chunk to the chunk service and record it in BlobMeta. + if !self.buf.is_empty() { + // Also upload the last chunk (what's left in `self.buf`) to the chunk + // service and record it in BlobMeta. + let buf_len = self.buf.len() as u32; + let chunk_digest = self.upload_chunk(self.buf.clone())?; + + self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta { + digest: chunk_digest, + size: buf_len, + }); + + self.buf.clear(); + } + return Ok(( + self.blob_hasher.finalize().as_bytes().to_vec(), + self.blob_meta.clone(), + )); + } + + // upload a chunk to the chunk service, and return its digest (or an error) when done. + #[instrument(skip(self, chunk_data))] + fn upload_chunk(&mut self, chunk_data: Vec) -> Result, Error> { + let mut hasher = blake3::Hasher::new(); + if chunk_data.len() >= 128 * 1024 { + hasher.update_rayon(&chunk_data); + } else { + hasher.update(&chunk_data); + } + let digest = hasher.finalize(); + + if self.chunk_service.has(digest.as_bytes())? { + debug!("already has chunk, skipping"); + } + let digest_resp = self.chunk_service.put(chunk_data)?; + + assert_eq!(digest_resp, digest.as_bytes()); + + Ok(digest.as_bytes().to_vec()) + } +} + +/// This chunks up all data written using fastcdc, uploads all chunks to the +// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks. +impl std::io::Write for BlobWriter<'_, CS> { + fn write(&mut self, input_buf: &[u8]) -> std::io::Result { + // calculate input_buf.len(), we need to return that later. + let input_buf_len = input_buf.len(); + + // update calculate blob hash, and use rayon if data is > 128KiB. + if input_buf.len() > 128 * 1024 { + self.blob_hasher.update_rayon(input_buf); + } else { + self.blob_hasher.update(input_buf); + } + + // prepend buf with existing data (from self.buf) + let buf: Vec = { + let mut b = Vec::new(); + b.append(&mut self.buf); + b.append(&mut input_buf.to_vec()); + b + }; + + // TODO: play with chunking sizes + let chunker_avg_size = 64 * 1024; + let chunker_min_size = chunker_avg_size / 4; + let chunker_max_size = chunker_avg_size * 4; + + // initialize a chunker with the buffer + let chunker = fastcdc::v2020::FastCDC::new( + &buf, + chunker_min_size, + chunker_avg_size, + chunker_max_size, + ); + + // ask the chunker for cutting points in the buffer. + let mut start_pos = 0_usize; + let rest = loop { + // ask the chunker for the next cutting point. + let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos); + + // whenever the last cut point is pointing to the end of the buffer, + // keep that chunk left in there. + // We don't know if the chunker decided to cut here simply because it was + // at the end of the buffer, or if it would also cut if there + // were more data. + // + // Split off all previous chunks and keep this chunk data in the buffer. + if end_pos == buf.len() { + break buf[start_pos..].to_vec(); + } + + // Upload that chunk to the chunk service and record it in BlobMeta. + // TODO: make upload_chunk async and upload concurrently? + let chunk_data = &buf[start_pos..end_pos]; + let chunk_digest = self.upload_chunk(chunk_data.to_vec())?; + + self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta { + digest: chunk_digest, + size: chunk_data.len() as u32, + }); + + // move start_pos over the processed chunk. + start_pos = end_pos; + }; + + self.buf = rest; + + Ok(input_buf_len) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} diff --git a/tvix/store/src/errors.rs b/tvix/store/src/errors.rs index 25e87c8aa..36a4320cb 100644 --- a/tvix/store/src/errors.rs +++ b/tvix/store/src/errors.rs @@ -26,3 +26,13 @@ impl From for Status { } } } + +// TODO: this should probably go somewhere else? +impl From for std::io::Error { + fn from(value: Error) -> Self { + match value { + Error::InvalidRequest(msg) => Self::new(std::io::ErrorKind::InvalidInput, msg), + Error::StorageError(msg) => Self::new(std::io::ErrorKind::Other, msg), + } + } +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index f294e39ad..c345cb9b1 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,4 +1,5 @@ mod blobreader; +mod blobwriter; mod errors; pub mod blobservice; @@ -9,6 +10,7 @@ pub mod pathinfoservice; pub mod proto; pub use blobreader::BlobReader; +pub use blobwriter::BlobWriter; pub use errors::Error; #[cfg(test)] diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 4ae309349..52d067030 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,9 +1,10 @@ use crate::{blobservice::BlobService, chunkservice::ChunkService, Error}; use data_encoding::BASE64; +use std::io::Write; use tokio::{sync::mpsc::channel, task}; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{debug, instrument}; +use tracing::{debug, error, instrument, warn}; pub struct GRPCBlobServiceWrapper { blob_service: BS, @@ -164,86 +165,20 @@ impl< ) -> Result, Status> { let mut req_inner = request.into_inner(); - // initialize a blake3 hasher calculating the hash of the whole blob. - let mut blob_hasher = blake3::Hasher::new(); + let mut blob_writer = crate::BlobWriter::new(&self.chunk_service); - // start a BlobMeta, which we'll fill while looping over the chunks - let mut blob_meta = super::BlobMeta::default(); - - // is filled with bytes received from the client. - let mut buf: Vec = vec![]; - - // This reads data from the client, chunks it up using fastcdc, - // uploads all chunks to the [ChunkService], and fills a - // [super::BlobMeta] linking to these chunks. + // receive data from the client, and keep writing it to the blob writer. while let Some(blob_chunk) = req_inner.message().await? { - // calculate blob hash, and use rayon if data is > 128KiB. - if blob_chunk.data.len() > 128 * 1024 { - blob_hasher.update_rayon(&blob_chunk.data); - } else { - blob_hasher.update(&blob_chunk.data); - } - - // extend buf with the newly received data - buf.append(&mut blob_chunk.data.clone()); - - // TODO: play with chunking sizes - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - // initialize a chunker with the current buffer - let chunker = fastcdc::v2020::FastCDC::new( - &buf, - chunker_min_size, - chunker_avg_size, - chunker_max_size, - ); - - // ask the chunker for cutting points in the buffer. - let mut start_pos = 0 as usize; - buf = loop { - // ask the chunker for the next cutting point. - let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos); - - // whenever the last cut point is pointing to the end of the buffer, - // keep that chunk left in there. - // We don't know if the chunker decided to cut here simply because it was - // at the end of the buffer, or if it would also cut if there - // were more data. - // - // Split off all previous chunks and keep this chunk data in the buffer. - if end_pos == buf.len() { - break buf.split_off(start_pos); - } - - // Upload that chunk to the chunk service and record it in BlobMeta. - // TODO: make upload_chunk async and upload concurrently? - let chunk_data = &buf[start_pos..end_pos]; - let chunk_digest = - Self::upload_chunk(self.chunk_service.clone(), chunk_data.to_vec())?; - - blob_meta.chunks.push(super::blob_meta::ChunkMeta { - digest: chunk_digest, - size: chunk_data.len() as u32, - }); - - // move start_pos over the processed chunk. - start_pos = end_pos; + if let Err(e) = blob_writer.write_all(&blob_chunk.data) { + error!(e=%e,"unable to write blob data"); + return Err(Status::internal("unable to write blob data")); } } - // Also upload the last chunk (what's left in `buf`) to the chunk - // service and record it in BlobMeta. - let buf_len = buf.len() as u32; - let chunk_digest = Self::upload_chunk(self.chunk_service.clone(), buf)?; - - blob_meta.chunks.push(super::blob_meta::ChunkMeta { - digest: chunk_digest, - size: buf_len, - }); - - let blob_digest = blob_hasher.finalize().as_bytes().to_vec(); + // run finalize + let (blob_digest, blob_meta) = blob_writer + .finalize() + .map_err(|_| Status::internal("unable to finalize blob"))?; // check if we have the received blob in the [BlobService] already. let resp = self.blob_service.stat(&super::StatBlobRequest {