feat(nix-compat/nar/reader): provide passthrough buffered I/O
Allow taking advantage of the buffer of the underlying reader to avoid unnecessary copies of file data. We can't easily implement the methods of BufRead directly, since we have some extra I/O to perform in the final consume() invocation. That could be resolved at the cost of additional bookkeeping, but this will suffice for now. Change-Id: I8100cf0abd79e7469670b8596bd989be5db44a91 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10089 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
This commit is contained in:
parent
785ff80c8b
commit
a11abc02e2
2 changed files with 82 additions and 20 deletions
|
@ -7,7 +7,7 @@
|
|||
use std::io::{
|
||||
self, BufRead,
|
||||
ErrorKind::{InvalidData, UnexpectedEof},
|
||||
Read,
|
||||
Read, Write,
|
||||
};
|
||||
|
||||
// Required reading for understanding this module.
|
||||
|
@ -111,6 +111,62 @@ impl<'a, 'r> FileReader<'a, 'r> {
|
|||
}
|
||||
}
|
||||
|
||||
impl FileReader<'_, '_> {
|
||||
/// Equivalent to [BufRead::fill_buf]
|
||||
///
|
||||
/// We can't directly implement [BufRead], because [FileReader::consume] needs
|
||||
/// to perform fallible I/O.
|
||||
pub fn fill_buf(&mut self) -> io::Result<&[u8]> {
|
||||
if self.is_empty() {
|
||||
return Ok(&[]);
|
||||
}
|
||||
|
||||
let mut buf = self.reader.fill_buf()?;
|
||||
|
||||
if buf.is_empty() {
|
||||
return Err(UnexpectedEof.into());
|
||||
}
|
||||
|
||||
if buf.len() as u64 > self.len {
|
||||
buf = &buf[..self.len as usize];
|
||||
}
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Analogous to [BufRead::consume], differing only in that it needs
|
||||
/// to perform I/O in order to read padding and terminators.
|
||||
pub fn consume(&mut self, n: usize) -> io::Result<()> {
|
||||
if n == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.len = self
|
||||
.len
|
||||
.checked_sub(n as u64)
|
||||
.expect("consumed bytes past EOF");
|
||||
|
||||
self.reader.consume(n);
|
||||
|
||||
if self.is_empty() {
|
||||
self.finish()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Copy the (remaining) contents of the file into `dst`.
|
||||
pub fn copy(&mut self, mut dst: impl Write) -> io::Result<()> {
|
||||
while !self.is_empty() {
|
||||
let buf = self.fill_buf()?;
|
||||
let n = dst.write(buf)?;
|
||||
self.consume(n)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for FileReader<'_, '_> {
|
||||
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
|
||||
if buf.is_empty() || self.is_empty() {
|
||||
|
@ -128,27 +184,33 @@ impl Read for FileReader<'_, '_> {
|
|||
return Err(UnexpectedEof.into());
|
||||
}
|
||||
|
||||
// If we've reached semantic EOF, consume and verify the padding and terminating TOK_PAR.
|
||||
// Files are padded to 64 bits (8 bytes), just like any other byte string in the wire format.
|
||||
if self.is_empty() {
|
||||
let pad = (self.pad & 7) as usize;
|
||||
|
||||
if pad != 0 {
|
||||
let mut buf = [0; 8];
|
||||
self.reader.read_exact(&mut buf[pad..])?;
|
||||
|
||||
if buf != [0; 8] {
|
||||
return Err(InvalidData.into());
|
||||
}
|
||||
}
|
||||
|
||||
read::token(self.reader, &wire::TOK_PAR)?;
|
||||
self.finish()?;
|
||||
}
|
||||
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
|
||||
impl FileReader<'_, '_> {
|
||||
/// We've reached semantic EOF, consume and verify the padding and terminating TOK_PAR.
|
||||
/// Files are padded to 64 bits (8 bytes), just like any other byte string in the wire format.
|
||||
fn finish(&mut self) -> io::Result<()> {
|
||||
let pad = (self.pad & 7) as usize;
|
||||
|
||||
if pad != 0 {
|
||||
let mut buf = [0; 8];
|
||||
self.reader.read_exact(&mut buf[pad..])?;
|
||||
|
||||
if buf != [0; 8] {
|
||||
return Err(InvalidData.into());
|
||||
}
|
||||
}
|
||||
|
||||
read::token(self.reader, &wire::TOK_PAR)
|
||||
}
|
||||
}
|
||||
|
||||
/// A directory iterator, yielding a sequence of [Node]s.
|
||||
/// It must be fully consumed before reading further from the [DirReader] that produced it, if any.
|
||||
pub struct DirReader<'a, 'r> {
|
||||
|
|
|
@ -135,13 +135,13 @@ fn process_file_reader(
|
|||
|
||||
// write the blob.
|
||||
let mut blob_writer = {
|
||||
let mut dest = SyncIoBridge::new(blob_writer);
|
||||
io::copy(&mut file_reader, &mut dest)?;
|
||||
let mut dst = SyncIoBridge::new(blob_writer);
|
||||
|
||||
dest.shutdown()?;
|
||||
file_reader.copy(&mut dst)?;
|
||||
dst.shutdown()?;
|
||||
|
||||
// return back the blob_reader
|
||||
dest.into_inner()
|
||||
// return back the blob_writer
|
||||
dst.into_inner()
|
||||
};
|
||||
|
||||
// close the blob_writer, retrieve the digest.
|
||||
|
|
Loading…
Reference in a new issue