refactor(tvix/nix-compat): reorganize wire and bytes

Move everything bytes-related into its own module, and re-export
both bytes and primitive in a flat space from wire/mod.rs.

Expose this if a `wire` feature flag is set. We only have `async` stuff
in here.

Change-Id: Ia4ce4791f13a5759901cc9d6ce6bd6bbcca587c7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11389
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Reviewed-by: Brian Olsen <me@griff.name>
This commit is contained in:
Florian Klink 2024-04-10 14:54:11 +03:00 committed by flokli
parent 839c971a0f
commit 36b296609b
12 changed files with 69 additions and 96 deletions

View file

@ -6814,12 +6814,13 @@ rec {
} }
]; ];
features = { features = {
"async" = [ "futures-util" "tokio" "pin-project-lite" ]; "async" = [ "futures-util" ];
"futures-util" = [ "dep:futures-util" ]; "futures-util" = [ "dep:futures-util" ];
"pin-project-lite" = [ "dep:pin-project-lite" ]; "pin-project-lite" = [ "dep:pin-project-lite" ];
"tokio" = [ "dep:tokio" ]; "tokio" = [ "dep:tokio" ];
"wire" = [ "tokio" "pin-project-lite" ];
}; };
resolvedDefaultFeatures = [ "async" "futures-util" "pin-project-lite" "tokio" ]; resolvedDefaultFeatures = [ "async" "futures-util" "pin-project-lite" "tokio" "wire" ];
}; };
"nom" = rec { "nom" = rec {
crateName = "nom"; crateName = "nom";

View file

@ -4,7 +4,10 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[features] [features]
async = ["futures-util", "tokio", "pin-project-lite"] # async NAR writer
async = ["futures-util"]
# code emitting low-level packets used in the daemon protocol.
wire = ["tokio", "pin-project-lite"]
[dependencies] [dependencies]
bitflags = "2.4.1" bitflags = "2.4.1"

View file

@ -3,5 +3,5 @@
depot.tvix.crates.workspaceMembers.nix-compat.build.override { depot.tvix.crates.workspaceMembers.nix-compat.build.override {
runTests = true; runTests = true;
# make sure we also enable async here, so run the tests behind that feature flag. # make sure we also enable async here, so run the tests behind that feature flag.
features = [ "default" "async" ]; features = [ "default" "async" "wire" ];
} }

View file

@ -6,4 +6,6 @@ pub mod nixbase32;
pub mod nixhash; pub mod nixhash;
pub mod path_info; pub mod path_info;
pub mod store_path; pub mod store_path;
#[cfg(feature = "wire")]
pub mod wire; pub mod wire;

View file

@ -2,13 +2,20 @@ use std::{
io::{Error, ErrorKind}, io::{Error, ErrorKind},
ops::RangeBounds, ops::RangeBounds,
}; };
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
mod reader;
pub use reader::BytesReader;
mod writer;
pub use writer::BytesWriter;
use super::primitive; use super::primitive;
/// 8 null bytes, used to write out padding. /// 8 null bytes, used to write out padding.
pub(crate) const EMPTY_BYTES: &[u8; 8] = &[0u8; 8]; const EMPTY_BYTES: &[u8; 8] = &[0u8; 8];
/// The length of the size field, in bytes is always 8.
const LEN_SIZE: usize = 8;
#[allow(dead_code)] #[allow(dead_code)]
/// Read a "bytes wire packet" from the AsyncRead. /// Read a "bytes wire packet" from the AsyncRead.
@ -116,7 +123,7 @@ pub async fn write_bytes<W: AsyncWriteExt + Unpin, B: AsRef<[u8]>>(
/// Computes the number of bytes we should add to len (a length in /// Computes the number of bytes we should add to len (a length in
/// bytes) to be alined on 64 bits (8 bytes). /// bytes) to be alined on 64 bits (8 bytes).
pub(crate) fn padding_len(len: u64) -> u8 { fn padding_len(len: u64) -> u8 {
let modulo = len % 8; let modulo = len % 8;
if modulo == 0 { if modulo == 0 {
0 0
@ -125,6 +132,25 @@ pub(crate) fn padding_len(len: u64) -> u8 {
} }
} }
/// Models the position inside a "bytes wire packet" that the reader or writer
/// is in.
/// It can be in three different stages, inside size, payload or padding fields.
/// The number tracks the number of bytes written inside the specific field.
/// There shall be no ambiguous states, at the end of a stage we immediately
/// move to the beginning of the next one:
/// - Size(LEN_SIZE) must be expressed as Payload(0)
/// - Payload(self.payload_len) must be expressed as Padding(0)
/// There's one exception - Size(LEN_SIZE) in the reader represents a failure
/// state we enter in case the allowed size doesn't match the allowed range.
///
/// Padding(padding_len) means we're at the end of the bytes wire packet.
#[derive(Clone, Debug, PartialEq, Eq)]
enum BytesPacketPosition {
Size(usize),
Payload(u64),
Padding(usize),
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use tokio_test::{assert_ok, io::Builder}; use tokio_test::{assert_ok, io::Builder};

View file

@ -5,9 +5,7 @@ use std::{
}; };
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use crate::wire::bytes::padding_len; use super::{padding_len, BytesPacketPosition, LEN_SIZE};
use super::bytes_writer::{BytesPacketPosition, LEN_SIZE};
pin_project! { pin_project! {
/// Reads a "bytes wire packet" from the underlying reader. /// Reads a "bytes wire packet" from the underlying reader.

View file

@ -3,10 +3,7 @@ use std::task::{ready, Poll};
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
use super::bytes::EMPTY_BYTES; use super::{padding_len, BytesPacketPosition, EMPTY_BYTES, LEN_SIZE};
/// The length of the size field, in bytes is always 8.
pub(crate) const LEN_SIZE: usize = 8;
pin_project! { pin_project! {
/// Writes a "bytes wire packet" to the underlying writer. /// Writes a "bytes wire packet" to the underlying writer.
@ -44,25 +41,6 @@ pin_project! {
} }
} }
/// Models the position inside a "bytes wire packet" that the reader or writer
/// is in.
/// It can be in three different stages, inside size, payload or padding fields.
/// The number tracks the number of bytes written inside the specific field.
/// There shall be no ambiguous states, at the end of a stage we immediately
/// move to the beginning of the next one:
/// - Size(LEN_SIZE) must be expressed as Payload(0)
/// - Payload(self.payload_len) must be expressed as Padding(0)
/// There's one exception - Size(LEN_SIZE) in the reader represents a failure
/// state we enter in case the allowed size doesn't match the allowed range.
///
/// Padding(padding_len) means we're at the end of the bytes wire packet.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum BytesPacketPosition {
Size(usize),
Payload(u64),
Padding(usize),
}
impl<W> BytesWriter<W> impl<W> BytesWriter<W>
where where
W: AsyncWrite, W: AsyncWrite,
@ -186,7 +164,7 @@ where
} }
BytesPacketPosition::Padding(pos) => { BytesPacketPosition::Padding(pos) => {
// Write remaining padding, if there is padding to write. // Write remaining padding, if there is padding to write.
let total_padding_len = super::bytes::padding_len(*this.payload_len) as usize; let total_padding_len = padding_len(*this.payload_len) as usize;
if pos != total_padding_len { if pos != total_padding_len {
let bytes_written = ensure_nonzero_bytes_written(ready!(this let bytes_written = ensure_nonzero_bytes_written(ready!(this
@ -217,7 +195,7 @@ where
// After a flush, being inside the padding state, and at the end of the padding // After a flush, being inside the padding state, and at the end of the padding
// is the only way to prevent a dirty shutdown. // is the only way to prevent a dirty shutdown.
if let BytesPacketPosition::Padding(pos) = *this.state { if let BytesPacketPosition::Padding(pos) = *this.state {
let padding_len = super::bytes::padding_len(*this.payload_len) as usize; let padding_len = padding_len(*this.payload_len) as usize;
if padding_len == pos { if padding_len == pos {
// Shutdown the underlying writer // Shutdown the underlying writer
return this.inner.poll_shutdown(cx); return this.inner.poll_shutdown(cx);

View file

@ -1,20 +1,10 @@
//! Module parsing and emitting the wire format used by Nix, both in the //! Module parsing and emitting the wire format used by Nix, both in the
//! nix-daemon protocol as well as in the NAR format. //! nix-daemon protocol as well as in the NAR format.
#[cfg(feature = "async")] mod bytes;
pub mod bytes; pub use bytes::*;
#[cfg(feature = "async")] mod primitive;
mod bytes_reader; pub use primitive::*;
#[cfg(feature = "async")]
mod bytes_writer;
#[cfg(feature = "async")]
pub use bytes_reader::BytesReader;
#[cfg(feature = "async")]
pub use bytes_writer::BytesWriter;
#[cfg(feature = "async")]
pub mod primitive;
#[cfg(feature = "async")]
pub mod worker_protocol; pub mod worker_protocol;

View file

@ -441,12 +441,6 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.30" version = "0.3.30"
@ -477,10 +471,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-io",
"futures-macro", "futures-macro",
"futures-task", "futures-task",
"memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab", "slab",
@ -753,7 +745,6 @@ dependencies = [
"ed25519", "ed25519",
"ed25519-dalek", "ed25519-dalek",
"enum-primitive-derive", "enum-primitive-derive",
"futures-util",
"glob", "glob",
"nom", "nom",
"num-traits", "num-traits",

View file

@ -1,5 +1,5 @@
# This file was @generated by crate2nix 0.12.0 with the command: # This file was @generated by crate2nix 0.13.0 with the command:
# "generate" # "generate" "--all-features"
# See https://github.com/kolloch/crate2nix for more info. # See https://github.com/kolloch/crate2nix for more info.
{ nixpkgs ? <nixpkgs> { nixpkgs ? <nixpkgs>
@ -1386,16 +1386,6 @@ rec {
}; };
resolvedDefaultFeatures = [ "alloc" "default" "std" ]; resolvedDefaultFeatures = [ "alloc" "default" "std" ];
}; };
"futures-io" = rec {
crateName = "futures-io";
version = "0.3.30";
edition = "2018";
sha256 = "1hgh25isvsr4ybibywhr4dpys8mjnscw4wfxxwca70cn1gi26im4";
features = {
"default" = [ "std" ];
};
resolvedDefaultFeatures = [ "std" ];
};
"futures-macro" = rec { "futures-macro" = rec {
crateName = "futures-macro"; crateName = "futures-macro";
version = "0.3.30"; version = "0.3.30";
@ -1452,13 +1442,6 @@ rec {
packageId = "futures-core"; packageId = "futures-core";
usesDefaultFeatures = false; usesDefaultFeatures = false;
} }
{
name = "futures-io";
packageId = "futures-io";
optional = true;
usesDefaultFeatures = false;
features = [ "std" ];
}
{ {
name = "futures-macro"; name = "futures-macro";
packageId = "futures-macro"; packageId = "futures-macro";
@ -1470,11 +1453,6 @@ rec {
packageId = "futures-task"; packageId = "futures-task";
usesDefaultFeatures = false; usesDefaultFeatures = false;
} }
{
name = "memchr";
packageId = "memchr";
optional = true;
}
{ {
name = "pin-project-lite"; name = "pin-project-lite";
packageId = "pin-project-lite"; packageId = "pin-project-lite";
@ -1511,7 +1489,7 @@ rec {
"unstable" = [ "futures-core/unstable" "futures-task/unstable" ]; "unstable" = [ "futures-core/unstable" "futures-task/unstable" ];
"write-all-vectored" = [ "io" ]; "write-all-vectored" = [ "io" ];
}; };
resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "default" "futures-io" "futures-macro" "io" "memchr" "slab" "std" ]; resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "default" "futures-macro" "slab" "std" ];
}; };
"generic-array" = rec { "generic-array" = rec {
crateName = "generic-array"; crateName = "generic-array";
@ -2340,12 +2318,6 @@ rec {
name = "enum-primitive-derive"; name = "enum-primitive-derive";
packageId = "enum-primitive-derive"; packageId = "enum-primitive-derive";
} }
{
name = "futures-util";
packageId = "futures-util";
optional = true;
features = [ "io" ];
}
{ {
name = "glob"; name = "glob";
packageId = "glob"; packageId = "glob";
@ -2394,12 +2366,13 @@ rec {
} }
]; ];
features = { features = {
"async" = [ "futures-util" "tokio" "pin-project-lite" ]; "async" = [ "futures-util" ];
"futures-util" = [ "dep:futures-util" ]; "futures-util" = [ "dep:futures-util" ];
"pin-project-lite" = [ "dep:pin-project-lite" ]; "pin-project-lite" = [ "dep:pin-project-lite" ];
"tokio" = [ "dep:tokio" ]; "tokio" = [ "dep:tokio" ];
"wire" = [ "tokio" "pin-project-lite" ];
}; };
resolvedDefaultFeatures = [ "async" "futures-util" "pin-project-lite" "tokio" ]; resolvedDefaultFeatures = [ "pin-project-lite" "tokio" "wire" ];
}; };
"nom" = rec { "nom" = rec {
crateName = "nom"; crateName = "nom";
@ -4153,7 +4126,7 @@ rec {
{ {
name = "nix-compat"; name = "nix-compat";
packageId = "nix-compat"; packageId = "nix-compat";
features = [ "async" ]; features = [ "wire" ];
} }
{ {
name = "tokio"; name = "tokio";
@ -5193,6 +5166,7 @@ rec {
( (
_: { _: {
buildTests = true; buildTests = true;
release = false;
} }
); );
# If the user hasn't set any pre/post commands, we don't want to # If the user hasn't set any pre/post commands, we don't want to
@ -5217,6 +5191,16 @@ rec {
# recreate a file hierarchy as when running tests with cargo # recreate a file hierarchy as when running tests with cargo
# the source for test data # the source for test data
# It's necessary to locate the source in $NIX_BUILD_TOP/source/
# instead of $NIX_BUILD_TOP/
# because we compiled those test binaries in the former and not the latter.
# So all paths will expect source tree to be there and not in the build top directly.
# For example: $NIX_BUILD_TOP := /build in general, if you ask yourself.
# TODO(raitobezarius): I believe there could be more edge cases if `crate.sourceRoot`
# do exist but it's very hard to reason about them, so let's wait until the first bug report.
mkdir -p source/
cd source/
${pkgs.buildPackages.xorg.lndir}/bin/lndir ${crate.src} ${pkgs.buildPackages.xorg.lndir}/bin/lndir ${crate.src}
# build outputs # build outputs

View file

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
nix-compat = { path = "../../../tvix/nix-compat/", features = ["async"] } nix-compat = { path = "../../../tvix/nix-compat", features = ["wire"] }
tokio-listener = "0.3.1" tokio-listener = "0.3.1"
tokio = { version = "1.36.0", features = ["full"] } tokio = { version = "1.36.0", features = ["full"] }
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"

View file

@ -4,7 +4,7 @@ use tokio_listener::{self, SystemOptions, UserOptions};
use tracing::{debug, error, info, instrument, Level}; use tracing::{debug, error, info, instrument, Level};
use nix_compat::wire::{ use nix_compat::wire::{
primitive, self,
worker_protocol::{self, server_handshake_client, ClientSettings, Trust}, worker_protocol::{self, server_handshake_client, ClientSettings, Trust},
}; };
@ -80,7 +80,7 @@ where
// TODO: implement logging. For now, we'll just send // TODO: implement logging. For now, we'll just send
// STDERR_LAST, which is good enough to get Nix respond to // STDERR_LAST, which is good enough to get Nix respond to
// us. // us.
primitive::write_u64(&mut client_connection.conn, worker_protocol::STDERR_LAST) wire::write_u64(&mut client_connection.conn, worker_protocol::STDERR_LAST)
.await .await
.unwrap(); .unwrap();
loop { loop {
@ -112,6 +112,6 @@ where
worker_protocol::read_client_settings(&mut conn.conn, conn.version_minor).await?; worker_protocol::read_client_settings(&mut conn.conn, conn.version_minor).await?;
// The client expects us to send some logs when we're processing // The client expects us to send some logs when we're processing
// the settings. Sending STDERR_LAST signal we're done processing. // the settings. Sending STDERR_LAST signal we're done processing.
primitive::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?; wire::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?;
Ok(settings) Ok(settings)
} }