refactor(tvix/store): move blob splitting into a BlobWriter struct
This will moves the chunking-as-we-receive logic that so far only lived in grpc_blobservice_wrapper.rs into a generic BlobWriter. Change-Id: Ief7d1bda3c6280129f7139de3f6c4174be2ca6ea Reviewed-on: https://cl.tvl.fyi/c/depot/+/8154 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
parent
d8ab140d25
commit
0baaabc43e
4 changed files with 172 additions and 76 deletions
149
tvix/store/src/blobwriter.rs
Normal file
149
tvix/store/src/blobwriter.rs
Normal file
|
@ -0,0 +1,149 @@
|
|||
use crate::chunkservice::ChunkService;
|
||||
use crate::{proto, Error};
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
pub struct BlobWriter<'a, CS: ChunkService> {
|
||||
chunk_service: &'a CS,
|
||||
|
||||
blob_hasher: blake3::Hasher,
|
||||
|
||||
blob_meta: proto::BlobMeta,
|
||||
|
||||
// filled with data from previous writes that didn't end up producing a full chunk.
|
||||
buf: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
|
||||
pub fn new(chunk_service: &'a CS) -> Self {
|
||||
Self {
|
||||
chunk_service,
|
||||
blob_hasher: blake3::Hasher::new(),
|
||||
blob_meta: proto::BlobMeta::default(),
|
||||
buf: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
// Return the digest of the blob, as well as the blobmeta containing info of all the chunks,
|
||||
// or an error if there's still bytes left to be flushed.
|
||||
// In case there was still data to be written a last unfinalized chunk,
|
||||
// it's written as well.
|
||||
#[instrument(skip(self))]
|
||||
pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
|
||||
// If there's something still left in self.buf, upload this as the last
|
||||
// chunk to the chunk service and record it in BlobMeta.
|
||||
if !self.buf.is_empty() {
|
||||
// Also upload the last chunk (what's left in `self.buf`) to the chunk
|
||||
// service and record it in BlobMeta.
|
||||
let buf_len = self.buf.len() as u32;
|
||||
let chunk_digest = self.upload_chunk(self.buf.clone())?;
|
||||
|
||||
self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: buf_len,
|
||||
});
|
||||
|
||||
self.buf.clear();
|
||||
}
|
||||
return Ok((
|
||||
self.blob_hasher.finalize().as_bytes().to_vec(),
|
||||
self.blob_meta.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
// upload a chunk to the chunk service, and return its digest (or an error) when done.
|
||||
#[instrument(skip(self, chunk_data))]
|
||||
fn upload_chunk(&mut self, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
|
||||
let mut hasher = blake3::Hasher::new();
|
||||
if chunk_data.len() >= 128 * 1024 {
|
||||
hasher.update_rayon(&chunk_data);
|
||||
} else {
|
||||
hasher.update(&chunk_data);
|
||||
}
|
||||
let digest = hasher.finalize();
|
||||
|
||||
if self.chunk_service.has(digest.as_bytes())? {
|
||||
debug!("already has chunk, skipping");
|
||||
}
|
||||
let digest_resp = self.chunk_service.put(chunk_data)?;
|
||||
|
||||
assert_eq!(digest_resp, digest.as_bytes());
|
||||
|
||||
Ok(digest.as_bytes().to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
/// This chunks up all data written using fastcdc, uploads all chunks to the
|
||||
// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
|
||||
impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
|
||||
fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
|
||||
// calculate input_buf.len(), we need to return that later.
|
||||
let input_buf_len = input_buf.len();
|
||||
|
||||
// update calculate blob hash, and use rayon if data is > 128KiB.
|
||||
if input_buf.len() > 128 * 1024 {
|
||||
self.blob_hasher.update_rayon(input_buf);
|
||||
} else {
|
||||
self.blob_hasher.update(input_buf);
|
||||
}
|
||||
|
||||
// prepend buf with existing data (from self.buf)
|
||||
let buf: Vec<u8> = {
|
||||
let mut b = Vec::new();
|
||||
b.append(&mut self.buf);
|
||||
b.append(&mut input_buf.to_vec());
|
||||
b
|
||||
};
|
||||
|
||||
// TODO: play with chunking sizes
|
||||
let chunker_avg_size = 64 * 1024;
|
||||
let chunker_min_size = chunker_avg_size / 4;
|
||||
let chunker_max_size = chunker_avg_size * 4;
|
||||
|
||||
// initialize a chunker with the buffer
|
||||
let chunker = fastcdc::v2020::FastCDC::new(
|
||||
&buf,
|
||||
chunker_min_size,
|
||||
chunker_avg_size,
|
||||
chunker_max_size,
|
||||
);
|
||||
|
||||
// ask the chunker for cutting points in the buffer.
|
||||
let mut start_pos = 0_usize;
|
||||
let rest = loop {
|
||||
// ask the chunker for the next cutting point.
|
||||
let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
|
||||
|
||||
// whenever the last cut point is pointing to the end of the buffer,
|
||||
// keep that chunk left in there.
|
||||
// We don't know if the chunker decided to cut here simply because it was
|
||||
// at the end of the buffer, or if it would also cut if there
|
||||
// were more data.
|
||||
//
|
||||
// Split off all previous chunks and keep this chunk data in the buffer.
|
||||
if end_pos == buf.len() {
|
||||
break buf[start_pos..].to_vec();
|
||||
}
|
||||
|
||||
// Upload that chunk to the chunk service and record it in BlobMeta.
|
||||
// TODO: make upload_chunk async and upload concurrently?
|
||||
let chunk_data = &buf[start_pos..end_pos];
|
||||
let chunk_digest = self.upload_chunk(chunk_data.to_vec())?;
|
||||
|
||||
self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: chunk_data.len() as u32,
|
||||
});
|
||||
|
||||
// move start_pos over the processed chunk.
|
||||
start_pos = end_pos;
|
||||
};
|
||||
|
||||
self.buf = rest;
|
||||
|
||||
Ok(input_buf_len)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -26,3 +26,13 @@ impl From<Error> for Status {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: this should probably go somewhere else?
|
||||
impl From<Error> for std::io::Error {
|
||||
fn from(value: Error) -> Self {
|
||||
match value {
|
||||
Error::InvalidRequest(msg) => Self::new(std::io::ErrorKind::InvalidInput, msg),
|
||||
Error::StorageError(msg) => Self::new(std::io::ErrorKind::Other, msg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
mod blobreader;
|
||||
mod blobwriter;
|
||||
mod errors;
|
||||
|
||||
pub mod blobservice;
|
||||
|
@ -9,6 +10,7 @@ pub mod pathinfoservice;
|
|||
pub mod proto;
|
||||
|
||||
pub use blobreader::BlobReader;
|
||||
pub use blobwriter::BlobWriter;
|
||||
pub use errors::Error;
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
use crate::{blobservice::BlobService, chunkservice::ChunkService, Error};
|
||||
use data_encoding::BASE64;
|
||||
use std::io::Write;
|
||||
use tokio::{sync::mpsc::channel, task};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::{async_trait, Request, Response, Status, Streaming};
|
||||
use tracing::{debug, instrument};
|
||||
use tracing::{debug, error, instrument, warn};
|
||||
|
||||
pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
|
||||
blob_service: BS,
|
||||
|
@ -164,86 +165,20 @@ impl<
|
|||
) -> Result<Response<super::PutBlobResponse>, Status> {
|
||||
let mut req_inner = request.into_inner();
|
||||
|
||||
// initialize a blake3 hasher calculating the hash of the whole blob.
|
||||
let mut blob_hasher = blake3::Hasher::new();
|
||||
let mut blob_writer = crate::BlobWriter::new(&self.chunk_service);
|
||||
|
||||
// start a BlobMeta, which we'll fill while looping over the chunks
|
||||
let mut blob_meta = super::BlobMeta::default();
|
||||
|
||||
// is filled with bytes received from the client.
|
||||
let mut buf: Vec<u8> = vec![];
|
||||
|
||||
// This reads data from the client, chunks it up using fastcdc,
|
||||
// uploads all chunks to the [ChunkService], and fills a
|
||||
// [super::BlobMeta] linking to these chunks.
|
||||
// receive data from the client, and keep writing it to the blob writer.
|
||||
while let Some(blob_chunk) = req_inner.message().await? {
|
||||
// calculate blob hash, and use rayon if data is > 128KiB.
|
||||
if blob_chunk.data.len() > 128 * 1024 {
|
||||
blob_hasher.update_rayon(&blob_chunk.data);
|
||||
} else {
|
||||
blob_hasher.update(&blob_chunk.data);
|
||||
}
|
||||
|
||||
// extend buf with the newly received data
|
||||
buf.append(&mut blob_chunk.data.clone());
|
||||
|
||||
// TODO: play with chunking sizes
|
||||
let chunker_avg_size = 64 * 1024;
|
||||
let chunker_min_size = chunker_avg_size / 4;
|
||||
let chunker_max_size = chunker_avg_size * 4;
|
||||
|
||||
// initialize a chunker with the current buffer
|
||||
let chunker = fastcdc::v2020::FastCDC::new(
|
||||
&buf,
|
||||
chunker_min_size,
|
||||
chunker_avg_size,
|
||||
chunker_max_size,
|
||||
);
|
||||
|
||||
// ask the chunker for cutting points in the buffer.
|
||||
let mut start_pos = 0 as usize;
|
||||
buf = loop {
|
||||
// ask the chunker for the next cutting point.
|
||||
let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
|
||||
|
||||
// whenever the last cut point is pointing to the end of the buffer,
|
||||
// keep that chunk left in there.
|
||||
// We don't know if the chunker decided to cut here simply because it was
|
||||
// at the end of the buffer, or if it would also cut if there
|
||||
// were more data.
|
||||
//
|
||||
// Split off all previous chunks and keep this chunk data in the buffer.
|
||||
if end_pos == buf.len() {
|
||||
break buf.split_off(start_pos);
|
||||
}
|
||||
|
||||
// Upload that chunk to the chunk service and record it in BlobMeta.
|
||||
// TODO: make upload_chunk async and upload concurrently?
|
||||
let chunk_data = &buf[start_pos..end_pos];
|
||||
let chunk_digest =
|
||||
Self::upload_chunk(self.chunk_service.clone(), chunk_data.to_vec())?;
|
||||
|
||||
blob_meta.chunks.push(super::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: chunk_data.len() as u32,
|
||||
});
|
||||
|
||||
// move start_pos over the processed chunk.
|
||||
start_pos = end_pos;
|
||||
if let Err(e) = blob_writer.write_all(&blob_chunk.data) {
|
||||
error!(e=%e,"unable to write blob data");
|
||||
return Err(Status::internal("unable to write blob data"));
|
||||
}
|
||||
}
|
||||
|
||||
// Also upload the last chunk (what's left in `buf`) to the chunk
|
||||
// service and record it in BlobMeta.
|
||||
let buf_len = buf.len() as u32;
|
||||
let chunk_digest = Self::upload_chunk(self.chunk_service.clone(), buf)?;
|
||||
|
||||
blob_meta.chunks.push(super::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: buf_len,
|
||||
});
|
||||
|
||||
let blob_digest = blob_hasher.finalize().as_bytes().to_vec();
|
||||
// run finalize
|
||||
let (blob_digest, blob_meta) = blob_writer
|
||||
.finalize()
|
||||
.map_err(|_| Status::internal("unable to finalize blob"))?;
|
||||
|
||||
// check if we have the received blob in the [BlobService] already.
|
||||
let resp = self.blob_service.stat(&super::StatBlobRequest {
|
||||
|
|
Loading…
Reference in a new issue