feat(nix-compat/wire/bytes/reader): support buffered reading

If our underlying reader supports AsyncBufRead, then we can too.

Change-Id: If4b948c983400ca591c1c475bbcf7dc00d562040
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11545
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
edef 2024-04-29 17:34:49 +00:00
parent ebad318ab3
commit 51e0f78e93
3 changed files with 195 additions and 6 deletions

View file

@ -94,6 +94,16 @@ impl<'a, 'r> AsyncRead for FileReader<'a, 'r> {
} }
} }
impl<'a, 'r> AsyncBufRead for FileReader<'a, 'r> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<&[u8]>> {
Pin::new(&mut self.get_mut().inner).poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
Pin::new(&mut self.get_mut().inner).consume(amt)
}
}
/// A directory iterator, yielding a sequence of [Node]s. /// A directory iterator, yielding a sequence of [Node]s.
/// It must be fully consumed before reading further from the [DirReader] that produced it, if any. /// It must be fully consumed before reading further from the [DirReader] that produced it, if any.
pub struct DirReader<'a, 'r> { pub struct DirReader<'a, 'r> {

View file

@ -6,7 +6,7 @@ use std::{
pin::Pin, pin::Pin,
task::{self, ready, Poll}, task::{self, ready, Poll},
}; };
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf};
use trailer::{read_trailer, ReadTrailer, Trailer}; use trailer::{read_trailer, ReadTrailer, Trailer};
@ -146,12 +146,12 @@ impl<R: AsyncRead + Unpin, T: Tag> AsyncRead for BytesReader<R, T> {
*this = State::ReadTrailer(read_trailer(reader, tail_len)); *this = State::ReadTrailer(read_trailer(reader, tail_len));
continue; continue;
} else { } else {
reader.as_mut().unwrap() Pin::new(reader.as_mut().unwrap())
}; };
let mut bytes_read = 0; let mut bytes_read = 0;
ready!(with_limited(buf, remaining, |buf| { ready!(with_limited(buf, remaining, |buf| {
let ret = Pin::new(reader).poll_read(cx, buf); let ret = reader.poll_read(cx, buf);
bytes_read = buf.initialized().len(); bytes_read = buf.initialized().len();
ret ret
}))?; }))?;
@ -185,6 +185,96 @@ impl<R: AsyncRead + Unpin, T: Tag> AsyncRead for BytesReader<R, T> {
} }
} }
#[allow(private_bounds)]
impl<R: AsyncBufRead + Unpin, T: Tag> AsyncBufRead for BytesReader<R, T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<&[u8]>> {
let this = &mut self.get_mut().state;
loop {
match this {
// This state comes *after* the following case,
// but we can't keep it in logical order because
// that would lengthen the borrow lifetime.
State::Body {
reader,
consumed,
user_len,
} if {
let (body_len, _) = split_user_len(*user_len);
let remaining = body_len - *consumed;
remaining == 0
} =>
{
let reader = reader.take().unwrap();
let (_, tail_len) = split_user_len(*user_len);
*this = State::ReadTrailer(read_trailer(reader, tail_len));
}
State::Body {
reader,
consumed,
user_len,
} => {
let (body_len, _) = split_user_len(*user_len);
let remaining = body_len - *consumed;
let reader = Pin::new(reader.as_mut().unwrap());
match ready!(reader.poll_fill_buf(cx))? {
&[] => {
return Err(io::ErrorKind::UnexpectedEof.into()).into();
}
mut buf => {
if buf.len() as u64 > remaining {
buf = &buf[..remaining as usize];
}
return Ok(buf).into();
}
}
}
State::ReadTrailer(fut) => {
*this = State::ReleaseTrailer {
consumed: 0,
data: ready!(Pin::new(fut).poll(cx))?,
};
}
State::ReleaseTrailer { consumed, data } => {
return Ok(&data[*consumed as usize..]).into();
}
}
}
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
match &mut self.state {
State::Body {
reader,
consumed,
user_len,
} => {
let reader = Pin::new(reader.as_mut().unwrap());
let (body_len, _) = split_user_len(*user_len);
*consumed = consumed
.checked_add(amt as u64)
.filter(|&consumed| consumed <= body_len)
.expect("consumed out of bounds");
reader.consume(amt);
}
State::ReadTrailer(_) => unreachable!(),
State::ReleaseTrailer { consumed, data } => {
*consumed = amt
.checked_add(*consumed as usize)
.filter(|&consumed| consumed <= data.len())
.expect("consumed out of bounds") as u8;
}
}
}
}
/// Make a limited version of `buf`, consisting only of up to `n` bytes of the unfilled section, and call `f` with it. /// Make a limited version of `buf`, consisting only of up to `n` bytes of the unfilled section, and call `f` with it.
/// After `f` returns, we propagate the filled cursor advancement back to `buf`. /// After `f` returns, we propagate the filled cursor advancement back to `buf`.
fn with_limited<R>(buf: &mut ReadBuf, n: u64, f: impl FnOnce(&mut ReadBuf) -> R) -> R { fn with_limited<R>(buf: &mut ReadBuf, n: u64, f: impl FnOnce(&mut ReadBuf) -> R) -> R {
@ -217,7 +307,7 @@ mod tests {
use hex_literal::hex; use hex_literal::hex;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use rstest::rstest; use rstest::rstest;
use tokio::io::AsyncReadExt; use tokio::io::{AsyncReadExt, BufReader};
use tokio_test::io::Builder; use tokio_test::io::Builder;
use super::*; use super::*;
@ -261,6 +351,34 @@ mod tests {
assert_eq!(payload, &buf[..]); assert_eq!(payload, &buf[..]);
} }
/// Read bytes packets of various length, and ensure copy_buf reads the
/// expected payload.
#[rstest]
#[case::empty(&[])] // empty bytes packet
#[case::size_1b(&[0xff])] // 1 bytes payload
#[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding)
#[case::size_9b(&hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding)
#[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet
#[tokio::test]
async fn read_payload_correct_readbuf(#[case] payload: &[u8]) {
let mut mock = BufReader::new(
Builder::new()
.read(&produce_packet_bytes(payload).await)
.build(),
);
let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64)
.await
.unwrap();
let mut buf = Vec::new();
tokio::io::copy_buf(&mut r, &mut buf)
.await
.expect("copy_buf must succeed");
assert_eq!(payload, &buf[..]);
}
/// Fail if the bytes packet is larger than allowed /// Fail if the bytes packet is larger than allowed
#[tokio::test] #[tokio::test]
async fn read_bigger_than_allowed_fail() { async fn read_bigger_than_allowed_fail() {
@ -459,6 +577,48 @@ mod tests {
); );
} }
/// Start a 9 bytes payload packet, but return an error after a certain position.
/// Ensure that error is propagated (AsyncReadBuf case)
#[rstest]
#[case::during_size(4)]
#[case::before_payload(8)]
#[case::during_payload(8 + 4)]
#[case::before_padding(8 + 4)]
#[case::during_padding(8 + 9 + 2)]
#[tokio::test]
async fn propagate_error_from_reader_buffered(#[case] offset: usize) {
let payload = &hex!("FF0102030405060708");
let mock = Builder::new()
.read(&produce_packet_bytes(payload).await[..offset])
.read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
.build();
let mut mock = BufReader::new(mock);
// Either length reading or data reading can fail, depending on which test case we're in.
let err: io::Error = async {
let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await?;
let mut buf = Vec::new();
tokio::io::copy_buf(&mut r, &mut buf).await?;
Ok(())
}
.await
.expect_err("must fail");
assert_eq!(
err.kind(),
std::io::ErrorKind::Other,
"error kind must match"
);
assert_eq!(
err.into_inner().unwrap().to_string(),
"foo",
"error payload must contain foo"
);
}
/// If there's an error right after the padding, we don't propagate it, as /// If there's an error right after the padding, we don't propagate it, as
/// we're done reading. We just return EOF. /// we're done reading. We just return EOF.
#[tokio::test] #[tokio::test]
@ -476,6 +636,26 @@ mod tests {
assert_eq!(buf.as_slice(), payload); assert_eq!(buf.as_slice(), payload);
} }
/// If there's an error right after the padding, we don't propagate it, as
/// we're done reading. We just return EOF.
#[tokio::test]
async fn no_error_after_eof_buffered() {
let payload = &hex!("FF0102030405060708");
let mock = Builder::new()
.read(&produce_packet_bytes(payload).await)
.read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
.build();
let mut mock = BufReader::new(mock);
let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
let mut buf = Vec::new();
tokio::io::copy_buf(&mut r, &mut buf)
.await
.expect("must succeed");
assert_eq!(buf.as_slice(), payload);
}
/// Introduce various stalls in various places of the packet, to ensure we /// Introduce various stalls in various places of the packet, to ensure we
/// handle these cases properly, too. /// handle these cases properly, too.
#[rstest] #[rstest]

View file

@ -70,8 +70,7 @@ where
} => { } => {
let (digest, size) = { let (digest, size) = {
let mut blob_writer = blob_service.open_write().await; let mut blob_writer = blob_service.open_write().await;
// TODO(edef): fix the AsyncBufRead implementation of nix_compat::wire::BytesReader let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?;
let size = tokio::io::copy(&mut reader, &mut blob_writer).await?;
(blob_writer.close().await?, size) (blob_writer.close().await?, size)
}; };