feat(tvix/castore): add HashingReader, B3HashingReader
HashingReader wraps an existing AsyncRead, and allows querying for the digest of all data read "through" it. The hash function is configurable by type parameter, and we define B3HashingReader. Change-Id: Ic08142077566fc08836662218f5ec8c3aff80be5 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11087 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI
This commit is contained in:
parent
8383e9e02e
commit
4b4443240e
2 changed files with 90 additions and 0 deletions
88
tvix/castore/src/hashing_reader.rs
Normal file
88
tvix/castore/src/hashing_reader.rs
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
|
||||||
|
pin_project! {
    /// Wraps an existing AsyncRead, and allows querying for the digest of all
    /// data read "through" it.
    /// The hash function is configurable by type parameter.
    pub struct HashingReader<R, H>
    where
        R: AsyncRead,
        H: digest::Digest,
    {
        // The wrapped reader. Pinned (via pin_project) so poll_read can
        // delegate to it through the projection.
        #[pin]
        inner: R,
        // Incremental hash state, fed with every newly filled byte in poll_read.
        hasher: H,
    }
}
|
||||||
|
|
||||||
|
/// A [HashingReader] using BLAKE3 as its hash function.
pub type B3HashingReader<R> = HashingReader<R, blake3::Hasher>;
|
||||||
|
|
||||||
|
impl<R, H> HashingReader<R, H>
|
||||||
|
where
|
||||||
|
R: AsyncRead,
|
||||||
|
H: digest::Digest,
|
||||||
|
{
|
||||||
|
pub fn from(r: R) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: r,
|
||||||
|
hasher: H::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the digest.
|
||||||
|
pub fn digest(self) -> digest::Output<H> {
|
||||||
|
self.hasher.finalize()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R, H> tokio::io::AsyncRead for HashingReader<R, H>
where
    R: AsyncRead,
    H: digest::Digest,
{
    // Delegates the read to the inner reader, then feeds every byte that
    // this call newly filled into `buf` to the hasher.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        // Capture how much of `buf` was already filled *before* delegating,
        // so only the bytes produced by this call get hashed.
        let buf_filled_len_before = buf.filled().len();

        let this = self.project();
        let ret = this.inner.poll_read(cx, buf);

        // write everything new filled into the hasher.
        // NOTE: this runs unconditionally; on Pending/Err the inner reader
        // is presumably not expected to have filled anything, making the slice
        // empty and the update a no-op — confirm against AsyncRead's contract.
        this.hasher.update(&buf.filled()[buf_filled_len_before..]);

        ret
    }
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use test_case::test_case;

    use crate::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST, EMPTY_BLOB_DIGEST};
    use crate::{B3Digest, B3HashingReader};

    /// Reads the given fixture data through a [B3HashingReader] into a sink,
    /// and checks the resulting digest against the expected one.
    #[test_case(&BLOB_A, &BLOB_A_DIGEST; "blob a")]
    #[test_case(&BLOB_B, &BLOB_B_DIGEST; "blob b")]
    #[test_case(&[], &EMPTY_BLOB_DIGEST; "empty blob")]
    #[tokio::test]
    async fn test_b3_hashing_reader(data: &[u8], b3_digest: &B3Digest) {
        let mut reader = B3HashingReader::from(Cursor::new(data));
        let mut sink = tokio::io::sink();

        tokio::io::copy(&mut reader, &mut sink)
            .await
            .expect("read must succeed");

        assert_eq!(*b3_digest, reader.digest().into());
    }
}
|
|
@ -1,5 +1,6 @@
|
||||||
mod digests;
|
mod digests;
|
||||||
mod errors;
|
mod errors;
|
||||||
|
mod hashing_reader;
|
||||||
|
|
||||||
pub mod blobservice;
|
pub mod blobservice;
|
||||||
pub mod directoryservice;
|
pub mod directoryservice;
|
||||||
|
@ -15,6 +16,7 @@ pub mod utils;
|
||||||
|
|
||||||
pub use digests::{B3Digest, B3_LEN};
|
pub use digests::{B3Digest, B3_LEN};
|
||||||
pub use errors::Error;
|
pub use errors::Error;
|
||||||
|
pub use hashing_reader::{B3HashingReader, HashingReader};
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
Loading…
Reference in a new issue