2023-03-27 17:08:16 +02:00
|
|
|
use super::DirectoryPutter;
|
2023-03-27 14:47:57 +02:00
|
|
|
use super::DirectoryService;
|
|
|
|
use crate::proto;
|
2023-05-18 20:43:33 +02:00
|
|
|
use crate::B3Digest;
|
2023-03-27 14:47:57 +02:00
|
|
|
use crate::Error;
|
|
|
|
use std::collections::{HashSet, VecDeque};
|
|
|
|
use tracing::{debug_span, instrument, warn};
|
|
|
|
|
|
|
|
/// Traverses a [proto::Directory] from the root to the children.
///
/// This is mostly BFS, but directories are only returned once.
pub struct DirectoryTraverser<DS: DirectoryService> {
    /// The backing service used to look up directories by their digest.
    directory_service: DS,
    /// The list of all directories that still need to be traversed. The next
    /// element is picked from the front, new elements are enqueued at the
    /// back.
    worklist_directory_digests: VecDeque<B3Digest>,
    /// The list of directory digests already sent to the consumer.
    /// We omit sending the same directories multiple times.
    sent_directory_digests: HashSet<B3Digest>,
}
|
|
|
|
|
|
|
|
impl<DS: DirectoryService> DirectoryTraverser<DS> {
|
2023-05-18 20:43:33 +02:00
|
|
|
pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self {
|
2023-03-27 14:47:57 +02:00
|
|
|
Self {
|
|
|
|
directory_service,
|
2023-05-18 20:43:33 +02:00
|
|
|
worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]),
|
2023-03-27 14:47:57 +02:00
|
|
|
sent_directory_digests: HashSet::new(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// enqueue all child directory digests to the work queue, as
|
|
|
|
// long as they're not part of the worklist or already sent.
|
|
|
|
// This panics if the digest looks invalid, it's supposed to be checked first.
|
|
|
|
fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
|
|
|
|
for child_directory_node in &directory.directories {
|
2023-05-18 20:43:33 +02:00
|
|
|
// TODO: propagate error
|
|
|
|
let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap();
|
2023-03-27 14:47:57 +02:00
|
|
|
|
|
|
|
if self.worklist_directory_digests.contains(&child_digest)
|
|
|
|
|| self.sent_directory_digests.contains(&child_digest)
|
|
|
|
{
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
self.worklist_directory_digests.push_back(child_digest);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
|
|
|
|
type Item = Result<proto::Directory, Error>;
|
|
|
|
|
|
|
|
#[instrument(skip_all)]
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
// fetch the next directory digest from the top of the work queue.
|
|
|
|
match self.worklist_directory_digests.pop_front() {
|
|
|
|
None => None,
|
|
|
|
Some(current_directory_digest) => {
|
2023-05-18 20:43:33 +02:00
|
|
|
let span = debug_span!("directory.digest", "{}", current_directory_digest);
|
2023-03-27 14:47:57 +02:00
|
|
|
let _ = span.enter();
|
|
|
|
|
|
|
|
// look up the directory itself.
|
|
|
|
let current_directory = match self.directory_service.get(¤t_directory_digest)
|
|
|
|
{
|
|
|
|
// if we got it
|
|
|
|
Ok(Some(current_directory)) => {
|
|
|
|
// validate, we don't want to send invalid directories.
|
|
|
|
if let Err(e) = current_directory.validate() {
|
|
|
|
warn!("directory failed validation: {}", e.to_string());
|
|
|
|
return Some(Err(Error::StorageError(format!(
|
|
|
|
"invalid directory: {}",
|
2023-05-18 20:43:33 +02:00
|
|
|
current_directory_digest
|
2023-03-27 14:47:57 +02:00
|
|
|
))));
|
|
|
|
}
|
|
|
|
current_directory
|
|
|
|
}
|
|
|
|
// if it's not there, we have an inconsistent store!
|
|
|
|
Ok(None) => {
|
2023-05-18 20:43:33 +02:00
|
|
|
warn!("directory {} does not exist", current_directory_digest);
|
2023-03-27 14:47:57 +02:00
|
|
|
return Some(Err(Error::StorageError(format!(
|
|
|
|
"directory {} does not exist",
|
2023-05-18 20:43:33 +02:00
|
|
|
current_directory_digest
|
2023-03-27 14:47:57 +02:00
|
|
|
))));
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
warn!("failed to look up directory");
|
|
|
|
return Some(Err(Error::StorageError(format!(
|
|
|
|
"unable to look up directory {}: {}",
|
2023-05-18 20:43:33 +02:00
|
|
|
current_directory_digest, e
|
2023-03-27 14:47:57 +02:00
|
|
|
))));
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// All DirectoryServices MUST validate directory nodes, before returning them out, so we
|
|
|
|
// can be sure [enqueue_child_directories] doesn't panic.
|
|
|
|
|
|
|
|
// enqueue child directories
|
|
|
|
self.enqueue_child_directories(¤t_directory);
|
|
|
|
Some(Ok(current_directory))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2023-03-27 17:08:16 +02:00
|
|
|
|
|
|
|
/// This is a simple implementation of a Directory uploader.
/// TODO: verify connectivity? Factor out these checks into generic helpers?
pub struct SimplePutter<DS: DirectoryService> {
    /// The service each directory is forwarded to via [DirectoryService::put].
    directory_service: DS,
    /// Digest returned by the most recent successful put, if any;
    /// reported back to the caller on close.
    last_directory_digest: Option<B3Digest>,
}
|
|
|
|
|
|
|
|
impl<DS: DirectoryService> SimplePutter<DS> {
    /// Creates a putter wrapping the given directory service, with no
    /// directories uploaded yet.
    pub fn new(directory_service: DS) -> Self {
        Self {
            last_directory_digest: None,
            directory_service,
        }
    }
}
|
|
|
|
|
|
|
|
impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
|
|
|
|
fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
|
|
|
|
let digest = self.directory_service.put(directory)?;
|
|
|
|
|
|
|
|
// track the last directory digest
|
|
|
|
self.last_directory_digest = Some(digest);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// We need to be mutable here, as that's the signature of the trait.
|
2023-05-18 20:43:33 +02:00
|
|
|
fn close(&mut self) -> Result<B3Digest, Error> {
|
|
|
|
match &self.last_directory_digest {
|
|
|
|
Some(last_digest) => Ok(last_digest.clone()),
|
2023-03-27 17:08:16 +02:00
|
|
|
None => Err(Error::InvalidRequest(
|
|
|
|
"no directories sent, can't show root digest".to_string(),
|
|
|
|
)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|