From fbf31f45efb48776c73ce88f093a416f59d22585 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 5 Apr 2024 16:54:50 +0300 Subject: [PATCH] feat(tvix/store): add bigtable pathinfoservice backend Put behind the "cloud" backend, like in the `tvix-castore` crate. Change-Id: Ib38d198baf11ab2a4b6dc405121676147c424611 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11362 Autosubmit: flokli Reviewed-by: Connor Brewster Tested-by: BuildkiteCI --- tvix/Cargo.lock | 5 + tvix/Cargo.nix | 24 +- tvix/store/Cargo.toml | 15 +- tvix/store/default.nix | 10 +- tvix/store/src/pathinfoservice/bigtable.rs | 399 ++++++++++++++++++++ tvix/store/src/pathinfoservice/from_addr.rs | 47 ++- tvix/store/src/pathinfoservice/mod.rs | 5 + tvix/store/src/pathinfoservice/tests/mod.rs | 1 + 8 files changed, 499 insertions(+), 7 deletions(-) create mode 100644 tvix/store/src/pathinfoservice/bigtable.rs diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 2c39c2bad..f229e0b3c 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4505,8 +4505,10 @@ name = "tvix-store" version = "0.1.0" dependencies = [ "anyhow", + "async-process", "async-recursion", "async-stream", + "bigtable_rs", "blake3", "bstr", "bytes", @@ -4525,6 +4527,9 @@ dependencies = [ "reqwest", "rstest", "rstest_reuse", + "serde", + "serde_qs", + "serde_with", "sha2", "sled", "tempfile", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index a0cd0b999..68c90cdca 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -14472,6 +14472,11 @@ rec { name = "async-stream"; packageId = "async-stream"; } + { + name = "bigtable_rs"; + packageId = "bigtable_rs"; + optional = true; + } { name = "blake3"; packageId = "blake3"; @@ -14541,6 +14546,19 @@ rec { usesDefaultFeatures = false; features = [ "rustls-tls-native-roots" "stream" ]; } + { + name = "serde"; + packageId = "serde"; + features = [ "derive" ]; + } + { + name = "serde_qs"; + packageId = "serde_qs"; + } + { + name = "serde_with"; + packageId = "serde_with"; + } { name = "sha2"; packageId = "sha2"; @@ -14628,6 +14646,10 @@ rec { } ]; devDependencies = [ + { + name = "async-process"; + packageId = "async-process"; + } { name = "rstest"; packageId = "rstest"; @@ -14650,7 +14672,7 @@ rec { } ]; features = { - "cloud" = [ "tvix-castore/cloud" ]; + "cloud" = [ "dep:bigtable_rs" "tvix-castore/cloud" ]; "default" = [ "cloud" "fuse" "otlp" "tonic-reflection" ]; "fuse" = [ "tvix-castore/fuse" ]; "otlp" = [ "dep:opentelemetry" "dep:opentelemetry-otlp" "dep:opentelemetry_sdk" ]; diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 367e63c21..5931a86e0 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -20,6 +20,9 @@ prost = "0.12.1" opentelemetry = { version = "0.21.0", optional = true} opentelemetry-otlp = { version = "0.14.0", optional = true } opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"], optional = true} +serde = { version = "1.0.197", features = [ "derive" ] } +serde_with = "3.7.0" +serde_qs = "0.12.0" sha2 = "0.10.6" sled = { version = "0.34.7" } thiserror = "1.0.38" @@ -43,11 +46,18 @@ xz2 = "0.1.7" optional = true version = "0.11.0" +[dependencies.bigtable_rs] +optional = true +# https://github.com/liufuyang/bigtable_rs/pull/72 +git = "https://github.com/flokli/bigtable_rs" +rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7" + [build-dependencies] prost-build = "0.12.1" tonic-build = "0.11.0" [dev-dependencies] +async-process = "2.1.0" rstest = "0.18.2" rstest_reuse = "0.6.0" test-case = "3.3.1" @@ -56,7 +66,10 @@ tokio-retry = "0.3.0" [features] default = ["cloud", "fuse", "otlp", "tonic-reflection"] -cloud = ["tvix-castore/cloud"] +cloud = [ + "dep:bigtable_rs", + "tvix-castore/cloud" +] fuse = ["tvix-castore/fuse"] otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"] tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"] diff --git a/tvix/store/default.nix b/tvix/store/default.nix index 2c07cdf2b..f30923ac2 100644 --- a/tvix/store/default.nix +++ b/tvix/store/default.nix @@ -25,12 +25,14 @@ in (depot.tvix.crates.workspaceMembers.tvix-store.build.override { runTests = true; testPreRun = '' - export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt + export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}" ''; - # virtiofs feature currently fails to build on Darwin. - # we however can ship it for non-darwin. - features = if pkgs.stdenv.isDarwin then [ "default" ] else [ "default" "virtiofs" ]; + # enable some optional features. + features = [ "default" "cloud" ] + # virtiofs feature currently fails to build on Darwin. + ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs"; }).overrideAttrs (_: { meta.ci.extraSteps = { import-docs = (mkImportCheck "tvix/store/docs" ./docs); diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs new file mode 100644 index 000000000..cb1683016 --- /dev/null +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -0,0 +1,399 @@ +use super::PathInfoService; +use crate::proto; +use crate::proto::PathInfo; +use async_stream::try_stream; +use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2}; +use bytes::Bytes; +use data_encoding::HEXLOWER; +use futures::stream::BoxStream; +use prost::Message; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DurationSeconds}; +use tonic::async_trait; +use tracing::trace; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; + +/// There should not be more than 10 MiB in a single cell. +/// https://cloud.google.com/bigtable/docs/schema-design#cells +const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; + +/// Provides a [DirectoryService] implementation using +/// [Bigtable](https://cloud.google.com/bigtable/docs/) +/// as an underlying K/V store. +/// +/// # Data format +/// We use Bigtable as a plain K/V store. +/// The row key is the digest of the store path, in hexlower. +/// Inside the row, we currently have a single column/cell, again using the +/// hexlower store path digest. +/// Its value is the PathInfo message, serialized in canonical protobuf. +/// We currently only populate this column. +/// +/// Listing is ranging over all rows, and calculate_nar is returning a +/// "unimplemented" error. +#[derive(Clone)] +pub struct BigtablePathInfoService { + client: bigtable::BigTable, + params: BigtableParameters, + + #[cfg(test)] + #[allow(dead_code)] + /// Holds the temporary directory containing the unix socket, and the + /// spawned emulator process. + emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, +} + +/// Represents configuration of [BigtablePathInfoService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option>")] + #[serde(default = "default_timeout")] + timeout: Option, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option { + Some(std::time::Duration::from_secs(4)) +} + +impl BigtablePathInfoService { + #[cfg(not(test))] + pub async fn connect(params: BigtableParameters) -> Result { + let connection = bigtable::BigTableConnection::new( + ¶ms.project_id, + ¶ms.instance_name, + params.is_read_only, + params.channel_size, + params.timeout, + ) + .await?; + + Ok(Self { + client: connection.client(), + params, + }) + } + + #[cfg(test)] + pub async fn connect(params: BigtableParameters) -> Result { + use std::time::Duration; + + use async_process::{Command, Stdio}; + use tempfile::TempDir; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + + let tmpdir = TempDir::new().unwrap(); + + let socket_path = tmpdir.path().join("cbtemulator.sock"); + + let emulator_process = Command::new("cbtemulator") + .arg("-address") + .arg(socket_path.clone()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .expect("failed to spwan emulator"); + + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(1)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // populate the emulator + for cmd in &[ + vec!["createtable", ¶ms.table_name], + vec!["createfamily", ¶ms.table_name, ¶ms.family_name], + ] { + Command::new("cbt") + .args({ + let mut args = vec![ + "-instance", + ¶ms.instance_name, + "-project", + ¶ms.project_id, + ]; + args.extend_from_slice(cmd); + args + }) + .env( + "BIGTABLE_EMULATOR_HOST", + format!("unix://{}", socket_path.to_string_lossy()), + ) + .output() + .await + .expect("failed to run cbt setup command"); + } + + let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator( + &format!("unix://{}", socket_path.to_string_lossy()), + ¶ms.project_id, + ¶ms.instance_name, + false, + None, + )?; + + Ok(Self { + client: connection.client(), + params, + emulator: (tmpdir, emulator_process).into(), + }) + } +} + +/// Derives the row/column key for a given output path. +/// We use hexlower encoding, also because it can't be misinterpreted as RE2. +fn derive_pathinfo_key(digest: &[u8; 20]) -> String { + HEXLOWER.encode(digest) +} + +#[async_trait] +impl PathInfoService for BigtablePathInfoService { + async fn get(&self, digest: [u8; 20]) -> Result, Error> { + let mut client = self.client.clone(); + let path_info_key = derive_pathinfo_key(&digest); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + rows_limit: 1, + rows: Some(bigtable_v2::RowSet { + row_keys: vec![path_info_key.clone().into()], + row_ranges: vec![], + }), + // Filter selected family name, and column qualifier matching the digest. + // The latter is to ensure we don't fail once we start adding more metadata. + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::Chain( + bigtable_v2::row_filter::Chain { + filters: vec![ + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + ), + ), + }, + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + path_info_key.clone().into(), + ), + ), + }, + ], + }, + )), + }), + ..Default::default() + }; + + let mut response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + if response.len() != 1 { + if response.len() > 1 { + // This shouldn't happen, we limit number of rows to 1 + return Err(Error::StorageError( + "got more than one row from bigtable".into(), + )); + } + // else, this is simply a "not found". + return Ok(None); + } + + let (row_key, mut cells) = response.pop().unwrap(); + if row_key != path_info_key.as_bytes() { + // This shouldn't happen, we requested this row key. + return Err(Error::StorageError( + "got wrong row key from bigtable".into(), + )); + } + + let cell = cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !cells.is_empty() { + return Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + )); + } + + // We also require the qualifier to be correct in the filter above, + // so this shouldn't happen. + if path_info_key.as_bytes() != cell.qualifier { + return Err(Error::StorageError("unexpected cell qualifier".into())); + } + + // Try to parse the value into a PathInfo message + let path_info = proto::PathInfo::decode(Bytes::from(cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?; + + let store_path = path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?; + + if store_path.digest() != &digest { + return Err(Error::StorageError("PathInfo has unexpected digest".into())); + } + + Ok(Some(path_info)) + } + + async fn put(&self, path_info: PathInfo) -> Result { + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("pathinfo failed validation: {}", e)))?; + + let mut client = self.client.clone(); + let path_info_key = derive_pathinfo_key(store_path.digest()); + + let data = path_info.encode_to_vec(); + if data.len() as u64 > CELL_SIZE_LIMIT { + return Err(Error::StorageError( + "PathInfo exceeds cell limit on Bigtable".into(), + )); + } + + let resp = client + .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest { + table_name: client.get_full_table_name(&self.params.table_name), + app_profile_id: self.params.app_profile_id.to_string(), + row_key: path_info_key.clone().into(), + predicate_filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + path_info_key.clone().into(), + )), + }), + // If the column was already found, do nothing. + true_mutations: vec![], + // Else, do the insert. + false_mutations: vec![ + // https://cloud.google.com/bigtable/docs/writes + bigtable_v2::Mutation { + mutation: Some(bigtable_v2::mutation::Mutation::SetCell( + bigtable_v2::mutation::SetCell { + family_name: self.params.family_name.to_string(), + column_qualifier: path_info_key.clone().into(), + timestamp_micros: -1, // use server time to fill timestamp + value: data, + }, + )), + }, + ], + }) + .await + .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?; + + if resp.predicate_matched { + trace!("already existed") + } + + Ok(path_info) + } + + async fn calculate_nar( + &self, + _root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + return Err(Error::StorageError("unimplemented".into())); + } + + fn list(&self) -> BoxStream<'static, Result> { + let mut client = self.client.clone(); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + )), + }), + ..Default::default() + }; + + let stream = try_stream! { + // TODO: add pagination, we don't want to hold all of this in memory. + let response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + for (row_key, mut cells) in response { + let cell = cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // The cell must have the same qualifier as the row key + if row_key != cell.qualifier { + Err(Error::StorageError("unexpected cell qualifier".into()))?; + } + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !cells.is_empty() { + + Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + ))? + } + + // Try to parse the value into a PathInfo message. + let path_info = proto::PathInfo::decode(Bytes::from(cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?; + + // Validate the containing PathInfo, ensure its StorePath digest + // matches row key. + let store_path = path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?; + + if store_path.digest().as_slice() != row_key.as_slice() { + Err(Error::StorageError("PathInfo has unexpected digest".into()))? + } + + + yield path_info + } + }; + + Box::pin(stream) + } +} diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 6109054d7..c14696f22 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -37,7 +37,8 @@ pub async fn from_addr( blob_service: Arc, directory_service: Arc, ) -> Result, Error> { - let url = + #[allow(unused_mut)] + let mut url = Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; let path_info_service: Box = match url.scheme() { @@ -108,6 +109,30 @@ pub async fn from_addr( PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?); Box::new(GRPCPathInfoService::from_client(client)) } + #[cfg(feature = "cloud")] + "bigtable" => { + use super::bigtable::BigtableParameters; + use super::BigtablePathInfoService; + + // parse the instance name from the hostname. + let instance_name = url + .host_str() + .ok_or_else(|| Error::StorageError("instance name missing".into()))? + .to_string(); + + // … but add it to the query string now, so we just need to parse that. + url.query_pairs_mut() + .append_pair("instance_name", &instance_name); + + let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + + Box::new( + BigtablePathInfoService::connect(params) + .await + .map_err(|e| Error::StorageError(e.to_string()))?, + ) + } _ => Err(Error::StorageError(format!( "unknown scheme: {}", url.scheme() @@ -194,4 +219,24 @@ mod tests { assert!(resp.is_err(), "should fail"); } } + + #[cfg(feature = "cloud")] + /// A valid example for Bigtable. + #[test_case("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1", true; "objectstore valid bigtable url")] + /// An invalid examplee for Bigtable, missing fields + #[test_case("bigtable://instance-1", false; "objectstore invalid bigtable url, missing fields")] + #[tokio::test] + async fn test_from_addr_tokio_cloud(uri_str: &str, exp_succeed: bool) { + let blob_service: Arc = Arc::from(MemoryBlobService::default()); + let directory_service: Arc = + Arc::from(MemoryDirectoryService::default()); + + let resp = from_addr(uri_str, blob_service, directory_service).await; + + if exp_succeed { + resp.expect("should succeed"); + } else { + assert!(resp.is_err(), "should fail"); + } + } } diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index c334c8dc5..c1a482bbb 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -23,6 +23,11 @@ pub use self::memory::MemoryPathInfoService; pub use self::nix_http::NixHTTPPathInfoService; pub use self::sled::SledPathInfoService; +#[cfg(feature = "cloud")] +mod bigtable; +#[cfg(feature = "cloud")] +pub use self::bigtable::BigtablePathInfoService; + #[cfg(any(feature = "fuse", feature = "virtiofs"))] pub use self::fs::make_fs; diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs index a3035d094..da9ad11ca 100644 --- a/tvix/store/src/pathinfoservice/tests/mod.rs +++ b/tvix/store/src/pathinfoservice/tests/mod.rs @@ -51,6 +51,7 @@ pub async fn make_path_info_service(uri: &str) -> BSDSPS { #[case::memory(make_path_info_service("memory://").await)] #[case::grpc(make_grpc_path_info_service_client().await)] #[case::sled(make_path_info_service("sled://").await)] +#[cfg_attr(feature = "cloud", case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))] pub fn path_info_services( #[case] services: ( impl BlobService,