refactor(tvix/*store): use DS: DirectoryService

We implement DirectoryService for Arc&lt;DirectoryService&gt; and
Box&lt;DirectoryService&gt;; this is sufficient.

Change-Id: I0a5a81cbc4782764406b5bca57f908ace6090737
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11586
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
parent f2f12d1556
commit ba00f0c695

6 changed files with 23 additions and 29 deletions
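The relaxed `DS: DirectoryService` bound below works because pointer types forward the trait. What follows is a minimal, self-contained sketch of that pattern, not the actual tvix code: the trait here is a one-method stand-in (the real DirectoryService has more methods, e.g. put_multiple_start as seen in the hunks below), and MemoryDirectoryService is a hypothetical implementation.

use std::sync::Arc;

trait DirectoryService {
    // Stand-in for the real trait's methods; the real put_multiple_start
    // returns a Box<dyn DirectoryPutter>.
    fn put_multiple_start(&self) -> String;
}

// Blanket impls: a smart pointer to a DirectoryService is itself a
// DirectoryService, so Arc<dyn DirectoryService> and Box<dyn DirectoryService>
// satisfy `DS: DirectoryService` directly, with no .as_ref() at call sites.
impl<T: DirectoryService + ?Sized> DirectoryService for Arc<T> {
    fn put_multiple_start(&self) -> String {
        self.as_ref().put_multiple_start()
    }
}

impl<T: DirectoryService + ?Sized> DirectoryService for Box<T> {
    fn put_multiple_start(&self) -> String {
        self.as_ref().put_multiple_start()
    }
}

// Generic code can now use the simpler bound from this commit.
fn ingest<DS: DirectoryService>(directory_service: DS) -> String {
    // before this commit: directory_service.as_ref().put_multiple_start()
    directory_service.put_multiple_start()
}

struct MemoryDirectoryService; // hypothetical implementation

impl DirectoryService for MemoryDirectoryService {
    fn put_multiple_start(&self) -> String {
        "directory putter".to_string()
    }
}

fn main() {
    let svc: Arc<dyn DirectoryService> = Arc::new(MemoryDirectoryService);
    // Cloning the Arc is cheap, which is what keeps bounds like
    // `DS: DirectoryService + Clone` (see the Fetcher hunk) satisfiable.
    println!("{}", ingest(svc.clone()));
}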
@@ -84,7 +84,7 @@ pub async fn ingest_archive<BS, DS, R>(
 ) -> Result<Node, IngestionError<Error>>
 where
     BS: BlobService + Clone + 'static,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
     R: AsyncRead + Unpin,
 {
     // Since tarballs can have entries in any arbitrary order, we need to

@@ -35,7 +35,7 @@ pub async fn ingest_path<BS, DS, P>(
 where
     P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
 {
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)

@@ -49,7 +49,7 @@ pub async fn ingest_entries<DS, S, E>(
     mut entries: S,
 ) -> Result<Node, IngestionError<E>>
 where
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
     S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
     E: std::error::Error,
 {

@@ -90,7 +90,7 @@ where
         // If we don't have one yet (as that's the first one to upload),
         // initialize the putter.
         maybe_directory_putter
-            .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
+            .get_or_insert_with(|| directory_service.put_multiple_start())
             .put(directory)
             .await
             .map_err(|e| {

@@ -172,8 +172,8 @@ async fn hash<D: Digest + std::io::Write>(
 
 impl<BS, DS, PS> Fetcher<BS, DS, PS>
 where
-    BS: AsRef<(dyn BlobService + 'static)> + Clone + Send + Sync + 'static,
-    DS: AsRef<(dyn DirectoryService + 'static)>,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService + Clone,
     PS: PathInfoService,
 {
     /// Ingest the data from a specified [Fetch].

@@ -247,7 +247,7 @@ where
         // Ingest the archive, get the root node
         let node = tvix_castore::import::archive::ingest_archive(
             self.blob_service.clone(),
-            &self.directory_service,
+            self.directory_service.clone(),
             archive,
         )
         .await?;

@@ -114,7 +114,7 @@ pub async fn import_path_as_nar_ca<BS, DS, PS, P>(
 where
     P: AsRef<Path> + std::fmt::Debug,
     BS: BlobService + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
     PS: AsRef<dyn PathInfoService>,
 {
     let root_node = ingest_path(blob_service, directory_service, path.as_ref())

@@ -24,19 +24,19 @@ pub fn read_nar<R, BS, DS>(
 ) -> io::Result<castorepb::node::Node>
 where
     R: BufRead + Send,
-    BS: AsRef<dyn BlobService>,
-    DS: AsRef<dyn DirectoryService>,
+    BS: BlobService + Clone,
+    DS: DirectoryService,
 {
     let handle = tokio::runtime::Handle::current();
 
-    let directory_putter = directory_service.as_ref().put_multiple_start();
+    let directory_putter = directory_service.put_multiple_start();
 
     let node = nix_compat::nar::reader::open(r)?;
-    let (root_node, mut directory_putter, _) = process_node(
+    let (root_node, mut directory_putter) = process_node(
         handle.clone(),
         "".into(), // this is the root node, it has an empty name
         node,
-        &blob_service,
+        blob_service,
         directory_putter,
     )?;
 

@@ -80,9 +80,9 @@ fn process_node<BS>(
     node: nar::reader::Node,
     blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)>
+) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone,
 {
     Ok(match node {
         nar::reader::Node::Symlink { target } => (

@@ -91,7 +91,6 @@ where
                 target: target.into(),
             }),
             directory_putter,
-            blob_service,
         ),
         nar::reader::Node::File { executable, reader } => (
             castorepb::node::Node::File(process_file_reader(

@@ -99,19 +98,17 @@ where
                 name,
                 reader,
                 executable,
-                &blob_service,
+                blob_service,
             )?),
             directory_putter,
-            blob_service,
         ),
         nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter, blob_service_back) =
+            let (directory_node, directory_putter) =
                 process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
 
             (
                 castorepb::node::Node::Directory(directory_node),
                 directory_putter,
-                blob_service_back,
             )
         }
     })

@@ -127,13 +124,13 @@ fn process_file_reader<BS>(
     blob_service: BS,
 ) -> io::Result<castorepb::FileNode>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService,
 {
     // store the length. If we read any other length, reading will fail.
     let expected_len = file_reader.len();
 
     // prepare writing a new blob.
-    let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await });
+    let blob_writer = handle.block_on(async { blob_service.open_write().await });
 
     // write the blob.
     let mut blob_writer = {

@@ -168,24 +165,22 @@ fn process_dir_reader<BS>(
     mut dir_reader: nar::reader::DirReader,
     blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)>
+) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone,
 {
     let mut directory = castorepb::Directory::default();
 
     let mut directory_putter = directory_putter;
-    let mut blob_service = blob_service;
     while let Some(entry) = dir_reader.next()? {
-        let (node, directory_putter_back, blob_service_back) = process_node(
+        let (node, directory_putter_back) = process_node(
             handle.clone(),
             entry.name.into(),
             entry.node,
-            blob_service,
+            blob_service.clone(),
             directory_putter,
         )?;
 
-        blob_service = blob_service_back;
         directory_putter = directory_putter_back;
 
         match node {

@@ -213,7 +208,6 @@ where
             size: directory_size,
         },
         directory_putter,
-        blob_service,
     ))
 }
 
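The nar reader hunks also retire an ownership-threading pattern: previously the blob service was moved into process_node and handed back through the return tuple, while `BS: BlobService + Clone` (cheap for Arc-backed services) lets each recursive call simply take a clone. A hedged before/after sketch of that pattern, with hypothetical names:

use std::sync::Arc;

type Node = u32; // stand-in for castorepb::node::Node

// Before: the service comes back in the tuple and must be rebound.
fn process_node_before<BS>(blob_service: BS) -> (Node, BS) {
    (42, blob_service)
}

// After: `BS: Clone` lets the caller keep its copy; for Arc<dyn BlobService>
// a clone is just an atomic refcount bump.
fn process_node_after<BS: Clone>(blob_service: BS) -> Node {
    let _svc = blob_service; // consumed locally; the caller cloned beforehand
    42
}

fn main() {
    let svc: Arc<str> = Arc::from("blob service stand-in");

    // Old call-site shape, mirroring `blob_service = blob_service_back;`:
    let (node, svc_back) = process_node_before(svc);
    let svc = svc_back;

    // New call-site shape, mirroring `blob_service.clone(),` in the diff:
    let node2 = process_node_after(svc.clone());
    println!("{node} {node2}");
}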