refactor(tvix/[ca]store): use auto_impl

This implements BS, DS, PS for Box'ed or Arc'ed variants of it with less
code, and less potential to accidentially forget to proxy default trait
methods for blanked impls, as fixed in cl/12658.

Change-Id: If2cdbb563a73792038ebe7bff45d6f880214855b
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12661
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: edef <edef@edef.eu>
This commit is contained in:
Florian Klink 2024-10-18 14:41:14 +02:00 committed by clbot
parent 47efebfc6f
commit 9c22345019
16 changed files with 85 additions and 99 deletions

13
tvix/Cargo.lock generated
View file

@ -277,6 +277,17 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "auto_impl"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
]
[[package]]
name = "autocfg"
version = "1.4.0"
@ -4572,6 +4583,7 @@ dependencies = [
"async-process",
"async-stream",
"async-tempfile",
"auto_impl",
"bigtable_rs",
"blake3",
"bstr",
@ -4765,6 +4777,7 @@ dependencies = [
"async-compression",
"async-process",
"async-stream",
"auto_impl",
"bigtable_rs",
"blake3",
"bstr",

View file

@ -992,6 +992,33 @@ rec {
"portable-atomic" = [ "dep:portable-atomic" ];
};
};
"auto_impl" = rec {
crateName = "auto_impl";
version = "1.2.0";
edition = "2021";
sha256 = "0hmfcahj0vrnzq7rayk7r428zp54x9a8awgw6wil753pbvqz71rw";
procMacro = true;
authors = [
"Ashley Mannix <ashleymannix@live.com.au>"
"Lukas Kalbertodt <lukas.kalbertodt@gmail.com>"
];
dependencies = [
{
name = "proc-macro2";
packageId = "proc-macro2";
}
{
name = "quote";
packageId = "quote";
}
{
name = "syn";
packageId = "syn 2.0.79";
features = [ "full" "visit" "visit-mut" ];
}
];
};
"autocfg" = rec {
crateName = "autocfg";
version = "1.4.0";
@ -15131,6 +15158,10 @@ rec {
name = "async-tempfile";
packageId = "async-tempfile";
}
{
name = "auto_impl";
packageId = "auto_impl";
}
{
name = "bigtable_rs";
packageId = "bigtable_rs";
@ -15917,6 +15948,10 @@ rec {
name = "async-stream";
packageId = "async-stream";
}
{
name = "auto_impl";
packageId = "auto_impl";
}
{
name = "bigtable_rs";
packageId = "bigtable_rs";

View file

@ -20,8 +20,8 @@ pub async fn from_addr<BS, DS>(
directory_service: DS,
) -> std::io::Result<Box<dyn BuildService>>
where
BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
BS: BlobService + Send + Sync + Clone + 'static,
DS: DirectoryService + Send + Sync + Clone + 'static,
{
let url = Url::parse(uri)
.map_err(|e| std::io::Error::other(format!("unable to parse url: {}", e)))?;

View file

@ -91,8 +91,8 @@ impl<BS, DS> OCIBuildService<BS, DS> {
#[async_trait]
impl<BS, DS> BuildService for OCIBuildService<BS, DS>
where
BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
BS: BlobService + Clone + 'static,
DS: DirectoryService + Clone + 'static,
{
#[instrument(skip_all, err)]
async fn do_build(&self, request: BuildRequest) -> std::io::Result<Build> {

View file

@ -52,6 +52,7 @@ vm-memory = { workspace = true, optional = true }
vmm-sys-util = { workspace = true, optional = true }
virtio-bindings = { workspace = true, optional = true }
wu-manber = { workspace = true }
auto_impl = "1.2.0"
[build-dependencies]
prost-build = { workspace = true }

View file

@ -1,5 +1,6 @@
use std::io;
use auto_impl::auto_impl;
use tonic::async_trait;
use crate::composition::{Registry, ServiceBuilder};
@ -29,6 +30,7 @@ pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfi
/// which will implement a writer interface, and also provides a close funtion,
/// to finalize a blob and get its digest.
#[async_trait]
#[auto_impl(&, &mut, Arc, Box)]
pub trait BlobService: Send + Sync {
/// Check if the service has the blob, by its content hash.
/// On implementations returning chunks, this must also work for chunks.
@ -60,28 +62,6 @@ pub trait BlobService: Send + Sync {
}
}
#[async_trait]
impl<A> BlobService for A
where
A: AsRef<dyn BlobService> + Send + Sync,
{
async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
self.as_ref().has(digest).await
}
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
self.as_ref().open_read(digest).await
}
async fn open_write(&self) -> Box<dyn BlobWriter> {
self.as_ref().open_write().await
}
async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
self.as_ref().chunks(digest).await
}
}
/// A [tokio::io::AsyncWrite] that the user needs to close() afterwards for persist.
/// On success, it returns the digest of the written blob.
#[async_trait]

View file

@ -170,8 +170,8 @@ impl ServiceBuilder for CacheConfig {
context: &CompositionContext,
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (near, far) = futures::join!(
context.resolve(self.near.clone()),
context.resolve(self.far.clone())
context.resolve::<Self::Output>(self.near.clone()),
context.resolve::<Self::Output>(self.far.clone())
);
Ok(Arc::new(Cache {
near: near?,

View file

@ -1,6 +1,7 @@
use crate::composition::{Registry, ServiceBuilder};
use crate::{B3Digest, Directory, Error};
use auto_impl::auto_impl;
use futures::stream::BoxStream;
use tonic::async_trait;
mod combinators;
@ -39,6 +40,7 @@ pub use self::bigtable::{BigtableDirectoryService, BigtableParameters};
/// This is a simple get and put of [Directory], returning their
/// digest.
#[async_trait]
#[auto_impl(&, &mut, Arc, Box)]
pub trait DirectoryService: Send + Sync {
/// Looks up a single Directory message by its digest.
/// The returned Directory message *must* be valid.
@ -80,31 +82,6 @@ pub trait DirectoryService: Send + Sync {
fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
}
#[async_trait]
impl<A> DirectoryService for A
where
A: AsRef<dyn DirectoryService> + Send + Sync,
{
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
self.as_ref().get(digest).await
}
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
self.as_ref().put(directory).await
}
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<Directory, Error>> {
self.as_ref().get_recursive(root_directory_digest)
}
fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> {
self.as_ref().put_multiple_start()
}
}
/// Provides a handle to put a closure of connected [Directory] elements.
///
/// The consumer can periodically call [DirectoryPutter::put], starting from the

View file

@ -45,8 +45,8 @@ fn do_mount<P: AsRef<Path>, BS, DS>(
show_xattr: bool,
) -> io::Result<FuseDaemon>
where
BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
BS: BlobService + Send + Sync + Clone + 'static,
DS: DirectoryService + Send + Sync + Clone + 'static,
{
let fs = TvixStoreFs::new(
blob_service,

View file

@ -121,8 +121,8 @@ pub struct TvixStoreFs<BS, DS, RN> {
impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
where
BS: AsRef<dyn BlobService> + Clone + Send,
DS: AsRef<dyn DirectoryService> + Clone + Send + 'static,
BS: BlobService + Clone + Send,
DS: DirectoryService + Clone + Send + 'static,
RN: RootNodes + Clone + 'static,
{
pub fn new(
@ -186,7 +186,7 @@ where
.block_on({
let directory_service = self.directory_service.clone();
let parent_digest = parent_digest.to_owned();
async move { directory_service.as_ref().get(&parent_digest).await }
async move { directory_service.get(&parent_digest).await }
})?
.ok_or_else(|| {
warn!(directory.digest=%parent_digest, "directory not found");
@ -302,8 +302,8 @@ const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest";
impl<BS, DS, RN> Layer for TvixStoreFs<BS, DS, RN>
where
BS: AsRef<dyn BlobService> + Clone + Send + 'static,
DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
BS: BlobService + Clone + Send + 'static,
DS: DirectoryService + Send + Clone + 'static,
RN: RootNodes + Clone + 'static,
{
fn root_inode(&self) -> Self::Inode {
@ -313,8 +313,8 @@ where
impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
where
BS: AsRef<dyn BlobService> + Clone + Send + 'static,
DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
BS: BlobService + Clone + Send + 'static,
DS: DirectoryService + Send + Clone + 'static,
RN: RootNodes + Clone + 'static,
{
type Handle = u64;
@ -674,7 +674,7 @@ where
match self.tokio_handle.block_on({
let blob_service = self.blob_service.clone();
let blob_digest = blob_digest.clone();
async move { blob_service.as_ref().open_read(&blob_digest).await }
async move { blob_service.open_read(&blob_digest).await }
}) {
Ok(None) => {
warn!("blob not found");

View file

@ -49,6 +49,7 @@ redb = { workspace = true, features = ["logging"] }
mimalloc = { workspace = true }
tonic-reflection = { workspace = true, optional = true }
bigtable_rs = { workspace = true, optional = true }
auto_impl = "1.2.0"
[build-dependencies]
prost-build = { workspace = true }

View file

@ -88,8 +88,8 @@ impl ServiceBuilder for CacheConfig {
context: &CompositionContext,
) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (near, far) = futures::join!(
context.resolve(self.near.clone()),
context.resolve(self.far.clone())
context.resolve::<Self::Output>(self.near.clone()),
context.resolve::<Self::Output>(self.far.clone())
);
Ok(Arc::new(Cache {
near: near?,

View file

@ -20,9 +20,9 @@ pub fn make_fs<BS, DS, PS>(
show_xattr: bool,
) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>>
where
BS: AsRef<dyn BlobService> + Send + Clone + 'static,
DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
PS: AsRef<dyn PathInfoService> + Send + Sync + Clone + 'static,
BS: BlobService + Send + Clone + 'static,
DS: DirectoryService + Send + Clone + 'static,
PS: PathInfoService + Send + Sync + Clone + 'static,
{
TvixStoreFs::new(
blob_service,
@ -46,7 +46,7 @@ pub struct RootNodesWrapper<T>(pub(crate) T);
#[async_trait]
impl<T> RootNodes for RootNodesWrapper<T>
where
T: AsRef<dyn PathInfoService> + Send + Sync,
T: PathInfoService + Send + Sync,
{
async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> {
let Ok(store_path) = StorePathRef::from_bytes(name.as_ref()) else {
@ -55,14 +55,13 @@ where
Ok(self
.0
.as_ref()
.get(*store_path.digest())
.await?
.map(|path_info| path_info.node))
}
fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> {
Box::pin(self.0.as_ref().list().map(|result| {
Box::pin(self.0.list().map(|result| {
result.map(|path_info| {
let basename = path_info.store_path.to_string();
(

View file

@ -13,6 +13,7 @@ mod fs;
#[cfg(test)]
mod tests;
use auto_impl::auto_impl;
use futures::stream::BoxStream;
use tonic::async_trait;
use tvix_castore::composition::{Registry, ServiceBuilder};
@ -45,6 +46,7 @@ pub use self::fs::make_fs;
/// The base trait all PathInfo services need to implement.
#[async_trait]
#[auto_impl(&, &mut, Arc, Box)]
pub trait PathInfoService: Send + Sync {
/// Retrieve a PathInfo message by the output digest.
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>;
@ -69,28 +71,6 @@ pub trait PathInfoService: Send + Sync {
}
}
#[async_trait]
impl<A> PathInfoService for A
where
A: AsRef<dyn PathInfoService> + Send + Sync + 'static,
{
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
self.as_ref().get(digest).await
}
async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
self.as_ref().put(path_info).await
}
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
self.as_ref().list()
}
fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> {
self.as_ref().nar_calculation_service()
}
}
/// Registers the builtin PathInfoService implementations with the registry
pub(crate) fn register_pathinfo_services(reg: &mut Registry) {
reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, CachePathInfoServiceConfig>("cache");

View file

@ -66,8 +66,8 @@ impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
#[async_trait]
impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
where
BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
BS: BlobService + Send + Sync + Clone + 'static,
DS: DirectoryService + Send + Sync + Clone + 'static,
{
#[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
@ -316,10 +316,10 @@ impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
&'a self,
_instance_name: &str,
context: &CompositionContext,
) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (blob_service, directory_service) = futures::join!(
context.resolve(self.blob_service.clone()),
context.resolve(self.directory_service.clone())
context.resolve::<dyn BlobService>(self.blob_service.clone()),
context.resolve::<dyn DirectoryService>(self.directory_service.clone())
);
let mut svc = NixHTTPPathInfoService::new(
Url::parse(&self.base_url)?,

View file

@ -96,7 +96,7 @@ impl ServiceBuilder for KeyFileSigningPathInfoServiceConfig {
_instance_name: &str,
context: &CompositionContext,
) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let inner = context.resolve(self.inner.clone()).await?;
let inner = context.resolve::<Self::Output>(self.inner.clone()).await?;
let signing_key = Arc::new(
parse_keypair(tokio::fs::read_to_string(&self.keyfile).await?.trim())
.map_err(|e| Error::StorageError(e.to_string()))?