From 3506d3bd0ebbbcd907d3cd627d8dd828171fd6d1 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 10 Mar 2023 14:21:37 +0100 Subject: [PATCH] refactor(tvix/store): move upload_chunk out of blobwriter This is useful not only in blobwriter contexts. Change-Id: I4c584b5264ff7b4bb3b1a9671affc39e18bf4ccf Reviewed-on: https://cl.tvl.fyi/c/depot/+/8245 Tested-by: BuildkiteCI Autosubmit: flokli Reviewed-by: raitobezarius --- tvix/store/src/blobwriter.rs | 29 ++-------------------------- tvix/store/src/chunkservice/mod.rs | 3 +++ tvix/store/src/chunkservice/util.rs | 30 +++++++++++++++++++++++++++++ 3 files changed, 35 insertions(+), 27 deletions(-) create mode 100644 tvix/store/src/chunkservice/util.rs diff --git a/tvix/store/src/blobwriter.rs b/tvix/store/src/blobwriter.rs index 50471a97c..3966df82d 100644 --- a/tvix/store/src/blobwriter.rs +++ b/tvix/store/src/blobwriter.rs @@ -1,7 +1,7 @@ -use crate::chunkservice::ChunkService; +use crate::chunkservice::{upload_chunk, ChunkService}; use crate::{proto, Error}; use rayon::prelude::*; -use tracing::{debug, instrument}; +use tracing::instrument; pub struct BlobWriter<'a, CS: ChunkService> { chunk_service: &'a CS, @@ -14,31 +14,6 @@ pub struct BlobWriter<'a, CS: ChunkService> { buf: Vec, } -// upload a chunk to the chunk service, and return its digest (or an error) when done. -#[instrument(skip_all)] -fn upload_chunk( - chunk_service: &CS, - chunk_data: Vec, -) -> Result, Error> { - let mut hasher = blake3::Hasher::new(); - // TODO: benchmark this number and factor it out - if chunk_data.len() >= 128 * 1024 { - hasher.update_rayon(&chunk_data); - } else { - hasher.update(&chunk_data); - } - let digest = hasher.finalize(); - - if chunk_service.has(digest.as_bytes())? { - debug!("already has chunk, skipping"); - } - let digest_resp = chunk_service.put(chunk_data)?; - - assert_eq!(digest_resp, digest.as_bytes()); - - Ok(digest.as_bytes().to_vec()) -} - impl<'a, CS: ChunkService> BlobWriter<'a, CS> { pub fn new(chunk_service: &'a CS) -> Self { Self { diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs index 83c91fd96..725ed2014 100644 --- a/tvix/store/src/chunkservice/mod.rs +++ b/tvix/store/src/chunkservice/mod.rs @@ -1,3 +1,5 @@ +mod util; + pub mod memory; pub mod sled; @@ -5,6 +7,7 @@ use crate::Error; pub use self::memory::MemoryChunkService; pub use self::sled::SledChunkService; +pub use self::util::upload_chunk; /// The base trait all ChunkService services need to implement. /// It allows checking for the existence, download and upload of chunks. diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs new file mode 100644 index 000000000..cf644fa51 --- /dev/null +++ b/tvix/store/src/chunkservice/util.rs @@ -0,0 +1,30 @@ +use tracing::{debug, instrument}; + +use crate::Error; + +use super::ChunkService; + +// upload a chunk to the chunk service, and return its digest (or an error) when done. +#[instrument(skip_all, err)] +pub fn upload_chunk( + chunk_service: &CS, + chunk_data: Vec, +) -> Result, Error> { + let mut hasher = blake3::Hasher::new(); + // TODO: benchmark this number and factor it out + if chunk_data.len() >= 128 * 1024 { + hasher.update_rayon(&chunk_data); + } else { + hasher.update(&chunk_data); + } + let digest = hasher.finalize(); + + if chunk_service.has(digest.as_bytes())? { + debug!("already has chunk, skipping"); + } + let digest_resp = chunk_service.put(chunk_data)?; + + assert_eq!(digest_resp, digest.as_bytes()); + + Ok(digest.as_bytes().to_vec()) +}