feat(nix-compat/nar/reader): async support

This is a first cut at the async NAR reader, with some rough edges.
Poisoning is left unimplemented for now, pending future work.

Change-Id: Ifaafe0581a5e0e165a13357b909fb441f7bd8bab
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11524
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
edef 2024-04-29 16:44:45 +00:00
parent 343e176bec
commit 08feea4817
6 changed files with 566 additions and 1 deletions

View file

@ -0,0 +1,166 @@
use std::{
pin::Pin,
task::{self, Poll},
};
use tokio::io::{self, AsyncBufRead, AsyncRead, ErrorKind::InvalidData};
// Required reading for understanding this module.
use crate::{
nar::{self, wire::PadPar},
wire::{self, BytesReader},
};
mod read;
#[cfg(test)]
mod test;
pub type Reader<'a> = dyn AsyncBufRead + Unpin + Send + 'a;
/// Start reading a NAR file from `reader`.
pub async fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> {
read::token(reader, &nar::wire::TOK_NAR).await?;
Node::new(reader).await
}
pub enum Node<'a, 'r: 'a> {
Symlink {
target: Vec<u8>,
},
File {
executable: bool,
reader: FileReader<'a, 'r>,
},
Directory(DirReader<'a, 'r>),
}
impl<'a, 'r: 'a> Node<'a, 'r> {
/// Start reading a [Node], matching the next [wire::Node].
///
/// Reading the terminating [wire::TOK_PAR] is done immediately for [Node::Symlink],
/// but is otherwise left to [DirReader] or [BytesReader].
async fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> {
Ok(match read::tag(reader).await? {
nar::wire::Node::Sym => {
let target = wire::read_bytes(reader, 1..=nar::wire::MAX_TARGET_LEN).await?;
if target.contains(&0) {
return Err(InvalidData.into());
}
read::token(reader, &nar::wire::TOK_PAR).await?;
Node::Symlink { target }
}
tag @ (nar::wire::Node::Reg | nar::wire::Node::Exe) => Node::File {
executable: tag == nar::wire::Node::Exe,
reader: FileReader {
inner: BytesReader::new_internal(reader, ..).await?,
},
},
nar::wire::Node::Dir => Node::Directory(DirReader::new(reader)),
})
}
}
/// File contents, readable through the [AsyncRead] trait.
///
/// It comes with some caveats:
/// * You must always read the entire file, unless you intend to abandon the entire archive reader.
/// * You must abandon the entire archive reader upon the first error.
///
/// It's fine to read exactly `reader.len()` bytes without ever seeing an explicit EOF.
pub struct FileReader<'a, 'r> {
inner: BytesReader<&'a mut Reader<'r>, PadPar>,
}
impl<'a, 'r> FileReader<'a, 'r> {
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn len(&self) -> u64 {
self.inner.len()
}
}
impl<'a, 'r> AsyncRead for FileReader<'a, 'r> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut task::Context,
buf: &mut io::ReadBuf,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
}
}
/// A directory iterator, yielding a sequence of [Node]s.
/// It must be fully consumed before reading further from the [DirReader] that produced it, if any.
pub struct DirReader<'a, 'r> {
reader: &'a mut Reader<'r>,
/// Previous directory entry name.
/// We have to hang onto this to enforce name monotonicity.
prev_name: Option<Vec<u8>>,
}
pub struct Entry<'a, 'r> {
pub name: Vec<u8>,
pub node: Node<'a, 'r>,
}
impl<'a, 'r> DirReader<'a, 'r> {
fn new(reader: &'a mut Reader<'r>) -> Self {
Self {
reader,
prev_name: None,
}
}
/// Read the next [Entry] from the directory.
///
/// We explicitly don't implement [Iterator], since treating this as
/// a regular Rust iterator will surely lead you astray.
///
/// * You must always consume the entire iterator, unless you abandon the entire archive reader.
/// * You must abandon the entire archive reader on the first error.
/// * You must abandon the directory reader upon the first [None].
/// * Even if you know the amount of elements up front, you must keep reading until you encounter [None].
pub async fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> {
// COME FROM the previous iteration: if we've already read an entry,
// read its terminating TOK_PAR here.
if self.prev_name.is_some() {
read::token(self.reader, &nar::wire::TOK_PAR).await?;
}
if let nar::wire::Entry::None = read::tag(self.reader).await? {
return Ok(None);
}
let name = wire::read_bytes(self.reader, 1..=nar::wire::MAX_NAME_LEN).await?;
if name.contains(&0) || name.contains(&b'/') || name == b"." || name == b".." {
return Err(InvalidData.into());
}
// Enforce strict monotonicity of directory entry names.
match &mut self.prev_name {
None => {
self.prev_name = Some(name.clone());
}
Some(prev_name) => {
if *prev_name >= name {
return Err(InvalidData.into());
}
name[..].clone_into(prev_name);
}
}
read::token(self.reader, &nar::wire::TOK_NOD).await?;
Ok(Some(Entry {
name,
node: Node::new(self.reader).await?,
}))
}
}

View file

@ -0,0 +1,69 @@
use tokio::io::{
self, AsyncReadExt,
ErrorKind::{InvalidData, UnexpectedEof},
};
use crate::nar::wire::Tag;
use super::Reader;
/// Consume a known token from the reader.
pub async fn token<const N: usize>(reader: &mut Reader<'_>, token: &[u8; N]) -> io::Result<()> {
let mut buf = [0u8; N];
// This implements something similar to [AsyncReadExt::read_exact], but verifies that
// the input data matches the token while we read it. These two slices respectively
// represent the remaining token to be verified, and the remaining input buffer.
let mut token = &token[..];
let mut buf = &mut buf[..];
while !token.is_empty() {
match reader.read(buf).await? {
0 => {
return Err(UnexpectedEof.into());
}
n => {
let (t, b);
(t, token) = token.split_at(n);
(b, buf) = buf.split_at_mut(n);
if t != b {
return Err(InvalidData.into());
}
}
}
}
Ok(())
}
/// Consume a [Tag] from the reader.
pub async fn tag<T: Tag>(reader: &mut Reader<'_>) -> io::Result<T> {
let mut buf = T::make_buf();
let buf = buf.as_mut();
// first read the known minimum length…
reader.read_exact(&mut buf[..T::MIN]).await?;
// then decide which tag we're expecting
let tag = T::from_u8(buf[T::OFF]).ok_or(InvalidData)?;
let (head, tail) = tag.as_bytes().split_at(T::MIN);
// make sure what we've read so far is valid
if buf[..T::MIN] != *head {
return Err(InvalidData.into());
}
// …then read the rest, if any
if !tail.is_empty() {
let rest = tail.len();
reader.read_exact(&mut buf[..rest]).await?;
// and make sure it's what we expect
if buf[..rest] != *tail {
return Err(InvalidData.into());
}
}
Ok(tag)
}

View file

@ -0,0 +1,310 @@
use tokio::io::AsyncReadExt;
mod nar {
pub use crate::nar::reader::r#async as reader;
}
#[tokio::test]
async fn symlink() {
let mut f = std::io::Cursor::new(include_bytes!("../../tests/symlink.nar"));
let node = nar::reader::open(&mut f).await.unwrap();
match node {
nar::reader::Node::Symlink { target } => {
assert_eq!(
&b"/nix/store/somewhereelse"[..],
&target,
"target must match"
);
}
_ => panic!("unexpected type"),
}
}
#[tokio::test]
async fn file() {
let mut f = std::io::Cursor::new(include_bytes!("../../tests/helloworld.nar"));
let node = nar::reader::open(&mut f).await.unwrap();
match node {
nar::reader::Node::File {
executable,
mut reader,
} => {
assert!(!executable);
let mut buf = vec![];
reader
.read_to_end(&mut buf)
.await
.expect("read must succeed");
assert_eq!(&b"Hello World!"[..], &buf);
}
_ => panic!("unexpected type"),
}
}
#[tokio::test]
async fn complicated() {
let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
let node = nar::reader::open(&mut f).await.unwrap();
match node {
nar::reader::Node::Directory(mut dir_reader) => {
// first entry is .keep, an empty regular file.
must_read_file(
".keep",
dir_reader
.next()
.await
.expect("next must succeed")
.expect("must be some"),
)
.await;
// second entry is aa, a symlink to /nix/store/somewhereelse
must_be_symlink(
"aa",
"/nix/store/somewhereelse",
dir_reader
.next()
.await
.expect("next must be some")
.expect("must be some"),
);
{
// third entry is a directory called "keep"
let entry = dir_reader
.next()
.await
.expect("next must be some")
.expect("must be some");
assert_eq!(&b"keep"[..], &entry.name);
match entry.node {
nar::reader::Node::Directory(mut subdir_reader) => {
{
// first entry is .keep, an empty regular file.
let entry = subdir_reader
.next()
.await
.expect("next must succeed")
.expect("must be some");
must_read_file(".keep", entry).await;
}
// we must read the None
assert!(
subdir_reader
.next()
.await
.expect("next must succeed")
.is_none(),
"keep directory contains only .keep"
);
}
_ => panic!("unexpected type for keep/.keep"),
}
};
// reading more entries yields None (and we actually must read until this)
assert!(dir_reader.next().await.expect("must succeed").is_none());
}
_ => panic!("unexpected type"),
}
}
#[tokio::test]
#[should_panic]
#[ignore = "TODO: async poisoning"]
async fn file_read_abandoned() {
let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
let node = nar::reader::open(&mut f).await.unwrap();
match node {
nar::reader::Node::Directory(mut dir_reader) => {
// first entry is .keep, an empty regular file.
{
let entry = dir_reader
.next()
.await
.expect("next must succeed")
.expect("must be some");
assert_eq!(&b".keep"[..], &entry.name);
// don't bother to finish reading it.
};
// this should panic (not return an error), because we are meant to abandon the archive reader now.
assert!(dir_reader.next().await.expect("must succeed").is_none());
}
_ => panic!("unexpected type"),
}
}
#[tokio::test]
#[should_panic]
#[ignore = "TODO: async poisoning"]
async fn dir_read_abandoned() {
let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
let node = nar::reader::open(&mut f).await.unwrap();
match node {
nar::reader::Node::Directory(mut dir_reader) => {
// first entry is .keep, an empty regular file.
must_read_file(
".keep",
dir_reader
.next()
.await
.expect("next must succeed")
.expect("must be some"),
)
.await;
// second entry is aa, a symlink to /nix/store/somewhereelse
must_be_symlink(
"aa",
"/nix/store/somewhereelse",
dir_reader
.next()
.await
.expect("next must be some")
.expect("must be some"),
);
{
// third entry is a directory called "keep"
let entry = dir_reader
.next()
.await
.expect("next must be some")
.expect("must be some");
assert_eq!(&b"keep"[..], &entry.name);
match entry.node {
nar::reader::Node::Directory(_) => {
// don't finish using it, which poisons the archive reader
}
_ => panic!("unexpected type for keep/.keep"),
}
};
// this should panic, because we didn't finish reading the child subdirectory
assert!(dir_reader.next().await.expect("must succeed").is_none());
}
_ => panic!("unexpected type"),
}
}
#[tokio::test]
#[should_panic]
#[ignore = "TODO: async poisoning"]
async fn dir_read_after_none() {
let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
let node = nar::reader::open(&mut f).await.unwrap();
match node {
nar::reader::Node::Directory(mut dir_reader) => {
// first entry is .keep, an empty regular file.
must_read_file(
".keep",
dir_reader
.next()
.await
.expect("next must succeed")
.expect("must be some"),
)
.await;
// second entry is aa, a symlink to /nix/store/somewhereelse
must_be_symlink(
"aa",
"/nix/store/somewhereelse",
dir_reader
.next()
.await
.expect("next must be some")
.expect("must be some"),
);
{
// third entry is a directory called "keep"
let entry = dir_reader
.next()
.await
.expect("next must be some")
.expect("must be some");
assert_eq!(&b"keep"[..], &entry.name);
match entry.node {
nar::reader::Node::Directory(mut subdir_reader) => {
// first entry is .keep, an empty regular file.
must_read_file(
".keep",
subdir_reader
.next()
.await
.expect("next must succeed")
.expect("must be some"),
)
.await;
// we must read the None
assert!(
subdir_reader
.next()
.await
.expect("next must succeed")
.is_none(),
"keep directory contains only .keep"
);
}
_ => panic!("unexpected type for keep/.keep"),
}
};
// reading more entries yields None (and we actually must read until this)
assert!(dir_reader.next().await.expect("must succeed").is_none());
// this should panic, because we already got a none so we're meant to stop.
dir_reader.next().await.unwrap();
unreachable!()
}
_ => panic!("unexpected type"),
}
}
async fn must_read_file(name: &'static str, entry: nar::reader::Entry<'_, '_>) {
assert_eq!(name.as_bytes(), &entry.name);
match entry.node {
nar::reader::Node::File {
executable,
mut reader,
} => {
assert!(!executable);
assert_eq!(reader.read(&mut [0]).await.unwrap(), 0);
}
_ => panic!("unexpected type for {}", name),
}
}
fn must_be_symlink(
name: &'static str,
exp_target: &'static str,
entry: nar::reader::Entry<'_, '_>,
) {
assert_eq!(name.as_bytes(), &entry.name);
match entry.node {
nar::reader::Node::Symlink { target } => {
assert_eq!(exp_target.as_bytes(), &target);
}
_ => panic!("unexpected type for {}", name),
}
}

View file

@ -16,6 +16,9 @@ use std::marker::PhantomData;
// Required reading for understanding this module. // Required reading for understanding this module.
use crate::nar::wire; use crate::nar::wire;
#[cfg(feature = "async")]
pub mod r#async;
mod read; mod read;
#[cfg(test)] #[cfg(test)]
mod test; mod test;

View file

@ -90,6 +90,23 @@ pub const TOK_DIR: [u8; 24] = *b"\x09\0\0\0\0\0\0\0directory\0\0\0\0\0\0\0";
pub const TOK_ENT: [u8; 48] = *b"\x05\0\0\0\0\0\0\0entry\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0name\0\0\0\0"; pub const TOK_ENT: [u8; 48] = *b"\x05\0\0\0\0\0\0\0entry\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0name\0\0\0\0";
pub const TOK_NOD: [u8; 48] = *b"\x04\0\0\0\0\0\0\0node\0\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0type\0\0\0\0"; pub const TOK_NOD: [u8; 48] = *b"\x04\0\0\0\0\0\0\0node\0\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0type\0\0\0\0";
pub const TOK_PAR: [u8; 16] = *b"\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0"; pub const TOK_PAR: [u8; 16] = *b"\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0";
#[cfg(feature = "async")]
const TOK_PAD_PAR: [u8; 24] = *b"\0\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0";
#[cfg(feature = "async")]
#[derive(Debug)]
pub(crate) enum PadPar {}
#[cfg(feature = "async")]
impl crate::wire::reader::Tag for PadPar {
const PATTERN: &'static [u8] = &TOK_PAD_PAR;
type Buf = [u8; 24];
fn make_buf() -> Self::Buf {
[0; 24]
}
}
#[test] #[test]
fn tokens() { fn tokens() {

View file

@ -33,7 +33,7 @@ const LEN_SIZE: usize = 8;
/// ///
/// This buffers the entire payload into memory, /// This buffers the entire payload into memory,
/// a streaming version is available at [crate::wire::bytes::BytesReader]. /// a streaming version is available at [crate::wire::bytes::BytesReader].
pub async fn read_bytes<R>( pub async fn read_bytes<R: ?Sized>(
r: &mut R, r: &mut R,
allowed_size: RangeInclusive<usize>, allowed_size: RangeInclusive<usize>,
) -> std::io::Result<Vec<u8>> ) -> std::io::Result<Vec<u8>>