refactor(tvix/glue/builtins/import): refactor

This removes all the intermediate helper functions and reorganizes the
import code to only do the calculations where/when needed, and hopefully
makes things easier to understand as well.

Change-Id: I7e4c89c742bf8569b45e303523f7f801da7127ea
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12627
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: Jörg Thalheim <joerg@thalheim.io>
Reviewed-by: edef <edef@edef.eu>
This commit is contained in:
Florian Klink 2024-10-15 16:11:42 +03:00 committed by clbot
parent baebe29bab
commit ca1e628c85
6 changed files with 206 additions and 233 deletions

View file

@ -77,20 +77,6 @@ correctness:
"some amount of outgoing bytes" in memory.
This is somewhat blocked until the {Chunk/Blob}Service split is done, as then
prefetching would only be a matter of adding it into the one `BlobReader`.
- The import builtins (`builtins.path` and `builtins.filterSource`) use(d) some
helper functions in TvixStoreIO that deals with constructing the `PathInfo`
structs and inserting them, but some of the abstractions where probably done
at the wrong layer:
- Some ways of importing calculate the NAR representation twice
- The code isn't very usable from other consumers that also create structs
`PathInfo`.
- `node_to_path_info` is ony called by `register_in_path_info_service` (due
to this marked as private for now).
Instead of fighting these abstractions, maybe it's best to explicitly add the
code back to the two builtins, remove redundant calls and calculations. A
later phase could then see how/if some of this can be reasonably factored out in
a way it's also usable for other places having a node and wanting to persist
it (if at all).
### Error cleanup
- Currently, all services use tvix_castore::Error, which only has two kinds

View file

@ -64,10 +64,10 @@ pub enum FetcherError {
#[derive(Debug, Error)]
pub enum ImportError {
#[error("non-file '{0}' cannot be imported in 'flat' mode")]
FlatImportOfNonFile(String),
FlatImportOfNonFile(PathBuf),
#[error("hash mismatch at ingestion of '{0}', expected: '{1}', got: '{2}'")]
HashMismatch(String, NixHash, NixHash),
HashMismatch(PathBuf, NixHash, NixHash),
#[error("path '{}' is not absolute or invalid", .0.display())]
PathNotAbsoluteOrInvalid(PathBuf),

View file

@ -112,7 +112,7 @@ mod import_builtins {
use crate::tvix_store_io::TvixStoreIO;
use bstr::ByteSlice;
use nix_compat::nixhash::{CAHash, NixHash};
use nix_compat::store_path::StorePathRef;
use nix_compat::store_path::{build_ca_path, StorePathRef};
use sha2::Digest;
use std::rc::Rc;
use tokio::io::AsyncWriteExt;
@ -120,6 +120,7 @@ mod import_builtins {
use tvix_eval::generators::Gen;
use tvix_eval::{generators::GenCo, ErrorKind, Value};
use tvix_eval::{FileType, NixContextElement, NixString};
use tvix_store::path_info::PathInfo;
#[builtin("path")]
async fn builtin_path(
@ -147,124 +148,128 @@ mod import_builtins {
.expect("Failed to derive the default name out of the path")
.to_string()
};
let filter = args.select("filter");
// Construct a sha256 hasher, which is needed for flat ingestion.
let recursive_ingestion = args
.select("recursive")
.map(|r| r.as_bool())
.transpose()?
.unwrap_or(true); // Yes, yes, Nix, by default, puts `recursive = true;`.
let expected_sha256 = args
.select("sha256")
.map(|h| {
h.to_str().and_then(|expected| {
let expected = expected.into_bstring().to_string();
// TODO: ensure that we fail if this is not a valid str.
nix_compat::nixhash::from_str(&expected, None).map_err(|_err| {
// TODO: a better error would be nice, we use
// DerivationError::InvalidOutputHash usually for derivation construction.
// This is not a derivation construction, should we move it outside and
// generalize?
ErrorKind::TypeError {
expected: "sha256",
actual: "not a sha256",
match nix_compat::nixhash::from_str(
expected.into_bstring().to_str()?,
Some("sha256"),
) {
Ok(NixHash::Sha256(digest)) => Ok(digest),
Ok(_) => unreachable!(),
Err(_e) => {
// TODO: a better error would be nice, we use
// DerivationError::InvalidOutputHash usually for derivation construction.
// This is not a derivation construction, should we move it outside and
// generalize?
Err(ErrorKind::TypeError {
expected: "sha256",
actual: "not a sha256",
})
}
})
}
})
})
.transpose()?;
// Check if the path points to a regular file.
// If it does, the filter function is never executed.
// TODO: follow symlinks and check their type instead
let (root_node, ca_hash) = match state.file_type(path.as_ref())? {
// As a first step, we ingest the contents, and get back a root node,
// and optionally the sha256 a flat file.
let (root_node, ca) = match state.file_type(path.as_ref())? {
// Check if the path points to a regular file.
// If it does, the filter function is never executed, and we copy to the blobservice directly.
// If recursive is false, we need to calculate the sha256 digest of the raw contents,
// as that affects the output path calculation.
FileType::Regular => {
let mut file = state.open(path.as_ref())?;
// This is a single file, copy it to the blobservice directly.
let mut hash = sha2::Sha256::new();
let mut flat_sha256 = (!recursive_ingestion).then(sha2::Sha256::new);
let mut blob_size = 0;
let mut blob_writer = state
.tokio_handle
.block_on(async { state.blob_service.open_write().await });
let mut buf = [0u8; 4096];
// read piece by piece and write to blob_writer.
// This is a bit manual due to EvalIO being sync, while everything else async.
{
let mut buf = [0u8; 4096];
loop {
// read bytes into buffer, break out if EOF
let len = file.read(&mut buf)?;
if len == 0 {
break;
}
blob_size += len as u64;
loop {
// read bytes into buffer, break out if EOF
let len = file.read(&mut buf)?;
if len == 0 {
break;
}
blob_size += len as u64;
let data = &buf[0..len];
let data = &buf[0..len];
// add to blobwriter
state
.tokio_handle
.block_on(async { blob_writer.write_all(data).await })?;
// update the sha256 hash function. We can skip that if we're not using it.
if !recursive_ingestion {
hash.update(data);
}
}
// close the blob writer, get back the b3 digest.
let blob_digest = state
.tokio_handle
.block_on(async { blob_writer.close().await })?;
let root_node = Node::File {
digest: blob_digest,
size: blob_size,
executable: false,
};
let ca_hash = if recursive_ingestion {
let (_nar_size, nar_sha256) = state
.tokio_handle
.block_on(async {
state
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await
})
.map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?;
CAHash::Nar(NixHash::Sha256(nar_sha256))
} else {
CAHash::Flat(NixHash::Sha256(hash.finalize().into()))
};
(root_node, ca_hash)
}
FileType::Directory => {
if !recursive_ingestion {
return Err(ImportError::FlatImportOfNonFile(
path.to_string_lossy().to_string(),
))?;
}
// do the filtered ingest
let root_node = filtered_ingest(state.clone(), co, path.as_ref(), filter).await?;
// calculate the NAR sha256
let (_nar_size, nar_sha256) = state
.tokio_handle
.block_on(async {
// add to blobwriter
state
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await
})
.map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?;
.tokio_handle
.block_on(async { blob_writer.write_all(data).await })?;
let ca_hash = CAHash::Nar(NixHash::Sha256(nar_sha256));
// update blob_sha256 if needed.
if let Some(h) = flat_sha256.as_mut() {
h.update(data)
}
}
}
(root_node, ca_hash)
// close the blob writer, construct the root node and the blob_sha256 (later used for output path calculation)
(
Node::File {
digest: state
.tokio_handle
.block_on(async { blob_writer.close().await })?,
size: blob_size,
executable: false,
},
{
// If non-recursive ingestion is requested…
if let Some(flat_sha256) = flat_sha256 {
let actual_sha256 = flat_sha256.finalize().into();
// compare the recorded flat hash with an upfront one if provided.
if let Some(expected_sha256) = expected_sha256 {
if actual_sha256 != expected_sha256 {
return Err(ImportError::HashMismatch(
path,
NixHash::Sha256(expected_sha256),
NixHash::Sha256(actual_sha256),
)
.into());
}
}
Some(CAHash::Flat(NixHash::Sha256(actual_sha256)))
} else {
None
}
},
)
}
FileType::Directory if !recursive_ingestion => {
return Err(ImportError::FlatImportOfNonFile(path))?
}
// do the filtered ingest
FileType::Directory => (
filtered_ingest(state.clone(), co, path.as_ref(), filter).await?,
None,
),
FileType::Symlink => {
// FUTUREWORK: Nix follows a symlink if it's at the root,
// except if it's not resolve-able (NixOS/nix#7761).i
@ -287,26 +292,63 @@ mod import_builtins {
}
};
if let Some(expected_sha256) = expected_sha256 {
if *ca_hash.hash() != expected_sha256 {
Err(ImportError::HashMismatch(
path.to_string_lossy().to_string(),
expected_sha256,
ca_hash.hash().into_owned(),
))?;
// Calculate the NAR sha256.
let (nar_size, nar_sha256) = state
.tokio_handle
.block_on(async {
state
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await
})
.map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?;
// Calculate the CA hash for the recursive cases, this is only already
// `Some(_)` for flat ingestion.
let ca = match ca {
None => {
// If an upfront-expected NAR hash was specified, compare.
if let Some(expected_nar_sha256) = expected_sha256 {
if expected_nar_sha256 != nar_sha256 {
return Err(ImportError::HashMismatch(
path,
NixHash::Sha256(expected_nar_sha256),
NixHash::Sha256(nar_sha256),
)
.into());
}
}
CAHash::Nar(NixHash::Sha256(nar_sha256))
}
}
Some(ca) => ca,
};
let store_path = build_ca_path(&name, &ca, Vec::<&str>::new(), false)
.map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?;
let path_info = state
.tokio_handle
.block_on(async {
state
.register_in_path_info_service(name.as_ref(), path.as_ref(), ca_hash, root_node)
.path_info_service
.as_ref()
.put(PathInfo {
store_path,
node: root_node,
// There's no reference scanning on path contents ingested like this.
references: vec![],
nar_size,
nar_sha256,
signatures: vec![],
deriver: None,
ca: Some(ca),
})
.await
})
.map_err(|e| tvix_eval::ErrorKind::IO {
path: Some(path.to_path_buf()),
error: Rc::new(e),
error: Rc::new(e.into()),
})?;
// We need to attach context to the final output path.
@ -332,24 +374,46 @@ mod import_builtins {
let path_info = state
.tokio_handle
.block_on(async {
let (_, nar_sha256) = state
// Ask the PathInfoService for the NAR size and sha256
// We always need it no matter what is the actual hash mode
// because the [PathInfo] needs to contain nar_{sha256,size}.
let (nar_size, nar_sha256) = state
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await?;
let ca = CAHash::Nar(NixHash::Sha256(nar_sha256));
// Calculate the output path. This might still fail, as some names are illegal.
let output_path =
nix_compat::store_path::build_ca_path(name, &ca, Vec::<&str>::new(), false)
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid name: {}", name),
)
})?;
state
.register_in_path_info_service(
name,
&p,
CAHash::Nar(NixHash::Sha256(nar_sha256)),
root_node,
)
.path_info_service
.as_ref()
.put(PathInfo {
store_path: output_path,
node: root_node,
// There's no reference scanning on path contents ingested like this.
references: vec![],
nar_size,
nar_sha256,
signatures: vec![],
deriver: None,
ca: Some(ca),
})
.await
})
.map_err(|err| ErrorKind::IO {
.map_err(|e| ErrorKind::IO {
path: Some(p.to_path_buf()),
error: err.into(),
error: Rc::new(e.into()),
})?;
// We need to attach context to the final output path.

View file

@ -384,56 +384,6 @@ impl TvixStoreIO {
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
}
async fn node_to_path_info<'a>(
&self,
name: &'a str,
path: &Path,
ca: CAHash,
root_node: Node,
) -> io::Result<PathInfo> {
// Ask the PathInfoService for the NAR size and sha256
// We always need it no matter what is the actual hash mode
// because the [PathInfo] needs to contain nar_{sha256,size}.
let (nar_size, nar_sha256) = self
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await?;
// Calculate the output path. This might still fail, as some names are illegal.
let output_path =
nix_compat::store_path::build_ca_path(name, &ca, Vec::<&str>::new(), false).map_err(
|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid name: {}", name),
)
},
)?;
tvix_store::import::log_node(name.as_bytes(), &root_node, path);
// construct a PathInfo
Ok(tvix_store::import::derive_nar_ca_path_info(
nar_size,
nar_sha256,
Some(ca),
output_path,
root_node,
))
}
pub(crate) async fn register_in_path_info_service<'a>(
&self,
name: &'a str,
path: &Path,
ca: CAHash,
root_node: Node,
) -> io::Result<PathInfo> {
let path_info = self.node_to_path_info(name, path, ca, root_node).await?;
Ok(self.path_info_service.as_ref().put(path_info).await?)
}
}
impl EvalIO for TvixStoreIO {
@ -589,7 +539,7 @@ impl EvalIO for TvixStoreIO {
#[instrument(skip(self), ret(level = Level::TRACE), err)]
fn import_path(&self, path: &Path) -> io::Result<PathBuf> {
let output_path = self.tokio_handle.block_on(async {
let path_info = self.tokio_handle.block_on({
tvix_store::import::import_path_as_nar_ca(
path,
tvix_store::import::path_to_name(path)?,
@ -598,10 +548,10 @@ impl EvalIO for TvixStoreIO {
&self.path_info_service,
&self.nar_calculation_service,
)
.await
})?;
Ok(output_path.to_absolute_path().into())
// From the returned PathInfo, extract the store path and return it.
Ok(path_info.store_path.to_absolute_path().into())
}
#[instrument(skip(self), ret(level = Level::TRACE))]

View file

@ -262,9 +262,9 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
nar_calculation_service,
)
.await;
if let Ok(output_path) = resp {
if let Ok(path_info) = resp {
// If the import was successful, print the path to stdout.
println!("{}", output_path.to_absolute_path());
println!("{}", path_info.store_path.to_absolute_path());
}
}
}

View file

@ -7,7 +7,7 @@ use tvix_castore::{
use nix_compat::{
nixhash::{CAHash, NixHash},
store_path::{self, StorePath, StorePathRef},
store_path::{self, StorePath},
};
use crate::{
@ -70,38 +70,10 @@ pub fn path_to_name(path: &Path) -> std::io::Result<&str> {
})
}
/// Takes the NAR size, SHA-256 of the NAR representation, the root node and optionally
/// a CA hash information.
///
/// Constructs a [PathInfo] for a NAR-style object.
///
/// The user can then further fill the fields (like deriver, signatures), and/or
/// verify to have the expected hashes.
#[inline]
pub fn derive_nar_ca_path_info(
nar_size: u64,
nar_sha256: [u8; 32],
ca: Option<CAHash>,
store_path: StorePath<String>,
root_node: Node,
) -> PathInfo {
// assemble the [crate::proto::PathInfo] object.
PathInfo {
store_path,
node: root_node,
// There's no reference scanning on path contents ingested like this.
references: vec![],
nar_size,
nar_sha256,
signatures: vec![],
deriver: None,
ca,
}
}
/// Ingest the contents at the given path `path` into castore, and registers the
/// resulting root node in the passed PathInfoService, using the "NAR sha256
/// digest" and the passed name for output path calculation.
/// Inserts the PathInfo into the PathInfoService and returns it back to the caller.
#[instrument(skip_all, fields(store_name=name, path=?path), err)]
pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
path: P,
@ -110,7 +82,7 @@ pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
directory_service: DS,
path_info_service: PS,
nar_calculation_service: NS,
) -> Result<StorePathRef, std::io::Error>
) -> Result<PathInfo, std::io::Error>
where
P: AsRef<Path> + std::fmt::Debug,
BS: BlobService + Clone,
@ -118,6 +90,7 @@ where
PS: AsRef<dyn PathInfoService>,
NS: NarCalculationService,
{
// Ingest the contents at the given path `path` into castore.
let root_node =
ingest_path::<_, _, _, &[u8]>(blob_service, directory_service, path.as_ref(), None)
.await
@ -129,29 +102,29 @@ where
// Calculate the output path. This might still fail, as some names are illegal.
// FUTUREWORK: express the `name` at the type level to be valid and move the conversion
// at the caller level.
let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid name: {}", name),
)
})?;
let output_path: StorePath<String> = store_path::build_nar_based_store_path(&nar_sha256, name)
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid name: {}", name),
)
})?;
log_node(name.as_ref(), &root_node, path.as_ref());
let path_info = derive_nar_ca_path_info(
nar_size,
nar_sha256,
Some(CAHash::Nar(NixHash::Sha256(nar_sha256))),
output_path.to_owned(),
root_node,
);
// This new [`PathInfo`] that we get back from there might contain additional signatures or
// information set by the service itself. In this function, we silently swallow it because
// callers don't really need it.
let _path_info = path_info_service.as_ref().put(path_info).await?;
Ok(output_path)
// Insert a PathInfo. On success, return it back to the caller.
Ok(path_info_service
.as_ref()
.put(PathInfo {
store_path: output_path.to_owned(),
node: root_node,
// There's no reference scanning on imported paths
references: vec![],
nar_size,
nar_sha256,
signatures: vec![],
deriver: None,
ca: Some(CAHash::Nar(NixHash::Sha256(nar_sha256))),
})
.await?)
}
#[cfg(test)]