This change contains 2 improvements to the tvix-store copy command: 1. Allows reading the reference graph from stdin, using `-` argument 2. Supports json representation produced by `nix path-info --json` command. In general it makes it easier and faster to import arbitrary closures from an existing nix store with e.g. the following command: ``` nix path-info ./result --json --closure-size --recursive | \ jq -s '{closure: add}' | \ tvix-store copy - ``` Change-Id: Id6eea2993da233ecfbdc186f1a8c37735b686264 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12765 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
556 lines
21 KiB
Rust
556 lines
21 KiB
Rust
use clap::Parser;
|
|
use clap::Subcommand;
|
|
|
|
use futures::StreamExt;
|
|
use futures::TryStreamExt;
|
|
use nix_compat::nixhash::CAHash;
|
|
use nix_compat::nixhash::NixHash;
|
|
use nix_compat::wire::de::Error;
|
|
use nix_compat::{path_info::ExportedPathInfo, store_path::StorePath};
|
|
use serde::Deserialize;
|
|
use serde::Serialize;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tonic::transport::Server;
|
|
use tower::ServiceBuilder;
|
|
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
|
|
use tracing::{debug, info, info_span, instrument, warn, Instrument, Level, Span};
|
|
use tracing_indicatif::span_ext::IndicatifSpanExt;
|
|
use tvix_castore::import::fs::ingest_path;
|
|
use tvix_store::import::path_to_name;
|
|
use tvix_store::nar::NarCalculationService;
|
|
use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc};
|
|
use tvix_tracing::TracingHandle;
|
|
|
|
use tvix_castore::proto::blob_service_server::BlobServiceServer;
|
|
use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
|
|
use tvix_castore::proto::GRPCBlobServiceWrapper;
|
|
use tvix_castore::proto::GRPCDirectoryServiceWrapper;
|
|
use tvix_store::pathinfoservice::{PathInfo, PathInfoService};
|
|
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
|
|
use tvix_store::proto::GRPCPathInfoServiceWrapper;
|
|
|
|
#[cfg(any(feature = "fuse", feature = "virtiofs"))]
|
|
use tvix_store::pathinfoservice::make_fs;
|
|
|
|
#[cfg(feature = "fuse")]
|
|
use tvix_castore::fs::fuse::FuseDaemon;
|
|
|
|
#[cfg(feature = "virtiofs")]
|
|
use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
|
|
|
|
#[cfg(feature = "tonic-reflection")]
|
|
use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
|
|
#[cfg(feature = "tonic-reflection")]
|
|
use tvix_store::proto::FILE_DESCRIPTOR_SET;
|
|
|
|
use mimalloc::MiMalloc;
|
|
|
|
#[global_allocator]
|
|
static GLOBAL: MiMalloc = MiMalloc;
|
|
|
|
#[derive(Parser)]
|
|
#[command(author, version, about, long_about = None)]
|
|
struct Cli {
|
|
/// Whether to configure OTLP. Set --otlp=false to disable.
|
|
#[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
|
|
otlp: bool,
|
|
|
|
#[command(subcommand)]
|
|
command: Commands,
|
|
}
|
|
|
|
#[derive(Subcommand)]
|
|
enum Commands {
|
|
/// Runs the tvix-store daemon.
|
|
Daemon {
|
|
/// The address to listen on.
|
|
#[clap(flatten)]
|
|
listen_args: tokio_listener::ListenerAddressLFlag,
|
|
|
|
#[clap(flatten)]
|
|
service_addrs: ServiceUrls,
|
|
},
|
|
/// Imports a list of paths into the store, print the store path for each of them.
|
|
Import {
|
|
#[clap(value_name = "PATH")]
|
|
paths: Vec<PathBuf>,
|
|
|
|
#[clap(flatten)]
|
|
service_addrs: ServiceUrlsGrpc,
|
|
},
|
|
|
|
/// Copies a list of store paths on the system into tvix-store.
|
|
Copy {
|
|
#[clap(flatten)]
|
|
service_addrs: ServiceUrlsGrpc,
|
|
|
|
/// A path pointing to a JSON file(or '-' for stdin) produced by the Nix
|
|
/// `__structuredAttrs` containing reference graph information provided
|
|
/// by the `exportReferencesGraph` feature.
|
|
///
|
|
/// Additionally supports the output from the following nix command:
|
|
///
|
|
/// ```notrust
|
|
/// nix path-info --json --closure-size --recursive <some-path> | \
|
|
/// jq -s '{closure: add}' | \
|
|
/// tvix-store copy -
|
|
/// ```
|
|
///
|
|
/// This can be used to invoke tvix-store inside a Nix derivation
|
|
/// copying to a Tvix store (or outside, if the JSON file is copied
|
|
/// out).
|
|
///
|
|
/// Currently limited to the `closure` key inside that JSON file.
|
|
#[arg(value_name = "NIX_ATTRS_JSON_FILE", env = "NIX_ATTRS_JSON_FILE")]
|
|
reference_graph_path: PathBuf,
|
|
},
|
|
/// Mounts a tvix-store at the given mountpoint
|
|
#[cfg(feature = "fuse")]
|
|
Mount {
|
|
#[clap(value_name = "PATH")]
|
|
dest: PathBuf,
|
|
|
|
#[clap(flatten)]
|
|
service_addrs: ServiceUrlsGrpc,
|
|
|
|
/// Number of FUSE threads to spawn.
|
|
#[arg(long, env, default_value_t = default_threads())]
|
|
threads: usize,
|
|
|
|
#[arg(long, env, default_value_t = false)]
|
|
/// Whether to configure the mountpoint with allow_other.
|
|
/// Requires /etc/fuse.conf to contain the `user_allow_other`
|
|
/// option, configured via `programs.fuse.userAllowOther` on NixOS.
|
|
allow_other: bool,
|
|
|
|
/// Whether to list elements at the root of the mount point.
|
|
/// This is useful if your PathInfoService doesn't provide an
|
|
/// (exhaustive) listing.
|
|
#[clap(long, short, action)]
|
|
list_root: bool,
|
|
|
|
#[arg(long, default_value_t = true)]
|
|
/// Whether to expose blob and directory digests as extended attributes.
|
|
show_xattr: bool,
|
|
},
|
|
/// Starts a tvix-store virtiofs daemon at the given socket path.
|
|
#[cfg(feature = "virtiofs")]
|
|
#[command(name = "virtiofs")]
|
|
VirtioFs {
|
|
#[clap(value_name = "PATH")]
|
|
socket: PathBuf,
|
|
|
|
#[clap(flatten)]
|
|
service_addrs: ServiceUrlsGrpc,
|
|
|
|
/// Whether to list elements at the root of the mount point.
|
|
/// This is useful if your PathInfoService doesn't provide an
|
|
/// (exhaustive) listing.
|
|
#[clap(long, short, action)]
|
|
list_root: bool,
|
|
|
|
#[arg(long, default_value_t = true)]
|
|
/// Whether to expose blob and directory digests as extended attributes.
|
|
show_xattr: bool,
|
|
},
|
|
}
|
|
|
|
#[cfg(feature = "fuse")]
|
|
/// Returns the default number of FUSE threads to spawn.
///
/// This is the parallelism reported by
/// [`std::thread::available_parallelism`], falling back to `4` when that
/// query fails.
fn default_threads() -> usize {
    match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 4,
    }
}
|
|
|
|
#[instrument(skip_all)]
|
|
async fn run_cli(
|
|
cli: Cli,
|
|
tracing_handle: TracingHandle,
|
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
match cli.command {
|
|
Commands::Daemon {
|
|
listen_args,
|
|
service_addrs,
|
|
} => {
|
|
// initialize stores
|
|
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
|
|
tvix_store::utils::construct_services(service_addrs).await?;
|
|
|
|
let mut server = Server::builder().layer(
|
|
ServiceBuilder::new()
|
|
.layer(
|
|
TraceLayer::new_for_grpc().make_span_with(
|
|
DefaultMakeSpan::new()
|
|
.level(Level::INFO)
|
|
.include_headers(true),
|
|
),
|
|
)
|
|
.map_request(tvix_tracing::propagate::tonic::accept_trace),
|
|
);
|
|
|
|
let (_health_reporter, health_service) = tonic_health::server::health_reporter();
|
|
|
|
#[allow(unused_mut)]
|
|
let mut router = server
|
|
.add_service(health_service)
|
|
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
|
|
blob_service,
|
|
)))
|
|
.add_service(DirectoryServiceServer::new(
|
|
GRPCDirectoryServiceWrapper::new(directory_service),
|
|
))
|
|
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
|
|
path_info_service,
|
|
nar_calculation_service,
|
|
)));
|
|
|
|
#[cfg(feature = "tonic-reflection")]
|
|
{
|
|
router = router.add_service(
|
|
tonic_reflection::server::Builder::configure()
|
|
.register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
|
|
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
|
|
.build_v1alpha()?,
|
|
);
|
|
router = router.add_service(
|
|
tonic_reflection::server::Builder::configure()
|
|
.register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
|
|
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
|
|
.build_v1()?,
|
|
);
|
|
}
|
|
|
|
let listen_address = &listen_args.listen_address.unwrap_or_else(|| {
|
|
"[::]:8000"
|
|
.parse()
|
|
.expect("invalid fallback listen address")
|
|
});
|
|
|
|
let listener = tokio_listener::Listener::bind(
|
|
listen_address,
|
|
&Default::default(),
|
|
&listen_args.listener_options,
|
|
)
|
|
.await?;
|
|
|
|
info!(listen_address=%listen_address, "starting daemon");
|
|
|
|
router.serve_with_incoming(listener).await?;
|
|
}
|
|
Commands::Import {
|
|
paths,
|
|
service_addrs,
|
|
} => {
|
|
// FUTUREWORK: allow flat for single files?
|
|
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
|
|
tvix_store::utils::construct_services(service_addrs).await?;
|
|
|
|
// Arc NarCalculationService, as we clone it .
|
|
let nar_calculation_service: Arc<dyn NarCalculationService> =
|
|
nar_calculation_service.into();
|
|
|
|
// For each path passed, construct the name, or bail out if it's invalid.
|
|
let paths_and_names = paths
|
|
.into_iter()
|
|
.map(|p| match path_to_name(&p) {
|
|
Ok(name) => {
|
|
let name = name.to_owned();
|
|
Ok((p, name))
|
|
}
|
|
Err(e) => Err(e),
|
|
})
|
|
.collect::<Result<Vec<_>, _>>()?;
|
|
|
|
let imports_span =
|
|
info_span!("import paths", "indicatif.pb_show" = tracing::field::Empty);
|
|
imports_span.pb_set_message("Importing");
|
|
imports_span.pb_set_length(paths_and_names.len() as u64);
|
|
imports_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
|
|
imports_span.pb_start();
|
|
|
|
futures::stream::iter(paths_and_names)
|
|
.map(|(path, name)| {
|
|
let blob_service = blob_service.clone();
|
|
let directory_service = directory_service.clone();
|
|
let path_info_service = path_info_service.clone();
|
|
let nar_calculation_service = nar_calculation_service.clone();
|
|
let imports_span = imports_span.clone();
|
|
let tracing_handle = tracing_handle.clone();
|
|
|
|
async move {
|
|
let span = Span::current();
|
|
span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
|
|
span.pb_set_message(&format!("Ingesting {:?}", path));
|
|
span.pb_start();
|
|
|
|
// Ingest the contents at the given path into castore.
|
|
let root_node = ingest_path::<_, _, _, &[u8]>(
|
|
blob_service,
|
|
directory_service,
|
|
&path,
|
|
None,
|
|
)
|
|
.await
|
|
.map_err(std::io::Error::custom)?;
|
|
|
|
span.pb_set_message(&format!("NAR Calculation for {:?}", path));
|
|
|
|
// Ask for the NAR size and sha256
|
|
let (nar_size, nar_sha256) =
|
|
nar_calculation_service.calculate_nar(&root_node).await?;
|
|
|
|
// Calculate the output path. This might still fail, as some names are illegal.
|
|
// FUTUREWORK: express the `name` at the type level to be valid and check for this earlier.
|
|
let ca = CAHash::Nar(NixHash::Sha256(nar_sha256));
|
|
let output_path: StorePath<String> =
|
|
nix_compat::store_path::build_ca_path::<&str, _, _>(
|
|
&name,
|
|
&ca,
|
|
[],
|
|
false,
|
|
)
|
|
.map_err(|e| {
|
|
warn!(err=%e, "unable to build CA path");
|
|
std::io::Error::custom(e)
|
|
})?;
|
|
|
|
// Construct and insert PathInfo
|
|
match path_info_service
|
|
.as_ref()
|
|
.put(PathInfo {
|
|
store_path: output_path.to_owned(),
|
|
node: root_node,
|
|
// There's no reference scanning on imported paths
|
|
references: vec![],
|
|
nar_size,
|
|
nar_sha256,
|
|
signatures: vec![],
|
|
deriver: None,
|
|
ca: Some(ca),
|
|
})
|
|
.await
|
|
{
|
|
// If the import was successful, print the path to stdout.
|
|
Ok(path_info) => {
|
|
use std::io::Write;
|
|
debug!(store_path=%path_info.store_path.to_absolute_path(), "imported path");
|
|
writeln!(&mut tracing_handle.get_stdout_writer(), "{}", path_info.store_path.to_absolute_path())?;
|
|
imports_span.pb_inc(1);
|
|
Ok(())
|
|
}
|
|
Err(e) => {
|
|
warn!(?path, err=%e, "failed to import");
|
|
Err(e)
|
|
}
|
|
}
|
|
}.instrument(info_span!("import path", "indicatif.pb_show" = tracing::field::Empty))
|
|
})
|
|
.buffer_unordered(50)
|
|
.try_collect::<()>()
|
|
.await?;
|
|
}
|
|
Commands::Copy {
|
|
service_addrs,
|
|
reference_graph_path,
|
|
} => {
|
|
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
|
|
tvix_store::utils::construct_services(service_addrs).await?;
|
|
// Parse the file at reference_graph_path.
|
|
let reference_graph_json = if reference_graph_path == PathBuf::from("-") {
|
|
let mut writer: Vec<u8> = vec![];
|
|
tokio::io::copy(&mut tokio::io::stdin(), &mut writer).await?;
|
|
writer
|
|
} else {
|
|
tokio::fs::read(&reference_graph_path).await?
|
|
};
|
|
|
|
#[derive(Deserialize, Serialize)]
|
|
struct ReferenceGraph<'a> {
|
|
#[serde(borrow)]
|
|
closure: Vec<ExportedPathInfo<'a>>,
|
|
}
|
|
|
|
let reference_graph: ReferenceGraph<'_> =
|
|
serde_json::from_slice(reference_graph_json.as_slice())?;
|
|
|
|
let lookups_span = info_span!(
|
|
"lookup pathinfos",
|
|
"indicatif.pb_show" = tracing::field::Empty
|
|
);
|
|
lookups_span.pb_set_length(reference_graph.closure.len() as u64);
|
|
lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
|
|
lookups_span.pb_start();
|
|
|
|
// From our reference graph, lookup all pathinfos that might exist.
|
|
let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
|
|
.map(|elem| {
|
|
let path_info_service = path_info_service.clone();
|
|
async move {
|
|
let resp = path_info_service
|
|
.get(*elem.path.digest())
|
|
.await
|
|
.map(|resp| (elem, resp));
|
|
|
|
Span::current().pb_inc(1);
|
|
resp
|
|
}
|
|
})
|
|
.buffer_unordered(50)
|
|
// Filter out all that are already uploaded.
|
|
// TODO: check if there's a better combinator for this
|
|
.try_filter_map(|(elem, path_info)| {
|
|
std::future::ready(if path_info.is_none() {
|
|
Ok(Some(elem))
|
|
} else {
|
|
Ok(None)
|
|
})
|
|
})
|
|
.try_collect()
|
|
.await?;
|
|
|
|
// Run ingest_path on all of them.
|
|
let uploads: Vec<_> = futures::stream::iter(elems)
|
|
.map(|elem| {
|
|
// Map to a future returning the root node, alongside with the closure info.
|
|
let blob_service = blob_service.clone();
|
|
let directory_service = directory_service.clone();
|
|
async move {
|
|
// Ingest the given path.
|
|
|
|
ingest_path::<_, _, _, &[u8]>(
|
|
blob_service,
|
|
directory_service,
|
|
PathBuf::from(elem.path.to_absolute_path()),
|
|
None,
|
|
)
|
|
.await
|
|
.map(|root_node| (elem, root_node))
|
|
}
|
|
})
|
|
.buffer_unordered(10)
|
|
.try_collect()
|
|
.await?;
|
|
|
|
// Insert them into the PathInfoService.
|
|
// FUTUREWORK: do this properly respecting the reference graph.
|
|
for (elem, root_node) in uploads {
|
|
// Create and upload a PathInfo pointing to the root_node,
|
|
// annotated with information we have from the reference graph.
|
|
let path_info = PathInfo {
|
|
store_path: elem.path.to_owned(),
|
|
node: root_node,
|
|
references: elem.references.iter().map(StorePath::to_owned).collect(),
|
|
nar_size: elem.nar_size,
|
|
nar_sha256: elem.nar_sha256,
|
|
signatures: elem.signatures.iter().map(|s| s.to_owned()).collect(),
|
|
deriver: elem.deriver.map(|p| p.to_owned()),
|
|
ca: None,
|
|
};
|
|
|
|
path_info_service.put(path_info).await?;
|
|
}
|
|
}
|
|
#[cfg(feature = "fuse")]
|
|
Commands::Mount {
|
|
dest,
|
|
service_addrs,
|
|
list_root,
|
|
threads,
|
|
allow_other,
|
|
show_xattr,
|
|
} => {
|
|
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
|
|
tvix_store::utils::construct_services(service_addrs).await?;
|
|
|
|
let fuse_daemon = tokio::task::spawn_blocking(move || {
|
|
let fs = make_fs(
|
|
blob_service,
|
|
directory_service,
|
|
path_info_service,
|
|
list_root,
|
|
show_xattr,
|
|
);
|
|
info!(mount_path=?dest, "mounting");
|
|
|
|
FuseDaemon::new(fs, &dest, threads, allow_other)
|
|
})
|
|
.await??;
|
|
|
|
// Wait for a ctrl_c and then call fuse_daemon.unmount().
|
|
tokio::spawn({
|
|
let fuse_daemon = fuse_daemon.clone();
|
|
async move {
|
|
tokio::signal::ctrl_c().await.unwrap();
|
|
info!("interrupt received, unmounting…");
|
|
tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
|
|
info!("unmount occured, terminating…");
|
|
Ok::<_, std::io::Error>(())
|
|
}
|
|
});
|
|
|
|
// Wait for the server to finish, which can either happen through it
|
|
// being unmounted externally, or receiving a signal invoking the
|
|
// handler above.
|
|
tokio::task::spawn_blocking(move || fuse_daemon.wait()).await?
|
|
}
|
|
#[cfg(feature = "virtiofs")]
|
|
Commands::VirtioFs {
|
|
socket,
|
|
service_addrs,
|
|
list_root,
|
|
show_xattr,
|
|
} => {
|
|
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
|
|
tvix_store::utils::construct_services(service_addrs).await?;
|
|
|
|
tokio::task::spawn_blocking(move || {
|
|
let fs = make_fs(
|
|
blob_service,
|
|
directory_service,
|
|
path_info_service,
|
|
list_root,
|
|
show_xattr,
|
|
);
|
|
info!(socket_path=?socket, "starting virtiofs-daemon");
|
|
|
|
start_virtiofs_daemon(fs, socket)
|
|
})
|
|
.await??;
|
|
}
|
|
};
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
let cli = Cli::parse();
|
|
|
|
let tracing_handle = {
|
|
let mut builder = tvix_tracing::TracingBuilder::default();
|
|
builder = builder.enable_progressbar();
|
|
#[cfg(feature = "otlp")]
|
|
{
|
|
if cli.otlp {
|
|
builder = builder.enable_otlp("tvix.store");
|
|
}
|
|
}
|
|
builder.build()?
|
|
};
|
|
|
|
tokio::select! {
|
|
res = tokio::signal::ctrl_c() => {
|
|
res?;
|
|
if let Err(e) = tracing_handle.force_shutdown().await {
|
|
eprintln!("failed to shutdown tracing: {e}");
|
|
}
|
|
Ok(())
|
|
},
|
|
res = run_cli(cli, tracing_handle.clone()) => {
|
|
if let Err(e) = tracing_handle.shutdown().await {
|
|
eprintln!("failed to shutdown tracing: {e}");
|
|
}
|
|
res
|
|
}
|
|
}
|
|
}
|