feat(tvix/store/directory): validate Directory and sizes
This calls out to Directory::validate() for all received Directory messages, and also makes sure the sizes we refer a Directory message as matches the sizes that have been calculated. Change-Id: I316f9191d5872ee4ba6d78b9a4326f069b22fa63 Reviewed-on: https://cl.tvl.fyi/c/depot/+/7882 Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
parent
d8e0fa8e5e
commit
90979d39f3
2 changed files with 98 additions and 10 deletions
|
@ -1,4 +1,5 @@
|
||||||
use data_encoding::BASE64;
|
use data_encoding::BASE64;
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
@ -131,20 +132,47 @@ fn insert_directories(
|
||||||
txn: &sled::transaction::TransactionalTree,
|
txn: &sled::transaction::TransactionalTree,
|
||||||
directories: &Vec<Directory>,
|
directories: &Vec<Directory>,
|
||||||
) -> Result<Vec<u8>, Status> {
|
) -> Result<Vec<u8>, Status> {
|
||||||
// This keeps track of the seen directory keys.
|
// This keeps track of the seen directory keys, and their size.
|
||||||
|
// This is used to validate the size field of a reference to a previously sent directory.
|
||||||
// We don't need to keep the contents around, they're stored in the DB.
|
// We don't need to keep the contents around, they're stored in the DB.
|
||||||
let mut seen_directory_dgsts: HashSet<Vec<u8>> = HashSet::new();
|
let mut seen_directories_sizes: HashMap<Vec<u8>, u32> = HashMap::new();
|
||||||
let mut last_directory_dgst: Option<Vec<u8>> = None;
|
let mut last_directory_dgst: Option<Vec<u8>> = None;
|
||||||
|
|
||||||
for directory in directories {
|
for directory in directories {
|
||||||
|
// validate the directory itself.
|
||||||
|
if let Err(e) = directory.validate() {
|
||||||
|
return Err(Status::invalid_argument(format!(
|
||||||
|
"directory {} failed validation: {}",
|
||||||
|
BASE64.encode(&directory.digest()),
|
||||||
|
e,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
// for each child directory this directory refers to, we need
|
// for each child directory this directory refers to, we need
|
||||||
// to ensure it has been seen already in this stream.
|
// to ensure it has been seen already in this stream, and that the size
|
||||||
|
// matches what we recorded.
|
||||||
for child_directory in &directory.directories {
|
for child_directory in &directory.directories {
|
||||||
if !seen_directory_dgsts.contains(&child_directory.digest) {
|
match seen_directories_sizes.get(&child_directory.digest) {
|
||||||
return Err(Status::invalid_argument(format!(
|
None => {
|
||||||
"referred child directory {} not seen yet",
|
return Err(Status::invalid_argument(format!(
|
||||||
BASE64.encode(&child_directory.digest)
|
"child directory '{}' ({}) in directory '{}' not seen yet",
|
||||||
)));
|
child_directory.name,
|
||||||
|
BASE64.encode(&child_directory.digest),
|
||||||
|
BASE64.encode(&directory.digest()),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Some(seen_child_directory_size) => {
|
||||||
|
if seen_child_directory_size != &child_directory.size {
|
||||||
|
return Err(Status::invalid_argument(format!(
|
||||||
|
"child directory '{}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
|
||||||
|
child_directory.name,
|
||||||
|
BASE64.encode(&child_directory.digest),
|
||||||
|
BASE64.encode(&directory.digest()),
|
||||||
|
seen_child_directory_size,
|
||||||
|
child_directory.size,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,7 +181,7 @@ fn insert_directories(
|
||||||
// disconnected graphs at the same time…
|
// disconnected graphs at the same time…
|
||||||
|
|
||||||
let dgst = directory.digest();
|
let dgst = directory.digest();
|
||||||
seen_directory_dgsts.insert(dgst.clone());
|
seen_directories_sizes.insert(dgst.clone(), directory.size());
|
||||||
last_directory_dgst = Some(dgst.clone());
|
last_directory_dgst = Some(dgst.clone());
|
||||||
|
|
||||||
// check if the directory already exists in the database. We can skip
|
// check if the directory already exists in the database. We can skip
|
||||||
|
|
|
@ -5,7 +5,7 @@ use tonic::Status;
|
||||||
use crate::proto::directory_service_server::DirectoryService;
|
use crate::proto::directory_service_server::DirectoryService;
|
||||||
use crate::proto::get_directory_request::ByWhat;
|
use crate::proto::get_directory_request::ByWhat;
|
||||||
use crate::proto::GetDirectoryRequest;
|
use crate::proto::GetDirectoryRequest;
|
||||||
use crate::proto::{Directory, DirectoryNode};
|
use crate::proto::{Directory, DirectoryNode, SymlinkNode};
|
||||||
use crate::sled_directory_service::SledDirectoryService;
|
use crate::sled_directory_service::SledDirectoryService;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
|
@ -206,3 +206,63 @@ async fn put_get_dedup() -> anyhow::Result<()> {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Trying to upload a Directory failing validation should fail.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn put_reject_failed_validation() -> anyhow::Result<()> {
|
||||||
|
let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
|
||||||
|
|
||||||
|
// construct a broken Directory message that fails validation
|
||||||
|
let broken_directory = Directory {
|
||||||
|
symlinks: vec![SymlinkNode {
|
||||||
|
name: "".to_string(),
|
||||||
|
target: "doesntmatter".to_string(),
|
||||||
|
}],
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
assert!(broken_directory.validate().is_err());
|
||||||
|
|
||||||
|
// send it over, it must fail
|
||||||
|
let put_resp = service
|
||||||
|
.put(tonic_mock::streaming_request(vec![broken_directory]))
|
||||||
|
.await
|
||||||
|
.expect_err("must fail");
|
||||||
|
|
||||||
|
assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Trying to upload a Directory with wrong size should fail.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn put_reject_wrong_size() -> anyhow::Result<()> {
|
||||||
|
let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
|
||||||
|
|
||||||
|
// Construct a directory referring to DIRECTORY_A, but with wrong size.
|
||||||
|
let broken_parent_directory = Directory {
|
||||||
|
directories: vec![DirectoryNode {
|
||||||
|
name: "foo".to_string(),
|
||||||
|
digest: DIRECTORY_A.digest(),
|
||||||
|
size: 42,
|
||||||
|
}],
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
// Make sure we got the size wrong.
|
||||||
|
assert_ne!(
|
||||||
|
broken_parent_directory.directories[0].size,
|
||||||
|
DIRECTORY_A.size()
|
||||||
|
);
|
||||||
|
|
||||||
|
// now upload both (first A, then the broken parent). This must fail.
|
||||||
|
let put_resp = service
|
||||||
|
.put(tonic_mock::streaming_request(vec![
|
||||||
|
DIRECTORY_A.clone(),
|
||||||
|
broken_parent_directory,
|
||||||
|
]))
|
||||||
|
.await
|
||||||
|
.expect_err("must fail");
|
||||||
|
|
||||||
|
assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue