fix(tvix/glue/tvix_store_io): remove early return
Doing the fetch comes up with the root node, but we still need to descend from there to the desired subpath. Move things around to ensure the fetch case also only sets root_node. This logic should probably be moved into smaller, easier to consume functions. Change-Id: I6ab9317df794f53d2504029bbc77859e89fef1ed Reviewed-on: https://cl.tvl.fyi/c/depot/+/11507 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
parent
72d3f9b914
commit
e18bc33529
1 changed files with 161 additions and 152 deletions
|
@ -131,167 +131,176 @@ impl TvixStoreIO {
|
|||
.borrow()
|
||||
.get_fetch_for_output_path(store_path);
|
||||
|
||||
if let Some((name, fetch)) = maybe_fetch {
|
||||
info!(?fetch, "triggering lazy fetch");
|
||||
let (sp, root_node) = self
|
||||
.fetcher
|
||||
.ingest_and_persist(&name, fetch)
|
||||
.await
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
||||
match maybe_fetch {
|
||||
Some((name, fetch)) => {
|
||||
info!(?fetch, "triggering lazy fetch");
|
||||
let (sp, root_node) = self
|
||||
.fetcher
|
||||
.ingest_and_persist(&name, fetch)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, e)
|
||||
})?;
|
||||
|
||||
debug_assert_eq!(
|
||||
sp.to_string(),
|
||||
store_path.to_string(),
|
||||
"store path returned from fetcher should match"
|
||||
);
|
||||
debug_assert_eq!(
|
||||
sp.to_string(),
|
||||
store_path.to_string(),
|
||||
"store path returned from fetcher should match"
|
||||
);
|
||||
|
||||
return Ok(Some(root_node));
|
||||
}
|
||||
|
||||
// Look up the derivation for this output path.
|
||||
let (drv_path, drv) = {
|
||||
let known_paths = self.known_paths.borrow();
|
||||
match known_paths.get_drv_path_for_output_path(store_path) {
|
||||
Some(drv_path) => (
|
||||
drv_path.to_owned(),
|
||||
known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
|
||||
),
|
||||
None => {
|
||||
warn!(store_path=%store_path, "no drv found");
|
||||
// let StdIO take over
|
||||
return Ok(None);
|
||||
}
|
||||
root_node
|
||||
}
|
||||
};
|
||||
None => {
|
||||
// Look up the derivation for this output path.
|
||||
let (drv_path, drv) = {
|
||||
let known_paths = self.known_paths.borrow();
|
||||
match known_paths.get_drv_path_for_output_path(store_path) {
|
||||
Some(drv_path) => (
|
||||
drv_path.to_owned(),
|
||||
known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
|
||||
),
|
||||
None => {
|
||||
warn!(store_path=%store_path, "no drv found");
|
||||
// let StdIO take over
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
warn!("triggering build");
|
||||
warn!("triggering build");
|
||||
|
||||
// derivation_to_build_request needs castore nodes for all inputs.
|
||||
// Provide them, which means, here is where we recursively build
|
||||
// all dependencies.
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
let input_nodes: BTreeSet<Node> =
|
||||
futures::stream::iter(drv.input_derivations.iter())
|
||||
.map(|(input_drv_path, output_names)| {
|
||||
// look up the derivation object
|
||||
let input_drv = {
|
||||
let known_paths = self.known_paths.borrow();
|
||||
known_paths
|
||||
.get_drv_by_drvpath(input_drv_path)
|
||||
.unwrap_or_else(|| panic!("{} not found", input_drv_path))
|
||||
.to_owned()
|
||||
// derivation_to_build_request needs castore nodes for all inputs.
|
||||
// Provide them, which means, here is where we recursively build
|
||||
// all dependencies.
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
let input_nodes: BTreeSet<Node> =
|
||||
futures::stream::iter(drv.input_derivations.iter())
|
||||
.map(|(input_drv_path, output_names)| {
|
||||
// look up the derivation object
|
||||
let input_drv = {
|
||||
let known_paths = self.known_paths.borrow();
|
||||
known_paths
|
||||
.get_drv_by_drvpath(input_drv_path)
|
||||
.unwrap_or_else(|| {
|
||||
panic!("{} not found", input_drv_path)
|
||||
})
|
||||
.to_owned()
|
||||
};
|
||||
|
||||
// convert output names to actual paths
|
||||
let output_paths: Vec<StorePath> = output_names
|
||||
.iter()
|
||||
.map(|output_name| {
|
||||
input_drv
|
||||
.outputs
|
||||
.get(output_name)
|
||||
.expect("missing output_name")
|
||||
.path
|
||||
.as_ref()
|
||||
.expect("missing output path")
|
||||
.clone()
|
||||
})
|
||||
.collect();
|
||||
// For each output, ask for the castore node.
|
||||
// We're in a per-derivation context, so if they're
|
||||
// not built yet they'll all get built together.
|
||||
// If they don't need to build, we can however still
|
||||
// substitute all in parallel (if they don't need to
|
||||
// be built) - so we turn this into a stream of streams.
|
||||
// It's up to the builder to deduplicate same build requests.
|
||||
futures::stream::iter(output_paths.into_iter()).map(
|
||||
|output_path| async move {
|
||||
let node = self
|
||||
.store_path_to_node(&output_path, Path::new(""))
|
||||
.await?;
|
||||
|
||||
if let Some(node) = node {
|
||||
Ok(node)
|
||||
} else {
|
||||
Err(io::Error::other("no node produced"))
|
||||
}
|
||||
},
|
||||
)
|
||||
})
|
||||
.flatten()
|
||||
.buffer_unordered(10) // TODO: make configurable
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
// TODO: check if input sources are sufficiently dealth with,
|
||||
// I think yes, they must be imported into the store by other
|
||||
// operations, so dealt with in the Some(…) match arm
|
||||
|
||||
// synthesize the build request.
|
||||
let build_request = derivation_to_build_request(&drv, input_nodes)?;
|
||||
|
||||
// create a build
|
||||
let build_result = self
|
||||
.build_service
|
||||
.as_ref()
|
||||
.do_build(build_request)
|
||||
.await
|
||||
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
|
||||
|
||||
// TODO: refscan?
|
||||
|
||||
// For each output, insert a PathInfo.
|
||||
for output in &build_result.outputs {
|
||||
let root_node = output.node.as_ref().expect("invalid root node");
|
||||
|
||||
// calculate the nar representation
|
||||
let (nar_size, nar_sha256) =
|
||||
self.path_info_service.calculate_nar(root_node).await?;
|
||||
|
||||
// assemble the PathInfo to persist
|
||||
let path_info = PathInfo {
|
||||
node: Some(tvix_castore::proto::Node {
|
||||
node: Some(root_node.clone()),
|
||||
}),
|
||||
references: vec![], // TODO: refscan
|
||||
narinfo: Some(tvix_store::proto::NarInfo {
|
||||
nar_size,
|
||||
nar_sha256: Bytes::from(nar_sha256.to_vec()),
|
||||
signatures: vec![],
|
||||
reference_names: vec![], // TODO: refscan
|
||||
deriver: Some(tvix_store::proto::StorePath {
|
||||
name: drv_path
|
||||
.name()
|
||||
.strip_suffix(".drv")
|
||||
.expect("missing .drv suffix")
|
||||
.to_string(),
|
||||
digest: drv_path.digest().to_vec().into(),
|
||||
}),
|
||||
ca: drv.fod_digest().map(
|
||||
|fod_digest| -> tvix_store::proto::nar_info::Ca {
|
||||
(&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(
|
||||
fod_digest,
|
||||
)))
|
||||
.into()
|
||||
},
|
||||
),
|
||||
}),
|
||||
};
|
||||
|
||||
// convert output names to actual paths
|
||||
let output_paths: Vec<StorePath> = output_names
|
||||
.iter()
|
||||
.map(|output_name| {
|
||||
input_drv
|
||||
.outputs
|
||||
.get(output_name)
|
||||
.expect("missing output_name")
|
||||
.path
|
||||
.as_ref()
|
||||
.expect("missing output path")
|
||||
.clone()
|
||||
})
|
||||
.collect();
|
||||
// For each output, ask for the castore node.
|
||||
// We're in a per-derivation context, so if they're
|
||||
// not built yet they'll all get built together.
|
||||
// If they don't need to build, we can however still
|
||||
// substitute all in parallel (if they don't need to
|
||||
// be built) - so we turn this into a stream of streams.
|
||||
// It's up to the builder to deduplicate same build requests.
|
||||
futures::stream::iter(output_paths.into_iter()).map(
|
||||
|output_path| async move {
|
||||
let node = self
|
||||
.store_path_to_node(&output_path, Path::new(""))
|
||||
.await?;
|
||||
self.path_info_service
|
||||
.put(path_info)
|
||||
.await
|
||||
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
|
||||
}
|
||||
|
||||
if let Some(node) = node {
|
||||
Ok(node)
|
||||
} else {
|
||||
Err(io::Error::other("no node produced"))
|
||||
}
|
||||
},
|
||||
)
|
||||
})
|
||||
.flatten()
|
||||
.buffer_unordered(10) // TODO: make configurable
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
// TODO: check if input sources are sufficiently dealth with,
|
||||
// I think yes, they must be imported into the store by other
|
||||
// operations, so dealt with in the Some(…) match arm
|
||||
|
||||
// synthesize the build request.
|
||||
let build_request = derivation_to_build_request(&drv, input_nodes)?;
|
||||
|
||||
// create a build
|
||||
let build_result = self
|
||||
.build_service
|
||||
.as_ref()
|
||||
.do_build(build_request)
|
||||
.await
|
||||
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
|
||||
|
||||
// TODO: refscan?
|
||||
|
||||
// For each output, insert a PathInfo.
|
||||
for output in &build_result.outputs {
|
||||
let root_node = output.node.as_ref().expect("invalid root node");
|
||||
|
||||
// calculate the nar representation
|
||||
let (nar_size, nar_sha256) =
|
||||
self.path_info_service.calculate_nar(root_node).await?;
|
||||
|
||||
// assemble the PathInfo to persist
|
||||
let path_info = PathInfo {
|
||||
node: Some(tvix_castore::proto::Node {
|
||||
node: Some(root_node.clone()),
|
||||
}),
|
||||
references: vec![], // TODO: refscan
|
||||
narinfo: Some(tvix_store::proto::NarInfo {
|
||||
nar_size,
|
||||
nar_sha256: Bytes::from(nar_sha256.to_vec()),
|
||||
signatures: vec![],
|
||||
reference_names: vec![], // TODO: refscan
|
||||
deriver: Some(tvix_store::proto::StorePath {
|
||||
name: drv_path
|
||||
.name()
|
||||
.strip_suffix(".drv")
|
||||
.expect("missing .drv suffix")
|
||||
.to_string(),
|
||||
digest: drv_path.digest().to_vec().into(),
|
||||
}),
|
||||
ca: drv.fod_digest().map(
|
||||
|fod_digest| -> tvix_store::proto::nar_info::Ca {
|
||||
(&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest)))
|
||||
.into()
|
||||
},
|
||||
),
|
||||
}),
|
||||
};
|
||||
|
||||
self.path_info_service
|
||||
.put(path_info)
|
||||
.await
|
||||
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
|
||||
// find the output for the store path requested
|
||||
build_result
|
||||
.outputs
|
||||
.into_iter()
|
||||
.find(|output_node| {
|
||||
output_node.node.as_ref().expect("invalid node").get_name()
|
||||
== store_path.to_string().as_bytes()
|
||||
})
|
||||
.expect("build didn't produce the store path")
|
||||
.node
|
||||
.expect("invalid node")
|
||||
}
|
||||
}
|
||||
|
||||
// find the output for the store path requested
|
||||
build_result
|
||||
.outputs
|
||||
.into_iter()
|
||||
.find(|output_node| {
|
||||
output_node.node.as_ref().expect("invalid node").get_name()
|
||||
== store_path.to_string().as_bytes()
|
||||
})
|
||||
.expect("build didn't produce the store path")
|
||||
.node
|
||||
.expect("invalid node")
|
||||
}
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in a new issue