feat(tvix/store): do not buffer blob data

Use the FastCDC::cut function to ask fastcdc for cutting points as we
receive the data. Make sure to keep the last chunk in the temporary
buffer, as we might not actually cut at the end.

Also, use rayon to calculate the blake3 hash if the input data is
> 128KiB.

Change-Id: I6195f3b74eac5516965cb12d8d026aa720c8b891
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8135
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Author: Florian Klink, 2023-02-18 20:44:58 +01:00 (committed by flokli)
Parent: a40d2dcdcd
Commit: d8ab140d25
2 changed files with 138 additions and 72 deletions
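
To illustrate the approach described in the commit message, here is a minimal standalone sketch; it is not part of the diff below. The idea: buffer incoming bytes, ask fastcdc for cut points, and keep the trailing chunk in the buffer, since a cut landing exactly at the end of the buffer may only be there because the data ran out. The helper names drain_full_chunks and hash_chunk are illustrative only; the chunk sizes and the 128KiB rayon threshold mirror the values used in the change. Assumes the fastcdc crate's v2020 module and the blake3 crate with its rayon feature enabled.

    use fastcdc::v2020::FastCDC;

    /// Illustrative helper (not from this commit): cut `buf` into
    /// content-defined chunks, returning the finished chunks plus the tail
    /// that should stay buffered until more data arrives (or the stream ends).
    fn drain_full_chunks(mut buf: Vec<u8>) -> (Vec<Vec<u8>>, Vec<u8>) {
        // Same sizes as in the diff: min = avg / 4, avg = 64KiB, max = avg * 4.
        let chunker = FastCDC::new(&buf, 64 * 1024 / 4, 64 * 1024, 64 * 1024 * 4);

        let mut chunks = Vec::new();
        let mut start_pos = 0;
        let tail = loop {
            // ask the chunker for the next cutting point.
            let (_fingerprint, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);

            // A cut exactly at the end of the buffer might only exist because
            // the buffer ended there; keep that data around for the next round.
            if end_pos == buf.len() {
                break buf.split_off(start_pos);
            }

            chunks.push(buf[start_pos..end_pos].to_vec());
            start_pos = end_pos;
        };
        (chunks, tail)
    }

    /// Illustrative helper (not from this commit): hash a chunk with blake3,
    /// switching to rayon-based hashing for inputs of 128KiB or more.
    fn hash_chunk(data: &[u8]) -> blake3::Hash {
        let mut hasher = blake3::Hasher::new();
        if data.len() >= 128 * 1024 {
            // needs the `rayon` feature of the blake3 crate
            hasher.update_rayon(data);
        } else {
            hasher.update(data);
        }
        hasher.finalize()
    }

The gRPC handler in the diff below keeps doing the equivalent of drain_full_chunks as client messages arrive, uploads every finished chunk right away, and only uploads the buffered tail once the stream ends.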


@@ -17,6 +17,27 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
             chunk_service,
         }
     }
+
+    // upload the chunk to the chunk service, and return its digest (or an error) when done.
+    #[instrument(skip(chunk_service))]
+    fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
+        let mut hasher = blake3::Hasher::new();
+        if chunk_data.len() >= 128 * 1024 {
+            hasher.update_rayon(&chunk_data);
+        } else {
+            hasher.update(&chunk_data);
+        }
+        let digest = hasher.finalize();
+
+        if chunk_service.has(digest.as_bytes())? {
+            debug!("already has chunk, skipping");
+        }
+        let digest_resp = chunk_service.put(chunk_data)?;
+
+        assert_eq!(digest_resp, digest.as_bytes());
+
+        Ok(digest.as_bytes().to_vec())
+    }
 }
 
 #[async_trait]
@@ -143,89 +164,101 @@
     ) -> Result<Response<super::PutBlobResponse>, Status> {
         let mut req_inner = request.into_inner();

-        // TODO: for now, we collect all Chunks into a large Vec<u8>, and then
-        // pass it to a (content-defined) Chunker.
-        // This is because the fastcdc crate currently operates on byte slices,
-        // not on something implementing [std::io::Read].
-        // (see https://github.com/nlfiedler/fastcdc-rs/issues/17)
-        let mut blob_contents: Vec<u8> = Vec::new();
-
-        while let Some(mut blob_chunk) = req_inner.message().await? {
-            blob_contents.append(&mut blob_chunk.data);
-        }
-
-        // initialize a new chunker
-        // TODO: play with chunking sizes
-        let chunker = fastcdc::v2020::FastCDC::new(
-            &blob_contents,
-            64 * 1024 / 4, // min
-            64 * 1024, // avg
-            64 * 1024 * 4, // max
-        );
-
-        // initialize blake3 hashers. chunk_hasher is used and reset for each
-        // chunk, blob_hasher calculates the hash of the whole blob.
-        let mut chunk_hasher = blake3::Hasher::new();
+        // initialize a blake3 hasher calculating the hash of the whole blob.
         let mut blob_hasher = blake3::Hasher::new();

         // start a BlobMeta, which we'll fill while looping over the chunks
         let mut blob_meta = super::BlobMeta::default();

-        // loop over all the chunks
-        for chunk in chunker {
-            // extract the data itself
-            let chunk_data: Vec<u8> =
-                blob_contents[chunk.offset..chunk.offset + chunk.length].to_vec();
-
-            // calculate the digest of that chunk
-            chunk_hasher.update(&chunk_data);
-            let chunk_digest = chunk_hasher.finalize();
-            chunk_hasher.reset();
-
-            // also update blob_hasher
-            blob_hasher.update(&chunk_data);
-
-            // check if chunk is already in db, and if not, insert.
-            match self.chunk_service.has(chunk_digest.as_bytes()) {
-                Err(e) => {
-                    return Err(Error::StorageError(format!(
-                        "unable to check if chunk {} exists: {}",
-                        BASE64.encode(chunk_digest.as_bytes()),
-                        e
-                    ))
-                    .into());
-                }
-                Ok(has_chunk) => {
-                    if !has_chunk {
-                        if let Err(e) = self.chunk_service.put(chunk_data.to_vec()) {
-                            return Err(Error::StorageError(format!(
-                                "unable to store chunk {}: {}",
-                                BASE64.encode(chunk_digest.as_bytes()),
-                                e
-                            ))
-                            .into());
-                        }
-                    }
-                }
-            }
-
-            // add chunk to blobmeta
-            blob_meta.chunks.push(super::blob_meta::ChunkMeta {
-                digest: chunk_digest.as_bytes().to_vec(),
-                size: chunk.length as u32,
-            });
+        // is filled with bytes received from the client.
+        let mut buf: Vec<u8> = vec![];
+
+        // This reads data from the client, chunks it up using fastcdc,
+        // uploads all chunks to the [ChunkService], and fills a
+        // [super::BlobMeta] linking to these chunks.
+        while let Some(blob_chunk) = req_inner.message().await? {
+            // calculate blob hash, and use rayon if data is > 128KiB.
+            if blob_chunk.data.len() > 128 * 1024 {
+                blob_hasher.update_rayon(&blob_chunk.data);
+            } else {
+                blob_hasher.update(&blob_chunk.data);
+            }
+
+            // extend buf with the newly received data
+            buf.append(&mut blob_chunk.data.clone());
+
+            // TODO: play with chunking sizes
+            let chunker_avg_size = 64 * 1024;
+            let chunker_min_size = chunker_avg_size / 4;
+            let chunker_max_size = chunker_avg_size * 4;
+
+            // initialize a chunker with the current buffer
+            let chunker = fastcdc::v2020::FastCDC::new(
+                &buf,
+                chunker_min_size,
+                chunker_avg_size,
+                chunker_max_size,
+            );
+
+            // ask the chunker for cutting points in the buffer.
+            let mut start_pos = 0 as usize;
+            buf = loop {
+                // ask the chunker for the next cutting point.
+                let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
+
+                // whenever the last cut point is pointing to the end of the buffer,
+                // keep that chunk left in there.
+                // We don't know if the chunker decided to cut here simply because it was
+                // at the end of the buffer, or if it would also cut if there
+                // were more data.
+                //
+                // Split off all previous chunks and keep this chunk data in the buffer.
+                if end_pos == buf.len() {
+                    break buf.split_off(start_pos);
+                }
+
+                // Upload that chunk to the chunk service and record it in BlobMeta.
+                // TODO: make upload_chunk async and upload concurrently?
+                let chunk_data = &buf[start_pos..end_pos];
+                let chunk_digest =
+                    Self::upload_chunk(self.chunk_service.clone(), chunk_data.to_vec())?;
+
+                blob_meta.chunks.push(super::blob_meta::ChunkMeta {
+                    digest: chunk_digest,
+                    size: chunk_data.len() as u32,
+                });
+
+                // move start_pos over the processed chunk.
+                start_pos = end_pos;
+            }
         }

-        // done reading data, finalize blob_hasher and insert blobmeta.
-        let blob_digest = blob_hasher.finalize();
-
-        // TODO: don't store if we already have it (potentially with different chunking)
-        match self.blob_service.put(blob_digest.as_bytes(), blob_meta) {
-            Ok(()) => Ok(Response::new(super::PutBlobResponse {
-                digest: blob_digest.as_bytes().to_vec(),
-            })),
-            Err(e) => Err(e.into()),
-        }
+        // Also upload the last chunk (what's left in `buf`) to the chunk
+        // service and record it in BlobMeta.
+        let buf_len = buf.len() as u32;
+        let chunk_digest = Self::upload_chunk(self.chunk_service.clone(), buf)?;
+
+        blob_meta.chunks.push(super::blob_meta::ChunkMeta {
+            digest: chunk_digest,
+            size: buf_len,
+        });
+
+        let blob_digest = blob_hasher.finalize().as_bytes().to_vec();
+
+        // check if we have the received blob in the [BlobService] already.
+        let resp = self.blob_service.stat(&super::StatBlobRequest {
+            digest: blob_digest.to_vec(),
+            ..Default::default()
+        })?;
+
+        // if not, store.
+        if resp.is_none() {
+            self.blob_service.put(&blob_digest, blob_meta)?;
+        }
+
+        // return to client.
+        Ok(Response::new(super::PutBlobResponse {
+            digest: blob_digest,
+        }))
     }
 }


@@ -166,6 +166,39 @@ async fn put_read_stat_large() {
     }
     assert_eq!(BLOB_B.len() as u32, size_in_stat);

+    // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values.
+    // TODO: make the chunker config better accessible, so we don't need to synchronize this.
+    {
+        let chunker_avg_size = 64 * 1024;
+        let chunker_min_size = chunker_avg_size / 4;
+        let chunker_max_size = chunker_avg_size * 4;
+
+        // initialize a chunker with the current buffer
+        let blob_b = BLOB_B.to_vec();
+        let chunker = fastcdc::v2020::FastCDC::new(
+            &blob_b,
+            chunker_min_size,
+            chunker_avg_size,
+            chunker_max_size,
+        );
+
+        let mut num_chunks = 0;
+        for (i, chunk) in chunker.enumerate() {
+            assert_eq!(
+                resp.chunks[i].size, chunk.length as u32,
+                "expected locally-chunked chunk length to match stat response"
+            );
+
+            num_chunks += 1;
+        }
+
+        assert_eq!(
+            resp.chunks.len(),
+            num_chunks,
+            "expected number of chunks to match"
+        );
+    }
+
     // Reading the whole blob by its digest via the read() interface should succeed.
     {
         let resp = service