feat(tvix/store): add bigtable pathinfoservice backend
Put behind the "cloud" backend, like in the `tvix-castore` crate. Change-Id: Ib38d198baf11ab2a4b6dc405121676147c424611 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11362 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI
This commit is contained in:
parent
d6cadee941
commit
fbf31f45ef
8 changed files with 499 additions and 7 deletions
5
tvix/Cargo.lock
generated
5
tvix/Cargo.lock
generated
|
@ -4505,8 +4505,10 @@ name = "tvix-store"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-process",
|
||||
"async-recursion",
|
||||
"async-stream",
|
||||
"bigtable_rs",
|
||||
"blake3",
|
||||
"bstr",
|
||||
"bytes",
|
||||
|
@ -4525,6 +4527,9 @@ dependencies = [
|
|||
"reqwest",
|
||||
"rstest",
|
||||
"rstest_reuse",
|
||||
"serde",
|
||||
"serde_qs",
|
||||
"serde_with",
|
||||
"sha2",
|
||||
"sled",
|
||||
"tempfile",
|
||||
|
|
|
@ -14472,6 +14472,11 @@ rec {
|
|||
name = "async-stream";
|
||||
packageId = "async-stream";
|
||||
}
|
||||
{
|
||||
name = "bigtable_rs";
|
||||
packageId = "bigtable_rs";
|
||||
optional = true;
|
||||
}
|
||||
{
|
||||
name = "blake3";
|
||||
packageId = "blake3";
|
||||
|
@ -14541,6 +14546,19 @@ rec {
|
|||
usesDefaultFeatures = false;
|
||||
features = [ "rustls-tls-native-roots" "stream" ];
|
||||
}
|
||||
{
|
||||
name = "serde";
|
||||
packageId = "serde";
|
||||
features = [ "derive" ];
|
||||
}
|
||||
{
|
||||
name = "serde_qs";
|
||||
packageId = "serde_qs";
|
||||
}
|
||||
{
|
||||
name = "serde_with";
|
||||
packageId = "serde_with";
|
||||
}
|
||||
{
|
||||
name = "sha2";
|
||||
packageId = "sha2";
|
||||
|
@ -14628,6 +14646,10 @@ rec {
|
|||
}
|
||||
];
|
||||
devDependencies = [
|
||||
{
|
||||
name = "async-process";
|
||||
packageId = "async-process";
|
||||
}
|
||||
{
|
||||
name = "rstest";
|
||||
packageId = "rstest";
|
||||
|
@ -14650,7 +14672,7 @@ rec {
|
|||
}
|
||||
];
|
||||
features = {
|
||||
"cloud" = [ "tvix-castore/cloud" ];
|
||||
"cloud" = [ "dep:bigtable_rs" "tvix-castore/cloud" ];
|
||||
"default" = [ "cloud" "fuse" "otlp" "tonic-reflection" ];
|
||||
"fuse" = [ "tvix-castore/fuse" ];
|
||||
"otlp" = [ "dep:opentelemetry" "dep:opentelemetry-otlp" "dep:opentelemetry_sdk" ];
|
||||
|
|
|
@ -20,6 +20,9 @@ prost = "0.12.1"
|
|||
opentelemetry = { version = "0.21.0", optional = true}
|
||||
opentelemetry-otlp = { version = "0.14.0", optional = true }
|
||||
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"], optional = true}
|
||||
serde = { version = "1.0.197", features = [ "derive" ] }
|
||||
serde_with = "3.7.0"
|
||||
serde_qs = "0.12.0"
|
||||
sha2 = "0.10.6"
|
||||
sled = { version = "0.34.7" }
|
||||
thiserror = "1.0.38"
|
||||
|
@ -43,11 +46,18 @@ xz2 = "0.1.7"
|
|||
optional = true
|
||||
version = "0.11.0"
|
||||
|
||||
[dependencies.bigtable_rs]
|
||||
optional = true
|
||||
# https://github.com/liufuyang/bigtable_rs/pull/72
|
||||
git = "https://github.com/flokli/bigtable_rs"
|
||||
rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7"
|
||||
|
||||
[build-dependencies]
|
||||
prost-build = "0.12.1"
|
||||
tonic-build = "0.11.0"
|
||||
|
||||
[dev-dependencies]
|
||||
async-process = "2.1.0"
|
||||
rstest = "0.18.2"
|
||||
rstest_reuse = "0.6.0"
|
||||
test-case = "3.3.1"
|
||||
|
@ -56,7 +66,10 @@ tokio-retry = "0.3.0"
|
|||
|
||||
[features]
|
||||
default = ["cloud", "fuse", "otlp", "tonic-reflection"]
|
||||
cloud = ["tvix-castore/cloud"]
|
||||
cloud = [
|
||||
"dep:bigtable_rs",
|
||||
"tvix-castore/cloud"
|
||||
]
|
||||
fuse = ["tvix-castore/fuse"]
|
||||
otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"]
|
||||
tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"]
|
||||
|
|
|
@ -25,12 +25,14 @@ in
|
|||
(depot.tvix.crates.workspaceMembers.tvix-store.build.override {
|
||||
runTests = true;
|
||||
testPreRun = ''
|
||||
export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
|
||||
export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
|
||||
export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}"
|
||||
'';
|
||||
|
||||
# virtiofs feature currently fails to build on Darwin.
|
||||
# we however can ship it for non-darwin.
|
||||
features = if pkgs.stdenv.isDarwin then [ "default" ] else [ "default" "virtiofs" ];
|
||||
# enable some optional features.
|
||||
features = [ "default" "cloud" ]
|
||||
# virtiofs feature currently fails to build on Darwin.
|
||||
++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs";
|
||||
}).overrideAttrs (_: {
|
||||
meta.ci.extraSteps = {
|
||||
import-docs = (mkImportCheck "tvix/store/docs" ./docs);
|
||||
|
|
399
tvix/store/src/pathinfoservice/bigtable.rs
Normal file
399
tvix/store/src/pathinfoservice/bigtable.rs
Normal file
|
@ -0,0 +1,399 @@
|
|||
use super::PathInfoService;
|
||||
use crate::proto;
|
||||
use crate::proto::PathInfo;
|
||||
use async_stream::try_stream;
|
||||
use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
|
||||
use bytes::Bytes;
|
||||
use data_encoding::HEXLOWER;
|
||||
use futures::stream::BoxStream;
|
||||
use prost::Message;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DurationSeconds};
|
||||
use tonic::async_trait;
|
||||
use tracing::trace;
|
||||
use tvix_castore::proto as castorepb;
|
||||
use tvix_castore::Error;
|
||||
|
||||
/// There should not be more than 10 MiB in a single cell.
|
||||
/// https://cloud.google.com/bigtable/docs/schema-design#cells
|
||||
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
|
||||
|
||||
/// Provides a [DirectoryService] implementation using
|
||||
/// [Bigtable](https://cloud.google.com/bigtable/docs/)
|
||||
/// as an underlying K/V store.
|
||||
///
|
||||
/// # Data format
|
||||
/// We use Bigtable as a plain K/V store.
|
||||
/// The row key is the digest of the store path, in hexlower.
|
||||
/// Inside the row, we currently have a single column/cell, again using the
|
||||
/// hexlower store path digest.
|
||||
/// Its value is the PathInfo message, serialized in canonical protobuf.
|
||||
/// We currently only populate this column.
|
||||
///
|
||||
/// Listing is ranging over all rows, and calculate_nar is returning a
|
||||
/// "unimplemented" error.
|
||||
#[derive(Clone)]
|
||||
pub struct BigtablePathInfoService {
|
||||
client: bigtable::BigTable,
|
||||
params: BigtableParameters,
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(dead_code)]
|
||||
/// Holds the temporary directory containing the unix socket, and the
|
||||
/// spawned emulator process.
|
||||
emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
|
||||
}
|
||||
|
||||
/// Represents configuration of [BigtablePathInfoService].
|
||||
/// This currently conflates both connect parameters and data model/client
|
||||
/// behaviour parameters.
|
||||
#[serde_as]
|
||||
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
|
||||
pub struct BigtableParameters {
|
||||
project_id: String,
|
||||
instance_name: String,
|
||||
#[serde(default)]
|
||||
is_read_only: bool,
|
||||
#[serde(default = "default_channel_size")]
|
||||
channel_size: usize,
|
||||
|
||||
#[serde_as(as = "Option<DurationSeconds<String>>")]
|
||||
#[serde(default = "default_timeout")]
|
||||
timeout: Option<std::time::Duration>,
|
||||
table_name: String,
|
||||
family_name: String,
|
||||
|
||||
#[serde(default = "default_app_profile_id")]
|
||||
app_profile_id: String,
|
||||
}
|
||||
|
||||
fn default_app_profile_id() -> String {
|
||||
"default".to_owned()
|
||||
}
|
||||
|
||||
fn default_channel_size() -> usize {
|
||||
4
|
||||
}
|
||||
|
||||
fn default_timeout() -> Option<std::time::Duration> {
|
||||
Some(std::time::Duration::from_secs(4))
|
||||
}
|
||||
|
||||
impl BigtablePathInfoService {
|
||||
#[cfg(not(test))]
|
||||
pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
|
||||
let connection = bigtable::BigTableConnection::new(
|
||||
¶ms.project_id,
|
||||
¶ms.instance_name,
|
||||
params.is_read_only,
|
||||
params.channel_size,
|
||||
params.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
client: connection.client(),
|
||||
params,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
|
||||
use std::time::Duration;
|
||||
|
||||
use async_process::{Command, Stdio};
|
||||
use tempfile::TempDir;
|
||||
use tokio_retry::{strategy::ExponentialBackoff, Retry};
|
||||
|
||||
let tmpdir = TempDir::new().unwrap();
|
||||
|
||||
let socket_path = tmpdir.path().join("cbtemulator.sock");
|
||||
|
||||
let emulator_process = Command::new("cbtemulator")
|
||||
.arg("-address")
|
||||
.arg(socket_path.clone())
|
||||
.stderr(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.kill_on_drop(true)
|
||||
.spawn()
|
||||
.expect("failed to spwan emulator");
|
||||
|
||||
Retry::spawn(
|
||||
ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(1)),
|
||||
|| async {
|
||||
if socket_path.exists() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("failed to wait for socket");
|
||||
|
||||
// populate the emulator
|
||||
for cmd in &[
|
||||
vec!["createtable", ¶ms.table_name],
|
||||
vec!["createfamily", ¶ms.table_name, ¶ms.family_name],
|
||||
] {
|
||||
Command::new("cbt")
|
||||
.args({
|
||||
let mut args = vec![
|
||||
"-instance",
|
||||
¶ms.instance_name,
|
||||
"-project",
|
||||
¶ms.project_id,
|
||||
];
|
||||
args.extend_from_slice(cmd);
|
||||
args
|
||||
})
|
||||
.env(
|
||||
"BIGTABLE_EMULATOR_HOST",
|
||||
format!("unix://{}", socket_path.to_string_lossy()),
|
||||
)
|
||||
.output()
|
||||
.await
|
||||
.expect("failed to run cbt setup command");
|
||||
}
|
||||
|
||||
let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
|
||||
&format!("unix://{}", socket_path.to_string_lossy()),
|
||||
¶ms.project_id,
|
||||
¶ms.instance_name,
|
||||
false,
|
||||
None,
|
||||
)?;
|
||||
|
||||
Ok(Self {
|
||||
client: connection.client(),
|
||||
params,
|
||||
emulator: (tmpdir, emulator_process).into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Derives the row/column key for a given output path.
|
||||
/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
|
||||
fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
|
||||
HEXLOWER.encode(digest)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PathInfoService for BigtablePathInfoService {
|
||||
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
|
||||
let mut client = self.client.clone();
|
||||
let path_info_key = derive_pathinfo_key(&digest);
|
||||
|
||||
let request = bigtable_v2::ReadRowsRequest {
|
||||
app_profile_id: self.params.app_profile_id.to_string(),
|
||||
table_name: client.get_full_table_name(&self.params.table_name),
|
||||
rows_limit: 1,
|
||||
rows: Some(bigtable_v2::RowSet {
|
||||
row_keys: vec![path_info_key.clone().into()],
|
||||
row_ranges: vec![],
|
||||
}),
|
||||
// Filter selected family name, and column qualifier matching the digest.
|
||||
// The latter is to ensure we don't fail once we start adding more metadata.
|
||||
filter: Some(bigtable_v2::RowFilter {
|
||||
filter: Some(bigtable_v2::row_filter::Filter::Chain(
|
||||
bigtable_v2::row_filter::Chain {
|
||||
filters: vec![
|
||||
bigtable_v2::RowFilter {
|
||||
filter: Some(
|
||||
bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
|
||||
self.params.family_name.to_string(),
|
||||
),
|
||||
),
|
||||
},
|
||||
bigtable_v2::RowFilter {
|
||||
filter: Some(
|
||||
bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
|
||||
path_info_key.clone().into(),
|
||||
),
|
||||
),
|
||||
},
|
||||
],
|
||||
},
|
||||
)),
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut response = client
|
||||
.read_rows(request)
|
||||
.await
|
||||
.map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
|
||||
|
||||
if response.len() != 1 {
|
||||
if response.len() > 1 {
|
||||
// This shouldn't happen, we limit number of rows to 1
|
||||
return Err(Error::StorageError(
|
||||
"got more than one row from bigtable".into(),
|
||||
));
|
||||
}
|
||||
// else, this is simply a "not found".
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let (row_key, mut cells) = response.pop().unwrap();
|
||||
if row_key != path_info_key.as_bytes() {
|
||||
// This shouldn't happen, we requested this row key.
|
||||
return Err(Error::StorageError(
|
||||
"got wrong row key from bigtable".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let cell = cells
|
||||
.pop()
|
||||
.ok_or_else(|| Error::StorageError("found no cells".into()))?;
|
||||
|
||||
// Ensure there's only one cell (so no more left after the pop())
|
||||
// This shouldn't happen, We filter out other cells in our query.
|
||||
if !cells.is_empty() {
|
||||
return Err(Error::StorageError(
|
||||
"more than one cell returned from bigtable".into(),
|
||||
));
|
||||
}
|
||||
|
||||
// We also require the qualifier to be correct in the filter above,
|
||||
// so this shouldn't happen.
|
||||
if path_info_key.as_bytes() != cell.qualifier {
|
||||
return Err(Error::StorageError("unexpected cell qualifier".into()));
|
||||
}
|
||||
|
||||
// Try to parse the value into a PathInfo message
|
||||
let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
|
||||
.map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
|
||||
|
||||
let store_path = path_info
|
||||
.validate()
|
||||
.map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
|
||||
|
||||
if store_path.digest() != &digest {
|
||||
return Err(Error::StorageError("PathInfo has unexpected digest".into()));
|
||||
}
|
||||
|
||||
Ok(Some(path_info))
|
||||
}
|
||||
|
||||
async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
|
||||
let store_path = path_info
|
||||
.validate()
|
||||
.map_err(|e| Error::InvalidRequest(format!("pathinfo failed validation: {}", e)))?;
|
||||
|
||||
let mut client = self.client.clone();
|
||||
let path_info_key = derive_pathinfo_key(store_path.digest());
|
||||
|
||||
let data = path_info.encode_to_vec();
|
||||
if data.len() as u64 > CELL_SIZE_LIMIT {
|
||||
return Err(Error::StorageError(
|
||||
"PathInfo exceeds cell limit on Bigtable".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let resp = client
|
||||
.check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
|
||||
table_name: client.get_full_table_name(&self.params.table_name),
|
||||
app_profile_id: self.params.app_profile_id.to_string(),
|
||||
row_key: path_info_key.clone().into(),
|
||||
predicate_filter: Some(bigtable_v2::RowFilter {
|
||||
filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
|
||||
path_info_key.clone().into(),
|
||||
)),
|
||||
}),
|
||||
// If the column was already found, do nothing.
|
||||
true_mutations: vec![],
|
||||
// Else, do the insert.
|
||||
false_mutations: vec![
|
||||
// https://cloud.google.com/bigtable/docs/writes
|
||||
bigtable_v2::Mutation {
|
||||
mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
|
||||
bigtable_v2::mutation::SetCell {
|
||||
family_name: self.params.family_name.to_string(),
|
||||
column_qualifier: path_info_key.clone().into(),
|
||||
timestamp_micros: -1, // use server time to fill timestamp
|
||||
value: data,
|
||||
},
|
||||
)),
|
||||
},
|
||||
],
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;
|
||||
|
||||
if resp.predicate_matched {
|
||||
trace!("already existed")
|
||||
}
|
||||
|
||||
Ok(path_info)
|
||||
}
|
||||
|
||||
async fn calculate_nar(
|
||||
&self,
|
||||
_root_node: &castorepb::node::Node,
|
||||
) -> Result<(u64, [u8; 32]), Error> {
|
||||
return Err(Error::StorageError("unimplemented".into()));
|
||||
}
|
||||
|
||||
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
|
||||
let mut client = self.client.clone();
|
||||
|
||||
let request = bigtable_v2::ReadRowsRequest {
|
||||
app_profile_id: self.params.app_profile_id.to_string(),
|
||||
table_name: client.get_full_table_name(&self.params.table_name),
|
||||
filter: Some(bigtable_v2::RowFilter {
|
||||
filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
|
||||
self.params.family_name.to_string(),
|
||||
)),
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let stream = try_stream! {
|
||||
// TODO: add pagination, we don't want to hold all of this in memory.
|
||||
let response = client
|
||||
.read_rows(request)
|
||||
.await
|
||||
.map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
|
||||
|
||||
for (row_key, mut cells) in response {
|
||||
let cell = cells
|
||||
.pop()
|
||||
.ok_or_else(|| Error::StorageError("found no cells".into()))?;
|
||||
|
||||
// The cell must have the same qualifier as the row key
|
||||
if row_key != cell.qualifier {
|
||||
Err(Error::StorageError("unexpected cell qualifier".into()))?;
|
||||
}
|
||||
|
||||
// Ensure there's only one cell (so no more left after the pop())
|
||||
// This shouldn't happen, We filter out other cells in our query.
|
||||
if !cells.is_empty() {
|
||||
|
||||
Err(Error::StorageError(
|
||||
"more than one cell returned from bigtable".into(),
|
||||
))?
|
||||
}
|
||||
|
||||
// Try to parse the value into a PathInfo message.
|
||||
let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
|
||||
.map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
|
||||
|
||||
// Validate the containing PathInfo, ensure its StorePath digest
|
||||
// matches row key.
|
||||
let store_path = path_info
|
||||
.validate()
|
||||
.map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
|
||||
|
||||
if store_path.digest().as_slice() != row_key.as_slice() {
|
||||
Err(Error::StorageError("PathInfo has unexpected digest".into()))?
|
||||
}
|
||||
|
||||
|
||||
yield path_info
|
||||
}
|
||||
};
|
||||
|
||||
Box::pin(stream)
|
||||
}
|
||||
}
|
|
@ -37,7 +37,8 @@ pub async fn from_addr(
|
|||
blob_service: Arc<dyn BlobService>,
|
||||
directory_service: Arc<dyn DirectoryService>,
|
||||
) -> Result<Box<dyn PathInfoService>, Error> {
|
||||
let url =
|
||||
#[allow(unused_mut)]
|
||||
let mut url =
|
||||
Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
|
||||
|
||||
let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
|
||||
|
@ -108,6 +109,30 @@ pub async fn from_addr(
|
|||
PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
|
||||
Box::new(GRPCPathInfoService::from_client(client))
|
||||
}
|
||||
#[cfg(feature = "cloud")]
|
||||
"bigtable" => {
|
||||
use super::bigtable::BigtableParameters;
|
||||
use super::BigtablePathInfoService;
|
||||
|
||||
// parse the instance name from the hostname.
|
||||
let instance_name = url
|
||||
.host_str()
|
||||
.ok_or_else(|| Error::StorageError("instance name missing".into()))?
|
||||
.to_string();
|
||||
|
||||
// … but add it to the query string now, so we just need to parse that.
|
||||
url.query_pairs_mut()
|
||||
.append_pair("instance_name", &instance_name);
|
||||
|
||||
let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
|
||||
.map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
|
||||
|
||||
Box::new(
|
||||
BigtablePathInfoService::connect(params)
|
||||
.await
|
||||
.map_err(|e| Error::StorageError(e.to_string()))?,
|
||||
)
|
||||
}
|
||||
_ => Err(Error::StorageError(format!(
|
||||
"unknown scheme: {}",
|
||||
url.scheme()
|
||||
|
@ -194,4 +219,24 @@ mod tests {
|
|||
assert!(resp.is_err(), "should fail");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "cloud")]
|
||||
/// A valid example for Bigtable.
|
||||
#[test_case("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1", true; "objectstore valid bigtable url")]
|
||||
/// An invalid examplee for Bigtable, missing fields
|
||||
#[test_case("bigtable://instance-1", false; "objectstore invalid bigtable url, missing fields")]
|
||||
#[tokio::test]
|
||||
async fn test_from_addr_tokio_cloud(uri_str: &str, exp_succeed: bool) {
|
||||
let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
|
||||
let directory_service: Arc<dyn DirectoryService> =
|
||||
Arc::from(MemoryDirectoryService::default());
|
||||
|
||||
let resp = from_addr(uri_str, blob_service, directory_service).await;
|
||||
|
||||
if exp_succeed {
|
||||
resp.expect("should succeed");
|
||||
} else {
|
||||
assert!(resp.is_err(), "should fail");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,11 @@ pub use self::memory::MemoryPathInfoService;
|
|||
pub use self::nix_http::NixHTTPPathInfoService;
|
||||
pub use self::sled::SledPathInfoService;
|
||||
|
||||
#[cfg(feature = "cloud")]
|
||||
mod bigtable;
|
||||
#[cfg(feature = "cloud")]
|
||||
pub use self::bigtable::BigtablePathInfoService;
|
||||
|
||||
#[cfg(any(feature = "fuse", feature = "virtiofs"))]
|
||||
pub use self::fs::make_fs;
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ pub async fn make_path_info_service(uri: &str) -> BSDSPS {
|
|||
#[case::memory(make_path_info_service("memory://").await)]
|
||||
#[case::grpc(make_grpc_path_info_service_client().await)]
|
||||
#[case::sled(make_path_info_service("sled://").await)]
|
||||
#[cfg_attr(feature = "cloud", case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))]
|
||||
pub fn path_info_services(
|
||||
#[case] services: (
|
||||
impl BlobService,
|
||||
|
|
Loading…
Reference in a new issue