Compare commits
2 commits
fork
...
nar-bridge
Author | SHA1 | Date | |
---|---|---|---|
|
71c10cd7d6 | ||
|
c000e67769 |
4 changed files with 66 additions and 37 deletions
|
@ -17,6 +17,12 @@ struct Cli {
|
||||||
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
|
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
|
||||||
path_info_service_addr: String,
|
path_info_service_addr: String,
|
||||||
|
|
||||||
|
/// URL to a PathInfoService that's considered "remote".
|
||||||
|
/// If set, the other one is considered "local", and a "cache" for the
|
||||||
|
/// "remote" one.
|
||||||
|
#[arg(long, env)]
|
||||||
|
remote_path_info_service_addr: Option<String>,
|
||||||
|
|
||||||
/// The priority to announce at the `nix-cache-info` endpoint.
|
/// The priority to announce at the `nix-cache-info` endpoint.
|
||||||
/// A lower number means it's *more preferred.
|
/// A lower number means it's *more preferred.
|
||||||
#[arg(long, env, default_value_t = 39)]
|
#[arg(long, env, default_value_t = 39)]
|
||||||
|
@ -48,12 +54,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
builder.build()?
|
builder.build()?
|
||||||
};
|
};
|
||||||
|
|
||||||
// initialize stores
|
let (blob_service, directory_service, path_info_service, _) =
|
||||||
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
|
tvix_store::utils::initialize_stores(
|
||||||
tvix_store::utils::construct_services(
|
|
||||||
cli.blob_service_addr,
|
cli.blob_service_addr,
|
||||||
cli.directory_service_addr,
|
cli.directory_service_addr,
|
||||||
cli.path_info_service_addr,
|
cli.path_info_service_addr,
|
||||||
|
cli.remote_path_info_service_addr,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
|
@ -208,39 +208,14 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
|
||||||
path_info_service_addr,
|
path_info_service_addr,
|
||||||
remote_path_info_service_addr,
|
remote_path_info_service_addr,
|
||||||
} => {
|
} => {
|
||||||
// initialize stores
|
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
|
||||||
let mut configs = tvix_store::utils::addrs_to_configs(
|
tvix_store::utils::initialize_stores(
|
||||||
blob_service_addr,
|
blob_service_addr,
|
||||||
directory_service_addr,
|
directory_service_addr,
|
||||||
path_info_service_addr,
|
path_info_service_addr,
|
||||||
)?;
|
remote_path_info_service_addr,
|
||||||
|
)
|
||||||
// if remote_path_info_service_addr has been specified,
|
.await?;
|
||||||
// update path_info_service to point to a cache combining the two.
|
|
||||||
if let Some(addr) = remote_path_info_service_addr {
|
|
||||||
use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG};
|
|
||||||
use tvix_store::pathinfoservice::CachePathInfoServiceConfig;
|
|
||||||
|
|
||||||
let remote_url = url::Url::parse(&addr)?;
|
|
||||||
let remote_config = with_registry(®, || remote_url.try_into())?;
|
|
||||||
|
|
||||||
let local = configs.pathinfoservices.insert(
|
|
||||||
"default".into(),
|
|
||||||
DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig {
|
|
||||||
near: "local".into(),
|
|
||||||
far: "remote".into(),
|
|
||||||
})),
|
|
||||||
);
|
|
||||||
configs
|
|
||||||
.pathinfoservices
|
|
||||||
.insert("local".into(), local.unwrap());
|
|
||||||
configs
|
|
||||||
.pathinfoservices
|
|
||||||
.insert("remote".into(), remote_config);
|
|
||||||
}
|
|
||||||
|
|
||||||
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
|
|
||||||
tvix_store::utils::construct_services_from_configs(configs).await?;
|
|
||||||
|
|
||||||
let mut server = Server::builder().layer(
|
let mut server = Server::builder().layer(
|
||||||
ServiceBuilder::new()
|
ServiceBuilder::new()
|
||||||
|
|
|
@ -179,8 +179,8 @@ where
|
||||||
|
|
||||||
// handle decompression, depending on the compression field.
|
// handle decompression, depending on the compression field.
|
||||||
let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
|
let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
|
||||||
Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
|
Some("none") | None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
|
||||||
Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
|
Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
|
||||||
as Box<dyn AsyncRead + Send + Unpin>,
|
as Box<dyn AsyncRead + Send + Unpin>,
|
||||||
Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
|
Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
|
||||||
as Box<dyn AsyncRead + Send + Unpin>,
|
as Box<dyn AsyncRead + Send + Unpin>,
|
||||||
|
|
|
@ -29,6 +29,54 @@ pub struct CompositionConfigs {
|
||||||
>,
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn initialize_stores(
|
||||||
|
blob_service_addr: impl AsRef<str>,
|
||||||
|
directory_service_addr: impl AsRef<str>,
|
||||||
|
path_info_service_addr: impl AsRef<str>,
|
||||||
|
remote_path_info_service_addr: Option<impl AsRef<str>>,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
Arc<dyn BlobService>,
|
||||||
|
Arc<dyn DirectoryService>,
|
||||||
|
Arc<dyn PathInfoService>,
|
||||||
|
Box<dyn NarCalculationService>,
|
||||||
|
),
|
||||||
|
Box<dyn std::error::Error + Send + Sync>,
|
||||||
|
> {
|
||||||
|
// initialize stores
|
||||||
|
let mut configs = crate::utils::addrs_to_configs(
|
||||||
|
blob_service_addr,
|
||||||
|
directory_service_addr,
|
||||||
|
path_info_service_addr,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// if remote_path_info_service_addr has been specified,
|
||||||
|
// update path_info_service to point to a cache combining the two.
|
||||||
|
if let Some(addr) = remote_path_info_service_addr {
|
||||||
|
use crate::composition::{with_registry, DeserializeWithRegistry, REG};
|
||||||
|
use crate::pathinfoservice::CachePathInfoServiceConfig;
|
||||||
|
|
||||||
|
let remote_url = url::Url::parse(addr.as_ref())?;
|
||||||
|
let remote_config = with_registry(®, || remote_url.try_into())?;
|
||||||
|
|
||||||
|
let local = configs.pathinfoservices.insert(
|
||||||
|
"default".into(),
|
||||||
|
DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig {
|
||||||
|
near: "local".into(),
|
||||||
|
far: "remote".into(),
|
||||||
|
})),
|
||||||
|
);
|
||||||
|
configs
|
||||||
|
.pathinfoservices
|
||||||
|
.insert("local".into(), local.unwrap());
|
||||||
|
configs
|
||||||
|
.pathinfoservices
|
||||||
|
.insert("remote".into(), remote_config);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(crate::utils::construct_services_from_configs(configs).await?)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn addrs_to_configs(
|
pub fn addrs_to_configs(
|
||||||
blob_service_addr: impl AsRef<str>,
|
blob_service_addr: impl AsRef<str>,
|
||||||
directory_service_addr: impl AsRef<str>,
|
directory_service_addr: impl AsRef<str>,
|
||||||
|
|
Loading…
Reference in a new issue