refactor(tvix/store): add read_all_and_chunk method
This moves the logic from src/import.rs that

- reads over the contents of a file
- chunks them up and uploads individual chunks
- keeps track of the uploaded chunks in a BlobMeta structure
- returns the hash of the blob and the BlobMeta structure

… into a generic read_all_and_chunk function in src/chunkservice/util.rs. It will work on anything implementing io::Read, not just files, which will help us in a bit.

Change-Id: I53bf628114b73ee2e515bdae29974571ea2b6f6f
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8259
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
parent
2fe7192dbc
commit
7ffb2676ee
3 changed files with 46 additions and 43 deletions
|
@ -7,6 +7,7 @@ use crate::Error;
|
|||
|
||||
pub use self::memory::MemoryChunkService;
|
||||
pub use self::sled::SledChunkService;
|
||||
pub use self::util::read_all_and_chunk;
|
||||
pub use self::util::update_hasher;
|
||||
pub use self::util::upload_chunk;
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use crate::{proto, Error};
|
||||
use std::io::Read;
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
use crate::Error;
|
||||
|
||||
use super::ChunkService;
|
||||
|
||||
/// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
|
||||
|
@ -24,6 +24,45 @@ pub fn upload_chunk<CS: ChunkService>(
|
|||
Ok(digest.as_bytes().to_vec())
|
||||
}
|
||||
|
||||
/// reads through a reader, writes chunks to a [ChunkService] and returns a
|
||||
/// [proto::BlobMeta] pointing to all the chunks.
|
||||
#[instrument(skip_all, err)]
|
||||
pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
|
||||
chunk_service: &CS,
|
||||
r: R,
|
||||
) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
|
||||
let mut blob_meta = proto::BlobMeta::default();
|
||||
|
||||
// hash the file contents, upload chunks if not there yet
|
||||
let mut blob_hasher = blake3::Hasher::new();
|
||||
|
||||
// TODO: play with chunking sizes
|
||||
let chunker_avg_size = 64 * 1024;
|
||||
let chunker_min_size = chunker_avg_size / 4;
|
||||
let chunker_max_size = chunker_avg_size * 4;
|
||||
|
||||
let chunker =
|
||||
fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
|
||||
|
||||
for chunking_result in chunker {
|
||||
let chunk = chunking_result.unwrap();
|
||||
// TODO: convert to error::UnableToRead
|
||||
|
||||
let chunk_len = chunk.data.len() as u32;
|
||||
|
||||
// update calculate blob hash
|
||||
update_hasher(&mut blob_hasher, &chunk.data);
|
||||
|
||||
let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
|
||||
|
||||
blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: chunk_len,
|
||||
});
|
||||
}
|
||||
Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
|
||||
}
|
||||
|
||||
/// updates a given hasher with more data. Uses rayon if the data is
|
||||
/// sufficiently big.
|
||||
///
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
use crate::{
|
||||
chunkservice::{update_hasher, upload_chunk},
|
||||
proto,
|
||||
};
|
||||
use crate::{chunkservice::read_all_and_chunk, proto};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
|
@ -115,44 +112,10 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire
|
|||
.metadata()
|
||||
.map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?;
|
||||
|
||||
// hash the file contents, upload chunks if not there yet
|
||||
let (blob_digest, blob_meta) = {
|
||||
let file = File::open(entry_path.clone())
|
||||
.map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
|
||||
let file = File::open(entry_path.clone())
|
||||
.map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
|
||||
|
||||
let mut blob_meta = proto::BlobMeta::default();
|
||||
let mut blob_hasher = blake3::Hasher::new();
|
||||
|
||||
// TODO: play with chunking sizes
|
||||
let chunker_avg_size = 64 * 1024;
|
||||
let chunker_min_size = chunker_avg_size / 4;
|
||||
let chunker_max_size = chunker_avg_size * 4;
|
||||
|
||||
let chunker = fastcdc::v2020::StreamCDC::new(
|
||||
file,
|
||||
chunker_min_size,
|
||||
chunker_avg_size,
|
||||
chunker_max_size,
|
||||
);
|
||||
|
||||
for chunking_result in chunker {
|
||||
let chunk = chunking_result.unwrap();
|
||||
// TODO: convert to error::UnableToRead
|
||||
|
||||
let chunk_len = chunk.data.len() as u32;
|
||||
|
||||
// update calculate blob hash
|
||||
update_hasher(&mut blob_hasher, &chunk.data);
|
||||
|
||||
let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
|
||||
|
||||
blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: chunk_len,
|
||||
});
|
||||
}
|
||||
(blob_hasher.finalize().as_bytes().to_vec(), blob_meta)
|
||||
};
|
||||
let (blob_digest, blob_meta) = read_all_and_chunk(chunk_service, file)?;
|
||||
|
||||
// upload blobmeta if not there yet
|
||||
if blob_service
|
||||
|
|
Loading…
Reference in a new issue