diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index f41c5f718..e4a4c2bbe 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,14 +1,14 @@ -use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use prost::Message; +use std::ops::Deref; use std::path::Path; use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryService, SimplePutter}; +use super::{ClosureValidator, DirectoryPutter, DirectoryService}; #[derive(Clone)] pub struct SledDirectoryService { @@ -107,6 +107,62 @@ impl DirectoryService for SledDirectoryService { where Self: Clone, { - Box::new(SimplePutter::new(self.clone())) + Box::new(SledDirectoryPutter { + tree: self.db.deref().clone(), + directory_validator: Some(Default::default()), + }) + } +} + +/// Buffers Directory messages to be uploaded and inserts them in a batch +/// transaction on close. +pub struct SledDirectoryPutter { + tree: sled::Tree, + + /// The directories (inside the directory validator) that we insert later, + /// or None, if they were already inserted. + directory_validator: Option, +} + +#[async_trait] +impl DirectoryPutter for SledDirectoryPutter { + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + match self.directory_validator { + None => return Err(Error::StorageError("already closed".to_string())), + Some(ref mut validator) => { + validator.add(directory)?; + } + } + + Ok(()) + } + + #[instrument(level = "trace", skip_all, ret, err)] + async fn close(&mut self) -> Result { + match self.directory_validator.take() { + None => Err(Error::InvalidRequest("already closed".to_string())), + Some(validator) => { + // retrieve the validated directories. + let directories = validator.finalize()?; + + // Get the root digest, which is at the end (cf. insertion order) + let root_digest = directories + .last() + .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? + .digest(); + + let mut batch = sled::Batch::default(); + for directory in directories { + batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); + } + + self.tree + .apply_batch(batch) + .map_err(|e| Error::StorageError(format!("unable to apply batch: {}", e)))?; + + Ok(root_digest) + } + } } }