Compare commits

...

2 commits

Author SHA1 Message Date
sinavir
71c10cd7d6 feat(nar-bridge): Use store composition 2024-07-21 16:36:12 +02:00
sinavir
c000e67769 fix(tvix/store): Fix narinfo compression selection
Parsing of the narinfo file sets the compression field to None instead
of Some("none"). The mapping selecting the decompression reader expected
the former in //tvix/store/src/pathinfoservice/nix_http.rs.

Change-Id: I254a825b88a4016aab087446bdc0c7b6286de40c
2024-07-21 16:27:58 +02:00
4 changed files with 66 additions and 37 deletions

View file

@ -17,6 +17,12 @@ struct Cli {
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
/// URL to a PathInfoService that's considered "remote".
/// If set, the other one is considered "local", and a "cache" for the
/// "remote" one.
#[arg(long, env)]
remote_path_info_service_addr: Option<String>,
/// The priority to announce at the `nix-cache-info` endpoint.
/// A lower number means it's *more preferred.
#[arg(long, env, default_value_t = 39)]
@ -48,12 +54,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
builder.build()?
};
// initialize stores
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
tvix_store::utils::construct_services(
let (blob_service, directory_service, path_info_service, _) =
tvix_store::utils::initialize_stores(
cli.blob_service_addr,
cli.directory_service_addr,
cli.path_info_service_addr,
cli.remote_path_info_service_addr,
)
.await?;

View file

@ -208,39 +208,14 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
path_info_service_addr,
remote_path_info_service_addr,
} => {
// initialize stores
let mut configs = tvix_store::utils::addrs_to_configs(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)?;
// if remote_path_info_service_addr has been specified,
// update path_info_service to point to a cache combining the two.
if let Some(addr) = remote_path_info_service_addr {
use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG};
use tvix_store::pathinfoservice::CachePathInfoServiceConfig;
let remote_url = url::Url::parse(&addr)?;
let remote_config = with_registry(&REG, || remote_url.try_into())?;
let local = configs.pathinfoservices.insert(
"default".into(),
DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig {
near: "local".into(),
far: "remote".into(),
})),
);
configs
.pathinfoservices
.insert("local".into(), local.unwrap());
configs
.pathinfoservices
.insert("remote".into(), remote_config);
}
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tvix_store::utils::construct_services_from_configs(configs).await?;
tvix_store::utils::initialize_stores(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
remote_path_info_service_addr,
)
.await?;
let mut server = Server::builder().layer(
ServiceBuilder::new()

View file

@ -179,8 +179,8 @@ where
// handle decompression, depending on the compression field.
let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
Some("none") | None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,
Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
as Box<dyn AsyncRead + Send + Unpin>,

View file

@ -29,6 +29,54 @@ pub struct CompositionConfigs {
>,
}
pub async fn initialize_stores(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,
path_info_service_addr: impl AsRef<str>,
remote_path_info_service_addr: Option<impl AsRef<str>>,
) -> Result<
(
Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
Arc<dyn PathInfoService>,
Box<dyn NarCalculationService>,
),
Box<dyn std::error::Error + Send + Sync>,
> {
// initialize stores
let mut configs = crate::utils::addrs_to_configs(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)?;
// if remote_path_info_service_addr has been specified,
// update path_info_service to point to a cache combining the two.
if let Some(addr) = remote_path_info_service_addr {
use crate::composition::{with_registry, DeserializeWithRegistry, REG};
use crate::pathinfoservice::CachePathInfoServiceConfig;
let remote_url = url::Url::parse(addr.as_ref())?;
let remote_config = with_registry(&REG, || remote_url.try_into())?;
let local = configs.pathinfoservices.insert(
"default".into(),
DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig {
near: "local".into(),
far: "remote".into(),
})),
);
configs
.pathinfoservices
.insert("local".into(), local.unwrap());
configs
.pathinfoservices
.insert("remote".into(), remote_config);
}
Ok(crate::utils::construct_services_from_configs(configs).await?)
}
pub fn addrs_to_configs(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,