feat(tvix/build/oci): wire up refscanning

Change-Id: I07d016f831dcc596b4627f1d8f33909e632be416
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12533
Autosubmit: yuka <yuka@yuka.dev>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Yureka 2024-09-27 22:19:51 +02:00 committed by clbot
parent 2414c87282
commit 284c1eb45a

View file

@ -5,14 +5,18 @@ use tokio::process::{Child, Command};
use tonic::async_trait; use tonic::async_trait;
use tracing::{debug, instrument, warn, Span}; use tracing::{debug, instrument, warn, Span};
use tvix_castore::{ use tvix_castore::{
blobservice::BlobService, directoryservice::DirectoryService, fs::fuse::FuseDaemon, blobservice::BlobService,
import::fs::ingest_path, Node, PathComponent, directoryservice::DirectoryService,
fs::fuse::FuseDaemon,
import::fs::ingest_path,
refscan::{ReferencePattern, ReferenceScanner},
Node, PathComponent,
}; };
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
oci::{get_host_output_paths, make_bundle, make_spec}, oci::{get_host_output_paths, make_bundle, make_spec},
proto::{Build, BuildRequest}, proto::{build::OutputNeedles, Build, BuildRequest},
}; };
use std::{collections::BTreeMap, ffi::OsStr, path::PathBuf, process::Stdio}; use std::{collections::BTreeMap, ffi::OsStr, path::PathBuf, process::Stdio};
@ -123,16 +127,17 @@ where
.context("failed to calculate host output paths") .context("failed to calculate host output paths")
.map_err(std::io::Error::other)?; .map_err(std::io::Error::other)?;
// NOTE: FuseDaemon's Drop impl unmounts, so the mount is cleaned up even if this call is cancelled.
let _fuse_daemon = tokio::task::spawn_blocking({
let blob_service = self.blob_service.clone();
let directory_service = self.directory_service.clone();
// assemble a BTreeMap of Nodes to pass into TvixStoreFs. // assemble a BTreeMap of Nodes to pass into TvixStoreFs.
let root_nodes: BTreeMap<PathComponent, Node> = let root_nodes: BTreeMap<PathComponent, Node> =
BTreeMap::from_iter(request.inputs.iter().map(|input| { BTreeMap::from_iter(request.inputs.iter().map(|input| {
// We know from validation this is Some. // We know from validation this is Some.
input.clone().into_name_and_node().unwrap() input.clone().into_name_and_node().unwrap()
})); }));
let patterns = ReferencePattern::new(request.refscan_needles.clone());
// NOTE: FuseDaemon's Drop impl unmounts, so the mount is cleaned up even if this call is cancelled.
let _fuse_daemon = tokio::task::spawn_blocking({
let blob_service = self.blob_service.clone();
let directory_service = self.directory_service.clone();
debug!(inputs=?root_nodes.keys(), "got inputs"); debug!(inputs=?root_nodes.keys(), "got inputs");
@ -184,17 +189,19 @@ where
// Ingest build outputs into the castore. // Ingest build outputs into the castore.
// We use try_join_all here. No need to spawn new tasks, as this is // We use try_join_all here. No need to spawn new tasks, as this is
// mostly IO bound. // mostly IO bound.
let outputs = futures::future::try_join_all(host_output_paths.into_iter().enumerate().map( let (outputs, outputs_needles) = futures::future::try_join_all(
|(i, p)| { host_output_paths.into_iter().enumerate().map(|(i, p)| {
let output_path = request.outputs[i].clone(); let output_path = request.outputs[i].clone();
let patterns = patterns.clone();
async move { async move {
debug!(host.path=?p, output.path=?output_path, "ingesting path"); debug!(host.path=?p, output.path=?output_path, "ingesting path");
let output_node = ingest_path::<_, _, _, &[u8]>( let scanner = ReferenceScanner::new(patterns);
let output_node = ingest_path(
self.blob_service.clone(), self.blob_service.clone(),
&self.directory_service, &self.directory_service,
p, p,
None, Some(&scanner),
) )
.await .await
.map_err(|e| { .map_err(|e| {
@ -204,19 +211,39 @@ where
) )
})?; })?;
Ok::<_, std::io::Error>(tvix_castore::proto::Node::from_name_and_node( let needles = OutputNeedles {
"".into(), needles: scanner
.matches()
.into_iter()
.enumerate()
.filter(|(_, val)| *val)
.map(|(idx, _)| idx as u64)
.collect(),
};
Ok::<_, std::io::Error>((
tvix_castore::proto::Node::from_name_and_node(
PathBuf::from(output_path)
.file_name()
.and_then(|s| s.to_str())
.map(|s| s.to_string())
.unwrap_or("".into())
.into(),
output_node, output_node,
),
needles,
)) ))
} }
}, }),
)) )
.await?; .await?
.into_iter()
.unzip();
Ok(Build { Ok(Build {
build_request: Some(request.clone()), build_request: Some(request.clone()),
outputs, outputs,
outputs_needles: vec![], // TODO refscanning outputs_needles,
}) })
} }
} }