refactor(tvix/store/directorysvc): use [u8; 32] instead of Vec<u8>

Also, simplify the trait interface, only allowing lookups of Directory
objects by their digest.

Change-Id: I6eec28a8cb0557bed9b69df8b8ff99a5e0f8fe35
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8313
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
Florian Klink 2023-03-16 00:01:30 +01:00 committed by flokli
parent 9c08cbc973
commit ee23220564
12 changed files with 126 additions and 128 deletions

View file

@ -8,46 +8,38 @@ use super::DirectoryService;
#[derive(Clone, Default)]
pub struct MemoryDirectoryService {
db: Arc<RwLock<HashMap<Vec<u8>, proto::Directory>>>,
db: Arc<RwLock<HashMap<[u8; 32], proto::Directory>>>,
}
impl DirectoryService for MemoryDirectoryService {
// TODO: change api to only be by digest
#[instrument(skip(self, by_what))]
fn get(
&self,
by_what: &proto::get_directory_request::ByWhat,
) -> Result<Option<proto::Directory>, Error> {
match by_what {
proto::get_directory_request::ByWhat::Digest(digest) => {
let db = self.db.read()?;
#[instrument(skip(self, digest), fields(directory.digest = BASE64.encode(digest)))]
fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error> {
let db = self.db.read()?;
match db.get(digest) {
// The directory was not found, return
None => Ok(None),
match db.get(digest) {
// The directory was not found, return
None => Ok(None),
// The directory was found, try to parse the data as Directory message
Some(directory) => {
// Validate the retrieved Directory indeed has the
// digest we expect it to have, to detect corruptions.
let actual_digest = directory.digest();
if actual_digest.as_slice() != digest {
return Err(Error::StorageError(format!(
"requested directory with digest {}, but got {}",
BASE64.encode(digest),
BASE64.encode(&actual_digest)
)));
}
Ok(Some(directory.clone()))
}
// The directory was found, try to parse the data as Directory message
Some(directory) => {
// Validate the retrieved Directory indeed has the
// digest we expect it to have, to detect corruptions.
let actual_digest = directory.digest();
if actual_digest.as_slice() != digest {
return Err(Error::StorageError(format!(
"requested directory with digest {}, but got {}",
BASE64.encode(digest),
BASE64.encode(&actual_digest)
)));
}
Ok(Some(directory.clone()))
}
}
}
#[instrument(skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))]
fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error> {
fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error> {
let digest = directory.digest();
// validate the directory itself.
@ -61,7 +53,7 @@ impl DirectoryService for MemoryDirectoryService {
// store it
let mut db = self.db.write()?;
db.insert(digest.clone(), directory);
db.insert(digest, directory);
Ok(digest)
}

View file

@ -11,11 +11,8 @@ pub use self::sled::SledDirectoryService;
pub trait DirectoryService {
/// Get looks up a single Directory message by its digest.
/// In case the directory is not found, Ok(None) is returned.
fn get(
&self,
by_what: &proto::get_directory_request::ByWhat,
) -> Result<Option<proto::Directory>, Error>;
fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>;
/// Get uploads a single Directory message, and returns the calculated
/// digest, or an error.
fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error>;
fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>;
}

View file

@ -29,48 +29,40 @@ impl SledDirectoryService {
}
impl DirectoryService for SledDirectoryService {
// TODO: change api to only be by digest
#[instrument(name = "SledDirectoryService::get", skip(self, by_what))]
fn get(
&self,
by_what: &proto::get_directory_request::ByWhat,
) -> Result<Option<proto::Directory>, Error> {
match by_what {
proto::get_directory_request::ByWhat::Digest(digest) => {
match self.db.get(digest) {
// The directory was not found, return
Ok(None) => Ok(None),
#[instrument(name = "SledDirectoryService::get", skip(self, digest), fields(directory.digest = BASE64.encode(digest)))]
fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error> {
match self.db.get(digest) {
// The directory was not found, return
Ok(None) => Ok(None),
// The directory was found, try to parse the data as Directory message
Ok(Some(data)) => match Directory::decode(&*data) {
Ok(directory) => {
// Validate the retrieved Directory indeed has the
// digest we expect it to have, to detect corruptions.
let actual_digest = directory.digest();
if actual_digest.as_slice() != digest {
return Err(Error::StorageError(format!(
"requested directory with digest {}, but got {}",
BASE64.encode(digest),
BASE64.encode(&actual_digest)
)));
}
// The directory was found, try to parse the data as Directory message
Ok(Some(data)) => match Directory::decode(&*data) {
Ok(directory) => {
// Validate the retrieved Directory indeed has the
// digest we expect it to have, to detect corruptions.
let actual_digest = directory.digest();
if actual_digest.as_slice() != digest {
return Err(Error::StorageError(format!(
"requested directory with digest {}, but got {}",
BASE64.encode(digest),
BASE64.encode(&actual_digest)
)));
}
Ok(Some(directory))
}
Err(e) => {
warn!("unable to parse directory {}: {}", BASE64.encode(digest), e);
Err(Error::StorageError(e.to_string()))
}
},
// some storage error?
Err(e) => Err(Error::StorageError(e.to_string())),
Ok(Some(directory))
}
}
Err(e) => {
warn!("unable to parse directory {}: {}", BASE64.encode(digest), e);
Err(Error::StorageError(e.to_string()))
}
},
// some storage error?
Err(e) => Err(Error::StorageError(e.to_string())),
}
}
#[instrument(name = "SledDirectoryService::put", skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))]
fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error> {
fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error> {
let digest = directory.digest();
// validate the directory itself.
@ -82,7 +74,7 @@ impl DirectoryService for SledDirectoryService {
)));
}
// store it
let result = self.db.insert(&digest, directory.encode_to_vec());
let result = self.db.insert(digest, directory.encode_to_vec());
if let Err(e) = result {
return Err(Error::StorageError(e.to_string()));
}

View file

@ -85,7 +85,7 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire
.to_str()
.map(|s| Ok(s.to_owned()))
.unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
digest: directory_digest,
digest: directory_digest.to_vec(),
size: directory_size,
}));
}

View file

@ -103,20 +103,23 @@ impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRendere
}
proto::node::Node::Directory(proto_directory_node) => {
// get the digest we're referring to
let digest = proto_directory_node.digest;
let digest: [u8; 32] = proto_directory_node.digest.try_into().map_err(|_e| {
RenderError::StoreError(crate::Error::StorageError(
"invalid digest len in directory node".to_string(),
))
})?;
// look it up with the directory service
let resp = self
.directory_service
.get(&proto::get_directory_request::ByWhat::Digest(
digest.to_vec(),
))
.get(&digest)
.map_err(RenderError::StoreError)?;
match resp {
// if it's None, that's an error!
None => {
return Err(RenderError::DirectoryNotFound(
digest,
digest.to_vec(),
proto_directory_node.name,
))
}

View file

@ -34,31 +34,33 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
let req_inner = request.into_inner();
let client = self.directory_service.clone();
let directory_service = self.directory_service.clone();
// kick off an async thread
task::spawn(async move {
// Keep the list of directory digests to traverse.
// As per rpc_directory.proto, we traverse in BFS order.
let mut deq: VecDeque<Vec<u8>> = VecDeque::new();
let mut deq: VecDeque<[u8; 32]> = VecDeque::new();
// look at the digest in the request and put it in the top of the queue.
match &req_inner.by_what {
None => return Err(Status::invalid_argument("by_what needs to be specified")),
Some(proto::get_directory_request::ByWhat::Digest(digest)) => {
if digest.len() != 32 {
return Err(Status::invalid_argument("invalid digest length"));
}
deq.push_back(digest.clone());
deq.push_back(
digest
.as_slice()
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?,
);
}
}
// keep a list of all the Directory messages already sent, so we can omit sending the same.
let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new();
let mut sent_directory_dgsts: HashSet<[u8; 32]> = HashSet::new();
// look up the directory at the top of the queue
while let Some(ref digest) = deq.pop_front() {
let digest_b64: String = BASE64.encode(digest);
while let Some(digest) = deq.pop_front() {
let digest_b64: String = BASE64.encode(&digest);
// add digest we're currently processing to a span, but pay attention to
// https://docs.rs/tracing/0.1.37/tracing/span/struct.Span.html#in-asynchronous-code
@ -69,9 +71,7 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
let _enter = span.enter();
// invoke client.get, and map to a Result<Directory, Status>
match client.get(&proto::get_directory_request::ByWhat::Digest(
digest.to_vec(),
)) {
match directory_service.get(&digest) {
// The directory was not found, abort
Ok(None) => {
if !sent_directory_dgsts.is_empty() {
@ -94,10 +94,19 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
// Same applies to when it already is in the queue.
if req_inner.recursive {
for child_directory_node in &directory.directories {
if !sent_directory_dgsts.contains(&child_directory_node.digest)
&& !deq.contains(&child_directory_node.digest)
let child_directory_node_digest: [u8; 32] =
child_directory_node.digest.clone().try_into().map_err(
|_e| {
Status::internal(
"invalid child directory digest len",
)
},
)?;
if !sent_directory_dgsts.contains(&child_directory_node_digest)
&& !deq.contains(&child_directory_node_digest)
{
deq.push_back(child_directory_node.digest.clone());
deq.push_back(child_directory_node_digest);
}
}
}
@ -106,7 +115,7 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
// Strictly speaking, it wasn't sent yet, but tx.send happens right after,
// and the only way we can still fail is by the remote side to hang up,
// in which case we stop anyways.
sent_directory_dgsts.insert(digest.to_vec());
sent_directory_dgsts.insert(digest);
Ok(directory)
}
@ -143,8 +152,8 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
// This keeps track of the seen directory keys, and their size.
// This is used to validate the size field of a reference to a previously sent directory.
// We don't need to keep the contents around, they're stored in the DB.
let mut seen_directories_sizes: HashMap<Vec<u8>, u32> = HashMap::new();
let mut last_directory_dgst: Option<Vec<u8>> = None;
let mut seen_directories_sizes: HashMap<[u8; 32], u32> = HashMap::new();
let mut last_directory_dgst: Option<[u8; 32]> = None;
// Consume directories, and insert them into the store.
// Reject directory messages that refer to Directories not sent in the same stream.
@ -162,12 +171,18 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
// to ensure it has been seen already in this stream, and that the size
// matches what we recorded.
for child_directory in &directory.directories {
match seen_directories_sizes.get(&child_directory.digest) {
let child_directory_digest: [u8; 32] = child_directory
.digest
.clone()
.try_into()
.map_err(|_e| Status::internal("invalid child directory digest len"))?;
match seen_directories_sizes.get(&child_directory_digest) {
None => {
return Err(Status::invalid_argument(format!(
"child directory '{}' ({}) in directory '{}' not seen yet",
child_directory.name,
BASE64.encode(&child_directory.digest),
BASE64.encode(&child_directory_digest),
BASE64.encode(&directory.digest()),
)));
}
@ -176,7 +191,7 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
return Err(Status::invalid_argument(format!(
"child directory '{}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
child_directory.name,
BASE64.encode(&child_directory.digest),
BASE64.encode(&child_directory_digest),
BASE64.encode(&directory.digest()),
seen_child_directory_size,
child_directory.size,
@ -197,15 +212,12 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
// does a reachability check.
let dgst = directory.digest();
seen_directories_sizes.insert(dgst.clone(), directory.size());
last_directory_dgst = Some(dgst.clone());
seen_directories_sizes.insert(dgst, directory.size());
last_directory_dgst = Some(dgst);
// check if the directory already exists in the database. We can skip
// inserting if it's already there, as that'd be a no-op.
match self
.directory_service
.get(&proto::get_directory_request::ByWhat::Digest(dgst.to_vec()))
{
match self.directory_service.get(&dgst) {
Err(e) => {
warn!("error checking if directory already exists: {}", e);
return Err(e.into());
@ -224,7 +236,7 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
match last_directory_dgst {
None => Err(Status::invalid_argument("no directories received")),
Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
root_digest: last_directory_dgst,
root_digest: last_directory_dgst.to_vec(),
})),
}
}

View file

@ -236,10 +236,10 @@ impl Directory {
/// Calculates the digest of a Directory, which is the blake3 hash of a
/// Directory protobuf message, serialized in protobuf canonical form.
pub fn digest(&self) -> Vec<u8> {
pub fn digest(&self) -> [u8; 32] {
let mut hasher = blake3::Hasher::new();
hasher.update(&self.encode_to_vec()).finalize().as_bytes()[..].to_vec()
*hasher.update(&self.encode_to_vec()).finalize().as_bytes()
}
/// validate checks the directory for invalid data, such as:

View file

@ -2,7 +2,7 @@ use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDire
use lazy_static::lazy_static;
lazy_static! {
static ref DUMMY_DIGEST: Vec<u8> = vec![
static ref DUMMY_DIGEST: [u8; 32] = [
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
@ -66,7 +66,7 @@ fn digest() {
assert_eq!(
d.digest(),
vec![
[
0xaf, 0x13, 0x49, 0xb9, 0xf5, 0xf9, 0xa1, 0xa6, 0xa0, 0x40, 0x4d, 0xea, 0x36, 0xdc,
0xc9, 0x49, 0x9b, 0xcb, 0x25, 0xc9, 0xad, 0xc1, 0x12, 0xb7, 0xcc, 0x9a, 0x93, 0xca,
0xe4, 0x1f, 0x32, 0x62

View file

@ -44,7 +44,7 @@ async fn not_found() {
let resp = service
.get(tonic::Request::new(GetDirectoryRequest {
by_what: Some(ByWhat::Digest(DIRECTORY_A.digest())),
by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().to_vec())),
..Default::default()
}))
.await;
@ -114,14 +114,17 @@ async fn put_get_multiple() {
.await
.expect("must succeed");
assert_eq!(DIRECTORY_B.digest(), put_resp.into_inner().root_digest);
assert_eq!(
DIRECTORY_B.digest().to_vec(),
put_resp.into_inner().root_digest
);
// now, request b, first in non-recursive mode.
let items = get_directories(
&service,
GetDirectoryRequest {
recursive: false,
by_what: Some(ByWhat::Digest(DIRECTORY_B.digest())),
by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().to_vec())),
},
)
.await
@ -135,7 +138,7 @@ async fn put_get_multiple() {
&service,
GetDirectoryRequest {
recursive: true,
by_what: Some(ByWhat::Digest(DIRECTORY_B.digest())),
by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().to_vec())),
},
)
.await
@ -161,14 +164,17 @@ async fn put_get_dedup() {
.await
.expect("must succeed");
assert_eq!(DIRECTORY_C.digest(), put_resp.into_inner().root_digest);
assert_eq!(
DIRECTORY_C.digest().to_vec(),
put_resp.into_inner().root_digest
);
// Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice.
let items = get_directories(
&service,
GetDirectoryRequest {
recursive: true,
by_what: Some(ByWhat::Digest(DIRECTORY_C.digest())),
by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().to_vec())),
},
)
.await
@ -211,7 +217,7 @@ async fn put_reject_wrong_size() {
let broken_parent_directory = Directory {
directories: vec![DirectoryNode {
name: "foo".to_string(),
digest: DIRECTORY_A.digest(),
digest: DIRECTORY_A.digest().to_vec(),
size: 42,
}],
..Default::default()

View file

@ -39,7 +39,7 @@ lazy_static! {
pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory {
directories: vec![DirectoryNode {
name: "keep".to_string(),
digest: DIRECTORY_WITH_KEEP.digest(),
digest: DIRECTORY_WITH_KEEP.digest().to_vec(),
size: DIRECTORY_WITH_KEEP.size(),
}],
files: vec![FileNode {
@ -57,7 +57,7 @@ lazy_static! {
pub static ref DIRECTORY_B: Directory = Directory {
directories: vec![DirectoryNode {
name: "a".to_string(),
digest: DIRECTORY_A.digest(),
digest: DIRECTORY_A.digest().to_vec(),
size: DIRECTORY_A.size(),
}],
..Default::default()
@ -66,12 +66,12 @@ lazy_static! {
directories: vec![
DirectoryNode {
name: "a".to_string(),
digest: DIRECTORY_A.digest(),
digest: DIRECTORY_A.digest().to_vec(),
size: DIRECTORY_A.size(),
},
DirectoryNode {
name: "a'".to_string(),
digest: DIRECTORY_A.digest(),
digest: DIRECTORY_A.digest().to_vec(),
size: DIRECTORY_A.size(),
}
],

View file

@ -106,7 +106,7 @@ fn complicated() {
.unwrap()
.to_string_lossy()
.to_string(),
digest: DIRECTORY_COMPLICATED.digest(),
digest: DIRECTORY_COMPLICATED.digest().to_vec(),
size: DIRECTORY_COMPLICATED.size(),
}),
root_node,
@ -114,15 +114,11 @@ fn complicated() {
// ensure DIRECTORY_WITH_KEEP and DIRECTORY_COMPLICATED have been uploaded
assert!(directory_service
.get(&proto::get_directory_request::ByWhat::Digest(
DIRECTORY_WITH_KEEP.digest()
))
.get(&DIRECTORY_WITH_KEEP.digest())
.unwrap()
.is_some());
assert!(directory_service
.get(&proto::get_directory_request::ByWhat::Digest(
DIRECTORY_COMPLICATED.digest()
))
.get(&DIRECTORY_COMPLICATED.digest())
.unwrap()
.is_some());

View file

@ -198,7 +198,7 @@ fn test_complicated() {
&mut buf,
crate::proto::node::Node::Directory(DirectoryNode {
name: "doesntmatter".to_string(),
digest: DIRECTORY_COMPLICATED.digest(),
digest: DIRECTORY_COMPLICATED.digest().to_vec(),
size: DIRECTORY_COMPLICATED.size(),
}),
)