feat(tvix/castore/directory): add SledDirectoryPutter

This uses DirectoryClosureValidator for validation and the sled batch
API to insert multiple directories at once.

Change-Id: I2d6dc513ccbc02e638f8d22173da5463e73182ee
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11222
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2024-03-20 22:00:30 +02:00 committed by clbot
parent f7281d8fd5
commit 21fcc1c9df

View file

@ -1,14 +1,14 @@
use crate::directoryservice::DirectoryPutter;
use crate::proto::Directory; use crate::proto::Directory;
use crate::{proto, B3Digest, Error}; use crate::{proto, B3Digest, Error};
use futures::stream::BoxStream; use futures::stream::BoxStream;
use prost::Message; use prost::Message;
use std::ops::Deref;
use std::path::Path; use std::path::Path;
use tonic::async_trait; use tonic::async_trait;
use tracing::{instrument, warn}; use tracing::{instrument, warn};
use super::utils::traverse_directory; use super::utils::traverse_directory;
use super::{DirectoryService, SimplePutter}; use super::{ClosureValidator, DirectoryPutter, DirectoryService};
#[derive(Clone)] #[derive(Clone)]
pub struct SledDirectoryService { pub struct SledDirectoryService {
@ -107,6 +107,62 @@ impl DirectoryService for SledDirectoryService {
where where
Self: Clone, Self: Clone,
{ {
Box::new(SimplePutter::new(self.clone())) Box::new(SledDirectoryPutter {
tree: self.db.deref().clone(),
directory_validator: Some(Default::default()),
})
}
}
/// Buffers Directory messages to be uploaded and inserts them in a batch
/// transaction on close.
pub struct SledDirectoryPutter {
tree: sled::Tree,
/// The directories (inside the directory validator) that we insert later,
/// or None, if they were already inserted.
directory_validator: Option<ClosureValidator>,
}
#[async_trait]
impl DirectoryPutter for SledDirectoryPutter {
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
match self.directory_validator {
None => return Err(Error::StorageError("already closed".to_string())),
Some(ref mut validator) => {
validator.add(directory)?;
}
}
Ok(())
}
#[instrument(level = "trace", skip_all, ret, err)]
async fn close(&mut self) -> Result<B3Digest, Error> {
match self.directory_validator.take() {
None => Err(Error::InvalidRequest("already closed".to_string())),
Some(validator) => {
// retrieve the validated directories.
let directories = validator.finalize()?;
// Get the root digest, which is at the end (cf. insertion order)
let root_digest = directories
.last()
.ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))?
.digest();
let mut batch = sled::Batch::default();
for directory in directories {
batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
}
self.tree
.apply_batch(batch)
.map_err(|e| Error::StorageError(format!("unable to apply batch: {}", e)))?;
Ok(root_digest)
}
}
} }
} }