feat(tvix/store/bin): factor out import
While at it, make it a bit more generic.

Change-Id: Ic4caefda93aca3ffb656a09f8b4d648b41415532
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10511
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
parent ddae4860c2
commit 52a61e353b
2 changed files with 132 additions and 113 deletions
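For orientation before the diff: the per-path import logic moves out of the CLI binary into a new, generic tvix_store::utils::import_path helper. A minimal sketch of calling it, assuming Arc-backed service handles; the import_one wrapper and the example path are illustrative only, not part of the commit:

use std::sync::Arc;

use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_store::pathinfoservice::PathInfoService;

// Hypothetical caller: imports one path and prints the resulting store path,
// mirroring what the CLI's per-path task does after this commit.
async fn import_one(
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
    path_info_service: Arc<dyn PathInfoService>,
) -> std::io::Result<()> {
    let store_path = tvix_store::utils::import_path(
        "/tmp/example",    // anything AsRef<Path> + Debug
        blob_service,      // Arc<dyn BlobService> satisfies Deref + Clone
        directory_service,
        path_info_service,
    )
    .await?;

    println!("{}", store_path.to_absolute_path());
    Ok(())
}

The Arc handles satisfy the Deref-based bounds the helper declares (see the second file in the diff below), which is what "make it a bit more generic" refers to.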
@@ -1,27 +1,22 @@
 use clap::Subcommand;
-use data_encoding::BASE64;
 use futures::future::try_join_all;
-use nix_compat::store_path;
-use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
-use tokio::task::JoinHandle;
 use tokio_listener::Listener;
 use tokio_listener::SystemOptions;
 use tokio_listener::UserOptions;
 
 use tracing_subscriber::prelude::*;
-use tvix_castore::import;
 use tvix_castore::proto::blob_service_server::BlobServiceServer;
 use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
-use tvix_castore::proto::node::Node;
 use tvix_castore::proto::GRPCBlobServiceWrapper;
 use tvix_castore::proto::GRPCDirectoryServiceWrapper;
 use tvix_store::pathinfoservice::PathInfoService;
-use tvix_store::proto::nar_info;
 use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
 use tvix_store::proto::GRPCPathInfoServiceWrapper;
-use tvix_store::proto::NarInfo;
-use tvix_store::proto::PathInfo;
 
 #[cfg(any(feature = "fuse", feature = "virtiofs"))]
 use tvix_store::pathinfoservice::make_fs;
@@ -253,86 +248,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             let tasks = paths
                 .into_iter()
                 .map(|path| {
-                    let task: JoinHandle<std::io::Result<()>> = tokio::task::spawn({
+                    tokio::task::spawn({
                         let blob_service = blob_service.clone();
                         let directory_service = directory_service.clone();
                         let path_info_service = path_info_service.clone();
 
                         async move {
-                            // calculate the name
-                            let name = path
-                                .file_name()
-                                .and_then(|file_name| file_name.to_str())
-                                .ok_or_else(|| {
-                                    std::io::Error::new(
-                                        std::io::ErrorKind::InvalidInput,
-                                        "path must not be .. and the basename valid unicode",
-                                    )
-                                })?;
-
-                            // Ingest the path into blob and directory service.
-                            let root_node = import::ingest_path(
-                                blob_service.clone(),
-                                directory_service.clone(),
-                                &path,
-                            )
-                            .await
-                            .expect("failed to ingest path");
-
-                            // Ask the PathInfoService for the NAR size and sha256
-                            let (nar_size, nar_sha256) =
-                                path_info_service.calculate_nar(&root_node).await?;
-
-                            // Calculate the output path. This might still fail, as some names are illegal.
-                            let output_path =
-                                store_path::build_nar_based_store_path(&nar_sha256, name).map_err(
-                                    |_| {
-                                        std::io::Error::new(
-                                            std::io::ErrorKind::InvalidData,
-                                            format!("invalid name: {}", name),
-                                        )
-                                    },
-                                )?;
-
-                            // assemble a new root_node with a name that is derived from the nar hash.
-                            let root_node =
-                                root_node.rename(output_path.to_string().into_bytes().into());
-
-                            // assemble the [crate::proto::PathInfo] object.
-                            let path_info = PathInfo {
-                                node: Some(tvix_castore::proto::Node {
-                                    node: Some(root_node),
-                                }),
-                                // There's no reference scanning on path contents ingested like this.
-                                references: vec![],
-                                narinfo: Some(NarInfo {
-                                    nar_size,
-                                    nar_sha256: nar_sha256.to_vec().into(),
-                                    signatures: vec![],
-                                    reference_names: vec![],
-                                    deriver: None,
-                                    ca: Some(nar_info::Ca {
-                                        r#type: tvix_store::proto::nar_info::ca::Hash::NarSha256
-                                            .into(),
-                                        digest: nar_sha256.to_vec().into(),
-                                    }),
-                                }),
-                            };
-
-                            // put into [PathInfoService], and return the PathInfo that we get back
-                            // from there (it might contain additional signatures).
-                            let path_info = path_info_service.put(path_info).await?;
-
-                            let node = path_info.node.unwrap().node.unwrap();
-
-                            log_node(&node, &path);
-
-                            println!("{}", output_path.to_absolute_path());
-
-                            Ok(())
+                            let resp = tvix_store::utils::import_path(
+                                path,
+                                blob_service,
+                                directory_service,
+                                path_info_service,
+                            )
+                            .await;
+
+                            if let Ok(output_path) = resp {
+                                // If the import was successful, print the path to stdout.
+                                println!("{}", output_path.to_absolute_path());
+                            }
                         }
-                    });
-                    task
+                    })
                 })
                 .collect::<Vec<_>>();
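The remaining call site keeps the usual pattern of giving each spawned task its own clone of the cheaply clonable service handles before moving them into the async block, since import_path takes the handles by value. A stripped-down sketch of that pattern; do_import and the Arc<String> payload are stand-ins, not code from the commit:

use std::sync::Arc;

use futures::future::try_join_all;

// Stand-in for the real per-path work (tvix_store::utils::import_path).
async fn do_import(service: Arc<String>, path: String) {
    println!("importing {path} using {service}");
}

async fn import_all(
    service: Arc<String>,
    paths: Vec<String>,
) -> Result<(), tokio::task::JoinError> {
    let tasks = paths
        .into_iter()
        .map(|path| {
            tokio::task::spawn({
                // Each task owns its own handle; cloning an Arc is cheap.
                let service = service.clone();
                async move { do_import(service, path).await }
            })
        })
        .collect::<Vec<_>>();

    // Wait for all tasks; fails on the first panicked or cancelled task.
    try_join_all(tasks).await?;
    Ok(())
}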
@@ -411,32 +345,3 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
     Ok(())
 }
-
-fn log_node(node: &Node, path: &Path) {
-    match node {
-        Node::Directory(directory_node) => {
-            info!(
-                path = ?path,
-                name = ?directory_node.name,
-                digest = BASE64.encode(&directory_node.digest),
-                "import successful",
-            )
-        }
-        Node::File(file_node) => {
-            info!(
-                path = ?path,
-                name = ?file_node.name,
-                digest = BASE64.encode(&file_node.digest),
-                "import successful"
-            )
-        }
-        Node::Symlink(symlink_node) => {
-            info!(
-                path = ?path,
-                name = ?symlink_node.name,
-                target = ?symlink_node.target,
-                "import successful"
-            )
-        }
-    }
-}
@@ -1,11 +1,18 @@
-use std::sync::Arc;
+use std::{ops::Deref, path::Path, sync::Arc};
 
+use data_encoding::BASE64;
+use nix_compat::store_path::{self, StorePath};
+use tracing::{debug, instrument};
 use tvix_castore::{
     blobservice::{self, BlobService},
     directoryservice::{self, DirectoryService},
+    proto::node::Node,
 };
 
-use crate::pathinfoservice::{self, PathInfoService};
+use crate::{
+    pathinfoservice::{self, PathInfoService},
+    proto::{nar_info, NarInfo, PathInfo},
+};
 
 /// Construct the three store handles from their addrs.
 pub async fn construct_services(
@@ -33,3 +40,110 @@ pub async fn construct_services(
 
     Ok((blob_service, directory_service, path_info_service))
 }
+
+/// Imports a given path on the filesystem into the store, and returns the
+/// [PathInfo] describing the path, that was sent to
+/// [PathInfoService].
+#[instrument(skip_all, fields(path=?path), err)]
+pub async fn import_path<BS, DS, PS, P>(
+    path: P,
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+) -> Result<StorePath, std::io::Error>
+where
+    P: AsRef<Path> + std::fmt::Debug,
+    BS: Deref<Target = dyn BlobService> + Clone,
+    DS: Deref<Target = dyn DirectoryService> + Clone,
+    PS: Deref<Target = dyn PathInfoService>,
+{
+    // calculate the name
+    // TODO: make a path_to_name helper function?
+    let name = path
+        .as_ref()
+        .file_name()
+        .and_then(|file_name| file_name.to_str())
+        .ok_or_else(|| {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "path must not be .. and the basename valid unicode",
+            )
+        })?;
+
+    // Ingest the path into blob and directory service.
+    let root_node = tvix_castore::import::ingest_path(blob_service, directory_service, &path)
+        .await
+        .expect("failed to ingest path");
+
+    debug!(root_node =?root_node, "import successful");
+
+    // Ask the PathInfoService for the NAR size and sha256
+    let (nar_size, nar_sha256) = path_info_service.calculate_nar(&root_node).await?;
+
+    // Calculate the output path. This might still fail, as some names are illegal.
+    let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| {
+        std::io::Error::new(
+            std::io::ErrorKind::InvalidData,
+            format!("invalid name: {}", name),
+        )
+    })?;
+
+    // assemble a new root_node with a name that is derived from the nar hash.
+    let root_node = root_node.rename(output_path.to_string().into_bytes().into());
+    log_node(&root_node, path.as_ref());
+
+    // assemble the [crate::proto::PathInfo] object.
+    let path_info = PathInfo {
+        node: Some(tvix_castore::proto::Node {
+            node: Some(root_node),
+        }),
+        // There's no reference scanning on path contents ingested like this.
+        references: vec![],
+        narinfo: Some(NarInfo {
+            nar_size,
+            nar_sha256: nar_sha256.to_vec().into(),
+            signatures: vec![],
+            reference_names: vec![],
+            deriver: None,
+            ca: Some(nar_info::Ca {
+                r#type: nar_info::ca::Hash::NarSha256.into(),
+                digest: nar_sha256.to_vec().into(),
+            }),
+        }),
+    };
+
+    // put into [PathInfoService], and return the PathInfo that we get back
+    // from there (it might contain additional signatures).
+    let _path_info = path_info_service.put(path_info).await?;
+
+    Ok(output_path.to_owned())
+}
+
+fn log_node(node: &Node, path: &Path) {
+    match node {
+        Node::Directory(directory_node) => {
+            debug!(
+                path = ?path,
+                name = ?directory_node.name,
+                digest = BASE64.encode(&directory_node.digest),
+                "import successful",
+            )
+        }
+        Node::File(file_node) => {
+            debug!(
+                path = ?path,
+                name = ?file_node.name,
+                digest = BASE64.encode(&file_node.digest),
+                "import successful"
+            )
+        }
+        Node::Symlink(symlink_node) => {
+            debug!(
+                path = ?path,
+                name = ?symlink_node.name,
+                target = ?symlink_node.target,
+                "import successful"
+            )
+        }
+    }
+}
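Because the new helper's bounds are expressed through Deref rather than a concrete Arc, plain references to the trait objects work as well. A small sketch under that assumption; the import_borrowed wrapper is hypothetical and not part of the commit:

use std::path::Path;

use nix_compat::store_path::StorePath;
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_store::pathinfoservice::PathInfoService;

// Hypothetical wrapper: borrows the services instead of handing over Arcs,
// which the Deref + Clone bounds on import_path also permit (&dyn T is Copy).
async fn import_borrowed(
    blob_service: &dyn BlobService,
    directory_service: &dyn DirectoryService,
    path_info_service: &dyn PathInfoService,
    path: &Path,
) -> Result<StorePath, std::io::Error> {
    tvix_store::utils::import_path(path, blob_service, directory_service, path_info_service).await
}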