From 36b296609b0d4db633f78a6fa3685f61227af94a Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 10 Apr 2024 14:54:11 +0300 Subject: [PATCH] refactor(tvix/nix-compat): reorganize wire and bytes Move everything bytes-related into its own module, and re-export both bytes and primitive in a flat space from wire/mod.rs. Expose this if a `wire` feature flag is set. We only have `async` stuff in here. Change-Id: Ia4ce4791f13a5759901cc9d6ce6bd6bbcca587c7 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11389 Autosubmit: flokli Reviewed-by: raitobezarius Tested-by: BuildkiteCI Reviewed-by: Brian Olsen --- tvix/Cargo.nix | 5 +- tvix/nix-compat/Cargo.toml | 5 +- tvix/nix-compat/default.nix | 2 +- tvix/nix-compat/src/lib.rs | 2 + .../src/wire/{bytes.rs => bytes/mod.rs} | 32 ++++++++++-- .../wire/{bytes_reader.rs => bytes/reader.rs} | 4 +- .../wire/{bytes_writer.rs => bytes/writer.rs} | 28 ++-------- tvix/nix-compat/src/wire/mod.rs | 18 ++----- users/picnoir/tvix-daemon/Cargo.lock | 9 ---- users/picnoir/tvix-daemon/Cargo.nix | 52 +++++++------------ users/picnoir/tvix-daemon/Cargo.toml | 2 +- users/picnoir/tvix-daemon/src/main.rs | 6 +-- 12 files changed, 69 insertions(+), 96 deletions(-) rename tvix/nix-compat/src/wire/{bytes.rs => bytes/mod.rs} (86%) rename tvix/nix-compat/src/wire/{bytes_reader.rs => bytes/reader.rs} (99%) rename tvix/nix-compat/src/wire/{bytes_writer.rs => bytes/writer.rs} (94%) diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 4d3e7d3b7..216dd2c6e 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -6814,12 +6814,13 @@ rec { } ]; features = { - "async" = [ "futures-util" "tokio" "pin-project-lite" ]; + "async" = [ "futures-util" ]; "futures-util" = [ "dep:futures-util" ]; "pin-project-lite" = [ "dep:pin-project-lite" ]; "tokio" = [ "dep:tokio" ]; + "wire" = [ "tokio" "pin-project-lite" ]; }; - resolvedDefaultFeatures = [ "async" "futures-util" "pin-project-lite" "tokio" ]; + resolvedDefaultFeatures = [ "async" "futures-util" "pin-project-lite" "tokio" "wire" ]; }; "nom" = rec { crateName = "nom"; diff --git a/tvix/nix-compat/Cargo.toml b/tvix/nix-compat/Cargo.toml index daa718f43..805640942 100644 --- a/tvix/nix-compat/Cargo.toml +++ b/tvix/nix-compat/Cargo.toml @@ -4,7 +4,10 @@ version = "0.1.0" edition = "2021" [features] -async = ["futures-util", "tokio", "pin-project-lite"] +# async NAR writer +async = ["futures-util"] +# code emitting low-level packets used in the daemon protocol. +wire = ["tokio", "pin-project-lite"] [dependencies] bitflags = "2.4.1" diff --git a/tvix/nix-compat/default.nix b/tvix/nix-compat/default.nix index d6169f133..9df76e12f 100644 --- a/tvix/nix-compat/default.nix +++ b/tvix/nix-compat/default.nix @@ -3,5 +3,5 @@ depot.tvix.crates.workspaceMembers.nix-compat.build.override { runTests = true; # make sure we also enable async here, so run the tests behind that feature flag. - features = [ "default" "async" ]; + features = [ "default" "async" "wire" ]; } diff --git a/tvix/nix-compat/src/lib.rs b/tvix/nix-compat/src/lib.rs index 2e90aaa0b..da8ac2a6c 100644 --- a/tvix/nix-compat/src/lib.rs +++ b/tvix/nix-compat/src/lib.rs @@ -6,4 +6,6 @@ pub mod nixbase32; pub mod nixhash; pub mod path_info; pub mod store_path; + +#[cfg(feature = "wire")] pub mod wire; diff --git a/tvix/nix-compat/src/wire/bytes.rs b/tvix/nix-compat/src/wire/bytes/mod.rs similarity index 86% rename from tvix/nix-compat/src/wire/bytes.rs rename to tvix/nix-compat/src/wire/bytes/mod.rs index d299eea65..9487536eb 100644 --- a/tvix/nix-compat/src/wire/bytes.rs +++ b/tvix/nix-compat/src/wire/bytes/mod.rs @@ -2,13 +2,20 @@ use std::{ io::{Error, ErrorKind}, ops::RangeBounds, }; - use tokio::io::{AsyncReadExt, AsyncWriteExt}; +mod reader; +pub use reader::BytesReader; +mod writer; +pub use writer::BytesWriter; + use super::primitive; /// 8 null bytes, used to write out padding. -pub(crate) const EMPTY_BYTES: &[u8; 8] = &[0u8; 8]; +const EMPTY_BYTES: &[u8; 8] = &[0u8; 8]; + +/// The length of the size field, in bytes is always 8. +const LEN_SIZE: usize = 8; #[allow(dead_code)] /// Read a "bytes wire packet" from the AsyncRead. @@ -116,7 +123,7 @@ pub async fn write_bytes>( /// Computes the number of bytes we should add to len (a length in /// bytes) to be alined on 64 bits (8 bytes). -pub(crate) fn padding_len(len: u64) -> u8 { +fn padding_len(len: u64) -> u8 { let modulo = len % 8; if modulo == 0 { 0 @@ -125,6 +132,25 @@ pub(crate) fn padding_len(len: u64) -> u8 { } } +/// Models the position inside a "bytes wire packet" that the reader or writer +/// is in. +/// It can be in three different stages, inside size, payload or padding fields. +/// The number tracks the number of bytes written inside the specific field. +/// There shall be no ambiguous states, at the end of a stage we immediately +/// move to the beginning of the next one: +/// - Size(LEN_SIZE) must be expressed as Payload(0) +/// - Payload(self.payload_len) must be expressed as Padding(0) +/// There's one exception - Size(LEN_SIZE) in the reader represents a failure +/// state we enter in case the allowed size doesn't match the allowed range. +/// +/// Padding(padding_len) means we're at the end of the bytes wire packet. +#[derive(Clone, Debug, PartialEq, Eq)] +enum BytesPacketPosition { + Size(usize), + Payload(u64), + Padding(usize), +} + #[cfg(test)] mod tests { use tokio_test::{assert_ok, io::Builder}; diff --git a/tvix/nix-compat/src/wire/bytes_reader.rs b/tvix/nix-compat/src/wire/bytes/reader.rs similarity index 99% rename from tvix/nix-compat/src/wire/bytes_reader.rs rename to tvix/nix-compat/src/wire/bytes/reader.rs index b1dcebcc6..4c450b55d 100644 --- a/tvix/nix-compat/src/wire/bytes_reader.rs +++ b/tvix/nix-compat/src/wire/bytes/reader.rs @@ -5,9 +5,7 @@ use std::{ }; use tokio::io::AsyncRead; -use crate::wire::bytes::padding_len; - -use super::bytes_writer::{BytesPacketPosition, LEN_SIZE}; +use super::{padding_len, BytesPacketPosition, LEN_SIZE}; pin_project! { /// Reads a "bytes wire packet" from the underlying reader. diff --git a/tvix/nix-compat/src/wire/bytes_writer.rs b/tvix/nix-compat/src/wire/bytes/writer.rs similarity index 94% rename from tvix/nix-compat/src/wire/bytes_writer.rs rename to tvix/nix-compat/src/wire/bytes/writer.rs index 8bd0a2d00..f278b8335 100644 --- a/tvix/nix-compat/src/wire/bytes_writer.rs +++ b/tvix/nix-compat/src/wire/bytes/writer.rs @@ -3,10 +3,7 @@ use std::task::{ready, Poll}; use tokio::io::AsyncWrite; -use super::bytes::EMPTY_BYTES; - -/// The length of the size field, in bytes is always 8. -pub(crate) const LEN_SIZE: usize = 8; +use super::{padding_len, BytesPacketPosition, EMPTY_BYTES, LEN_SIZE}; pin_project! { /// Writes a "bytes wire packet" to the underlying writer. @@ -44,25 +41,6 @@ pin_project! { } } -/// Models the position inside a "bytes wire packet" that the reader or writer -/// is in. -/// It can be in three different stages, inside size, payload or padding fields. -/// The number tracks the number of bytes written inside the specific field. -/// There shall be no ambiguous states, at the end of a stage we immediately -/// move to the beginning of the next one: -/// - Size(LEN_SIZE) must be expressed as Payload(0) -/// - Payload(self.payload_len) must be expressed as Padding(0) -/// There's one exception - Size(LEN_SIZE) in the reader represents a failure -/// state we enter in case the allowed size doesn't match the allowed range. -/// -/// Padding(padding_len) means we're at the end of the bytes wire packet. -#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) enum BytesPacketPosition { - Size(usize), - Payload(u64), - Padding(usize), -} - impl BytesWriter where W: AsyncWrite, @@ -186,7 +164,7 @@ where } BytesPacketPosition::Padding(pos) => { // Write remaining padding, if there is padding to write. - let total_padding_len = super::bytes::padding_len(*this.payload_len) as usize; + let total_padding_len = padding_len(*this.payload_len) as usize; if pos != total_padding_len { let bytes_written = ensure_nonzero_bytes_written(ready!(this @@ -217,7 +195,7 @@ where // After a flush, being inside the padding state, and at the end of the padding // is the only way to prevent a dirty shutdown. if let BytesPacketPosition::Padding(pos) = *this.state { - let padding_len = super::bytes::padding_len(*this.payload_len) as usize; + let padding_len = padding_len(*this.payload_len) as usize; if padding_len == pos { // Shutdown the underlying writer return this.inner.poll_shutdown(cx); diff --git a/tvix/nix-compat/src/wire/mod.rs b/tvix/nix-compat/src/wire/mod.rs index 56ea364df..83d4a7c9b 100644 --- a/tvix/nix-compat/src/wire/mod.rs +++ b/tvix/nix-compat/src/wire/mod.rs @@ -1,20 +1,10 @@ //! Module parsing and emitting the wire format used by Nix, both in the //! nix-daemon protocol as well as in the NAR format. -#[cfg(feature = "async")] -pub mod bytes; +mod bytes; +pub use bytes::*; -#[cfg(feature = "async")] -mod bytes_reader; -#[cfg(feature = "async")] -mod bytes_writer; -#[cfg(feature = "async")] -pub use bytes_reader::BytesReader; -#[cfg(feature = "async")] -pub use bytes_writer::BytesWriter; +mod primitive; +pub use primitive::*; -#[cfg(feature = "async")] -pub mod primitive; - -#[cfg(feature = "async")] pub mod worker_protocol; diff --git a/users/picnoir/tvix-daemon/Cargo.lock b/users/picnoir/tvix-daemon/Cargo.lock index b3b7e43af..683203f5c 100644 --- a/users/picnoir/tvix-daemon/Cargo.lock +++ b/users/picnoir/tvix-daemon/Cargo.lock @@ -441,12 +441,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" -[[package]] -name = "futures-io" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" - [[package]] name = "futures-macro" version = "0.3.30" @@ -477,10 +471,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", - "futures-io", "futures-macro", "futures-task", - "memchr", "pin-project-lite", "pin-utils", "slab", @@ -753,7 +745,6 @@ dependencies = [ "ed25519", "ed25519-dalek", "enum-primitive-derive", - "futures-util", "glob", "nom", "num-traits", diff --git a/users/picnoir/tvix-daemon/Cargo.nix b/users/picnoir/tvix-daemon/Cargo.nix index 8dc9b5417..d73a65c82 100644 --- a/users/picnoir/tvix-daemon/Cargo.nix +++ b/users/picnoir/tvix-daemon/Cargo.nix @@ -1,5 +1,5 @@ -# This file was @generated by crate2nix 0.12.0 with the command: -# "generate" +# This file was @generated by crate2nix 0.13.0 with the command: +# "generate" "--all-features" # See https://github.com/kolloch/crate2nix for more info. { nixpkgs ? @@ -1386,16 +1386,6 @@ rec { }; resolvedDefaultFeatures = [ "alloc" "default" "std" ]; }; - "futures-io" = rec { - crateName = "futures-io"; - version = "0.3.30"; - edition = "2018"; - sha256 = "1hgh25isvsr4ybibywhr4dpys8mjnscw4wfxxwca70cn1gi26im4"; - features = { - "default" = [ "std" ]; - }; - resolvedDefaultFeatures = [ "std" ]; - }; "futures-macro" = rec { crateName = "futures-macro"; version = "0.3.30"; @@ -1452,13 +1442,6 @@ rec { packageId = "futures-core"; usesDefaultFeatures = false; } - { - name = "futures-io"; - packageId = "futures-io"; - optional = true; - usesDefaultFeatures = false; - features = [ "std" ]; - } { name = "futures-macro"; packageId = "futures-macro"; @@ -1470,11 +1453,6 @@ rec { packageId = "futures-task"; usesDefaultFeatures = false; } - { - name = "memchr"; - packageId = "memchr"; - optional = true; - } { name = "pin-project-lite"; packageId = "pin-project-lite"; @@ -1511,7 +1489,7 @@ rec { "unstable" = [ "futures-core/unstable" "futures-task/unstable" ]; "write-all-vectored" = [ "io" ]; }; - resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "default" "futures-io" "futures-macro" "io" "memchr" "slab" "std" ]; + resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "default" "futures-macro" "slab" "std" ]; }; "generic-array" = rec { crateName = "generic-array"; @@ -2340,12 +2318,6 @@ rec { name = "enum-primitive-derive"; packageId = "enum-primitive-derive"; } - { - name = "futures-util"; - packageId = "futures-util"; - optional = true; - features = [ "io" ]; - } { name = "glob"; packageId = "glob"; @@ -2394,12 +2366,13 @@ rec { } ]; features = { - "async" = [ "futures-util" "tokio" "pin-project-lite" ]; + "async" = [ "futures-util" ]; "futures-util" = [ "dep:futures-util" ]; "pin-project-lite" = [ "dep:pin-project-lite" ]; "tokio" = [ "dep:tokio" ]; + "wire" = [ "tokio" "pin-project-lite" ]; }; - resolvedDefaultFeatures = [ "async" "futures-util" "pin-project-lite" "tokio" ]; + resolvedDefaultFeatures = [ "pin-project-lite" "tokio" "wire" ]; }; "nom" = rec { crateName = "nom"; @@ -4153,7 +4126,7 @@ rec { { name = "nix-compat"; packageId = "nix-compat"; - features = [ "async" ]; + features = [ "wire" ]; } { name = "tokio"; @@ -5193,6 +5166,7 @@ rec { ( _: { buildTests = true; + release = false; } ); # If the user hasn't set any pre/post commands, we don't want to @@ -5217,6 +5191,16 @@ rec { # recreate a file hierarchy as when running tests with cargo # the source for test data + # It's necessary to locate the source in $NIX_BUILD_TOP/source/ + # instead of $NIX_BUILD_TOP/ + # because we compiled those test binaries in the former and not the latter. + # So all paths will expect source tree to be there and not in the build top directly. + # For example: $NIX_BUILD_TOP := /build in general, if you ask yourself. + # TODO(raitobezarius): I believe there could be more edge cases if `crate.sourceRoot` + # do exist but it's very hard to reason about them, so let's wait until the first bug report. + mkdir -p source/ + cd source/ + ${pkgs.buildPackages.xorg.lndir}/bin/lndir ${crate.src} # build outputs diff --git a/users/picnoir/tvix-daemon/Cargo.toml b/users/picnoir/tvix-daemon/Cargo.toml index 091116f5b..2aca99f20 100644 --- a/users/picnoir/tvix-daemon/Cargo.toml +++ b/users/picnoir/tvix-daemon/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -nix-compat = { path = "../../../tvix/nix-compat/", features = ["async"] } +nix-compat = { path = "../../../tvix/nix-compat", features = ["wire"] } tokio-listener = "0.3.1" tokio = { version = "1.36.0", features = ["full"] } tracing-subscriber = "0.3.18" diff --git a/users/picnoir/tvix-daemon/src/main.rs b/users/picnoir/tvix-daemon/src/main.rs index 82220637c..335010012 100644 --- a/users/picnoir/tvix-daemon/src/main.rs +++ b/users/picnoir/tvix-daemon/src/main.rs @@ -4,7 +4,7 @@ use tokio_listener::{self, SystemOptions, UserOptions}; use tracing::{debug, error, info, instrument, Level}; use nix_compat::wire::{ - primitive, + self, worker_protocol::{self, server_handshake_client, ClientSettings, Trust}, }; @@ -80,7 +80,7 @@ where // TODO: implement logging. For now, we'll just send // STDERR_LAST, which is good enough to get Nix respond to // us. - primitive::write_u64(&mut client_connection.conn, worker_protocol::STDERR_LAST) + wire::write_u64(&mut client_connection.conn, worker_protocol::STDERR_LAST) .await .unwrap(); loop { @@ -112,6 +112,6 @@ where worker_protocol::read_client_settings(&mut conn.conn, conn.version_minor).await?; // The client expects us to send some logs when we're processing // the settings. Sending STDERR_LAST signal we're done processing. - primitive::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?; + wire::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?; Ok(settings) }