Build each node individually

Now nodes that take a long time to build won't bottleneck the
deployment of other nodes in the same chunk.

Fixes #47.
This commit is contained in:
Zhaofeng Li 2021-12-07 23:13:31 -08:00
parent ea09e60e36
commit eebded1786
10 changed files with 143 additions and 204 deletions

View file

@ -13,7 +13,7 @@ use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::time;
use uuid::Uuid;
use crate::nix::{NixResult, NixError, NodeName, ProfileMap};
use crate::nix::{NixResult, NixError, NodeName};
use crate::progress::{Sender as ProgressSender, Message as ProgressMessage, Line, LineStyle};
pub type Sender = UnboundedSender<Event>;
@ -208,9 +208,6 @@ pub enum EventPayload {
/// The job wants to transition to a new state.
NewState(JobState),
/// The job built a set of system profiles.
ProfilesBuilt(ProfileMap),
/// The child process printed a line to stdout.
ChildStdout(String),
@ -348,19 +345,6 @@ impl JobMonitor {
self.print_job_stats();
}
}
EventPayload::ProfilesBuilt(profiles) => {
if let Some(sender) = &self.progress {
for (name, profile) in profiles.iter() {
let text = format!("Built {:?}", profile.as_path());
let line = Line::new(message.job_id, text)
.label(name.as_str().to_string())
.one_off()
.style(LineStyle::Success);
let pm = self.get_print_message(message.job_id, line);
sender.send(pm).unwrap();
}
}
}
EventPayload::ChildStdout(m) | EventPayload::ChildStderr(m) | EventPayload::Message(m) => {
if let Some(sender) = &self.progress {
let metadata = &self.jobs[&message.job_id];
@ -598,11 +582,6 @@ impl JobHandleInner {
self.send_payload(EventPayload::Failure(error.to_string()))
}
/// Sends a set of built profiles.
pub fn profiles_built(&self, profiles: ProfileMap) -> NixResult<()> {
self.send_payload(EventPayload::ProfilesBuilt(profiles))
}
/// Runs a closure, automatically updating the job monitor based on the result.
async fn run_internal<F, U, T>(self: Arc<Self>, f: U, report_running: bool) -> NixResult<T>
where U: FnOnce(Arc<Self>) -> F,
@ -788,7 +767,6 @@ impl Display for EventPayload {
EventPayload::Noop(m) => write!(f, " noop) {}", m)?,
EventPayload::Failure(e) => write!(f, " failure) {}", e)?,
EventPayload::ShutdownMonitor => write!(f, "shutdown)")?,
EventPayload::ProfilesBuilt(pm) => write!(f, " built) {:?}", pm)?,
}
Ok(())

View file

@ -14,9 +14,6 @@ pub struct ParallelismLimit {
/// Limit of concurrent evaluation processes.
pub evaluation: Semaphore,
/// Limit of concurrent build processes.
pub build: Semaphore,
/// Limit of concurrent apply processes.
pub apply: Semaphore,
}
@ -25,7 +22,6 @@ impl Default for ParallelismLimit {
fn default() -> Self {
Self {
evaluation: Semaphore::new(1),
build: Semaphore::new(2),
apply: Semaphore::new(10),
}
}

View file

@ -29,8 +29,7 @@ use super::{
NixError,
NixResult,
Profile,
ProfileMap,
StoreDerivation,
ProfileDerivation,
CopyDirection,
key::{Key, UploadAt as UploadKeyAt},
};
@ -54,6 +53,9 @@ pub struct Deployment {
/// Deployment options.
options: Options,
/// Options passed to Nix invocations.
nix_options: Vec<String>,
/// Handle to send messages to the ProgressOutput.
progress: Option<ProgressSender>,
@ -102,12 +104,13 @@ impl Deployment {
Self {
hive,
goal,
options: Options::default(),
nix_options: Vec::new(),
progress,
nodes: targets.keys().cloned().collect(),
targets,
parallelism_limit: ParallelismLimit::default(),
evaluation_node_limit: EvaluationNodeLimit::default(),
options: Options::default(),
executed: false,
}
}
@ -129,6 +132,9 @@ impl Deployment {
monitor.set_label_width(width);
}
let nix_options = self.hive.nix_options().await?;
self.nix_options = nix_options;
if self.goal == Goal::UploadKeys {
// Just upload keys
let targets = mem::take(&mut self.targets);
@ -218,45 +224,24 @@ impl Deployment {
}
let nodes: Vec<NodeName> = chunk.keys().cloned().collect();
let profiles = self.clone().build_nodes(parent.clone(), nodes.clone()).await?;
if self.goal == Goal::Build {
return Ok(());
}
let profile_drvs = self.clone().evaluate_nodes(parent.clone(), nodes.clone()).await?;
let mut futures = Vec::new();
for (name, profile) in profiles.iter() {
for (name, profile_drv) in profile_drvs.iter() {
let target = chunk.remove(name).unwrap();
futures.push(self.clone().deploy_node(parent.clone(), target, profile.clone()));
futures.push(self.clone().deploy_node(parent.clone(), target, profile_drv.clone()));
}
join_all(futures).await
.into_iter().collect::<NixResult<Vec<()>>>()?;
// Create GC root
if self.options.create_gc_roots {
let job = parent.create_job(JobType::CreateGcRoots, nodes.clone())?;
let arc_self = self.clone();
job.run_waiting(|job| async move {
if let Some(dir) = arc_self.hive.context_dir() {
job.state(JobState::Running)?;
let base = dir.join(".gcroots");
profiles.create_gc_roots(&base).await?;
} else {
job.noop("No context directory to create GC roots in".to_string())?;
}
Ok(())
}).await?;
}
Ok(())
}
/// Evaluates a set of nodes, returning a store derivation.
/// Evaluates a set of nodes, returning their corresponding store derivations.
async fn evaluate_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec<NodeName>)
-> NixResult<StoreDerivation<ProfileMap>>
-> NixResult<HashMap<NodeName, ProfileDerivation>>
{
let job = parent.create_job(JobType::Evaluate, nodes.clone())?;
@ -272,33 +257,6 @@ impl Deployment {
}).await
}
/// Builds a set of nodes, returning a set of profiles.
///
/// Evaluation happens first (via `evaluate_nodes`); the resulting
/// store derivation is then realized on the local machine and the
/// per-node profiles are reported to the job monitor.
async fn build_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec<NodeName>)
    -> NixResult<ProfileMap>
{
    let job = parent.create_job(JobType::Build, nodes.clone())?;

    job.run_waiting(|job| async move {
        let derivation = self.clone().evaluate_nodes(job.clone(), nodes.clone()).await?;

        // Limit concurrency. NOTE(review): this acquires the `apply`
        // semaphore even though this is a build step — confirm intent.
        let permit = self.parallelism_limit.apply.acquire().await.unwrap();
        job.state(JobState::Running)?;

        // FIXME: Remote builder?
        // NOTE(review): a failure to fetch Nix options panics via unwrap().
        let nix_options = self.hive.nix_options().await.unwrap();
        let mut builder = host::local(nix_options);
        builder.set_job(Some(job.clone()));

        // Realizing the derivation yields the name -> profile map.
        let map = derivation.realize(&mut *builder).await?;

        // Report the built profiles so they can be shown in the output.
        job.profiles_built(map.clone())?;

        drop(permit);
        Ok(map)
    }).await
}
/// Only uploads keys to a node.
async fn upload_keys_to_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode) -> NixResult<()> {
let nodes = vec![target.name.clone()];
@ -315,20 +273,36 @@ impl Deployment {
}).await
}
/// Pushes and optionally activates a system profile on a given node.
/// Builds, pushes, and optionally activates a system profile on a node.
///
/// This will also upload keys to the node.
async fn deploy_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile: Profile)
async fn deploy_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile_drv: ProfileDerivation)
-> NixResult<()>
{
if self.goal == Goal::Build {
unreachable!();
}
let nodes = vec![target.name.clone()];
let target_name = target.name.clone();
let permit = self.parallelism_limit.apply.acquire().await.unwrap();
// Build system profile
let build_job = parent.create_job(JobType::Build, nodes.clone())?;
let arc_self = self.clone();
let profile: Profile = build_job.run(|job| async move {
// FIXME: Remote builder?
let mut builder = host::local(arc_self.nix_options.clone());
builder.set_job(Some(job.clone()));
let profile = profile_drv.realize(&mut *builder).await?;
job.success_with_message(format!("Built {:?}", profile.as_path()))?;
Ok(profile)
}).await?;
if self.goal == Goal::Build {
return Ok(());
}
// Push closure to remote
let push_job = parent.create_job(JobType::Push, nodes.clone())?;
let push_profile = profile.clone();
let arc_self = self.clone();
@ -437,6 +411,23 @@ impl Deployment {
}).await?;
}
// Create GC root
if self.options.create_gc_roots {
let job = parent.create_job(JobType::CreateGcRoots, nodes.clone())?;
let arc_self = self.clone();
job.run_waiting(|job| async move {
if let Some(dir) = arc_self.hive.context_dir() {
job.state(JobState::Running)?;
let path = dir.join(".gcroots").join(format!("node-{}", &*target_name));
profile.create_gc_root(&path).await?;
} else {
job.noop("No context directory to create GC roots in".to_string())?;
}
Ok(())
}).await?;
}
drop(permit);
Ok(())

View file

@ -460,19 +460,11 @@ let
deploymentConfigJsonSelected = names: toJSON
(listToAttrs (map (name: { inherit name; value = nodes.${name}.config.deployment; }) names));
buildAll = buildSelected nodeNames;
buildSelected = names: let
# Change in the order of the names should not cause a derivation to be created
selected = lib.attrsets.filterAttrs (name: _: elem name names) toplevel;
in derivation rec {
name = "colmena-${hive.meta.name}";
system = currentSystem;
json = toJSON (lib.attrsets.mapAttrs (k: v: toString v) selected);
builder = pkgs.writeScript "${name}.sh" ''
#!/bin/sh
echo "$json" > $out
'';
};
evalAll = evalSelected nodeNames;
evalSelected = names: let
selected = lib.filterAttrs (name: _: elem name names) toplevel;
drvs = lib.mapAttrs (k: v: v.drvPath) selected;
in drvs;
introspect = function: function {
inherit pkgs lib nodes;
@ -481,7 +473,7 @@ in {
inherit
nodes toplevel
deploymentConfigJson deploymentConfigJsonSelected
buildAll buildSelected introspect;
evalAll evalSelected introspect;
meta = hive.meta;

View file

@ -11,12 +11,12 @@ use validator::Validate;
use super::{
Flake,
StoreDerivation,
NixResult,
NodeName,
NodeConfig,
NodeFilter,
ProfileMap,
ProfileDerivation,
StorePath,
};
use super::deployment::TargetNode;
use super::NixCommand;
@ -250,20 +250,24 @@ impl Hive {
/// Evaluation may take up a lot of memory, so we make it possible
/// to split up the evaluation process into chunks and run them
/// concurrently with other processes (e.g., build and apply).
pub async fn eval_selected(&self, nodes: &[NodeName], job: Option<JobHandle>) -> NixResult<StoreDerivation<ProfileMap>> {
pub async fn eval_selected(&self, nodes: &[NodeName], job: Option<JobHandle>) -> NixResult<HashMap<NodeName, ProfileDerivation>> {
let nodes_expr = SerializedNixExpresssion::new(nodes)?;
let expr = format!("hive.buildSelected {}", nodes_expr.expression());
let expr = format!("hive.evalSelected {}", nodes_expr.expression());
let command = self.nix_instantiate(&expr).instantiate_with_builders().await?;
let command = self.nix_instantiate(&expr)
.eval_with_builders().await?;
let mut execution = CommandExecution::new(command);
execution.set_job(job);
execution.set_hide_stdout(true);
let path = execution.capture_store_path().await?;
let drv = path.into_derivation()
.expect("The result should be a store derivation");
Ok(drv)
execution
.capture_json::<HashMap<NodeName, StorePath>>().await?
.into_iter().map(|(name, path)| {
let path = path.into_derivation()?;
Ok((name, path))
})
.collect()
}
/// Evaluates an expression using values from the configuration
@ -374,8 +378,10 @@ impl<'hive> NixInstantiate<'hive> {
fn eval(self) -> Command {
let mut command = self.instantiate();
command.arg("--eval").arg("--json")
.arg("--read-write-mode"); // For cases involving IFD
command.arg("--eval").arg("--json").arg("--strict")
// Ensures the derivations are instantiated
// Required for system profile evaluation and IFD
.arg("--read-write-mode");
command
}

View file

@ -30,7 +30,7 @@ pub mod key;
pub use key::Key;
pub mod profile;
pub use profile::{Profile, ProfileMap};
pub use profile::{Profile, ProfileDerivation};
pub mod deployment;
pub use deployment::Goal;
@ -78,6 +78,9 @@ pub enum NixError {
#[snafu(display("Failed to upload keys: {}", error))]
KeyError { error: key::KeyError },
#[snafu(display("Store path {:?} is not a derivation", store_path))]
NotADerivation { store_path: StorePath },
#[snafu(display("Invalid NixOS system profile"))]
InvalidProfile,

View file

@ -1,7 +1,4 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fs;
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::process::Stdio;
@ -11,10 +8,12 @@ use super::{
Goal,
NixResult,
NixError,
NodeName,
StorePath,
StoreDerivation,
};
/// A store derivation that produces a single NixOS system [`Profile`] when realized.
pub type ProfileDerivation = StoreDerivation<Profile>;
/// A NixOS system profile.
///
/// Newtype wrapper over the profile's store path.
#[derive(Clone, Debug)]
pub struct Profile(StorePath);
@ -61,80 +60,40 @@ impl Profile {
/// Returns the store path of this profile as a filesystem path.
pub fn as_path(&self) -> &Path {
    self.0.as_path()
}
}
/// A map of names to their associated NixOS system profiles.
#[derive(Debug, Clone)]
pub struct ProfileMap(HashMap<NodeName, Profile>);
/// Create a GC root for this profile.
pub async fn create_gc_root(&self, path: &Path) -> NixResult<()> {
let mut command = Command::new("nix-store");
command.args(&["--no-build-output", "--indirect", "--add-root", path.to_str().unwrap()]);
command.args(&["--realise", self.as_path().to_str().unwrap()]);
command.stdout(Stdio::null());
// Let a ProfileMap be used directly as a HashMap<NodeName, Profile>.
impl Deref for ProfileMap {
    type Target = HashMap<NodeName, Profile>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

// Mutable counterpart of the Deref impl above.
impl DerefMut for ProfileMap {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
impl TryFrom<Vec<StorePath>> for ProfileMap {
    type Error = NixError;

    /// Parses the single build output, a JSON file mapping node names
    /// to profile store paths, into a validated `ProfileMap`.
    fn try_from(paths: Vec<StorePath>) -> NixResult<Self> {
        if paths.is_empty() {
            return Err(NixError::BadOutput {
                output: String::from("Build produced no outputs"),
            });
        }
        if paths.len() > 1 {
            return Err(NixError::BadOutput {
                output: String::from("Build produced multiple outputs"),
            });
        }

        // The output is a JSON file containing a HashMap<String, StorePath>.
        let json = fs::read_to_string(paths[0].as_path())?;
        let raw_map: HashMap<NodeName, StorePath> = serde_json::from_str(&json)
            .map_err(|_| NixError::BadOutput {
                output: String::from("The returned profile map is invalid"),
            })?;

        // Validate each store path as a system profile.
        let checked_map = raw_map
            .into_iter()
            .map(|(node, path)| Ok((node, Profile::from_store_path(path)?)))
            .collect::<NixResult<HashMap<_, _>>>()?;

        Ok(Self(checked_map))
    }
}
impl ProfileMap {
/// Create GC roots for all profiles in the map.
///
/// The created links will be located at `{base}/node-{node_name}`.
pub async fn create_gc_roots(&self, base: &Path) -> NixResult<()> {
// This will actually try to build all profiles, but since they
// already exist only the GC roots will be created.
for (node, profile) in self.0.iter() {
let path = base.join(format!("node-{}", node.as_str()));
let mut command = Command::new("nix-store");
command.args(&["--no-build-output", "--indirect", "--add-root", path.to_str().unwrap()]);
command.args(&["--realise", profile.as_path().to_str().unwrap()]);
command.stdout(Stdio::null());
let status = command.status().await?;
if !status.success() {
return Err(status.into());
}
let status = command.status().await?;
if !status.success() {
return Err(status.into());
}
Ok(())
}
}
impl TryFrom<Vec<StorePath>> for Profile {
    type Error = NixError;

    /// Converts a list of build outputs into a profile, requiring
    /// exactly one store path.
    fn try_from(paths: Vec<StorePath>) -> NixResult<Self> {
        match paths.len() {
            0 => Err(NixError::BadOutput {
                output: String::from("There is no store path"),
            }),
            1 => {
                let path = paths.into_iter().next().unwrap();
                Self::from_store_path(path)
            }
            _ => Err(NixError::BadOutput {
                output: String::from("Build resulted in more than 1 store path"),
            }),
        }
    }
}

View file

@ -5,8 +5,9 @@ use std::ops::Deref;
use std::fmt;
use serde::{Serialize, Deserialize};
use tokio::process::Command;
use super::{Host, NixResult, NixError};
use super::{Host, NixCommand, NixResult, NixError};
/// A Nix store path.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -27,12 +28,24 @@ impl StorePath {
}
}
/// Returns the immediate dependencies of the store path.
///
/// Runs `nix-store --query --references`. A path with no references
/// (empty command output) yields an empty vector; previously the
/// empty output produced a single empty `StorePath` because
/// `"".split('\n')` yields one empty item.
pub async fn references(&self) -> NixResult<Vec<StorePath>> {
    let output = Command::new("nix-store")
        .args(&["--query", "--references"])
        .arg(&self.0)
        .capture_output().await?;

    let references = output
        .lines()
        .filter(|line| !line.is_empty())
        .map(|line| StorePath(PathBuf::from(line)))
        .collect();

    Ok(references)
}
/// Converts the store path into a store derivation.
pub fn into_derivation<T: TryFrom<Vec<StorePath>>>(self) -> Option<StoreDerivation<T>> {
pub fn into_derivation<T: TryFrom<Vec<StorePath>>>(self) -> NixResult<StoreDerivation<T>> {
if self.is_derivation() {
Some(StoreDerivation::<T>::from_store_path_unchecked(self))
Ok(StoreDerivation::<T>::from_store_path_unchecked(self))
} else {
None
Err(NixError::NotADerivation { store_path: self })
}
}
}
@ -64,6 +77,7 @@ impl From<StorePath> for PathBuf {
}
/// A store derivation (.drv) that will result in a T when built.
#[derive(Debug, Clone)]
pub struct StoreDerivation<T: TryFrom<Vec<StorePath>>>{
path: StorePath,
_target: PhantomData<T>,

View file

@ -135,15 +135,6 @@ impl Line {
}
}
/// Builder-like interface to set the line as an one-off output.
///
/// For SpinnerOutput, this will create a new bar that immediately
/// finishes with the style (success or failure).
pub fn one_off(mut self) -> Self {
self.one_off = true;
self
}
/// Builder-like interface to set the line as noisy.
pub fn noisy(mut self) -> Self {
self.noisy = true;

View file

@ -14,6 +14,7 @@ use super::job::JobHandle;
/// A command whose output streams are captured and optionally
/// forwarded to a job's progress output.
pub struct CommandExecution {
    // The command to spawn.
    command: Command,
    // Job to forward captured output lines to, if any.
    job: Option<JobHandle>,
    // If true, captured stdout is not forwarded to the job
    // (see `set_hide_stdout`).
    hide_stdout: bool,
    // Captured stdout from the last invocation, if any.
    stdout: Option<String>,
    // Captured stderr from the last invocation, if any.
    stderr: Option<String>,
}
@ -23,6 +24,7 @@ impl CommandExecution {
Self {
command,
job: None,
hide_stdout: false,
stdout: None,
stderr: None,
}
@ -33,6 +35,11 @@ impl CommandExecution {
self.job = job;
}
/// Sets whether to hide stdout.
///
/// When hidden, stdout is still captured but is not forwarded to the
/// attached job (the job handle passed to the stdout capture is `None`).
pub fn set_hide_stdout(&mut self, hide_stdout: bool) {
    self.hide_stdout = hide_stdout;
}
/// Returns logs from the last invocation.
pub fn get_logs(&self) -> (Option<&String>, Option<&String>) {
(self.stdout.as_ref(), self.stderr.as_ref())
@ -52,8 +59,10 @@ impl CommandExecution {
let stdout = BufReader::new(child.stdout.take().unwrap());
let stderr = BufReader::new(child.stderr.take().unwrap());
let stdout_job = if self.hide_stdout { None } else { self.job.clone() };
let futures = join3(
capture_stream(stdout, self.job.clone(), false),
capture_stream(stdout, stdout_job, false),
capture_stream(stderr, self.job.clone(), true),
child.wait(),
);