feat(tvix/store): drop BlobWriter
All code initially using this has been replaced by the simpler and more performant implementation with StreamCDC and read_all_and_chunk.

Change-Id: I08889e9a6984de91c5debcf2b612cb68ae5072d1
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8265
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
This commit is contained in:
parent
c8bbddd5e5
commit
21782d24f5
2 changed files with 0 additions and 142 deletions
|
@@ -1,140 +0,0 @@
|
|||
use crate::chunkservice::{update_hasher, upload_chunk, ChunkService};
|
||||
use crate::{proto, Error};
|
||||
use rayon::prelude::*;
|
||||
use tracing::instrument;
|
||||
|
||||
pub struct BlobWriter<'a, CS: ChunkService> {
|
||||
chunk_service: &'a CS,
|
||||
|
||||
blob_hasher: blake3::Hasher,
|
||||
|
||||
blob_meta: proto::BlobMeta,
|
||||
|
||||
// filled with data from previous writes that didn't end up producing a full chunk.
|
||||
buf: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
|
||||
pub fn new(chunk_service: &'a CS) -> Self {
|
||||
Self {
|
||||
chunk_service,
|
||||
blob_hasher: blake3::Hasher::new(),
|
||||
blob_meta: proto::BlobMeta::default(),
|
||||
buf: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
// Return the digest of the blob, as well as the blobmeta containing info of all the chunks,
|
||||
// or an error if there's still bytes left to be flushed.
|
||||
// In case there was still data to be written a last unfinalized chunk,
|
||||
// it's written as well.
|
||||
#[instrument(skip(self))]
|
||||
pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
|
||||
// If there's something still left in self.buf, upload this as the last
|
||||
// chunk to the chunk service and record it in BlobMeta.
|
||||
if !self.buf.is_empty() {
|
||||
// Also upload the last chunk (what's left in `self.buf`) to the chunk
|
||||
// service and record it in BlobMeta.
|
||||
let buf_len = self.buf.len() as u32;
|
||||
let chunk_digest = upload_chunk(self.chunk_service, self.buf.clone())?;
|
||||
|
||||
self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: buf_len,
|
||||
});
|
||||
|
||||
self.buf.clear();
|
||||
}
|
||||
return Ok((
|
||||
self.blob_hasher.finalize().as_bytes().to_vec(),
|
||||
self.blob_meta.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
/// This chunks up all data written using fastcdc, uploads all chunks to the
|
||||
// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
|
||||
impl<CS: ChunkService + std::marker::Sync> std::io::Write for BlobWriter<'_, CS> {
|
||||
fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
|
||||
// calculate input_buf.len(), we need to return that later.
|
||||
let input_buf_len = input_buf.len();
|
||||
|
||||
// update blob hash
|
||||
update_hasher(&mut self.blob_hasher, input_buf);
|
||||
|
||||
// prepend buf with existing data (from self.buf)
|
||||
let buf: Vec<u8> = {
|
||||
let mut b = Vec::new();
|
||||
b.append(&mut self.buf);
|
||||
b.append(&mut input_buf.to_vec());
|
||||
b
|
||||
};
|
||||
|
||||
// TODO: play with chunking sizes
|
||||
let chunker_avg_size = 64 * 1024;
|
||||
let chunker_min_size = chunker_avg_size / 4;
|
||||
let chunker_max_size = chunker_avg_size * 4;
|
||||
|
||||
// initialize a chunker with the buffer
|
||||
let chunker = fastcdc::v2020::FastCDC::new(
|
||||
&buf,
|
||||
chunker_min_size,
|
||||
chunker_avg_size,
|
||||
chunker_max_size,
|
||||
);
|
||||
|
||||
// assemble a list of byte slices to be uploaded
|
||||
let mut chunk_slices: Vec<&[u8]> = Vec::new();
|
||||
|
||||
// ask the chunker for cutting points in the buffer.
|
||||
let mut start_pos = 0_usize;
|
||||
let rest = loop {
|
||||
// ask the chunker for the next cutting point.
|
||||
let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
|
||||
|
||||
// whenever the last cut point is pointing to the end of the buffer,
|
||||
// return that from the loop.
|
||||
// We don't know if the chunker decided to cut here simply because it was
|
||||
// at the end of the buffer, or if it would also cut if there
|
||||
// were more data.
|
||||
//
|
||||
// Split off all previous chunks and keep this chunk data in the buffer.
|
||||
if end_pos == buf.len() {
|
||||
break &buf[start_pos..];
|
||||
}
|
||||
|
||||
// if it's an intermediate chunk, add it to chunk_slices.
|
||||
// We'll later upload all of them in batch.
|
||||
chunk_slices.push(&buf[start_pos..end_pos]);
|
||||
|
||||
// advance start_pos over the processed chunk.
|
||||
start_pos = end_pos;
|
||||
};
|
||||
|
||||
// Upload all chunks to the chunk service and map them to a ChunkMeta
|
||||
let blob_meta_chunks: Vec<Result<proto::blob_meta::ChunkMeta, Error>> = chunk_slices
|
||||
.into_par_iter()
|
||||
.map(|chunk_slice| {
|
||||
let chunk_digest = upload_chunk(self.chunk_service, chunk_slice.to_vec())?;
|
||||
|
||||
Ok(proto::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: chunk_slice.len() as u32,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
self.blob_meta.chunks = blob_meta_chunks
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<proto::blob_meta::ChunkMeta>, Error>>()?;
|
||||
|
||||
// update buf to point to the rest we didn't upload.
|
||||
self.buf = rest.to_vec();
|
||||
|
||||
Ok(input_buf_len)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@@ -1,5 +1,4 @@
|
|||
mod blobreader;
|
||||
mod blobwriter;
|
||||
mod errors;
|
||||
|
||||
pub mod blobservice;
|
||||
|
@@ -11,7 +10,6 @@ pub mod pathinfoservice;
|
|||
pub mod proto;
|
||||
|
||||
pub use blobreader::BlobReader;
|
||||
pub use blobwriter::BlobWriter;
|
||||
pub use errors::Error;
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
Loading…
Reference in a new issue