diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 36122d419..32c2f4e58 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,5 +1,10 @@ use nix_compat::nar::reader::r#async as nar_reader; -use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; +use sha2::Digest; +use tokio::{ + io::{AsyncBufRead, AsyncRead}, + sync::mpsc, + try_join, +}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, @@ -11,6 +16,40 @@ use tvix_castore::{ PathBuf, }; +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// Returns the castore root node, as well as the sha256 and size of the NAR +/// contents ingested. +pub async fn ingest_nar_and_hash( + blob_service: BS, + directory_service: DS, + r: &mut R, +) -> Result<(Node, [u8; 32], u64), IngestionError> +where + R: AsyncRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, +{ + let mut nar_hash = sha2::Sha256::new(); + let mut nar_size = 0; + + // Assemble NarHash and NarSize as we read bytes. + let r = tokio_util::io::InspectReader::new(r, |b| { + nar_size += b.len() as u64; + use std::io::Write; + nar_hash.write_all(b).unwrap(); + }); + + // HACK: InspectReader doesn't implement AsyncBufRead. + // See if this can be propagated through and we can then require our input + // reader to be buffered too. + let mut r = tokio::io::BufReader::new(r); + + let root_node = ingest_nar(blob_service, directory_service, &mut r).await?; + + Ok((root_node, nar_hash.finalize().into(), nar_size)) +} + /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, /// interacting with a [BlobService] and [DirectoryService]. /// It returns the castore root node or an error. diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index 164748a65..8cbb091f1 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -4,6 +4,7 @@ use tvix_castore::B3Digest; mod import; mod renderer; pub use import::ingest_nar; +pub use import::ingest_nar_and_hash; pub use renderer::calculate_size_and_sha256; pub use renderer::write_nar; pub use renderer::SimpleRenderer; diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index cccd4805c..1dd7da483 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -1,3 +1,5 @@ +use super::PathInfoService; +use crate::{nar::ingest_nar_and_hash, proto::PathInfo}; use futures::{stream::BoxStream, TryStreamExt}; use nix_compat::{ narinfo::{self, NarInfo}, @@ -5,20 +7,13 @@ use nix_compat::{ nixhash::NixHash, }; use reqwest::StatusCode; -use sha2::Digest; -use std::io::{self, Write}; -use tokio::io::{AsyncRead, BufReader}; -use tokio_util::io::InspectReader; +use tokio::io::{self, AsyncRead}; use tonic::async_trait; use tracing::{debug, instrument, warn}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, }; -use crate::proto::PathInfo; - -use super::PathInfoService; - /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix /// Store Model. @@ -178,7 +173,7 @@ where })); // handle decompression, depending on the compression field. - let r: Box = match narinfo.compression { + let mut r: Box = match narinfo.compression { Some("none") => Box::new(r) as Box, Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r)) as Box, @@ -194,19 +189,8 @@ where ))); } }; - let mut nar_hash = sha2::Sha256::new(); - let mut nar_size = 0; - // Assemble NarHash and NarSize as we read bytes. - let r = InspectReader::new(r, |b| { - nar_size += b.len() as u64; - nar_hash.write_all(b).unwrap(); - }); - - // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors. - let mut r = BufReader::new(r); - - let root_node = crate::nar::ingest_nar( + let (root_node, nar_hash, nar_size) = ingest_nar_and_hash( self.blob_service.clone(), self.directory_service.clone(), &mut r, @@ -226,7 +210,6 @@ where "NarSize mismatch".to_string(), ))?; } - let nar_hash: [u8; 32] = nar_hash.finalize().into(); if narinfo.nar_hash != nar_hash { warn!( narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),