feat(tvix/store): use nix_compat::nar::writer::async

Change-Id: Iad36872244df6f2225a2884f6b20cacd8f918b31
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9619
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: edef <edef@edef.eu>
This commit is contained in:
edef 2023-10-09 21:06:02 +00:00 committed by clbot
parent 8b35d97b4b
commit d23fe6ee20
5 changed files with 154 additions and 70 deletions

13
tvix/Cargo.lock generated
View file

@ -105,6 +105,17 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "async-recursion"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
dependencies = [
"proc-macro2 1.0.67",
"quote 1.0.26",
"syn 2.0.16",
]
[[package]] [[package]]
name = "async-stream" name = "async-stream"
version = "0.3.5" version = "0.3.5"
@ -2492,6 +2503,7 @@ checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
"futures-io",
"futures-sink", "futures-sink",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
@ -2798,6 +2810,7 @@ name = "tvix-store"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-recursion",
"async-stream", "async-stream",
"blake3", "blake3",
"bytes", "bytes",

View file

@ -381,6 +381,35 @@ rec {
"serde" = [ "dep:serde" ]; "serde" = [ "dep:serde" ];
}; };
}; };
"async-recursion" = rec {
crateName = "async-recursion";
version = "1.0.5";
edition = "2018";
sha256 = "1l2vlgyaa9a2dd0y1vbqyppzsvpdr1y4rar4gn1qi68pl5dmmmaz";
procMacro = true;
authors = [
"Robert Usher <266585+dcchut@users.noreply.github.com>"
];
dependencies = [
{
name = "proc-macro2";
packageId = "proc-macro2 1.0.67";
usesDefaultFeatures = false;
}
{
name = "quote";
packageId = "quote 1.0.26";
usesDefaultFeatures = false;
}
{
name = "syn";
packageId = "syn 2.0.16";
usesDefaultFeatures = false;
features = [ "full" "parsing" "printing" "proc-macro" "clone-impls" ];
}
];
};
"async-stream" = rec { "async-stream" = rec {
crateName = "async-stream"; crateName = "async-stream";
version = "0.3.5"; version = "0.3.5";
@ -2420,7 +2449,7 @@ rec {
features = { features = {
"default" = [ "std" ]; "default" = [ "std" ];
}; };
resolvedDefaultFeatures = [ "std" ]; resolvedDefaultFeatures = [ "default" "std" ];
}; };
"futures-macro" = rec { "futures-macro" = rec {
crateName = "futures-macro"; crateName = "futures-macro";
@ -7290,6 +7319,11 @@ rec {
name = "futures-core"; name = "futures-core";
packageId = "futures-core"; packageId = "futures-core";
} }
{
name = "futures-io";
packageId = "futures-io";
optional = true;
}
{ {
name = "futures-sink"; name = "futures-sink";
packageId = "futures-sink"; packageId = "futures-sink";
@ -7333,7 +7367,7 @@ rec {
"time" = [ "tokio/time" "slab" ]; "time" = [ "tokio/time" "slab" ];
"tracing" = [ "dep:tracing" ]; "tracing" = [ "dep:tracing" ];
}; };
resolvedDefaultFeatures = [ "codec" "default" "io" "io-util" "tracing" ]; resolvedDefaultFeatures = [ "codec" "compat" "default" "futures-io" "io" "io-util" "tracing" ];
}; };
"toml" = rec { "toml" = rec {
crateName = "toml"; crateName = "toml";
@ -8474,6 +8508,10 @@ rec {
name = "anyhow"; name = "anyhow";
packageId = "anyhow"; packageId = "anyhow";
} }
{
name = "async-recursion";
packageId = "async-recursion";
}
{ {
name = "async-stream"; name = "async-stream";
packageId = "async-stream"; packageId = "async-stream";
@ -8521,6 +8559,7 @@ rec {
{ {
name = "nix-compat"; name = "nix-compat";
packageId = "nix-compat"; packageId = "nix-compat";
features = [ "async" ];
} }
{ {
name = "parking_lot"; name = "parking_lot";
@ -8564,7 +8603,7 @@ rec {
{ {
name = "tokio-util"; name = "tokio-util";
packageId = "tokio-util"; packageId = "tokio-util";
features = [ "io" "io-util" ]; features = [ "io" "io-util" "compat" ];
} }
{ {
name = "tonic"; name = "tonic";

View file

@ -13,7 +13,7 @@ count-write = "0.1.0"
data-encoding = "2.3.3" data-encoding = "2.3.3"
futures = "0.3.28" futures = "0.3.28"
lazy_static = "1.4.0" lazy_static = "1.4.0"
nix-compat = { path = "../nix-compat" } nix-compat = { path = "../nix-compat", features = ["async"] }
parking_lot = "0.12.1" parking_lot = "0.12.1"
pin-project-lite = "0.2.13" pin-project-lite = "0.2.13"
prost = "0.12.1" prost = "0.12.1"
@ -21,7 +21,7 @@ sha2 = "0.10.6"
sled = { version = "0.34.7", features = ["compression"] } sled = { version = "0.34.7", features = ["compression"] }
thiserror = "1.0.38" thiserror = "1.0.38"
tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-stream = { version = "0.1.14", features = ["fs"] }
tokio-util = { version = "0.7.9", features = ["io", "io-util"] } tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
tonic = "0.10.2" tonic = "0.10.2"
tower = "0.4.13" tower = "0.4.13"
@ -31,6 +31,7 @@ tvix-castore = { path = "../castore" }
url = "2.4.0" url = "2.4.0"
walkdir = "2.4.0" walkdir = "2.4.0"
tokio-listener = { version = "0.2.1" } tokio-listener = { version = "0.2.1" }
async-recursion = "1.0.5"
[dependencies.fuse-backend-rs] [dependencies.fuse-backend-rs]
optional = true optional = true

View file

@ -1,9 +1,15 @@
use super::RenderError; use super::RenderError;
use async_recursion::async_recursion;
use count_write::CountWrite; use count_write::CountWrite;
use nix_compat::nar; use nix_compat::nar::writer::r#async as nar_writer;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{io, sync::Arc}; use std::{
use tokio::{io::BufReader, task::spawn_blocking}; pin::Pin,
sync::Arc,
task::{self, Poll},
};
use tokio::io::{self, AsyncWrite, BufReader};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tracing::warn; use tracing::warn;
use tvix_castore::{ use tvix_castore::{
blobservice::BlobService, blobservice::BlobService,
@ -19,57 +25,79 @@ pub async fn calculate_size_and_sha256(
blob_service: Arc<dyn BlobService>, blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>, directory_service: Arc<dyn DirectoryService>,
) -> Result<(u64, [u8; 32]), RenderError> { ) -> Result<(u64, [u8; 32]), RenderError> {
let h = Sha256::new(); let mut h = Sha256::new();
let cw = CountWrite::from(h); let mut cw = CountWrite::from(&mut h);
let cw = write_nar(cw, root_node, blob_service, directory_service).await?; write_nar(
// The hasher doesn't speak async. It doesn't
// actually do any I/O, so it's fine to wrap.
AsyncIoBridge(&mut cw),
root_node,
blob_service,
directory_service,
)
.await?;
Ok((cw.count(), cw.into_inner().finalize().into())) Ok((cw.count(), h.finalize().into()))
}
/// The inverse of [tokio_util::io::SyncIoBridge].
/// Don't use this with anything that actually does blocking I/O.
struct AsyncIoBridge<T>(T);
impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(self.get_mut().0.write(buf))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(self.get_mut().0.flush())
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
} }
/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path, /// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
/// and uses the passed blob_service and directory_service to perform the /// and uses the passed blob_service and directory_service to perform the
/// necessary lookups as it traverses the structure. /// necessary lookups as it traverses the structure.
/// The contents in NAR serialization are written to the passed [std::io::Write]. /// The contents in NAR serialization are written to the passed [AsyncWrite].
/// pub async fn write_nar<W: AsyncWrite + Unpin + Send>(
/// The writer is passed back in the return value. This is done because async Rust w: W,
/// lacks scoped blocking tasks, so we need to transfer ownership of the writer
/// internally.
///
/// # Panics
/// This will panic if called outside the context of a Tokio runtime.
pub async fn write_nar<W: std::io::Write + Send + 'static>(
mut w: W,
proto_root_node: &castorepb::node::Node, proto_root_node: &castorepb::node::Node,
blob_service: Arc<dyn BlobService>, blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>, directory_service: Arc<dyn DirectoryService>,
) -> Result<W, RenderError> { ) -> Result<(), RenderError> {
let tokio_handle = tokio::runtime::Handle::current(); // Initialize NAR writer
let proto_root_node = proto_root_node.clone(); let mut w = w.compat_write();
let nar_root_node = nar_writer::open(&mut w)
.await
.map_err(RenderError::NARWriterError)?;
spawn_blocking(move || { walk_node(
// Initialize NAR writer nar_root_node,
let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?; &proto_root_node,
blob_service,
directory_service,
)
.await?;
walk_node( Ok(())
tokio_handle,
nar_root_node,
&proto_root_node,
blob_service,
directory_service,
)?;
Ok(w)
})
.await
.unwrap()
} }
/// Process an intermediate node in the structure. /// Process an intermediate node in the structure.
/// This consumes the node. /// This consumes the node.
fn walk_node( #[async_recursion]
tokio_handle: tokio::runtime::Handle, async fn walk_node(
nar_node: nar::writer::Node, nar_node: nar_writer::Node<'async_recursion, '_>,
proto_node: &castorepb::node::Node, proto_node: &castorepb::node::Node,
blob_service: Arc<dyn BlobService>, blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>, directory_service: Arc<dyn DirectoryService>,
@ -78,6 +106,7 @@ fn walk_node(
castorepb::node::Node::Symlink(proto_symlink_node) => { castorepb::node::Node::Symlink(proto_symlink_node) => {
nar_node nar_node
.symlink(&proto_symlink_node.target) .symlink(&proto_symlink_node.target)
.await
.map_err(RenderError::NARWriterError)?; .map_err(RenderError::NARWriterError)?;
} }
castorepb::node::Node::File(proto_file_node) => { castorepb::node::Node::File(proto_file_node) => {
@ -92,8 +121,9 @@ fn walk_node(
)) ))
})?; })?;
let blob_reader = match tokio_handle let blob_reader = match blob_service
.block_on(async { blob_service.open_read(&digest).await }) .open_read(&digest)
.await
.map_err(RenderError::StoreError)? .map_err(RenderError::StoreError)?
{ {
Some(blob_reader) => Ok(BufReader::new(blob_reader)), Some(blob_reader) => Ok(BufReader::new(blob_reader)),
@ -107,8 +137,9 @@ fn walk_node(
.file( .file(
proto_file_node.executable, proto_file_node.executable,
proto_file_node.size.into(), proto_file_node.size.into(),
&mut tokio_util::io::SyncIoBridge::new(blob_reader), &mut blob_reader.compat(),
) )
.await
.map_err(RenderError::NARWriterError)?; .map_err(RenderError::NARWriterError)?;
} }
castorepb::node::Node::Directory(proto_directory_node) => { castorepb::node::Node::Directory(proto_directory_node) => {
@ -123,8 +154,9 @@ fn walk_node(
})?; })?;
// look it up with the directory service // look it up with the directory service
match tokio_handle match directory_service
.block_on(async { directory_service.get(&digest).await }) .get(&digest)
.await
.map_err(RenderError::StoreError)? .map_err(RenderError::StoreError)?
{ {
// if it's None, that's an error! // if it's None, that's an error!
@ -136,27 +168,31 @@ fn walk_node(
} }
Some(proto_directory) => { Some(proto_directory) => {
// start a directory node // start a directory node
let mut nar_node_directory = let mut nar_node_directory = nar_node
nar_node.directory().map_err(RenderError::NARWriterError)?; .directory()
.await
.map_err(RenderError::NARWriterError)?;
// for each node in the directory, create a new entry with its name, // for each node in the directory, create a new entry with its name,
// and then invoke walk_node on that entry. // and then invoke walk_node on that entry.
for proto_node in proto_directory.nodes() { for proto_node in proto_directory.nodes() {
let child_node = nar_node_directory let child_node = nar_node_directory
.entry(proto_node.get_name()) .entry(proto_node.get_name())
.await
.map_err(RenderError::NARWriterError)?; .map_err(RenderError::NARWriterError)?;
walk_node( walk_node(
tokio_handle.clone(),
child_node, child_node,
&proto_node, &proto_node,
blob_service.clone(), blob_service.clone(),
directory_service.clone(), directory_service.clone(),
)?; )
.await?;
} }
// close the directory // close the directory
nar_node_directory nar_node_directory
.close() .close()
.await
.map_err(RenderError::NARWriterError)?; .map_err(RenderError::NARWriterError)?;
} }
} }

View file

@ -4,16 +4,17 @@ use crate::tests::fixtures::*;
use crate::tests::utils::*; use crate::tests::utils::*;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::io; use std::io;
use tokio::io::sink;
use tvix_castore::proto::DirectoryNode; use tvix_castore::proto::DirectoryNode;
use tvix_castore::proto::FileNode; use tvix_castore::proto::FileNode;
use tvix_castore::proto::{self as castorepb, SymlinkNode}; use tvix_castore::proto::{self as castorepb, SymlinkNode};
#[tokio::test] #[tokio::test]
async fn single_symlink() { async fn single_symlink() {
let buf: Vec<u8> = vec![]; let mut buf: Vec<u8> = vec![];
let buf = write_nar( write_nar(
buf, &mut buf,
&castorepb::node::Node::Symlink(SymlinkNode { &castorepb::node::Node::Symlink(SymlinkNode {
name: "doesntmatter".into(), name: "doesntmatter".into(),
target: "/nix/store/somewhereelse".into(), target: "/nix/store/somewhereelse".into(),
@ -31,10 +32,8 @@ async fn single_symlink() {
/// Make sure the NARRenderer fails if a referred blob doesn't exist. /// Make sure the NARRenderer fails if a referred blob doesn't exist.
#[tokio::test] #[tokio::test]
async fn single_file_missing_blob() { async fn single_file_missing_blob() {
let buf: Vec<u8> = vec![];
let e = write_nar( let e = write_nar(
buf, sink(),
&castorepb::node::Node::File(FileNode { &castorepb::node::Node::File(FileNode {
name: "doesntmatter".into(), name: "doesntmatter".into(),
digest: HELLOWORLD_BLOB_DIGEST.clone().into(), digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@ -78,10 +77,8 @@ async fn single_file_wrong_blob_size() {
let bs = blob_service.clone(); let bs = blob_service.clone();
// Test with a root FileNode of a too big size // Test with a root FileNode of a too big size
{ {
let buf: Vec<u8> = vec![];
let e = write_nar( let e = write_nar(
buf, sink(),
&castorepb::node::Node::File(FileNode { &castorepb::node::Node::File(FileNode {
name: "doesntmatter".into(), name: "doesntmatter".into(),
digest: HELLOWORLD_BLOB_DIGEST.clone().into(), digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@ -105,10 +102,8 @@ async fn single_file_wrong_blob_size() {
let bs = blob_service.clone(); let bs = blob_service.clone();
// Test with a root FileNode of a too small size // Test with a root FileNode of a too small size
{ {
let buf: Vec<u8> = vec![];
let e = write_nar( let e = write_nar(
buf, sink(),
&castorepb::node::Node::File(FileNode { &castorepb::node::Node::File(FileNode {
name: "doesntmatter".into(), name: "doesntmatter".into(),
digest: HELLOWORLD_BLOB_DIGEST.clone().into(), digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@ -148,10 +143,10 @@ async fn single_file() {
writer.close().await.unwrap() writer.close().await.unwrap()
); );
let buf: Vec<u8> = vec![]; let mut buf: Vec<u8> = vec![];
let buf = write_nar( write_nar(
buf, &mut buf,
&castorepb::node::Node::File(FileNode { &castorepb::node::Node::File(FileNode {
name: "doesntmatter".into(), name: "doesntmatter".into(),
digest: HELLOWORLD_BLOB_DIGEST.clone().into(), digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@ -192,13 +187,13 @@ async fn test_complicated() {
.await .await
.unwrap(); .unwrap();
let buf: Vec<u8> = vec![]; let mut buf: Vec<u8> = vec![];
let bs = blob_service.clone(); let bs = blob_service.clone();
let ds = directory_service.clone(); let ds = directory_service.clone();
let buf = write_nar( write_nar(
buf, &mut buf,
&castorepb::node::Node::Directory(DirectoryNode { &castorepb::node::Node::Directory(DirectoryNode {
name: "doesntmatter".into(), name: "doesntmatter".into(),
digest: DIRECTORY_COMPLICATED.digest().into(), digest: DIRECTORY_COMPLICATED.digest().into(),