feat(tvix/store): add blobreader
A BlobReader can be used to read a blob, which might consist of multiple chunks. Chunks are fetched from a ChunkService. Change-Id: I1806225b0052adaa4a2320b79b744e554e524ee3 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8088 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz> Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
parent
cff6d1e895
commit
60abca1d8e
2 changed files with 386 additions and 0 deletions
384
tvix/store/src/blobreader.rs
Normal file
384
tvix/store/src/blobreader.rs
Normal file
|
@ -0,0 +1,384 @@
|
|||
use std::io::{self, Cursor, Read, Write};
|
||||
|
||||
use data_encoding::BASE64;
|
||||
|
||||
use crate::{chunkservice::ChunkService, proto};
|
||||
|
||||
/// BlobReader implements reading of a blob, by querying individual chunks.
|
||||
///
|
||||
/// It doesn't talk to BlobService, but assumes something has already fetched
|
||||
/// blob_meta already.
|
||||
pub struct BlobReader<'a, CS: ChunkService> {
|
||||
// used to look up chunks
|
||||
chunk_service: &'a CS,
|
||||
|
||||
// internal iterator over chunk hashes and their sizes
|
||||
chunks_iter: std::vec::IntoIter<proto::blob_meta::ChunkMeta>,
|
||||
|
||||
// If a chunk was partially read (if buf.len() < chunk.size),
|
||||
// a cursor to its contents are stored here.
|
||||
current_chunk: Option<Cursor<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl<'a, CS: ChunkService> BlobReader<'a, CS> {
|
||||
pub fn open(chunk_service: &'a CS, blob_meta: proto::BlobMeta) -> Self {
|
||||
Self {
|
||||
chunk_service,
|
||||
chunks_iter: blob_meta.chunks.into_iter(),
|
||||
current_chunk: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// reads (up to n bytes) from the current chunk into buf (if there is
|
||||
/// a chunk).
|
||||
///
|
||||
/// If it arrives at the end of the chunk, sets it back to None.
|
||||
/// Returns a io::Result<usize> of the bytes read from the chunk.
|
||||
fn read_from_current_chunk<W: std::io::Write>(
|
||||
&mut self,
|
||||
m: usize,
|
||||
buf: &mut W,
|
||||
) -> std::io::Result<usize> {
|
||||
// If there's still something in partial_chunk, read from there
|
||||
// (up to m: usize bytes) and return the number of bytes read.
|
||||
if let Some(current_chunk) = &mut self.current_chunk {
|
||||
let result = io::copy(&mut current_chunk.take(m as u64), buf);
|
||||
|
||||
match result {
|
||||
Ok(n) => {
|
||||
// if we were not able to read all off m bytes,
|
||||
// this means we arrived at the end of the chunk.
|
||||
if n < m as u64 {
|
||||
self.current_chunk = None
|
||||
}
|
||||
|
||||
// n can never be > m, so downcasting this to usize is ok.
|
||||
Ok(n as usize)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
} else {
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let read_max = buf.len();
|
||||
let mut bytes_read = 0_usize;
|
||||
let mut buf_w = std::io::BufWriter::new(buf);
|
||||
|
||||
// read up to buf.len() bytes into buf, by reading from the current
|
||||
// chunk and subsequent ones.
|
||||
loop {
|
||||
// try to fill buf with bytes from the current chunk
|
||||
// (if there's still one)
|
||||
let n = self.read_from_current_chunk(read_max - bytes_read, &mut buf_w)?;
|
||||
bytes_read += n;
|
||||
|
||||
// We want to make sure we don't accidentially read past more than
|
||||
// we're allowed to.
|
||||
assert!(bytes_read <= read_max);
|
||||
|
||||
// buf is entirerly filled, we're done.
|
||||
if bytes_read == read_max {
|
||||
buf_w.flush()?;
|
||||
break Ok(bytes_read);
|
||||
}
|
||||
|
||||
// Otherwise, bytes_read is < read_max, so we could still write
|
||||
// more to buf.
|
||||
// Check if we have more chunks to read from.
|
||||
match self.chunks_iter.next() {
|
||||
// No more chunks, we're done.
|
||||
None => {
|
||||
buf_w.flush()?;
|
||||
return Ok(bytes_read);
|
||||
}
|
||||
// There's another chunk to visit, fetch its contents
|
||||
Some(chunk_meta) => match self.chunk_service.get(&chunk_meta.digest) {
|
||||
// Fetch successful, put it into `self.current_chunk` and restart the loop.
|
||||
Ok(Some(chunk_data)) => {
|
||||
// make sure the size matches what chunk_meta says as well.
|
||||
if chunk_data.len() as u32 != chunk_meta.size {
|
||||
break Err(std::io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"chunk_service returned chunk with wrong size for {}, expected {}, got {}",
|
||||
BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len()
|
||||
)
|
||||
));
|
||||
}
|
||||
self.current_chunk = Some(Cursor::new(chunk_data));
|
||||
}
|
||||
// Chunk requested does not exist
|
||||
Ok(None) => {
|
||||
break Err(std::io::Error::new(
|
||||
io::ErrorKind::NotFound,
|
||||
format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)),
|
||||
))
|
||||
}
|
||||
// Error occured while fetching the next chunk, propagate the error from the chunk service
|
||||
Err(e) => {
|
||||
break Err(std::io::Error::new(io::ErrorKind::InvalidData, e));
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::BlobReader;
    use crate::chunkservice::ChunkService;
    use crate::chunkservice::SledChunkService;
    use crate::proto;
    use lazy_static::lazy_static;
    use std::io::Cursor;
    use std::io::Read;
    use std::io::Write;
    use std::path::PathBuf;
    use tempfile::TempDir;

    lazy_static! {
        // 32 zero bytes, used as the digest of a chunk that is never stored.
        static ref DUMMY_DIGEST: Vec<u8> = vec![0x00; 32];
        static ref DUMMY_DATA_1: Vec<u8> = vec![0x01, 0x02, 0x03];
        static ref DUMMY_DATA_2: Vec<u8> = vec![0x04, 0x05];
    }

    /// Opens a sled-backed chunk service below the given directory.
    fn gen_chunk_service(p: PathBuf) -> impl ChunkService {
        SledChunkService::new(p.join("chunks")).unwrap()
    }

    /// Builds the metadata entry for a single chunk.
    fn chunk_meta(digest: Vec<u8>, size: u32) -> proto::blob_meta::ChunkMeta {
        proto::blob_meta::ChunkMeta { digest, size }
    }

    /// Builds a BlobMeta referring to the given chunks, without inline bao.
    fn blob_meta(chunks: Vec<proto::blob_meta::ChunkMeta>) -> proto::BlobMeta {
        proto::BlobMeta {
            chunks,
            inline_bao: vec![],
        }
    }

    /// reading from a blobmeta with zero chunks should produce zero bytes.
    #[test]
    fn empty_blobmeta() -> anyhow::Result<()> {
        let tmpdir = TempDir::new()?;
        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());

        let mut blob_reader = BlobReader::open(&chunk_service, blob_meta(vec![]));
        let mut buf = Cursor::new(Vec::new());

        let res = std::io::copy(&mut blob_reader, &mut buf);

        assert_eq!(0, res.unwrap());

        Ok(())
    }

    /// trying to read something where the chunk doesn't exist should fail
    #[test]
    fn missing_chunk_fail() -> anyhow::Result<()> {
        let tmpdir = TempDir::new()?;
        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());

        let blobmeta = blob_meta(vec![chunk_meta(DUMMY_DIGEST.to_vec(), 42)]);

        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
        let mut buf = Cursor::new(Vec::new());

        let res = std::io::copy(&mut blob_reader, &mut buf);

        assert!(res.is_err());

        Ok(())
    }

    /// read something containing the single (empty) chunk
    #[test]
    fn empty_chunk() -> anyhow::Result<()> {
        let tmpdir = TempDir::new()?;
        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());

        // insert a single chunk
        let dgst = chunk_service.put(vec![]).expect("must succeed");

        // assemble a blobmeta
        let blobmeta = blob_meta(vec![chunk_meta(dgst, 0)]);

        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);

        let mut buf: Vec<u8> = Vec::new();

        let res =
            std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");

        assert_eq!(res, 0, "number of bytes read must match");
        assert!(buf.is_empty(), "buf must be empty");

        Ok(())
    }

    /// read something which contains a single chunk
    #[test]
    fn single_chunk() -> anyhow::Result<()> {
        let tmpdir = TempDir::new()?;
        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());

        // insert a single chunk
        let dgst = chunk_service
            .put(DUMMY_DATA_1.clone())
            .expect("must succeed");

        // assemble a blobmeta
        let blobmeta = blob_meta(vec![chunk_meta(dgst, 3)]);

        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);

        let mut buf: Vec<u8> = Vec::new();

        let res =
            std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");

        assert_eq!(res, 3, "number of bytes read must match");
        assert_eq!(DUMMY_DATA_1[..], buf[..], "data read must match");

        Ok(())
    }

    /// read something referring to a chunk, but with wrong size
    #[test]
    fn wrong_size_fail() -> anyhow::Result<()> {
        let tmpdir = TempDir::new()?;
        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());

        // insert chunks
        let dgst_1 = chunk_service
            .put(DUMMY_DATA_1.clone())
            .expect("must succeed");

        // assemble a blobmeta whose recorded size disagrees with the stored chunk
        let blobmeta = blob_meta(vec![chunk_meta(dgst_1, 42)]);

        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);

        let mut buf: Vec<u8> = Vec::new();

        let res = std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf));

        assert!(res.is_err(), "reading must fail");

        Ok(())
    }

    /// read something referring to multiple chunks
    #[test]
    fn multiple_chunks() -> anyhow::Result<()> {
        let tmpdir = TempDir::new()?;
        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());

        // insert chunks
        let dgst_1 = chunk_service
            .put(DUMMY_DATA_1.clone())
            .expect("must succeed");
        let dgst_2 = chunk_service
            .put(DUMMY_DATA_2.clone())
            .expect("must succeed");

        // assemble a blobmeta; the first chunk is referenced twice
        let blobmeta = blob_meta(vec![
            chunk_meta(dgst_1.clone(), 3),
            chunk_meta(dgst_2, 2),
            chunk_meta(dgst_1, 3),
        ]);

        // assemble expected data
        let mut expected_data: Vec<u8> = Vec::new();
        expected_data.extend_from_slice(&DUMMY_DATA_1[..]);
        expected_data.extend_from_slice(&DUMMY_DATA_2[..]);
        expected_data.extend_from_slice(&DUMMY_DATA_1[..]);

        // read via io::copy
        {
            let mut blob_reader = BlobReader::open(&chunk_service, blobmeta.clone());

            let mut buf: Vec<u8> = Vec::new();

            let res =
                std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");

            assert_eq!(8, res, "number of bytes read must match");

            assert_eq!(expected_data[..], buf[..], "data read must match");
        }

        // now read the same thing again, but not via io::copy, but individually
        {
            let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);

            let mut buf: Vec<u8> = Vec::new();
            let mut cursor = Cursor::new(&mut buf);

            let mut bytes_read = 0;

            loop {
                let mut smallbuf = [0xff; 1];
                match blob_reader.read(&mut smallbuf) {
                    // end of blob
                    Ok(0) => break,
                    Ok(n) => {
                        // Only the first n bytes of smallbuf were filled by
                        // the reader; the rest is still the 0xff filler.
                        cursor.write_all(&smallbuf[..n]).unwrap();
                        bytes_read += n;
                    }
                    Err(e) => {
                        panic!("error occurred during read: {}", e);
                    }
                }
            }

            assert_eq!(8, bytes_read, "number of bytes read must match");
            assert_eq!(expected_data[..], buf[..], "data read must match");
        }

        Ok(())
    }
}
|
|
@ -1,11 +1,13 @@
|
|||
pub mod client;
|
||||
|
||||
mod blobreader;
|
||||
mod errors;
|
||||
|
||||
pub mod blobservice;
|
||||
pub mod chunkservice;
|
||||
pub mod proto;
|
||||
|
||||
pub use blobreader::BlobReader;
|
||||
pub mod dummy_blob_service;
|
||||
pub mod sled_directory_service;
|
||||
pub mod sled_path_info_service;
|
||||
|
|
Loading…
Reference in a new issue