Redesign deployment process (again)

We now ship Events from different parts of the deployment process
via a channel to a job monitor.
This commit is contained in:
Zhaofeng Li 2021-11-20 23:34:52 -08:00
parent 5c84134af3
commit 0cb3f8e968
30 changed files with 2861 additions and 1410 deletions

27
Cargo.lock generated
View file

@ -133,6 +133,7 @@ dependencies = [
"glob", "glob",
"hostname", "hostname",
"indicatif", "indicatif",
"itertools",
"lazy_static", "lazy_static",
"libc", "libc",
"log", "log",
@ -147,6 +148,7 @@ dependencies = [
"tokio", "tokio",
"tokio-test", "tokio-test",
"users", "users",
"uuid",
"validator", "validator",
] ]
@ -206,6 +208,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]] [[package]]
name = "encode_unicode" name = "encode_unicode"
version = "0.3.6" version = "0.3.6"
@ -400,6 +408,15 @@ dependencies = [
"regex", "regex",
] ]
[[package]]
name = "itertools"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "0.4.8" version = "0.4.8"
@ -952,6 +969,16 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom",
"serde",
]
[[package]] [[package]]
name = "validator" name = "validator"
version = "0.12.0" version = "0.12.0"

View file

@ -17,6 +17,7 @@ env_logger = "0.8.2"
futures = "0.3.8" futures = "0.3.8"
glob = "0.3.0" glob = "0.3.0"
hostname = "0.3.1" hostname = "0.3.1"
itertools = "0.10.1"
lazy_static = "1.4.0" lazy_static = "1.4.0"
libc = "0.2.81" libc = "0.2.81"
log = "0.4.11" log = "0.4.11"
@ -30,6 +31,7 @@ snafu = "0.6.10"
tempfile = "3.1.0" tempfile = "3.1.0"
tokio-test = "0.4.0" tokio-test = "0.4.0"
users = "0.11.0" users = "0.11.0"
uuid = { version = "0.8.2", features = ["serde", "v4"] }
validator = { version = "0.12", features = ["derive"] } validator = { version = "0.12", features = ["derive"] }
# For https://github.com/mitsuhiko/indicatif/pull/325 # For https://github.com/mitsuhiko/indicatif/pull/325

View file

@ -20,7 +20,7 @@ in rustPlatform.buildRustPackage rec {
src = lib.cleanSource ./.; src = lib.cleanSource ./.;
}; };
cargoSha256 = "sha256-DJ+8XeGyg2EQdnHjmzN37fIuYa7HH+unM27RFHXHaso="; cargoSha256 = "sha256-HGqecerb5LgnPhetqBYEmDKpJBkgzLS+iviVkDgVyGI=";
postInstall = lib.optionalString (stdenv.hostPlatform == stdenv.buildPlatform) '' postInstall = lib.optionalString (stdenv.hostPlatform == stdenv.buildPlatform) ''
mkdir completions mkdir completions

View file

@ -24,7 +24,7 @@
# Full user manual # Full user manual
manual = let manual = let
colmena = self.packages.${system}.colmena; colmena = self.packages.${system}.colmena;
evalNix = import ./src/nix/eval.nix { evalNix = import ./src/nix/hive/eval.nix {
hermetic = true; hermetic = true;
}; };
deploymentOptionsMd = (pkgs.nixosOptionsDoc { deploymentOptionsMd = (pkgs.nixosOptionsDoc {

View file

@ -1,20 +1,17 @@
use std::collections::HashMap;
use std::env; use std::env;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use clap::{Arg, App, SubCommand, ArgMatches}; use clap::{Arg, App, SubCommand, ArgMatches};
use crate::nix::deployment::{ use crate::nix::deployment::{
Deployment, Deployment,
Goal, Goal,
Target, Options,
DeploymentOptions,
EvaluationNodeLimit, EvaluationNodeLimit,
ParallelismLimit, ParallelismLimit,
}; };
use crate::nix::NixError; use crate::progress::SimpleProgressOutput;
use crate::nix::host::local as localhost; use crate::nix::{NixError, NodeFilter};
use crate::util; use crate::util;
pub fn register_deploy_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> { pub fn register_deploy_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> {
@ -118,81 +115,31 @@ pub fn subcommand() -> App<'static, 'static> {
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> { pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> {
let hive = util::hive_from_args(local_args).await?; let hive = util::hive_from_args(local_args).await?;
log::info!("Enumerating nodes...");
let all_nodes = hive.deployment_info().await?;
let nix_options = hive.nix_options().await?;
let selected_nodes = match local_args.value_of("on") {
Some(filter) => {
util::filter_nodes(&all_nodes, filter)
}
None => all_nodes.keys().cloned().collect(),
};
if selected_nodes.len() == 0 {
log::warn!("No hosts matched. Exiting...");
quit::with_code(2);
}
let ssh_config = env::var("SSH_CONFIG_FILE") let ssh_config = env::var("SSH_CONFIG_FILE")
.ok().map(PathBuf::from); .ok().map(PathBuf::from);
// FIXME: This is ugly :/ Make an enum wrapper for this fake "keys" goal let filter = if let Some(f) = local_args.value_of("on") {
let goal_arg = local_args.value_of("goal").unwrap(); Some(NodeFilter::new(f)?)
let goal = if goal_arg == "keys" {
Goal::Build
} else { } else {
Goal::from_str(goal_arg).unwrap() None
}; };
let build_only = goal == Goal::Build && goal_arg != "keys"; let goal_arg = local_args.value_of("goal").unwrap();
let goal = Goal::from_str(goal_arg).unwrap();
let mut targets = HashMap::new(); let targets = hive.select_nodes(filter, ssh_config, goal.requires_target_host()).await?;
for node in &selected_nodes { let n_targets = targets.len();
let config = all_nodes.get(node).unwrap();
let host = config.to_ssh_host();
match host {
Some(mut host) => {
if let Some(ssh_config) = ssh_config.as_ref() {
host.set_ssh_config(ssh_config.clone());
}
targets.insert( let mut output = SimpleProgressOutput::new(local_args.is_present("verbose"));
node.clone(), let progress = output.get_sender();
Target::new(host.upcast(), config.clone()),
);
}
None => {
if build_only {
targets.insert(
node.clone(),
Target::new(localhost(nix_options.clone()), config.clone()),
);
}
}
}
}
if targets.len() == all_nodes.len() { let mut deployment = Deployment::new(hive, targets, goal, progress);
log::info!("Selected all {} nodes.", targets.len());
} else if targets.len() == selected_nodes.len() {
log::info!("Selected {} out of {} hosts.", targets.len(), all_nodes.len());
} else {
log::info!("Selected {} out of {} hosts ({} skipped)", targets.len(), all_nodes.len(), selected_nodes.len() - targets.len());
}
if targets.len() == 0 { // FIXME: Configure limits
log::warn!("No selected nodes are accessible over SSH. Exiting..."); let options = {
quit::with_code(2); let mut options = Options::default();
}
let mut deployment = Deployment::new(hive, targets, goal);
let mut options = DeploymentOptions::default();
options.set_substituters_push(!local_args.is_present("no-substitutes")); options.set_substituters_push(!local_args.is_present("no-substitutes"));
options.set_gzip(!local_args.is_present("no-gzip")); options.set_gzip(!local_args.is_present("no-gzip"));
options.set_progress_bar(!local_args.is_present("verbose"));
options.set_upload_keys(!local_args.is_present("no-keys")); options.set_upload_keys(!local_args.is_present("no-keys"));
options.set_force_replace_unknown_profiles(local_args.is_present("force-replace-unknown-profiles")); options.set_force_replace_unknown_profiles(local_args.is_present("force-replace-unknown-profiles"));
@ -200,23 +147,28 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
options.set_create_gc_roots(true); options.set_create_gc_roots(true);
} }
options
};
deployment.set_options(options); deployment.set_options(options);
if local_args.is_present("no-keys") && goal_arg == "keys" { if local_args.is_present("no-keys") && goal == Goal::UploadKeys {
log::error!("--no-keys cannot be used when the goal is to upload keys"); log::error!("--no-keys cannot be used when the goal is to upload keys");
quit::with_code(1); quit::with_code(1);
} }
let mut parallelism_limit = ParallelismLimit::default(); let parallelism_limit = {
parallelism_limit.set_apply_limit({ let mut limit = ParallelismLimit::default();
limit.set_apply_limit({
let limit = local_args.value_of("parallel").unwrap().parse::<usize>().unwrap(); let limit = local_args.value_of("parallel").unwrap().parse::<usize>().unwrap();
if limit == 0 { if limit == 0 {
selected_nodes.len() // HACK n_targets
} else { } else {
local_args.value_of("parallel").unwrap().parse::<usize>().unwrap() limit
} }
}); });
deployment.set_parallelism_limit(parallelism_limit); limit
};
let evaluation_node_limit = match local_args.value_of("eval-node-limit").unwrap() { let evaluation_node_limit = match local_args.value_of("eval-node-limit").unwrap() {
"auto" => EvaluationNodeLimit::Heuristic, "auto" => EvaluationNodeLimit::Heuristic,
@ -229,19 +181,16 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
} }
} }
}; };
deployment.set_parallelism_limit(parallelism_limit);
deployment.set_evaluation_node_limit(evaluation_node_limit); deployment.set_evaluation_node_limit(evaluation_node_limit);
let deployment = Arc::new(deployment); let (deployment, output) = tokio::join!(
deployment.execute(),
output.run_until_completion(),
);
let success = if goal_arg == "keys" { deployment?; output?;
deployment.upload_keys().await
} else {
deployment.execute().await
};
if !success {
quit::with_code(10);
}
Ok(()) Ok(())
} }

View file

@ -1,6 +1,5 @@
use std::env; use std::env;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use clap::{Arg, App, SubCommand, ArgMatches}; use clap::{Arg, App, SubCommand, ArgMatches};
use tokio::fs; use tokio::fs;
@ -9,10 +8,11 @@ use tokio::process::Command;
use crate::nix::deployment::{ use crate::nix::deployment::{
Deployment, Deployment,
Goal, Goal,
Target, TargetNode,
DeploymentOptions, Options,
}; };
use crate::nix::{NixError, NodeName, host}; use crate::nix::{NixError, NodeName, host};
use crate::progress::SimpleProgressOutput;
use crate::util; use crate::util;
pub fn subcommand() -> App<'static, 'static> { pub fn subcommand() -> App<'static, 'static> {
@ -23,7 +23,7 @@ pub fn subcommand() -> App<'static, 'static> {
.long_help("Same as the targets for switch-to-configuration.\n\"push\" is noop in apply-local.") .long_help("Same as the targets for switch-to-configuration.\n\"push\" is noop in apply-local.")
.default_value("switch") .default_value("switch")
.index(1) .index(1)
.possible_values(&["push", "switch", "boot", "test", "dry-activate"])) .possible_values(&["push", "switch", "boot", "test", "dry-activate", "keys"]))
.arg(Arg::with_name("sudo") .arg(Arg::with_name("sudo")
.long("sudo") .long("sudo")
.help("Attempt to escalate privileges if not run as root")) .help("Attempt to escalate privileges if not run as root"))
@ -102,20 +102,21 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
}; };
let goal = Goal::from_str(local_args.value_of("goal").unwrap()).unwrap(); let goal = Goal::from_str(local_args.value_of("goal").unwrap()).unwrap();
let target: Target = { let target = {
if let Some(info) = hive.deployment_info_for(&hostname).await.unwrap() { if let Some(info) = hive.deployment_info_single(&hostname).await.unwrap() {
let nix_options = hive.nix_options().await.unwrap(); let nix_options = hive.nix_options().await.unwrap();
if !info.allows_local_deployment() { if !info.allows_local_deployment() {
log::error!("Local deployment is not enabled for host {}.", hostname.as_str()); log::error!("Local deployment is not enabled for host {}.", hostname.as_str());
log::error!("Hint: Set deployment.allowLocalDeployment to true."); log::error!("Hint: Set deployment.allowLocalDeployment to true.");
quit::with_code(2); quit::with_code(2);
} }
Target::new( TargetNode::new(
host::local(nix_options), hostname.clone(),
Some(host::local(nix_options)),
info.clone(), info.clone(),
) )
} else { } else {
log::error!("Host {} is not present in the Hive configuration.", hostname.as_str()); log::error!("Host \"{}\" is not present in the Hive configuration.", hostname.as_str());
quit::with_code(2); quit::with_code(2);
} }
}; };
@ -123,18 +124,20 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
let mut targets = HashMap::new(); let mut targets = HashMap::new();
targets.insert(hostname.clone(), target); targets.insert(hostname.clone(), target);
let mut deployment = Deployment::new(hive, targets, goal); let mut output = SimpleProgressOutput::new(local_args.is_present("verbose"));
let mut options = DeploymentOptions::default(); let progress = output.get_sender();
let mut deployment = Deployment::new(hive, targets, goal, progress);
let options = {
let mut options = Options::default();
options.set_upload_keys(!local_args.is_present("no-upload-keys")); options.set_upload_keys(!local_args.is_present("no-upload-keys"));
options.set_progress_bar(!local_args.is_present("verbose")); options
};
deployment.set_options(options); deployment.set_options(options);
let deployment = Arc::new(deployment); deployment.execute().await?;
let success = deployment.execute().await;
if !success {
quit::with_code(10);
}
Ok(()) Ok(())
} }

View file

@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::env; use std::env;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
@ -7,9 +6,10 @@ use clap::{Arg, App, AppSettings, SubCommand, ArgMatches};
use futures::future::join_all; use futures::future::join_all;
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
use crate::nix::NixError; use crate::nix::{NixError, NodeFilter};
use crate::progress::{Progress, OutputStyle}; use crate::job::{JobMonitor, JobState, JobType};
use crate::util::{self, CommandExecution}; use crate::progress::SimpleProgressOutput;
use crate::util;
pub fn subcommand() -> App<'static, 'static> { pub fn subcommand() -> App<'static, 'static> {
let command = SubCommand::with_name("exec") let command = SubCommand::with_name("exec")
@ -56,60 +56,17 @@ It's recommended to use -- to separate Colmena options from the command to run.
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> { pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> {
let hive = util::hive_from_args(local_args).await?; let hive = util::hive_from_args(local_args).await?;
log::info!("Enumerating nodes...");
let all_nodes = hive.deployment_info().await?;
let selected_nodes = match local_args.value_of("on") {
Some(filter) => {
util::filter_nodes(&all_nodes, filter)
}
None => all_nodes.keys().cloned().collect(),
};
if selected_nodes.len() == 0 {
log::warn!("No hosts matched. Exiting...");
quit::with_code(2);
}
let ssh_config = env::var("SSH_CONFIG_FILE") let ssh_config = env::var("SSH_CONFIG_FILE")
.ok().map(PathBuf::from); .ok().map(PathBuf::from);
let mut hosts = HashMap::new(); let filter = if let Some(f) = local_args.value_of("on") {
for node in &selected_nodes { Some(NodeFilter::new(f)?)
let config = all_nodes.get(node).unwrap();
let host = config.to_ssh_host();
match host {
Some(mut host) => {
if let Some(ssh_config) = ssh_config.as_ref() {
host.set_ssh_config(ssh_config.clone());
}
hosts.insert(node.clone(), host);
}
None => {},
}
}
if hosts.len() == all_nodes.len() {
log::info!("Selected all {} nodes.", hosts.len());
} else if hosts.len() == selected_nodes.len() {
log::info!("Selected {} out of {} hosts.", hosts.len(), all_nodes.len());
} else { } else {
log::info!("Selected {} out of {} hosts ({} skipped)", hosts.len(), all_nodes.len(), selected_nodes.len() - hosts.len()); None
}
if hosts.len() == 0 {
log::warn!("No selected nodes are accessible over SSH. Exiting...");
quit::with_code(2);
}
let mut progress = if local_args.is_present("verbose") {
Progress::with_style(OutputStyle::Plain)
} else {
Progress::default()
}; };
let mut targets = hive.select_nodes(filter, ssh_config, true).await?;
let parallel_sp = Arc::new({ let parallel_sp = Arc::new({
let limit = local_args.value_of("parallel").unwrap() let limit = local_args.value_of("parallel").unwrap()
.parse::<usize>().unwrap(); .parse::<usize>().unwrap();
@ -121,52 +78,52 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
} }
}); });
let label_width = hosts.keys().map(|n| n.len()).max().unwrap();
progress.set_label_width(label_width);
let progress = Arc::new(progress);
let command: Arc<Vec<String>> = Arc::new(local_args.values_of("command").unwrap().map(|s| s.to_string()).collect()); let command: Arc<Vec<String>> = Arc::new(local_args.values_of("command").unwrap().map(|s| s.to_string()).collect());
progress.run(|progress| async move { let mut output = SimpleProgressOutput::new(local_args.is_present("verbose"));
let (monitor, meta) = JobMonitor::new(output.get_sender());
let meta = meta.run(|meta| async move {
let mut futures = Vec::new(); let mut futures = Vec::new();
for (name, host) in hosts.drain() { for (name, target) in targets.drain() {
let parallel_sp = parallel_sp.clone(); let parallel_sp = parallel_sp.clone();
let command = command.clone(); let command = command.clone();
let progress = progress.clone();
futures.push(async move { let mut host = target.into_host().unwrap();
let job = meta.create_job(JobType::Execute, vec![ name.clone() ])?;
futures.push(job.run_waiting(|job| async move {
let permit = match parallel_sp.as_ref() { let permit = match parallel_sp.as_ref() {
Some(sp) => Some(sp.acquire().await.unwrap()), Some(sp) => Some(sp.acquire().await.unwrap()),
None => None, None => None,
}; };
let progress = progress.create_task_progress(name.to_string()); job.state(JobState::Running)?;
let command_v: Vec<&str> = command.iter().map(|s| s.as_str()).collect(); let command_v: Vec<&str> = command.iter().map(|s| s.as_str()).collect();
let command = host.ssh(&command_v); host.set_job(Some(job));
let mut execution = CommandExecution::new(command); host.run_command(&command_v).await?;
execution.set_progress_bar(progress.clone());
match execution.run().await {
Ok(()) => {
progress.success("Exited");
}
Err(e) => {
if let NixError::NixFailure { exit_code } = e {
progress.failure(&format!("Exited with code {}", exit_code));
} else {
progress.failure(&format!("Error during execution: {}", e));
}
}
}
drop(permit); drop(permit);
});
Ok(())
}));
} }
join_all(futures).await; join_all(futures).await;
}).await;
Ok(())
});
let (meta, monitor, output) = tokio::join!(
meta,
monitor.run_until_completion(),
output.run_until_completion(),
);
meta?; monitor?; output?;
Ok(()) Ok(())
} }

View file

@ -3,8 +3,15 @@ use std::time::Duration;
use clap::{App, AppSettings, SubCommand, ArgMatches}; use clap::{App, AppSettings, SubCommand, ArgMatches};
use tokio::time; use tokio::time;
use crate::nix::NixError; use crate::job::{JobMonitor, JobType};
use crate::progress::{Progress, OutputStyle}; use crate::nix::{NixError, NixResult, NodeName};
use crate::progress::{ProgressOutput, spinner::SpinnerOutput};
macro_rules! node {
($n:expr) => {
NodeName::new($n.to_string()).unwrap()
}
}
pub fn subcommand() -> App<'static, 'static> { pub fn subcommand() -> App<'static, 'static> {
SubCommand::with_name("test-progress") SubCommand::with_name("test-progress")
@ -13,15 +20,50 @@ pub fn subcommand() -> App<'static, 'static> {
} }
pub async fn run(_global_args: &ArgMatches<'_>, _local_args: &ArgMatches<'_>) -> Result<(), NixError> { pub async fn run(_global_args: &ArgMatches<'_>, _local_args: &ArgMatches<'_>) -> Result<(), NixError> {
let progress = Progress::with_style(OutputStyle::Condensed); let mut output = SpinnerOutput::new();
let mut task = progress.create_task_progress(String::from("test")); let (monitor, meta) = JobMonitor::new(output.get_sender());
let meta_future = meta.run(|meta| async move {
meta.message("Message from meta job".to_string())?;
let nodes = vec![
node!("alpha"),
node!("beta"),
node!("gamma"),
node!("delta"),
node!("epsilon"),
];
let eval = meta.create_job(JobType::Evaluate, nodes)?;
let eval = eval.run(|job| async move {
for i in 0..10 { for i in 0..10 {
time::sleep(Duration::from_secs(2)).await; job.message(format!("eval: {}", i))?;
task.log(&format!("Very slow counter: {}", i)); time::sleep(Duration::from_secs(1)).await;
} }
task.success("Completed"); Ok(())
});
let build = meta.create_job(JobType::Build, vec![ node!("alpha"), node!("beta") ])?;
let build = build.run(|_| async move {
time::sleep(Duration::from_secs(5)).await;
Ok(())
});
let (_, _) = tokio::join!(eval, build);
Err(NixError::Unsupported) as NixResult<()>
});
let (monitor, output, ret) = tokio::join!(
monitor.run_until_completion(),
output.run_until_completion(),
meta_future,
);
monitor?; output?;
println!("Return Value -> {:?}", ret);
Ok(()) Ok(())
} }

945
src/job.rs Normal file
View file

@ -0,0 +1,945 @@
//! Job control.
//!
//! We use a channel to send Events from different futures to a job monitor,
//! which coordinates the display of progress onto the terminal.
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::time;
use uuid::Uuid;
use crate::nix::{NixResult, NixError, NodeName, ProfileMap};
use crate::progress::{Sender as ProgressSender, Message as ProgressMessage, Line, LineStyle};
pub type Sender = UnboundedSender<Event>;
pub type Receiver = UnboundedReceiver<Event>;
/// A handle to a job.
pub type JobHandle = Arc<JobHandleInner>;
/// Maximum log lines to print for failures.
const LOG_CONTEXT_LINES: usize = 20;
/// An opaque job identifier.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct JobId(Uuid);
/// Coordinator of all job states.
///
/// It receives event messages from jobs and updates the progress
/// spinners.
pub struct JobMonitor {
/// The receiving end of the mpsc channel.
receiver: Receiver,
/// Events received so far.
events: Vec<Event>,
/// Known jobs and their metadata.
jobs: HashMap<JobId, JobMetadata>,
/// ID of the meta job.
meta_job_id: JobId,
/// Sender to the spinner thread.
progress: Option<ProgressSender>,
/// Estimated max label size.
label_width: Option<usize>,
}
/// The state of a job.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JobState {
/// Waiting to begin.
///
/// Progress bar is not shown in this state.
Waiting,
/// Running.
Running,
/// Succeeded.
Succeeded,
/// Failed.
Failed,
}
/// The type of a job.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JobType {
/// Meta.
Meta,
/// Nix evaluation.
Evaluate,
/// Nix build.
Build,
/// Key uploading.
UploadKeys,
/// Pushing closure to a host.
Push,
/// Activating a system profile on a host.
Activate,
/// Executing an arbitrary command.
Execute,
/// Creating GC roots.
CreateGcRoots,
}
/// A handle to a job.
///
/// Usually used as `Arc<JobHandleInner>`/`JobHandle` which is clonable.
#[derive(Debug)]
pub struct JobHandleInner {
/// Unique ID of the job.
job_id: JobId,
/// Handle to the mpsc channel.
sender: Sender,
}
/// A handle to the meta job.
///
/// This handle cannot be cloned, and the wrapper is implemented differently
/// to signal to the monitor when it needs to shut down.
#[derive(Debug)]
pub struct MetaJobHandle {
/// Unique ID of the job.
job_id: JobId,
/// Handle to the mpsc channel.
sender: Sender,
}
/// Internal metadata of a job.
#[derive(Debug)]
struct JobMetadata {
job_id: JobId,
/// Type of the job.
job_type: JobType,
/// Custom human-readable name of the job.
friendly_name: Option<String>,
/// List of associated nodes.
///
/// Some jobs may be related to multiple nodes (e.g., building
/// several system profiles at once).
nodes: Vec<NodeName>,
/// Current state of this job.
state: JobState,
/// Current custom message of this job.
///
/// For jobs in the Failed state, this is the error.
/// For jobs in the Succeeded state, this might contain a custom
/// message.
custom_message: Option<String>,
/// Last human-readable message from the job.
///
/// This is so we can quickly repaint without needing to filter
/// through the event logs.
last_message: Option<String>,
}
/// Message to create a new job.
#[derive(Debug)]
pub struct JobCreation {
/// Type of the job.
job_type: JobType,
/// Custom human-readable name of the job.
friendly_name: Option<String>,
/// List of associated nodes.
nodes: Vec<NodeName>,
}
/// An event message sent via the mpsc channel.
#[derive(Debug)]
pub struct Event {
/// Unique ID of the job.
job_id: JobId,
/// Event payload.
payload: EventPayload,
}
/// The payload of an event.
#[derive(Debug)]
pub enum EventPayload {
/// The job is created.
Creation(JobCreation),
/// The job succeeded with a custom message.
SuccessWithMessage(String),
/// The job failed.
///
/// We can't pass the NixError because the wrapper needs to
/// be able to return it as-is.
Failure(String),
/// The job was no-op.
///
/// This probably means that some precondition wasn't met and
/// this job didn't make any changes.
///
/// This puts the job in the Succeeded state but causes the
/// progress spinner to disappear.
Noop(String),
/// The job wants to transition to a new state.
NewState(JobState),
/// The job built a set of system profiles.
ProfilesBuilt(ProfileMap),
/// The child process printed a line to stdout.
ChildStdout(String),
/// The child process printed a line to stderr.
ChildStderr(String),
/// A normal message from the job itself.
Message(String),
/// The monitor should shut down.
///
/// This is sent at the end of the meta job regardless of the outcome.
ShutdownMonitor,
}
struct JobStats {
waiting: usize,
running: usize,
succeeded: usize,
failed: usize,
}
impl JobId {
pub fn new() -> Self {
Self(Uuid::new_v4())
}
}
impl JobMonitor {
/// Creates a new job monitor and a meta job.
pub fn new(progress: Option<ProgressSender>) -> (Self, MetaJobHandle) {
let (sender, receiver) = mpsc::unbounded_channel();
let meta_job_id = JobId::new();
let mut monitor = Self {
receiver,
events: Vec::new(),
jobs: HashMap::new(),
meta_job_id,
progress,
label_width: None,
};
let metadata = JobMetadata {
job_id: meta_job_id,
job_type: JobType::Meta,
friendly_name: None,
nodes: Vec::new(),
state: JobState::Running,
last_message: None,
custom_message: None,
};
monitor.jobs.insert(meta_job_id, metadata);
let job = MetaJobHandle {
job_id: meta_job_id,
sender,
};
(monitor, job)
}
/// Sets the max label width.
pub fn set_label_width(&mut self, label_width: usize) {
self.label_width = Some(label_width);
}
/// Starts the monitor.
pub async fn run_until_completion(mut self) -> NixResult<Self> {
if let Some(width) = self.label_width {
if let Some(sender) = &self.progress {
sender.send(ProgressMessage::HintLabelWidth(width)).unwrap();
}
}
loop {
let message = self.receiver.recv().await;
if message.is_none() {
// All sending halves have been closed - We are done!
return self.finish().await;
}
let message = message.unwrap();
match &message.payload {
EventPayload::Creation(creation) => {
let metadata = JobMetadata {
job_id: message.job_id,
job_type: creation.job_type,
friendly_name: creation.friendly_name.clone(),
nodes: creation.nodes.clone(),
state: JobState::Waiting,
last_message: None,
custom_message: None,
};
let existing = self.jobs.insert(message.job_id, metadata);
assert!(existing.is_none());
}
EventPayload::ShutdownMonitor => {
// The meta job has returned - We are done!
assert_eq!(self.meta_job_id, message.job_id);
return self.finish().await;
}
EventPayload::NewState(new_state) => {
self.update_job_state(message.job_id, *new_state, None, false);
if message.job_id != self.meta_job_id {
self.print_job_stats();
}
}
EventPayload::SuccessWithMessage(custom_message) => {
let custom_message = Some(custom_message.clone());
self.update_job_state(message.job_id, JobState::Succeeded, custom_message, false);
if message.job_id != self.meta_job_id {
self.print_job_stats();
}
}
EventPayload::Noop(custom_message) => {
let custom_message = Some(custom_message.clone());
self.update_job_state(message.job_id, JobState::Succeeded, custom_message, true);
if message.job_id != self.meta_job_id {
self.print_job_stats();
}
}
EventPayload::Failure(error) => {
let error = Some(error.clone());
self.update_job_state(message.job_id, JobState::Failed, error, false);
if message.job_id != self.meta_job_id {
self.print_job_stats();
}
}
EventPayload::ProfilesBuilt(profiles) => {
if let Some(sender) = &self.progress {
for (name, profile) in profiles.iter() {
let text = format!("Built {:?}", profile.as_path());
let line = Line::new(message.job_id, text)
.label(name.as_str().to_string())
.one_off()
.style(LineStyle::Success);
let pm = self.get_print_message(message.job_id, line);
sender.send(pm).unwrap();
}
}
}
EventPayload::ChildStdout(m) | EventPayload::ChildStderr(m) | EventPayload::Message(m) => {
if let Some(sender) = &self.progress {
let metadata = &self.jobs[&message.job_id];
let line = metadata.get_line(m.clone());
let pm = self.get_print_message(message.job_id, line);
sender.send(pm).unwrap();
}
}
}
self.events.push(message);
}
}
/// Updates the state of a job.
fn update_job_state(&mut self,
job_id: JobId,
new_state: JobState,
message: Option<String>,
noop: bool,
) {
let mut metadata = self.jobs.remove(&job_id).unwrap();
let old_state = metadata.state;
if old_state == new_state {
return;
} else if old_state.is_final() {
log::debug!("Tried to update the state of a finished job");
return;
}
metadata.state = new_state;
if message.is_some() {
metadata.custom_message = message.clone();
}
let metadata = if new_state == JobState::Waiting {
// Waiting state doesn't generate user-visible output
metadata
} else {
if let Some(sender) = &self.progress {
let text = if new_state == JobState::Succeeded {
metadata.custom_message.clone()
.or_else(|| metadata.describe_state_transition())
} else {
metadata.describe_state_transition()
};
if let Some(text) = text {
let line = if noop {
// Spinner should disappear
metadata.get_line(text).style(LineStyle::SuccessNoop)
} else {
metadata.get_line(text)
};
let message = self.get_print_message(job_id, line);
sender.send(message).unwrap();
}
}
metadata
};
self.jobs.insert(job_id, metadata);
}
/// Updates the user-visible job statistics output.
fn print_job_stats(&self) {
if let Some(sender) = &self.progress {
let stats = self.get_job_stats();
let text = format!("{}", stats);
let line = self.jobs[&self.meta_job_id].get_line(text)
.noisy();
let message = ProgressMessage::PrintMeta(line);
sender.send(message).unwrap();
}
}
/// Returns jobs statistics.
fn get_job_stats(&self) -> JobStats {
let mut waiting = 0;
let mut running = 0;
let mut succeeded = 0;
let mut failed = 0;
for job in self.jobs.values() {
if job.job_id == self.meta_job_id {
continue;
}
match job.state {
JobState::Waiting => {
waiting += 1;
}
JobState::Running => {
running += 1;
}
JobState::Succeeded => {
succeeded += 1;
}
JobState::Failed => {
failed += 1;
}
}
}
JobStats {
waiting,
running,
succeeded,
failed,
}
}
fn get_print_message(&self, job_id: JobId, line: Line) -> ProgressMessage {
if job_id == self.meta_job_id {
ProgressMessage::PrintMeta(line)
} else {
ProgressMessage::Print(line)
}
}
/// Shows human-readable summary and performs cleanup.
async fn finish(mut self) -> NixResult<Self> {
if let Some(sender) = self.progress.take() {
sender.send(ProgressMessage::Complete).unwrap();
}
// HACK
time::sleep(Duration::from_secs(1)).await;
for job in self.jobs.values() {
if job.state == JobState::Failed {
let logs: Vec<&Event> = self.events.iter().filter(|e| e.job_id == job.job_id).collect();
let last_logs: Vec<&Event> = logs.into_iter().rev().take(LOG_CONTEXT_LINES).rev().collect();
log::error!("{} - Last {} lines of logs:", job.get_failure_summary(), last_logs.len());
for event in last_logs {
log::error!("{}", event.payload);
}
}
}
Ok(self)
}
}
impl JobState {
/// Returns whether this state is final.
pub fn is_final(&self) -> bool {
match self {
Self::Failed | Self::Succeeded => true,
_ => false,
}
}
}
impl JobHandleInner {
/// Creates a new job with a distinct ID.
///
/// This sends out a Creation message with the metadata.
pub fn create_job(&self, job_type: JobType, nodes: Vec<NodeName>) -> NixResult<JobHandle> {
let job_id = JobId::new();
let creation = JobCreation {
friendly_name: None,
job_type,
nodes,
};
if job_type == JobType::Meta {
return Err(NixError::Unknown { message: "Cannot create a meta job!".to_string() });
}
let new_handle = Arc::new(Self {
job_id,
sender: self.sender.clone(),
});
new_handle.send_payload(EventPayload::Creation(creation))?;
Ok(new_handle)
}
/// Runs a closure, automatically updating the job monitor based on the result.
///
/// This immediately transitions the state to Running.
pub async fn run<F, U, T>(self: Arc<Self>, f: U) -> NixResult<T>
where U: FnOnce(Arc<Self>) -> F,
F: Future<Output = NixResult<T>>,
{
self.run_internal(f, true).await
}
/// Runs a closure, automatically updating the job monitor based on the result.
///
/// This does not immediately transition the state to Running.
pub async fn run_waiting<F, U, T>(self: Arc<Self>, f: U) -> NixResult<T>
where U: FnOnce(Arc<Self>) -> F,
F: Future<Output = NixResult<T>>,
{
self.run_internal(f, false).await
}
/// Sends a line of child stdout to the job monitor.
pub fn stdout(&self, output: String) -> NixResult<()> {
self.send_payload(EventPayload::ChildStdout(output))
}
/// Sends a line of child stderr to the job monitor.
pub fn stderr(&self, output: String) -> NixResult<()> {
self.send_payload(EventPayload::ChildStderr(output))
}
/// Sends a human-readable message to the job monitor.
pub fn message(&self, message: String) -> NixResult<()> {
self.send_payload(EventPayload::Message(message))
}
/// Transitions to a new job state.
pub fn state(&self, new_state: JobState) -> NixResult<()> {
self.send_payload(EventPayload::NewState(new_state))
}
/// Marks the job as successful, with a custom message.
pub fn success_with_message(&self, message: String) -> NixResult<()> {
self.send_payload(EventPayload::SuccessWithMessage(message))
}
/// Marks the job as noop.
pub fn noop(&self, message: String) -> NixResult<()> {
self.send_payload(EventPayload::Noop(message))
}
/// Marks the job as failed.
pub fn failure(&self, error: &NixError) -> NixResult<()> {
self.send_payload(EventPayload::Failure(error.to_string()))
}
/// Sends a set of built profiles.
pub fn profiles_built(&self, profiles: ProfileMap) -> NixResult<()> {
self.send_payload(EventPayload::ProfilesBuilt(profiles))
}
/// Runs a closure, automatically updating the job monitor based on the result.
async fn run_internal<F, U, T>(self: Arc<Self>, f: U, report_running: bool) -> NixResult<T>
where U: FnOnce(Arc<Self>) -> F,
F: Future<Output = NixResult<T>>,
{
if report_running {
// Tell monitor we are starting
self.send_payload(EventPayload::NewState(JobState::Running))?;
}
match f(self.clone()).await {
Ok(val) => {
// Success!
self.state(JobState::Succeeded)?;
Ok(val)
}
Err(e) => {
self.failure(&e)?;
Err(e)
}
}
}
/// Sends an event to the job monitor.
fn send_payload(&self, payload: EventPayload) -> NixResult<()> {
if payload.privileged() {
panic!("Tried to send privileged payload with JobHandle");
}
let event = Event::new(self.job_id, payload);
self.sender.send(event)
.map_err(|e| NixError::unknown(Box::new(e)))?;
Ok(())
}
}
impl MetaJobHandle {
/// Runs a closure, automatically updating the job monitor based on the result.
pub async fn run<F, U, T>(self, f: U) -> NixResult<T>
where U: FnOnce(JobHandle) -> F,
F: Future<Output = NixResult<T>>,
{
let normal_handle = Arc::new(JobHandleInner {
job_id: self.job_id,
sender: self.sender.clone(),
});
match f(normal_handle).await {
Ok(val) => {
self.send_payload(EventPayload::NewState(JobState::Succeeded))?;
self.send_payload(EventPayload::ShutdownMonitor)?;
Ok(val)
}
Err(e) => {
self.send_payload(EventPayload::Failure(e.to_string()))?;
self.send_payload(EventPayload::ShutdownMonitor)?;
Err(e)
}
}
}
/// Sends an event to the job monitor.
fn send_payload(&self, payload: EventPayload) -> NixResult<()> {
let event = Event::new(self.job_id, payload);
self.sender.send(event)
.map_err(|e| NixError::unknown(Box::new(e)))?;
Ok(())
}
}
impl JobMetadata {
/// Returns a short human-readable label.
fn get_label(&self) -> &str {
if self.job_type == JobType::Meta {
""
} else if self.nodes.len() != 1 {
"(...)"
} else {
self.nodes[0].as_str()
}
}
/// Returns a Line struct with the given text.
fn get_line(&self, text: String) -> Line {
let style = match self.state {
JobState::Succeeded => LineStyle::Success,
JobState::Failed => LineStyle::Failure,
_ => LineStyle::Normal,
};
Line::new(self.job_id, text)
.style(style)
.label(self.get_label().to_string())
}
/// Returns a human-readable string describing the transition to the current state.
fn describe_state_transition(&self) -> Option<String> {
if self.state == JobState::Waiting {
return None;
}
let node_list = describe_node_list(&self.nodes)
.unwrap_or_else(|| "some node(s)".to_string());
let message = self.custom_message.as_ref().map(|e| e.as_str())
.unwrap_or("No message");
Some(match (self.job_type, self.state) {
(JobType::Meta, JobState::Succeeded) => format!("All done!"),
(JobType::Evaluate, JobState::Running) => format!("Evaluating {}", node_list),
(JobType::Evaluate, JobState::Succeeded) => format!("Evaluated {}", node_list),
(JobType::Evaluate, JobState::Failed) => format!("Evaluation failed: {}", message),
(JobType::Build, JobState::Running) => format!("Building {}", node_list),
(JobType::Build, JobState::Succeeded) => format!("Built {}", node_list),
(JobType::Build, JobState::Failed) => format!("Build failed: {}", message),
(JobType::Push, JobState::Running) => format!("Pushing system closure"),
(JobType::Push, JobState::Succeeded) => format!("Pushed system closure"),
(JobType::Push, JobState::Failed) => format!("Push failed: {}", message),
(JobType::UploadKeys, JobState::Running) => format!("Uploading keys"),
(JobType::UploadKeys, JobState::Succeeded) => format!("Uploaded keys"),
(JobType::UploadKeys, JobState::Failed) => format!("Key upload failed: {}", message),
(JobType::Activate, JobState::Running) => format!("Activating system profile"),
(JobType::Activate, JobState::Failed) => format!("Activation failed: {}", message),
(_, JobState::Failed) => format!("Failed: {}", message),
(_, JobState::Succeeded) => format!("Succeeded"),
_ => "".to_string(),
})
}
/// Returns a human-readable string describing a failed job for use in the summary.
fn get_failure_summary(&self) -> String {
let node_list = describe_node_list(&self.nodes)
.unwrap_or_else(|| "some node(s)".to_string());
match self.job_type {
JobType::Evaluate => format!("Failed to evaluate {}", node_list),
JobType::Build => format!("Failed to build {}", node_list),
JobType::Push => format!("Failed to push system closure to {}", node_list),
JobType::UploadKeys => format!("Failed to upload keys to {}", node_list),
JobType::Activate => format!("Failed to deploy to {}", node_list),
JobType::Meta => format!("Failed to complete requested operation"),
_ => format!("Failed to complete job on {}", node_list),
}
}
}
impl Event {
/// Creates a new event.
fn new(job_id: JobId, payload: EventPayload) -> Self {
Self { job_id, payload }
}
}
impl EventPayload {
fn privileged(&self) -> bool {
match self {
Self::ShutdownMonitor => true,
_ => false,
}
}
}
impl Display for EventPayload {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Event")?;
match self {
EventPayload::ChildStdout(o) => write!(f, "[ stdout]: {}", o)?,
EventPayload::ChildStderr(o) => write!(f, "[ stderr]: {}", o)?,
EventPayload::Message(m) => write!(f, "[ message]: {}", m)?,
EventPayload::Creation(_) => write!(f, "[ created]")?,
EventPayload::NewState(s) => write!(f, "[ state] {:?}", s)?,
EventPayload::SuccessWithMessage(m) => write!(f, "[ success]: {}", m)?,
EventPayload::Noop(m) => write!(f, "[ noop]: {}", m)?,
EventPayload::Failure(e) => write!(f, "[ failure]: {}", e)?,
EventPayload::ShutdownMonitor => write!(f, "[shutdown]")?,
EventPayload::ProfilesBuilt(pm) => write!(f, "[ built]: {:?}", pm)?,
}
Ok(())
}
}
impl Display for JobStats {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut first = true;
fn comma(f: &mut fmt::Formatter, first: &mut bool) -> fmt::Result {
if *first {
*first = false;
return Ok(());
}
write!(f, ", ")
}
if self.running != 0 {
comma(f, &mut first)?;
write!(f, "{} running", self.running)?;
}
if self.succeeded != 0 {
comma(f, &mut first)?;
write!(f, "{} succeeded", self.succeeded)?;
}
if self.failed != 0 {
comma(f, &mut first)?;
write!(f, "{} failed", self.failed)?;
}
if self.waiting != 0 {
comma(f, &mut first)?;
write!(f, "{} waiting", self.waiting)?;
}
Ok(())
}
}
/// Returns a textual description of a list of nodes.
///
/// Example: "alpha, beta, and 5 other nodes"
fn describe_node_list(nodes: &[NodeName]) -> Option<String> {
let rough_limit = 40;
let other_text = ", and XX other nodes";
let total = nodes.len();
if total == 0 {
return None;
}
let mut s = String::new();
let mut iter = nodes.iter().enumerate().peekable();
while let Some((_, node)) = iter.next() {
let next = iter.peek();
if s.len() != 0 {
if next.is_none() {
s += if total > 2 { ", and " } else { " and " };
} else {
s += ", "
}
}
s += node.as_str();
if next.is_none() {
break;
}
let (idx, next) = next.unwrap();
let remaining = rough_limit - s.len();
if next.len() + other_text.len() >= remaining {
s += &format!(", and {} other nodes", total - idx);
break;
}
}
Some(s)
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_test::block_on;
macro_rules! node {
($n:expr) => {
NodeName::new($n.to_string()).unwrap()
}
}
#[test]
fn test_monitor_event() {
block_on(async {
let (monitor, meta) = JobMonitor::new(None);
let meta = meta.run(|job: JobHandle| async move {
job.message("hello world".to_string())?;
let eval_job = job.create_job(JobType::Evaluate, vec![ node!("alpha") ])?;
eval_job.run(|job| async move {
job.stdout("child stdout".to_string())?;
Ok(())
}).await?;
Err(NixError::Unsupported) as NixResult<()>
});
// Run until completion
let (ret, monitor) = tokio::join!(
meta,
monitor.run_until_completion(),
);
match ret {
Err(NixError::Unsupported) => (),
_ => {
panic!("Wrapper must return error as-is");
}
}
let monitor = monitor.unwrap();
assert_eq!(2, monitor.jobs.len());
for event in monitor.events.iter() {
match &event.payload {
EventPayload::Message(m) => {
assert_eq!("hello world", m);
}
EventPayload::ChildStdout(m) => {
assert_eq!("child stdout", m);
}
_ => {}
}
}
});
}
}

View file

@ -1,9 +1,12 @@
#![deny(unused_must_use)]
use std::env; use std::env;
mod nix; mod nix;
mod cli; mod cli;
mod command; mod command;
mod progress; mod progress;
mod job;
mod troubleshooter; mod troubleshooter;
mod util; mod util;

View file

@ -1,796 +0,0 @@
use std::cmp::max;
use std::collections::HashMap;
use std::sync::Arc;
use futures::future::join_all;
use tokio::sync::{Mutex, Semaphore};
use super::{Hive, Host, CopyOptions, NodeName, NodeConfig, Profile, StoreDerivation, ProfileMap, host};
use super::key::{Key, UploadAt};
use crate::progress::{Progress, TaskProgress, OutputStyle};
/// Amount of RAM reserved for the system, in MB.
const EVAL_RESERVE_MB: u64 = 1024;
/// Estimated amount of RAM needed to evaluate one host, in MB.
const EVAL_PER_HOST_MB: u64 = 512;
const BATCH_OPERATION_LABEL: &'static str = "(...)";
macro_rules! set_up_batch_progress_bar {
($progress:ident, $style:ident, $chunk:ident, $single_text:expr, $batch_text:expr) => {{
if $chunk.len() == 1 {
let mut bar = $progress.create_task_progress($chunk[0].to_string());
bar.log($single_text);
bar
} else {
let mut bar = $progress.create_task_progress(BATCH_OPERATION_LABEL.to_string());
bar.log(&format!($batch_text, $chunk.len()));
bar
}
}};
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum Goal {
/// Build the configurations only.
Build,
/// Push the closures only.
Push,
/// Make the configuration the boot default and activate now.
Switch,
/// Make the configuration the boot default.
Boot,
/// Activate the configuration, but don't make it the boot default.
Test,
/// Show what would be done if this configuration were activated.
DryActivate,
}
impl Goal {
pub fn from_str(s: &str) -> Option<Self> {
match s {
"build" => Some(Self::Build),
"push" => Some(Self::Push),
"switch" => Some(Self::Switch),
"boot" => Some(Self::Boot),
"test" => Some(Self::Test),
"dry-activate" => Some(Self::DryActivate),
_ => None,
}
}
pub fn as_str(&self) -> Option<&'static str> {
use Goal::*;
match self {
Build => None,
Push => None,
Switch => Some("switch"),
Boot => Some("boot"),
Test => Some("test"),
DryActivate => Some("dry-activate"),
}
}
pub fn success_str(&self) -> Option<&'static str> {
use Goal::*;
match self {
Build => Some("Configuration built"),
Push => Some("Pushed"),
Switch => Some("Activation successful"),
Boot => Some("Will be activated next boot"),
Test => Some("Activation successful (test)"),
DryActivate => Some("Dry activation successful"),
}
}
pub fn should_switch_profile(&self) -> bool {
use Goal::*;
match self {
Boot | Switch => true,
_ => false,
}
}
pub fn requires_activation(&self) -> bool {
use Goal::*;
match self {
Build | Push => false,
_ => true,
}
}
}
/// Internal deployment stages.
#[derive(Debug)]
enum Stage {
Evaluate(Vec<NodeName>),
Build(Vec<NodeName>),
Apply(NodeName),
}
/// Results of a deployment to a node.
#[derive(Debug)]
struct DeploymentResult {
/// Stage in which the deployment ended.
stage: Stage,
/// Whether the deployment succeeded or not.
success: bool,
/// Unstructured logs of the deployment.
logs: Option<String>,
}
impl DeploymentResult {
fn success(stage: Stage, logs: Option<String>) -> Self {
Self {
stage,
success: true,
logs,
}
}
fn failure(stage: Stage, logs: Option<String>) -> Self {
Self {
stage,
success: false,
logs,
}
}
fn is_successful(&self) -> bool {
self.success
}
fn print(&self) {
use Stage::*;
if self.is_successful() {
unimplemented!();
}
match &self.stage {
Evaluate(nodes) => {
self.print_failed_nodes("Evaluation of", &nodes, true);
}
Build(nodes) => {
self.print_failed_nodes("Build of", &nodes, true);
}
Apply(node) => {
self.print_failed_nodes("Deployment to", &vec![node.clone()], false);
}
}
}
fn print_failed_nodes(&self, prefix: &'static str, nodes: &Vec<NodeName>, full_logs: bool) {
let msg = if nodes.len() == 1 {
format!("{} {} failed.", prefix, nodes[0].as_str())
} else {
format!("{} {} nodes failed.", prefix, nodes.len())
};
if let Some(logs) = self.logs.as_ref() {
let mut lines = logs.split("\n").collect::<Vec<&str>>();
if full_logs {
log::error!("{} Logs:", msg);
} else {
lines = lines.drain(..).rev().take(10).rev().collect();
log::error!("{} Last {} lines of logs:", msg, lines.len());
}
for line in lines {
log::error!("{}", line);
}
}
}
}
/// A deployment target.
#[derive(Debug)]
pub struct Target {
host: Box<dyn Host>,
config: NodeConfig,
}
impl Target {
pub fn new(host: Box<dyn Host>, config: NodeConfig) -> Self {
Self { host, config }
}
}
#[derive(Debug)]
pub struct Deployment {
hive: Hive,
goal: Goal,
target_names: Vec<NodeName>,
targets: Mutex<HashMap<NodeName, Target>>,
label_width: usize,
parallelism_limit: ParallelismLimit,
evaluation_node_limit: EvaluationNodeLimit,
options: DeploymentOptions,
results: Mutex<Vec<DeploymentResult>>,
}
impl Deployment {
pub fn new(hive: Hive, targets: HashMap<NodeName, Target>, goal: Goal) -> Self {
let target_names: Vec<NodeName> = targets.keys().cloned().collect();
let label_width = if let Some(len) = target_names.iter().map(|n| n.len()).max() {
max(BATCH_OPERATION_LABEL.len(), len)
} else {
BATCH_OPERATION_LABEL.len()
};
Self {
hive,
goal,
target_names,
targets: Mutex::new(targets),
label_width,
parallelism_limit: ParallelismLimit::default(),
evaluation_node_limit: EvaluationNodeLimit::default(),
options: DeploymentOptions::default(),
results: Mutex::new(Vec::new()),
}
}
pub fn set_options(&mut self, options: DeploymentOptions) {
self.options = options;
}
pub fn set_parallelism_limit(&mut self, limit: ParallelismLimit) {
self.parallelism_limit = limit;
}
pub fn set_evaluation_node_limit(&mut self, limit: EvaluationNodeLimit) {
self.evaluation_node_limit = limit;
}
/// Uploads keys only (user-facing)
pub async fn upload_keys(self: Arc<Self>) -> bool {
let progress = {
let mut progress = Progress::default();
progress.set_label_width(self.label_width);
Arc::new(progress)
};
let arc_self = self.clone();
{
let arc_self = self.clone();
progress.run(|progress| async move {
let mut futures = Vec::new();
for node in self.target_names.iter() {
let node = node.to_owned();
let mut target = {
let mut targets = arc_self.targets.lock().await;
targets.remove(&node).unwrap()
};
let arc_self = self.clone();
let progress = progress.clone();
futures.push(async move {
let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
let mut task = progress.create_task_progress(node.to_string());
task.log("Uploading keys...");
if let Err(e) = target.host.upload_keys(&target.config.keys, true).await {
task.failure_err(&e);
let mut results = arc_self.results.lock().await;
let stage = Stage::Apply(node);
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
} else {
task.success("Keys uploaded");
}
drop(permit);
});
}
join_all(futures).await
}).await;
}
arc_self.print_logs().await;
arc_self.all_successful().await
}
/// Executes the deployment (user-facing)
///
/// Self must be wrapped inside an Arc.
pub async fn execute(self: Arc<Self>) -> bool {
let progress = {
let mut progress = if !self.options.progress_bar {
Progress::with_style(OutputStyle::Plain)
} else {
Progress::default()
};
progress.set_label_width(self.label_width);
Arc::new(progress)
};
let arc_self = self.clone();
{
let arc_self = self.clone();
let eval_limit = arc_self.clone().eval_limit();
progress.run(|progress| async move {
let mut futures = Vec::new();
for chunk in self.target_names.chunks(eval_limit) {
let arc_self = arc_self.clone();
let progress = progress.clone();
// FIXME: Eww
let chunk: Vec<NodeName> = chunk.iter().map(|s| s.clone()).collect();
futures.push(async move {
let drv = {
// Evaluation phase
let permit = arc_self.parallelism_limit.evaluation.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(progress, style, chunk,
"Evaluating configuration...",
"Evaluating configurations for {} nodes"
);
let arc_self = arc_self.clone();
let drv = match arc_self.eval_profiles(&chunk, bar).await {
Some(drv) => drv,
None => {
return;
}
};
drop(permit);
drv
};
let profiles = {
// Build phase
let permit = arc_self.parallelism_limit.build.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(progress, style, chunk,
"Building configuration...",
"Building configurations for {} nodes"
);
let goal = arc_self.goal;
let profiles = arc_self.clone().build_profiles(&chunk, drv, bar.clone()).await;
let profiles = match profiles {
Some(profiles) => profiles,
None => {
return;
}
};
let build_elapsed = bar.get_elapsed();
bar.success_quiet();
if goal == Goal::Build {
for (node, profile) in profiles.iter() {
let mut bar = progress.create_task_progress(node.to_string());
if let Some(elapsed) = build_elapsed {
bar.set_elapsed(elapsed);
}
bar.success(&format!("Built {:?}", profile.as_path()));
}
}
if arc_self.options.create_gc_roots {
// Create GC roots
if let Some(dir) = arc_self.hive.context_dir() {
let base = dir.join(".gcroots");
if let Err(e) = profiles.create_gc_roots(&base).await {
let bar = progress.create_task_progress(BATCH_OPERATION_LABEL.to_string());
bar.failure(&format!("Failed to create GC roots: {:?}", e));
}
}
}
drop(permit);
profiles
};
// Should we continue?
if arc_self.goal == Goal::Build {
return;
}
// Apply phase
let mut futures = Vec::new();
for node in chunk {
let arc_self = arc_self.clone();
let progress = progress.clone();
let target = {
let mut targets = arc_self.targets.lock().await;
targets.remove(&node).unwrap()
};
let profile = profiles.get(&node).cloned()
.expect(&format!("Somehow profile for {} was not built", node.as_str()));
futures.push(async move {
arc_self.apply_profile(&node, target, profile, progress).await
});
}
join_all(futures).await;
});
}
join_all(futures).await;
}).await;
}
arc_self.print_logs().await;
arc_self.all_successful().await
}
async fn all_successful(&self) -> bool {
let results = self.results.lock().await;
results.iter().filter(|r| !r.is_successful()).count() == 0
}
async fn print_logs(&self) {
let results = self.results.lock().await;
for result in results.iter() {
if !result.is_successful() {
result.print();
}
}
}
async fn eval_profiles(self: Arc<Self>, chunk: &Vec<NodeName>, progress: TaskProgress) -> Option<StoreDerivation<ProfileMap>> {
let (eval, logs) = self.hive.eval_selected(&chunk, progress.clone()).await;
match eval {
Ok(drv) => {
progress.success_quiet();
Some(drv)
}
Err(e) => {
progress.failure(&format!("Evalation failed: {}", e));
let mut results = self.results.lock().await;
let stage = Stage::Evaluate(chunk.clone());
results.push(DeploymentResult::failure(stage, logs));
None
}
}
}
async fn build_profiles(self: Arc<Self>, chunk: &Vec<NodeName>, derivation: StoreDerivation<ProfileMap>, progress: TaskProgress) -> Option<ProfileMap> {
let nix_options = self.hive.nix_options().await.unwrap();
// FIXME: Remote build?
let mut builder = host::local(nix_options);
builder.set_progress_bar(progress.clone());
match derivation.realize(&mut *builder).await {
Ok(profiles) => {
progress.success("Build successful");
let mut results = self.results.lock().await;
let stage = Stage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
Some(profiles)
}
Err(e) => {
progress.failure(&format!("Build failed: {}", e));
let mut results = self.results.lock().await;
let stage = Stage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
None
}
}
}
async fn apply_profile(self: Arc<Self>, name: &NodeName, mut target: Target, profile: Profile, multi: Arc<Progress>) {
let permit = self.parallelism_limit.apply.acquire().await.unwrap();
let mut bar = multi.create_task_progress(name.to_string());
// FIXME: Would be nicer to check remote status before spending time evaluating/building
if !target.config.replace_unknown_profiles {
bar.log("Checking remote profile...");
match target.host.active_derivation_known().await {
Ok(_) => {
bar.log("Remote profile known");
}
Err(e) => {
if self.options.force_replace_unknown_profiles {
bar.log("warning: remote profile is unknown, but unknown profiles are being ignored");
} else {
bar.failure(&format!("Failed: {}", e));
return;
}
}
}
}
let pre_activation_keys = target.config.keys.iter()
.filter(|(_, v)| v.upload_at() == UploadAt::PreActivation)
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, Key>>();
let post_activation_keys = target.config.keys.iter()
.filter(|(_, v)| v.upload_at() == UploadAt::PostActivation)
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, Key>>();
if self.options.upload_keys && !pre_activation_keys.is_empty() {
bar.log("Uploading keys...");
if let Err(e) = target.host.upload_keys(&pre_activation_keys, false).await {
bar.failure_err(&e);
let mut results = self.results.lock().await;
let stage = Stage::Apply(name.clone());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
}
}
bar.log("Starting...");
target.host.set_progress_bar(bar.clone());
let copy_options = self.options.to_copy_options()
.include_outputs(true);
match target.host.deploy(&profile, self.goal, copy_options).await {
Ok(_) => {
// FIXME: This is ugly
if self.options.upload_keys && !post_activation_keys.is_empty() {
bar.log("Uploading keys (post-activation)...");
if let Err(e) = target.host.upload_keys(&post_activation_keys, true).await {
bar.failure_err(&e);
let mut results = self.results.lock().await;
let stage = Stage::Apply(name.clone());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
}
}
bar.success(self.goal.success_str().unwrap());
let mut results = self.results.lock().await;
let stage = Stage::Apply(name.clone());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
}
Err(e) => {
bar.failure(&format!("Failed: {}", e));
let mut results = self.results.lock().await;
let stage = Stage::Apply(name.clone());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
}
}
drop(permit);
}
fn eval_limit(&self) -> usize {
if let Some(limit) = self.evaluation_node_limit.get_limit() {
limit
} else {
self.target_names.len()
}
}
}
#[derive(Debug)]
pub struct ParallelismLimit {
/// Limit of concurrent evaluation processes.
evaluation: Semaphore,
/// Limit of concurrent build processes.
build: Semaphore,
/// Limit of concurrent apply processes.
apply: Semaphore,
}
impl Default for ParallelismLimit {
fn default() -> Self {
Self {
evaluation: Semaphore::new(1),
build: Semaphore::new(2),
apply: Semaphore::new(10),
}
}
}
impl ParallelismLimit {
// Do we actually want them to be configurable?
/*
/// Sets the concurrent evaluation limit.
///
/// This limits the number of evaluation processes, not
/// the number of nodes in each evaluation process.
/// The latter is controlled in DeploymentOptions.
pub fn set_evaluation_limit(&mut self, limit: usize) {
self.evaluation = Semaphore::new(limit);
}
/// Sets the concurrent build limit.
pub fn set_build_limit(&mut self, limit: usize) {
self.build = Semaphore::new(limit);
}
*/
/// Sets the concurrent apply limit.
pub fn set_apply_limit(&mut self, limit: usize) {
self.apply = Semaphore::new(limit);
}
}
#[derive(Clone, Debug)]
pub struct DeploymentOptions {
/// Whether to show condensed progress bars.
///
/// If set to false, verbose logs will be displayed instead.
progress_bar: bool,
/// Whether to use binary caches when copying closures to remote hosts.
substituters_push: bool,
/// Whether to use gzip when copying closures to remote hosts.
gzip: bool,
/// Whether to upload keys when deploying.
upload_keys: bool,
/// Whether to create GC roots for node profiles.
///
/// If true, .gc_roots will be created under the hive's context
/// directory if it exists.
create_gc_roots: bool,
/// Ignore the node-level `deployment.replaceUnknownProfiles` option.
force_replace_unknown_profiles: bool,
}
impl Default for DeploymentOptions {
fn default() -> Self {
Self {
progress_bar: true,
substituters_push: true,
gzip: true,
upload_keys: true,
create_gc_roots: false,
force_replace_unknown_profiles: false,
}
}
}
impl DeploymentOptions {
pub fn set_progress_bar(&mut self, value: bool) {
self.progress_bar = value;
}
pub fn set_substituters_push(&mut self, value: bool) {
self.substituters_push = value;
}
pub fn set_gzip(&mut self, value: bool) {
self.gzip = value;
}
pub fn set_upload_keys(&mut self, enable: bool) {
self.upload_keys = enable;
}
pub fn set_create_gc_roots(&mut self, enable: bool) {
self.create_gc_roots = enable;
}
pub fn set_force_replace_unknown_profiles(&mut self, enable: bool) {
self.force_replace_unknown_profiles = enable;
}
fn to_copy_options(&self) -> CopyOptions {
let options = CopyOptions::default();
options
.use_substitutes(self.substituters_push)
.gzip(self.gzip)
}
}
/// Limit of the number of nodes in each evaluation process.
///
/// The evaluation process is very RAM-intensive, with memory
/// consumption scaling linearly with the number of nodes
/// evaluated at the same time. This can be a problem if you
/// are deploying to a large number of nodes at the same time,
/// where `nix-instantiate` may consume too much RAM and get
/// killed by the OS (`NixKilled` error).
///
/// Evaluating each node on its own is not an efficient solution,
/// with total CPU time and memory consumption vastly exceeding the
/// case where we evaluate the same set of nodes at the same time
/// (TODO: Provide statistics).
///
/// To overcome this problem, we split the evaluation process into
/// chunks when necessary, with the maximum number of nodes in
/// each `nix-instantiate` invocation determined with:
///
/// - A simple heuristic based on remaining memory in the system
/// - A supplied number
/// - No limit at all
#[derive(Copy, Clone, Debug)]
pub enum EvaluationNodeLimit {
/// Use a naive heuristic based on available memory.
Heuristic,
/// Supply the maximum number of nodes.
Manual(usize),
/// Do not limit the number of nodes in each evaluation process
None,
}
impl Default for EvaluationNodeLimit {
fn default() -> Self {
Self::Heuristic
}
}
impl EvaluationNodeLimit {
/// Returns the maximum number of hosts in each evaluation.
///
/// The result should be cached.
pub fn get_limit(&self) -> Option<usize> {
match self {
EvaluationNodeLimit::Heuristic => {
if let Ok(mem_info) = sys_info::mem_info() {
let mut mb = mem_info.avail / 1024;
if mb >= EVAL_RESERVE_MB {
mb -= EVAL_RESERVE_MB;
}
let nodes = mb / EVAL_PER_HOST_MB;
if nodes == 0 {
Some(1)
} else {
Some(nodes as usize)
}
} else {
Some(10)
}
}
EvaluationNodeLimit::Manual(limit) => Some(*limit),
EvaluationNodeLimit::None => None,
}
}
}

100
src/nix/deployment/goal.rs Normal file
View file

@ -0,0 +1,100 @@
//! Deployment goals.
/// The goal of a deployment.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum Goal {
/// Build the configurations only.
Build,
/// Push the closures only.
Push,
/// Make the configuration the boot default and activate now.
Switch,
/// Make the configuration the boot default.
Boot,
/// Activate the configuration, but don't make it the boot default.
Test,
/// Show what would be done if this configuration were activated.
DryActivate,
/// Only upload keys.
UploadKeys,
}
impl Goal {
pub fn from_str(s: &str) -> Option<Self> {
match s {
"build" => Some(Self::Build),
"push" => Some(Self::Push),
"switch" => Some(Self::Switch),
"boot" => Some(Self::Boot),
"test" => Some(Self::Test),
"dry-activate" => Some(Self::DryActivate),
"keys" => Some(Self::UploadKeys),
_ => None,
}
}
pub fn as_str(&self) -> Option<&'static str> {
use Goal::*;
match self {
Build => None,
Push => None,
Switch => Some("switch"),
Boot => Some("boot"),
Test => Some("test"),
DryActivate => Some("dry-activate"),
UploadKeys => Some("keys"),
}
}
pub fn success_str(&self) -> &'static str {
use Goal::*;
match self {
Build => "Configuration built",
Push => "Pushed",
Switch => "Activation successful",
Boot => "Will be activated next boot",
Test => "Activation successful (test)",
DryActivate => "Dry activation successful",
UploadKeys => "Uploaded keys",
}
}
pub fn should_switch_profile(&self) -> bool {
use Goal::*;
match self {
Boot | Switch => true,
_ => false,
}
}
pub fn requires_activation(&self) -> bool {
use Goal::*;
match self {
Build | UploadKeys | Push => false,
_ => true,
}
}
pub fn requires_target_host(&self) -> bool {
use Goal::*;
match self {
Build => false,
_ => true,
}
}
/// Is this a real goal supported by switch-to-configuration?
pub fn is_real_goal(&self) -> bool {
use Goal::*;
match self {
Build | UploadKeys | Push => false,
_ => true,
}
}
}

View file

@ -0,0 +1,109 @@
//! Parallelism limits.
use tokio::sync::Semaphore;
/// Amount of RAM reserved for the system, in MB.
const EVAL_RESERVE_MB: u64 = 1024;
/// Estimated amount of RAM needed to evaluate one host, in MB.
const EVAL_PER_HOST_MB: u64 = 512;
/// The parallelism limit for a deployment.
#[derive(Debug)]
pub struct ParallelismLimit {
/// Limit of concurrent evaluation processes.
pub evaluation: Semaphore,
/// Limit of concurrent build processes.
pub build: Semaphore,
/// Limit of concurrent apply processes.
pub apply: Semaphore,
}
impl Default for ParallelismLimit {
fn default() -> Self {
Self {
evaluation: Semaphore::new(1),
build: Semaphore::new(2),
apply: Semaphore::new(10),
}
}
}
impl ParallelismLimit {
/// Sets the concurrent apply limit.
pub fn set_apply_limit(&mut self, limit: usize) {
self.apply = Semaphore::new(limit);
}
}
/// Limit of the number of nodes in each evaluation process.
///
/// The evaluation process is very RAM-intensive, with memory
/// consumption scaling linearly with the number of nodes
/// evaluated at the same time. This can be a problem if you
/// are deploying to a large number of nodes at the same time,
/// where `nix-instantiate` may consume too much RAM and get
/// killed by the OS (`NixKilled` error).
///
/// Evaluating each node on its own is not an efficient solution,
/// with total CPU time and memory consumption vastly exceeding the
/// case where we evaluate the same set of nodes at the same time
/// (TODO: Provide statistics).
///
/// To overcome this problem, we split the evaluation process into
/// chunks when necessary, with the maximum number of nodes in
/// each `nix-instantiate` invocation determined with:
///
/// - A simple heuristic based on remaining memory in the system
/// - A supplied number
/// - No limit at all
#[derive(Copy, Clone, Debug)]
pub enum EvaluationNodeLimit {
/// Use a naive heuristic based on available memory.
Heuristic,
/// Supply the maximum number of nodes.
Manual(usize),
/// Do not limit the number of nodes in each evaluation process
None,
}
impl Default for EvaluationNodeLimit {
fn default() -> Self {
Self::Heuristic
}
}
impl EvaluationNodeLimit {
/// Returns the maximum number of hosts in each evaluation.
///
/// The result should be cached.
pub fn get_limit(&self) -> Option<usize> {
match self {
EvaluationNodeLimit::Heuristic => {
if let Ok(mem_info) = sys_info::mem_info() {
let mut mb = mem_info.avail / 1024;
if mb >= EVAL_RESERVE_MB {
mb -= EVAL_RESERVE_MB;
}
let nodes = mb / EVAL_PER_HOST_MB;
if nodes == 0 {
Some(1)
} else {
Some(nodes as usize)
}
} else {
Some(10)
}
}
EvaluationNodeLimit::Manual(limit) => Some(*limit),
EvaluationNodeLimit::None => None,
}
}
}

435
src/nix/deployment/mod.rs Normal file
View file

@ -0,0 +1,435 @@
//! Deployment logic.
pub mod goal;
pub use goal::Goal;
pub mod limits;
pub use limits::{EvaluationNodeLimit, ParallelismLimit};
pub mod options;
pub use options::Options;
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
use futures::future::join_all;
use itertools::Itertools;
use crate::progress::Sender as ProgressSender;
use crate::job::{JobMonitor, JobHandle, JobType, JobState};
use crate::util;
use super::{
Hive,
Host,
NodeName,
NodeConfig,
NixError,
NixResult,
Profile,
ProfileMap,
StoreDerivation,
CopyDirection,
key::{Key, UploadAt as UploadKeyAt},
};
use super::host;
/// A deployment.
pub type DeploymentHandle = Arc<Deployment>;
/// A map of target nodes.
pub type TargetNodeMap = HashMap<NodeName, TargetNode>;
/// A deployment.
#[derive(Debug)]
pub struct Deployment {
/// The configuration.
hive: Hive,
/// The goal of this deployment.
goal: Goal,
/// Deployment options.
options: Options,
/// Handle to send messages to the ProgressOutput.
progress: Option<ProgressSender>,
/// Names of the target nodes.
nodes: Vec<NodeName>,
/// Handles to the deployment targets.
targets: HashMap<NodeName, TargetNode>,
/// Parallelism limit.
parallelism_limit: ParallelismLimit,
/// Evaluation limit.
evaluation_node_limit: EvaluationNodeLimit,
/// Whether it was executed.
executed: bool,
}
/// Handle to a target node.
#[derive(Debug)]
pub struct TargetNode {
/// Name of the node.
name: NodeName,
/// The host to deploy to.
host: Option<Box<dyn Host>>,
/// The config.deployment values of the node.
config: NodeConfig,
}
impl TargetNode {
pub fn new(name: NodeName, host: Option<Box<dyn Host>>, config: NodeConfig) -> Self {
Self { name, host, config }
}
pub fn into_host(self) -> Option<Box<dyn Host>> {
self.host
}
}
impl Deployment {
/// Creates a new deployment.
pub fn new(hive: Hive, targets: TargetNodeMap, goal: Goal, progress: Option<ProgressSender>) -> Self {
Self {
hive,
goal,
progress,
nodes: targets.keys().cloned().collect(),
targets,
parallelism_limit: ParallelismLimit::default(),
evaluation_node_limit: EvaluationNodeLimit::default(),
options: Options::default(),
executed: false,
}
}
/// Executes the deployment.
///
/// If a ProgressSender is supplied, then this should be run in parallel
/// with its `run_until_completion()` future.
pub async fn execute(mut self) -> NixResult<()> {
if self.executed {
return Err(NixError::DeploymentAlreadyExecuted);
}
self.executed = true;
let (mut monitor, meta) = JobMonitor::new(self.progress.clone());
if let Some(width) = util::get_label_width(&self.targets) {
monitor.set_label_width(width);
}
if self.goal == Goal::UploadKeys {
// Just upload keys
let targets = mem::take(&mut self.targets);
let deployment = DeploymentHandle::new(self);
let meta_future = meta.run(|meta| async move {
let mut futures = Vec::new();
for target in targets.into_values() {
futures.push(deployment.clone().upload_keys_to_node(meta.clone(), target));
}
let result: NixResult<Vec<()>> = join_all(futures).await.into_iter().collect();
result?;
Ok(())
});
let (result, _) = tokio::join!(
meta_future,
monitor.run_until_completion(),
);
result?;
Ok(())
} else {
// Do the whole eval-build-deploy flow
let chunks = self.get_chunks();
let deployment = DeploymentHandle::new(self);
let meta_future = meta.run(|meta| async move {
let mut futures = Vec::new();
for chunk in chunks.into_iter() {
futures.push(deployment.clone().execute_chunk(meta.clone(), chunk));
}
let result: NixResult<Vec<()>> = join_all(futures).await.into_iter().collect();
result?;
Ok(())
});
let (result, _) = tokio::join!(
meta_future,
monitor.run_until_completion(),
);
result?;
Ok(())
}
}
pub fn set_options(&mut self, options: Options) {
self.options = options;
}
pub fn set_parallelism_limit(&mut self, limit: ParallelismLimit) {
self.parallelism_limit = limit;
}
pub fn set_evaluation_node_limit(&mut self, limit: EvaluationNodeLimit) {
self.evaluation_node_limit = limit;
}
fn get_chunks(&mut self) -> Vec<TargetNodeMap> {
let eval_limit = self.evaluation_node_limit.get_limit()
.unwrap_or(self.targets.len());
let mut result = Vec::new();
for chunk in self.targets.drain().chunks(eval_limit).into_iter() {
let mut map = HashMap::new();
for (name, host) in chunk {
map.insert(name, host);
}
result.push(map);
}
result
}
/// Executes the deployment against a portion of nodes.
async fn execute_chunk(self: DeploymentHandle, parent: JobHandle, mut chunk: TargetNodeMap) -> NixResult<()> {
if self.goal == Goal::UploadKeys {
unreachable!(); // some logic is screwed up
}
let nodes: Vec<NodeName> = chunk.keys().cloned().collect();
let profiles = self.clone().build_nodes(parent.clone(), nodes.clone()).await?;
if self.goal == Goal::Build {
return Ok(());
}
for (name, profile) in profiles.iter() {
let target = chunk.remove(&name).unwrap();
self.clone().deploy_node(parent.clone(), target, profile.clone()).await?;
}
// Create GC root
if self.options.create_gc_roots {
let job = parent.create_job(JobType::CreateGcRoots, nodes.clone())?;
let arc_self = self.clone();
job.run_waiting(|job| async move {
if let Some(dir) = arc_self.hive.context_dir() {
job.state(JobState::Running)?;
let base = dir.join(".gcroots");
profiles.create_gc_roots(&base).await?;
} else {
job.noop("No context directory to create GC roots in".to_string())?;
}
Ok(())
}).await?;
}
Ok(())
}
/// Evaluates a set of nodes, returning a store derivation.
async fn evaluate_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec<NodeName>)
-> NixResult<StoreDerivation<ProfileMap>>
{
let job = parent.create_job(JobType::Evaluate, nodes.clone())?;
job.run_waiting(|job| async move {
// Wait for eval limit
let permit = self.parallelism_limit.evaluation.acquire().await.unwrap();
job.state(JobState::Running)?;
let result = self.hive.eval_selected(&nodes, Some(job.clone())).await;
drop(permit);
result
}).await
}
/// Builds a set of nodes, returning a set of profiles.
async fn build_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec<NodeName>)
-> NixResult<ProfileMap>
{
let job = parent.create_job(JobType::Build, nodes.clone())?;
job.run_waiting(|job| async move {
let derivation = self.clone().evaluate_nodes(job.clone(), nodes.clone()).await?;
// Wait for build limit
let permit = self.parallelism_limit.apply.acquire().await.unwrap();
job.state(JobState::Running)?;
// FIXME: Remote builder?
let nix_options = self.hive.nix_options().await.unwrap();
let mut builder = host::local(nix_options);
let map = derivation.realize(&mut *builder).await?;
job.profiles_built(map.clone())?;
drop(permit);
Ok(map)
}).await
}
/// Only uploads keys to a node.
async fn upload_keys_to_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode) -> NixResult<()> {
let nodes = vec![target.name.clone()];
let job = parent.create_job(JobType::UploadKeys, nodes)?;
job.run(|_| async move {
if target.host.is_none() {
return Err(NixError::Unsupported);
}
let host = target.host.as_mut().unwrap();
host.upload_keys(&target.config.keys, true).await?;
Ok(())
}).await
}
/// Pushes and optionally activates a system profile on a given node.
///
/// This will also upload keys to the node.
async fn deploy_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile: Profile)
-> NixResult<()>
{
if self.goal == Goal::Build {
unreachable!();
}
let nodes = vec![target.name.clone()];
let push_job = parent.create_job(JobType::Push, nodes.clone())?;
let push_profile = profile.clone();
let arc_self = self.clone();
let mut target = push_job.run_waiting(|job| async move {
if target.host.is_none() {
return Err(NixError::Unsupported);
}
let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
job.state(JobState::Running)?;
let host = target.host.as_mut().unwrap();
host.copy_closure(
push_profile.as_store_path(),
CopyDirection::ToRemote,
arc_self.options.to_copy_options()).await?;
drop(permit);
Ok(target)
}).await?;
if !self.goal.requires_activation() {
// We are done here :)
return Ok(());
}
// Upload pre-activation keys
let mut target = if self.options.upload_keys {
let job = parent.create_job(JobType::UploadKeys, nodes.clone())?;
job.run_waiting(|job| async move {
let keys = target.config.keys.iter()
.filter(|(_, v)| v.upload_at() == UploadKeyAt::PreActivation)
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, Key>>();
if keys.is_empty() {
job.noop("No pre-activation keys to upload".to_string())?;
return Ok(target);
}
job.state(JobState::Running)?;
job.message("Uploading pre-activation keys...".to_string())?;
let host = target.host.as_mut().unwrap();
host.upload_keys(&keys, false).await?;
job.success_with_message("Uploaded keys (pre-activation)".to_string())?;
Ok(target)
}).await?
} else {
target
};
// Activate profile
let activation_job = parent.create_job(JobType::Activate, nodes.clone())?;
let arc_self = self.clone();
let profile_r = profile.clone();
let mut target = activation_job.run(|job| async move {
let host = target.host.as_mut().unwrap();
if !target.config.replace_unknown_profiles {
job.message("Checking remote profile...".to_string())?;
match host.active_derivation_known().await {
Ok(_) => {
job.message("Remote profile known".to_string())?;
}
Err(e) => {
if arc_self.options.force_replace_unknown_profiles {
job.message("warning: remote profile is unknown, but unknown profiles are being ignored".to_string())?;
} else {
return Err(e);
}
}
}
}
host.activate(&profile_r, arc_self.goal).await?;
job.success_with_message(arc_self.goal.success_str().to_string())?;
Ok(target)
}).await?;
// Upload post-activation keys
if self.options.upload_keys {
let job = parent.create_job(JobType::UploadKeys, nodes.clone())?;
job.run_waiting(|job| async move {
let keys = target.config.keys.iter()
.filter(|(_, v)| v.upload_at() == UploadKeyAt::PostActivation)
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, Key>>();
if keys.is_empty() {
job.noop("No post-activation keys to upload".to_string())?;
return Ok(());
}
job.state(JobState::Running)?;
job.message("Uploading post-activation keys...".to_string())?;
let host = target.host.as_mut().unwrap();
host.upload_keys(&keys, true).await?;
job.success_with_message("Uploaded keys (post-activation)".to_string())?;
Ok(())
}).await?;
}
Ok(())
}
}

View file

@ -0,0 +1,67 @@
//! Deployment options.
use crate::nix::CopyOptions;
/// Options for a deployment.
#[derive(Clone, Debug)]
pub struct Options {
/// Whether to use binary caches when copying closures to remote hosts.
pub(super) substituters_push: bool,
/// Whether to use gzip when copying closures to remote hosts.
pub(super) gzip: bool,
/// Whether to upload keys when deploying.
pub(super) upload_keys: bool,
/// Whether to create GC roots for node profiles.
///
/// If true, .gc_roots will be created under the hive's context
/// directory if it exists.
pub(super) create_gc_roots: bool,
/// Ignore the node-level `deployment.replaceUnknownProfiles` option.
pub(super) force_replace_unknown_profiles: bool,
}
impl Options {
pub fn set_substituters_push(&mut self, value: bool) {
self.substituters_push = value;
}
pub fn set_gzip(&mut self, value: bool) {
self.gzip = value;
}
pub fn set_upload_keys(&mut self, enable: bool) {
self.upload_keys = enable;
}
pub fn set_create_gc_roots(&mut self, enable: bool) {
self.create_gc_roots = enable;
}
pub fn set_force_replace_unknown_profiles(&mut self, enable: bool) {
self.force_replace_unknown_profiles = enable;
}
pub fn to_copy_options(&self) -> CopyOptions {
let options = CopyOptions::default();
options
.use_substitutes(self.substituters_push)
.gzip(self.gzip)
}
}
impl Default for Options {
fn default() -> Self {
Self {
substituters_push: true,
gzip: true,
upload_keys: true,
create_gc_roots: false,
force_replace_unknown_profiles: false,
}
}
}

View file

@ -446,14 +446,15 @@ let
value = evalNode name hive.${name}; value = evalNode name hive.${name};
}) nodeNames); }) nodeNames);
deploymentConfigJson = toJSON (lib.attrsets.mapAttrs (name: eval: eval.config.deployment) nodes); toplevel = lib.mapAttrs (name: eval: eval.config.system.build.toplevel) nodes;
toplevel = lib.attrsets.mapAttrs (name: eval: eval.config.system.build.toplevel) nodes; deploymentConfigJson = toJSON (lib.mapAttrs (name: eval: eval.config.deployment) nodes);
buildAll = buildSelected { deploymentConfigJsonSelected = names: toJSON
names = nodeNames; (listToAttrs (map (name: { inherit name; value = nodes.${name}.config.deployment; }) names));
};
buildSelected = { names ? null }: let buildAll = buildSelected nodeNames;
buildSelected = names: let
# Change in the order of the names should not cause a derivation to be created # Change in the order of the names should not cause a derivation to be created
selected = lib.attrsets.filterAttrs (name: _: elem name names) toplevel; selected = lib.attrsets.filterAttrs (name: _: elem name names) toplevel;
in derivation rec { in derivation rec {
@ -470,7 +471,11 @@ let
inherit pkgs lib nodes; inherit pkgs lib nodes;
}; };
in { in {
inherit nodes deploymentConfigJson toplevel buildAll buildSelected introspect; inherit
nodes toplevel
deploymentConfigJson deploymentConfigJsonSelected
buildAll buildSelected introspect;
meta = hive.meta; meta = hive.meta;
docs = { docs = {

View file

@ -15,11 +15,13 @@ use super::{
NixResult, NixResult,
NodeName, NodeName,
NodeConfig, NodeConfig,
NodeFilter,
ProfileMap, ProfileMap,
}; };
use super::deployment::TargetNode;
use super::NixCommand; use super::NixCommand;
use crate::util::CommandExecution; use crate::util::CommandExecution;
use crate::progress::TaskProgress; use crate::job::JobHandle;
const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix"); const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix");
@ -116,6 +118,88 @@ impl Hive {
Ok(options) Ok(options)
} }
/// Convenience wrapper to filter nodes for CLI actions.
pub async fn select_nodes(&self, filter: Option<NodeFilter>, ssh_config: Option<PathBuf>, ssh_only: bool) -> NixResult<HashMap<NodeName, TargetNode>> {
let mut node_configs = None;
log::info!("Enumerating nodes...");
let all_nodes = self.node_names().await?;
let selected_nodes = match filter {
Some(filter) => {
if filter.has_node_config_rules() {
log::debug!("Retrieving deployment info for all nodes...");
let all_node_configs = self.deployment_info().await?;
let filtered = filter.filter_node_configs(all_node_configs.iter())
.into_iter().collect();
node_configs = Some(all_node_configs);
filtered
} else {
filter.filter_node_names(&all_nodes)?
.into_iter().collect()
}
}
None => all_nodes.clone(),
};
let n_selected = selected_nodes.len();
let mut node_configs = if let Some(configs) = node_configs {
configs
} else {
log::debug!("Retrieving deployment info for selected nodes...");
self.deployment_info_selected(&selected_nodes).await?
};
let mut targets = HashMap::new();
let mut n_ssh = 0;
for node in selected_nodes.into_iter() {
let config = node_configs.remove(&node).unwrap();
let host = config.to_ssh_host().map(|mut host| {
n_ssh += 1;
if let Some(ssh_config) = &ssh_config {
host.set_ssh_config(ssh_config.clone());
}
host.upcast()
});
let ssh_host = host.is_some();
let target = TargetNode::new(node.clone(), host, config);
if !ssh_only || ssh_host {
targets.insert(node, target);
}
}
let skipped = n_selected - n_ssh;
if targets.is_empty() {
if skipped != 0 {
log::warn!("No hosts selected.");
} else {
log::warn!("No hosts selected ({} skipped).", skipped);
}
} else if targets.len() == all_nodes.len() {
log::info!("Selected all {} nodes.", targets.len());
} else if !ssh_only || skipped == 0 {
log::info!("Selected {} out of {} hosts.", targets.len(), all_nodes.len());
} else {
log::info!("Selected {} out of {} hosts ({} skipped).", targets.len(), all_nodes.len(), skipped);
}
Ok(targets)
}
/// Returns a list of all node names.
pub async fn node_names(&self) -> NixResult<Vec<NodeName>> {
self.nix_instantiate("attrNames hive.nodes").eval()
.capture_json().await
}
/// Retrieve deployment info for all nodes. /// Retrieve deployment info for all nodes.
pub async fn deployment_info(&self) -> NixResult<HashMap<NodeName, NodeConfig>> { pub async fn deployment_info(&self) -> NixResult<HashMap<NodeName, NodeConfig>> {
// FIXME: Really ugly :( // FIXME: Really ugly :(
@ -133,7 +217,7 @@ impl Hive {
} }
/// Retrieve deployment info for a single node. /// Retrieve deployment info for a single node.
pub async fn deployment_info_for(&self, node: &NodeName) -> NixResult<Option<NodeConfig>> { pub async fn deployment_info_single(&self, node: &NodeName) -> NixResult<Option<NodeConfig>> {
let expr = format!("toJSON (hive.nodes.\"{}\".config.deployment or null)", node.as_str()); let expr = format!("toJSON (hive.nodes.\"{}\".config.deployment or null)", node.as_str());
let s: String = self.nix_instantiate(&expr).eval_with_builders().await? let s: String = self.nix_instantiate(&expr).eval_with_builders().await?
.capture_json().await?; .capture_json().await?;
@ -141,48 +225,45 @@ impl Hive {
Ok(serde_json::from_str(&s).unwrap()) Ok(serde_json::from_str(&s).unwrap())
} }
/// Retrieve deployment info for a list of nodes.
pub async fn deployment_info_selected(&self, nodes: &[NodeName]) -> NixResult<HashMap<NodeName, NodeConfig>> {
let nodes_expr = SerializedNixExpresssion::new(nodes)?;
// FIXME: Really ugly :(
let s: String = self.nix_instantiate(&format!("hive.deploymentConfigJsonSelected {}", nodes_expr.expression()))
.eval_with_builders().await?
.capture_json().await?;
let configs: HashMap<NodeName, NodeConfig> = serde_json::from_str(&s).unwrap();
for config in configs.values() {
config.validate()?;
for key in config.keys.values() {
key.validate()?;
}
}
Ok(configs)
}
/// Evaluates selected nodes. /// Evaluates selected nodes.
/// ///
/// Evaluation may take up a lot of memory, so we make it possible /// Evaluation may take up a lot of memory, so we make it possible
/// to split up the evaluation process into chunks and run them /// to split up the evaluation process into chunks and run them
/// concurrently with other processes (e.g., build and apply). /// concurrently with other processes (e.g., build and apply).
pub async fn eval_selected(&self, nodes: &Vec<NodeName>, progress_bar: TaskProgress) -> (NixResult<StoreDerivation<ProfileMap>>, Option<String>) { pub async fn eval_selected(&self, nodes: &Vec<NodeName>, job: Option<JobHandle>) -> NixResult<StoreDerivation<ProfileMap>> {
// FIXME: The return type is ugly... let nodes_expr = SerializedNixExpresssion::new(nodes)?;
let nodes_expr = SerializedNixExpresssion::new(nodes); let expr = format!("hive.buildSelected {}", nodes_expr.expression());
if let Err(e) = nodes_expr {
return (Err(e), None);
}
let nodes_expr = nodes_expr.unwrap();
let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression());
let command = match self.nix_instantiate(&expr).instantiate_with_builders().await {
Ok(command) => command,
Err(e) => {
return (Err(e), None);
}
};
let command = self.nix_instantiate(&expr).instantiate_with_builders().await?;
let mut execution = CommandExecution::new(command); let mut execution = CommandExecution::new(command);
execution.set_progress_bar(progress_bar); execution.set_job(job);
let eval = execution let path = execution.capture_store_path().await?;
.capture_store_path().await;
let (_, stderr) = execution.get_logs();
match eval {
Ok(path) => {
let drv = path.to_derivation() let drv = path.to_derivation()
.expect("The result should be a store derivation"); .expect("The result should be a store derivation");
(Ok(drv), stderr.cloned()) Ok(drv)
}
Err(e) => {
(Err(e), stderr.cloned())
}
}
} }
/// Evaluates an expression using values from the configuration /// Evaluates an expression using values from the configuration

View file

@ -12,8 +12,8 @@ use shell_escape::unix::escape;
use tokio::io::{AsyncWriteExt, BufReader}; use tokio::io::{AsyncWriteExt, BufReader};
use tokio::process::Child; use tokio::process::Child;
use crate::job::JobHandle;
use crate::nix::{Key, NixResult}; use crate::nix::{Key, NixResult};
use crate::progress::TaskProgress;
use crate::util::capture_stream; use crate::util::capture_stream;
const SCRIPT_TEMPLATE: &'static str = include_str!("./key_uploader.template.sh"); const SCRIPT_TEMPLATE: &'static str = include_str!("./key_uploader.template.sh");
@ -30,7 +30,7 @@ pub fn generate_script<'a>(key: &'a Key, destination: &'a Path, require_ownershi
escape(key_script.into()) escape(key_script.into())
} }
pub async fn feed_uploader(mut uploader: Child, key: &Key, progress: TaskProgress, logs: &mut String) -> NixResult<()> { pub async fn feed_uploader(mut uploader: Child, key: &Key, job: Option<JobHandle>) -> NixResult<()> {
let mut reader = key.reader().await?; let mut reader = key.reader().await?;
let mut stdin = uploader.stdin.take().unwrap(); let mut stdin = uploader.stdin.take().unwrap();
@ -42,13 +42,11 @@ pub async fn feed_uploader(mut uploader: Child, key: &Key, progress: TaskProgres
let stderr = BufReader::new(uploader.stderr.take().unwrap()); let stderr = BufReader::new(uploader.stderr.take().unwrap());
let futures = join3( let futures = join3(
capture_stream(stdout, progress.clone()), capture_stream(stdout, job.clone(), false),
capture_stream(stderr, progress.clone()), capture_stream(stderr, job.clone(), true),
uploader.wait(), uploader.wait(),
); );
let (stdout_str, stderr_str, exit) = futures.await; let (_, _, exit) = futures.await;
logs.push_str(&stdout_str);
logs.push_str(&stderr_str);
let exit = exit?; let exit = exit?;

View file

@ -6,9 +6,9 @@ use async_trait::async_trait;
use tokio::process::Command; use tokio::process::Command;
use super::{CopyDirection, CopyOptions, Host, key_uploader}; use super::{CopyDirection, CopyOptions, Host, key_uploader};
use crate::nix::{StorePath, Profile, Goal, NixResult, NixCommand, Key, SYSTEM_PROFILE}; use crate::nix::{StorePath, Profile, Goal, NixError, NixResult, NixCommand, Key, SYSTEM_PROFILE};
use crate::util::CommandExecution; use crate::util::CommandExecution;
use crate::progress::TaskProgress; use crate::job::JobHandle;
/// The local machine running Colmena. /// The local machine running Colmena.
/// ///
@ -16,16 +16,14 @@ use crate::progress::TaskProgress;
/// (e.g., building Linux derivations on macOS). /// (e.g., building Linux derivations on macOS).
#[derive(Debug)] #[derive(Debug)]
pub struct Local { pub struct Local {
progress_bar: TaskProgress, job: Option<JobHandle>,
logs: String,
nix_options: Vec<String>, nix_options: Vec<String>,
} }
impl Local { impl Local {
pub fn new(nix_options: Vec<String>) -> Self { pub fn new(nix_options: Vec<String>) -> Self {
Self { Self {
progress_bar: TaskProgress::default(), job: None,
logs: String::new(),
nix_options, nix_options,
} }
} }
@ -36,6 +34,7 @@ impl Host for Local {
async fn copy_closure(&mut self, _closure: &StorePath, _direction: CopyDirection, _options: CopyOptions) -> NixResult<()> { async fn copy_closure(&mut self, _closure: &StorePath, _direction: CopyDirection, _options: CopyOptions) -> NixResult<()> {
Ok(()) Ok(())
} }
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> { async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
let mut command = Command::new("nix-store"); let mut command = Command::new("nix-store");
@ -47,20 +46,15 @@ impl Host for Local {
let mut execution = CommandExecution::new(command); let mut execution = CommandExecution::new(command);
execution.set_progress_bar(self.progress_bar.clone()); execution.set_job(self.job.clone());
let result = execution.run().await; execution.run().await?;
let (stdout, _) = execution.get_logs();
let (stdout, stderr) = execution.get_logs(); stdout.unwrap().lines()
self.logs += stderr.unwrap(); .map(|p| p.to_string().try_into()).collect()
}
match result {
Ok(()) => {
stdout.unwrap().lines().map(|p| p.to_string().try_into()).collect()
}
Err(e) => Err(e),
}
}
async fn upload_keys(&mut self, keys: &HashMap<String, Key>, require_ownership: bool) -> NixResult<()> { async fn upload_keys(&mut self, keys: &HashMap<String, Key>, require_ownership: bool) -> NixResult<()> {
for (name, key) in keys { for (name, key) in keys {
self.upload_key(&name, &key, require_ownership).await?; self.upload_key(&name, &key, require_ownership).await?;
@ -68,7 +62,12 @@ impl Host for Local {
Ok(()) Ok(())
} }
async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> { async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
if !goal.is_real_goal() {
return Err(NixError::Unsupported);
}
if goal.should_switch_profile() { if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap(); let path = profile.as_path().to_str().unwrap();
Command::new("nix-env") Command::new("nix-env")
@ -85,32 +84,23 @@ impl Host for Local {
let mut execution = CommandExecution::new(command); let mut execution = CommandExecution::new(command);
execution.set_progress_bar(self.progress_bar.clone()); execution.set_job(self.job.clone());
let result = execution.run().await; let result = execution.run().await;
// FIXME: Bad - Order of lines is messed up
let (stdout, stderr) = execution.get_logs();
self.logs += stdout.unwrap();
self.logs += stderr.unwrap();
result result
} }
async fn active_derivation_known(&mut self) -> NixResult<bool> { async fn active_derivation_known(&mut self) -> NixResult<bool> {
Ok(true) Ok(true)
} }
fn set_progress_bar(&mut self, bar: TaskProgress) {
self.progress_bar = bar;
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
}
} }
impl Local { impl Local {
/// "Uploads" a single key. /// "Uploads" a single key.
async fn upload_key(&mut self, name: &str, key: &Key, require_ownership: bool) -> NixResult<()> { async fn upload_key(&mut self, name: &str, key: &Key, require_ownership: bool) -> NixResult<()> {
self.progress_bar.log(&format!("Deploying key {}", name)); if let Some(job) = &self.job {
job.message(format!("Deploying key {}", name))?;
}
let dest_path = key.dest_dir().join(name); let dest_path = key.dest_dir().join(name);
let key_script = format!("'{}'", key_uploader::generate_script(key, &dest_path, require_ownership)); let key_script = format!("'{}'", key_uploader::generate_script(key, &dest_path, require_ownership));
@ -123,6 +113,6 @@ impl Local {
command.stdout(Stdio::piped()); command.stdout(Stdio::piped());
let uploader = command.spawn()?; let uploader = command.spawn()?;
key_uploader::feed_uploader(uploader, key, self.progress_bar.clone(), &mut self.logs).await key_uploader::feed_uploader(uploader, key, self.job.clone()).await
} }
} }

View file

@ -3,7 +3,7 @@ use std::collections::HashMap;
use async_trait::async_trait; use async_trait::async_trait;
use super::{StorePath, Profile, Goal, NixResult, NixError, Key}; use super::{StorePath, Profile, Goal, NixResult, NixError, Key};
use crate::progress::TaskProgress; use crate::job::JobHandle;
mod ssh; mod ssh;
pub use ssh::Ssh; pub use ssh::Ssh;
@ -109,21 +109,22 @@ pub trait Host: Send + Sync + std::fmt::Debug {
/// Check if the active profile is known to the host running Colmena /// Check if the active profile is known to the host running Colmena
async fn active_derivation_known(&mut self) -> NixResult<bool>; async fn active_derivation_known(&mut self) -> NixResult<bool>;
#[allow(unused_variables)]
/// Activates a system profile on the host, if it runs NixOS. /// Activates a system profile on the host, if it runs NixOS.
/// ///
/// The profile must already exist on the host. You should probably use deploy instead. /// The profile must already exist on the host. You should probably use deploy instead.
#[allow(unused_variables)]
async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> { async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
Err(NixError::Unsupported) Err(NixError::Unsupported)
} }
/// Runs an arbitrary command on the host.
#[allow(unused_variables)] #[allow(unused_variables)]
/// Provides a TaskProgress to use during operations. async fn run_command(&mut self, command: &[&str]) -> NixResult<()> {
fn set_progress_bar(&mut self, bar: TaskProgress) { Err(NixError::Unsupported)
} }
/// Dumps human-readable unstructured log messages related to the host. /// Provides a JobHandle to use during operations.
async fn dump_logs(&self) -> Option<&str> { #[allow(unused_variables)]
None fn set_job(&mut self, bar: Option<JobHandle>) {
} }
} }

View file

@ -9,7 +9,7 @@ use tokio::process::Command;
use super::{CopyDirection, CopyOptions, Host, key_uploader}; use super::{CopyDirection, CopyOptions, Host, key_uploader};
use crate::nix::{StorePath, Profile, Goal, NixResult, NixCommand, NixError, Key, SYSTEM_PROFILE}; use crate::nix::{StorePath, Profile, Goal, NixResult, NixCommand, NixError, Key, SYSTEM_PROFILE};
use crate::util::CommandExecution; use crate::util::CommandExecution;
use crate::progress::TaskProgress; use crate::job::JobHandle;
/// A remote machine connected over SSH. /// A remote machine connected over SSH.
#[derive(Debug)] #[derive(Debug)]
@ -30,8 +30,7 @@ pub struct Ssh {
privilege_escalation_command: Vec<String>, privilege_escalation_command: Vec<String>,
friendly_name: String, friendly_name: String,
progress_bar: TaskProgress, job: Option<JobHandle>,
logs: String,
} }
#[async_trait] #[async_trait]
@ -61,6 +60,10 @@ impl Host for Ssh {
Ok(()) Ok(())
} }
async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> { async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
if !goal.is_real_goal() {
return Err(NixError::Unsupported);
}
if goal.should_switch_profile() { if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap(); let path = profile.as_path().to_str().unwrap();
let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", path]); let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", path]);
@ -72,6 +75,10 @@ impl Host for Ssh {
let command = self.ssh(&v); let command = self.ssh(&v);
self.run_command(command).await self.run_command(command).await
} }
async fn run_command(&mut self, command: &[&str]) -> NixResult<()> {
let command = self.ssh(&command);
self.run_command(command).await
}
async fn active_derivation_known(&mut self) -> NixResult<bool> { async fn active_derivation_known(&mut self) -> NixResult<bool> {
let paths = self.ssh(&["realpath", SYSTEM_PROFILE]) let paths = self.ssh(&["realpath", SYSTEM_PROFILE])
.capture_output() .capture_output()
@ -93,11 +100,8 @@ impl Host for Ssh {
Err(e) => Err(e), Err(e) => Err(e),
} }
} }
fn set_progress_bar(&mut self, bar: TaskProgress) { fn set_job(&mut self, job: Option<JobHandle>) {
self.progress_bar = bar; self.job = job;
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
} }
} }
@ -111,8 +115,7 @@ impl Ssh {
ssh_config: None, ssh_config: None,
friendly_name, friendly_name,
privilege_escalation_command: Vec::new(), privilege_escalation_command: Vec::new(),
progress_bar: TaskProgress::default(), job: None,
logs: String::new(),
} }
} }
@ -157,16 +160,10 @@ impl Ssh {
async fn run_command(&mut self, command: Command) -> NixResult<()> { async fn run_command(&mut self, command: Command) -> NixResult<()> {
let mut execution = CommandExecution::new(command); let mut execution = CommandExecution::new(command);
execution.set_job(self.job.clone());
execution.set_progress_bar(self.progress_bar.clone());
let result = execution.run().await; let result = execution.run().await;
// FIXME: Bad - Order of lines is messed up
let (stdout, stderr) = execution.get_logs();
self.logs += stdout.unwrap();
self.logs += stderr.unwrap();
result result
} }
@ -228,7 +225,9 @@ impl Ssh {
/// Uploads a single key. /// Uploads a single key.
async fn upload_key(&mut self, name: &str, key: &Key, require_ownership: bool) -> NixResult<()> { async fn upload_key(&mut self, name: &str, key: &Key, require_ownership: bool) -> NixResult<()> {
self.progress_bar.log(&format!("Deploying key {}", name)); if let Some(job) = &self.job {
job.message(format!("Uploading key {}", name))?;
}
let dest_path = key.dest_dir().join(name); let dest_path = key.dest_dir().join(name);
let key_script = key_uploader::generate_script(key, &dest_path, require_ownership); let key_script = key_uploader::generate_script(key, &dest_path, require_ownership);
@ -240,6 +239,6 @@ impl Ssh {
command.stdout(Stdio::piped()); command.stdout(Stdio::piped());
let uploader = command.spawn()?; let uploader = command.spawn()?;
key_uploader::feed_uploader(uploader, key, self.progress_bar.clone(), &mut self.logs).await key_uploader::feed_uploader(uploader, key, self.job.clone()).await
} }
} }

View file

@ -33,7 +33,7 @@ pub mod profile;
pub use profile::{Profile, ProfileMap}; pub use profile::{Profile, ProfileMap};
pub mod deployment; pub mod deployment;
pub use deployment::{Goal, Target, Deployment}; pub use deployment::Goal;
pub mod info; pub mod info;
pub use info::NixCheck; pub use info::NixCheck;
@ -87,10 +87,19 @@ pub enum NixError {
#[snafu(display("Current Nix version does not support Flakes"))] #[snafu(display("Current Nix version does not support Flakes"))]
NoFlakesSupport, NoFlakesSupport,
#[snafu(display("Don't know how to connect to the node"))]
NoTargetHost,
#[snafu(display("Node name cannot be empty"))] #[snafu(display("Node name cannot be empty"))]
EmptyNodeName, EmptyNodeName,
#[snafu(display("Nix Error: {}", message))] #[snafu(display("Filter rule cannot be empty"))]
EmptyFilterRule,
#[snafu(display("Deployment already executed"))]
DeploymentAlreadyExecuted,
#[snafu(display("Unknown error: {}", message))]
Unknown { message: String }, Unknown { message: String },
} }
@ -121,6 +130,13 @@ impl From<ExitStatus> for NixError {
} }
} }
impl NixError {
pub fn unknown(error: Box<dyn std::error::Error>) -> Self {
let message = error.to_string();
Self::Unknown { message }
}
}
/// A node's attribute name. /// A node's attribute name.
#[derive(Serialize, Deserialize, Clone, Debug, Hash, Eq, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, Hash, Eq, PartialEq)]
#[serde(transparent)] #[serde(transparent)]
@ -154,6 +170,14 @@ pub struct NodeConfig {
keys: HashMap<String, Key>, keys: HashMap<String, Key>,
} }
#[async_trait]
trait NixCommand {
async fn passthrough(&mut self) -> NixResult<()>;
async fn capture_output(&mut self) -> NixResult<String>;
async fn capture_json<T>(&mut self) -> NixResult<T> where T: DeserializeOwned;
async fn capture_store_path(&mut self) -> NixResult<StorePath>;
}
impl NodeName { impl NodeName {
/// Returns the string. /// Returns the string.
pub fn as_str(&self) -> &str { pub fn as_str(&self) -> &str {
@ -218,14 +242,6 @@ impl NodeConfig {
} }
} }
#[async_trait]
trait NixCommand {
async fn passthrough(&mut self) -> NixResult<()>;
async fn capture_output(&mut self) -> NixResult<String>;
async fn capture_json<T>(&mut self) -> NixResult<T> where T: DeserializeOwned;
async fn capture_store_path(&mut self) -> NixResult<StorePath>;
}
#[async_trait] #[async_trait]
impl NixCommand for Command { impl NixCommand for Command {
/// Runs the command with stdout and stderr passed through to the user. /// Runs the command with stdout and stderr passed through to the user.

260
src/nix/node_filter.rs Normal file
View file

@ -0,0 +1,260 @@
//! Node filters.
use std::collections::HashSet;
use std::convert::AsRef;
use std::iter::{Iterator, FromIterator};
use glob::Pattern as GlobPattern;
use super::{NixError, NixResult, NodeName, NodeConfig};
/// A node filter containing a list of rules.
pub struct NodeFilter {
rules: Vec<Rule>,
}
/// A filter rule.
///
/// The filter rules are OR'd together.
#[derive(Debug, Eq, PartialEq)]
enum Rule {
/// Matches a node's attribute name.
MatchName(GlobPattern),
/// Matches a node's `deployment.tags`.
MatchTag(GlobPattern),
}
impl NodeFilter {
/// Creates a new filter using an expression passed using `--on`.
pub fn new<S: AsRef<str>>(filter: S) -> NixResult<Self> {
let filter = filter.as_ref();
let trimmed = filter.trim();
if trimmed.len() == 0 {
log::warn!("Filter \"{}\" is blank and will match nothing", filter);
return Ok(Self {
rules: Vec::new(),
});
}
let rules = trimmed.split(",").map(|pattern| {
let pattern = pattern.trim();
if pattern.len() == 0 {
return Err(NixError::EmptyFilterRule);
}
if let Some(tag_pattern) = pattern.strip_prefix("@") {
Ok(Rule::MatchTag(GlobPattern::new(tag_pattern).unwrap()))
} else {
Ok(Rule::MatchName(GlobPattern::new(pattern).unwrap()))
}
}).collect::<Vec<NixResult<Rule>>>();
let rules = Result::from_iter(rules)?;
Ok(Self {
rules,
})
}
/// Returns whether the filter has any rule matching NodeConfig information.
///
/// Evaluating `config.deployment` can potentially be very expensive,
/// especially when its values (e.g., tags) depend on other parts of
/// the configuration.
pub fn has_node_config_rules(&self) -> bool {
self.rules.iter().find(|rule| rule.matches_node_config()).is_some()
}
/// Runs the filter against a set of NodeConfigs and returns the matched ones.
pub fn filter_node_configs<'a, I>(&self, nodes: I) -> HashSet<NodeName>
where I: Iterator<Item = (&'a NodeName, &'a NodeConfig)>
{
if self.rules.len() == 0 {
return HashSet::new();
}
nodes.filter_map(|(name, node)| {
for rule in self.rules.iter() {
match rule {
Rule::MatchName(pat) => {
if pat.matches(name.as_str()) {
return Some(name);
}
}
Rule::MatchTag(pat) => {
for tag in node.tags() {
if pat.matches(tag) {
return Some(name);
}
}
}
}
}
None
}).cloned().collect()
}
/// Runs the filter against a set of node names and returns the matched ones.
pub fn filter_node_names(&self, nodes: &[NodeName]) -> NixResult<HashSet<NodeName>> {
nodes.iter().filter_map(|name| -> Option<NixResult<NodeName>> {
for rule in self.rules.iter() {
match rule {
Rule::MatchName(pat) => {
if pat.matches(name.as_str()) {
return Some(Ok(name.clone()));
}
}
_ => {
return Some(Err(NixError::Unknown {
message: format!("Not enough information to run rule {:?} - We only have node names", rule),
}));
}
}
}
None
}).collect()
}
}
impl Rule {
/// Returns whether the rule matches against the NodeConfig (i.e., `config.deployment`).
pub fn matches_node_config(&self) -> bool {
match self {
Self::MatchTag(_) => true,
Self::MatchName(_) => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::{HashMap, HashSet};
macro_rules! node {
($n:expr) => {
NodeName::new($n.to_string()).unwrap()
}
}
#[test]
fn test_empty_filter() {
let filter = NodeFilter::new("").unwrap();
assert_eq!(0, filter.rules.len());
let filter = NodeFilter::new("\t").unwrap();
assert_eq!(0, filter.rules.len());
let filter = NodeFilter::new(" ").unwrap();
assert_eq!(0, filter.rules.len());
}
#[test]
fn test_empty_filter_rule() {
assert!(NodeFilter::new(",").is_err());
assert!(NodeFilter::new("a,,b").is_err());
assert!(NodeFilter::new("a,b,c,").is_err());
}
#[test]
fn test_filter_rule_mixed() {
let filter = NodeFilter::new("@router,gamma-*").unwrap();
assert_eq!(
vec![
Rule::MatchTag(GlobPattern::new("router").unwrap()),
Rule::MatchName(GlobPattern::new("gamma-*").unwrap()),
],
filter.rules,
);
let filter = NodeFilter::new("a, \t@b , c-*").unwrap();
assert_eq!(
vec![
Rule::MatchName(GlobPattern::new("a").unwrap()),
Rule::MatchTag(GlobPattern::new("b").unwrap()),
Rule::MatchName(GlobPattern::new("c-*").unwrap()),
],
filter.rules,
);
}
#[test]
fn test_filter_node_names() {
let nodes = vec![ node!("lax-alpha"), node!("lax-beta"), node!("sfo-gamma") ];
assert_eq!(
&HashSet::from_iter([ node!("lax-alpha") ]),
&NodeFilter::new("lax-alpha").unwrap().filter_node_names(&nodes).unwrap(),
);
assert_eq!(
&HashSet::from_iter([ node!("lax-alpha"), node!("lax-beta") ]),
&NodeFilter::new("lax-*").unwrap().filter_node_names(&nodes).unwrap(),
);
}
#[test]
fn test_filter_node_configs() {
// TODO: Better way to mock
let template = NodeConfig {
tags: vec![],
target_host: None,
target_user: None,
target_port: None,
allow_local_deployment: false,
replace_unknown_profiles: false,
privilege_escalation_command: vec![],
keys: HashMap::new(),
};
let mut nodes = HashMap::new();
nodes.insert(node!("alpha"), NodeConfig {
tags: vec![ "web".to_string(), "infra-lax".to_string() ],
..template.clone()
});
nodes.insert(node!("beta"), NodeConfig {
tags: vec![ "router".to_string(), "infra-sfo".to_string() ],
..template.clone()
});
nodes.insert(node!("gamma-a"), NodeConfig {
tags: vec![ "controller".to_string() ],
..template.clone()
});
nodes.insert(node!("gamma-b"), NodeConfig {
tags: vec![ "ewaste".to_string() ],
..template.clone()
});
assert_eq!(4, nodes.len());
assert_eq!(
&HashSet::from_iter([ node!("alpha") ]),
&NodeFilter::new("@web").unwrap().filter_node_configs(nodes.iter()),
);
assert_eq!(
&HashSet::from_iter([ node!("alpha"), node!("beta") ]),
&NodeFilter::new("@infra-*").unwrap().filter_node_configs(nodes.iter()),
);
assert_eq!(
&HashSet::from_iter([ node!("beta"), node!("gamma-a") ]),
&NodeFilter::new("@router,@controller").unwrap().filter_node_configs(nodes.iter()),
);
assert_eq!(
&HashSet::from_iter([ node!("beta"), node!("gamma-a"), node!("gamma-b") ]),
&NodeFilter::new("@router,gamma-*").unwrap().filter_node_configs(nodes.iter()),
);
}
}

View file

@ -65,7 +65,7 @@ impl Profile {
} }
/// A map of names to their associated NixOS system profiles. /// A map of names to their associated NixOS system profiles.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct ProfileMap(HashMap<NodeName, Profile>); pub struct ProfileMap(HashMap<NodeName, Profile>);
impl Deref for ProfileMap { impl Deref for ProfileMap {

View file

@ -1,7 +1,6 @@
//! Integration-ish tests //! Integration-ish tests
use super::*; use super::*;
use crate::progress::TaskProgress;
use std::collections::HashSet; use std::collections::HashSet;
use std::hash::Hash; use std::hash::Hash;
@ -69,16 +68,14 @@ impl TempHive {
/// Asserts that the specified nodes can be fully evaluated. /// Asserts that the specified nodes can be fully evaluated.
pub fn eval_success(text: &str, nodes: Vec<NodeName>) { pub fn eval_success(text: &str, nodes: Vec<NodeName>) {
let hive = Self::new(text); let hive = Self::new(text);
let progress = TaskProgress::new("tests".to_string(), 5); let profiles = block_on(hive.eval_selected(&nodes, None));
let (profiles, _) = block_on(hive.eval_selected(&nodes, progress));
assert!(profiles.is_ok()); assert!(profiles.is_ok());
} }
/// Asserts that the specified nodes will fail to evaluate. /// Asserts that the specified nodes will fail to evaluate.
pub fn eval_failure(text: &str, nodes: Vec<NodeName>) { pub fn eval_failure(text: &str, nodes: Vec<NodeName>) {
let hive = Self::new(text); let hive = Self::new(text);
let progress = TaskProgress::new("tests".to_string(), 5); let profiles = block_on(hive.eval_selected(&nodes, None));
let (profiles, _) = block_on(hive.eval_selected(&nodes, progress));
assert!(profiles.is_err()); assert!(profiles.is_err());
} }
} }

View file

@ -1,227 +0,0 @@
//! Progress display utilities.
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use atty::Stream;
use console::Style;
use indicatif::{
MultiProgress,
ProgressStyle as IndicatifStyle,
ProgressBar as IndicatifBar,
};
pub fn get_spinner_styles(label_width: usize) -> (IndicatifStyle, IndicatifStyle) {
let template = format!("{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", label_width);
(
IndicatifStyle::default_spinner()
.tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅")
.template(&template),
IndicatifStyle::default_spinner()
.tick_chars("❌❌")
.template(&template),
)
}
pub enum OutputStyle {
/// Show condensed progress bars with fancy spinners.
///
/// Not usable in a non-interactive environment.
Condensed,
/// Output log lines directly to console.
Plain,
}
/// Parallel progress display.
///
/// Currently a simple wrapper over MultiProgress.
/// Sometimes we need to log directly to the console, in case
/// stdout is not connected to a TTY or the user requests
/// verbose logging via `--verbose`.
///
/// This is normally only usable as Arc<Progress>.
pub struct Progress {
multi: Option<Arc<MultiProgress>>, // eww
/// Width of the labels for alignment
label_width: usize,
}
impl Progress {
pub fn with_style(output_style: OutputStyle) -> Self {
let multi = match output_style {
OutputStyle::Condensed => Some(Arc::new(Self::init_multi())),
OutputStyle::Plain => None,
};
Self {
multi,
label_width: 10,
}
}
pub fn set_label_width(&mut self, width: usize) {
self.label_width = width;
}
/// Returns a handle for a task to display progress information.
pub fn create_task_progress(&self, label: String) -> TaskProgress {
let mut progress = TaskProgress::new(label.clone(), self.label_width);
if let Some(multi) = self.multi.as_ref() {
let bar = multi.add(IndicatifBar::new(100));
let (style, _) = get_spinner_styles(self.label_width);
bar.set_prefix(label);
bar.set_style(style);
bar.enable_steady_tick(100);
progress.set_bar(bar);
}
progress
}
/// Runs code that may initate multiple tasks.
pub async fn run<F: Future, U>(self: Arc<Self>, func: U) -> F::Output
where U: FnOnce(Arc<Progress>) -> F
{
// TODO: Remove this - Previous trick no longer required in indicatif 0.7
func(self.clone()).await
}
fn init_multi() -> MultiProgress {
let multi = MultiProgress::new();
multi
}
fn detect_output() -> OutputStyle {
if atty::is(Stream::Stdout) {
OutputStyle::Condensed
} else {
OutputStyle::Plain
}
}
}
impl Default for Progress {
fn default() -> Self {
let style = Self::detect_output();
Self::with_style(style)
}
}
/// Progress display for a single task.
#[derive(Debug, Clone)]
pub struct TaskProgress {
label: String,
label_width: usize,
bar: Option<IndicatifBar>,
quiet: bool,
}
impl TaskProgress {
pub fn new(label: String, label_width: usize) -> Self {
Self {
label,
label_width,
bar: None,
quiet: false,
}
}
fn set_bar(&mut self, bar: IndicatifBar) {
self.bar = Some(bar);
}
/// Displays a new line of log.
pub fn log(&mut self, message: &str) {
if self.quiet {
return;
}
if let Some(bar) = self.bar.as_ref() {
bar.set_message(message.to_owned());
} else {
let style = Style::new().bold();
self.plain_print(style, message);
}
}
/// Marks the task as successful and leave the spinner intact.
pub fn success(self, message: &str) {
if self.quiet {
return;
}
if let Some(bar) = self.bar.as_ref() {
bar.finish_with_message(message.to_owned());
} else {
let style = Style::new().bold().green();
self.plain_print(style, message);
}
}
/// Marks the task as successful and remove the spinner.
pub fn success_quiet(self) {
if self.quiet {
return;
}
if let Some(bar) = self.bar.as_ref() {
bar.finish_and_clear();
}
}
/// Marks the task as unsuccessful.
pub fn failure(self, message: &str) {
if self.quiet {
return;
}
if let Some(bar) = self.bar.as_ref() {
let (_, fail_style) = get_spinner_styles(self.label_width);
bar.set_style(fail_style);
bar.abandon_with_message(message.to_owned());
} else {
let style = Style::new().bold().red();
self.plain_print(style, message);
}
}
/// Returns the time spent on this task so far.
pub fn get_elapsed(&self) -> Option<Duration> {
self.bar.as_ref().map(|bar| bar.elapsed())
}
/// Sets the time spent on this task so far.
pub fn set_elapsed(&mut self, elapsed: Duration) {
if let Some(bar) = self.bar.take() {
self.bar.replace(bar.with_elapsed(elapsed));
}
}
pub fn failure_err<E: std::error::Error>(self, error: &E) {
self.failure(&error.to_string())
}
fn plain_print(&self, style: Style, line: &str) {
eprintln!("{:>width$} | {}", style.apply_to(&self.label), line, width = self.label_width);
}
}
impl Default for TaskProgress {
/// Creates a TaskProgress that does nothing.
fn default() -> Self {
Self {
label: String::new(),
label_width: 0,
bar: None,
quiet: true,
}
}
}

166
src/progress/mod.rs Normal file
View file

@ -0,0 +1,166 @@
//! Progress output.
//!
//! Displaying of progress is handled through a ProgressOutput. Each
//! ProgressOutput is minimally-stateful and receives already formatted
//! text from a writer (e.g., a JobMonitor).
pub mod plain;
pub mod spinner;
use async_trait::async_trait;
use tokio::sync::mpsc::{self,
UnboundedReceiver as TokioReceiver,
UnboundedSender as TokioSender,
};
use crate::job::JobId;
use crate::nix::NixResult;
pub use plain::PlainOutput;
pub use spinner::SpinnerOutput;
pub type Sender = TokioSender<Message>;
pub type Receiver = TokioReceiver<Message>;
const DEFAULT_LABEL_WIDTH: usize = 5;
pub enum SimpleProgressOutput {
Plain(PlainOutput),
Spinner(SpinnerOutput),
}
/// A progress display driver.
#[async_trait]
pub trait ProgressOutput : Sized {
/// Runs until a Message::Complete is received.
async fn run_until_completion(self) -> NixResult<Self>;
/// Returns a sender.
///
/// This method can only be called once.
fn get_sender(&mut self) -> Option<Sender>;
}
/// A message.
#[derive(Debug, Clone)]
pub enum Message {
/// Prints a line of text to the screen.
Print(Line),
/// Prints a line of text related to the overall progress.
///
/// For certain output types, this will be printed in a fixed,
/// prominent position with special styling.
PrintMeta(Line),
/// Hints about the maximum label width.
HintLabelWidth(usize),
/// Completes the progress output.
Complete,
}
/// A line of output.
#[derive(Debug, Clone)]
pub struct Line {
/// Identifier for elapsed time tracking.
job_id: JobId,
/// Style of the line.
style: LineStyle,
/// A label.
label: String,
/// The text.
text: String,
/// Whether this is an one-off output.
one_off: bool,
/// Whether this is line is noisy.
noisy: bool,
}
/// Style of a line.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LineStyle {
Normal,
Success,
SuccessNoop,
Failure,
}
impl SimpleProgressOutput {
pub fn new(verbose: bool) -> Self {
if verbose {
Self::Plain(PlainOutput::new())
} else {
Self::Spinner(SpinnerOutput::new())
}
}
pub fn get_sender(&mut self) -> Option<Sender> {
match self {
Self::Plain(ref mut o) => o.get_sender(),
Self::Spinner(ref mut o) => o.get_sender(),
}
}
pub async fn run_until_completion(self) -> NixResult<Self> {
match self {
Self::Plain(o) => {
o.run_until_completion().await
.map(|o| Self::Plain(o))
}
Self::Spinner(o) => {
o.run_until_completion().await
.map(|o| Self::Spinner(o))
}
}
}
}
impl Line {
pub fn new(job_id: JobId, text: String) -> Self {
Self {
job_id,
style: LineStyle::Normal,
label: String::new(),
text,
one_off: false,
noisy: false,
}
}
/// Builder-like interface to set the line as an one-off output.
///
/// For SpinnerOutput, this will create a new bar that immediately
/// finishes with the style (success or failure).
pub fn one_off(mut self) -> Self {
self.one_off = true;
self
}
/// Builder-like interface to set the line as noisy.
pub fn noisy(mut self) -> Self {
self.noisy = true;
self
}
/// Builder-like interface to set the label.
pub fn label(mut self, label: String) -> Self {
self.label = label;
self
}
/// Builder-like interface to set the line style.
pub fn style(mut self, style: LineStyle) -> Self {
self.style = style;
self
}
}
fn create_channel() -> (Sender, Receiver) {
mpsc::unbounded_channel()
}

113
src/progress/plain.rs Normal file
View file

@ -0,0 +1,113 @@
//! Plain output.
use async_trait::async_trait;
use console::Style as ConsoleStyle;
use crate::nix::NixResult;
use super::{
DEFAULT_LABEL_WIDTH,
ProgressOutput,
Sender,
Receiver,
Message,
Line,
LineStyle,
create_channel,
};
pub struct PlainOutput {
sender: Option<Sender>,
receiver: Receiver,
label_width: usize,
}
impl PlainOutput {
pub fn new() -> Self {
let (sender, receiver) = create_channel();
Self {
sender: Some(sender),
receiver,
label_width: DEFAULT_LABEL_WIDTH,
}
}
fn print(&self, line: Line) {
if line.noisy {
return;
}
let label_style = match line.style {
LineStyle::Normal => {
ConsoleStyle::new().bold()
}
LineStyle::Success => {
ConsoleStyle::new().bold().green()
}
LineStyle::SuccessNoop => {
ConsoleStyle::new().bold().green().dim()
}
LineStyle::Failure => {
ConsoleStyle::new().bold().red()
}
};
let text_style = match line.style {
LineStyle::Normal => {
ConsoleStyle::new()
}
LineStyle::Success => {
ConsoleStyle::new().green()
}
LineStyle::SuccessNoop => {
ConsoleStyle::new().dim()
}
LineStyle::Failure => {
ConsoleStyle::new().red()
}
};
eprintln!("{:>width$} | {}",
label_style.apply_to(line.label),
text_style.apply_to(line.text),
width = self.label_width,
);
}
}
#[async_trait]
impl ProgressOutput for PlainOutput {
async fn run_until_completion(mut self) -> NixResult<Self> {
loop {
let message = self.receiver.recv().await;
if message.is_none() {
log::info!("All senders dropped");
return Ok(self);
}
let message = message.unwrap();
match message {
Message::Complete => {
return Ok(self);
}
Message::Print(line) => {
self.print(line);
}
Message::PrintMeta(line) => {
self.print(line);
}
Message::HintLabelWidth(width) => {
if width > self.label_width {
self.label_width = width;
}
}
}
}
}
fn get_sender(&mut self) -> Option<Sender> {
self.sender.take()
}
}

242
src/progress/spinner.rs Normal file
View file

@ -0,0 +1,242 @@
//! Progress spinner output.
use std::collections::HashMap;
use std::time::Instant;
use async_trait::async_trait;
use indicatif::{MultiProgress, ProgressStyle, ProgressBar};
use crate::job::JobId;
use crate::nix::NixResult;
use super::{
DEFAULT_LABEL_WIDTH,
ProgressOutput,
Sender,
Receiver,
Message,
Line,
LineStyle,
create_channel,
};
/// Progress spinner output.
pub struct SpinnerOutput {
/// Job timekeeping.
job_state: HashMap<JobId, JobState>,
/// One-off progress bars.
one_off_bars: Vec<(ProgressBar, LineStyle)>,
/// Progress bar for the meta job.
meta_bar: ProgressBar,
/// Last style printed to the meta bar.
meta_style: LineStyle,
/// Maximum label width for alignment.
label_width: usize,
multi: MultiProgress,
sender: Option<Sender>,
receiver: Receiver,
}
#[derive(Clone)]
struct JobState {
/// When the job started.
since: Instant,
/// Progress bar to draw to.
bar: ProgressBar,
/// Last style printed to the bar.
///
/// This is used to regenerate the approproate style when the
/// max label width changes.
style: LineStyle,
}
impl SpinnerOutput {
pub fn new() -> Self {
let meta_bar = {
let bar = ProgressBar::new(100)
.with_style(get_spinner_style(DEFAULT_LABEL_WIDTH, LineStyle::Normal));
bar
};
let (sender, receiver) = create_channel();
Self {
multi: MultiProgress::new(),
job_state: HashMap::new(),
one_off_bars: Vec::new(),
meta_bar,
meta_style: LineStyle::Normal,
label_width: DEFAULT_LABEL_WIDTH,
sender: Some(sender),
receiver,
}
}
/// Returns the state of a job.
fn get_job_state(&mut self, job_id: JobId) -> JobState {
if let Some(state) = self.job_state.get(&job_id) {
state.clone()
} else {
let bar = self.create_bar(LineStyle::Normal);
let state = JobState::new(bar.clone());
self.job_state.insert(job_id, state.clone());
state
}
}
/// Creates a new bar.
fn create_bar(&self, style: LineStyle) -> ProgressBar {
let bar = ProgressBar::new(100)
.with_style(self.get_spinner_style(style));
let bar = self.multi.add(bar.clone());
bar.enable_steady_tick(100);
bar
}
fn print(&mut self, line: Line, meta: bool) {
if line.label.len() > self.label_width {
self.label_width = line.label.len();
self.reset_styles();
}
let bar = if meta {
if self.meta_style != line.style {
self.meta_style = line.style;
self.meta_bar.set_style(self.get_spinner_style(line.style));
}
self.meta_bar.clone()
} else {
let mut state = self.get_job_state(line.job_id);
if line.one_off {
let bar = self.create_bar(line.style);
state.configure_one_off(&bar);
self.one_off_bars.push((bar.clone(), line.style));
bar
} else {
let bar = state.bar.clone();
if state.style != line.style {
state.style = line.style;
bar.set_style(self.get_spinner_style(line.style));
self.job_state.insert(line.job_id, state);
}
bar
}
};
bar.set_prefix(line.label);
match line.style {
LineStyle::Success | LineStyle::Failure => {
bar.finish_with_message(line.text);
}
LineStyle::SuccessNoop => {
bar.finish_and_clear();
}
_ => {
bar.set_message(line.text);
}
}
}
/// Resets the styles of all known bars.
fn reset_styles(&self) {
for (bar, style) in &self.one_off_bars {
let style = self.get_spinner_style(*style);
bar.set_style(style);
}
for state in self.job_state.values() {
let style = self.get_spinner_style(state.style);
state.bar.set_style(style);
}
let style = self.get_spinner_style(self.meta_style);
self.meta_bar.set_style(style);
}
fn get_spinner_style(&self, style: LineStyle) -> ProgressStyle {
get_spinner_style(self.label_width, style)
}
}
#[async_trait]
impl ProgressOutput for SpinnerOutput {
async fn run_until_completion(mut self) -> NixResult<Self> {
let meta_bar = self.multi.add(self.meta_bar.clone());
meta_bar.enable_steady_tick(100);
loop {
let message = self.receiver.recv().await;
if message.is_none() {
return Ok(self);
}
let message = message.unwrap();
match message {
Message::Complete => {
return Ok(self);
}
Message::Print(line) => {
self.print(line, false);
}
Message::PrintMeta(line) => {
self.print(line, true);
}
Message::HintLabelWidth(width) => {
if width > self.label_width {
self.label_width = width;
self.reset_styles();
}
}
}
}
}
fn get_sender(&mut self) -> Option<Sender> {
self.sender.take()
}
}
impl JobState {
fn new(bar: ProgressBar) -> Self {
Self {
since: Instant::now(),
bar,
style: LineStyle::Normal,
}
}
fn configure_one_off(&self, bar: &ProgressBar) {
bar.clone().with_elapsed(Instant::now().duration_since(self.since));
}
}
fn get_spinner_style(label_width: usize, style: LineStyle) -> ProgressStyle {
let template = format!("{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", label_width);
match style {
LineStyle::Normal | LineStyle::Success | LineStyle::SuccessNoop => {
ProgressStyle::default_spinner()
.tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅")
.template(&template)
}
LineStyle::Failure => {
ProgressStyle::default_spinner()
.tick_chars("❌❌")
.template(&template)
}
}
}

View file

@ -1,25 +1,19 @@
use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Stdio; use std::process::Stdio;
use clap::{App, Arg, ArgMatches}; use clap::{App, Arg, ArgMatches};
use futures::future::join3; use futures::future::join3;
use glob::Pattern as GlobPattern;
use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader}; use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader};
use tokio::process::Command; use tokio::process::Command;
use super::nix::{Flake, NodeName, NodeConfig, Hive, HivePath, NixResult}; use super::nix::{Flake, Hive, HivePath, NixResult};
use super::progress::TaskProgress; use super::nix::deployment::TargetNodeMap;
use super::job::JobHandle;
enum NodeFilter { /// Non-interactive execution of an arbitrary command.
NameFilter(GlobPattern),
TagFilter(GlobPattern),
}
/// Non-interactive execution of an arbitrary Nix command.
pub struct CommandExecution { pub struct CommandExecution {
command: Command, command: Command,
progress_bar: TaskProgress, job: Option<JobHandle>,
stdout: Option<String>, stdout: Option<String>,
stderr: Option<String>, stderr: Option<String>,
} }
@ -28,23 +22,23 @@ impl CommandExecution {
pub fn new(command: Command) -> Self { pub fn new(command: Command) -> Self {
Self { Self {
command, command,
progress_bar: TaskProgress::default(), job: None,
stdout: None, stdout: None,
stderr: None, stderr: None,
} }
} }
/// Provides a TaskProgress to use to display output. /// Sets the job associated with this execution.
pub fn set_progress_bar(&mut self, bar: TaskProgress) { pub fn set_job(&mut self, job: Option<JobHandle>) {
self.progress_bar = bar; self.job = job;
} }
/// Retrieve logs from the last invocation. /// Returns logs from the last invocation.
pub fn get_logs(&self) -> (Option<&String>, Option<&String>) { pub fn get_logs(&self) -> (Option<&String>, Option<&String>) {
(self.stdout.as_ref(), self.stderr.as_ref()) (self.stdout.as_ref(), self.stderr.as_ref())
} }
/// Run the command. /// Runs the command.
pub async fn run(&mut self) -> NixResult<()> { pub async fn run(&mut self) -> NixResult<()> {
self.command.stdin(Stdio::null()); self.command.stdin(Stdio::null());
self.command.stdout(Stdio::piped()); self.command.stdout(Stdio::piped());
@ -59,8 +53,8 @@ impl CommandExecution {
let stderr = BufReader::new(child.stderr.take().unwrap()); let stderr = BufReader::new(child.stderr.take().unwrap());
let futures = join3( let futures = join3(
capture_stream(stdout, self.progress_bar.clone()), capture_stream(stdout, self.job.clone(), false),
capture_stream(stderr, self.progress_bar.clone()), capture_stream(stderr, self.job.clone(), true),
child.wait(), child.wait(),
); );
@ -145,44 +139,6 @@ pub async fn hive_from_args(args: &ArgMatches<'_>) -> NixResult<Hive> {
Ok(hive) Ok(hive)
} }
pub fn filter_nodes(nodes: &HashMap<NodeName, NodeConfig>, filter: &str) -> Vec<NodeName> {
let filters: Vec<NodeFilter> = filter.split(",").map(|pattern| {
use NodeFilter::*;
if let Some(tag_pattern) = pattern.strip_prefix("@") {
TagFilter(GlobPattern::new(tag_pattern).unwrap())
} else {
NameFilter(GlobPattern::new(pattern).unwrap())
}
}).collect();
if filters.len() > 0 {
nodes.iter().filter_map(|(name, node)| {
for filter in filters.iter() {
use NodeFilter::*;
match filter {
TagFilter(pat) => {
// Welp
for tag in node.tags() {
if pat.matches(tag) {
return Some(name);
}
}
}
NameFilter(pat) => {
if pat.matches(name) {
return Some(name)
}
}
}
}
None
}).cloned().collect()
} else {
nodes.keys().cloned().collect()
}
}
pub fn register_selector_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> { pub fn register_selector_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> {
command command
.arg(Arg::with_name("on") .arg(Arg::with_name("on")
@ -208,7 +164,7 @@ fn canonicalize_cli_path(path: &str) -> PathBuf {
} }
} }
pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, mut progress_bar: TaskProgress) -> String { pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, job: Option<JobHandle>, stderr: bool) -> String {
let mut log = String::new(); let mut log = String::new();
loop { loop {
@ -220,7 +176,14 @@ pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, mut
} }
let trimmed = line.trim_end(); let trimmed = line.trim_end();
progress_bar.log(trimmed);
if let Some(job) = &job {
if stderr {
job.stderr(trimmed.to_string()).unwrap();
} else {
job.stdout(trimmed.to_string()).unwrap();
}
}
log += trimmed; log += trimmed;
log += "\n"; log += "\n";
@ -228,3 +191,7 @@ pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, mut
log log
} }
pub fn get_label_width(targets: &TargetNodeMap) -> Option<usize> {
targets.keys().map(|n| n.len()).max()
}