feat(tvix/store/pathinfosvc): add gRPC client

Change-Id: Ie8e205c691bd11db99fcf097357c1e49161c6e19
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8349
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-03-25 22:17:23 +01:00 committed by clbot
parent 93f3964cbc
commit b919f29752
3 changed files with 80 additions and 0 deletions

View file

@ -1,5 +1,6 @@
use std::sync::PoisonError;
use thiserror::Error;
use tokio::task::JoinError;
use tonic::Status;
/// Errors related to communication with the store.
@ -18,6 +19,12 @@ impl<T> From<PoisonError<T>> for Error {
}
}
impl From<JoinError> for Error {
fn from(value: JoinError) -> Self {
Error::StorageError(value.to_string())
}
}
impl From<Error> for Status {
fn from(value: Error) -> Self {
match value {

View file

@ -0,0 +1,71 @@
use super::PathInfoService;
use crate::proto;
use tonic::{transport::Channel, Code, Status};
/// Connects to a (remote) tvix-store PathInfoService over gRPC.
#[derive(Clone)]
pub struct GRPCPathInfoService {
/// A handle into the active tokio runtime. Necessary to spawn tasks.
tokio_handle: tokio::runtime::Handle,
/// The internal reference to a gRPC client.
/// Cloning it is cheap, and it internally handles concurrent requests.
grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
}
impl GRPCPathInfoService {
/// Construct a new [GRPCPathInfoService], by passing a handle to the tokio
/// runtime, and a gRPC client.
pub fn new(
tokio_handle: tokio::runtime::Handle,
grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
) -> Self {
Self {
tokio_handle,
grpc_client,
}
}
}
impl PathInfoService for GRPCPathInfoService {
fn get(
&self,
by_what: proto::get_path_info_request::ByWhat,
) -> Result<Option<proto::PathInfo>, crate::Error> {
// Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone();
let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
self.tokio_handle.spawn(async move {
let path_info = grpc_client
.get(proto::GetPathInfoRequest {
by_what: Some(by_what),
})
.await?
.into_inner();
Ok(path_info)
});
match self.tokio_handle.block_on(task)? {
Ok(path_info) => Ok(Some(path_info)),
Err(e) if e.code() == Code::NotFound => Ok(None),
Err(e) => Err(crate::Error::StorageError(e.to_string())),
}
}
fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> {
// Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone();
let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
self.tokio_handle.spawn(async move {
let path_info = grpc_client.put(path_info).await?.into_inner();
Ok(path_info)
});
self.tokio_handle
.block_on(task)?
.map_err(|e| crate::Error::StorageError(e.to_string()))
}
}

View file

@ -1,8 +1,10 @@
mod grpc;
mod memory;
mod sled;
use crate::{proto, Error};
pub use self::grpc::GRPCPathInfoService;
pub use self::memory::MemoryPathInfoService;
pub use self::sled::SledPathInfoService;