feat(tvix/eval): implement builtins.path
It now supports almost everything except `recursive = false;`, i.e. `flat` ingestion, because the tvix store import side exposes no knob to do it yet. This has been tested to work. Change-Id: I2e9da10ceccdfbf45b43c532077ed45d6306aa98 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10597 Tested-by: BuildkiteCI Autosubmit: raitobezarius <tvl@lahfa.xyz> Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
parent
14fe65a50b
commit
cecb5e295a
6 changed files with 367 additions and 31 deletions
|
@ -53,3 +53,19 @@ impl From<FetcherError> for tvix_eval::ErrorKind {
|
|||
tvix_eval::ErrorKind::TvixError(Rc::new(err))
|
||||
}
|
||||
}
|
||||
|
||||
/// Errors related to `builtins.path` and `builtins.filterSource`,
/// a.k.a. "importing" builtins.
#[derive(Debug, Error)]
pub enum ImportError {
    /// Flat (non-recursive) ingestion hashes a single regular file only;
    /// directories and other non-file nodes cannot be imported in this mode.
    /// Field: the offending path, rendered lossily as a string.
    #[error("non-file '{0}' cannot be imported in 'flat' mode")]
    FlatImportOfNonFile(String),

    /// The caller-supplied fixed-output hash did not match the hash
    /// computed at ingestion time.
    /// Fields: path, expected hash, obtained hash.
    #[error("hash mismatch at ingestion of '{0}', expected: '{1}', got: '{2}'")]
    HashMismatch(String, NixHash, NixHash),
}
|
||||
|
||||
impl From<ImportError> for tvix_eval::ErrorKind {
|
||||
fn from(err: ImportError) -> Self {
|
||||
tvix_eval::ErrorKind::TvixError(Rc::new(err))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
//! Implements builtins used to import paths in the store.
|
||||
|
||||
use crate::builtins::errors::ImportError;
|
||||
use futures::pin_mut;
|
||||
use std::path::Path;
|
||||
use tvix_eval::{
|
||||
builtin_macros::builtins,
|
||||
generators::{self, GenCo},
|
||||
ErrorKind, Value,
|
||||
ErrorKind, EvalIO, Value,
|
||||
};
|
||||
|
||||
use std::rc::Rc;
|
||||
|
@ -123,8 +124,127 @@ mod import_builtins {
|
|||
use tvix_eval::generators::Gen;
|
||||
use tvix_eval::{generators::GenCo, ErrorKind, Value};
|
||||
|
||||
use tvix_castore::B3Digest;
|
||||
|
||||
use crate::tvix_store_io::TvixStoreIO;
|
||||
|
||||
/// Implementation of `builtins.path`.
///
/// Ingests the filesystem `path` (optionally renamed via `name`, optionally
/// pruned via `filter`) into the store, validates an optional fixed-output
/// `sha256`, registers the resulting PathInfo, and returns the absolute
/// store path as a string value.
#[builtin("path")]
async fn builtin_path(
    state: Rc<TvixStoreIO>,
    co: GenCo,
    args: Value,
) -> Result<Value, ErrorKind> {
    let args = args.to_attrs()?;
    // `path` is the only mandatory attribute; force it to a path value.
    let path = args.select_required("path")?;
    let path = generators::request_force(&co, path.clone())
        .await
        .to_path()?;
    // The store path name: either the forced `name` attribute, or derived
    // from the last component of `path`.
    let name: String = if let Some(name) = args.select("name") {
        generators::request_force(&co, name.clone())
            .await
            .to_str()?
            .as_bstr()
            .to_string()
    } else {
        tvix_store::import::path_to_name(&path)
            .expect("Failed to derive the default name out of the path")
            .to_string()
    };
    // Optional Nix filter function, passed through to filtered_ingest unforced.
    let filter = args.select("filter");
    let recursive_ingestion = args
        .select("recursive")
        .map(|r| r.as_bool())
        .transpose()?
        .unwrap_or(true); // Yes, yes, Nix, by default, puts `recursive = true;`.
    // Optional fixed-output hash to verify against after ingestion.
    let expected_sha256 = args
        .select("sha256")
        .map(|h| {
            h.to_str().and_then(|expected| {
                let expected = expected.into_bstring().to_string();
                // TODO: ensure that we fail if this is not a valid str.
                nix_compat::nixhash::from_str(&expected, None).map_err(|_err| {
                    // TODO: a better error would be nice, we use
                    // DerivationError::InvalidOutputHash usually for derivation construction.
                    // This is not a derivation construction, should we move it outside and
                    // generalize?
                    ErrorKind::TypeError {
                        expected: "sha256",
                        actual: "not a sha256",
                    }
                })
            })
        })
        .transpose()?;

    // Flat mode can only import regular files; reject anything unreadable
    // as a file up front.
    // FUTUREWORK(performance): this reads the file instead of using a stat-like
    // system call to the file, this degrades very badly on large files.
    if !recursive_ingestion && state.read_to_end(path.as_ref()).is_err() {
        Err(ImportError::FlatImportOfNonFile(
            path.to_string_lossy().to_string(),
        ))?;
    }

    // Walk the path (applying `filter`) and ingest it into the castore.
    let root_node = filtered_ingest(state.clone(), co, path.as_ref(), filter).await?;
    // Compute the content-address: NAR hash for recursive mode, plain
    // sha256 of the single file's blob for flat mode.
    let ca: CAHash = if recursive_ingestion {
        CAHash::Nar(NixHash::Sha256(state.tokio_handle.block_on(async {
            // The Ok::<_, _> turbofish pins the error type so `?` coerces.
            Ok::<_, tvix_eval::ErrorKind>(
                state
                    .path_info_service
                    .as_ref()
                    .calculate_nar(&root_node)
                    .await
                    .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?
                    .1,
            )
        })?))
    } else {
        let digest: B3Digest = match root_node {
            tvix_castore::proto::node::Node::File(ref fnode) => {
                // It's already validated.
                fnode.digest.clone().try_into().unwrap()
            }
            // We cannot hash anything else than file in flat import mode.
            _ => {
                return Err(ImportError::FlatImportOfNonFile(
                    path.to_string_lossy().to_string(),
                )
                .into())
            }
        };

        // FUTUREWORK: avoid hashing again.
        CAHash::Flat(NixHash::Sha256(
            state
                .tokio_handle
                .block_on(async { state.blob_to_sha256_hash(digest).await })?,
        ))
    };

    let obtained_hash = ca.hash().clone().into_owned();
    let (path_info, output_path) = state.tokio_handle.block_on(async {
        state
            .node_to_path_info(name.as_ref(), path.as_ref(), ca, root_node)
            .await
    })?;

    // Fixed-output check: if the caller pinned a sha256, it must match
    // what we actually obtained.
    if let Some(expected_sha256) = expected_sha256 {
        if obtained_hash != expected_sha256 {
            Err(ImportError::HashMismatch(
                path.to_string_lossy().to_string(),
                expected_sha256,
                obtained_hash,
            ))?;
        }
    }

    // Persist the PathInfo; the binding's type annotation documents what
    // `put` returns even though we discard it.
    let _: tvix_store::proto::PathInfo = state.tokio_handle.block_on(async {
        // This is necessary to cause the coercion of the error type.
        Ok::<_, std::io::Error>(state.path_info_service.as_ref().put(path_info).await?)
    })?;

    Ok(output_path.to_absolute_path().into())
}
|
||||
|
||||
#[builtin("filterSource")]
|
||||
async fn builtin_filter_source(
|
||||
state: Rc<TvixStoreIO>,
|
||||
|
|
|
@ -10,7 +10,7 @@ mod fetchers;
|
|||
mod import;
|
||||
mod utils;
|
||||
|
||||
pub use errors::{DerivationError, FetcherError};
|
||||
pub use errors::{DerivationError, FetcherError, ImportError};
|
||||
|
||||
/// Adds derivation-related builtins to the passed [tvix_eval::Evaluation].
|
||||
///
|
||||
|
@ -438,6 +438,182 @@ mod tests {
|
|||
assert!(eval_result.errors.is_empty(), "errors should be empty");
|
||||
}
|
||||
|
||||
// Space is an illegal character.
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = true; })"#,
|
||||
true
|
||||
)]
|
||||
// Space is still an illegal character.
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "invalid name"; path = @fixtures + "/te st"; recursive = true; })"#,
|
||||
false
|
||||
)]
|
||||
fn builtins_path_recursive_rename(code: &str, success: bool) {
|
||||
// populate the fixtures dir
|
||||
let temp = TempDir::new().expect("create temporary directory");
|
||||
let p = temp.path().join("import_fixtures");
|
||||
|
||||
// create the fixtures directory.
|
||||
// We produce them at runtime rather than shipping it inside the source
|
||||
// tree, as git can't model certain things - like directories without any
|
||||
// items.
|
||||
{
|
||||
fs::create_dir(&p).expect("creating import_fixtures");
|
||||
fs::write(p.join("te st"), "").expect("creating `/te st`");
|
||||
}
|
||||
// replace @fixtures with the temporary path containing the fixtures
|
||||
let code_replaced = code.replace("@fixtures", &p.to_string_lossy());
|
||||
|
||||
let eval_result = eval(&code_replaced);
|
||||
|
||||
let value = eval_result.value;
|
||||
|
||||
if success {
|
||||
match value.expect("expected successful evaluation on legal rename") {
|
||||
tvix_eval::Value::String(s) => {
|
||||
assert_eq!(
|
||||
"/nix/store/nd5z11x7zjqqz44rkbhc6v7yifdkn659-valid-name",
|
||||
s.as_bstr()
|
||||
);
|
||||
}
|
||||
v => panic!("unexpected value type: {:?}", v),
|
||||
}
|
||||
} else {
|
||||
assert!(value.is_none(), "unexpected success on illegal store paths");
|
||||
}
|
||||
}
|
||||
|
||||
// Space is an illegal character.
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = false; })"#,
|
||||
true
|
||||
)]
|
||||
// Space is still an illegal character.
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "invalid name"; path = @fixtures + "/te st"; recursive = false; })"#,
|
||||
false
|
||||
)]
|
||||
// The non-recursive variant passes explicitly `recursive = false;`
|
||||
fn builtins_path_nonrecursive_rename(code: &str, success: bool) {
|
||||
// populate the fixtures dir
|
||||
let temp = TempDir::new().expect("create temporary directory");
|
||||
let p = temp.path().join("import_fixtures");
|
||||
|
||||
// create the fixtures directory.
|
||||
// We produce them at runtime rather than shipping it inside the source
|
||||
// tree, as git can't model certain things - like directories without any
|
||||
// items.
|
||||
{
|
||||
fs::create_dir(&p).expect("creating import_fixtures");
|
||||
fs::write(p.join("te st"), "").expect("creating `/te st`");
|
||||
}
|
||||
// replace @fixtures with the temporary path containing the fixtures
|
||||
let code_replaced = code.replace("@fixtures", &p.to_string_lossy());
|
||||
|
||||
let eval_result = eval(&code_replaced);
|
||||
|
||||
let value = eval_result.value;
|
||||
|
||||
if success {
|
||||
match value.expect("expected successful evaluation on legal rename") {
|
||||
tvix_eval::Value::String(s) => {
|
||||
assert_eq!(
|
||||
"/nix/store/il2rmfbqgs37rshr8w7x64hd4d3b4bsa-valid-name",
|
||||
s.as_bstr()
|
||||
);
|
||||
}
|
||||
v => panic!("unexpected value type: {:?}", v),
|
||||
}
|
||||
} else {
|
||||
assert!(value.is_none(), "unexpected success on illegal store paths");
|
||||
}
|
||||
}
|
||||
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = false; sha256 = "sha256-47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="; })"#,
|
||||
true
|
||||
)]
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = true; sha256 = "sha256-47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="; })"#,
|
||||
false
|
||||
)]
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = true; sha256 = "sha256-d6xi4mKdjkX2JFicDIv5niSzpyI0m/Hnm8GGAIU04kY="; })"#,
|
||||
true
|
||||
)]
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = false; sha256 = "sha256-d6xi4mKdjkX2JFicDIv5niSzpyI0m/Hnm8GGAIU04kY="; })"#,
|
||||
false
|
||||
)]
|
||||
fn builtins_path_fod_locking(code: &str, success: bool) {
|
||||
// populate the fixtures dir
|
||||
let temp = TempDir::new().expect("create temporary directory");
|
||||
let p = temp.path().join("import_fixtures");
|
||||
|
||||
// create the fixtures directory.
|
||||
// We produce them at runtime rather than shipping it inside the source
|
||||
// tree, as git can't model certain things - like directories without any
|
||||
// items.
|
||||
{
|
||||
fs::create_dir(&p).expect("creating import_fixtures");
|
||||
fs::write(p.join("te st"), "").expect("creating `/te st`");
|
||||
}
|
||||
// replace @fixtures with the temporary path containing the fixtures
|
||||
let code_replaced = code.replace("@fixtures", &p.to_string_lossy());
|
||||
|
||||
let eval_result = eval(&code_replaced);
|
||||
|
||||
let value = eval_result.value;
|
||||
|
||||
if success {
|
||||
assert!(
|
||||
value.is_some(),
|
||||
"expected successful evaluation on legal rename and valid FOD sha256"
|
||||
);
|
||||
} else {
|
||||
assert!(value.is_none(), "unexpected success on invalid FOD sha256");
|
||||
}
|
||||
}
|
||||
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "valid-path"; path = @fixtures + "/te st dir"; filter = _: _: true; })"#,
|
||||
"/nix/store/i28jmi4fwym4fw3flkrkp2mdxx50pdy0-valid-path"
|
||||
)]
|
||||
#[test_case(
|
||||
r#"(builtins.path { name = "valid-path"; path = @fixtures + "/te st dir"; filter = _: _: false; })"#,
|
||||
"/nix/store/pwza2ij9gk1fmzhbjnynmfv2mq2sgcap-valid-path"
|
||||
)]
|
||||
fn builtins_path_filter(code: &str, expected_outpath: &str) {
|
||||
// populate the fixtures dir
|
||||
let temp = TempDir::new().expect("create temporary directory");
|
||||
let p = temp.path().join("import_fixtures");
|
||||
|
||||
// create the fixtures directory.
|
||||
// We produce them at runtime rather than shipping it inside the source
|
||||
// tree, as git can't model certain things - like directories without any
|
||||
// items.
|
||||
{
|
||||
fs::create_dir(&p).expect("creating import_fixtures");
|
||||
fs::create_dir(p.join("te st dir")).expect("creating `/te st dir`");
|
||||
fs::write(p.join("te st dir").join("test"), "").expect("creating `/te st dir/test`");
|
||||
}
|
||||
// replace @fixtures with the temporary path containing the fixtures
|
||||
let code_replaced = code.replace("@fixtures", &p.to_string_lossy());
|
||||
|
||||
let eval_result = eval(&code_replaced);
|
||||
|
||||
let value = eval_result.value.expect("must succeed");
|
||||
|
||||
match value {
|
||||
tvix_eval::Value::String(s) => {
|
||||
assert_eq!(expected_outpath, s.as_bstr());
|
||||
}
|
||||
_ => panic!("unexpected value type: {:?}", value),
|
||||
}
|
||||
|
||||
assert!(eval_result.errors.is_empty(), "errors should be empty");
|
||||
}
|
||||
|
||||
// All tests filter out some unsupported (not representable in castore) nodes, confirming
|
||||
// invalid, but filtered-out nodes don't prevent ingestion of a path.
|
||||
#[cfg(target_family = "unix")]
|
||||
|
|
|
@ -21,6 +21,7 @@ use tokio::io::AsyncReadExt;
|
|||
use tracing::{error, instrument, warn, Level};
|
||||
use tvix_build::buildservice::BuildService;
|
||||
use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
|
||||
use tvix_store::utils::AsyncIoBridge;
|
||||
use walkdir::DirEntry;
|
||||
|
||||
use tvix_castore::{
|
||||
|
@ -341,6 +342,26 @@ impl TvixStoreIO {
|
|||
Ok(output_path)
|
||||
}
|
||||
|
||||
/// Transforms a BLAKE-3 digest into a SHA256 digest
|
||||
/// by re-hashing the whole file.
|
||||
pub(crate) async fn blob_to_sha256_hash(&self, blob_digest: B3Digest) -> io::Result<[u8; 32]> {
|
||||
let mut reader = self
|
||||
.blob_service
|
||||
.open_read(&blob_digest)
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::NotFound,
|
||||
format!("blob represented by digest: '{}' not found", blob_digest),
|
||||
)
|
||||
})?;
|
||||
// It is fine to use `AsyncIoBridge` here because hashing is not actually I/O.
|
||||
let mut hasher = AsyncIoBridge(Sha256::new());
|
||||
|
||||
tokio::io::copy(&mut reader, &mut hasher).await?;
|
||||
Ok(hasher.0.finalize().into())
|
||||
}
|
||||
|
||||
pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> {
|
||||
Ok(self
|
||||
.path_info_service
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
use crate::utils::AsyncIoBridge;
|
||||
|
||||
use super::RenderError;
|
||||
use async_recursion::async_recursion;
|
||||
use count_write::CountWrite;
|
||||
use nix_compat::nar::writer::r#async as nar_writer;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{self, Poll},
|
||||
};
|
||||
use tokio::io::{self, AsyncWrite, BufReader};
|
||||
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
|
||||
use tvix_castore::{
|
||||
|
@ -42,31 +40,6 @@ where
|
|||
Ok((cw.count(), h.finalize().into()))
|
||||
}
|
||||
|
||||
/// The inverse of [tokio_util::io::SyncIoBridge].
/// Don't use this with anything that actually does blocking I/O.
struct AsyncIoBridge<T>(T);

// Adapts a synchronous `std::io::Write` into an `AsyncWrite` by resolving
// every poll immediately — sound only because the wrapped writer never
// actually blocks (e.g. an in-memory hasher).
impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Delegate straight to the sync writer; never returns Pending.
        Poll::Ready(self.get_mut().0.write(buf))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(self.get_mut().0.flush())
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut task::Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        // A plain sync writer has no shutdown protocol; nothing to do.
        Poll::Ready(Ok(()))
    }
}
|
||||
|
||||
/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
|
||||
/// and uses the passed blob_service and directory_service to perform the
|
||||
/// necessary lookups as it traverses the structure.
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
use std::sync::Arc;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{self, Poll},
|
||||
};
|
||||
use tokio::io::{self, AsyncWrite};
|
||||
|
||||
use tvix_castore::{
|
||||
blobservice::{self, BlobService},
|
||||
|
@ -33,3 +38,28 @@ pub async fn construct_services(
|
|||
|
||||
Ok((blob_service, directory_service, path_info_service))
|
||||
}
|
||||
|
||||
/// The inverse of [tokio_util::io::SyncIoBridge].
|
||||
/// Don't use this with anything that actually does blocking I/O.
|
||||
pub struct AsyncIoBridge<T>(pub T);
|
||||
|
||||
impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Poll::Ready(self.get_mut().0.write(buf))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
||||
Poll::Ready(self.get_mut().0.flush())
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut task::Context<'_>,
|
||||
) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue