feat(tvix/nix-daemon): New operation AddToStoreNar

This operation is particularly used when invoking the following
nix commands:

```
nix-store --add-fixed some-path
nix-store --add-fixed --recursive some-path
```

Change-Id: I0f9b129c838c00e10415881f1e6e0d7bc1d7a3a6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12800
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Vova Kryachko 2024-11-24 13:33:04 -05:00 committed by Vladimir Kryachko
parent 8ef9ba82a8
commit e9acde3c42
5 changed files with 289 additions and 19 deletions

View file

@ -1,4 +1,4 @@
use std::{future::Future, sync::Arc};
use std::{future::Future, ops::DerefMut, sync::Arc};
use bytes::Bytes;
use tokio::{
@ -8,7 +8,8 @@ use tokio::{
use tracing::{debug, warn};
use super::{
types::QueryValidPaths,
framing::{NixFramedReader, StderrReadFramedReader},
types::{AddToStoreNarRequest, QueryValidPaths},
worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST},
NixDaemonIO,
};
@ -120,7 +121,7 @@ where
Ok(operation) => match operation {
Operation::IsValidPath => {
let path: StorePath<String> = self.reader.read_value().await?;
self.handle(io.is_valid_path(&path)).await?
Self::handle(&self.writer, io.is_valid_path(&path)).await?
}
// Note this operation does not currently delegate to NixDaemonIO,
// The general idea is that we will pass relevant ClientSettings
@ -128,23 +129,23 @@ where
// For now we just store the settings in the NixDaemon for future use.
Operation::SetOptions => {
self.client_settings = self.reader.read_value().await?;
self.handle(async { Ok(()) }).await?
Self::handle(&self.writer, async { Ok(()) }).await?
}
Operation::QueryPathInfo => {
let path: StorePath<String> = self.reader.read_value().await?;
self.handle(io.query_path_info(&path)).await?
Self::handle(&self.writer, io.query_path_info(&path)).await?
}
Operation::QueryPathFromHashPart => {
let hash: Bytes = self.reader.read_value().await?;
self.handle(io.query_path_from_hash_part(&hash)).await?
Self::handle(&self.writer, io.query_path_from_hash_part(&hash)).await?
}
Operation::QueryValidPaths => {
let query: QueryValidPaths = self.reader.read_value().await?;
self.handle(io.query_valid_paths(&query)).await?
Self::handle(&self.writer, io.query_valid_paths(&query)).await?
}
Operation::QueryValidDerivers => {
let path: StorePath<String> = self.reader.read_value().await?;
self.handle(io.query_valid_derivers(&path)).await?
Self::handle(&self.writer, io.query_valid_derivers(&path)).await?
}
// FUTUREWORK: These are just stubs that return an empty list.
// It's important not to return an error for the local-overlay:// store
@ -154,7 +155,7 @@ where
// invariants.
Operation::QueryReferrers | Operation::QueryRealisation => {
let _: String = self.reader.read_value().await?;
self.handle(async move {
Self::handle(&self.writer, async move {
warn!(
?operation,
"This operation is not implemented. Returning empty result..."
@ -163,6 +164,41 @@ where
})
.await?
}
Operation::AddToStoreNar => {
let request: AddToStoreNarRequest = self.reader.read_value().await?;
let minor_version = self.protocol_version.minor();
match minor_version {
..21 => {
// Before protocol version 1.21, the nar is sent unframed, so we just
// pass the reader directly to the operation.
Self::handle(
&self.writer,
self.io.add_to_store_nar(request, &mut self.reader),
)
.await?
}
21..23 => {
// Protocol versions 1.21 .. 1.23 use STDERR_READ protocol, see logging.md#stderr_read.
Self::handle(&self.writer, async {
let mut writer = self.writer.lock().await;
let mut reader = StderrReadFramedReader::new(
&mut self.reader,
writer.deref_mut(),
);
self.io.add_to_store_nar(request, &mut reader).await
})
.await?
}
23.. => {
// Starting at protocol version 1.23, the framed protocol is used, see serialization.md#framed
let mut framed = NixFramedReader::new(&mut self.reader);
Self::handle(&self.writer, async {
self.io.add_to_store_nar(request, &mut framed).await
})
.await?
}
}
}
_ => {
return Err(std::io::Error::other(format!(
"Operation {operation:?} is not implemented"
@ -188,14 +224,14 @@ where
/// This is a helper method, awaiting on the passed in future and then
/// handling log lines/activities as described above.
async fn handle<T>(
&mut self,
writer: &Arc<Mutex<NixWriter<WriteHalf<RW>>>>,
future: impl Future<Output = std::io::Result<T>>,
) -> Result<(), std::io::Error>
where
T: NixSerialize + Send,
{
let result = future.await;
let mut writer = self.writer.lock().await;
let mut writer = writer.lock().await;
match result {
Ok(r) => {
@ -244,6 +280,17 @@ mod tests {
) -> Result<Option<UnkeyedValidPathInfo>> {
Ok(None)
}
async fn add_to_store_nar<R>(
&self,
_request: crate::nix_daemon::types::AddToStoreNarRequest,
_reader: &mut R,
) -> Result<()>
where
R: tokio::io::AsyncRead + Send + Unpin,
{
Ok(())
}
}
#[tokio::test]

View file

@ -3,8 +3,9 @@ pub mod worker_protocol;
use std::io::Result;
use futures::future::try_join_all;
use tokio::io::AsyncRead;
use tracing::warn;
use types::{QueryValidPaths, UnkeyedValidPathInfo};
use types::{AddToStoreNarRequest, QueryValidPaths, UnkeyedValidPathInfo};
use crate::store_path::StorePath;
@ -60,6 +61,14 @@ pub trait NixDaemonIO: Sync {
Ok(result)
}
}
fn add_to_store_nar<R>(
&self,
request: AddToStoreNarRequest,
reader: &mut R,
) -> impl std::future::Future<Output = Result<()>> + Send
where
R: AsyncRead + Send + Unpin;
}
#[cfg(test)]
@ -89,6 +98,17 @@ mod tests {
) -> std::io::Result<Option<UnkeyedValidPathInfo>> {
Ok(None)
}
async fn add_to_store_nar<R>(
&self,
_request: super::types::AddToStoreNarRequest,
_reader: &mut R,
) -> std::io::Result<()>
where
R: tokio::io::AsyncRead + Send + Unpin,
{
Ok(())
}
}
#[tokio::test]

View file

@ -1,3 +1,4 @@
use crate::wire::de::Error;
use crate::{
narinfo::Signature,
nixhash::CAHash,
@ -73,6 +74,21 @@ impl NixError {
nix_compat_derive::nix_serialize_remote!(#[nix(display)] Signature<String>);
impl NixDeserialize for Signature<String> {
async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
where
R: ?Sized + NixRead + Send,
{
let value: Option<String> = reader.try_read_value().await?;
match value {
Some(value) => Ok(Some(
Signature::<String>::parse(&value).map_err(R::Error::invalid_data)?,
)),
None => Ok(None),
}
}
}
impl NixSerialize for CAHash {
async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
where
@ -94,6 +110,42 @@ impl NixSerialize for Option<CAHash> {
}
}
impl NixDeserialize for CAHash {
async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
where
R: ?Sized + NixRead + Send,
{
let value: Option<String> = reader.try_read_value().await?;
match value {
Some(value) => Ok(Some(CAHash::from_nix_hex_str(&value).ok_or_else(|| {
R::Error::invalid_data(format!("Invalid cahash {}", value))
})?)),
None => Ok(None),
}
}
}
impl NixDeserialize for Option<CAHash> {
async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
where
R: ?Sized + NixRead + Send,
{
let value: Option<String> = reader.try_read_value().await?;
match value {
Some(value) => {
if value.is_empty() {
Ok(None)
} else {
Ok(Some(Some(CAHash::from_nix_hex_str(&value).ok_or_else(
|| R::Error::invalid_data(format!("Invalid cahash {}", value)),
)?)))
}
}
None => Ok(None),
}
}
}
impl NixSerialize for Option<UnkeyedValidPathInfo> {
async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
where
@ -125,6 +177,27 @@ impl NixDeserialize for StorePath<String> {
}
}
impl NixDeserialize for Option<StorePath<String>> {
async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
where
R: ?Sized + NixRead + Send,
{
use crate::wire::de::Error;
if let Some(buf) = reader.try_read_bytes().await? {
if buf.is_empty() {
Ok(Some(None))
} else {
let result = StorePath::<String>::from_absolute_path(&buf);
result
.map(|r| Some(Some(r)))
.map_err(R::Error::invalid_data)
}
} else {
Ok(Some(None))
}
}
}
// Custom implementation since Display does not use absolute paths.
impl<S> NixSerialize for StorePath<S>
where
@ -174,3 +247,60 @@ pub struct QueryValidPaths {
#[nix(version = "27..")]
pub substitute: bool,
}
/// newtype wrapper for the byte array that correctly implements NixSerialize, NixDeserialize.
#[derive(Debug)]
pub struct NarHash([u8; 32]);
impl std::ops::Deref for NarHash {
type Target = [u8; 32];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl NixDeserialize for NarHash {
async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
where
R: ?Sized + NixRead + Send,
{
if let Some(bytes) = reader.try_read_bytes().await? {
let result = data_encoding::HEXLOWER
.decode(bytes.as_ref())
.map_err(R::Error::invalid_data)?;
Ok(Some(NarHash(result.try_into().map_err(|_| {
R::Error::invalid_data("incorrect length")
})?)))
} else {
Ok(None)
}
}
}
/// Request type for [super::worker_protocol::Operation::AddToStoreNar]
#[derive(NixDeserialize, Debug)]
pub struct AddToStoreNarRequest {
// - path :: [StorePath][se-StorePath]
pub path: StorePath<String>,
// - deriver :: [OptStorePath][se-OptStorePath]
pub deriver: Option<StorePath<String>>,
// - narHash :: [NARHash][se-NARHash] - always sha256
pub nar_hash: NarHash,
// - references :: [Set][se-Set] of [StorePath][se-StorePath]
pub references: Vec<StorePath<String>>,
// - registrationTime :: [Time][se-Time]
pub registration_time: u64,
// - narSize :: [UInt64][se-UInt64]
pub nar_size: u64,
// - ultimate :: [Bool64][se-Bool64]
pub ultimate: bool,
// - signatures :: [Set][se-Set] of [Signature][se-Signature]
pub signatures: Vec<Signature<String>>,
// - ca :: [OptContentAddress][se-OptContentAddress]
pub ca: Option<CAHash>,
// - repair :: [Bool64][se-Bool64]
pub repair: bool,
// - dontCheckSigs :: [Bool64][se-Bool64]
pub dont_check_sigs: bool,
}

View file

@ -60,7 +60,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
}
async fn run(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (_blob_service, _directory_service, path_info_service, _nar_calculation_service) =
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
construct_services(cli.service_addrs).await?;
let listen_address = cli.listen_args.listen_address.unwrap_or_else(|| {
@ -76,7 +76,11 @@ async fn run(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
)
.await?;
let io = Arc::new(TvixDaemon::new(path_info_service));
let io = Arc::new(TvixDaemon::new(
blob_service,
directory_service,
path_info_service,
));
while let Ok((connection, _)) = listener.accept().await {
let io = io.clone();

View file

@ -4,20 +4,35 @@ use std::{
};
use nix_compat::{
nix_daemon::{types::UnkeyedValidPathInfo, NixDaemonIO},
nix_daemon::{
types::{AddToStoreNarRequest, UnkeyedValidPathInfo},
NixDaemonIO,
},
nixbase32,
store_path::StorePath,
store_path::{build_ca_path, StorePath},
};
use tvix_store::{path_info::PathInfo, pathinfoservice::PathInfoService};
use tracing::warn;
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
use tvix_store::{nar::ingest_nar_and_hash, path_info::PathInfo, pathinfoservice::PathInfoService};
#[allow(dead_code)]
pub struct TvixDaemon {
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
path_info_service: Arc<dyn PathInfoService>,
}
impl TvixDaemon {
pub fn new(path_info_service: Arc<dyn PathInfoService>) -> Self {
Self { path_info_service }
pub fn new(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
path_info_service: Arc<dyn PathInfoService>,
) -> Self {
Self {
blob_service,
directory_service,
path_info_service,
}
}
}
@ -48,6 +63,60 @@ impl NixDaemonIO for TvixDaemon {
None => Ok(None),
}
}
async fn add_to_store_nar<R>(&self, request: AddToStoreNarRequest, reader: &mut R) -> Result<()>
where
R: tokio::io::AsyncRead + Send + Unpin,
{
let (root_node, nar_sha256, nar_size) = ingest_nar_and_hash(
self.blob_service.clone(),
self.directory_service.clone(),
reader,
&request.ca,
)
.await
.map_err(|e| Error::other(e.to_string()))?;
if nar_size != request.nar_size || nar_sha256 != *request.nar_hash {
warn!(
nar_hash.expected = nixbase32::encode(&*request.nar_hash),
nar_hash.actual = nixbase32::encode(&nar_sha256),
"nar hash mismatch"
);
return Err(Error::other(
"ingested nar ended up different from what was specified in the request",
));
}
if let Some(cahash) = &request.ca {
let actual_path: StorePath<String> = build_ca_path(
request.path.name(),
cahash,
request.references.iter().map(|p| p.to_absolute_path()),
false,
)
.map_err(Error::other)?;
if actual_path != request.path {
return Err(Error::other("path mismatch"));
}
}
let path_info = PathInfo {
store_path: request.path,
node: root_node,
references: request.references,
nar_size,
nar_sha256,
signatures: request.signatures,
deriver: request.deriver,
ca: request.ca,
};
self.path_info_service
.put(path_info)
.await
.map_err(|e| Error::other(e.to_string()))?;
Ok(())
}
}
// PathInfo lives in the tvix-store crate, but does not depend on nix-compat's wire feature,