refactor(tvix/castore): add separate Error enum for archives

The `Error` enum in the castore `import` module had grown to cover both
filesystem- and archive-specific errors and was starting to get messy.

This adds a separate `Error` enum for archive-specific errors and moves
the archive variants out of the top-level import `Error`. The archive
enum wraps generic import failures in an `Import` variant (via
`#[from]`), and downstream consumers such as `FetcherError` gain a
dedicated `ImportArchive` variant.
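
To make the mechanics concrete, here is a standalone sketch (not tvix
code; the trimmed-down enum and the `read_entry_*` helpers are
hypothetical): thiserror's `#[from]` derives `From<std::io::Error>`,
which is what lets the ingestion code below replace every
`.map_err(Error::Archive)` with a plain `?`.

    use std::io::Read;

    #[derive(Debug, thiserror::Error)]
    pub enum Error {
        // `#[from]` derives `From<std::io::Error>`, so `?` converts
        // I/O errors into this variant automatically.
        #[error("error reading archive entry: {0}")]
        Io(#[from] std::io::Error),
    }

    // Before the refactor: every io::Error was wrapped by hand.
    fn read_entry_old(r: &mut impl Read, buf: &mut [u8]) -> Result<usize, Error> {
        r.read(buf).map_err(Error::Io) // previously Error::Archive
    }

    // After: the derived From impl makes `?` sufficient.
    fn read_entry_new(r: &mut impl Read, buf: &mut [u8]) -> Result<usize, Error> {
        Ok(r.read(buf)?)
    }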

Change-Id: I4cd0746c864e5ec50b1aa68c0630ef9cd05176c7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11498
Tested-by: BuildkiteCI
Autosubmit: Connor Brewster <cbrewster@hey.com>
Reviewed-by: flokli <flokli@flokli.de>
commit d2e67f021e (parent 79698c470c)
Author: Connor Brewster, 2024-04-21 17:40:02 -05:00 (committed by clbot)
3 changed files with 37 additions and 33 deletions

@@ -15,7 +15,7 @@ use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::import::{ingest_entries, Error, IngestionEntry};
+use crate::import::{ingest_entries, Error as ImportError, IngestionEntry};
 use crate::proto::node::Node;
 use crate::B3Digest;
 
@@ -30,6 +30,24 @@ const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
 /// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
 const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
 
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("error reading archive entry: {0}")]
+    Io(#[from] std::io::Error),
+
+    #[error("unsupported tar entry {0} type: {1:?}")]
+    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
+
+    #[error("symlink missing target {0}")]
+    MissingSymlinkTarget(PathBuf),
+
+    #[error("unexpected number of top level directory entries")]
+    UnexpectedNumberOfTopLevelEntries,
+
+    #[error("failed to import into castore {0}")]
+    Import(#[from] ImportError),
+}
+
 /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
 /// [`DirectoryService`].
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
@@ -53,16 +71,16 @@ where
     let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
     let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
 
-    let mut entries_iter = archive.entries().map_err(Error::Archive)?;
-    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::Archive)? {
-        let path: PathBuf = entry.path().map_err(Error::Archive)?.into();
+    let mut entries_iter = archive.entries()?;
+    while let Some(mut entry) = entries_iter.try_next().await? {
+        let path: PathBuf = entry.path()?.into();
 
         let header = entry.header();
         let entry = match header.entry_type() {
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header.size().map_err(Error::Archive)?;
+                let header_size = header.size()?;
 
                 // If the blob is small enough, read it off the wire, compute the digest,
                 // and upload it to the [BlobService] in the background.
@@ -83,9 +101,7 @@ where
                         .acquire_many_owned(header_size as u32)
                         .await
                         .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer)
-                        .await
-                        .map_err(Error::Archive)?;
+                    let size = tokio::io::copy(&mut reader, &mut buffer).await?;
 
                     let digest: B3Digest = hasher.finalize().as_bytes().into();
 
@@ -96,11 +112,9 @@ where
                     async move {
                         let mut writer = blob_service.open_write().await;
 
-                        tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
-                            .await
-                            .map_err(Error::Archive)?;
+                        tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?;
 
-                        let blob_digest = writer.close().await.map_err(Error::Archive)?;
+                        let blob_digest = writer.close().await?;
 
                         assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
 
@@ -116,11 +130,9 @@ where
                 } else {
                     let mut writer = blob_service.open_write().await;
 
-                    let size = tokio::io::copy(&mut entry, &mut writer)
-                        .await
-                        .map_err(Error::Archive)?;
+                    let size = tokio::io::copy(&mut entry, &mut writer).await?;
 
-                    let digest = writer.close().await.map_err(Error::Archive)?;
+                    let digest = writer.close().await?;
 
                     (size, digest)
                 };
@@ -128,14 +140,13 @@ where
                 IngestionEntry::Regular {
                     path,
                     size,
-                    executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0,
+                    executable: entry.header().mode()? & 64 != 0,
                     digest,
                 }
             }
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
-                    .link_name()
-                    .map_err(Error::Archive)?
+                    .link_name()?
                     .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
                     .into(),
                 path,
@@ -157,11 +168,13 @@ where
         result.expect("task panicked")?;
     }
 
-    ingest_entries(
+    let root_node = ingest_entries(
         directory_service,
         futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)),
     )
-    .await
+    .await?;
+
+    Ok(root_node)
 }
 
 /// Keep track of the directory structure of a file tree being ingested. This is used
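
The reshaped tail of this function follows from the same split:
`ingest_entries` still returns the generic import error while the
function now returns the archive `Error`, so the result is bound first
and `?` performs the `From` conversion. A minimal self-contained sketch
of that shape (both enums abbreviated, `ingest`/`ingest_tar`
hypothetical):

    #[derive(Debug, thiserror::Error)]
    enum ImportError {
        #[error("import failed")]
        Failed,
    }

    #[derive(Debug, thiserror::Error)]
    enum ArchiveError {
        // Generic import failures are nested under the archive error.
        #[error("failed to import into castore {0}")]
        Import(#[from] ImportError),
    }

    fn ingest() -> Result<u64, ImportError> {
        Ok(42)
    }

    // Bind, convert with `?` (ImportError -> ArchiveError::Import),
    // then rewrap, as in the diff above.
    fn ingest_tar() -> Result<u64, ArchiveError> {
        let root_node = ingest()?;
        Ok(root_node)
    }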

@@ -19,20 +19,8 @@ pub enum Error {
     #[error("unable to read {0}: {1}")]
     UnableToRead(PathBuf, std::io::Error),
 
-    #[error("error reading from archive: {0}")]
-    Archive(std::io::Error),
-
     #[error("unsupported file {0} type: {1:?}")]
     UnsupportedFileType(PathBuf, FileType),
-
-    #[error("unsupported tar entry {0} type: {1:?}")]
-    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
-
-    #[error("symlink missing target {0}")]
-    MissingSymlinkTarget(PathBuf),
-
-    #[error("unexpected number of top level directory entries")]
-    UnexpectedNumberOfTopLevelEntries,
 }
 
 impl From<CastoreError> for Error {

@@ -54,6 +54,9 @@ pub enum FetcherError {
     #[error(transparent)]
     Import(#[from] tvix_castore::import::Error),
 
+    #[error(transparent)]
+    ImportArchive(#[from] tvix_castore::import::archive::Error),
+
     #[error("Error calculating store path for fetcher output: {0}")]
     StorePath(#[from] BuildStorePathError),
 }