refactor(tvix/store/nar/import): add ingest_nar_and_hash

This wraps ingest_nar, but also keeps track of the number of bytes read,
and calculates the sha256 digest of it.

Make use of it in the NixHTTPPathInfoService, where this code is coming
from.

Change-Id: I9c54e93d3ec8ed9ede87aed43e04d114fb06897b
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11787
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-06-11 15:46:52 +03:00 committed by clbot
parent beb7f57c73
commit ff40332864
3 changed files with 46 additions and 23 deletions

View file

@ -1,5 +1,10 @@
use nix_compat::nar::reader::r#async as nar_reader;
use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
use sha2::Digest;
use tokio::{
io::{AsyncBufRead, AsyncRead},
sync::mpsc,
try_join,
};
use tvix_castore::{
blobservice::BlobService,
directoryservice::DirectoryService,
@ -11,6 +16,40 @@ use tvix_castore::{
PathBuf,
};
/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
/// interacting with a [BlobService] and [DirectoryService].
/// Returns the castore root node, as well as the sha256 and size of the NAR
/// contents ingested.
pub async fn ingest_nar_and_hash<R, BS, DS>(
blob_service: BS,
directory_service: DS,
r: &mut R,
) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
where
R: AsyncRead + Unpin + Send,
BS: BlobService + Clone + 'static,
DS: DirectoryService,
{
let mut nar_hash = sha2::Sha256::new();
let mut nar_size = 0;
// Assemble NarHash and NarSize as we read bytes.
let r = tokio_util::io::InspectReader::new(r, |b| {
nar_size += b.len() as u64;
use std::io::Write;
nar_hash.write_all(b).unwrap();
});
// HACK: InspectReader doesn't implement AsyncBufRead.
// See if this can be propagated through and we can then require our input
// reader to be buffered too.
let mut r = tokio::io::BufReader::new(r);
let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
Ok((root_node, nar_hash.finalize().into(), nar_size))
}
/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
/// interacting with a [BlobService] and [DirectoryService].
/// It returns the castore root node or an error.

View file

@ -4,6 +4,7 @@ use tvix_castore::B3Digest;
mod import;
mod renderer;
pub use import::ingest_nar;
pub use import::ingest_nar_and_hash;
pub use renderer::calculate_size_and_sha256;
pub use renderer::write_nar;
pub use renderer::SimpleRenderer;

View file

@ -1,3 +1,5 @@
use super::PathInfoService;
use crate::{nar::ingest_nar_and_hash, proto::PathInfo};
use futures::{stream::BoxStream, TryStreamExt};
use nix_compat::{
narinfo::{self, NarInfo},
@ -5,20 +7,13 @@ use nix_compat::{
nixhash::NixHash,
};
use reqwest::StatusCode;
use sha2::Digest;
use std::io::{self, Write};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::InspectReader;
use tokio::io::{self, AsyncRead};
use tonic::async_trait;
use tracing::{debug, instrument, warn};
use tvix_castore::{
blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
};
use crate::proto::PathInfo;
use super::PathInfoService;
/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
/// Store Model.
@ -178,7 +173,7 @@ where
}));
// handle decompression, depending on the compression field.
let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
@ -194,19 +189,8 @@ where
)));
}
};
let mut nar_hash = sha2::Sha256::new();
let mut nar_size = 0;
// Assemble NarHash and NarSize as we read bytes.
let r = InspectReader::new(r, |b| {
nar_size += b.len() as u64;
nar_hash.write_all(b).unwrap();
});
// HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
let mut r = BufReader::new(r);
let root_node = crate::nar::ingest_nar(
let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
self.blob_service.clone(),
self.directory_service.clone(),
&mut r,
@ -226,7 +210,6 @@ where
"NarSize mismatch".to_string(),
))?;
}
let nar_hash: [u8; 32] = nar_hash.finalize().into();
if narinfo.nar_hash != nar_hash {
warn!(
narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),