feat(tvix/glue): Add AsyncRead wrapper to decompress streams

Add a new AsyncRead wrapper, DecompressedReader, which wraps an
underlying AsyncRead, sniffs the magic bytes at the start of the stream
to determine which of the three compression formats supported by
builtins.fetchTarball (gzip, bzip2, or xz) is in use, and then switches
to the matching decompression adapter dynamically.

This will be used in the implementation of builtins.fetchTarball.
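
For example (a sketch only, not part of this change), a fetchTarball
call site could look roughly like the following, where `body` stands in
for any AsyncRead over the fetched bytes:

    use tokio::io::BufReader;
    use tokio_tar::Archive;

    // DecompressedReader sniffs the magic bytes and decompresses
    // transparently, so the tar reader only ever sees plain tar data.
    let reader = DecompressedReader::new(BufReader::new(body));
    let mut archive = Archive::new(reader);
    // ...then ingest the archive entries into the store.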

Change-Id: I892a4683d5c93e67d4c173f3d21199bdc6605922
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11019
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Aspen Smith 2024-02-23 14:42:52 -05:00 committed by aspen
parent de727bccf9
commit 54609e8c17
8 changed files with 777 additions and 1 deletion

@@ -9,7 +9,9 @@ bstr = "1.6.0"
bytes = "1.4.0"
data-encoding = "2.3.3"
futures = "0.3.30"
magic = "0.16.2"
nix-compat = { path = "../nix-compat" }
pin-project = "1.1"
reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots"], default-features = false }
tvix-build = { path = "../build", default-features = false, features = []}
tvix-eval = { path = "../eval" }
@@ -17,6 +19,7 @@ tvix-castore = { path = "../castore" }
tvix-store = { path = "../store", default-features = false, features = []}
tracing = "0.1.37"
tokio = "1.28.0"
tokio-tar = "0.3.1"
tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
thiserror = "1.0.38"
serde = "1.0.195"
@@ -24,6 +27,10 @@ serde_json = "1.0"
sha2 = "0.10.8"
walkdir = "2.4.0"
[dependencies.async-compression]
version = "0.4.6"
features = ["tokio", "gzip", "bzip2", "xz"]
[dependencies.wu-manber]
git = "https://github.com/tvlfyi/wu-manber.git"

@@ -0,0 +1,221 @@
#![allow(dead_code)] // TODO
use std::{
io, mem,
pin::Pin,
task::{Context, Poll},
};
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder};
use futures::ready;
use pin_project::pin_project;
use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf};
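// Magic numbers identifying the supported compression formats:
// gzip (0x1f 0x8b), bzip2 ("BZh") and xz (0xfd "7zXZ" 0x00).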
const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];
const BZIP2_MAGIC: [u8; 3] = *b"BZh";
const XZ_MAGIC: [u8; 6] = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00];
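// The longest magic number above (xz) is 6 bytes, so 6 bytes are enough
// to decide which format the stream is in.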
const BYTES_NEEDED: usize = 6;
#[derive(Debug, Clone, Copy)]
enum Algorithm {
Gzip,
Bzip2,
Xz,
}
impl Algorithm {
fn from_magic(magic: &[u8]) -> Option<Self> {
if magic.starts_with(&GZIP_MAGIC) {
Some(Self::Gzip)
} else if magic.starts_with(&BZIP2_MAGIC) {
Some(Self::Bzip2)
} else if magic.starts_with(&XZ_MAGIC) {
Some(Self::Xz)
} else {
None
}
}
}
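/// AsyncRead wrapper which first replays the bytes consumed while sniffing
/// the magic number, then reads from the wrapped reader.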
#[pin_project]
struct WithPreexistingBuffer<R> {
buffer: Vec<u8>,
#[pin]
inner: R,
}
impl<R> AsyncRead for WithPreexistingBuffer<R>
where
R: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = self.project();
if !this.buffer.is_empty() {
// TODO: check if the buffer fits first
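// (ReadBuf::put_slice panics if `buf` has less remaining capacity than `buffer` holds.)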
buf.put_slice(this.buffer);
this.buffer.clear();
}
this.inner.poll_read(cx, buf)
}
}
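/// Inner state of DecompressedReader: either the format is still unknown
/// (and we hold on to the sniff buffer plus the original reader), or we
/// have switched to one of the three supported decompression adapters.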
#[pin_project(project = DecompressedReaderInnerProj)]
enum DecompressedReaderInner<R> {
Unknown {
buffer: Vec<u8>,
#[pin]
inner: Option<R>,
},
Gzip(#[pin] GzipDecoder<BufReader<WithPreexistingBuffer<R>>>),
Bzip2(#[pin] BzDecoder<BufReader<WithPreexistingBuffer<R>>>),
Xz(#[pin] XzDecoder<BufReader<WithPreexistingBuffer<R>>>),
}
impl<R> DecompressedReaderInner<R>
where
R: AsyncBufRead,
{
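/// Replace the Unknown state with the decoder for `algorithm`, wrapping
/// the inner reader so that the bytes already consumed for sniffing are
/// replayed in front of the rest of the stream.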
fn switch_to(&mut self, algorithm: Algorithm) {
let (buffer, inner) = match self {
DecompressedReaderInner::Unknown { buffer, inner } => {
(mem::take(buffer), inner.take().unwrap())
}
DecompressedReaderInner::Gzip(_)
| DecompressedReaderInner::Bzip2(_)
| DecompressedReaderInner::Xz(_) => unreachable!(),
};
let inner = BufReader::new(WithPreexistingBuffer { buffer, inner });
*self = match algorithm {
Algorithm::Gzip => Self::Gzip(GzipDecoder::new(inner)),
Algorithm::Bzip2 => Self::Bzip2(BzDecoder::new(inner)),
Algorithm::Xz => Self::Xz(XzDecoder::new(inner)),
}
}
}
impl<R> AsyncRead for DecompressedReaderInner<R>
where
R: AsyncBufRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match self.project() {
DecompressedReaderInnerProj::Unknown { .. } => {
unreachable!("Can't call poll_read on Unknown")
}
DecompressedReaderInnerProj::Gzip(inner) => inner.poll_read(cx, buf),
DecompressedReaderInnerProj::Bzip2(inner) => inner.poll_read(cx, buf),
DecompressedReaderInnerProj::Xz(inner) => inner.poll_read(cx, buf),
}
}
}
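/// AsyncRead wrapper which transparently decompresses a gzip, bzip2, or xz
/// compressed stream, determining the format by sniffing the magic bytes at
/// the start of the stream.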
#[pin_project]
pub struct DecompressedReader<R> {
#[pin]
inner: DecompressedReaderInner<R>,
switch_to: Option<Algorithm>,
}
impl<R> DecompressedReader<R> {
pub fn new(inner: R) -> Self {
Self {
inner: DecompressedReaderInner::Unknown {
buffer: vec![0; BYTES_NEEDED],
inner: Some(inner),
},
switch_to: None,
}
}
}
impl<R> AsyncRead for DecompressedReader<R>
where
R: AsyncBufRead + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let mut this = self.project();
let (buffer, inner) = match this.inner.as_mut().project() {
DecompressedReaderInnerProj::Gzip(inner) => return inner.poll_read(cx, buf),
DecompressedReaderInnerProj::Bzip2(inner) => return inner.poll_read(cx, buf),
DecompressedReaderInnerProj::Xz(inner) => return inner.poll_read(cx, buf),
DecompressedReaderInnerProj::Unknown { buffer, inner } => (buffer, inner),
};
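// Still in the Unknown state: read into our own sniff buffer until we
// have enough bytes to identify the compression format.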
let mut our_buf = ReadBuf::new(buffer);
if let Err(e) = ready!(inner.as_pin_mut().unwrap().poll_read(cx, &mut our_buf)) {
return Poll::Ready(Err(e));
}
let data = our_buf.filled();
if data.len() >= BYTES_NEEDED {
if let Some(algorithm) = Algorithm::from_magic(data) {
this.inner.as_mut().switch_to(algorithm);
} else {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidData,
"tar data not gz, bzip2, or xz compressed",
)));
}
this.inner.poll_read(cx, buf)
} else {
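// Not enough bytes read yet to identify the format; ask to be polled
// again right away and retry.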
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use std::path::Path;
use async_compression::tokio::bufread::GzipEncoder;
use futures::TryStreamExt;
use test_case::test_case;
use tokio::io::{AsyncReadExt, BufReader};
use tokio_tar::Archive;
use super::*;
#[tokio::test]
async fn gzip() {
let data = b"abcdefghijk";
let mut enc = GzipEncoder::new(&data[..]);
let mut gzipped = vec![];
enc.read_to_end(&mut gzipped).await.unwrap();
let mut reader = DecompressedReader::new(BufReader::new(&gzipped[..]));
let mut round_tripped = vec![];
reader.read_to_end(&mut round_tripped).await.unwrap();
assert_eq!(data[..], round_tripped[..]);
}
#[test_case(include_bytes!("tests/blob.tar.gz"); "gzip")]
#[test_case(include_bytes!("tests/blob.tar.bz2"); "bzip2")]
#[test_case(include_bytes!("tests/blob.tar.xz"); "xz")]
#[tokio::test]
async fn compressed_tar(data: &[u8]) {
let reader = DecompressedReader::new(BufReader::new(data));
let mut archive = Archive::new(reader);
let mut entries: Vec<_> = archive.entries().unwrap().try_collect().await.unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].path().unwrap().as_ref(), Path::new("empty"));
let mut data = String::new();
entries[0].read_to_string(&mut data).await.unwrap();
assert_eq!(data, "");
}
}

@@ -5,6 +5,7 @@ pub mod tvix_build;
pub mod tvix_io;
pub mod tvix_store_io;
mod decompression;
#[cfg(test)]
mod tests;

Binary file not shown.

Binary file not shown.

Binary file not shown.