refactor(tvix/store): factor out hash update into function

We're using this in a bunch of places. Let's move it into a helper
function.

Change-Id: I118fba35f6d343704520ba37280e4ca52a61da44
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8251
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2023-03-10 23:24:23 +01:00 committed by clbot
parent 2dc93f8de2
commit b049b88d2d
4 changed files with 35 additions and 19 deletions

View file

@ -7,6 +7,7 @@ use crate::Error;
pub use self::memory::MemoryChunkService;
pub use self::sled::SledChunkService;
pub use self::util::update_hasher;
pub use self::util::upload_chunk;
/// The base trait all ChunkService services need to implement.

View file

@ -11,12 +11,7 @@ pub fn upload_chunk<CS: ChunkService>(
chunk_data: Vec<u8>,
) -> Result<Vec<u8>, Error> {
let mut hasher = blake3::Hasher::new();
// TODO: benchmark this number and factor it out
if chunk_data.len() >= 128 * 1024 {
hasher.update_rayon(&chunk_data);
} else {
hasher.update(&chunk_data);
}
update_hasher(&mut hasher, &chunk_data);
let digest = hasher.finalize();
if chunk_service.has(digest.as_bytes())? {
@ -28,3 +23,24 @@ pub fn upload_chunk<CS: ChunkService>(
Ok(digest.as_bytes().to_vec())
}
/// updates a given hasher with more data. Uses rayon if the data is
/// sufficiently big.
///
/// From the docs:
///
/// To get any performance benefit from multithreading, the input buffer needs
/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
/// update for inputs under 128 KiB. That threshold varies quite a lot across
/// different processors, and its important to benchmark your specific use
/// case.
///
/// We didn't benchmark yet, so these numbers might need tweaking.
#[instrument(skip_all)]
pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
if data.len() > 128 * 1024 {
hasher.update_rayon(data);
} else {
hasher.update(data);
}
}

View file

@ -1,4 +1,7 @@
use crate::{chunkservice::upload_chunk, proto};
use crate::{
chunkservice::{update_hasher, upload_chunk},
proto,
};
use std::{
collections::HashMap,
fmt::Debug,
@ -138,12 +141,8 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire
let chunk_len = chunk.data.len() as u32;
// update calculate blob hash, and use rayon if data is > 128KiB.
if chunk_len > 128 * 1024 {
blob_hasher.update_rayon(&chunk.data);
} else {
blob_hasher.update(&chunk.data);
}
// update calculate blob hash
update_hasher(&mut blob_hasher, &chunk.data);
let chunk_digest = upload_chunk(chunk_service, chunk.data)?;

View file

@ -1,4 +1,8 @@
use crate::{blobservice::BlobService, chunkservice::ChunkService, Error};
use crate::{
blobservice::BlobService,
chunkservice::{update_hasher, ChunkService},
Error,
};
use data_encoding::BASE64;
use std::io::{BufWriter, Write};
use tokio::{sync::mpsc::channel, task};
@ -23,11 +27,7 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
#[instrument(skip(chunk_service))]
fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
let mut hasher = blake3::Hasher::new();
if chunk_data.len() >= 128 * 1024 {
hasher.update_rayon(&chunk_data);
} else {
hasher.update(&chunk_data);
}
update_hasher(&mut hasher, &chunk_data);
let digest = hasher.finalize();
if chunk_service.has(digest.as_bytes())? {