diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index 84e280faf..adcfb871d 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -15,7 +15,7 @@ use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; -use crate::import::{ingest_entries, Error, IngestionEntry}; +use crate::import::{ingest_entries, Error as ImportError, IngestionEntry}; use crate::proto::node::Node; use crate::B3Digest; @@ -30,6 +30,24 @@ const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024; /// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads. const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024; +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("error reading archive entry: {0}")] + Io(#[from] std::io::Error), + + #[error("unsupported tar entry {0} type: {1:?}")] + UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), + + #[error("symlink missing target {0}")] + MissingSymlinkTarget(PathBuf), + + #[error("unexpected number of top level directory entries")] + UnexpectedNumberOfTopLevelEntries, + + #[error("failed to import into castore {0}")] + Import(#[from] ImportError), +} + /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and /// [`DirectoryService`]. #[instrument(skip_all, ret(level = Level::TRACE), err)] @@ -53,16 +71,16 @@ where let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE)); let mut async_blob_uploads: JoinSet> = JoinSet::new(); - let mut entries_iter = archive.entries().map_err(Error::Archive)?; - while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::Archive)? { - let path: PathBuf = entry.path().map_err(Error::Archive)?.into(); + let mut entries_iter = archive.entries()?; + while let Some(mut entry) = entries_iter.try_next().await? { + let path: PathBuf = entry.path()?.into(); let header = entry.header(); let entry = match header.entry_type() { tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { - let header_size = header.size().map_err(Error::Archive)?; + let header_size = header.size()?; // If the blob is small enough, read it off the wire, compute the digest, // and upload it to the [BlobService] in the background. @@ -83,9 +101,7 @@ where .acquire_many_owned(header_size as u32) .await .unwrap(); - let size = tokio::io::copy(&mut reader, &mut buffer) - .await - .map_err(Error::Archive)?; + let size = tokio::io::copy(&mut reader, &mut buffer).await?; let digest: B3Digest = hasher.finalize().as_bytes().into(); @@ -96,11 +112,9 @@ where async move { let mut writer = blob_service.open_write().await; - tokio::io::copy(&mut Cursor::new(buffer), &mut writer) - .await - .map_err(Error::Archive)?; + tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?; - let blob_digest = writer.close().await.map_err(Error::Archive)?; + let blob_digest = writer.close().await?; assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch"); @@ -116,11 +130,9 @@ where } else { let mut writer = blob_service.open_write().await; - let size = tokio::io::copy(&mut entry, &mut writer) - .await - .map_err(Error::Archive)?; + let size = tokio::io::copy(&mut entry, &mut writer).await?; - let digest = writer.close().await.map_err(Error::Archive)?; + let digest = writer.close().await?; (size, digest) }; @@ -128,14 +140,13 @@ where IngestionEntry::Regular { path, size, - executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0, + executable: entry.header().mode()? & 64 != 0, digest, } } tokio_tar::EntryType::Symlink => IngestionEntry::Symlink { target: entry - .link_name() - .map_err(Error::Archive)? + .link_name()? .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))? .into(), path, @@ -157,11 +168,13 @@ where result.expect("task panicked")?; } - ingest_entries( + let root_node = ingest_entries( directory_service, futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)), ) - .await + .await?; + + Ok(root_node) } /// Keep track of the directory structure of a file tree being ingested. This is used diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs index 8cd4f95ff..15dd0664d 100644 --- a/tvix/castore/src/import/error.rs +++ b/tvix/castore/src/import/error.rs @@ -19,20 +19,8 @@ pub enum Error { #[error("unable to read {0}: {1}")] UnableToRead(PathBuf, std::io::Error), - #[error("error reading from archive: {0}")] - Archive(std::io::Error), - #[error("unsupported file {0} type: {1:?}")] UnsupportedFileType(PathBuf, FileType), - - #[error("unsupported tar entry {0} type: {1:?}")] - UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), - - #[error("symlink missing target {0}")] - MissingSymlinkTarget(PathBuf), - - #[error("unexpected number of top level directory entries")] - UnexpectedNumberOfTopLevelEntries, } impl From for Error { diff --git a/tvix/glue/src/builtins/errors.rs b/tvix/glue/src/builtins/errors.rs index 5e36bc1a2..c05d366f1 100644 --- a/tvix/glue/src/builtins/errors.rs +++ b/tvix/glue/src/builtins/errors.rs @@ -54,6 +54,9 @@ pub enum FetcherError { #[error(transparent)] Import(#[from] tvix_castore::import::Error), + #[error(transparent)] + ImportArchive(#[from] tvix_castore::import::archive::Error), + #[error("Error calculating store path for fetcher output: {0}")] StorePath(#[from] BuildStorePathError), }