feat(nix-daemon): Implement QueryPathInfo and IsValidPath.

Change-Id: Ia601e2eae24a2bc13d8851b2e8ed9d6c1808bb35
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12745
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: Vladimir Kryachko <v.kryachko@gmail.com>
Tested-by: BuildkiteCI
This commit is contained in:
Vova Kryachko 2024-11-08 14:22:23 -05:00 committed by Vladimir Kryachko
parent b564ed9d43
commit 9d114bf040
4 changed files with 182 additions and 12 deletions

View file

@ -10,11 +10,16 @@ use super::{
worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST},
NixDaemonIO,
};
use crate::wire::{
de::{NixRead, NixReader},
ser::{NixSerialize, NixWrite, NixWriter, NixWriterBuilder},
ProtocolVersion,
use crate::{
store_path::StorePath,
wire::{
de::{NixRead, NixReader},
ser::{NixSerialize, NixWrite, NixWriter, NixWriterBuilder},
ProtocolVersion,
},
};
use crate::{nix_daemon::types::NixError, worker_protocol::STDERR_ERROR};
/// Handles a single connection with a nix client.
@ -105,6 +110,7 @@ where
/// Main client connection loop, reads client's requests and responds to them accordingly.
pub async fn handle_client(&mut self) -> Result<(), std::io::Error> {
let io = self.io.clone();
loop {
let op_code = self.reader.read_number().await?;
match TryInto::<Operation>::try_into(op_code) {
@ -113,6 +119,15 @@ where
self.client_settings = self.reader.read_value().await?;
self.handle(async { Ok(()) }).await?
}
Operation::QueryPathInfo => {
let path: StorePath<String> = self.reader.read_value().await?;
self.handle(io.query_path_info(&path)).await?
}
Operation::IsValidPath => {
let path: StorePath<String> = self.reader.read_value().await?;
self.handle(async { Ok(io.query_path_info(&path).await?.is_some()) })
.await?
}
_ => {
return Err(std::io::Error::other(format!(
"Operation {operation:?} is not implemented"
@ -168,18 +183,26 @@ where
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::{io::Result, sync::Arc};
use tokio::io::AsyncWriteExt;
use crate::{
nix_daemon::types::UnkeyedValidPathInfo,
wire::ProtocolVersion,
worker_protocol::{ClientSettings, WORKER_MAGIC_1, WORKER_MAGIC_2},
};
struct MockDaemonIO {}
impl NixDaemonIO for MockDaemonIO {}
impl NixDaemonIO for MockDaemonIO {
async fn query_path_info(
&self,
_path: &crate::store_path::StorePath<String>,
) -> Result<Option<UnkeyedValidPathInfo>> {
Ok(None)
}
}
#[tokio::test]
async fn test_daemon_initialization() {

View file

@ -1,8 +1,18 @@
pub mod worker_protocol;
use std::io::Result;
use types::UnkeyedValidPathInfo;
use crate::store_path::StorePath;
pub mod handler;
pub mod types;
pub mod worker_protocol;
/// Represents all possible operations over the nix-daemon protocol.
pub trait NixDaemonIO {
// TODO add methods to it.
fn query_path_info(
&self,
path: &StorePath<String>,
) -> impl std::future::Future<Output = Result<Option<UnkeyedValidPathInfo>>> + Send;
}

View file

@ -1,5 +1,17 @@
use std::{fmt::Display, ops::Deref};
use nix_compat_derive::{NixDeserialize, NixSerialize};
use crate::{
narinfo::Signature,
nixhash::CAHash,
store_path::StorePath,
wire::{
de::{NixDeserialize, NixRead},
ser::{NixSerialize, NixWrite},
},
};
/// Marker type that consumes/sends and ignores a u64.
#[derive(Clone, Debug, NixDeserialize, NixSerialize)]
#[nix(from = "u64", into = "u64")]
@ -60,3 +72,98 @@ impl NixError {
}
}
}
nix_compat_derive::nix_serialize_remote!(#[nix(display)] Signature<String>);
impl NixSerialize for CAHash {
async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
where
W: NixWrite,
{
writer.write_value(&self.to_nix_nixbase32_string()).await
}
}
impl NixSerialize for Option<CAHash> {
async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
where
W: NixWrite,
{
match self {
Some(value) => writer.write_value(value).await,
None => writer.write_value("").await,
}
}
}
impl NixSerialize for Option<UnkeyedValidPathInfo> {
async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
where
W: NixWrite,
{
match self {
Some(value) => {
writer.write_value(&true).await?;
writer.write_value(value).await
}
None => writer.write_value(&false).await,
}
}
}
// Custom implementation since FromStr does not use from_absolute_path
impl NixDeserialize for StorePath<String> {
async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
where
R: ?Sized + NixRead + Send,
{
use crate::wire::de::Error;
if let Some(buf) = reader.try_read_bytes().await? {
let result = StorePath::<String>::from_absolute_path(&buf);
result.map(Some).map_err(R::Error::invalid_data)
} else {
Ok(None)
}
}
}
// Custom implementation since Display does not use absolute paths.
impl<S> NixSerialize for StorePath<S>
where
S: std::cmp::Eq + Deref<Target = str> + Display + Sync,
{
async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
where
W: NixWrite,
{
writer.write_value(&self.to_absolute_path()).await
}
}
// Writes StorePath or an empty string.
impl NixSerialize for Option<StorePath<String>> {
async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
where
W: NixWrite,
{
match self {
Some(value) => writer.write_value(value).await,
None => writer.write_value("").await,
}
}
}
#[derive(NixSerialize, Debug)]
pub struct UnkeyedValidPathInfo {
pub deriver: Option<StorePath<String>>,
pub nar_hash: String,
pub references: Vec<StorePath<String>>,
pub registration_time: u64,
pub nar_size: u64,
pub ultimate: bool,
pub signatures: Vec<Signature<String>>,
pub ca: Option<CAHash>,
}
#[cfg(test)]
mod tests {}

View file

@ -1,7 +1,11 @@
use std::sync::Arc;
use std::{io::Result, sync::Arc};
use nix_compat::nix_daemon::NixDaemonIO;
use tvix_store::pathinfoservice::PathInfoService;
use nix_compat::{
nix_daemon::{types::UnkeyedValidPathInfo, NixDaemonIO},
nixbase32,
store_path::StorePath,
};
use tvix_store::{path_info::PathInfo, pathinfoservice::PathInfoService};
#[allow(dead_code)]
pub struct TvixDaemon {
@ -15,4 +19,30 @@ impl TvixDaemon {
}
/// Implements [NixDaemonIO] backed by tvix services.
impl NixDaemonIO for TvixDaemon {}
impl NixDaemonIO for TvixDaemon {
async fn query_path_info(
&self,
path: &StorePath<String>,
) -> Result<Option<UnkeyedValidPathInfo>> {
match self.path_info_service.get(*path.digest()).await? {
Some(path_info) => Ok(Some(into_unkeyed_path_info(path_info))),
None => Ok(None),
}
}
}
// PathInfo lives in the tvix-store crate, but does not depend on nix-compat's wire feature,
// while UnkeyedValidPathInfo is only available if that feature is enabled. To avoid complexity
// we manually convert as opposed to creating a From<PathInfo>.
fn into_unkeyed_path_info(info: PathInfo) -> UnkeyedValidPathInfo {
UnkeyedValidPathInfo {
deriver: info.deriver,
nar_hash: nixbase32::encode(&info.nar_sha256),
references: info.references,
registration_time: 0,
nar_size: info.nar_size,
ultimate: false,
signatures: info.signatures,
ca: info.ca,
}
}