feat(tvix/store/import): use StreamCDC instead of blobwriter
This seems to be way faster.

Change-Id: Ica7cee95d108c51fe67365f07366634ddbbfa060
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8246
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
parent
ceb9d670bf
commit
ead113cdfc
1 changed files with 35 additions and 9 deletions
|
@@ -1,10 +1,9 @@
|
|||
use crate::{proto, BlobWriter};
|
||||
use crate::{chunkservice::upload_chunk, proto};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
fs,
|
||||
fs::File,
|
||||
io::BufReader,
|
||||
os::unix::prelude::PermissionsExt,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
@@ -115,18 +114,45 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire
|
|||
|
||||
// hash the file contents, upload chunks if not there yet
|
||||
let (blob_digest, blob_meta) = {
|
||||
let mut blob_writer = BlobWriter::new(chunk_service);
|
||||
|
||||
let file = File::open(entry_path.clone())
|
||||
.map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
|
||||
|
||||
let mut file_reader = BufReader::new(file);
|
||||
let mut blob_meta = proto::BlobMeta::default();
|
||||
let mut blob_hasher = blake3::Hasher::new();
|
||||
|
||||
std::io::copy(&mut file_reader, &mut blob_writer)
|
||||
.map_err(|e| Error::UnableToRead(entry_path, e))?;
|
||||
// TODO: play with chunking sizes
|
||||
let chunker_avg_size = 64 * 1024;
|
||||
let chunker_min_size = chunker_avg_size / 4;
|
||||
let chunker_max_size = chunker_avg_size * 4;
|
||||
|
||||
// TODO: handle errors
|
||||
blob_writer.finalize().unwrap()
|
||||
let chunker = fastcdc::v2020::StreamCDC::new(
|
||||
Box::new(file),
|
||||
chunker_min_size,
|
||||
chunker_avg_size,
|
||||
chunker_max_size,
|
||||
);
|
||||
|
||||
for chunking_result in chunker {
|
||||
let chunk = chunking_result.unwrap();
|
||||
// TODO: convert to error::UnableToRead
|
||||
|
||||
let chunk_len = chunk.data.len() as u32;
|
||||
|
||||
// update calculate blob hash, and use rayon if data is > 128KiB.
|
||||
if chunk_len > 128 * 1024 {
|
||||
blob_hasher.update_rayon(&chunk.data);
|
||||
} else {
|
||||
blob_hasher.update(&chunk.data);
|
||||
}
|
||||
|
||||
let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
|
||||
|
||||
blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest,
|
||||
size: chunk_len,
|
||||
});
|
||||
}
|
||||
(blob_hasher.finalize().as_bytes().to_vec(), blob_meta)
|
||||
};
|
||||
|
||||
// upload blobmeta if not there yet
|
||||
|
|
Loading…
Reference in a new issue