refactor(tvix/nix-compat): move nar writer to tokio

There's little reason to keep the nar writer using Async{Read,Write}
traits from futures, while everything else async in tvix (and
nix-compat) uses tokio.

Change-Id: I8cd1efcd0dd5bb76471de997603c7b701a5095de
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11391
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Reviewed-by: Brian Olsen <me@griff.name>
This commit is contained in:
Florian Klink 2024-04-10 16:33:02 +03:00 committed by flokli
parent 742937d55c
commit 45cf7ae657
6 changed files with 51 additions and 67 deletions

1
tvix/Cargo.lock generated
View file

@ -2187,7 +2187,6 @@ dependencies = [
"ed25519-dalek", "ed25519-dalek",
"enum-primitive-derive", "enum-primitive-derive",
"futures", "futures",
"futures-util",
"glob", "glob",
"hex-literal", "hex-literal",
"lazy_static", "lazy_static",

View file

@ -6717,12 +6717,6 @@ rec {
name = "enum-primitive-derive"; name = "enum-primitive-derive";
packageId = "enum-primitive-derive"; packageId = "enum-primitive-derive";
} }
{
name = "futures-util";
packageId = "futures-util";
optional = true;
features = [ "io" ];
}
{ {
name = "glob"; name = "glob";
packageId = "glob"; packageId = "glob";
@ -6814,13 +6808,12 @@ rec {
} }
]; ];
features = { features = {
"async" = [ "futures-util" ]; "async" = [ "tokio" ];
"futures-util" = [ "dep:futures-util" ];
"pin-project-lite" = [ "dep:pin-project-lite" ]; "pin-project-lite" = [ "dep:pin-project-lite" ];
"tokio" = [ "dep:tokio" ]; "tokio" = [ "dep:tokio" ];
"wire" = [ "tokio" "pin-project-lite" ]; "wire" = [ "tokio" "pin-project-lite" ];
}; };
resolvedDefaultFeatures = [ "async" "futures-util" "pin-project-lite" "tokio" "wire" ]; resolvedDefaultFeatures = [ "async" "pin-project-lite" "tokio" "wire" ];
}; };
"nom" = rec { "nom" = rec {
crateName = "nom"; crateName = "nom";

View file

@ -5,7 +5,7 @@ edition = "2021"
[features] [features]
# async NAR writer # async NAR writer
async = ["futures-util"] async = ["tokio"]
# code emitting low-level packets used in the daemon protocol. # code emitting low-level packets used in the daemon protocol.
wire = ["tokio", "pin-project-lite"] wire = ["tokio", "pin-project-lite"]
@ -16,7 +16,6 @@ data-encoding = "2.3.3"
ed25519 = "2.2.3" ed25519 = "2.2.3"
ed25519-dalek = "2.1.0" ed25519-dalek = "2.1.0"
enum-primitive-derive = "0.3.0" enum-primitive-derive = "0.3.0"
futures-util = { version = "0.3.30", features = ["io"], optional = true }
glob = "0.3.0" glob = "0.3.0"
nom = "7.1.3" nom = "7.1.3"
num-traits = "0.2.18" num-traits = "0.2.18"

View file

@ -10,7 +10,7 @@
//! //!
//! ```rust //! ```rust
//! # futures::executor::block_on(async { //! # futures::executor::block_on(async {
//! # use futures::io::BufReader; //! # use tokio::io::BufReader;
//! # let some_file: Vec<u8> = vec![0, 1, 2, 3, 4]; //! # let some_file: Vec<u8> = vec![0, 1, 2, 3, 4];
//! //!
//! // Output location to write the NAR to. //! // Output location to write the NAR to.
@ -31,7 +31,6 @@
//! ``` //! ```
use crate::nar::wire; use crate::nar::wire;
use futures_util::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
use std::{ use std::{
io::{ io::{
self, self,
@ -39,6 +38,7 @@ use std::{
}, },
pin::Pin, pin::Pin,
}; };
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
/// Convenience type alias for types implementing [`AsyncWrite`]. /// Convenience type alias for types implementing [`AsyncWrite`].
pub type Writer<'a> = dyn AsyncWrite + Unpin + Send + 'a; pub type Writer<'a> = dyn AsyncWrite + Unpin + Send + 'a;

View file

@ -11,16 +11,14 @@ fn symlink() {
} }
#[cfg(feature = "async")] #[cfg(feature = "async")]
#[test] #[tokio::test]
fn symlink_async() { async fn symlink_async() {
let mut buf = vec![]; let mut buf = vec![];
futures::executor::block_on(async {
let node = nar::writer::r#async::open(&mut buf).await.unwrap(); let node = nar::writer::r#async::open(&mut buf).await.unwrap();
node.symlink("/nix/store/somewhereelse".as_bytes()) node.symlink("/nix/store/somewhereelse".as_bytes())
.await .await
.unwrap(); .unwrap();
});
assert_eq!(include_bytes!("../tests/symlink.nar"), buf.as_slice()); assert_eq!(include_bytes!("../tests/symlink.nar"), buf.as_slice());
} }
@ -42,22 +40,22 @@ fn file() {
} }
#[cfg(feature = "async")] #[cfg(feature = "async")]
#[test] #[tokio::test]
fn file_async() { async fn file_async() {
use std::io::Cursor;
let mut buf = vec![]; let mut buf = vec![];
futures::executor::block_on(async {
let node = nar::writer::r#async::open(&mut buf).await.unwrap(); let node = nar::writer::r#async::open(&mut buf).await.unwrap();
let file_contents = "Hello World!".to_string(); let file_contents = "Hello World!".to_string();
node.file( node.file(
false, false,
file_contents.len() as u64, file_contents.len() as u64,
&mut futures::io::Cursor::new(file_contents), &mut Cursor::new(file_contents),
) )
.await .await
.unwrap(); .unwrap();
});
assert_eq!(include_bytes!("../tests/helloworld.nar"), buf.as_slice()); assert_eq!(include_bytes!("../tests/helloworld.nar"), buf.as_slice());
} }
@ -93,17 +91,18 @@ fn complicated() {
} }
#[cfg(feature = "async")] #[cfg(feature = "async")]
#[test] #[tokio::test]
fn complicated_async() { async fn complicated_async() {
use std::io::Cursor;
let mut buf = vec![]; let mut buf = vec![];
futures::executor::block_on(async {
let node = nar::writer::r#async::open(&mut buf).await.unwrap(); let node = nar::writer::r#async::open(&mut buf).await.unwrap();
let mut dir_node = node.directory().await.unwrap(); let mut dir_node = node.directory().await.unwrap();
let e = dir_node.entry(".keep".as_bytes()).await.unwrap(); let e = dir_node.entry(".keep".as_bytes()).await.unwrap();
e.file(false, 0, &mut futures::io::Cursor::new([])) e.file(false, 0, &mut Cursor::new([]))
.await .await
.expect("read .keep must succeed"); .expect("read .keep must succeed");
@ -119,15 +118,11 @@ fn complicated_async() {
.entry(".keep".as_bytes()) .entry(".keep".as_bytes())
.await .await
.expect("subdir entry must succeed"); .expect("subdir entry must succeed");
e_sub e_sub.file(false, 0, &mut Cursor::new([])).await.unwrap();
.file(false, 0, &mut futures::io::Cursor::new([]))
.await
.unwrap();
// close the subdir, and then the dir, which is required. // close the subdir, and then the dir, which is required.
subdir_node.close().await.unwrap(); subdir_node.close().await.unwrap();
dir_node.close().await.unwrap(); dir_node.close().await.unwrap();
});
assert_eq!(include_bytes!("../tests/complicated.nar"), buf.as_slice()); assert_eq!(include_bytes!("../tests/complicated.nar"), buf.as_slice());
} }

View file

@ -6,7 +6,6 @@ use count_write::CountWrite;
use nix_compat::nar::writer::r#async as nar_writer; use nix_compat::nar::writer::r#async as nar_writer;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use tokio::io::{self, AsyncWrite, BufReader}; use tokio::io::{self, AsyncWrite, BufReader};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tvix_castore::{ use tvix_castore::{
blobservice::BlobService, blobservice::BlobService,
directoryservice::DirectoryService, directoryservice::DirectoryService,
@ -45,7 +44,7 @@ where
/// necessary lookups as it traverses the structure. /// necessary lookups as it traverses the structure.
/// The contents in NAR serialization are writen to the passed [AsyncWrite]. /// The contents in NAR serialization are writen to the passed [AsyncWrite].
pub async fn write_nar<W, BS, DS>( pub async fn write_nar<W, BS, DS>(
w: W, mut w: W,
proto_root_node: &castorepb::node::Node, proto_root_node: &castorepb::node::Node,
blob_service: BS, blob_service: BS,
directory_service: DS, directory_service: DS,
@ -56,7 +55,6 @@ where
DS: DirectoryService + Send, DS: DirectoryService + Send,
{ {
// Initialize NAR writer // Initialize NAR writer
let mut w = w.compat_write();
let nar_root_node = nar_writer::open(&mut w) let nar_root_node = nar_writer::open(&mut w)
.await .await
.map_err(RenderError::NARWriterError)?; .map_err(RenderError::NARWriterError)?;
@ -101,7 +99,7 @@ where
)) ))
})?; })?;
let blob_reader = match blob_service let mut blob_reader = match blob_service
.open_read(&digest) .open_read(&digest)
.await .await
.map_err(RenderError::StoreError)? .map_err(RenderError::StoreError)?
@ -117,7 +115,7 @@ where
.file( .file(
proto_file_node.executable, proto_file_node.executable,
proto_file_node.size, proto_file_node.size,
&mut blob_reader.compat(), &mut blob_reader,
) )
.await .await
.map_err(RenderError::NARWriterError)?; .map_err(RenderError::NARWriterError)?;