refactor(tvix/store/chunksvc): use [u8; 32] instead of Vec<u8>

Change-Id: Ie2b94aa5d69ff2c61fb77e13ae844f81f6270273
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8314
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2023-03-16 00:56:30 +01:00 committed by flokli
parent ee23220564
commit 530cb920b5
6 changed files with 70 additions and 54 deletions

View file

@ -97,33 +97,42 @@ impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> {
return Ok(bytes_read);
}
// There's another chunk to visit, fetch its contents
Some(chunk_meta) => match self.chunk_service.get(&chunk_meta.digest) {
// Fetch successful, put it into `self.current_chunk` and restart the loop.
Ok(Some(chunk_data)) => {
// make sure the size matches what chunk_meta says as well.
if chunk_data.len() as u32 != chunk_meta.size {
break Err(std::io::Error::new(
Some(chunk_meta) => {
let chunk_meta_digest: [u8; 32] =
chunk_meta.digest.clone().try_into().map_err(|_e| {
std::io::Error::new(
io::ErrorKind::InvalidData,
format!("chunk in chunkmeta has wrong digest size"),
)
})?;
match self.chunk_service.get(&chunk_meta_digest) {
// Fetch successful, put it into `self.current_chunk` and restart the loop.
Ok(Some(chunk_data)) => {
// make sure the size matches what chunk_meta says as well.
if chunk_data.len() as u32 != chunk_meta.size {
break Err(std::io::Error::new(
io::ErrorKind::InvalidData,
format!(
"chunk_service returned chunk with wrong size for {}, expected {}, got {}",
BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len()
)
));
}
self.current_chunk = Some(Cursor::new(chunk_data));
}
// Chunk requested does not exist
Ok(None) => {
break Err(std::io::Error::new(
io::ErrorKind::NotFound,
format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)),
))
}
// Error occured while fetching the next chunk, propagate the error from the chunk service
Err(e) => {
break Err(std::io::Error::new(io::ErrorKind::InvalidData, e));
}
self.current_chunk = Some(Cursor::new(chunk_data));
}
// Chunk requested does not exist
Ok(None) => {
break Err(std::io::Error::new(
io::ErrorKind::NotFound,
format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)),
))
}
// Error occured while fetching the next chunk, propagate the error from the chunk service
Err(e) => {
break Err(std::io::Error::new(io::ErrorKind::InvalidData, e));
}
},
}
}
}
}
@ -196,7 +205,7 @@ mod tests {
// assemble a blobmeta
let blobmeta = proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: dgst,
digest: dgst.to_vec(),
size: 0,
}],
inline_bao: vec![],
@ -228,7 +237,7 @@ mod tests {
// assemble a blobmeta
let blobmeta = proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: dgst,
digest: dgst.to_vec(),
size: 3,
}],
inline_bao: vec![],
@ -260,7 +269,7 @@ mod tests {
// assemble a blobmeta
let blobmeta = proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: dgst_1,
digest: dgst_1.to_vec(),
size: 42,
}],
inline_bao: vec![],
@ -294,15 +303,15 @@ mod tests {
let blobmeta = proto::BlobMeta {
chunks: vec![
proto::blob_meta::ChunkMeta {
digest: dgst_1.clone(),
digest: dgst_1.to_vec(),
size: 3,
},
proto::blob_meta::ChunkMeta {
digest: dgst_2,
digest: dgst_2.to_vec(),
size: 2,
},
proto::blob_meta::ChunkMeta {
digest: dgst_1,
digest: dgst_1.to_vec(),
size: 3,
},
],

View file

@ -11,18 +11,18 @@ use super::ChunkService;
#[derive(Clone, Default)]
pub struct MemoryChunkService {
db: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
}
impl ChunkService for MemoryChunkService {
#[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
fn has(&self, digest: &[u8]) -> Result<bool, Error> {
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
let db = self.db.read().unwrap();
Ok(db.get(digest).is_some())
}
#[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> {
fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
let db = self.db.read().unwrap();
match db.get(digest) {
None => Ok(None),
@ -42,12 +42,12 @@ impl ChunkService for MemoryChunkService {
}
#[instrument(skip(self, data))]
fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
let digest = blake3::hash(&data).as_bytes().to_vec();
fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
let digest = blake3::hash(&data);
let mut db = self.db.write().unwrap();
db.insert(digest.clone(), data);
db.insert(digest.as_bytes().clone(), data);
Ok(digest)
Ok(digest.as_bytes().clone())
}
}

View file

@ -17,12 +17,12 @@ pub use self::util::upload_chunk;
/// chunking information.
pub trait ChunkService {
/// check if the service has a chunk, given by its digest.
fn has(&self, digest: &[u8]) -> Result<bool, Error>;
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
/// retrieve a chunk by its digest. Implementations MUST validate the digest
/// matches.
fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error>;
fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>;
/// insert a chunk. returns the digest of the chunk, or an error.
fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error>;
fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>;
}

View file

@ -30,7 +30,7 @@ impl SledChunkService {
impl ChunkService for SledChunkService {
#[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
fn has(&self, digest: &[u8]) -> Result<bool, Error> {
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
match self.db.get(digest) {
Ok(None) => Ok(false),
Ok(Some(_)) => Ok(true),
@ -39,7 +39,7 @@ impl ChunkService for SledChunkService {
}
#[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> {
fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
match self.db.get(digest) {
Ok(None) => Ok(None),
Ok(Some(data)) => {
@ -59,12 +59,12 @@ impl ChunkService for SledChunkService {
}
#[instrument(name = "SledChunkService::put", skip(self, data))]
fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
let digest = blake3::hash(&data).as_bytes().to_vec();
let result = self.db.insert(&digest, data);
fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
let digest = blake3::hash(&data);
let result = self.db.insert(&digest.as_bytes(), data);
if let Err(e) = result {
return Err(Error::StorageError(e.to_string()));
}
Ok(digest)
Ok(digest.as_bytes().clone())
}
}

View file

@ -9,7 +9,7 @@ use super::ChunkService;
pub fn upload_chunk<CS: ChunkService>(
chunk_service: &CS,
chunk_data: Vec<u8>,
) -> Result<Vec<u8>, Error> {
) -> Result<[u8; 32], Error> {
let mut hasher = blake3::Hasher::new();
update_hasher(&mut hasher, &chunk_data);
let digest = hasher.finalize();
@ -19,9 +19,9 @@ pub fn upload_chunk<CS: ChunkService>(
}
let digest_resp = chunk_service.put(chunk_data)?;
assert_eq!(digest_resp, digest.as_bytes());
assert_eq!(&digest_resp, digest.as_bytes());
Ok(digest.as_bytes().to_vec())
Ok(digest.as_bytes().clone())
}
/// reads through a reader, writes chunks to a [ChunkService] and returns a
@ -56,7 +56,7 @@ pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
digest: chunk_digest,
digest: chunk_digest.to_vec(),
size: chunk_len,
});
}

View file

@ -36,7 +36,7 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
}
let digest_resp = chunk_service.put(chunk_data)?;
assert_eq!(digest_resp, digest.as_bytes());
assert_eq!(&digest_resp, digest.as_bytes());
Ok(digest.as_bytes().to_vec())
}
@ -74,9 +74,14 @@ impl<
let req = request.into_inner();
let (tx, rx) = channel(5);
let req_digest: [u8; 32] = req
.digest
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
// query the chunk service for more detailed blob info
let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
digest: req.digest.to_vec(),
digest: req_digest.to_vec(),
include_chunks: true,
..Default::default()
})?;
@ -86,7 +91,7 @@ impl<
// If the stat didn't return any blobmeta, the client might
// still have asked for a single chunk to be read.
// Check the chunkstore.
if let Some(data) = self.chunk_service.get(&req.digest)? {
if let Some(data) = self.chunk_service.get(&req_digest)? {
// We already know the hash matches, and contrary to
// iterating over a blobmeta, we can't know the size,
// so send the contents of that chunk over,
@ -101,7 +106,7 @@ impl<
} else {
return Err(Status::not_found(format!(
"blob {} not found",
BASE64.encode(&req.digest),
BASE64.encode(&req_digest),
)));
}
}
@ -118,13 +123,15 @@ impl<
// request chunk.
// We don't need to validate the digest again, as
// that's required for all implementations of ChunkService.
let res = match chunk_client.get(&chunkmeta.digest) {
// TODO: handle error
let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap();
let res = match chunk_client.get(&chunkmeta_digest) {
Err(e) => Err(e.into()),
// TODO: make this a separate error type
Ok(None) => Err(Error::StorageError(format!(
"consistency error: chunk {} for blob {} not found",
BASE64.encode(&chunkmeta.digest),
BASE64.encode(&req.digest),
BASE64.encode(chunkmeta_digest),
BASE64.encode(&req_digest),
))
.into()),
Ok(Some(data)) => {
@ -133,8 +140,8 @@ impl<
if data.len() as u32 != chunkmeta.size {
Err(Error::StorageError(format!(
"consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
BASE64.encode(&chunkmeta.digest),
BASE64.encode(&req.digest),
BASE64.encode(chunkmeta_digest),
BASE64.encode(&req_digest),
chunkmeta.size,
data.len(),
)).into())