feat(tvix/store/directory): deduplicate Directory messages
We can omit sending Directory messages to clients that have already been sent in the same stream. We can also omit storing a Directory message if we already have it - they're content-addressed anyways. Change-Id: Iba44565e07157a83a033177a2ffbdddced64ba5c Reviewed-on: https://cl.tvl.fyi/c/depot/+/7881 Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
This commit is contained in:
parent
e719da53be
commit
d8e0fa8e5e
2 changed files with 82 additions and 9 deletions
|
@ -53,6 +53,9 @@ fn send_directories(
|
|||
}
|
||||
}
|
||||
|
||||
// keep a list of all the Directory messages already sent, so we can omit sending the same.
|
||||
let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new();
|
||||
|
||||
// look up the directory at the top of the queue
|
||||
while let Some(ref digest) = deq.pop_front() {
|
||||
let digest_b64: String = BASE64.encode(digest);
|
||||
|
@ -87,9 +90,16 @@ fn send_directories(
|
|||
}
|
||||
|
||||
// if recursion was requested, all its children need to be added to the queue.
|
||||
// If a Directory message with the same digest has already
|
||||
// been sent previously, we can skip enqueueing it.
|
||||
// Same applies to when it already is in the queue.
|
||||
if req.recursive {
|
||||
for child_directory_node in &directory.directories {
|
||||
deq.push_back(child_directory_node.digest.clone());
|
||||
if !sent_directory_dgsts.contains(&child_directory_node.digest)
|
||||
&& !deq.contains(&child_directory_node.digest)
|
||||
{
|
||||
deq.push_back(child_directory_node.digest.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,6 +108,8 @@ fn send_directories(
|
|||
debug!("error sending: {}", e);
|
||||
return Err(Status::internal("error sending"));
|
||||
}
|
||||
|
||||
sent_directory_dgsts.insert(digest.to_vec());
|
||||
}
|
||||
},
|
||||
// some storage error?
|
||||
|
@ -144,16 +156,27 @@ fn insert_directories(
|
|||
seen_directory_dgsts.insert(dgst.clone());
|
||||
last_directory_dgst = Some(dgst.clone());
|
||||
|
||||
// insert the directory into the database
|
||||
if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) {
|
||||
match e {
|
||||
// TODO: conflict is a problem, as the whole transaction closure is retried?
|
||||
sled::transaction::UnabortableTransactionError::Conflict => todo!(),
|
||||
sled::transaction::UnabortableTransactionError::Storage(e) => {
|
||||
warn!("storage error: {}", e);
|
||||
return Err(Status::internal("storage error"));
|
||||
// check if the directory already exists in the database. We can skip
|
||||
// inserting if it's already there, as that'd be a no-op.
|
||||
match txn.get(dgst.clone()) {
|
||||
Ok(None) => {
|
||||
// insert the directory into the database
|
||||
if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) {
|
||||
match e {
|
||||
// TODO: conflict is a problem, as the whole transaction closure is retried?
|
||||
sled::transaction::UnabortableTransactionError::Conflict => todo!(),
|
||||
sled::transaction::UnabortableTransactionError::Storage(e) => {
|
||||
warn!("storage error: {}", e);
|
||||
return Err(Status::internal("storage error"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Some(_)) => continue,
|
||||
Err(e) => {
|
||||
warn!("storage error: {}", e);
|
||||
return Err(Status::internal("storage error"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,21 @@ lazy_static! {
|
|||
}],
|
||||
..Default::default()
|
||||
};
|
||||
static ref DIRECTORY_C: Directory = Directory {
|
||||
directories: vec![
|
||||
DirectoryNode {
|
||||
name: "a".to_string(),
|
||||
digest: DIRECTORY_A.digest(),
|
||||
size: DIRECTORY_A.size(),
|
||||
},
|
||||
DirectoryNode {
|
||||
name: "a'".to_string(),
|
||||
digest: DIRECTORY_A.digest(),
|
||||
size: DIRECTORY_A.size(),
|
||||
}
|
||||
],
|
||||
..Default::default()
|
||||
};
|
||||
}
|
||||
|
||||
/// Send the specified GetDirectoryRequest.
|
||||
|
@ -156,3 +171,38 @@ async fn put_get_multiple() -> anyhow::Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Put multiple Directories into the store, and omit duplicates.
|
||||
#[tokio::test]
|
||||
async fn put_get_dedup() -> anyhow::Result<()> {
|
||||
let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
|
||||
|
||||
// Send "A", then "C", which refers to "A" two times
|
||||
// Pretend we're a dumb client sending A twice.
|
||||
let put_resp = service
|
||||
.put(tonic_mock::streaming_request(vec![
|
||||
DIRECTORY_A.clone(),
|
||||
DIRECTORY_A.clone(),
|
||||
DIRECTORY_C.clone(),
|
||||
]))
|
||||
.await
|
||||
.expect("must succeed");
|
||||
|
||||
assert_eq!(DIRECTORY_C.digest(), put_resp.into_inner().root_digest);
|
||||
|
||||
// Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice.
|
||||
let items = get_directories(
|
||||
&service,
|
||||
GetDirectoryRequest {
|
||||
recursive: true,
|
||||
by_what: Some(ByWhat::Digest(DIRECTORY_C.digest())),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("must not error");
|
||||
|
||||
// We expect to get C, and then A (once, as the second A has been deduplicated).
|
||||
assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue