feat(tvix/store/blobservice): implement seek
For memory and sled, it's trivial, as we already have a Cursor<Vec<u8>>. For gRPC, we simply reject going backwards, and skip n bytes for now. Once the gRPC protocol gets support for offsets and verified streaming, this can be improved. Change-Id: I734066a514aed287ea3db64bfb1680911ac1eeb0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8885 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
parent
42dc18353d
commit
7613e2e769
6 changed files with 240 additions and 11 deletions
93
tvix/store/src/blobservice/dumb_seeker.rs
Normal file
93
tvix/store/src/blobservice/dumb_seeker.rs
Normal file
|
@ -0,0 +1,93 @@
|
|||
use std::io;
|
||||
|
||||
use super::BlobReader;
|
||||
|
||||
/// Adds a forward-only [io::Seek] implementation on top of any [io::Read].
///
/// Seeking is emulated by reading and discarding bytes from the wrapped
/// reader while tracking how many bytes have been consumed so far.
/// Any attempt to seek backwards fails.
pub struct DumbSeeker<R>
where
    R: io::Read,
{
    /// The wrapped reader that bytes are pulled from.
    r: R,
    /// Number of bytes consumed from `r` so far, i.e. the current offset.
    pos: u64,
}
|
||||
|
||||
impl<R: io::Read> DumbSeeker<R> {
|
||||
pub fn new(r: R) -> Self {
|
||||
DumbSeeker { r, pos: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: io::Read> io::Read for DumbSeeker<R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let bytes_read = self.r.read(buf)?;
|
||||
|
||||
self.pos += bytes_read as u64;
|
||||
|
||||
Ok(bytes_read)
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: io::Read> io::Seek for DumbSeeker<R> {
|
||||
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
|
||||
let absolute_offset: u64 = match pos {
|
||||
io::SeekFrom::Start(start_offset) => {
|
||||
if start_offset < self.pos {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
format!("can't seek backwards ({} -> {})", self.pos, start_offset),
|
||||
));
|
||||
} else {
|
||||
start_offset
|
||||
}
|
||||
}
|
||||
// we don't know the total size, can't support this.
|
||||
io::SeekFrom::End(_end_offset) => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
"can't seek from end",
|
||||
));
|
||||
}
|
||||
io::SeekFrom::Current(relative_offset) => {
|
||||
if relative_offset < 0 {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
"can't seek backwards relative to current position",
|
||||
));
|
||||
} else {
|
||||
self.pos + relative_offset as u64
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// we already know absolute_offset is larger than self.pos
|
||||
debug_assert!(
|
||||
absolute_offset > self.pos,
|
||||
"absolute_offset is larger than self.pos"
|
||||
);
|
||||
|
||||
// calculate bytes to skip
|
||||
let bytes_to_skip: u64 = absolute_offset - self.pos;
|
||||
|
||||
// discard these bytes. We can't use take() as it requires ownership of
|
||||
// self.r, but we only have &mut self.
|
||||
let mut buf = [0; 1024];
|
||||
let mut bytes_skipped: u64 = 0;
|
||||
while bytes_skipped < bytes_to_skip {
|
||||
let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64);
|
||||
match self.r.read(&mut buf[..len as usize]) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => bytes_skipped += n as u64,
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
debug_assert_eq!(bytes_to_skip, bytes_skipped);
|
||||
|
||||
self.pos = absolute_offset;
|
||||
|
||||
// return the new position from the start of the stream
|
||||
Ok(absolute_offset)
|
||||
}
|
||||
}
|
||||
|
||||
/// A [DumbSeeker] wrapping any sendable reader can be used as a BlobReader.
|
||||
impl<R: io::Read + Send + 'static> BlobReader for DumbSeeker<R> {}
|
|
@ -1,4 +1,4 @@
|
|||
use super::{BlobService, BlobWriter};
|
||||
use super::{dumb_seeker::DumbSeeker, BlobReader, BlobService, BlobWriter};
|
||||
use crate::{proto, B3Digest};
|
||||
use futures::sink::{SinkExt, SinkMapErr};
|
||||
use std::{collections::VecDeque, io};
|
||||
|
@ -114,10 +114,7 @@ impl BlobService for GRPCBlobService {
|
|||
|
||||
// On success, this returns a Ok(Some(io::Read)), which can be used to read
|
||||
// the contents of the Blob, identified by the digest.
|
||||
fn open_read(
|
||||
&self,
|
||||
digest: &B3Digest,
|
||||
) -> Result<Option<Box<dyn io::Read + Send>>, crate::Error> {
|
||||
fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, crate::Error> {
|
||||
// Get a new handle to the gRPC client, and copy the digest.
|
||||
let mut grpc_client = self.grpc_client.clone();
|
||||
let digest = digest.clone();
|
||||
|
@ -155,7 +152,7 @@ impl BlobService for GRPCBlobService {
|
|||
|
||||
// Use SyncIoBridge to turn it into a sync Read.
|
||||
let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader);
|
||||
Ok(Some(Box::new(sync_reader)))
|
||||
Ok(Some(Box::new(DumbSeeker::new(sync_reader))))
|
||||
}
|
||||
Err(e) if e.code() == Code::NotFound => Ok(None),
|
||||
Err(e) => Err(crate::Error::StorageError(e.to_string())),
|
||||
|
|
|
@ -5,7 +5,7 @@ use std::{
|
|||
};
|
||||
use tracing::{instrument, warn};
|
||||
|
||||
use super::{BlobService, BlobWriter};
|
||||
use super::{BlobReader, BlobService, BlobWriter};
|
||||
use crate::{B3Digest, Error};
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
|
@ -36,7 +36,7 @@ impl BlobService for MemoryBlobService {
|
|||
Ok(db.contains_key(digest))
|
||||
}
|
||||
|
||||
fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> {
|
||||
fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> {
|
||||
let db = self.db.read().unwrap();
|
||||
|
||||
match db.get(digest).map(|x| Cursor::new(x.clone())) {
|
||||
|
|
|
@ -2,6 +2,7 @@ use std::io;
|
|||
|
||||
use crate::{B3Digest, Error};
|
||||
|
||||
mod dumb_seeker;
|
||||
mod from_addr;
|
||||
mod grpc;
|
||||
mod memory;
|
||||
|
@ -30,7 +31,7 @@ pub trait BlobService: Send + Sync {
|
|||
fn has(&self, digest: &B3Digest) -> Result<bool, Error>;
|
||||
|
||||
/// Request a blob from the store, by its content hash.
|
||||
fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error>;
|
||||
fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error>;
|
||||
|
||||
/// Insert a new blob into the store. Returns a [BlobWriter], which
|
||||
/// implements [io::Write] and a [BlobWriter::close].
|
||||
|
@ -46,3 +47,9 @@ pub trait BlobWriter: io::Write + Send + Sync + 'static {
|
|||
/// Closing a already-closed BlobWriter is a no-op.
|
||||
fn close(&mut self) -> Result<B3Digest, Error>;
|
||||
}
|
||||
|
||||
/// A [io::Read] that also allows seeking.
|
||||
pub trait BlobReader: io::Read + io::Seek + Send + 'static {}
|
||||
|
||||
/// A Cursor<Vec<u8>> can be used as a BlobReader.
|
||||
impl BlobReader for io::Cursor<Vec<u8>> {}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use super::{BlobService, BlobWriter};
|
||||
use super::{BlobReader, BlobService, BlobWriter};
|
||||
use crate::{B3Digest, Error};
|
||||
use std::{
|
||||
io::{self, Cursor},
|
||||
|
@ -65,7 +65,7 @@ impl BlobService for SledBlobService {
|
|||
}
|
||||
|
||||
#[instrument(skip(self), fields(blob.digest=%digest))]
|
||||
fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> {
|
||||
fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> {
|
||||
match self.db.get(digest.to_vec()) {
|
||||
Ok(None) => Ok(None),
|
||||
Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))),
|
||||
|
|
|
@ -84,3 +84,135 @@ fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest
|
|||
|
||||
assert_eq!(blob_contents, buf, "read blob contents must match");
|
||||
}
|
||||
|
||||
/// Put a blob in the store, and seek inside it a bit.
|
||||
#[test_case(gen_memory_blob_service(); "memory")]
|
||||
#[test_case(gen_sled_blob_service(); "sled")]
|
||||
fn put_seek(blob_service: impl BlobService) {
|
||||
let mut w = blob_service.open_write();
|
||||
|
||||
io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w).expect("copy must succeed");
|
||||
w.close().expect("close must succeed");
|
||||
|
||||
// open a blob for reading
|
||||
let mut r = blob_service
|
||||
.open_read(&fixtures::BLOB_B_DIGEST)
|
||||
.expect("open_read must succeed")
|
||||
.expect("must be some");
|
||||
|
||||
let mut pos: u64 = 0;
|
||||
|
||||
// read the first 10 bytes, they must match the data in the fixture.
|
||||
{
|
||||
let mut buf = [0; 10];
|
||||
r.read_exact(&mut buf).expect("must succeed");
|
||||
|
||||
assert_eq!(
|
||||
&fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
|
||||
buf,
|
||||
"expected first 10 bytes to match"
|
||||
);
|
||||
|
||||
pos += buf.len() as u64;
|
||||
}
|
||||
// seek by 0 bytes, using SeekFrom::Start.
|
||||
let p = r
|
||||
.seek(io::SeekFrom::Start(pos as u64))
|
||||
.expect("must not fail");
|
||||
assert_eq!(pos, p);
|
||||
|
||||
// read the next 10 bytes, they must match the data in the fixture.
|
||||
{
|
||||
let mut buf = [0; 10];
|
||||
r.read_exact(&mut buf).expect("must succeed");
|
||||
|
||||
assert_eq!(
|
||||
&fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
|
||||
buf,
|
||||
"expected data to match"
|
||||
);
|
||||
|
||||
pos += buf.len() as u64;
|
||||
}
|
||||
|
||||
// seek by 5 bytes, using SeekFrom::Start.
|
||||
let p = r
|
||||
.seek(io::SeekFrom::Start(pos as u64 + 5))
|
||||
.expect("must not fail");
|
||||
pos += 5;
|
||||
assert_eq!(pos, p);
|
||||
|
||||
// read the next 10 bytes, they must match the data in the fixture.
|
||||
{
|
||||
let mut buf = [0; 10];
|
||||
r.read_exact(&mut buf).expect("must succeed");
|
||||
|
||||
assert_eq!(
|
||||
&fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
|
||||
buf,
|
||||
"expected data to match"
|
||||
);
|
||||
|
||||
pos += buf.len() as u64;
|
||||
}
|
||||
|
||||
// seek by 12345 bytes, using SeekFrom::
|
||||
let p = r.seek(io::SeekFrom::Current(12345)).expect("must not fail");
|
||||
pos += 12345;
|
||||
assert_eq!(pos, p);
|
||||
|
||||
// read the next 10 bytes, they must match the data in the fixture.
|
||||
{
|
||||
let mut buf = [0; 10];
|
||||
r.read_exact(&mut buf).expect("must succeed");
|
||||
|
||||
assert_eq!(
|
||||
&fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
|
||||
buf,
|
||||
"expected data to match"
|
||||
);
|
||||
|
||||
#[allow(unused_assignments)]
|
||||
{
|
||||
pos += buf.len() as u64;
|
||||
}
|
||||
}
|
||||
|
||||
// seeking to the end is okay…
|
||||
let p = r
|
||||
.seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64))
|
||||
.expect("must not fail");
|
||||
pos = fixtures::BLOB_B.len() as u64;
|
||||
assert_eq!(pos, p);
|
||||
|
||||
{
|
||||
// but it returns no more data.
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
r.read_to_end(&mut buf).expect("must not fail");
|
||||
assert!(buf.is_empty(), "expected no more data to be read");
|
||||
}
|
||||
|
||||
// seeking past the end…
|
||||
match r.seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) {
|
||||
// should either be ok, but then return 0 bytes.
|
||||
// this matches the behaviour or a Cursor<Vec<u8>>.
|
||||
Ok(_pos) => {
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
r.read_to_end(&mut buf).expect("must not fail");
|
||||
assert!(buf.is_empty(), "expected no more data to be read");
|
||||
}
|
||||
// or not be okay.
|
||||
Err(_) => {}
|
||||
}
|
||||
|
||||
// TODO: this is only broken for the gRPC version
|
||||
// We expect seeking backwards or relative to the end to fail.
|
||||
// r.seek(io::SeekFrom::Current(-1))
|
||||
// .expect_err("SeekFrom::Current(-1) expected to fail");
|
||||
|
||||
// r.seek(io::SeekFrom::Start(pos - 1))
|
||||
// .expect_err("SeekFrom::Start(pos-1) expected to fail");
|
||||
|
||||
// r.seek(io::SeekFrom::End(0))
|
||||
// .expect_err("SeekFrom::End(_) expected to fail");
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue