refactor(tvix/store): use Arc instead of Box

This allows us to blob services without closing them before putting them
in a box.
We currently need to use Arc<_>, not Rc<_>, because the GRPC wrappers
require Sync.

Change-Id: I679c5f06b62304f5b0456cfefe25a0a881de7c84
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8738
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2023-06-09 18:22:25 +03:00 committed by clbot
parent 7725eb53ad
commit aa7bdc1199
13 changed files with 132 additions and 108 deletions

View file

@ -7,6 +7,7 @@ mod tvix_io;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::{fs, path::PathBuf};
use clap::Parser;
@ -71,20 +72,14 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b
eval.strict = args.strict;
let blob_service = MemoryBlobService::default();
let directory_service = MemoryDirectoryService::default();
let path_info_service = MemoryPathInfoService::new(
Box::new(blob_service.clone()),
Box::new(directory_service.clone()),
);
let blob_service = Arc::new(MemoryBlobService::default());
let directory_service = Arc::new(MemoryDirectoryService::default());
let path_info_service =
MemoryPathInfoService::new(blob_service.clone(), directory_service.clone());
eval.io_handle = Box::new(tvix_io::TvixIO::new(
known_paths.clone(),
tvix_store::TvixStoreIO::new(
Box::new(blob_service),
Box::new(directory_service),
path_info_service,
),
tvix_store::TvixStoreIO::new(blob_service, directory_service, path_info_service),
));
// bundle fetchurl.nix (used in nixpkgs) by resolving <nix> to

View file

@ -100,16 +100,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match cli.command {
Commands::Daemon { listen_address } => {
// initialize stores
let blob_service = SledBlobService::new("blobs.sled".into())?;
let boxed_blob_service: Box<dyn BlobService> = Box::new(blob_service.clone());
let boxed_blob_service2: Box<dyn BlobService> = Box::new(blob_service.clone());
let directory_service = SledDirectoryService::new("directories.sled".into())?;
let boxed_directory_service = Box::new(directory_service.clone());
let boxed_directory_service2: Box<dyn DirectoryService> = Box::new(directory_service);
let blob_service: Arc<dyn BlobService> =
Arc::new(SledBlobService::new("blobs.sled".into())?);
let directory_service: Arc<dyn DirectoryService> =
Arc::new(SledDirectoryService::new("directories.sled".into())?);
let path_info_service = SledPathInfoService::new(
"pathinfo.sled".into(),
boxed_blob_service,
boxed_directory_service,
blob_service.clone(),
directory_service.clone(),
)?;
let listen_address = listen_address
@ -122,10 +120,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
#[allow(unused_mut)]
let mut router = server
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
boxed_blob_service2,
blob_service,
)))
.add_service(DirectoryServiceServer::new(
GRPCDirectoryServiceWrapper::from(boxed_directory_service2),
GRPCDirectoryServiceWrapper::from(directory_service),
))
.add_service(PathInfoServiceServer::new(
GRPCPathInfoServiceWrapper::from(path_info_service),
@ -156,8 +154,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
GRPCPathInfoService::from_client(path_info_service_client.clone());
let io = Arc::new(TvixStoreIO::new(
Box::new(blob_service),
Box::new(directory_service),
Arc::new(blob_service),
Arc::new(directory_service),
path_info_service,
));

View file

@ -1,5 +1,6 @@
use super::DirectoryService;
use crate::{proto::NamedNode, B3Digest, Error};
use std::sync::Arc;
use tracing::{instrument, warn};
/// This traverses from a (root) node to the given (sub)path, returning the Node
@ -11,7 +12,7 @@ use tracing::{instrument, warn};
/// clearly distinguish it from the BFS traversers.
#[instrument(skip(directory_service))]
pub fn traverse_to(
directory_service: &Box<dyn DirectoryService>,
directory_service: Arc<dyn DirectoryService>,
node: crate::proto::node::Node,
path: &std::path::Path,
) -> Result<Option<crate::proto::node::Node>, Error> {
@ -91,7 +92,7 @@ mod tests {
#[test]
fn test_traverse_to() {
let mut directory_service = gen_directory_service();
let directory_service = gen_directory_service();
let mut handle = directory_service.put_multiple_start();
handle
@ -121,7 +122,7 @@ mod tests {
// traversal to an empty subpath should return the root node.
{
let resp = traverse_to(
&mut directory_service,
directory_service.clone(),
node_directory_complicated.clone(),
&PathBuf::from(""),
)
@ -133,7 +134,7 @@ mod tests {
// traversal to `keep` should return the node for DIRECTORY_WITH_KEEP
{
let resp = traverse_to(
&mut directory_service,
directory_service.clone(),
node_directory_complicated.clone(),
&PathBuf::from("keep"),
)
@ -145,7 +146,7 @@ mod tests {
// traversal to `keep/.keep` should return the node for the .keep file
{
let resp = traverse_to(
&mut directory_service,
directory_service.clone(),
node_directory_complicated.clone(),
&PathBuf::from("keep/.keep"),
)
@ -157,7 +158,7 @@ mod tests {
// traversal to `keep/.keep` should return the node for the .keep file
{
let resp = traverse_to(
&mut directory_service,
directory_service.clone(),
node_directory_complicated.clone(),
&PathBuf::from("/keep/.keep"),
)
@ -169,7 +170,7 @@ mod tests {
// traversal to `void` should return None (doesn't exist)
{
let resp = traverse_to(
&mut directory_service,
directory_service.clone(),
node_directory_complicated.clone(),
&PathBuf::from("void"),
)
@ -181,7 +182,7 @@ mod tests {
// traversal to `void` should return None (doesn't exist)
{
let resp = traverse_to(
&mut directory_service,
directory_service.clone(),
node_directory_complicated.clone(),
&PathBuf::from("//v/oid"),
)
@ -194,7 +195,7 @@ mod tests {
// reached, as keep/.keep already is a file)
{
let resp = traverse_to(
&mut directory_service,
directory_service.clone(),
node_directory_complicated.clone(),
&PathBuf::from("keep/.keep/foo"),
)
@ -206,7 +207,7 @@ mod tests {
// traversal to a subpath of '/' should return the root node.
{
let resp = traverse_to(
&mut directory_service,
directory_service.clone(),
node_directory_complicated.clone(),
&PathBuf::from("/"),
)

View file

@ -1,6 +1,7 @@
use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryService;
use crate::{directoryservice::DirectoryPutter, proto};
use std::sync::Arc;
use std::{
collections::HashMap,
fmt::Debug,
@ -57,7 +58,7 @@ impl From<super::Error> for Error {
// It assumes the caller adds returned nodes to the directories it assembles.
#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
fn process_entry(
blob_service: &Box<dyn BlobService>,
blob_service: Arc<dyn BlobService>,
directory_putter: &mut Box<dyn DirectoryPutter>,
entry: &walkdir::DirEntry,
maybe_directory: Option<proto::Directory>,
@ -146,8 +147,8 @@ fn process_entry(
/// naming scheme.
#[instrument(skip(blob_service, directory_service), fields(path=?p))]
pub fn ingest_path<P: AsRef<Path> + Debug>(
blob_service: &Box<dyn BlobService>,
directory_service: &Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
p: P,
) -> Result<proto::node::Node, Error> {
// Probe if the path points to a symlink. If it does, we process it manually,
@ -199,7 +200,12 @@ pub fn ingest_path<P: AsRef<Path> + Debug>(
}
};
let node = process_entry(blob_service, &mut directory_putter, &entry, maybe_directory)?;
let node = process_entry(
blob_service.clone(),
&mut directory_putter,
&entry,
maybe_directory,
)?;
if entry.depth() == 0 {
return Ok(node);

View file

@ -8,15 +8,18 @@ use crate::{
use count_write::CountWrite;
use nix_compat::nar;
use sha2::{Digest, Sha256};
use std::io::{self, BufReader};
use std::{
io::{self, BufReader},
sync::Arc,
};
use tracing::warn;
/// Invoke [render_nar], and return the size and sha256 digest of the produced
/// NAR output.
pub fn calculate_size_and_sha256(
root_node: &proto::node::Node,
blob_service: &Box<dyn BlobService>,
directory_service: &Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) -> Result<(u64, [u8; 32]), RenderError> {
let h = Sha256::new();
let mut cw = CountWrite::from(h);
@ -33,8 +36,8 @@ pub fn calculate_size_and_sha256(
pub fn write_nar<W: std::io::Write>(
w: &mut W,
proto_root_node: &proto::node::Node,
blob_service: &Box<dyn BlobService>,
directory_service: &Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) -> Result<(), RenderError> {
// Initialize NAR writer
let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?;
@ -52,8 +55,8 @@ pub fn write_nar<W: std::io::Write>(
fn walk_node(
nar_node: nar::writer::Node,
proto_node: &proto::node::Node,
blob_service: &Box<dyn BlobService>,
directory_service: &Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) -> Result<(), RenderError> {
match proto_node {
proto::node::Node::Symlink(proto_symlink_node) => {
@ -127,7 +130,7 @@ fn walk_node(
walk_node(
child_node,
&proto_node,
blob_service,
blob_service.clone(),
directory_service.clone(),
)?;
}

View file

@ -11,14 +11,14 @@ use std::{
pub struct MemoryPathInfoService {
db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>,
blob_service: Box<dyn BlobService>,
directory_service: Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
}
impl MemoryPathInfoService {
pub fn new(
blob_service: Box<dyn BlobService>,
directory_service: Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) -> Self {
Self {
db: Default::default(),
@ -58,7 +58,11 @@ impl PathInfoService for MemoryPathInfoService {
}
fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> {
calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
.map_err(|e| Error::StorageError(e.to_string()))
calculate_size_and_sha256(
root_node,
self.blob_service.clone(),
self.directory_service.clone(),
)
.map_err(|e| Error::StorageError(e.to_string()))
}
}

View file

@ -4,7 +4,7 @@ use crate::{
proto, Error,
};
use prost::Message;
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};
use tracing::warn;
/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
@ -14,15 +14,15 @@ use tracing::warn;
pub struct SledPathInfoService {
db: sled::Db,
blob_service: Box<dyn BlobService>,
directory_service: Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
}
impl SledPathInfoService {
pub fn new(
p: PathBuf,
blob_service: Box<dyn BlobService>,
directory_service: Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) -> Result<Self, sled::Error> {
let config = sled::Config::default().use_compression(true).path(p);
let db = config.open()?;
@ -35,8 +35,8 @@ impl SledPathInfoService {
}
pub fn new_temporary(
blob_service: Box<dyn BlobService>,
directory_service: Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) -> Result<Self, sled::Error> {
let config = sled::Config::default().temporary(true);
let db = config.open()?;
@ -95,7 +95,11 @@ impl PathInfoService for SledPathInfoService {
}
fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> {
calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
.map_err(|e| Error::StorageError(e.to_string()))
calculate_size_and_sha256(
root_node,
self.blob_service.clone(),
self.directory_service.clone(),
)
.map_err(|e| Error::StorageError(e.to_string()))
}
}

View file

@ -1,7 +1,7 @@
use crate::{
blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest,
};
use std::{collections::VecDeque, io, pin::Pin};
use std::{collections::VecDeque, io, pin::Pin, sync::Arc};
use tokio::task;
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;
@ -9,11 +9,11 @@ use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{instrument, warn};
pub struct GRPCBlobServiceWrapper {
blob_service: Box<dyn BlobService>,
blob_service: Arc<dyn BlobService>,
}
impl From<Box<dyn BlobService + 'static>> for GRPCBlobServiceWrapper {
fn from(value: Box<dyn BlobService>) -> Self {
impl From<Arc<dyn BlobService>> for GRPCBlobServiceWrapper {
fn from(value: Arc<dyn BlobService>) -> Self {
Self {
blob_service: value,
}

View file

@ -8,13 +8,13 @@ use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{debug, instrument, warn};
pub struct GRPCDirectoryServiceWrapper {
directory_service: Arc<Box<dyn DirectoryService>>,
directory_service: Arc<dyn DirectoryService>,
}
impl From<Box<dyn DirectoryService>> for GRPCDirectoryServiceWrapper {
fn from(value: Box<dyn DirectoryService>) -> Self {
impl From<Arc<dyn DirectoryService>> for GRPCDirectoryServiceWrapper {
fn from(value: Arc<dyn DirectoryService>) -> Self {
Self {
directory_service: Arc::new(value),
directory_service: value,
}
}
}

View file

@ -8,7 +8,7 @@ use nix_compat::{
store_path::{build_regular_ca_path, StorePath},
};
use smol_str::SmolStr;
use std::{io, path::Path, path::PathBuf};
use std::{io, path::Path, path::PathBuf, sync::Arc};
use tracing::{error, instrument, warn};
use tvix_eval::{EvalIO, FileType, StdIO};
@ -30,16 +30,16 @@ use crate::{
/// on the filesystem (still managed by Nix), as well as being able to read
/// files outside store paths.
pub struct TvixStoreIO<PS: PathInfoService> {
blob_service: Box<dyn BlobService>,
directory_service: Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
path_info_service: PS,
std_io: StdIO,
}
impl<PS: PathInfoService> TvixStoreIO<PS> {
pub fn new(
blob_service: Box<dyn BlobService>,
directory_service: Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
path_info_service: PS,
) -> Self {
Self {
@ -86,7 +86,7 @@ impl<PS: PathInfoService> TvixStoreIO<PS> {
}
};
directoryservice::traverse_to(&self.directory_service, root_node, sub_path)
directoryservice::traverse_to(self.directory_service.clone(), root_node, sub_path)
}
/// Imports a given path on the filesystem into the store, and returns the
@ -100,13 +100,20 @@ impl<PS: PathInfoService> TvixStoreIO<PS> {
path: &std::path::Path,
) -> Result<crate::proto::PathInfo, io::Error> {
// Call [import::ingest_path], which will walk over the given path and return a root_node.
let root_node = import::ingest_path(&self.blob_service, &self.directory_service, path)
.expect("error during import_path");
let root_node = import::ingest_path(
self.blob_service.clone(),
self.directory_service.clone(),
path,
)
.expect("error during import_path");
// Render the NAR
let (nar_size, nar_sha256) =
calculate_size_and_sha256(&root_node, &self.blob_service, &self.directory_service)
.expect("error during nar calculation"); // TODO: handle error
let (nar_size, nar_sha256) = calculate_size_and_sha256(
&root_node,
self.blob_service.clone(),
self.directory_service.clone(),
)
.expect("error during nar calculation"); // TODO: handle error
// For given NAR sha256 digest and name, return the new [StorePath] this would have.
let nar_hash_with_mode =

View file

@ -18,8 +18,8 @@ fn symlink() {
.unwrap();
let root_node = ingest_path(
&mut gen_blob_service(),
&mut gen_directory_service(),
gen_blob_service(),
gen_directory_service(),
tmpdir.path().join("doesntmatter"),
)
.expect("must succeed");
@ -39,11 +39,11 @@ fn single_file() {
std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
let mut blob_service = gen_blob_service();
let blob_service = gen_blob_service();
let root_node = ingest_path(
&mut blob_service,
&mut gen_directory_service(),
blob_service.clone(),
gen_directory_service(),
tmpdir.path().join("root"),
)
.expect("must succeed");
@ -75,11 +75,15 @@ fn complicated() {
// File ``keep/.keep`
std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap();
let mut blob_service = gen_blob_service();
let mut directory_service = gen_directory_service();
let blob_service = gen_blob_service();
let directory_service = gen_directory_service();
let root_node = ingest_path(&mut blob_service, &mut directory_service, tmpdir.path())
.expect("must succeed");
let root_node = ingest_path(
blob_service.clone(),
directory_service.clone(),
tmpdir.path(),
)
.expect("must succeed");
// ensure root_node matched expectations
assert_eq!(

View file

@ -19,8 +19,8 @@ fn single_symlink() {
target: "/nix/store/somewhereelse".to_string(),
}),
// don't put anything in the stores, as we don't actually do any requests.
&gen_blob_service(),
&gen_directory_service(),
gen_blob_service(),
gen_directory_service(),
)
.expect("must succeed");
@ -41,8 +41,8 @@ fn single_file_missing_blob() {
executable: false,
}),
// the blobservice is empty intentionally, to provoke the error.
&gen_blob_service(),
&gen_directory_service(),
gen_blob_service(),
gen_directory_service(),
)
.expect_err("must fail");
@ -81,8 +81,8 @@ fn single_file_wrong_blob_size() {
size: 42, // <- note the wrong size here!
executable: false,
}),
&blob_service,
&gen_directory_service(),
blob_service.clone(),
gen_directory_service(),
)
.expect_err("must fail");
@ -106,8 +106,8 @@ fn single_file_wrong_blob_size() {
size: 2, // <- note the wrong size here!
executable: false,
}),
&blob_service,
&gen_directory_service(),
blob_service,
gen_directory_service(),
)
.expect_err("must fail");
@ -143,8 +143,8 @@ fn single_file() {
size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
executable: false,
}),
&blob_service,
&gen_directory_service(),
blob_service,
gen_directory_service(),
)
.expect("must succeed");
@ -180,8 +180,8 @@ fn test_complicated() {
digest: DIRECTORY_COMPLICATED.digest().to_vec(),
size: DIRECTORY_COMPLICATED.size(),
}),
&blob_service,
&directory_service,
blob_service.clone(),
directory_service.clone(),
)
.expect("must succeed");
@ -194,8 +194,8 @@ fn test_complicated() {
digest: DIRECTORY_COMPLICATED.digest().to_vec(),
size: DIRECTORY_COMPLICATED.size(),
}),
&blob_service,
&directory_service,
blob_service,
directory_service,
)
.expect("must succeed");

View file

@ -1,20 +1,22 @@
use std::sync::Arc;
use crate::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
pathinfoservice::{MemoryPathInfoService, PathInfoService},
};
pub fn gen_blob_service() -> Box<dyn BlobService> {
Box::new(MemoryBlobService::default())
pub fn gen_blob_service() -> Arc<dyn BlobService> {
Arc::new(MemoryBlobService::default())
}
pub fn gen_directory_service() -> Box<dyn DirectoryService> {
Box::new(MemoryDirectoryService::default())
pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
Arc::new(MemoryDirectoryService::default())
}
pub fn gen_pathinfo_service(
blob_service: Box<dyn BlobService>,
directory_service: Box<dyn DirectoryService>,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) -> impl PathInfoService {
MemoryPathInfoService::new(blob_service, directory_service)
}