feat(tvix/nix-compat): worker protocol operation parser

Change-Id: I7776635b17c44534223603d28cf59c7eebd976e0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11229
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Picnoir 2024-03-21 09:52:21 +01:00 committed by picnoir picnoir
parent 508d67ad49
commit 21481b02b8
7 changed files with 206 additions and 11 deletions

View file

@ -14,9 +14,11 @@ bstr = { version = "1.6.0", features = ["alloc", "unicode", "serde"] }
data-encoding = "2.3.3"
ed25519 = "2.2.3"
ed25519-dalek = "2.1.0"
enum-primitive-derive = "0.3.0"
futures-util = { version = "0.3.30", features = ["io"], optional = true }
glob = "0.3.0"
nom = "7.1.3"
num-traits = "0.2.18"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10.6"

View file

@ -6,3 +6,6 @@ pub mod bytes;
#[cfg(feature = "async")]
pub mod primitive;
#[cfg(feature = "async")]
pub mod worker_protocol;

View file

@ -0,0 +1,83 @@
use std::io::{Error, ErrorKind};
use enum_primitive_derive::Primitive;
use num_traits::{FromPrimitive, ToPrimitive};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::wire::primitive;
/// Worker Operation
///
/// These operations are encoded as unsigned 64 bits before being sent
/// to the wire. See the [read_op] and
/// [write_op] operations to serialize/deserialize the
/// operation on the wire.
///
/// Note: for now, we're using the Nix 2.20 operation description. The
/// operations marked as obsolete are obsolete for Nix 2.20, not
/// necessarily for Nix 2.3. We'll revisit this later on.
#[derive(Debug, PartialEq, Primitive)]
pub enum Operation {
IsValidPath = 1,
HasSubstitutes = 3,
QueryPathHash = 4, // obsolete
QueryReferences = 5, // obsolete
QueryReferrers = 6,
AddToStore = 7,
AddTextToStore = 8, // obsolete since 1.25, Nix 3.0. Use WorkerProto::Op::AddToStore
BuildPaths = 9,
EnsurePath = 10,
AddTempRoot = 11,
AddIndirectRoot = 12,
SyncWithGC = 13,
FindRoots = 14,
ExportPath = 16, // obsolete
QueryDeriver = 18, // obsolete
SetOptions = 19,
CollectGarbage = 20,
QuerySubstitutablePathInfo = 21,
QueryDerivationOutputs = 22, // obsolete
QueryAllValidPaths = 23,
QueryFailedPaths = 24,
ClearFailedPaths = 25,
QueryPathInfo = 26,
ImportPaths = 27, // obsolete
QueryDerivationOutputNames = 28, // obsolete
QueryPathFromHashPart = 29,
QuerySubstitutablePathInfos = 30,
QueryValidPaths = 31,
QuerySubstitutablePaths = 32,
QueryValidDerivers = 33,
OptimiseStore = 34,
VerifyStore = 35,
BuildDerivation = 36,
AddSignatures = 37,
NarFromPath = 38,
AddToStoreNar = 39,
QueryMissing = 40,
QueryDerivationOutputMap = 41,
RegisterDrvOutput = 42,
QueryRealisation = 43,
AddMultipleToStore = 44,
AddBuildLog = 45,
BuildPathsWithResults = 46,
AddPermRoot = 47,
}
/// Read a worker [Operation] from the wire.
pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<Operation> {
let op_number = primitive::read_u64(r).await?;
Operation::from_u64(op_number).ok_or(Error::new(
ErrorKind::Other,
format!("Invalid OP number {}", op_number),
))
}
/// Write a worker [Operation] to the wire.
pub async fn write_op<W: AsyncWriteExt + Unpin>(w: &mut W, op: &Operation) -> std::io::Result<()> {
let op = Operation::to_u64(op).ok_or(Error::new(
ErrorKind::Other,
format!("Can't convert the OP {:?} to u64", op),
))?;
w.write_u64(op).await
}