diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 5d5208b7e..cb3ed4c09 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -3209,6 +3209,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redb" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6dd20d3cdeb9c7d2366a0b16b93b35b75aec15309fbeb7ce477138c9f68c8c0" +dependencies = [ + "libc", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4808,6 +4817,7 @@ dependencies = [ "pin-project-lite", "prost 0.13.1", "prost-build 0.13.1", + "redb", "rstest", "rstest_reuse", "serde", @@ -4994,6 +5004,7 @@ dependencies = [ "pin-project-lite", "prost 0.13.1", "prost-build 0.13.1", + "redb", "reqwest", "reqwest-middleware", "rstest", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 64d0adabf..4826317e9 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -10068,6 +10068,37 @@ rec { "web_spin_lock" = [ "dep:wasm_sync" ]; }; }; + "redb" = rec { + crateName = "redb"; + version = "2.1.1"; + edition = "2021"; + sha256 = "1h68d2gqq4vpwiyfpyq9ag0swxavnf9npcd0cqipv77brp9j1pd6"; + type = [ "cdylib" "rlib" ]; + authors = [ + "Christopher Berner " + ]; + dependencies = [ + { + name = "libc"; + packageId = "libc"; + target = { target, features }: (target."unix" or false); + } + ]; + devDependencies = [ + { + name = "libc"; + packageId = "libc"; + target = { target, features }: (!("wasi" == target."os" or null)); + } + ]; + features = { + "log" = [ "dep:log" ]; + "logging" = [ "log" ]; + "pyo3" = [ "dep:pyo3" ]; + "pyo3-build-config" = [ "dep:pyo3-build-config" ]; + "python" = [ "pyo3" "pyo3-build-config" ]; + }; + }; "redox_syscall 0.2.16" = rec { crateName = "redox_syscall"; version = "0.2.16"; @@ -15647,6 +15678,10 @@ rec { name = "prost"; packageId = "prost 0.13.1"; } + { + name = "redb"; + packageId = "redb"; + } { name = "serde"; packageId = "serde"; @@ -16406,6 +16441,10 @@ rec { name = "prost"; packageId = "prost 0.13.1"; } + { + name = "redb"; + packageId = "redb"; + } { name = "reqwest"; packageId = "reqwest"; diff --git a/tvix/boot/tests/default.nix b/tvix/boot/tests/default.nix index a6c3047d0..101b71819 100644 --- a/tvix/boot/tests/default.nix +++ b/tvix/boot/tests/default.nix @@ -171,6 +171,7 @@ depot.nix.readTree.drvTargets closure-nixos = (mkBootTest { blobServiceAddr = "objectstore+file:///build/blobs"; + pathInfoServiceAddr = "redb:///build/pathinfo.redb"; path = testSystem; isClosure = true; vmCmdline = "init=${testSystem}/init panic=-1"; # reboot immediately on panic diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index aaa788ca7..ded2292db 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -40,6 +40,7 @@ petgraph = "0.6.4" erased-serde = "0.4.5" serde_tagged = "0.3.0" hyper-util = "0.1.6" +redb = "2.1.1" [dependencies.bigtable_rs] optional = true diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs index 8343d0774..5bbcd7b04 100644 --- a/tvix/castore/src/errors.rs +++ b/tvix/castore/src/errors.rs @@ -33,6 +33,42 @@ impl From for Error { } } +impl From for Error { + fn from(value: redb::Error) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From for Error { + fn from(value: redb::DatabaseError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From for Error { + fn from(value: redb::TableError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From for Error { + fn from(value: redb::TransactionError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From for Error { + fn from(value: redb::StorageError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From for Error { + fn from(value: redb::CommitError) -> Self { + Error::StorageError(value.to_string()) + } +} + impl From for Error { fn from(value: std::io::Error) -> Self { if value.kind() == std::io::ErrorKind::InvalidInput { diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 964fc9be2..de7d0a504 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -45,6 +45,7 @@ tracing-indicatif = "0.3.6" hyper-util = "0.1.6" toml = { version = "0.8.15", optional = true } tonic-health = { version = "0.12.1", default-features = false } +redb = "2.1.1" [dependencies.tonic-reflection] optional = true diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 5635c226c..262e66c1b 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -63,6 +63,8 @@ mod tests { lazy_static! { static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap(); + static ref TMPDIR_REDB_1: TempDir = TempDir::new().unwrap(); + static ref TMPDIR_REDB_2: TempDir = TempDir::new().unwrap(); } // the gRPC tests below don't fail, because we connect lazily. @@ -88,6 +90,14 @@ mod tests { #[case::memory_invalid_root_path("memory:///", false)] /// This sets a memory url path to "/foo", which is invalid. #[case::memory_invalid_root_path_foo("memory:///foo", false)] + /// redb with a host, and a valid path path, which should fail. + #[case::redb_invalid_host_with_valid_path(&format!("redb://foo.example{}", &TMPDIR_REDB_1.path().to_str().unwrap()), false)] + /// redb with / as path, which should fail. + #[case::redb_invalid_root("redb:///", false)] + /// redb with / as path, which should succeed. + #[case::redb_valid_path(&format!("redb://{}", &TMPDIR_REDB_2.path().join("foo").to_str().unwrap()), true)] + /// redb using the in-memory backend, which should succeed. + #[case::redb_valid_in_memory("redb://", true)] /// Correct Scheme for the cache.nixos.org binary cache. #[case::correct_nix_https("nix+https://cache.nixos.org", true)] /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL). diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 70f752f22..d118a8af1 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -4,6 +4,7 @@ mod grpc; mod lru; mod memory; mod nix_http; +mod redb; mod sled; #[cfg(any(feature = "fuse", feature = "virtiofs"))] @@ -28,6 +29,7 @@ pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig}; pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig}; pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig}; pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig}; +pub use self::redb::{RedbPathInfoService, RedbPathInfoServiceConfig}; pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig}; #[cfg(feature = "cloud")] @@ -88,6 +90,7 @@ pub(crate) fn register_pathinfo_services(reg: &mut Registry) { reg.register::>, MemoryPathInfoServiceConfig>("memory"); reg.register::>, NixHTTPPathInfoServiceConfig>("nix"); reg.register::>, SledPathInfoServiceConfig>("sled"); + reg.register::>, RedbPathInfoServiceConfig>("redb"); #[cfg(feature = "cloud")] { reg.register::>, BigtableParameters>( diff --git a/tvix/store/src/pathinfoservice/redb.rs b/tvix/store/src/pathinfoservice/redb.rs new file mode 100644 index 000000000..180ec3ef7 --- /dev/null +++ b/tvix/store/src/pathinfoservice/redb.rs @@ -0,0 +1,214 @@ +use super::PathInfoService; +use crate::proto::PathInfo; +use data_encoding::BASE64; +use futures::{stream::BoxStream, StreamExt}; +use prost::Message; +use redb::{Database, ReadableTable, TableDefinition}; +use std::{path::PathBuf, sync::Arc}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::async_trait; +use tracing::{instrument, warn}; +use tvix_castore::{ + composition::{CompositionContext, ServiceBuilder}, + Error, +}; + +const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec> = TableDefinition::new("pathinfo"); + +/// PathInfoService implementation using redb under the hood. +/// redb stores all of its data in a single file with a K/V pointing from a path's output hash to +/// its corresponding protobuf-encoded PathInfo. +pub struct RedbPathInfoService { + // We wrap db in an Arc to be able to move it into spawn_blocking, + // as discussed in https://github.com/cberner/redb/issues/789 + db: Arc, +} + +impl RedbPathInfoService { + /// Constructs a new instance using the specified file system path for + /// storage. + pub async fn new(path: PathBuf) -> Result { + if path == PathBuf::from("/") { + return Err(Error::StorageError( + "cowardly refusing to open / with redb".to_string(), + )); + } + + let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> { + let db = redb::Database::create(path)?; + create_schema(&db)?; + Ok(db) + }) + .await??; + + Ok(Self { db: Arc::new(db) }) + } + + /// Constructs a new instance using the in-memory backend. + pub fn new_temporary() -> Result { + let db = + redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?; + + create_schema(&db)?; + + Ok(Self { db: Arc::new(db) }) + } +} + +/// Ensures all tables are present. +fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { + // Opens a write transaction and calls open_table on PATHINFO_TABLE, which will + // create it if not present. + let txn = db.begin_write()?; + txn.open_table(PATHINFO_TABLE)?; + txn.commit()?; + + Ok(()) +} + +#[async_trait] +impl PathInfoService for RedbPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result, Error> { + let db = self.db.clone(); + + tokio::task::spawn_blocking({ + move || { + let txn = db.begin_read()?; + let table = txn.open_table(PATHINFO_TABLE)?; + match table.get(digest)? { + Some(pathinfo_bytes) => Ok(Some( + PathInfo::decode(pathinfo_bytes.value().as_slice()).map_err(|e| { + warn!(err=%e, "failed to decode stored PathInfo"); + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?, + )), + None => Ok(None), + } + } + }) + .await? + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result { + // Call validate on the received PathInfo message. + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))? + .to_owned(); + + let path_info_encoded = path_info.encode_to_vec(); + let db = self.db.clone(); + + tokio::task::spawn_blocking({ + move || -> Result<(), Error> { + let txn = db.begin_write()?; + { + let mut table = txn.open_table(PATHINFO_TABLE)?; + table + .insert(store_path.digest(), path_info_encoded) + .map_err(|e| { + warn!(err=%e, "failed to insert PathInfo"); + Error::StorageError(format!("failed to insert PathInfo: {}", e)) + })?; + } + Ok(txn.commit()?) + } + }) + .await??; + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result> { + let db = self.db.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(50); + + // Spawn a blocking task which writes all PathInfos to tx. + tokio::task::spawn_blocking({ + move || -> Result<(), Error> { + let read_txn = db.begin_read()?; + let table = read_txn.open_table(PATHINFO_TABLE)?; + + for elem in table.iter()? { + let elem = elem?; + tokio::runtime::Handle::current() + .block_on(tx.send(Ok( + PathInfo::decode(elem.1.value().as_slice()).map_err(|e| { + Error::InvalidRequest(format!("invalid PathInfo: {}", e)) + })?, + ))) + .map_err(|e| Error::StorageError(e.to_string()))?; + } + + Ok(()) + } + }); + + ReceiverStream::from(rx).boxed() + } +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RedbPathInfoServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option, +} + +impl TryFrom for RedbPathInfoServiceConfig { + type Error = Box; + fn try_from(url: url::Url) -> Result { + // redb doesn't support host, and a path can be provided (otherwise it'll live in memory only) + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + Ok(if url.path().is_empty() { + RedbPathInfoServiceConfig { + is_temporary: true, + path: None, + } + } else { + RedbPathInfoServiceConfig { + is_temporary: false, + path: Some(url.path().into()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for RedbPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result, Box> { + match self { + RedbPathInfoServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(RedbPathInfoService::new_temporary()?)), + RedbPathInfoServiceConfig { + is_temporary: true, + path: Some(_), + } => Err( + Error::StorageError("Temporary RedbPathInfoService can not have path".into()) + .into(), + ), + RedbPathInfoServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()), + RedbPathInfoServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new(RedbPathInfoService::new(path.to_owned()).await?)), + } + } +} diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs index 826840801..777588e9b 100644 --- a/tvix/store/src/pathinfoservice/tests/mod.rs +++ b/tvix/store/src/pathinfoservice/tests/mod.rs @@ -7,6 +7,7 @@ use rstest::*; use rstest_reuse::{self, *}; use super::PathInfoService; +use crate::pathinfoservice::redb::RedbPathInfoService; use crate::pathinfoservice::MemoryPathInfoService; use crate::pathinfoservice::SledPathInfoService; use crate::proto::PathInfo; @@ -27,6 +28,7 @@ use self::utils::make_bigtable_path_info_service; svc })] #[case::sled(SledPathInfoService::new_temporary().unwrap())] +#[case::redb(RedbPathInfoService::new_temporary().unwrap())] #[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))] pub fn path_info_services(#[case] svc: impl PathInfoService) {}