2023-05-25 16:52:08 +02:00
|
|
|
use crate::blobservice::BlobService;
|
|
|
|
use crate::directoryservice::DirectoryService;
|
|
|
|
use crate::{directoryservice::DirectoryPutter, proto};
|
2023-06-09 17:22:25 +02:00
|
|
|
use std::sync::Arc;
|
2023-02-27 13:03:53 +01:00
|
|
|
use std::{
|
|
|
|
collections::HashMap,
|
|
|
|
fmt::Debug,
|
|
|
|
fs,
|
|
|
|
fs::File,
|
refactor(tvix/store): remove ChunkService
Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.
Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.
This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.
In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.
This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.
As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.
In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.
Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
2023-05-11 14:49:01 +02:00
|
|
|
io,
|
2023-02-27 13:03:53 +01:00
|
|
|
os::unix::prelude::PermissionsExt,
|
|
|
|
path::{Path, PathBuf},
|
|
|
|
};
|
|
|
|
use tracing::instrument;
|
|
|
|
use walkdir::WalkDir;
|
|
|
|
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
|
|
pub enum Error {
|
|
|
|
#[error("failed to upload directory at {0}: {1}")]
|
|
|
|
UploadDirectoryError(PathBuf, crate::Error),
|
|
|
|
|
|
|
|
#[error("invalid encoding encountered for entry {0:?}")]
|
|
|
|
InvalidEncoding(PathBuf),
|
|
|
|
|
|
|
|
#[error("unable to stat {0}: {1}")]
|
|
|
|
UnableToStat(PathBuf, std::io::Error),
|
|
|
|
|
|
|
|
#[error("unable to open {0}: {1}")]
|
|
|
|
UnableToOpen(PathBuf, std::io::Error),
|
|
|
|
|
|
|
|
#[error("unable to read {0}: {1}")]
|
|
|
|
UnableToRead(PathBuf, std::io::Error),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<super::Error> for Error {
|
|
|
|
fn from(value: super::Error) -> Self {
|
|
|
|
match value {
|
|
|
|
crate::Error::InvalidRequest(_) => panic!("tvix bug"),
|
|
|
|
crate::Error::StorageError(_) => panic!("error"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// This processes a given [walkdir::DirEntry] and returns a
|
|
|
|
// proto::node::Node, depending on the type of the entry.
|
|
|
|
//
|
|
|
|
// If the entry is a file, its contents are uploaded.
|
|
|
|
// If the entry is a directory, the Directory is uploaded as well.
|
|
|
|
// For this to work, it relies on the caller to provide the directory object
|
|
|
|
// with the previously returned (child) nodes.
|
|
|
|
//
|
|
|
|
// It assumes entries to be returned in "contents first" order, means this
|
|
|
|
// will only be called with a directory if all children of it have been
|
|
|
|
// visited. If the entry is indeed a directory, it'll also upload that
|
|
|
|
// directory to the store. For this, the so-far-assembled Directory object for
|
|
|
|
// this path needs to be passed in.
|
|
|
|
//
|
|
|
|
// It assumes the caller adds returned nodes to the directories it assembles.
|
|
|
|
#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
|
2023-06-09 11:26:34 +02:00
|
|
|
fn process_entry(
|
2023-06-09 17:22:25 +02:00
|
|
|
blob_service: Arc<dyn BlobService>,
|
2023-06-09 11:26:34 +02:00
|
|
|
directory_putter: &mut Box<dyn DirectoryPutter>,
|
2023-02-27 13:03:53 +01:00
|
|
|
entry: &walkdir::DirEntry,
|
|
|
|
maybe_directory: Option<proto::Directory>,
|
|
|
|
) -> Result<proto::node::Node, Error> {
|
|
|
|
let file_type = entry.file_type();
|
|
|
|
|
|
|
|
let entry_path: PathBuf = entry.path().to_path_buf();
|
|
|
|
|
|
|
|
if file_type.is_dir() {
|
|
|
|
let directory = maybe_directory
|
|
|
|
.expect("tvix bug: must be called with some directory in the case of directory");
|
|
|
|
let directory_digest = directory.digest();
|
|
|
|
let directory_size = directory.size();
|
|
|
|
|
|
|
|
// upload this directory
|
2023-03-27 17:24:53 +02:00
|
|
|
directory_putter
|
2023-02-27 13:03:53 +01:00
|
|
|
.put(directory)
|
|
|
|
.map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?;
|
|
|
|
|
|
|
|
return Ok(proto::node::Node::Directory(proto::DirectoryNode {
|
|
|
|
name: entry
|
|
|
|
.file_name()
|
|
|
|
.to_str()
|
|
|
|
.map(|s| Ok(s.to_owned()))
|
|
|
|
.unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
|
2023-03-16 00:01:30 +01:00
|
|
|
digest: directory_digest.to_vec(),
|
2023-02-27 13:03:53 +01:00
|
|
|
size: directory_size,
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
|
|
|
|
if file_type.is_symlink() {
|
|
|
|
let target = std::fs::read_link(&entry_path)
|
|
|
|
.map_err(|e| Error::UnableToStat(entry_path.clone(), e))?;
|
|
|
|
|
|
|
|
return Ok(proto::node::Node::Symlink(proto::SymlinkNode {
|
|
|
|
name: entry
|
|
|
|
.file_name()
|
|
|
|
.to_str()
|
|
|
|
.map(|s| Ok(s.to_owned()))
|
|
|
|
.unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
|
|
|
|
target: target
|
|
|
|
.to_str()
|
|
|
|
.map(|s| Ok(s.to_owned()))
|
|
|
|
.unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
|
|
|
|
if file_type.is_file() {
|
|
|
|
let metadata = entry
|
|
|
|
.metadata()
|
|
|
|
.map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?;
|
|
|
|
|
refactor(tvix/store): remove ChunkService
Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.
Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.
This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.
In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.
This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.
As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.
In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.
Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
2023-05-11 14:49:01 +02:00
|
|
|
let mut file = File::open(entry_path.clone())
|
2023-03-11 21:14:57 +01:00
|
|
|
.map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
|
2023-03-10 14:23:36 +01:00
|
|
|
|
2023-06-12 14:13:00 +02:00
|
|
|
let mut writer = blob_service.open_write();
|
refactor(tvix/store): remove ChunkService
Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.
Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.
This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.
In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.
This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.
As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.
In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.
Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
2023-05-11 14:49:01 +02:00
|
|
|
|
|
|
|
if let Err(e) = io::copy(&mut file, &mut writer) {
|
|
|
|
return Err(Error::UnableToRead(entry_path, e));
|
|
|
|
};
|
|
|
|
|
|
|
|
let digest = writer.close()?;
|
2023-02-27 13:03:53 +01:00
|
|
|
|
|
|
|
return Ok(proto::node::Node::File(proto::FileNode {
|
|
|
|
name: entry
|
|
|
|
.file_name()
|
|
|
|
.to_str()
|
|
|
|
.map(|s| Ok(s.to_owned()))
|
|
|
|
.unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
|
refactor(tvix/store): remove ChunkService
Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.
Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.
This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.
In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.
This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.
As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.
In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.
Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
2023-05-11 14:49:01 +02:00
|
|
|
digest: digest.to_vec(),
|
2023-02-27 13:03:53 +01:00
|
|
|
size: metadata.len() as u32,
|
|
|
|
// If it's executable by the user, it'll become executable.
|
|
|
|
// This matches nix's dump() function behaviour.
|
|
|
|
executable: metadata.permissions().mode() & 64 != 0,
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
todo!("handle other types")
|
|
|
|
}
|
|
|
|
|
2023-05-17 13:21:26 +02:00
|
|
|
/// Ingests the contents at the given path into the tvix store,
|
|
|
|
/// interacting with a [BlobService] and [DirectoryService].
|
|
|
|
/// It returns the root node or an error.
|
2023-02-27 13:03:53 +01:00
|
|
|
///
|
2023-05-17 13:21:26 +02:00
|
|
|
/// It's not interacting with a [PathInfoService], it's up to the caller to
|
|
|
|
/// possibly register it somewhere (and potentially rename it based on some
|
|
|
|
/// naming scheme.
|
refactor(tvix/store): remove ChunkService
Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.
Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.
This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.
In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.
This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.
As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.
In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.
Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
2023-05-11 14:49:01 +02:00
|
|
|
#[instrument(skip(blob_service, directory_service), fields(path=?p))]
|
2023-06-09 11:26:34 +02:00
|
|
|
pub fn ingest_path<P: AsRef<Path> + Debug>(
|
2023-06-09 17:22:25 +02:00
|
|
|
blob_service: Arc<dyn BlobService>,
|
|
|
|
directory_service: Arc<dyn DirectoryService>,
|
2023-02-27 13:03:53 +01:00
|
|
|
p: P,
|
|
|
|
) -> Result<proto::node::Node, Error> {
|
|
|
|
// Probe if the path points to a symlink. If it does, we process it manually,
|
|
|
|
// due to https://github.com/BurntSushi/walkdir/issues/175.
|
|
|
|
let symlink_metadata = fs::symlink_metadata(p.as_ref())
|
|
|
|
.map_err(|e| Error::UnableToStat(p.as_ref().to_path_buf(), e))?;
|
|
|
|
if symlink_metadata.is_symlink() {
|
|
|
|
let target = std::fs::read_link(p.as_ref())
|
|
|
|
.map_err(|e| Error::UnableToStat(p.as_ref().to_path_buf(), e))?;
|
|
|
|
return Ok(proto::node::Node::Symlink(proto::SymlinkNode {
|
|
|
|
name: p
|
|
|
|
.as_ref()
|
|
|
|
.file_name()
|
|
|
|
.unwrap_or_default()
|
|
|
|
.to_str()
|
|
|
|
.map(|s| Ok(s.to_owned()))
|
|
|
|
.unwrap_or(Err(Error::InvalidEncoding(p.as_ref().to_path_buf())))?,
|
|
|
|
target: target
|
|
|
|
.to_str()
|
|
|
|
.map(|s| Ok(s.to_owned()))
|
|
|
|
.unwrap_or(Err(Error::InvalidEncoding(p.as_ref().to_path_buf())))?,
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut directories: HashMap<PathBuf, proto::Directory> = HashMap::default();
|
|
|
|
|
2023-06-09 11:26:34 +02:00
|
|
|
// TODO: pass this one instead?
|
2023-03-27 17:24:53 +02:00
|
|
|
let mut directory_putter = directory_service.put_multiple_start();
|
|
|
|
|
2023-02-27 18:07:16 +01:00
|
|
|
for entry in WalkDir::new(p)
|
|
|
|
.follow_links(false)
|
|
|
|
.contents_first(true)
|
|
|
|
.sort_by_file_name()
|
|
|
|
{
|
2023-02-27 13:03:53 +01:00
|
|
|
let entry = entry.unwrap();
|
|
|
|
|
|
|
|
// process_entry wants an Option<Directory> in case the entry points to a directory.
|
|
|
|
// make sure to provide it.
|
|
|
|
let maybe_directory: Option<proto::Directory> = {
|
|
|
|
if entry.file_type().is_dir() {
|
|
|
|
Some(
|
|
|
|
directories
|
|
|
|
.entry(entry.path().to_path_buf())
|
|
|
|
.or_default()
|
|
|
|
.clone(),
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2023-06-09 17:22:25 +02:00
|
|
|
let node = process_entry(
|
|
|
|
blob_service.clone(),
|
|
|
|
&mut directory_putter,
|
|
|
|
&entry,
|
|
|
|
maybe_directory,
|
|
|
|
)?;
|
2023-02-27 13:03:53 +01:00
|
|
|
|
|
|
|
if entry.depth() == 0 {
|
|
|
|
return Ok(node);
|
|
|
|
} else {
|
|
|
|
// calculate the parent path, and make sure we register the node there.
|
|
|
|
// NOTE: entry.depth() > 0
|
|
|
|
let parent_path = entry.path().parent().unwrap().to_path_buf();
|
|
|
|
|
|
|
|
// record node in parent directory, creating a new [proto:Directory] if not there yet.
|
|
|
|
let parent_directory = directories.entry(parent_path).or_default();
|
|
|
|
match node {
|
|
|
|
proto::node::Node::Directory(e) => parent_directory.directories.push(e),
|
|
|
|
proto::node::Node::File(e) => parent_directory.files.push(e),
|
|
|
|
proto::node::Node::Symlink(e) => parent_directory.symlinks.push(e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// unreachable, we already bailed out before if root doesn't exist.
|
|
|
|
panic!("tvix bug")
|
|
|
|
}
|