2024-01-08 09:50:13 +01:00
|
|
|
use std::sync::Arc;
|
2024-01-09 00:16:52 +01:00
|
|
|
use std::{
|
|
|
|
pin::Pin,
|
|
|
|
task::{self, Poll},
|
|
|
|
};
|
|
|
|
use tokio::io::{self, AsyncWrite};
|
2023-12-31 15:33:26 +01:00
|
|
|
|
|
|
|
use tvix_castore::{
|
|
|
|
blobservice::{self, BlobService},
|
|
|
|
directoryservice::{self, DirectoryService},
|
|
|
|
};
|
2024-06-16 21:50:46 +02:00
|
|
|
use url::Url;
|
2023-12-31 15:33:26 +01:00
|
|
|
|
2024-05-10 07:59:25 +02:00
|
|
|
use crate::nar::{NarCalculationService, SimpleRenderer};
|
2024-01-08 09:50:13 +01:00
|
|
|
use crate::pathinfoservice::{self, PathInfoService};
|
2023-12-31 15:33:26 +01:00
|
|
|
|
2024-05-10 07:59:25 +02:00
|
|
|
/// Construct the store handles from their addrs.
|
2023-12-31 15:33:26 +01:00
|
|
|
pub async fn construct_services(
|
|
|
|
blob_service_addr: impl AsRef<str>,
|
|
|
|
directory_service_addr: impl AsRef<str>,
|
|
|
|
path_info_service_addr: impl AsRef<str>,
|
2024-07-18 19:09:07 +02:00
|
|
|
) -> Result<
|
|
|
|
(
|
|
|
|
Arc<dyn BlobService>,
|
|
|
|
Arc<dyn DirectoryService>,
|
|
|
|
Box<dyn PathInfoService>,
|
|
|
|
Box<dyn NarCalculationService>,
|
|
|
|
),
|
|
|
|
Box<dyn std::error::Error + Send + Sync>,
|
|
|
|
> {
|
|
|
|
let blob_service: Arc<dyn BlobService> =
|
|
|
|
blobservice::from_addr(blob_service_addr.as_ref()).await?;
|
2023-12-31 15:33:26 +01:00
|
|
|
let directory_service: Arc<dyn DirectoryService> =
|
2024-07-18 19:09:07 +02:00
|
|
|
directoryservice::from_addr(directory_service_addr.as_ref()).await?;
|
2024-06-16 21:50:46 +02:00
|
|
|
|
2023-12-31 15:33:26 +01:00
|
|
|
let path_info_service = pathinfoservice::from_addr(
|
|
|
|
path_info_service_addr.as_ref(),
|
|
|
|
blob_service.clone(),
|
|
|
|
directory_service.clone(),
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
|
2024-06-16 21:50:46 +02:00
|
|
|
// HACK: The grpc client also implements NarCalculationService, and we
|
|
|
|
// really want to use it (otherwise we'd need to fetch everything again for hashing).
|
|
|
|
// Until we revamped store composition and config, detect this special case here.
|
|
|
|
let nar_calculation_service: Box<dyn NarCalculationService> = {
|
|
|
|
use crate::pathinfoservice::GRPCPathInfoService;
|
|
|
|
use crate::proto::path_info_service_client::PathInfoServiceClient;
|
|
|
|
|
|
|
|
let url = Url::parse(path_info_service_addr.as_ref())
|
|
|
|
.map_err(|e| io::Error::other(e.to_string()))?;
|
|
|
|
|
|
|
|
if url.scheme().starts_with("grpc+") {
|
2024-06-20 11:39:09 +02:00
|
|
|
Box::new(GRPCPathInfoService::from_client(
|
|
|
|
PathInfoServiceClient::with_interceptor(
|
|
|
|
tvix_castore::tonic::channel_from_url(&url)
|
|
|
|
.await
|
|
|
|
.map_err(|e| io::Error::other(e.to_string()))?,
|
|
|
|
tvix_tracing::propagate::tonic::send_trace,
|
|
|
|
),
|
|
|
|
))
|
2024-06-16 21:50:46 +02:00
|
|
|
} else {
|
|
|
|
Box::new(SimpleRenderer::new(
|
|
|
|
blob_service.clone(),
|
|
|
|
directory_service.clone(),
|
|
|
|
)) as Box<dyn NarCalculationService>
|
|
|
|
}
|
|
|
|
};
|
2024-05-10 07:59:25 +02:00
|
|
|
|
|
|
|
Ok((
|
|
|
|
blob_service,
|
|
|
|
directory_service,
|
|
|
|
path_info_service,
|
|
|
|
nar_calculation_service,
|
|
|
|
))
|
2023-12-31 15:33:26 +01:00
|
|
|
}
|
2024-01-09 00:16:52 +01:00
|
|
|
|
|
|
|
/// The inverse of [tokio_util::io::SyncIoBridge].
|
|
|
|
/// Don't use this with anything that actually does blocking I/O.
|
|
|
|
pub struct AsyncIoBridge<T>(pub T);
|
|
|
|
|
|
|
|
impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
|
|
|
|
fn poll_write(
|
|
|
|
self: Pin<&mut Self>,
|
|
|
|
_cx: &mut task::Context<'_>,
|
|
|
|
buf: &[u8],
|
|
|
|
) -> Poll<io::Result<usize>> {
|
|
|
|
Poll::Ready(self.get_mut().0.write(buf))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
|
|
|
Poll::Ready(self.get_mut().0.flush())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn poll_shutdown(
|
|
|
|
self: Pin<&mut Self>,
|
|
|
|
_cx: &mut task::Context<'_>,
|
|
|
|
) -> Poll<Result<(), io::Error>> {
|
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
}
|