feat(tvix/store): seekable nar renderer
Co-authored-by: edef <edef@edef.eu> Change-Id: I233206e8aae35504ca0519ac88178dfc5596bedb Reviewed-on: https://cl.tvl.fyi/c/depot/+/12439 Reviewed-by: flokli <flokli@flokli.de> Autosubmit: yuka <yuka@yuka.dev> Tested-by: BuildkiteCI
This commit is contained in:
parent
6deff4d8e9
commit
e4378f0143
4 changed files with 535 additions and 0 deletions
|
@ -3,6 +3,7 @@ use tvix_castore::B3Digest;
|
||||||
|
|
||||||
mod import;
|
mod import;
|
||||||
mod renderer;
|
mod renderer;
|
||||||
|
pub mod seekable;
|
||||||
pub use import::ingest_nar;
|
pub use import::ingest_nar;
|
||||||
pub use import::ingest_nar_and_hash;
|
pub use import::ingest_nar_and_hash;
|
||||||
pub use renderer::calculate_size_and_sha256;
|
pub use renderer::calculate_size_and_sha256;
|
||||||
|
|
422
tvix/store/src/nar/seekable.rs
Normal file
422
tvix/store/src/nar/seekable.rs
Normal file
|
@ -0,0 +1,422 @@
|
||||||
|
use std::{
|
||||||
|
cmp::min,
|
||||||
|
io,
|
||||||
|
pin::Pin,
|
||||||
|
sync::Arc,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::RenderError;
|
||||||
|
|
||||||
|
use bytes::{BufMut, Bytes};
|
||||||
|
|
||||||
|
use nix_compat::nar::writer::sync as nar_writer;
|
||||||
|
use tvix_castore::blobservice::{BlobReader, BlobService};
|
||||||
|
use tvix_castore::directoryservice::{
|
||||||
|
DirectoryGraph, DirectoryService, RootToLeavesValidator, ValidatedDirectoryGraph,
|
||||||
|
};
|
||||||
|
use tvix_castore::Directory;
|
||||||
|
use tvix_castore::{B3Digest, Node};
|
||||||
|
|
||||||
|
use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
|
||||||
|
use futures::FutureExt;
|
||||||
|
use futures::TryStreamExt;
|
||||||
|
|
||||||
|
use tokio::io::AsyncSeekExt;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct BlobRef {
|
||||||
|
digest: B3Digest,
|
||||||
|
size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum Data {
|
||||||
|
Literal(Bytes),
|
||||||
|
Blob(BlobRef),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Data {
|
||||||
|
pub fn len(&self) -> u64 {
|
||||||
|
match self {
|
||||||
|
Data::Literal(data) => data.len() as u64,
|
||||||
|
Data::Blob(BlobRef { size, .. }) => *size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Reader<B: BlobService> {
|
||||||
|
segments: Vec<(u64, Data)>,
|
||||||
|
position_bytes: u64,
|
||||||
|
position_index: usize,
|
||||||
|
blob_service: Arc<B>,
|
||||||
|
seeking: bool,
|
||||||
|
current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Used during construction.
|
||||||
|
/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal` segment and
|
||||||
|
/// inserts it into `self.segments`.
|
||||||
|
fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
|
||||||
|
let segment_size = cur_segment.len();
|
||||||
|
segments.push((*offset, Data::Literal(cur_segment.into())));
|
||||||
|
*offset += segment_size as u64;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Used during construction.
|
||||||
|
/// Recursively walks the node and its children, and fills `segments` with the appropriate
|
||||||
|
/// `Data::Literal` and `Data::Blob` elements.
|
||||||
|
fn walk_node(
|
||||||
|
segments: &mut Vec<(u64, Data)>,
|
||||||
|
offset: &mut u64,
|
||||||
|
get_directory: &impl Fn(&B3Digest) -> Directory,
|
||||||
|
node: Node,
|
||||||
|
// Includes a reference to the current segment's buffer
|
||||||
|
nar_node: nar_writer::Node<'_, Vec<u8>>,
|
||||||
|
) -> Result<(), RenderError> {
|
||||||
|
match node {
|
||||||
|
tvix_castore::Node::Symlink { target } => {
|
||||||
|
nar_node
|
||||||
|
.symlink(target.as_ref())
|
||||||
|
.map_err(RenderError::NARWriterError)?;
|
||||||
|
}
|
||||||
|
tvix_castore::Node::File {
|
||||||
|
digest,
|
||||||
|
size,
|
||||||
|
executable,
|
||||||
|
} => {
|
||||||
|
let (cur_segment, skip) = nar_node
|
||||||
|
.file_manual_write(executable, size)
|
||||||
|
.map_err(RenderError::NARWriterError)?;
|
||||||
|
|
||||||
|
// Flush the segment up until the beginning of the blob
|
||||||
|
flush_segment(segments, offset, std::mem::take(cur_segment));
|
||||||
|
|
||||||
|
// Insert the blob segment
|
||||||
|
segments.push((*offset, Data::Blob(BlobRef { digest, size })));
|
||||||
|
*offset += size;
|
||||||
|
|
||||||
|
// Close the file node
|
||||||
|
// We **intentionally** do not write the file contents anywhere.
|
||||||
|
// Instead we have stored the blob reference in a Data::Blob segment,
|
||||||
|
// and the poll_read implementation will take care of serving the
|
||||||
|
// appropriate blob at this offset.
|
||||||
|
skip.close(cur_segment)
|
||||||
|
.map_err(RenderError::NARWriterError)?;
|
||||||
|
}
|
||||||
|
tvix_castore::Node::Directory { digest, .. } => {
|
||||||
|
let directory = get_directory(&digest);
|
||||||
|
|
||||||
|
// start a directory node
|
||||||
|
let mut nar_node_directory =
|
||||||
|
nar_node.directory().map_err(RenderError::NARWriterError)?;
|
||||||
|
|
||||||
|
// for each node in the directory, create a new entry with its name,
|
||||||
|
// and then recurse on that entry.
|
||||||
|
for (name, node) in directory.nodes() {
|
||||||
|
let child_node = nar_node_directory
|
||||||
|
.entry(name.as_ref())
|
||||||
|
.map_err(RenderError::NARWriterError)?;
|
||||||
|
|
||||||
|
walk_node(segments, offset, get_directory, node.clone(), child_node)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// close the directory
|
||||||
|
nar_node_directory
|
||||||
|
.close()
|
||||||
|
.map_err(RenderError::NARWriterError)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlobService + 'static> Reader<B> {
|
||||||
|
/// Creates a new seekable NAR renderer for the given castore root node.
|
||||||
|
///
|
||||||
|
/// This function pre-fetches the directory closure using `get_recursive()` and assembles the
|
||||||
|
/// NAR structure, except the file contents which are stored as 'holes' with references to a blob
|
||||||
|
/// of a specific BLAKE3 digest and known size. The AsyncRead implementation will then switch
|
||||||
|
/// between serving the precomputed literal segments, and the appropriate blob for the file
|
||||||
|
/// contents.
|
||||||
|
pub async fn new(
|
||||||
|
root_node: Node,
|
||||||
|
blob_service: B,
|
||||||
|
directory_service: impl DirectoryService,
|
||||||
|
) -> Result<Self, RenderError> {
|
||||||
|
let maybe_directory_closure = match &root_node {
|
||||||
|
// If this is a directory, resolve all subdirectories
|
||||||
|
Node::Directory { digest, .. } => {
|
||||||
|
let mut closure = DirectoryGraph::with_order(
|
||||||
|
RootToLeavesValidator::new_with_root_digest(digest.clone()),
|
||||||
|
);
|
||||||
|
let mut stream = directory_service.get_recursive(digest);
|
||||||
|
while let Some(dir) = stream
|
||||||
|
.try_next()
|
||||||
|
.await
|
||||||
|
.map_err(|e| RenderError::StoreError(e.into()))?
|
||||||
|
{
|
||||||
|
closure.add(dir).map_err(|e| {
|
||||||
|
RenderError::StoreError(
|
||||||
|
tvix_castore::Error::StorageError(e.to_string()).into(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
Some(closure.validate().map_err(|e| {
|
||||||
|
RenderError::StoreError(tvix_castore::Error::StorageError(e.to_string()).into())
|
||||||
|
})?)
|
||||||
|
}
|
||||||
|
// If the top-level node is a file or a symlink, just pass it on
|
||||||
|
Node::File { .. } => None,
|
||||||
|
Node::Symlink { .. } => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a new seekable NAR renderer for the given castore root node.
|
||||||
|
/// This version of the instantiation does not perform any I/O and as such is not async.
|
||||||
|
/// However it requires all directories to be passed as a ValidatedDirectoryGraph.
|
||||||
|
///
|
||||||
|
/// panics if the directory closure is not the closure of the root node
|
||||||
|
pub fn new_with_directory_closure(
|
||||||
|
root_node: Node,
|
||||||
|
blob_service: B,
|
||||||
|
directory_closure: Option<ValidatedDirectoryGraph>,
|
||||||
|
) -> Result<Self, RenderError> {
|
||||||
|
let directories = directory_closure
|
||||||
|
.map(|directory_closure| {
|
||||||
|
let mut directories: Vec<(B3Digest, Directory)> = vec![];
|
||||||
|
for dir in directory_closure.drain_root_to_leaves() {
|
||||||
|
let digest = dir.digest();
|
||||||
|
let pos = directories
|
||||||
|
.binary_search_by_key(&digest.as_slice(), |(digest, _dir)| {
|
||||||
|
digest.as_slice()
|
||||||
|
})
|
||||||
|
.expect_err("duplicate directory"); // DirectoryGraph checks this
|
||||||
|
directories.insert(pos, (digest, dir));
|
||||||
|
}
|
||||||
|
directories
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let mut segments = vec![];
|
||||||
|
let mut cur_segment: Vec<u8> = vec![];
|
||||||
|
let mut offset = 0;
|
||||||
|
|
||||||
|
let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;
|
||||||
|
|
||||||
|
walk_node(
|
||||||
|
&mut segments,
|
||||||
|
&mut offset,
|
||||||
|
&|digest| {
|
||||||
|
directories
|
||||||
|
.binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice())
|
||||||
|
.map(|pos| directories[pos].clone())
|
||||||
|
.expect("missing directory") // DirectoryGraph checks this
|
||||||
|
.1
|
||||||
|
},
|
||||||
|
root_node,
|
||||||
|
nar_node,
|
||||||
|
)?;
|
||||||
|
// Flush the final segment
|
||||||
|
flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));
|
||||||
|
|
||||||
|
Ok(Reader {
|
||||||
|
segments,
|
||||||
|
position_bytes: 0,
|
||||||
|
position_index: 0,
|
||||||
|
blob_service: blob_service.into(),
|
||||||
|
seeking: false,
|
||||||
|
current_blob: TryMaybeDone::Gone,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn stream_len(&self) -> u64 {
|
||||||
|
self.segments
|
||||||
|
.last()
|
||||||
|
.map(|&(off, ref data)| off + data.len())
|
||||||
|
.expect("no segment found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
|
||||||
|
fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
|
||||||
|
let stream_len = Reader::stream_len(&self);
|
||||||
|
|
||||||
|
let this = &mut *self;
|
||||||
|
if this.seeking {
|
||||||
|
return Err(io::Error::new(io::ErrorKind::Other, "Already seeking"));
|
||||||
|
}
|
||||||
|
this.seeking = true;
|
||||||
|
|
||||||
|
// TODO(edef): be sane about overflows
|
||||||
|
let pos = match pos {
|
||||||
|
io::SeekFrom::Start(n) => n,
|
||||||
|
io::SeekFrom::End(n) => (stream_len as i64 + n) as u64,
|
||||||
|
io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64,
|
||||||
|
};
|
||||||
|
|
||||||
|
let prev_position_bytes = this.position_bytes;
|
||||||
|
let prev_position_index = this.position_index;
|
||||||
|
|
||||||
|
this.position_bytes = min(pos, stream_len);
|
||||||
|
this.position_index = match this
|
||||||
|
.segments
|
||||||
|
.binary_search_by_key(&this.position_bytes, |&(off, _)| off)
|
||||||
|
{
|
||||||
|
Ok(idx) => idx,
|
||||||
|
Err(idx) => idx - 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
|
||||||
|
this.segments.get(this.position_index)
|
||||||
|
else {
|
||||||
|
// If not seeking into a blob, we clear the active blob reader and then we're done
|
||||||
|
this.current_blob = TryMaybeDone::Gone;
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
let offset_in_segment = this.position_bytes - offset;
|
||||||
|
|
||||||
|
if prev_position_bytes == this.position_bytes {
|
||||||
|
// position has not changed. do nothing
|
||||||
|
} else if prev_position_index == this.position_index {
|
||||||
|
// seeking within the same segment, re-use the blob reader
|
||||||
|
let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
|
||||||
|
this.current_blob = futures::future::try_maybe_done(
|
||||||
|
(async move {
|
||||||
|
let mut reader = Pin::new(&mut prev).take_output().unwrap();
|
||||||
|
reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
|
||||||
|
Ok(reader)
|
||||||
|
})
|
||||||
|
.boxed(),
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// seek to a different segment
|
||||||
|
let blob_service = this.blob_service.clone();
|
||||||
|
let digest = digest.clone();
|
||||||
|
this.current_blob = futures::future::try_maybe_done(
|
||||||
|
(async move {
|
||||||
|
let mut reader =
|
||||||
|
blob_service
|
||||||
|
.open_read(&digest)
|
||||||
|
.await?
|
||||||
|
.ok_or(io::Error::new(
|
||||||
|
io::ErrorKind::NotFound,
|
||||||
|
RenderError::BlobNotFound(digest.clone(), Default::default()),
|
||||||
|
))?;
|
||||||
|
if offset_in_segment != 0 {
|
||||||
|
reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
|
||||||
|
}
|
||||||
|
Ok(reader)
|
||||||
|
})
|
||||||
|
.boxed(),
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
|
||||||
|
let this = &mut *self;
|
||||||
|
|
||||||
|
if !this.current_blob.is_terminated() {
|
||||||
|
futures::ready!(this.current_blob.poll_unpin(cx))?;
|
||||||
|
}
|
||||||
|
this.seeking = false;
|
||||||
|
|
||||||
|
Poll::Ready(Ok(this.position_bytes))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context,
|
||||||
|
buf: &mut tokio::io::ReadBuf,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
let this = &mut *self;
|
||||||
|
|
||||||
|
let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
|
||||||
|
return Poll::Ready(Ok(())); // EOF
|
||||||
|
};
|
||||||
|
|
||||||
|
let prev_read_buf_pos = buf.filled().len();
|
||||||
|
match segment {
|
||||||
|
Data::Literal(data) => {
|
||||||
|
let offset_in_segment = this.position_bytes - offset;
|
||||||
|
let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
|
||||||
|
let remaining_data = data.len() - offset_in_segment;
|
||||||
|
let read_size = std::cmp::min(remaining_data, buf.remaining());
|
||||||
|
buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
|
||||||
|
}
|
||||||
|
Data::Blob(BlobRef { size, .. }) => {
|
||||||
|
futures::ready!(this.current_blob.poll_unpin(cx))?;
|
||||||
|
this.seeking = false;
|
||||||
|
let blob = Pin::new(&mut this.current_blob)
|
||||||
|
.output_mut()
|
||||||
|
.expect("missing blob");
|
||||||
|
futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
|
||||||
|
let read_length = buf.filled().len() - prev_read_buf_pos;
|
||||||
|
let maximum_expected_read_length = (offset + size) - this.position_bytes;
|
||||||
|
let is_eof = read_length == 0;
|
||||||
|
let too_much_returned = read_length as u64 > maximum_expected_read_length;
|
||||||
|
match (is_eof, too_much_returned) {
|
||||||
|
(true, false) => {
|
||||||
|
return Poll::Ready(Err(io::Error::new(
|
||||||
|
io::ErrorKind::UnexpectedEof,
|
||||||
|
"blob short read",
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
(false, true) => {
|
||||||
|
buf.set_filled(prev_read_buf_pos);
|
||||||
|
return Poll::Ready(Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidInput,
|
||||||
|
"blob continued to yield data beyond end",
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let new_read_buf_pos = buf.filled().len();
|
||||||
|
this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;
|
||||||
|
|
||||||
|
let prev_position_index = this.position_index;
|
||||||
|
while {
|
||||||
|
if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
|
||||||
|
(this.position_bytes - offset) >= segment.len()
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
} {
|
||||||
|
this.position_index += 1;
|
||||||
|
}
|
||||||
|
if prev_position_index != this.position_index {
|
||||||
|
let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
|
||||||
|
this.segments.get(this.position_index)
|
||||||
|
else {
|
||||||
|
// If the next segment is not a blob, we clear the active blob reader and then we're done
|
||||||
|
this.current_blob = TryMaybeDone::Gone;
|
||||||
|
return Poll::Ready(Ok(()));
|
||||||
|
};
|
||||||
|
|
||||||
|
// The next segment is a blob, open the BlobReader
|
||||||
|
let blob_service = this.blob_service.clone();
|
||||||
|
let digest = digest.clone();
|
||||||
|
this.current_blob = futures::future::try_maybe_done(
|
||||||
|
(async move {
|
||||||
|
let reader = blob_service
|
||||||
|
.open_read(&digest)
|
||||||
|
.await?
|
||||||
|
.ok_or(io::Error::new(
|
||||||
|
io::ErrorKind::NotFound,
|
||||||
|
RenderError::BlobNotFound(digest.clone(), Default::default()),
|
||||||
|
))?;
|
||||||
|
Ok(reader)
|
||||||
|
})
|
||||||
|
.boxed(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,2 +1,3 @@
|
||||||
pub mod fixtures;
|
pub mod fixtures;
|
||||||
mod nar_renderer;
|
mod nar_renderer;
|
||||||
|
mod nar_renderer_seekable;
|
||||||
|
|
111
tvix/store/src/tests/nar_renderer_seekable.rs
Normal file
111
tvix/store/src/tests/nar_renderer_seekable.rs
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
use crate::nar::seekable::Reader;
|
||||||
|
use crate::tests::fixtures::blob_service_with_contents as blob_service;
|
||||||
|
use crate::tests::fixtures::directory_service_with_contents as directory_service;
|
||||||
|
use crate::tests::fixtures::*;
|
||||||
|
use rstest::*;
|
||||||
|
use rstest_reuse::*;
|
||||||
|
use std::io;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncSeek, AsyncSeekExt};
|
||||||
|
use tvix_castore::blobservice::BlobService;
|
||||||
|
use tvix_castore::directoryservice::DirectoryService;
|
||||||
|
use tvix_castore::Node;
|
||||||
|
|
||||||
|
#[apply(castore_fixtures_template)]
|
||||||
|
#[tokio::test]
|
||||||
|
async fn read_to_end(
|
||||||
|
#[future] blob_service: Arc<dyn BlobService>,
|
||||||
|
#[future] directory_service: Arc<dyn DirectoryService>,
|
||||||
|
#[case] test_input: &Node,
|
||||||
|
#[case] test_output: Result<Result<&Vec<u8>, io::ErrorKind>, crate::nar::RenderError>,
|
||||||
|
) {
|
||||||
|
let reader_result = Reader::new(
|
||||||
|
test_input.clone(),
|
||||||
|
// don't put anything in the stores, as we don't actually do any requests.
|
||||||
|
blob_service.await,
|
||||||
|
directory_service.await,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match (reader_result, test_output) {
|
||||||
|
(Ok(_), Err(_)) => panic!("creating reader should have failed but succeeded"),
|
||||||
|
(Err(err), Ok(_)) => panic!("creating reader should have succeeded but failed: {}", err),
|
||||||
|
(Err(reader_err), Err(expected_err)) => {
|
||||||
|
assert_eq!(format!("{}", reader_err), format!("{}", expected_err));
|
||||||
|
}
|
||||||
|
(Ok(mut reader), Ok(expected_read_result)) => {
|
||||||
|
let mut buf: Vec<u8> = vec![];
|
||||||
|
let read_result = reader.read_to_end(&mut buf).await;
|
||||||
|
|
||||||
|
match (read_result, expected_read_result) {
|
||||||
|
(Ok(_), Err(_)) => panic!("read_to_end should have failed but succeeded"),
|
||||||
|
(Err(err), Ok(_)) => {
|
||||||
|
panic!("read_to_end should have succeeded but failed: {}", err)
|
||||||
|
}
|
||||||
|
(Err(read_err), Err(expected_read_err)) => {
|
||||||
|
assert_eq!(read_err.kind(), expected_read_err);
|
||||||
|
}
|
||||||
|
(Ok(_n), Ok(expected_read_result)) => {
|
||||||
|
assert_eq!(buf, expected_read_result.to_vec());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[rstest]
|
||||||
|
#[tokio::test]
|
||||||
|
/// Check that the Reader does not allow starting a seek while another seek is running
|
||||||
|
/// If this is not prevented, it might lead to futures piling up on the heap
|
||||||
|
async fn seek_twice(
|
||||||
|
#[future] blob_service: Arc<dyn BlobService>,
|
||||||
|
#[future] directory_service: Arc<dyn DirectoryService>,
|
||||||
|
) {
|
||||||
|
let mut reader = Reader::new(
|
||||||
|
CASTORE_NODE_COMPLICATED.clone(),
|
||||||
|
// don't put anything in the stores, as we don't actually do any requests.
|
||||||
|
blob_service.await,
|
||||||
|
directory_service.await,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("must succeed");
|
||||||
|
|
||||||
|
Pin::new(&mut reader)
|
||||||
|
.start_seek(io::SeekFrom::Start(1))
|
||||||
|
.expect("must succeed");
|
||||||
|
let seek_err = Pin::new(&mut reader)
|
||||||
|
.start_seek(io::SeekFrom::Start(2))
|
||||||
|
.expect_err("must fail");
|
||||||
|
|
||||||
|
assert_eq!(seek_err.kind(), io::ErrorKind::Other);
|
||||||
|
assert_eq!(seek_err.to_string(), "Already seeking".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[rstest]
|
||||||
|
#[tokio::test]
|
||||||
|
async fn seek(
|
||||||
|
#[future] blob_service: Arc<dyn BlobService>,
|
||||||
|
#[future] directory_service: Arc<dyn DirectoryService>,
|
||||||
|
) {
|
||||||
|
let mut reader = Reader::new(
|
||||||
|
CASTORE_NODE_HELLOWORLD.clone(),
|
||||||
|
// don't put anything in the stores, as we don't actually do any requests.
|
||||||
|
blob_service.await,
|
||||||
|
directory_service.await,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("must succeed");
|
||||||
|
|
||||||
|
let mut buf = [0u8; 10];
|
||||||
|
|
||||||
|
for position in [
|
||||||
|
io::SeekFrom::Start(0x65), // Just before the file contents
|
||||||
|
io::SeekFrom::Start(0x68), // Seek back the file contents
|
||||||
|
io::SeekFrom::Start(0x70), // Just before the end of the file contents
|
||||||
|
] {
|
||||||
|
let n = reader.seek(position).await.expect("seek") as usize;
|
||||||
|
reader.read_exact(&mut buf).await.expect("read_exact");
|
||||||
|
assert_eq!(NAR_CONTENTS_HELLOWORLD[n..n + 10], buf);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue