feat(tvix/store/bin): add "copy" command
This allows copying individual store paths from the local /nix/store into tvix-store. As we don't support getting this information from Nix yet, we currently expect metadata to be provided externally: Nix' `exportReferencesGraph` feature contains pretty much all data we need for this. Expect a list of this information at a well-known key (`closure`, similar to nixpkgs' `pkgs/build-support/binary-cache/ default.nix`). We currently simply upload all store paths sequentially, without any parallelism or awareness in how the reference graph looks like. As long as the connected stores don't enforce this, this is fine, at least for now. Change-Id: Ib83c998465adddfdb110db994843c44e26b3d3d8 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11397 Reviewed-by: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
parent
863c4207cc
commit
a4f65ddba0
4 changed files with 99 additions and 0 deletions
1
tvix/Cargo.lock
generated
1
tvix/Cargo.lock
generated
|
@ -4528,6 +4528,7 @@ dependencies = [
|
|||
"rstest",
|
||||
"rstest_reuse",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_qs",
|
||||
"serde_with",
|
||||
"sha2",
|
||||
|
|
|
@ -14549,6 +14549,10 @@ rec {
|
|||
packageId = "serde";
|
||||
features = [ "derive" ];
|
||||
}
|
||||
{
|
||||
name = "serde_json";
|
||||
packageId = "serde_json";
|
||||
}
|
||||
{
|
||||
name = "serde_qs";
|
||||
packageId = "serde_qs";
|
||||
|
|
|
@ -21,6 +21,7 @@ opentelemetry = { version = "0.21.0", optional = true}
|
|||
opentelemetry-otlp = { version = "0.14.0", optional = true }
|
||||
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"], optional = true}
|
||||
serde = { version = "1.0.197", features = [ "derive" ] }
|
||||
serde_json = "1.0"
|
||||
serde_with = "3.7.0"
|
||||
serde_qs = "0.12.0"
|
||||
sha2 = "0.10.6"
|
||||
|
|
|
@ -2,6 +2,9 @@ use clap::Parser;
|
|||
use clap::Subcommand;
|
||||
|
||||
use futures::future::try_join_all;
|
||||
use nix_compat::path_info::ExportedPathInfo;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio_listener::Listener;
|
||||
|
@ -13,6 +16,9 @@ use tracing::Level;
|
|||
use tracing_subscriber::EnvFilter;
|
||||
use tracing_subscriber::Layer;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
use tvix_castore::import::ingest_path;
|
||||
use tvix_store::proto::NarInfo;
|
||||
use tvix_store::proto::PathInfo;
|
||||
|
||||
use tvix_castore::proto::blob_service_server::BlobServiceServer;
|
||||
use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
|
||||
|
@ -101,6 +107,30 @@ enum Commands {
|
|||
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
|
||||
path_info_service_addr: String,
|
||||
},
|
||||
|
||||
/// Copies a list of store paths on the system into tvix-store.
|
||||
Copy {
|
||||
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
|
||||
blob_service_addr: String,
|
||||
|
||||
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
|
||||
directory_service_addr: String,
|
||||
|
||||
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
|
||||
path_info_service_addr: String,
|
||||
|
||||
/// A path pointing to a JSON file produced by the Nix
|
||||
/// `__structuredAttrs` containing reference graph information provided
|
||||
/// by the `exportReferencesGraph` feature.
|
||||
///
|
||||
/// This can be used to invoke tvix-store inside a Nix derivation
|
||||
/// copying to a Tvix store (or outside, if the JSON file is copied
|
||||
/// out).
|
||||
///
|
||||
/// Currently limited to the `closure` key inside that JSON file.
|
||||
#[arg(value_name = "NIX_ATTRS_JSON_FILE", env = "NIX_ATTRS_JSON_FILE")]
|
||||
reference_graph_path: PathBuf,
|
||||
},
|
||||
/// Mounts a tvix-store at the given mountpoint
|
||||
#[cfg(feature = "fuse")]
|
||||
Mount {
|
||||
|
@ -357,6 +387,69 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
|
||||
try_join_all(tasks).await?;
|
||||
}
|
||||
Commands::Copy {
|
||||
blob_service_addr,
|
||||
directory_service_addr,
|
||||
path_info_service_addr,
|
||||
reference_graph_path,
|
||||
} => {
|
||||
let (blob_service, directory_service, path_info_service) =
|
||||
tvix_store::utils::construct_services(
|
||||
blob_service_addr,
|
||||
directory_service_addr,
|
||||
path_info_service_addr,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Parse the file at reference_graph_path.
|
||||
let reference_graph_json = tokio::fs::read(&reference_graph_path).await?;
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
struct ReferenceGraph<'a> {
|
||||
#[serde(borrow)]
|
||||
closure: Vec<ExportedPathInfo<'a>>,
|
||||
}
|
||||
|
||||
let reference_graph: ReferenceGraph<'_> =
|
||||
serde_json::from_slice(reference_graph_json.as_slice())?;
|
||||
|
||||
// We currently simply upload all store paths in linear order.
|
||||
// FUTUREWORK: properly walk the reference graph from the leaves, and upload multiple in parallel.
|
||||
for elem in reference_graph.closure {
|
||||
// Skip if that store path already exists
|
||||
if path_info_service.get(*elem.path.digest()).await?.is_some() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let path: PathBuf = elem.path.to_absolute_path().into();
|
||||
// Ingest the given path
|
||||
let root_node =
|
||||
ingest_path(blob_service.clone(), directory_service.clone(), path).await?;
|
||||
|
||||
// Create and upload a PathInfo pointing to the root_node,
|
||||
// annotated with information we have from the reference graph.
|
||||
let path_info = PathInfo {
|
||||
node: Some(tvix_castore::proto::Node {
|
||||
node: Some(root_node),
|
||||
}),
|
||||
references: Vec::from_iter(
|
||||
elem.references.iter().map(|e| e.digest().to_vec().into()),
|
||||
),
|
||||
narinfo: Some(NarInfo {
|
||||
nar_size: elem.nar_size,
|
||||
nar_sha256: elem.nar_sha256.to_vec().into(),
|
||||
signatures: vec![],
|
||||
reference_names: Vec::from_iter(
|
||||
elem.references.iter().map(|e| e.to_string()),
|
||||
),
|
||||
deriver: None,
|
||||
ca: None,
|
||||
}),
|
||||
};
|
||||
|
||||
path_info_service.put(path_info).await?;
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "fuse")]
|
||||
Commands::Mount {
|
||||
dest,
|
||||
|
|
Loading…
Reference in a new issue