feat(tvix/store): use async nar reader for ingest_nar

Rename read_nar to ingest_nar, and have it use the async nar reader
version, and the ingest_entries machinery.

This means we can now drop all code dealing with manually assembling
castore nodes.

Update our consumer, NixHTTPPathInfoService to use the new API.
As we now accept an AsyncRead, we don't need to do any blocking here
anymore, and can use the same async-compression crate as in the fetching
logic (and support some more compression formats out of the box).

Change-Id: I8646d20bd8603f8da47b5c84bc9e4ac236eb7f1a
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11580
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2024-04-30 12:17:20 +03:00 committed by clbot
parent bc92f4188e
commit aaf258f61e
7 changed files with 211 additions and 350 deletions

8
tvix/Cargo.lock generated
View file

@ -134,9 +134,9 @@ dependencies = [
[[package]]
name = "async-compression"
version = "0.4.6"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a116f46a969224200a0a97f29cfd4c50e7534e4b4826bd23ea2c3c533039c82c"
checksum = "4e9eabd7a98fe442131a17c316bd9349c43695e49e730c3c8e12cfb5f4da2693"
dependencies = [
"bzip2",
"flate2",
@ -145,6 +145,8 @@ dependencies = [
"pin-project-lite",
"tokio",
"xz2",
"zstd",
"zstd-safe",
]
[[package]]
@ -4434,6 +4436,7 @@ name = "tvix-store"
version = "0.1.0"
dependencies = [
"anyhow",
"async-compression",
"async-process",
"async-recursion",
"async-stream",
@ -4479,7 +4482,6 @@ dependencies = [
"tvix-castore",
"url",
"walkdir",
"xz2",
]
[[package]]

View file

@ -454,9 +454,9 @@ rec {
};
"async-compression" = rec {
crateName = "async-compression";
version = "0.4.6";
version = "0.4.9";
edition = "2018";
sha256 = "0b6874q56g1cx8ivs9j89d757rsh9kyrrwlp1852094jjrmg85m1";
sha256 = "14r6vbsbbkqjiqy0qwwywjakdi29jfyidhqp389l5r4gm7bsp7jf";
authors = [
"Wim Looman <wim@nemo157.com>"
"Allen Bui <fairingrey@gmail.com>"
@ -496,6 +496,27 @@ rec {
packageId = "xz2";
optional = true;
}
{
name = "zstd";
packageId = "zstd";
rename = "libzstd";
optional = true;
usesDefaultFeatures = false;
}
{
name = "zstd-safe";
packageId = "zstd-safe";
optional = true;
usesDefaultFeatures = false;
}
];
devDependencies = [
{
name = "tokio";
packageId = "tokio";
usesDefaultFeatures = false;
features = [ "io-util" "macros" "rt-multi-thread" "io-std" ];
}
];
features = {
"all" = [ "all-implementations" "all-algorithms" ];
@ -518,7 +539,7 @@ rec {
"zstd-safe" = [ "dep:zstd-safe" ];
"zstdmt" = [ "zstd" "zstd-safe/zstdmt" ];
};
resolvedDefaultFeatures = [ "bzip2" "flate2" "gzip" "tokio" "xz" "xz2" ];
resolvedDefaultFeatures = [ "bzip2" "flate2" "gzip" "libzstd" "tokio" "xz" "xz2" "zstd" "zstd-safe" ];
};
"async-io" = rec {
crateName = "async-io";
@ -14274,6 +14295,11 @@ rec {
name = "anyhow";
packageId = "anyhow";
}
{
name = "async-compression";
packageId = "async-compression";
features = [ "tokio" "bzip2" "gzip" "xz" "zstd" ];
}
{
name = "async-recursion";
packageId = "async-recursion";
@ -14444,10 +14470,6 @@ rec {
name = "walkdir";
packageId = "walkdir";
}
{
name = "xz2";
packageId = "xz2";
}
];
buildDependencies = [
{

View file

@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
async-compression = { version = "0.4.9", features = ["tokio", "gzip", "bzip2", "xz"]}
async-recursion = "1.0.5"
bstr = "1.6.0"
bytes = "1.4.0"
@ -30,10 +31,6 @@ md-5 = "0.10.6"
url = "2.4.0"
walkdir = "2.4.0"
[dependencies.async-compression]
version = "0.4.6"
features = ["tokio", "gzip", "bzip2", "xz"]
[dependencies.wu-manber]
git = "https://github.com/tvlfyi/wu-manber.git"

View file

@ -5,6 +5,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.68"
async-compression = { version = "0.4.9", features = ["tokio", "bzip2", "gzip", "xz", "zstd"]}
async-stream = "0.3.5"
blake3 = { version = "1.3.1", features = ["rayon", "std"] }
bstr = "1.6.0"
@ -41,7 +42,6 @@ url = "2.4.0"
walkdir = "2.4.0"
async-recursion = "1.0.5"
reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false }
xz2 = "0.1.7"
[dependencies.tonic-reflection]
optional = true

View file

@ -1,219 +1,120 @@
use bytes::Bytes;
use nix_compat::nar;
use std::io::{self, BufRead};
use tokio_util::io::SyncIoBridge;
use tracing::warn;
use async_recursion::async_recursion;
use nix_compat::nar::reader::r#async as nar_reader;
use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
use tvix_castore::{
blobservice::BlobService,
directoryservice::{DirectoryPutter, DirectoryService},
proto::{self as castorepb},
B3Digest,
directoryservice::DirectoryService,
import::{ingest_entries, IngestionEntry, IngestionError},
proto::{node::Node, NamedNode},
PathBuf,
};
/// Accepts a reader providing a NAR.
/// Will traverse it, uploading blobs to the given [BlobService], and
/// directories to the given [DirectoryService].
/// On success, the root node is returned.
/// This function is not async (because the NAR reader is not)
/// and calls [tokio::task::block_in_place] when interacting with backing
/// services, so make sure to only call this with spawn_blocking.
pub fn read_nar<R, BS, DS>(
r: &mut R,
/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
/// interacting with a [BlobService] and [DirectoryService].
/// It returns the castore root node or an error.
pub async fn ingest_nar<R, BS, DS>(
blob_service: BS,
directory_service: DS,
) -> io::Result<castorepb::node::Node>
r: &mut R,
) -> Result<Node, IngestionError<Error>>
where
R: BufRead + Send,
R: AsyncBufRead + Unpin + Send,
BS: BlobService + Clone,
DS: DirectoryService,
{
let handle = tokio::runtime::Handle::current();
// open the NAR for reading.
// The NAR reader emits nodes in DFS preorder.
let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
let directory_putter = directory_service.put_multiple_start();
let (tx, rx) = mpsc::channel(1);
let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
let node = nix_compat::nar::reader::open(r)?;
let (root_node, mut directory_putter) = process_node(
handle.clone(),
"".into(), // this is the root node, it has an empty name
node,
blob_service,
directory_putter,
)?;
let produce = async move {
let res = produce_nar_inner(
blob_service,
root_node,
"root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
tx.clone(),
)
.await;
// In case the root node points to a directory, we need to close
// [directory_putter], and ensure the digest we got back from there matches
// what the root node is pointing to.
if let castorepb::node::Node::Directory(ref directory_node) = root_node {
// Close directory_putter to make sure all directories have been inserted.
let directory_putter_digest =
handle.block_on(handle.spawn(async move { directory_putter.close().await }))??;
let root_directory_node_digest: B3Digest =
directory_node.digest.clone().try_into().unwrap();
tx.send(res)
.await
.map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
if directory_putter_digest != root_directory_node_digest {
warn!(
root_directory_node_digest = %root_directory_node_digest,
directory_putter_digest =%directory_putter_digest,
"directory digest mismatch",
);
return Err(io::Error::new(
io::ErrorKind::Other,
"directory digest mismatch",
));
}
}
// In case it's not a Directory, [directory_putter] doesn't need to be
// closed (as we didn't end up uploading anything).
// It can just be dropped, as documented in its trait.
Ok(())
};
Ok(root_node)
let consume = ingest_entries(directory_service, rx);
let (_, node) = try_join!(produce, consume)?;
// remove the fake "root" name again
debug_assert_eq!(&node.get_name(), b"root");
Ok(node.rename("".into()))
}
/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node].
/// It does so by handling all three kinds, and recursing for directories.
///
/// [DirectoryPutter] is passed around, so a single instance of it can be used,
/// which is sufficient, as this reads through the whole NAR linerarly.
fn process_node<BS>(
handle: tokio::runtime::Handle,
name: bytes::Bytes,
node: nar::reader::Node,
#[async_recursion]
async fn produce_nar_inner<'a: 'async_recursion, 'r: 'async_recursion, BS>(
blob_service: BS,
directory_putter: Box<dyn DirectoryPutter>,
) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)>
node: nar_reader::Node<'a, 'r>,
path: PathBuf,
tx: mpsc::Sender<Result<IngestionEntry, Error>>,
) -> Result<IngestionEntry, Error>
where
BS: BlobService + Clone,
{
Ok(match node {
nar::reader::Node::Symlink { target } => (
castorepb::node::Node::Symlink(castorepb::SymlinkNode {
name,
target: target.into(),
}),
directory_putter,
),
nar::reader::Node::File { executable, reader } => (
castorepb::node::Node::File(process_file_reader(
handle,
name,
reader,
nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
nar_reader::Node::File {
executable,
mut reader,
} => {
let (digest, size) = {
let mut blob_writer = blob_service.open_write().await;
// TODO(edef): fix the AsyncBufRead implementation of nix_compat::wire::BytesReader
let size = tokio::io::copy(&mut reader, &mut blob_writer).await?;
(blob_writer.close().await?, size)
};
IngestionEntry::Regular {
path,
size,
executable,
blob_service,
)?),
directory_putter,
),
nar::reader::Node::Directory(dir_reader) => {
let (directory_node, directory_putter) =
process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
digest,
}
}
nar_reader::Node::Directory(mut dir_reader) => {
while let Some(entry) = dir_reader.next().await? {
let mut path = path.clone();
(
castorepb::node::Node::Directory(directory_node),
directory_putter,
)
// valid NAR names are valid castore names
path.try_push(&entry.name)
.expect("Tvix bug: failed to join name");
let entry =
produce_nar_inner(blob_service.clone(), entry.node, path, tx.clone()).await?;
tx.send(Ok(entry)).await.map_err(|e| {
Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
})?;
}
IngestionEntry::Dir { path }
}
})
}
/// Given a name and [nar::reader::FileReader], this ingests the file into the
/// passed [BlobService] and returns a [castorepb::FileNode].
fn process_file_reader<BS>(
handle: tokio::runtime::Handle,
name: Bytes,
mut file_reader: nar::reader::FileReader,
executable: bool,
blob_service: BS,
) -> io::Result<castorepb::FileNode>
where
BS: BlobService,
{
// store the length. If we read any other length, reading will fail.
let expected_len = file_reader.len();
// prepare writing a new blob.
let blob_writer = handle.block_on(async { blob_service.open_write().await });
// write the blob.
let mut blob_writer = {
let mut dst = SyncIoBridge::new(blob_writer);
file_reader.copy(&mut dst)?;
dst.shutdown()?;
// return back the blob_writer
dst.into_inner()
};
// close the blob_writer, retrieve the digest.
let blob_digest = handle.block_on(async { blob_writer.close().await })?;
Ok(castorepb::FileNode {
name,
digest: blob_digest.into(),
size: expected_len,
executable,
})
}
/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode].
/// It uses [process_node] to iterate over all children.
///
/// [DirectoryPutter] is passed around, so a single instance of it can be used,
/// which is sufficient, as this reads through the whole NAR linerarly.
fn process_dir_reader<BS>(
handle: tokio::runtime::Handle,
name: Bytes,
mut dir_reader: nar::reader::DirReader,
blob_service: BS,
directory_putter: Box<dyn DirectoryPutter>,
) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)>
where
BS: BlobService + Clone,
{
let mut directory = castorepb::Directory::default();
let mut directory_putter = directory_putter;
while let Some(entry) = dir_reader.next()? {
let (node, directory_putter_back) = process_node(
handle.clone(),
entry.name.into(),
entry.node,
blob_service.clone(),
directory_putter,
)?;
directory_putter = directory_putter_back;
match node {
castorepb::node::Node::Directory(node) => directory.directories.push(node),
castorepb::node::Node::File(node) => directory.files.push(node),
castorepb::node::Node::Symlink(node) => directory.symlinks.push(node),
}
}
// calculate digest and size.
let directory_digest = directory.digest();
let directory_size = directory.size();
// upload the directory. This is a bit more verbose, as we want to get back
// directory_putter for later reuse.
let directory_putter = handle.block_on(handle.spawn(async move {
directory_putter.put(directory).await?;
Ok::<_, io::Error>(directory_putter)
}))??;
Ok((
castorepb::DirectoryNode {
name,
digest: directory_digest.into(),
size: directory_size,
},
directory_putter,
))
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
IO(#[from] std::io::Error),
}
#[cfg(test)]
mod test {
use crate::nar::read_nar;
use crate::nar::ingest_nar;
use std::io::Cursor;
use std::sync::Arc;
@ -238,19 +139,13 @@ mod test {
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) {
let handle = tokio::runtime::Handle::current();
let root_node = handle
.spawn_blocking(|| {
read_nar(
&mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
blob_service,
directory_service,
)
})
.await
.unwrap()
.expect("must parse");
let root_node = ingest_nar(
blob_service,
directory_service,
&mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
)
.await
.expect("must parse");
assert_eq!(
castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@ -267,22 +162,13 @@ mod test {
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) {
let handle = tokio::runtime::Handle::current();
let root_node = handle
.spawn_blocking({
let blob_service = blob_service.clone();
move || {
read_nar(
&mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
blob_service,
directory_service,
)
}
})
.await
.unwrap()
.expect("must parse");
let root_node = ingest_nar(
blob_service.clone(),
directory_service,
&mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
)
.await
.expect("must parse");
assert_eq!(
castorepb::node::Node::File(castorepb::FileNode {
@ -304,23 +190,13 @@ mod test {
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) {
let handle = tokio::runtime::Handle::current();
let root_node = handle
.spawn_blocking({
let blob_service = blob_service.clone();
let directory_service = directory_service.clone();
|| {
read_nar(
&mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
blob_service,
directory_service,
)
}
})
.await
.unwrap()
.expect("must parse");
let root_node = ingest_nar(
blob_service.clone(),
directory_service.clone(),
&mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
)
.await
.expect("must parse");
assert_eq!(
castorepb::node::Node::Directory(castorepb::DirectoryNode {

View file

@ -2,7 +2,7 @@ use tvix_castore::B3Digest;
mod import;
mod renderer;
pub use import::read_nar;
pub use import::ingest_nar;
pub use renderer::calculate_size_and_sha256;
pub use renderer::write_nar;

View file

@ -1,5 +1,3 @@
use std::io::{self, BufRead, Read, Write};
use data_encoding::BASE64;
use futures::{stream::BoxStream, TryStreamExt};
use nix_compat::{
@ -8,7 +6,10 @@ use nix_compat::{
nixhash::NixHash,
};
use reqwest::StatusCode;
use sha2::{digest::FixedOutput, Digest, Sha256};
use sha2::Digest;
use std::io::{self, Write};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::InspectReader;
use tonic::async_trait;
use tracing::{debug, instrument, warn};
use tvix_castore::{
@ -171,85 +172,83 @@ where
)));
}
// get an AsyncRead of the response body.
let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
// get a reader of the response body.
let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
let e = e.without_url();
warn!(e=%e, "failed to get response body");
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
}));
let sync_r = tokio_util::io::SyncIoBridge::new(async_r);
// handle decompression, by wrapping the reader.
let sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
Some("none") => Box::new(sync_r),
Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
Some(comp) => {
return Err(Error::InvalidRequest(
format!("unsupported compression: {}", comp).to_string(),
))
}
None => {
return Err(Error::InvalidRequest(
"unsupported compression: bzip2".to_string(),
))
// handle decompression, depending on the compression field.
let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some(comp_str) => {
return Err(Error::StorageError(format!(
"unsupported compression: {comp_str}"
)));
}
};
let mut nar_hash = sha2::Sha256::new();
let mut nar_size = 0;
let res = tokio::task::spawn_blocking({
let blob_service = self.blob_service.clone();
let directory_service = self.directory_service.clone();
move || -> io::Result<_> {
// Wrap the reader once more, so we can calculate NarSize and NarHash
let mut sync_r = io::BufReader::new(NarReader::from(sync_r));
let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?;
// Assemble NarHash and NarSize as we read bytes.
let r = InspectReader::new(r, |b| {
nar_size += b.len() as u64;
nar_hash.write_all(b).unwrap();
});
let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner();
// HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
let mut r = BufReader::new(r);
Ok((root_node, nar_hash, nar_size))
}
})
let root_node = crate::nar::ingest_nar(
self.blob_service.clone(),
self.directory_service.clone(),
&mut r,
)
.await
.unwrap();
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
match res {
Ok((root_node, nar_hash, nar_size)) => {
// ensure the ingested narhash and narsize do actually match.
if narinfo.nar_size != nar_size {
warn!(
narinfo.nar_size = narinfo.nar_size,
http.nar_size = nar_size,
"NARSize mismatch"
);
Err(io::Error::new(
io::ErrorKind::InvalidData,
"NarSize mismatch".to_string(),
))?;
}
if narinfo.nar_hash != nar_hash {
warn!(
narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
http.nar_hash = %NixHash::Sha256(nar_hash),
"NarHash mismatch"
);
Err(io::Error::new(
io::ErrorKind::InvalidData,
"NarHash mismatch".to_string(),
))?;
}
Ok(Some(PathInfo {
node: Some(castorepb::Node {
// set the name of the root node to the digest-name of the store path.
node: Some(
root_node.rename(narinfo.store_path.to_string().to_owned().into()),
),
}),
references: pathinfo.references,
narinfo: pathinfo.narinfo,
}))
}
Err(e) => Err(e.into()),
// ensure the ingested narhash and narsize do actually match.
if narinfo.nar_size != nar_size {
warn!(
narinfo.nar_size = narinfo.nar_size,
http.nar_size = nar_size,
"NARSize mismatch"
);
Err(io::Error::new(
io::ErrorKind::InvalidData,
"NarSize mismatch".to_string(),
))?;
}
let nar_hash: [u8; 32] = nar_hash.finalize().into();
if narinfo.nar_hash != nar_hash {
warn!(
narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
http.nar_hash = %NixHash::Sha256(nar_hash),
"NarHash mismatch"
);
Err(io::Error::new(
io::ErrorKind::InvalidData,
"NarHash mismatch".to_string(),
))?;
}
Ok(Some(PathInfo {
node: Some(castorepb::Node {
// set the name of the root node to the digest-name of the store path.
node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
}),
references: pathinfo.references,
narinfo: pathinfo.narinfo,
}))
}
#[instrument(skip_all, fields(path_info=?_path_info))]
@ -277,38 +276,3 @@ where
}))
}
}
/// Small helper reader implementing [std::io::Read].
/// It can be used to wrap another reader, counts the number of bytes read
/// and the sha256 digest of the contents.
struct NarReader<R: Read> {
r: R,
sha256: sha2::Sha256,
bytes_read: u64,
}
impl<R: Read> NarReader<R> {
pub fn from(inner: R) -> Self {
Self {
r: inner,
sha256: Sha256::new(),
bytes_read: 0,
}
}
/// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read.
pub fn into_inner(self) -> (R, [u8; 32], u64) {
(self.r, self.sha256.finalize_fixed().into(), self.bytes_read)
}
}
impl<R: Read> Read for NarReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.r.read(buf).map(|n| {
self.bytes_read += n as u64;
self.sha256.write_all(&buf[..n]).unwrap();
n
})
}
}