refactor(tvix): use AsRef<dyn …> instead of Deref<Target= …>

Removes some more needs for Arcs.

Change-Id: I9a9f4b81641c271de260e9ffa98313a32944d760
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10578
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2024-01-09 11:04:29 +02:00 committed by clbot
parent 8fbdf72825
commit 89882ff9b1
8 changed files with 58 additions and 69 deletions

View file

@ -23,7 +23,6 @@ use fuse_backend_rs::abi::fuse_abi::stat64;
use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
use futures::StreamExt; use futures::StreamExt;
use parking_lot::RwLock; use parking_lot::RwLock;
use std::ops::Deref;
use std::{ use std::{
collections::HashMap, collections::HashMap,
io, io,
@ -101,8 +100,8 @@ pub struct TvixStoreFs<BS, DS, RN> {
impl<BS, DS, RN> TvixStoreFs<BS, DS, RN> impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
where where
BS: Deref<Target = dyn BlobService> + Clone + Send, BS: AsRef<dyn BlobService> + Clone + Send,
DS: Deref<Target = dyn DirectoryService> + Clone + Send + 'static, DS: AsRef<dyn DirectoryService> + Clone + Send + 'static,
RN: RootNodes + Clone + 'static, RN: RootNodes + Clone + 'static,
{ {
pub fn new( pub fn new(
@ -157,7 +156,7 @@ where
.block_on(self.tokio_handle.spawn({ .block_on(self.tokio_handle.spawn({
let directory_service = self.directory_service.clone(); let directory_service = self.directory_service.clone();
let parent_digest = parent_digest.to_owned(); let parent_digest = parent_digest.to_owned();
async move { directory_service.get(&parent_digest).await } async move { directory_service.as_ref().get(&parent_digest).await }
})) }))
.unwrap()? .unwrap()?
.ok_or_else(|| { .ok_or_else(|| {
@ -270,8 +269,8 @@ where
impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN> impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
where where
BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, BS: AsRef<dyn BlobService> + Clone + Send + 'static,
DS: Deref<Target = dyn DirectoryService> + Send + Clone + 'static, DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
RN: RootNodes + Clone + 'static, RN: RootNodes + Clone + 'static,
{ {
type Handle = u64; type Handle = u64;
@ -496,7 +495,7 @@ where
let task = self let task = self
.tokio_handle .tokio_handle
.spawn(async move { blob_service.open_read(&blob_digest).await }); .spawn(async move { blob_service.as_ref().open_read(&blob_digest).await });
let blob_reader = self.tokio_handle.block_on(task).unwrap(); let blob_reader = self.tokio_handle.block_on(task).unwrap();

View file

@ -1,4 +1,4 @@
use std::{collections::BTreeMap, ops::Deref, pin::Pin}; use std::{collections::BTreeMap, pin::Pin};
use crate::{proto::node::Node, Error}; use crate::{proto::node::Node, Error};
use bytes::Bytes; use bytes::Bytes;
@ -23,13 +23,15 @@ pub trait RootNodes: Send + Sync {
/// the key is the node name. /// the key is the node name.
impl<T> RootNodes for T impl<T> RootNodes for T
where where
T: Deref<Target = BTreeMap<Bytes, Node>> + Send + Sync, T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync,
{ {
async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> {
Ok(self.get(name).cloned()) Ok(self.as_ref().get(name).cloned())
} }
fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send + '_>> { fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send + '_>> {
Box::pin(tokio_stream::iter(self.iter().map(|(_, v)| Ok(v.clone())))) Box::pin(tokio_stream::iter(
self.as_ref().iter().map(|(_, v)| Ok(v.clone())),
))
} }
} }

View file

@ -1,7 +1,6 @@
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
io::{self, Cursor}, io::{self, Cursor},
ops::Deref,
os::unix::fs::MetadataExt, os::unix::fs::MetadataExt,
path::Path, path::Path,
sync::Arc, sync::Arc,
@ -43,8 +42,8 @@ fn do_mount<P: AsRef<Path>, BS, DS>(
list_root: bool, list_root: bool,
) -> io::Result<FuseDaemon> ) -> io::Result<FuseDaemon>
where where
BS: Deref<Target = dyn BlobService> + Send + Sync + Clone + 'static, BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
DS: Deref<Target = dyn DirectoryService> + Send + Sync + Clone + 'static, DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
{ {
let fs = TvixStoreFs::new( let fs = TvixStoreFs::new(
blob_service, blob_service,

View file

@ -7,7 +7,6 @@ use crate::proto::DirectoryNode;
use crate::proto::FileNode; use crate::proto::FileNode;
use crate::proto::SymlinkNode; use crate::proto::SymlinkNode;
use crate::Error as CastoreError; use crate::Error as CastoreError;
use std::ops::Deref;
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -68,7 +67,7 @@ async fn process_entry<'a, BS>(
maybe_directory: Option<Directory>, maybe_directory: Option<Directory>,
) -> Result<Node, Error> ) -> Result<Node, Error>
where where
BS: Deref<Target = dyn BlobService> + Clone, BS: AsRef<dyn BlobService> + Clone,
{ {
let file_type = entry.file_type(); let file_type = entry.file_type();
@ -114,7 +113,7 @@ where
.await .await
.map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?; .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?;
let mut writer = blob_service.open_write().await; let mut writer = blob_service.as_ref().open_write().await;
if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
return Err(Error::UnableToRead(entry.path().to_path_buf(), e)); return Err(Error::UnableToRead(entry.path().to_path_buf(), e));
@ -156,12 +155,12 @@ pub async fn ingest_path<'a, BS, DS, P>(
) -> Result<Node, Error> ) -> Result<Node, Error>
where where
P: AsRef<Path> + Debug, P: AsRef<Path> + Debug,
BS: Deref<Target = dyn BlobService> + Clone, BS: AsRef<dyn BlobService> + Clone,
DS: Deref<Target = &'a dyn DirectoryService>, DS: AsRef<dyn DirectoryService>,
{ {
let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
let mut directory_putter = directory_service.put_multiple_start(); let mut directory_putter = directory_service.as_ref().put_multiple_start();
for entry in WalkDir::new(p.as_ref()) for entry in WalkDir::new(p.as_ref())
.follow_links(false) .follow_links(false)

View file

@ -1,10 +1,8 @@
use crate::blobservice::BlobService; use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryService;
use crate::fixtures::*; use crate::fixtures::*;
use crate::import::ingest_path; use crate::import::ingest_path;
use crate::proto; use crate::proto;
use crate::utils::{gen_blob_service, gen_directory_service}; use crate::utils::{gen_blob_service, gen_directory_service};
use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use tempfile::TempDir; use tempfile::TempDir;
@ -14,8 +12,8 @@ use std::os::unix::ffi::OsStrExt;
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
#[tokio::test] #[tokio::test]
async fn symlink() { async fn symlink() {
let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); let blob_service = gen_blob_service();
let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into(); let directory_service = gen_directory_service();
let tmpdir = TempDir::new().unwrap(); let tmpdir = TempDir::new().unwrap();
@ -27,8 +25,8 @@ async fn symlink() {
.unwrap(); .unwrap();
let root_node = ingest_path( let root_node = ingest_path(
blob_service, Arc::from(blob_service),
&directory_service.deref(), directory_service,
tmpdir.path().join("doesntmatter"), tmpdir.path().join("doesntmatter"),
) )
.await .await
@ -46,7 +44,7 @@ async fn symlink() {
#[tokio::test] #[tokio::test]
async fn single_file() { async fn single_file() {
let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into(); let directory_service = gen_directory_service();
let tmpdir = TempDir::new().unwrap(); let tmpdir = TempDir::new().unwrap();
@ -54,7 +52,7 @@ async fn single_file() {
let root_node = ingest_path( let root_node = ingest_path(
blob_service.clone(), blob_service.clone(),
&directory_service.deref(), directory_service,
tmpdir.path().join("root"), tmpdir.path().join("root"),
) )
.await .await
@ -78,7 +76,7 @@ async fn single_file() {
#[tokio::test] #[tokio::test]
async fn complicated() { async fn complicated() {
let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into(); let directory_service = gen_directory_service();
let tmpdir = TempDir::new().unwrap(); let tmpdir = TempDir::new().unwrap();
@ -91,11 +89,7 @@ async fn complicated() {
// File ``keep/.keep` // File ``keep/.keep`
std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap(); std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap();
let root_node = ingest_path( let root_node = ingest_path(blob_service.clone(), &directory_service, tmpdir.path())
blob_service.clone(),
&directory_service.deref(),
tmpdir.path(),
)
.await .await
.expect("must succeed"); .expect("must succeed");

View file

@ -3,7 +3,6 @@
use nix_compat::store_path::StorePath; use nix_compat::store_path::StorePath;
use std::{ use std::{
io, io,
ops::Deref,
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
@ -35,8 +34,8 @@ pub struct TvixStoreIO<BS, DS, PS> {
impl<BS, DS, PS> TvixStoreIO<BS, DS, PS> impl<BS, DS, PS> TvixStoreIO<BS, DS, PS>
where where
DS: Deref<Target = dyn DirectoryService>, DS: AsRef<dyn DirectoryService>,
PS: Deref<Target = dyn PathInfoService>, PS: AsRef<dyn PathInfoService>,
{ {
pub fn new( pub fn new(
blob_service: BS, blob_service: BS,
@ -70,7 +69,7 @@ where
.tokio_handle .tokio_handle
.block_on(async { .block_on(async {
self.path_info_service self.path_info_service
.deref() .as_ref()
.get(*store_path.digest()) .get(*store_path.digest())
.await .await
}) })
@ -93,7 +92,7 @@ where
// with the root_node and sub_path, descend to the node requested. // with the root_node and sub_path, descend to the node requested.
Ok(self.tokio_handle.block_on({ Ok(self.tokio_handle.block_on({
async { async {
directoryservice::descend_to(self.directory_service.deref(), root_node, sub_path) directoryservice::descend_to(self.directory_service.as_ref(), root_node, sub_path)
.await .await
} }
})?) })?)
@ -102,9 +101,9 @@ where
impl<BS, DS, PS> EvalIO for TvixStoreIO<BS, DS, PS> impl<BS, DS, PS> EvalIO for TvixStoreIO<BS, DS, PS>
where where
BS: Deref<Target = dyn BlobService> + Clone, BS: AsRef<dyn BlobService> + Clone,
DS: Deref<Target = dyn DirectoryService>, DS: AsRef<dyn DirectoryService>,
PS: Deref<Target = dyn PathInfoService>, PS: AsRef<dyn PathInfoService>,
{ {
#[instrument(skip(self), ret, err)] #[instrument(skip(self), ret, err)]
fn path_exists(&self, path: &Path) -> io::Result<bool> { fn path_exists(&self, path: &Path) -> io::Result<bool> {
@ -154,7 +153,7 @@ where
self.tokio_handle.block_on(async { self.tokio_handle.block_on(async {
let mut reader = { let mut reader = {
let resp = self.blob_service.deref().open_read(&digest).await?; let resp = self.blob_service.as_ref().open_read(&digest).await?;
match resp { match resp {
Some(blob_reader) => blob_reader, Some(blob_reader) => blob_reader,
None => { None => {
@ -212,10 +211,9 @@ where
) )
})?; })?;
if let Some(directory) = self if let Some(directory) = self.tokio_handle.block_on(async {
.tokio_handle self.directory_service.as_ref().get(&digest).await
.block_on(async { self.directory_service.deref().get(&digest).await })? })? {
{
let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new(); let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
for node in directory.nodes() { for node in directory.nodes() {
children.push(match node { children.push(match node {
@ -263,9 +261,9 @@ where
let output_path = self.tokio_handle.block_on(async { let output_path = self.tokio_handle.block_on(async {
tvix_store::utils::import_path( tvix_store::utils::import_path(
path, path,
self.blob_service.deref(), &self.blob_service,
self.directory_service.deref(), &self.directory_service,
self.path_info_service.deref(), &self.path_info_service,
) )
.await .await
})?; })?;

View file

@ -1,6 +1,5 @@
use futures::Stream; use futures::Stream;
use futures::StreamExt; use futures::StreamExt;
use std::ops::Deref;
use std::pin::Pin; use std::pin::Pin;
use tonic::async_trait; use tonic::async_trait;
use tvix_castore::fs::{RootNodes, TvixStoreFs}; use tvix_castore::fs::{RootNodes, TvixStoreFs};
@ -21,9 +20,9 @@ pub fn make_fs<BS, DS, PS>(
list_root: bool, list_root: bool,
) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>> ) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>>
where where
BS: Deref<Target = dyn BlobService> + Send + Clone + 'static, BS: AsRef<dyn BlobService> + Send + Clone + 'static,
DS: Deref<Target = dyn DirectoryService> + Send + Clone + 'static, DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
PS: Deref<Target = dyn PathInfoService> + Send + Sync + Clone + 'static, PS: AsRef<dyn PathInfoService> + Send + Sync + Clone + 'static,
{ {
TvixStoreFs::new( TvixStoreFs::new(
blob_service, blob_service,
@ -46,7 +45,7 @@ pub struct RootNodesWrapper<T>(pub(crate) T);
#[async_trait] #[async_trait]
impl<T> RootNodes for RootNodesWrapper<T> impl<T> RootNodes for RootNodesWrapper<T>
where where
T: Deref<Target = dyn PathInfoService> + Send + Sync, T: AsRef<dyn PathInfoService> + Send + Sync,
{ {
async fn get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> { async fn get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> {
let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else { let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else {
@ -55,7 +54,7 @@ where
Ok(self Ok(self
.0 .0
.deref() .as_ref()
.get(*store_path.digest()) .get(*store_path.digest())
.await? .await?
.map(|path_info| { .map(|path_info| {
@ -68,7 +67,7 @@ where
} }
fn list(&self) -> Pin<Box<dyn Stream<Item = Result<castorepb::node::Node, Error>> + Send>> { fn list(&self) -> Pin<Box<dyn Stream<Item = Result<castorepb::node::Node, Error>> + Send>> {
Box::pin(self.0.deref().list().map(|result| { Box::pin(self.0.as_ref().list().map(|result| {
result.map(|path_info| { result.map(|path_info| {
path_info path_info
.node .node

View file

@ -1,4 +1,4 @@
use std::{ops::Deref, path::Path, sync::Arc}; use std::{path::Path, sync::Arc};
use data_encoding::BASE64; use data_encoding::BASE64;
use nix_compat::store_path::{self, StorePath}; use nix_compat::store_path::{self, StorePath};
@ -53,9 +53,9 @@ pub async fn import_path<BS, DS, PS, P>(
) -> Result<StorePath, std::io::Error> ) -> Result<StorePath, std::io::Error>
where where
P: AsRef<Path> + std::fmt::Debug, P: AsRef<Path> + std::fmt::Debug,
BS: Deref<Target = dyn BlobService> + Clone, BS: AsRef<dyn BlobService> + Clone,
DS: Deref<Target = dyn DirectoryService>, DS: AsRef<dyn DirectoryService>,
PS: Deref<Target = dyn PathInfoService>, PS: AsRef<dyn PathInfoService>,
{ {
// calculate the name // calculate the name
// TODO: make a path_to_name helper function? // TODO: make a path_to_name helper function?
@ -71,15 +71,14 @@ where
})?; })?;
// Ingest the path into blob and directory service. // Ingest the path into blob and directory service.
let root_node = let root_node = tvix_castore::import::ingest_path(blob_service, &directory_service, &path)
tvix_castore::import::ingest_path(blob_service, &directory_service.deref(), &path)
.await .await
.expect("failed to ingest path"); .expect("failed to ingest path");
debug!(root_node =?root_node, "import successful"); debug!(root_node =?root_node, "import successful");
// Ask the PathInfoService for the NAR size and sha256 // Ask the PathInfoService for the NAR size and sha256
let (nar_size, nar_sha256) = path_info_service.calculate_nar(&root_node).await?; let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?;
// Calculate the output path. This might still fail, as some names are illegal. // Calculate the output path. This might still fail, as some names are illegal.
let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| { let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| {
@ -115,7 +114,7 @@ where
// put into [PathInfoService], and return the PathInfo that we get back // put into [PathInfoService], and return the PathInfo that we get back
// from there (it might contain additional signatures). // from there (it might contain additional signatures).
let _path_info = path_info_service.put(path_info).await?; let _path_info = path_info_service.as_ref().put(path_info).await?;
Ok(output_path.to_owned()) Ok(output_path.to_owned())
} }