Compare commits
2 commits
fork
...
nar-bridge
Author | SHA1 | Date | |
---|---|---|---|
|
71c10cd7d6 | ||
|
c000e67769 |
4 changed files with 66 additions and 37 deletions
|
@ -17,6 +17,12 @@ struct Cli {
|
|||
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
|
||||
path_info_service_addr: String,
|
||||
|
||||
/// URL to a PathInfoService that's considered "remote".
|
||||
/// If set, the other one is considered "local", and a "cache" for the
|
||||
/// "remote" one.
|
||||
#[arg(long, env)]
|
||||
remote_path_info_service_addr: Option<String>,
|
||||
|
||||
/// The priority to announce at the `nix-cache-info` endpoint.
|
||||
/// A lower number means it's *more preferred.
|
||||
#[arg(long, env, default_value_t = 39)]
|
||||
|
@ -48,12 +54,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
builder.build()?
|
||||
};
|
||||
|
||||
// initialize stores
|
||||
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
|
||||
tvix_store::utils::construct_services(
|
||||
let (blob_service, directory_service, path_info_service, _) =
|
||||
tvix_store::utils::initialize_stores(
|
||||
cli.blob_service_addr,
|
||||
cli.directory_service_addr,
|
||||
cli.path_info_service_addr,
|
||||
cli.remote_path_info_service_addr,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
|
@ -208,39 +208,14 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
|
|||
path_info_service_addr,
|
||||
remote_path_info_service_addr,
|
||||
} => {
|
||||
// initialize stores
|
||||
let mut configs = tvix_store::utils::addrs_to_configs(
|
||||
blob_service_addr,
|
||||
directory_service_addr,
|
||||
path_info_service_addr,
|
||||
)?;
|
||||
|
||||
// if remote_path_info_service_addr has been specified,
|
||||
// update path_info_service to point to a cache combining the two.
|
||||
if let Some(addr) = remote_path_info_service_addr {
|
||||
use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG};
|
||||
use tvix_store::pathinfoservice::CachePathInfoServiceConfig;
|
||||
|
||||
let remote_url = url::Url::parse(&addr)?;
|
||||
let remote_config = with_registry(®, || remote_url.try_into())?;
|
||||
|
||||
let local = configs.pathinfoservices.insert(
|
||||
"default".into(),
|
||||
DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig {
|
||||
near: "local".into(),
|
||||
far: "remote".into(),
|
||||
})),
|
||||
);
|
||||
configs
|
||||
.pathinfoservices
|
||||
.insert("local".into(), local.unwrap());
|
||||
configs
|
||||
.pathinfoservices
|
||||
.insert("remote".into(), remote_config);
|
||||
}
|
||||
|
||||
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
|
||||
tvix_store::utils::construct_services_from_configs(configs).await?;
|
||||
tvix_store::utils::initialize_stores(
|
||||
blob_service_addr,
|
||||
directory_service_addr,
|
||||
path_info_service_addr,
|
||||
remote_path_info_service_addr,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut server = Server::builder().layer(
|
||||
ServiceBuilder::new()
|
||||
|
|
|
@ -179,8 +179,8 @@ where
|
|||
|
||||
// handle decompression, depending on the compression field.
|
||||
let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
|
||||
Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
|
||||
Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
|
||||
Some("none") | None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
|
||||
Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
|
||||
as Box<dyn AsyncRead + Send + Unpin>,
|
||||
Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
|
||||
as Box<dyn AsyncRead + Send + Unpin>,
|
||||
|
|
|
@ -29,6 +29,54 @@ pub struct CompositionConfigs {
|
|||
>,
|
||||
}
|
||||
|
||||
pub async fn initialize_stores(
|
||||
blob_service_addr: impl AsRef<str>,
|
||||
directory_service_addr: impl AsRef<str>,
|
||||
path_info_service_addr: impl AsRef<str>,
|
||||
remote_path_info_service_addr: Option<impl AsRef<str>>,
|
||||
) -> Result<
|
||||
(
|
||||
Arc<dyn BlobService>,
|
||||
Arc<dyn DirectoryService>,
|
||||
Arc<dyn PathInfoService>,
|
||||
Box<dyn NarCalculationService>,
|
||||
),
|
||||
Box<dyn std::error::Error + Send + Sync>,
|
||||
> {
|
||||
// initialize stores
|
||||
let mut configs = crate::utils::addrs_to_configs(
|
||||
blob_service_addr,
|
||||
directory_service_addr,
|
||||
path_info_service_addr,
|
||||
)?;
|
||||
|
||||
// if remote_path_info_service_addr has been specified,
|
||||
// update path_info_service to point to a cache combining the two.
|
||||
if let Some(addr) = remote_path_info_service_addr {
|
||||
use crate::composition::{with_registry, DeserializeWithRegistry, REG};
|
||||
use crate::pathinfoservice::CachePathInfoServiceConfig;
|
||||
|
||||
let remote_url = url::Url::parse(addr.as_ref())?;
|
||||
let remote_config = with_registry(®, || remote_url.try_into())?;
|
||||
|
||||
let local = configs.pathinfoservices.insert(
|
||||
"default".into(),
|
||||
DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig {
|
||||
near: "local".into(),
|
||||
far: "remote".into(),
|
||||
})),
|
||||
);
|
||||
configs
|
||||
.pathinfoservices
|
||||
.insert("local".into(), local.unwrap());
|
||||
configs
|
||||
.pathinfoservices
|
||||
.insert("remote".into(), remote_config);
|
||||
}
|
||||
|
||||
Ok(crate::utils::construct_services_from_configs(configs).await?)
|
||||
}
|
||||
|
||||
pub fn addrs_to_configs(
|
||||
blob_service_addr: impl AsRef<str>,
|
||||
directory_service_addr: impl AsRef<str>,
|
||||
|
|
Loading…
Reference in a new issue