From 0cb3f8e968ac7d7c0321dbf7cc8c7e5e862080cc Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Sat, 20 Nov 2021 23:34:52 -0800 Subject: [PATCH] Redesign deployment process (again) We now ship Events from different parts of the deployment process via a channel to a job monitor. --- Cargo.lock | 27 + Cargo.toml | 2 + default.nix | 2 +- flake.nix | 2 +- src/command/apply.rs | 145 ++--- src/command/apply_local.rs | 41 +- src/command/exec.rs | 115 ++-- src/command/test_progress.rs | 60 +- src/job.rs | 945 +++++++++++++++++++++++++++++++ src/main.rs | 3 + src/nix/deployment.rs | 796 -------------------------- src/nix/deployment/goal.rs | 100 ++++ src/nix/deployment/limits.rs | 109 ++++ src/nix/deployment/mod.rs | 435 ++++++++++++++ src/nix/deployment/options.rs | 67 +++ src/nix/{ => hive}/eval.nix | 19 +- src/nix/{hive.rs => hive/mod.rs} | 149 +++-- src/nix/host/key_uploader.rs | 12 +- src/nix/host/local.rs | 52 +- src/nix/host/mod.rs | 15 +- src/nix/host/ssh.rs | 37 +- src/nix/mod.rs | 36 +- src/nix/node_filter.rs | 260 +++++++++ src/nix/profile.rs | 2 +- src/nix/tests/mod.rs | 7 +- src/progress.rs | 227 -------- src/progress/mod.rs | 166 ++++++ src/progress/plain.rs | 113 ++++ src/progress/spinner.rs | 242 ++++++++ src/util.rs | 85 +-- 30 files changed, 2861 insertions(+), 1410 deletions(-) create mode 100644 src/job.rs delete mode 100644 src/nix/deployment.rs create mode 100644 src/nix/deployment/goal.rs create mode 100644 src/nix/deployment/limits.rs create mode 100644 src/nix/deployment/mod.rs create mode 100644 src/nix/deployment/options.rs rename src/nix/{ => hive}/eval.nix (96%) rename src/nix/{hive.rs => hive/mod.rs} (67%) create mode 100644 src/nix/node_filter.rs delete mode 100644 src/progress.rs create mode 100644 src/progress/mod.rs create mode 100644 src/progress/plain.rs create mode 100644 src/progress/spinner.rs diff --git a/Cargo.lock b/Cargo.lock index b5c8926..6feeb99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,6 +133,7 @@ dependencies = [ "glob", "hostname", "indicatif", + "itertools", "lazy_static", "libc", "log", @@ -147,6 +148,7 @@ dependencies = [ "tokio", "tokio-test", "users", + "uuid", "validator", ] @@ -206,6 +208,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "either" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" + [[package]] name = "encode_unicode" version = "0.3.6" @@ -400,6 +408,15 @@ dependencies = [ "regex", ] +[[package]] +name = "itertools" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.8" @@ -952,6 +969,16 @@ dependencies = [ "log", ] +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", + "serde", +] + [[package]] name = "validator" version = "0.12.0" diff --git a/Cargo.toml b/Cargo.toml index 9de03ee..69cbe7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ env_logger = "0.8.2" futures = "0.3.8" glob = "0.3.0" hostname = "0.3.1" +itertools = "0.10.1" lazy_static = "1.4.0" libc = "0.2.81" log = "0.4.11" @@ -30,6 +31,7 @@ snafu = 
"0.6.10" tempfile = "3.1.0" tokio-test = "0.4.0" users = "0.11.0" +uuid = { version = "0.8.2", features = ["serde", "v4"] } validator = { version = "0.12", features = ["derive"] } # For https://github.com/mitsuhiko/indicatif/pull/325 diff --git a/default.nix b/default.nix index 61173ad..f28625c 100644 --- a/default.nix +++ b/default.nix @@ -20,7 +20,7 @@ in rustPlatform.buildRustPackage rec { src = lib.cleanSource ./.; }; - cargoSha256 = "sha256-DJ+8XeGyg2EQdnHjmzN37fIuYa7HH+unM27RFHXHaso="; + cargoSha256 = "sha256-HGqecerb5LgnPhetqBYEmDKpJBkgzLS+iviVkDgVyGI="; postInstall = lib.optionalString (stdenv.hostPlatform == stdenv.buildPlatform) '' mkdir completions diff --git a/flake.nix b/flake.nix index ce7dea3..76be8b7 100644 --- a/flake.nix +++ b/flake.nix @@ -24,7 +24,7 @@ # Full user manual manual = let colmena = self.packages.${system}.colmena; - evalNix = import ./src/nix/eval.nix { + evalNix = import ./src/nix/hive/eval.nix { hermetic = true; }; deploymentOptionsMd = (pkgs.nixosOptionsDoc { diff --git a/src/command/apply.rs b/src/command/apply.rs index f77c871..c7284cc 100644 --- a/src/command/apply.rs +++ b/src/command/apply.rs @@ -1,20 +1,17 @@ -use std::collections::HashMap; use std::env; use std::path::PathBuf; -use std::sync::Arc; use clap::{Arg, App, SubCommand, ArgMatches}; use crate::nix::deployment::{ Deployment, Goal, - Target, - DeploymentOptions, + Options, EvaluationNodeLimit, ParallelismLimit, }; -use crate::nix::NixError; -use crate::nix::host::local as localhost; +use crate::progress::SimpleProgressOutput; +use crate::nix::{NixError, NodeFilter}; use crate::util; pub fn register_deploy_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> { @@ -118,105 +115,60 @@ pub fn subcommand() -> App<'static, 'static> { pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> { let hive = util::hive_from_args(local_args).await?; - log::info!("Enumerating nodes..."); - let all_nodes = hive.deployment_info().await?; - - let nix_options = hive.nix_options().await?; - - let selected_nodes = match local_args.value_of("on") { - Some(filter) => { - util::filter_nodes(&all_nodes, filter) - } - None => all_nodes.keys().cloned().collect(), - }; - - if selected_nodes.len() == 0 { - log::warn!("No hosts matched. Exiting..."); - quit::with_code(2); - } - let ssh_config = env::var("SSH_CONFIG_FILE") .ok().map(PathBuf::from); - // FIXME: This is ugly :/ Make an enum wrapper for this fake "keys" goal - let goal_arg = local_args.value_of("goal").unwrap(); - let goal = if goal_arg == "keys" { - Goal::Build + let filter = if let Some(f) = local_args.value_of("on") { + Some(NodeFilter::new(f)?) 
} else { - Goal::from_str(goal_arg).unwrap() + None }; - let build_only = goal == Goal::Build && goal_arg != "keys"; + let goal_arg = local_args.value_of("goal").unwrap(); + let goal = Goal::from_str(goal_arg).unwrap(); - let mut targets = HashMap::new(); - for node in &selected_nodes { - let config = all_nodes.get(node).unwrap(); - let host = config.to_ssh_host(); - match host { - Some(mut host) => { - if let Some(ssh_config) = ssh_config.as_ref() { - host.set_ssh_config(ssh_config.clone()); - } + let targets = hive.select_nodes(filter, ssh_config, goal.requires_target_host()).await?; + let n_targets = targets.len(); - targets.insert( - node.clone(), - Target::new(host.upcast(), config.clone()), - ); - } - None => { - if build_only { - targets.insert( - node.clone(), - Target::new(localhost(nix_options.clone()), config.clone()), - ); - } - } + let mut output = SimpleProgressOutput::new(local_args.is_present("verbose")); + let progress = output.get_sender(); + + let mut deployment = Deployment::new(hive, targets, goal, progress); + + // FIXME: Configure limits + let options = { + let mut options = Options::default(); + options.set_substituters_push(!local_args.is_present("no-substitutes")); + options.set_gzip(!local_args.is_present("no-gzip")); + options.set_upload_keys(!local_args.is_present("no-keys")); + options.set_force_replace_unknown_profiles(local_args.is_present("force-replace-unknown-profiles")); + + if local_args.is_present("keep-result") { + options.set_create_gc_roots(true); } - } - if targets.len() == all_nodes.len() { - log::info!("Selected all {} nodes.", targets.len()); - } else if targets.len() == selected_nodes.len() { - log::info!("Selected {} out of {} hosts.", targets.len(), all_nodes.len()); - } else { - log::info!("Selected {} out of {} hosts ({} skipped)", targets.len(), all_nodes.len(), selected_nodes.len() - targets.len()); - } - - if targets.len() == 0 { - log::warn!("No selected nodes are accessible over SSH. 
Exiting..."); - quit::with_code(2); - } - - let mut deployment = Deployment::new(hive, targets, goal); - - let mut options = DeploymentOptions::default(); - options.set_substituters_push(!local_args.is_present("no-substitutes")); - options.set_gzip(!local_args.is_present("no-gzip")); - options.set_progress_bar(!local_args.is_present("verbose")); - options.set_upload_keys(!local_args.is_present("no-keys")); - options.set_force_replace_unknown_profiles(local_args.is_present("force-replace-unknown-profiles")); - - if local_args.is_present("keep-result") { - options.set_create_gc_roots(true); - } + options + }; deployment.set_options(options); - if local_args.is_present("no-keys") && goal_arg == "keys" { + if local_args.is_present("no-keys") && goal == Goal::UploadKeys { log::error!("--no-keys cannot be used when the goal is to upload keys"); quit::with_code(1); } - let mut parallelism_limit = ParallelismLimit::default(); - parallelism_limit.set_apply_limit({ - let limit = local_args.value_of("parallel").unwrap().parse::().unwrap(); - if limit == 0 { - selected_nodes.len() // HACK - } else { - local_args.value_of("parallel").unwrap().parse::().unwrap() - } - }); - deployment.set_parallelism_limit(parallelism_limit); + let parallelism_limit = { + let mut limit = ParallelismLimit::default(); + limit.set_apply_limit({ + let limit = local_args.value_of("parallel").unwrap().parse::().unwrap(); + if limit == 0 { + n_targets + } else { + limit + } + }); + limit + }; let evaluation_node_limit = match local_args.value_of("eval-node-limit").unwrap() { "auto" => EvaluationNodeLimit::Heuristic, @@ -229,19 +181,16 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> } } }; + + deployment.set_parallelism_limit(parallelism_limit); deployment.set_evaluation_node_limit(evaluation_node_limit); - let deployment = Arc::new(deployment); + let (deployment, output) = tokio::join!( + deployment.execute(), + output.run_until_completion(), + ); - let success = if goal_arg == "keys" { - deployment.upload_keys().await - } else { - deployment.execute().await - }; - - if !success { - quit::with_code(10); - } + deployment?; output?; Ok(()) } diff --git a/src/command/apply_local.rs b/src/command/apply_local.rs index 8b62e14..836429b 100644 --- a/src/command/apply_local.rs +++ b/src/command/apply_local.rs @@ -1,6 +1,5 @@ use std::env; use std::collections::HashMap; -use std::sync::Arc; use clap::{Arg, App, SubCommand, ArgMatches}; use tokio::fs; @@ -9,10 +8,11 @@ use tokio::process::Command; use crate::nix::deployment::{ Deployment, Goal, - Target, - DeploymentOptions, + TargetNode, + Options, }; use crate::nix::{NixError, NodeName, host}; +use crate::progress::SimpleProgressOutput; use crate::util; pub fn subcommand() -> App<'static, 'static> { @@ -23,7 +23,7 @@ pub fn subcommand() -> App<'static, 'static> { .long_help("Same as the targets for switch-to-configuration.\n\"push\" is noop in apply-local.") .default_value("switch") .index(1) - .possible_values(&["push", "switch", "boot", "test", "dry-activate"])) + .possible_values(&["push", "switch", "boot", "test", "dry-activate", "keys"])) .arg(Arg::with_name("sudo") .long("sudo") .help("Attempt to escalate privileges if not run as root")) @@ -102,20 +102,21 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> }; let goal = Goal::from_str(local_args.value_of("goal").unwrap()).unwrap(); - let target: Target = { - if let Some(info) = hive.deployment_info_for(&hostname).await.unwrap() { + let target = { + if let 
Some(info) = hive.deployment_info_single(&hostname).await.unwrap() { let nix_options = hive.nix_options().await.unwrap(); if !info.allows_local_deployment() { log::error!("Local deployment is not enabled for host {}.", hostname.as_str()); log::error!("Hint: Set deployment.allowLocalDeployment to true."); quit::with_code(2); } - Target::new( - host::local(nix_options), + TargetNode::new( + hostname.clone(), + Some(host::local(nix_options)), info.clone(), ) } else { - log::error!("Host {} is not present in the Hive configuration.", hostname.as_str()); + log::error!("Host \"{}\" is not present in the Hive configuration.", hostname.as_str()); quit::with_code(2); } }; @@ -123,18 +124,20 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> let mut targets = HashMap::new(); targets.insert(hostname.clone(), target); - let mut deployment = Deployment::new(hive, targets, goal); - let mut options = DeploymentOptions::default(); - options.set_upload_keys(!local_args.is_present("no-upload-keys")); - options.set_progress_bar(!local_args.is_present("verbose")); + let mut output = SimpleProgressOutput::new(local_args.is_present("verbose")); + let progress = output.get_sender(); + + let mut deployment = Deployment::new(hive, targets, goal, progress); + + let options = { + let mut options = Options::default(); + options.set_upload_keys(!local_args.is_present("no-upload-keys")); + options + }; + deployment.set_options(options); - let deployment = Arc::new(deployment); - let success = deployment.execute().await; - - if !success { - quit::with_code(10); - } + deployment.execute().await?; Ok(()) } diff --git a/src/command/exec.rs b/src/command/exec.rs index 50d16bb..be9f223 100644 --- a/src/command/exec.rs +++ b/src/command/exec.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::env; use std::path::PathBuf; use std::sync::Arc; @@ -7,9 +6,10 @@ use clap::{Arg, App, AppSettings, SubCommand, ArgMatches}; use futures::future::join_all; use tokio::sync::Semaphore; -use crate::nix::NixError; -use crate::progress::{Progress, OutputStyle}; -use crate::util::{self, CommandExecution}; +use crate::nix::{NixError, NodeFilter}; +use crate::job::{JobMonitor, JobState, JobType}; +use crate::progress::SimpleProgressOutput; +use crate::util; pub fn subcommand() -> App<'static, 'static> { let command = SubCommand::with_name("exec") @@ -56,60 +56,17 @@ It's recommended to use -- to separate Colmena options from the command to run. pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> { let hive = util::hive_from_args(local_args).await?; - - log::info!("Enumerating nodes..."); - let all_nodes = hive.deployment_info().await?; - - let selected_nodes = match local_args.value_of("on") { - Some(filter) => { - util::filter_nodes(&all_nodes, filter) - } - None => all_nodes.keys().cloned().collect(), - }; - - if selected_nodes.len() == 0 { - log::warn!("No hosts matched. 
Exiting..."); - quit::with_code(2); - } - let ssh_config = env::var("SSH_CONFIG_FILE") .ok().map(PathBuf::from); - let mut hosts = HashMap::new(); - for node in &selected_nodes { - let config = all_nodes.get(node).unwrap(); - let host = config.to_ssh_host(); - match host { - Some(mut host) => { - if let Some(ssh_config) = ssh_config.as_ref() { - host.set_ssh_config(ssh_config.clone()); - } - - hosts.insert(node.clone(), host); - } - None => {}, - } - } - - if hosts.len() == all_nodes.len() { - log::info!("Selected all {} nodes.", hosts.len()); - } else if hosts.len() == selected_nodes.len() { - log::info!("Selected {} out of {} hosts.", hosts.len(), all_nodes.len()); + let filter = if let Some(f) = local_args.value_of("on") { + Some(NodeFilter::new(f)?) } else { - log::info!("Selected {} out of {} hosts ({} skipped)", hosts.len(), all_nodes.len(), selected_nodes.len() - hosts.len()); - } - - if hosts.len() == 0 { - log::warn!("No selected nodes are accessible over SSH. Exiting..."); - quit::with_code(2); - } - - let mut progress = if local_args.is_present("verbose") { - Progress::with_style(OutputStyle::Plain) - } else { - Progress::default() + None }; + let mut targets = hive.select_nodes(filter, ssh_config, true).await?; + let parallel_sp = Arc::new({ let limit = local_args.value_of("parallel").unwrap() .parse::().unwrap(); @@ -121,52 +78,52 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> } }); - let label_width = hosts.keys().map(|n| n.len()).max().unwrap(); - progress.set_label_width(label_width); - - let progress = Arc::new(progress); let command: Arc> = Arc::new(local_args.values_of("command").unwrap().map(|s| s.to_string()).collect()); - progress.run(|progress| async move { + let mut output = SimpleProgressOutput::new(local_args.is_present("verbose")); + + let (monitor, meta) = JobMonitor::new(output.get_sender()); + let meta = meta.run(|meta| async move { let mut futures = Vec::new(); - for (name, host) in hosts.drain() { + for (name, target) in targets.drain() { let parallel_sp = parallel_sp.clone(); let command = command.clone(); - let progress = progress.clone(); - futures.push(async move { + let mut host = target.into_host().unwrap(); + + let job = meta.create_job(JobType::Execute, vec![ name.clone() ])?; + + futures.push(job.run_waiting(|job| async move { let permit = match parallel_sp.as_ref() { Some(sp) => Some(sp.acquire().await.unwrap()), None => None, }; - let progress = progress.create_task_progress(name.to_string()); + job.state(JobState::Running)?; let command_v: Vec<&str> = command.iter().map(|s| s.as_str()).collect(); - let command = host.ssh(&command_v); - let mut execution = CommandExecution::new(command); - execution.set_progress_bar(progress.clone()); - - match execution.run().await { - Ok(()) => { - progress.success("Exited"); - } - Err(e) => { - if let NixError::NixFailure { exit_code } = e { - progress.failure(&format!("Exited with code {}", exit_code)); - } else { - progress.failure(&format!("Error during execution: {}", e)); - } - } - } + host.set_job(Some(job)); + host.run_command(&command_v).await?; drop(permit); - }); + + Ok(()) + })); } join_all(futures).await; - }).await; + + Ok(()) + }); + + let (meta, monitor, output) = tokio::join!( + meta, + monitor.run_until_completion(), + output.run_until_completion(), + ); + + meta?; monitor?; output?; Ok(()) } diff --git a/src/command/test_progress.rs b/src/command/test_progress.rs index 43c9f14..e4c765b 100644 --- a/src/command/test_progress.rs +++ 
b/src/command/test_progress.rs @@ -3,8 +3,15 @@ use std::time::Duration; use clap::{App, AppSettings, SubCommand, ArgMatches}; use tokio::time; -use crate::nix::NixError; -use crate::progress::{Progress, OutputStyle}; +use crate::job::{JobMonitor, JobType}; +use crate::nix::{NixError, NixResult, NodeName}; +use crate::progress::{ProgressOutput, spinner::SpinnerOutput}; + +macro_rules! node { + ($n:expr) => { + NodeName::new($n.to_string()).unwrap() + } +} pub fn subcommand() -> App<'static, 'static> { SubCommand::with_name("test-progress") @@ -13,15 +20,50 @@ pub fn subcommand() -> App<'static, 'static> { } pub async fn run(_global_args: &ArgMatches<'_>, _local_args: &ArgMatches<'_>) -> Result<(), NixError> { - let progress = Progress::with_style(OutputStyle::Condensed); - let mut task = progress.create_task_progress(String::from("test")); + let mut output = SpinnerOutput::new(); + let (monitor, meta) = JobMonitor::new(output.get_sender()); - for i in 0..10 { - time::sleep(Duration::from_secs(2)).await; - task.log(&format!("Very slow counter: {}", i)); - } + let meta_future = meta.run(|meta| async move { + meta.message("Message from meta job".to_string())?; - task.success("Completed"); + let nodes = vec![ + node!("alpha"), + node!("beta"), + node!("gamma"), + node!("delta"), + node!("epsilon"), + ]; + let eval = meta.create_job(JobType::Evaluate, nodes)?; + let eval = eval.run(|job| async move { + for i in 0..10 { + job.message(format!("eval: {}", i))?; + time::sleep(Duration::from_secs(1)).await; + } + + Ok(()) + }); + + let build = meta.create_job(JobType::Build, vec![ node!("alpha"), node!("beta") ])?; + let build = build.run(|_| async move { + time::sleep(Duration::from_secs(5)).await; + + Ok(()) + }); + + let (_, _) = tokio::join!(eval, build); + + Err(NixError::Unsupported) as NixResult<()> + }); + + let (monitor, output, ret) = tokio::join!( + monitor.run_until_completion(), + output.run_until_completion(), + meta_future, + ); + + monitor?; output?; + + println!("Return Value -> {:?}", ret); Ok(()) } diff --git a/src/job.rs b/src/job.rs new file mode 100644 index 0000000..5fe49f6 --- /dev/null +++ b/src/job.rs @@ -0,0 +1,945 @@ +//! Job control. +//! +//! We use a channel to send Events from different futures to a job monitor, +//! which coordinates the display of progress onto the terminal. + +use std::collections::HashMap; +use std::fmt::{self, Display}; +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::time; +use uuid::Uuid; + +use crate::nix::{NixResult, NixError, NodeName, ProfileMap}; +use crate::progress::{Sender as ProgressSender, Message as ProgressMessage, Line, LineStyle}; + +pub type Sender = UnboundedSender; +pub type Receiver = UnboundedReceiver; + +/// A handle to a job. +pub type JobHandle = Arc; + +/// Maximum log lines to print for failures. +const LOG_CONTEXT_LINES: usize = 20; + +/// An opaque job identifier. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub struct JobId(Uuid); + +/// Coordinator of all job states. +/// +/// It receives event messages from jobs and updates the progress +/// spinners. +pub struct JobMonitor { + /// The receiving end of the mpsc channel. + receiver: Receiver, + + /// Events received so far. + events: Vec, + + /// Known jobs and their metadata. + jobs: HashMap, + + /// ID of the meta job. + meta_job_id: JobId, + + /// Sender to the spinner thread. + progress: Option, + + /// Estimated max label size. 
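+    /// If set, it is sent to the progress output as a width hint on startup.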
+ label_width: Option, +} + +/// The state of a job. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum JobState { + /// Waiting to begin. + /// + /// Progress bar is not shown in this state. + Waiting, + + /// Running. + Running, + + /// Succeeded. + Succeeded, + + /// Failed. + Failed, +} + +/// The type of a job. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum JobType { + /// Meta. + Meta, + + /// Nix evaluation. + Evaluate, + + /// Nix build. + Build, + + /// Key uploading. + UploadKeys, + + /// Pushing closure to a host. + Push, + + /// Activating a system profile on a host. + Activate, + + /// Executing an arbitrary command. + Execute, + + /// Creating GC roots. + CreateGcRoots, +} + +/// A handle to a job. +/// +/// Usually used as `Arc`/`JobHandle` which is clonable. +#[derive(Debug)] +pub struct JobHandleInner { + /// Unique ID of the job. + job_id: JobId, + + /// Handle to the mpsc channel. + sender: Sender, +} + +/// A handle to the meta job. +/// +/// This handle cannot be cloned, and the wrapper is implemented differently +/// to signal to the monitor when it needs to shut down. +#[derive(Debug)] +pub struct MetaJobHandle { + /// Unique ID of the job. + job_id: JobId, + + /// Handle to the mpsc channel. + sender: Sender, +} + +/// Internal metadata of a job. +#[derive(Debug)] +struct JobMetadata { + job_id: JobId, + + /// Type of the job. + job_type: JobType, + + /// Custom human-readable name of the job. + friendly_name: Option, + + /// List of associated nodes. + /// + /// Some jobs may be related to multiple nodes (e.g., building + /// several system profiles at once). + nodes: Vec, + + /// Current state of this job. + state: JobState, + + /// Current custom message of this job. + /// + /// For jobs in the Failed state, this is the error. + /// For jobs in the Succeeded state, this might contain a custom + /// message. + custom_message: Option, + + /// Last human-readable message from the job. + /// + /// This is so we can quickly repaint without needing to filter + /// through the event logs. + last_message: Option, +} + +/// Message to create a new job. +#[derive(Debug)] +pub struct JobCreation { + /// Type of the job. + job_type: JobType, + + /// Custom human-readable name of the job. + friendly_name: Option, + + /// List of associated nodes. + nodes: Vec, +} + +/// An event message sent via the mpsc channel. +#[derive(Debug)] +pub struct Event { + /// Unique ID of the job. + job_id: JobId, + + /// Event payload. + payload: EventPayload, +} + +/// The payload of an event. +#[derive(Debug)] +pub enum EventPayload { + /// The job is created. + Creation(JobCreation), + + /// The job succeeded with a custom message. + SuccessWithMessage(String), + + /// The job failed. + /// + /// We can't pass the NixError because the wrapper needs to + /// be able to return it as-is. + Failure(String), + + /// The job was no-op. + /// + /// This probably means that some precondition wasn't met and + /// this job didn't make any changes. + /// + /// This puts the job in the Succeeded state but causes the + /// progress spinner to disappear. + Noop(String), + + /// The job wants to transition to a new state. + NewState(JobState), + + /// The job built a set of system profiles. + ProfilesBuilt(ProfileMap), + + /// The child process printed a line to stdout. + ChildStdout(String), + + /// The child process printed a line to stderr. + ChildStderr(String), + + /// A normal message from the job itself. + Message(String), + + /// The monitor should shut down. 
+ /// + /// This is sent at the end of the meta job regardless of the outcome. + ShutdownMonitor, +} + +struct JobStats { + waiting: usize, + running: usize, + succeeded: usize, + failed: usize, +} + +impl JobId { + pub fn new() -> Self { + Self(Uuid::new_v4()) + } +} + +impl JobMonitor { + /// Creates a new job monitor and a meta job. + pub fn new(progress: Option) -> (Self, MetaJobHandle) { + let (sender, receiver) = mpsc::unbounded_channel(); + let meta_job_id = JobId::new(); + + let mut monitor = Self { + receiver, + events: Vec::new(), + jobs: HashMap::new(), + meta_job_id, + progress, + label_width: None, + }; + + let metadata = JobMetadata { + job_id: meta_job_id, + job_type: JobType::Meta, + friendly_name: None, + nodes: Vec::new(), + state: JobState::Running, + last_message: None, + custom_message: None, + }; + + monitor.jobs.insert(meta_job_id, metadata); + + let job = MetaJobHandle { + job_id: meta_job_id, + sender, + }; + + (monitor, job) + } + + /// Sets the max label width. + pub fn set_label_width(&mut self, label_width: usize) { + self.label_width = Some(label_width); + } + + /// Starts the monitor. + pub async fn run_until_completion(mut self) -> NixResult { + if let Some(width) = self.label_width { + if let Some(sender) = &self.progress { + sender.send(ProgressMessage::HintLabelWidth(width)).unwrap(); + } + } + + loop { + let message = self.receiver.recv().await; + + if message.is_none() { + // All sending halves have been closed - We are done! + return self.finish().await; + } + + let message = message.unwrap(); + + match &message.payload { + EventPayload::Creation(creation) => { + let metadata = JobMetadata { + job_id: message.job_id, + job_type: creation.job_type, + friendly_name: creation.friendly_name.clone(), + nodes: creation.nodes.clone(), + state: JobState::Waiting, + last_message: None, + custom_message: None, + }; + + let existing = self.jobs.insert(message.job_id, metadata); + assert!(existing.is_none()); + } + EventPayload::ShutdownMonitor => { + // The meta job has returned - We are done! 
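+                    // Only the meta job can send ShutdownMonitor;
+                    // JobHandleInner::send_payload panics on privileged payloads.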
+ assert_eq!(self.meta_job_id, message.job_id); + return self.finish().await; + } + EventPayload::NewState(new_state) => { + self.update_job_state(message.job_id, *new_state, None, false); + + if message.job_id != self.meta_job_id { + self.print_job_stats(); + } + } + EventPayload::SuccessWithMessage(custom_message) => { + let custom_message = Some(custom_message.clone()); + self.update_job_state(message.job_id, JobState::Succeeded, custom_message, false); + + if message.job_id != self.meta_job_id { + self.print_job_stats(); + } + } + EventPayload::Noop(custom_message) => { + let custom_message = Some(custom_message.clone()); + self.update_job_state(message.job_id, JobState::Succeeded, custom_message, true); + + if message.job_id != self.meta_job_id { + self.print_job_stats(); + } + } + EventPayload::Failure(error) => { + let error = Some(error.clone()); + self.update_job_state(message.job_id, JobState::Failed, error, false); + + if message.job_id != self.meta_job_id { + self.print_job_stats(); + } + } + EventPayload::ProfilesBuilt(profiles) => { + if let Some(sender) = &self.progress { + for (name, profile) in profiles.iter() { + let text = format!("Built {:?}", profile.as_path()); + let line = Line::new(message.job_id, text) + .label(name.as_str().to_string()) + .one_off() + .style(LineStyle::Success); + let pm = self.get_print_message(message.job_id, line); + sender.send(pm).unwrap(); + } + } + } + EventPayload::ChildStdout(m) | EventPayload::ChildStderr(m) | EventPayload::Message(m) => { + if let Some(sender) = &self.progress { + let metadata = &self.jobs[&message.job_id]; + let line = metadata.get_line(m.clone()); + let pm = self.get_print_message(message.job_id, line); + sender.send(pm).unwrap(); + } + } + } + + self.events.push(message); + } + } + + /// Updates the state of a job. + fn update_job_state(&mut self, + job_id: JobId, + new_state: JobState, + message: Option, + noop: bool, + ) { + let mut metadata = self.jobs.remove(&job_id).unwrap(); + let old_state = metadata.state; + + if old_state == new_state { + return; + } else if old_state.is_final() { + log::debug!("Tried to update the state of a finished job"); + return; + } + + metadata.state = new_state; + + if message.is_some() { + metadata.custom_message = message.clone(); + } + + let metadata = if new_state == JobState::Waiting { + // Waiting state doesn't generate user-visible output + metadata + } else { + if let Some(sender) = &self.progress { + let text = if new_state == JobState::Succeeded { + metadata.custom_message.clone() + .or_else(|| metadata.describe_state_transition()) + } else { + metadata.describe_state_transition() + }; + + if let Some(text) = text { + let line = if noop { + // Spinner should disappear + metadata.get_line(text).style(LineStyle::SuccessNoop) + } else { + metadata.get_line(text) + }; + + let message = self.get_print_message(job_id, line); + sender.send(message).unwrap(); + } + } + + metadata + }; + + self.jobs.insert(job_id, metadata); + } + + /// Updates the user-visible job statistics output. + fn print_job_stats(&self) { + if let Some(sender) = &self.progress { + let stats = self.get_job_stats(); + let text = format!("{}", stats); + let line = self.jobs[&self.meta_job_id].get_line(text) + .noisy(); + let message = ProgressMessage::PrintMeta(line); + sender.send(message).unwrap(); + } + } + + /// Returns jobs statistics. 
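+    /// The meta job itself is excluded from the counts.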
+ fn get_job_stats(&self) -> JobStats { + let mut waiting = 0; + let mut running = 0; + let mut succeeded = 0; + let mut failed = 0; + + for job in self.jobs.values() { + if job.job_id == self.meta_job_id { + continue; + } + + match job.state { + JobState::Waiting => { + waiting += 1; + } + JobState::Running => { + running += 1; + } + JobState::Succeeded => { + succeeded += 1; + } + JobState::Failed => { + failed += 1; + } + } + } + + JobStats { + waiting, + running, + succeeded, + failed, + } + } + + fn get_print_message(&self, job_id: JobId, line: Line) -> ProgressMessage { + if job_id == self.meta_job_id { + ProgressMessage::PrintMeta(line) + } else { + ProgressMessage::Print(line) + } + } + + /// Shows human-readable summary and performs cleanup. + async fn finish(mut self) -> NixResult { + if let Some(sender) = self.progress.take() { + sender.send(ProgressMessage::Complete).unwrap(); + } + + // HACK + time::sleep(Duration::from_secs(1)).await; + + for job in self.jobs.values() { + if job.state == JobState::Failed { + let logs: Vec<&Event> = self.events.iter().filter(|e| e.job_id == job.job_id).collect(); + let last_logs: Vec<&Event> = logs.into_iter().rev().take(LOG_CONTEXT_LINES).rev().collect(); + + log::error!("{} - Last {} lines of logs:", job.get_failure_summary(), last_logs.len()); + for event in last_logs { + log::error!("{}", event.payload); + } + } + } + + Ok(self) + } +} + +impl JobState { + /// Returns whether this state is final. + pub fn is_final(&self) -> bool { + match self { + Self::Failed | Self::Succeeded => true, + _ => false, + } + } +} + +impl JobHandleInner { + /// Creates a new job with a distinct ID. + /// + /// This sends out a Creation message with the metadata. + pub fn create_job(&self, job_type: JobType, nodes: Vec) -> NixResult { + let job_id = JobId::new(); + let creation = JobCreation { + friendly_name: None, + job_type, + nodes, + }; + + if job_type == JobType::Meta { + return Err(NixError::Unknown { message: "Cannot create a meta job!".to_string() }); + } + + let new_handle = Arc::new(Self { + job_id, + sender: self.sender.clone(), + }); + + new_handle.send_payload(EventPayload::Creation(creation))?; + + Ok(new_handle) + } + + /// Runs a closure, automatically updating the job monitor based on the result. + /// + /// This immediately transitions the state to Running. + pub async fn run(self: Arc, f: U) -> NixResult + where U: FnOnce(Arc) -> F, + F: Future>, + { + self.run_internal(f, true).await + } + + /// Runs a closure, automatically updating the job monitor based on the result. + /// + /// This does not immediately transition the state to Running. + pub async fn run_waiting(self: Arc, f: U) -> NixResult + where U: FnOnce(Arc) -> F, + F: Future>, + { + self.run_internal(f, false).await + } + + /// Sends a line of child stdout to the job monitor. + pub fn stdout(&self, output: String) -> NixResult<()> { + self.send_payload(EventPayload::ChildStdout(output)) + } + + /// Sends a line of child stderr to the job monitor. + pub fn stderr(&self, output: String) -> NixResult<()> { + self.send_payload(EventPayload::ChildStderr(output)) + } + + /// Sends a human-readable message to the job monitor. + pub fn message(&self, message: String) -> NixResult<()> { + self.send_payload(EventPayload::Message(message)) + } + + /// Transitions to a new job state. + pub fn state(&self, new_state: JobState) -> NixResult<()> { + self.send_payload(EventPayload::NewState(new_state)) + } + + /// Marks the job as successful, with a custom message. 
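+    /// The custom message replaces the default success description in the output.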
+ pub fn success_with_message(&self, message: String) -> NixResult<()> { + self.send_payload(EventPayload::SuccessWithMessage(message)) + } + + /// Marks the job as noop. + pub fn noop(&self, message: String) -> NixResult<()> { + self.send_payload(EventPayload::Noop(message)) + } + + /// Marks the job as failed. + pub fn failure(&self, error: &NixError) -> NixResult<()> { + self.send_payload(EventPayload::Failure(error.to_string())) + } + + /// Sends a set of built profiles. + pub fn profiles_built(&self, profiles: ProfileMap) -> NixResult<()> { + self.send_payload(EventPayload::ProfilesBuilt(profiles)) + } + + /// Runs a closure, automatically updating the job monitor based on the result. + async fn run_internal(self: Arc, f: U, report_running: bool) -> NixResult + where U: FnOnce(Arc) -> F, + F: Future>, + { + if report_running { + // Tell monitor we are starting + self.send_payload(EventPayload::NewState(JobState::Running))?; + } + + match f(self.clone()).await { + Ok(val) => { + // Success! + self.state(JobState::Succeeded)?; + + Ok(val) + } + Err(e) => { + self.failure(&e)?; + + Err(e) + } + } + } + + /// Sends an event to the job monitor. + fn send_payload(&self, payload: EventPayload) -> NixResult<()> { + if payload.privileged() { + panic!("Tried to send privileged payload with JobHandle"); + } + + let event = Event::new(self.job_id, payload); + + self.sender.send(event) + .map_err(|e| NixError::unknown(Box::new(e)))?; + + Ok(()) + } +} + +impl MetaJobHandle { + /// Runs a closure, automatically updating the job monitor based on the result. + pub async fn run(self, f: U) -> NixResult + where U: FnOnce(JobHandle) -> F, + F: Future>, + { + let normal_handle = Arc::new(JobHandleInner { + job_id: self.job_id, + sender: self.sender.clone(), + }); + + match f(normal_handle).await { + Ok(val) => { + self.send_payload(EventPayload::NewState(JobState::Succeeded))?; + self.send_payload(EventPayload::ShutdownMonitor)?; + + Ok(val) + } + Err(e) => { + self.send_payload(EventPayload::Failure(e.to_string()))?; + self.send_payload(EventPayload::ShutdownMonitor)?; + + Err(e) + } + } + } + + /// Sends an event to the job monitor. + fn send_payload(&self, payload: EventPayload) -> NixResult<()> { + let event = Event::new(self.job_id, payload); + + self.sender.send(event) + .map_err(|e| NixError::unknown(Box::new(e)))?; + + Ok(()) + } +} + +impl JobMetadata { + /// Returns a short human-readable label. + fn get_label(&self) -> &str { + if self.job_type == JobType::Meta { + "" + } else if self.nodes.len() != 1 { + "(...)" + } else { + self.nodes[0].as_str() + } + } + + /// Returns a Line struct with the given text. + fn get_line(&self, text: String) -> Line { + let style = match self.state { + JobState::Succeeded => LineStyle::Success, + JobState::Failed => LineStyle::Failure, + _ => LineStyle::Normal, + }; + + Line::new(self.job_id, text) + .style(style) + .label(self.get_label().to_string()) + } + + /// Returns a human-readable string describing the transition to the current state. 
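+    /// Returns None if the job is still in the Waiting state.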
+ fn describe_state_transition(&self) -> Option { + if self.state == JobState::Waiting { + return None; + } + + let node_list = describe_node_list(&self.nodes) + .unwrap_or_else(|| "some node(s)".to_string()); + + let message = self.custom_message.as_ref().map(|e| e.as_str()) + .unwrap_or("No message"); + + Some(match (self.job_type, self.state) { + (JobType::Meta, JobState::Succeeded) => format!("All done!"), + + (JobType::Evaluate, JobState::Running) => format!("Evaluating {}", node_list), + (JobType::Evaluate, JobState::Succeeded) => format!("Evaluated {}", node_list), + (JobType::Evaluate, JobState::Failed) => format!("Evaluation failed: {}", message), + + (JobType::Build, JobState::Running) => format!("Building {}", node_list), + (JobType::Build, JobState::Succeeded) => format!("Built {}", node_list), + (JobType::Build, JobState::Failed) => format!("Build failed: {}", message), + + (JobType::Push, JobState::Running) => format!("Pushing system closure"), + (JobType::Push, JobState::Succeeded) => format!("Pushed system closure"), + (JobType::Push, JobState::Failed) => format!("Push failed: {}", message), + + (JobType::UploadKeys, JobState::Running) => format!("Uploading keys"), + (JobType::UploadKeys, JobState::Succeeded) => format!("Uploaded keys"), + (JobType::UploadKeys, JobState::Failed) => format!("Key upload failed: {}", message), + + (JobType::Activate, JobState::Running) => format!("Activating system profile"), + (JobType::Activate, JobState::Failed) => format!("Activation failed: {}", message), + + (_, JobState::Failed) => format!("Failed: {}", message), + (_, JobState::Succeeded) => format!("Succeeded"), + _ => "".to_string(), + }) + } + + /// Returns a human-readable string describing a failed job for use in the summary. + fn get_failure_summary(&self) -> String { + let node_list = describe_node_list(&self.nodes) + .unwrap_or_else(|| "some node(s)".to_string()); + + match self.job_type { + JobType::Evaluate => format!("Failed to evaluate {}", node_list), + JobType::Build => format!("Failed to build {}", node_list), + JobType::Push => format!("Failed to push system closure to {}", node_list), + JobType::UploadKeys => format!("Failed to upload keys to {}", node_list), + JobType::Activate => format!("Failed to deploy to {}", node_list), + JobType::Meta => format!("Failed to complete requested operation"), + _ => format!("Failed to complete job on {}", node_list), + } + } +} + +impl Event { + /// Creates a new event. 
+ fn new(job_id: JobId, payload: EventPayload) -> Self { + Self { job_id, payload } + } +} + +impl EventPayload { + fn privileged(&self) -> bool { + match self { + Self::ShutdownMonitor => true, + _ => false, + } + } +} + +impl Display for EventPayload { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Event")?; + + match self { + EventPayload::ChildStdout(o) => write!(f, "[ stdout]: {}", o)?, + EventPayload::ChildStderr(o) => write!(f, "[ stderr]: {}", o)?, + EventPayload::Message(m) => write!(f, "[ message]: {}", m)?, + EventPayload::Creation(_) => write!(f, "[ created]")?, + EventPayload::NewState(s) => write!(f, "[ state] {:?}", s)?, + EventPayload::SuccessWithMessage(m) => write!(f, "[ success]: {}", m)?, + EventPayload::Noop(m) => write!(f, "[ noop]: {}", m)?, + EventPayload::Failure(e) => write!(f, "[ failure]: {}", e)?, + EventPayload::ShutdownMonitor => write!(f, "[shutdown]")?, + EventPayload::ProfilesBuilt(pm) => write!(f, "[ built]: {:?}", pm)?, + } + + Ok(()) + } +} + +impl Display for JobStats { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut first = true; + fn comma(f: &mut fmt::Formatter, first: &mut bool) -> fmt::Result { + if *first { + *first = false; + return Ok(()); + } + write!(f, ", ") + } + + if self.running != 0 { + comma(f, &mut first)?; + write!(f, "{} running", self.running)?; + } + + if self.succeeded != 0 { + comma(f, &mut first)?; + write!(f, "{} succeeded", self.succeeded)?; + } + + if self.failed != 0 { + comma(f, &mut first)?; + write!(f, "{} failed", self.failed)?; + } + + if self.waiting != 0 { + comma(f, &mut first)?; + write!(f, "{} waiting", self.waiting)?; + } + + Ok(()) + } +} + +/// Returns a textual description of a list of nodes. +/// +/// Example: "alpha, beta, and 5 other nodes" +fn describe_node_list(nodes: &[NodeName]) -> Option { + let rough_limit = 40; + let other_text = ", and XX other nodes"; + + let total = nodes.len(); + if total == 0 { + return None; + } + + let mut s = String::new(); + let mut iter = nodes.iter().enumerate().peekable(); + + while let Some((_, node)) = iter.next() { + let next = iter.peek(); + + if s.len() != 0 { + if next.is_none() { + s += if total > 2 { ", and " } else { " and " }; + } else { + s += ", " + } + } + + s += node.as_str(); + + if next.is_none() { + break; + } + + let (idx, next) = next.unwrap(); + let remaining = rough_limit - s.len(); + + if next.len() + other_text.len() >= remaining { + s += &format!(", and {} other nodes", total - idx); + break; + } + } + + Some(s) +} + +#[cfg(test)] +mod tests { + use super::*; + + use tokio_test::block_on; + + macro_rules! 
node { + ($n:expr) => { + NodeName::new($n.to_string()).unwrap() + } + } + + #[test] + fn test_monitor_event() { + block_on(async { + let (monitor, meta) = JobMonitor::new(None); + + let meta = meta.run(|job: JobHandle| async move { + job.message("hello world".to_string())?; + + let eval_job = job.create_job(JobType::Evaluate, vec![ node!("alpha") ])?; + eval_job.run(|job| async move { + job.stdout("child stdout".to_string())?; + + Ok(()) + }).await?; + + Err(NixError::Unsupported) as NixResult<()> + }); + + // Run until completion + let (ret, monitor) = tokio::join!( + meta, + monitor.run_until_completion(), + ); + + match ret { + Err(NixError::Unsupported) => (), + _ => { + panic!("Wrapper must return error as-is"); + } + } + + let monitor = monitor.unwrap(); + + assert_eq!(2, monitor.jobs.len()); + + for event in monitor.events.iter() { + match &event.payload { + EventPayload::Message(m) => { + assert_eq!("hello world", m); + } + EventPayload::ChildStdout(m) => { + assert_eq!("child stdout", m); + } + _ => {} + } + } + }); + } +} diff --git a/src/main.rs b/src/main.rs index f3585ea..5d62828 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,12 @@ +#![deny(unused_must_use)] + use std::env; mod nix; mod cli; mod command; mod progress; +mod job; mod troubleshooter; mod util; diff --git a/src/nix/deployment.rs b/src/nix/deployment.rs deleted file mode 100644 index eda3227..0000000 --- a/src/nix/deployment.rs +++ /dev/null @@ -1,796 +0,0 @@ -use std::cmp::max; -use std::collections::HashMap; -use std::sync::Arc; - -use futures::future::join_all; -use tokio::sync::{Mutex, Semaphore}; - -use super::{Hive, Host, CopyOptions, NodeName, NodeConfig, Profile, StoreDerivation, ProfileMap, host}; -use super::key::{Key, UploadAt}; -use crate::progress::{Progress, TaskProgress, OutputStyle}; - -/// Amount of RAM reserved for the system, in MB. -const EVAL_RESERVE_MB: u64 = 1024; - -/// Estimated amount of RAM needed to evaluate one host, in MB. -const EVAL_PER_HOST_MB: u64 = 512; - -const BATCH_OPERATION_LABEL: &'static str = "(...)"; - -macro_rules! set_up_batch_progress_bar { - ($progress:ident, $style:ident, $chunk:ident, $single_text:expr, $batch_text:expr) => {{ - if $chunk.len() == 1 { - let mut bar = $progress.create_task_progress($chunk[0].to_string()); - bar.log($single_text); - bar - } else { - let mut bar = $progress.create_task_progress(BATCH_OPERATION_LABEL.to_string()); - bar.log(&format!($batch_text, $chunk.len())); - bar - } - }}; -} - -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum Goal { - /// Build the configurations only. - Build, - - /// Push the closures only. - Push, - - /// Make the configuration the boot default and activate now. - Switch, - - /// Make the configuration the boot default. - Boot, - - /// Activate the configuration, but don't make it the boot default. - Test, - - /// Show what would be done if this configuration were activated. 
- DryActivate, -} - -impl Goal { - pub fn from_str(s: &str) -> Option { - match s { - "build" => Some(Self::Build), - "push" => Some(Self::Push), - "switch" => Some(Self::Switch), - "boot" => Some(Self::Boot), - "test" => Some(Self::Test), - "dry-activate" => Some(Self::DryActivate), - _ => None, - } - } - - pub fn as_str(&self) -> Option<&'static str> { - use Goal::*; - match self { - Build => None, - Push => None, - Switch => Some("switch"), - Boot => Some("boot"), - Test => Some("test"), - DryActivate => Some("dry-activate"), - } - } - - pub fn success_str(&self) -> Option<&'static str> { - use Goal::*; - match self { - Build => Some("Configuration built"), - Push => Some("Pushed"), - Switch => Some("Activation successful"), - Boot => Some("Will be activated next boot"), - Test => Some("Activation successful (test)"), - DryActivate => Some("Dry activation successful"), - } - } - - pub fn should_switch_profile(&self) -> bool { - use Goal::*; - match self { - Boot | Switch => true, - _ => false, - } - } - - pub fn requires_activation(&self) -> bool { - use Goal::*; - match self { - Build | Push => false, - _ => true, - } - } -} - -/// Internal deployment stages. -#[derive(Debug)] -enum Stage { - Evaluate(Vec), - Build(Vec), - Apply(NodeName), -} - -/// Results of a deployment to a node. -#[derive(Debug)] -struct DeploymentResult { - /// Stage in which the deployment ended. - stage: Stage, - - /// Whether the deployment succeeded or not. - success: bool, - - /// Unstructured logs of the deployment. - logs: Option, -} - -impl DeploymentResult { - fn success(stage: Stage, logs: Option) -> Self { - Self { - stage, - success: true, - logs, - } - } - - fn failure(stage: Stage, logs: Option) -> Self { - Self { - stage, - success: false, - logs, - } - } - - fn is_successful(&self) -> bool { - self.success - } - - fn print(&self) { - use Stage::*; - - if self.is_successful() { - unimplemented!(); - } - - match &self.stage { - Evaluate(nodes) => { - self.print_failed_nodes("Evaluation of", &nodes, true); - } - Build(nodes) => { - self.print_failed_nodes("Build of", &nodes, true); - } - Apply(node) => { - self.print_failed_nodes("Deployment to", &vec![node.clone()], false); - } - } - } - - fn print_failed_nodes(&self, prefix: &'static str, nodes: &Vec, full_logs: bool) { - let msg = if nodes.len() == 1 { - format!("{} {} failed.", prefix, nodes[0].as_str()) - } else { - format!("{} {} nodes failed.", prefix, nodes.len()) - }; - - if let Some(logs) = self.logs.as_ref() { - let mut lines = logs.split("\n").collect::>(); - - if full_logs { - log::error!("{} Logs:", msg); - } else { - lines = lines.drain(..).rev().take(10).rev().collect(); - log::error!("{} Last {} lines of logs:", msg, lines.len()); - } - - for line in lines { - log::error!("{}", line); - } - } - } -} - -/// A deployment target. 
-#[derive(Debug)] -pub struct Target { - host: Box, - config: NodeConfig, -} - -impl Target { - pub fn new(host: Box, config: NodeConfig) -> Self { - Self { host, config } - } -} - -#[derive(Debug)] -pub struct Deployment { - hive: Hive, - goal: Goal, - target_names: Vec, - targets: Mutex>, - label_width: usize, - parallelism_limit: ParallelismLimit, - evaluation_node_limit: EvaluationNodeLimit, - options: DeploymentOptions, - results: Mutex>, -} - -impl Deployment { - pub fn new(hive: Hive, targets: HashMap, goal: Goal) -> Self { - let target_names: Vec = targets.keys().cloned().collect(); - - let label_width = if let Some(len) = target_names.iter().map(|n| n.len()).max() { - max(BATCH_OPERATION_LABEL.len(), len) - } else { - BATCH_OPERATION_LABEL.len() - }; - - Self { - hive, - goal, - target_names, - targets: Mutex::new(targets), - label_width, - parallelism_limit: ParallelismLimit::default(), - evaluation_node_limit: EvaluationNodeLimit::default(), - options: DeploymentOptions::default(), - results: Mutex::new(Vec::new()), - } - } - - pub fn set_options(&mut self, options: DeploymentOptions) { - self.options = options; - } - - pub fn set_parallelism_limit(&mut self, limit: ParallelismLimit) { - self.parallelism_limit = limit; - } - - pub fn set_evaluation_node_limit(&mut self, limit: EvaluationNodeLimit) { - self.evaluation_node_limit = limit; - } - - /// Uploads keys only (user-facing) - pub async fn upload_keys(self: Arc) -> bool { - let progress = { - let mut progress = Progress::default(); - progress.set_label_width(self.label_width); - Arc::new(progress) - }; - - let arc_self = self.clone(); - - { - let arc_self = self.clone(); - progress.run(|progress| async move { - let mut futures = Vec::new(); - - for node in self.target_names.iter() { - let node = node.to_owned(); - - let mut target = { - let mut targets = arc_self.targets.lock().await; - targets.remove(&node).unwrap() - }; - - let arc_self = self.clone(); - let progress = progress.clone(); - futures.push(async move { - let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap(); - let mut task = progress.create_task_progress(node.to_string()); - - task.log("Uploading keys..."); - - if let Err(e) = target.host.upload_keys(&target.config.keys, true).await { - task.failure_err(&e); - - let mut results = arc_self.results.lock().await; - let stage = Stage::Apply(node); - let logs = target.host.dump_logs().await.map(|s| s.to_string()); - results.push(DeploymentResult::failure(stage, logs)); - return; - } else { - task.success("Keys uploaded"); - } - - drop(permit); - }); - } - - join_all(futures).await - }).await; - } - - arc_self.print_logs().await; - - arc_self.all_successful().await - } - - /// Executes the deployment (user-facing) - /// - /// Self must be wrapped inside an Arc. 
- pub async fn execute(self: Arc) -> bool { - let progress = { - let mut progress = if !self.options.progress_bar { - Progress::with_style(OutputStyle::Plain) - } else { - Progress::default() - }; - progress.set_label_width(self.label_width); - Arc::new(progress) - }; - - let arc_self = self.clone(); - - { - let arc_self = self.clone(); - let eval_limit = arc_self.clone().eval_limit(); - - progress.run(|progress| async move { - let mut futures = Vec::new(); - - for chunk in self.target_names.chunks(eval_limit) { - let arc_self = arc_self.clone(); - let progress = progress.clone(); - - // FIXME: Eww - let chunk: Vec = chunk.iter().map(|s| s.clone()).collect(); - - futures.push(async move { - let drv = { - // Evaluation phase - let permit = arc_self.parallelism_limit.evaluation.acquire().await.unwrap(); - - let bar = set_up_batch_progress_bar!(progress, style, chunk, - "Evaluating configuration...", - "Evaluating configurations for {} nodes" - ); - - let arc_self = arc_self.clone(); - let drv = match arc_self.eval_profiles(&chunk, bar).await { - Some(drv) => drv, - None => { - return; - } - }; - - drop(permit); - drv - }; - - let profiles = { - // Build phase - let permit = arc_self.parallelism_limit.build.acquire().await.unwrap(); - let bar = set_up_batch_progress_bar!(progress, style, chunk, - "Building configuration...", - "Building configurations for {} nodes" - ); - - let goal = arc_self.goal; - let profiles = arc_self.clone().build_profiles(&chunk, drv, bar.clone()).await; - - let profiles = match profiles { - Some(profiles) => profiles, - None => { - return; - } - }; - - let build_elapsed = bar.get_elapsed(); - bar.success_quiet(); - - if goal == Goal::Build { - for (node, profile) in profiles.iter() { - let mut bar = progress.create_task_progress(node.to_string()); - if let Some(elapsed) = build_elapsed { - bar.set_elapsed(elapsed); - } - bar.success(&format!("Built {:?}", profile.as_path())); - } - } - - if arc_self.options.create_gc_roots { - // Create GC roots - if let Some(dir) = arc_self.hive.context_dir() { - let base = dir.join(".gcroots"); - - if let Err(e) = profiles.create_gc_roots(&base).await { - let bar = progress.create_task_progress(BATCH_OPERATION_LABEL.to_string()); - bar.failure(&format!("Failed to create GC roots: {:?}", e)); - } - } - } - - drop(permit); - profiles - }; - - // Should we continue? 
-            if arc_self.goal == Goal::Build {
-                return;
-            }
-
-            // Apply phase
-            let mut futures = Vec::new();
-            for node in chunk {
-                let arc_self = arc_self.clone();
-                let progress = progress.clone();
-
-                let target = {
-                    let mut targets = arc_self.targets.lock().await;
-                    targets.remove(&node).unwrap()
-                };
-                let profile = profiles.get(&node).cloned()
-                    .expect(&format!("Somehow profile for {} was not built", node.as_str()));
-
-                futures.push(async move {
-                    arc_self.apply_profile(&node, target, profile, progress).await
-                });
-            }
-            join_all(futures).await;
-        });
-    }
-
-    join_all(futures).await;
-        }).await;
-    }
-
-        arc_self.print_logs().await;
-
-        arc_self.all_successful().await
-    }
-
-    async fn all_successful(&self) -> bool {
-        let results = self.results.lock().await;
-        results.iter().filter(|r| !r.is_successful()).count() == 0
-    }
-
-    async fn print_logs(&self) {
-        let results = self.results.lock().await;
-        for result in results.iter() {
-            if !result.is_successful() {
-                result.print();
-            }
-        }
-    }
-
-    async fn eval_profiles(self: Arc<Self>, chunk: &Vec<NodeName>, progress: TaskProgress) -> Option<StoreDerivation<ProfileMap>> {
-        let (eval, logs) = self.hive.eval_selected(&chunk, progress.clone()).await;
-
-        match eval {
-            Ok(drv) => {
-                progress.success_quiet();
-                Some(drv)
-            }
-            Err(e) => {
-                progress.failure(&format!("Evalation failed: {}", e));
-
-                let mut results = self.results.lock().await;
-                let stage = Stage::Evaluate(chunk.clone());
-                results.push(DeploymentResult::failure(stage, logs));
-                None
-            }
-        }
-    }
-
-    async fn build_profiles(self: Arc<Self>, chunk: &Vec<NodeName>, derivation: StoreDerivation<ProfileMap>, progress: TaskProgress) -> Option<ProfileMap> {
-        let nix_options = self.hive.nix_options().await.unwrap();
-        // FIXME: Remote build?
-        let mut builder = host::local(nix_options);
-
-        builder.set_progress_bar(progress.clone());
-
-        match derivation.realize(&mut *builder).await {
-            Ok(profiles) => {
-                progress.success("Build successful");
-
-                let mut results = self.results.lock().await;
-                let stage = Stage::Build(chunk.clone());
-                let logs = builder.dump_logs().await.map(|s| s.to_string());
-                results.push(DeploymentResult::success(stage, logs));
-
-                Some(profiles)
-            }
-            Err(e) => {
-                progress.failure(&format!("Build failed: {}", e));
-
-                let mut results = self.results.lock().await;
-                let stage = Stage::Build(chunk.clone());
-                let logs = builder.dump_logs().await.map(|s| s.to_string());
-                results.push(DeploymentResult::failure(stage, logs));
-                None
-            }
-        }
-    }
-
-    async fn apply_profile(self: Arc<Self>, name: &NodeName, mut target: Target, profile: Profile, multi: Arc<Progress>) {
-        let permit = self.parallelism_limit.apply.acquire().await.unwrap();
-
-        let mut bar = multi.create_task_progress(name.to_string());
-
-        // FIXME: Would be nicer to check remote status before spending time evaluating/building
-        if !target.config.replace_unknown_profiles {
-            bar.log("Checking remote profile...");
-            match target.host.active_derivation_known().await {
-                Ok(_) => {
-                    bar.log("Remote profile known");
-                }
-                Err(e) => {
-                    if self.options.force_replace_unknown_profiles {
-                        bar.log("warning: remote profile is unknown, but unknown profiles are being ignored");
-                    } else {
-                        bar.failure(&format!("Failed: {}", e));
-                        return;
-                    }
-                }
-            }
-        }
-
-        let pre_activation_keys = target.config.keys.iter()
-            .filter(|(_, v)| v.upload_at() == UploadAt::PreActivation)
-            .map(|(k, v)| (k.clone(), v.clone()))
-            .collect::<HashMap<String, Key>>();
-
-        let post_activation_keys = target.config.keys.iter()
-            .filter(|(_, v)| v.upload_at() == UploadAt::PostActivation)
-            .map(|(k, v)| (k.clone(), v.clone()))
-            .collect::<HashMap<String, Key>>();
-
-        if self.options.upload_keys && !pre_activation_keys.is_empty() {
-            bar.log("Uploading keys...");
-
-            if let Err(e) = target.host.upload_keys(&pre_activation_keys, false).await {
-                bar.failure_err(&e);
-
-                let mut results = self.results.lock().await;
-                let stage = Stage::Apply(name.clone());
-                let logs = target.host.dump_logs().await.map(|s| s.to_string());
-                results.push(DeploymentResult::failure(stage, logs));
-                return;
-            }
-        }
-
-        bar.log("Starting...");
-
-        target.host.set_progress_bar(bar.clone());
-
-        let copy_options = self.options.to_copy_options()
-            .include_outputs(true);
-
-        match target.host.deploy(&profile, self.goal, copy_options).await {
-            Ok(_) => {
-                // FIXME: This is ugly
-                if self.options.upload_keys && !post_activation_keys.is_empty() {
-                    bar.log("Uploading keys (post-activation)...");
-
-                    if let Err(e) = target.host.upload_keys(&post_activation_keys, true).await {
-                        bar.failure_err(&e);
-
-                        let mut results = self.results.lock().await;
-                        let stage = Stage::Apply(name.clone());
-                        let logs = target.host.dump_logs().await.map(|s| s.to_string());
-                        results.push(DeploymentResult::failure(stage, logs));
-                        return;
-                    }
-                }
-
-                bar.success(self.goal.success_str().unwrap());
-
-                let mut results = self.results.lock().await;
-                let stage = Stage::Apply(name.clone());
-                let logs = target.host.dump_logs().await.map(|s| s.to_string());
-                results.push(DeploymentResult::success(stage, logs));
-            }
-            Err(e) => {
-                bar.failure(&format!("Failed: {}", e));
-
-                let mut results = self.results.lock().await;
-                let stage = Stage::Apply(name.clone());
-                let logs = target.host.dump_logs().await.map(|s| s.to_string());
-                results.push(DeploymentResult::failure(stage, logs));
-            }
-        }
-
-        drop(permit);
-    }
-
-    fn eval_limit(&self) -> usize {
-        if let Some(limit) = self.evaluation_node_limit.get_limit() {
-            limit
-        } else {
-            self.target_names.len()
-        }
-    }
-}
-
-#[derive(Debug)]
-pub struct ParallelismLimit {
-    /// Limit of concurrent evaluation processes.
-    evaluation: Semaphore,
-
-    /// Limit of concurrent build processes.
-    build: Semaphore,
-
-    /// Limit of concurrent apply processes.
-    apply: Semaphore,
-}
-
-impl Default for ParallelismLimit {
-    fn default() -> Self {
-        Self {
-            evaluation: Semaphore::new(1),
-            build: Semaphore::new(2),
-            apply: Semaphore::new(10),
-        }
-    }
-}
-
-impl ParallelismLimit {
-    // Do we actually want them to be configurable?
-    /*
-    /// Sets the concurrent evaluation limit.
-    ///
-    /// This limits the number of evaluation processes, not
-    /// the number of nodes in each evaluation process.
-    /// The latter is controlled in DeploymentOptions.
-    pub fn set_evaluation_limit(&mut self, limit: usize) {
-        self.evaluation = Semaphore::new(limit);
-    }
-
-    /// Sets the concurrent build limit.
-    pub fn set_build_limit(&mut self, limit: usize) {
-        self.build = Semaphore::new(limit);
-    }
-    */
-
-    /// Sets the concurrent apply limit.
-    pub fn set_apply_limit(&mut self, limit: usize) {
-        self.apply = Semaphore::new(limit);
-    }
-}
-
-#[derive(Clone, Debug)]
-pub struct DeploymentOptions {
-    /// Whether to show condensed progress bars.
-    ///
-    /// If set to false, verbose logs will be displayed instead.
-    progress_bar: bool,
-
-    /// Whether to use binary caches when copying closures to remote hosts.
-    substituters_push: bool,
-
-    /// Whether to use gzip when copying closures to remote hosts.
-    gzip: bool,
-
-    /// Whether to upload keys when deploying.
-    upload_keys: bool,
-
-    /// Whether to create GC roots for node profiles.
-    ///
-    /// If true, .gc_roots will be created under the hive's context
-    /// directory if it exists.
-    create_gc_roots: bool,
-
-    /// Ignore the node-level `deployment.replaceUnknownProfiles` option.
-    force_replace_unknown_profiles: bool,
-}
-
-impl Default for DeploymentOptions {
-    fn default() -> Self {
-        Self {
-            progress_bar: true,
-            substituters_push: true,
-            gzip: true,
-            upload_keys: true,
-            create_gc_roots: false,
-            force_replace_unknown_profiles: false,
-        }
-    }
-}
-
-impl DeploymentOptions {
-    pub fn set_progress_bar(&mut self, value: bool) {
-        self.progress_bar = value;
-    }
-
-    pub fn set_substituters_push(&mut self, value: bool) {
-        self.substituters_push = value;
-    }
-
-    pub fn set_gzip(&mut self, value: bool) {
-        self.gzip = value;
-    }
-
-    pub fn set_upload_keys(&mut self, enable: bool) {
-        self.upload_keys = enable;
-    }
-
-    pub fn set_create_gc_roots(&mut self, enable: bool) {
-        self.create_gc_roots = enable;
-    }
-
-    pub fn set_force_replace_unknown_profiles(&mut self, enable: bool) {
-        self.force_replace_unknown_profiles = enable;
-    }
-
-    fn to_copy_options(&self) -> CopyOptions {
-        let options = CopyOptions::default();
-
-        options
-            .use_substitutes(self.substituters_push)
-            .gzip(self.gzip)
-    }
-}
-
-/// Limit of the number of nodes in each evaluation process.
-///
-/// The evaluation process is very RAM-intensive, with memory
-/// consumption scaling linearly with the number of nodes
-/// evaluated at the same time. This can be a problem if you
-/// are deploying to a large number of nodes at the same time,
-/// where `nix-instantiate` may consume too much RAM and get
-/// killed by the OS (`NixKilled` error).
-///
-/// Evaluating each node on its own is not an efficient solution,
-/// with total CPU time and memory consumption vastly exceeding the
-/// case where we evaluate the same set of nodes at the same time
-/// (TODO: Provide statistics).
-///
-/// To overcome this problem, we split the evaluation process into
-/// chunks when necessary, with the maximum number of nodes in
-/// each `nix-instantiate` invocation determined with:
-///
-/// - A simple heuristic based on remaining memory in the system
-/// - A supplied number
-/// - No limit at all
-#[derive(Copy, Clone, Debug)]
-pub enum EvaluationNodeLimit {
-    /// Use a naive heuristic based on available memory.
-    Heuristic,
-
-    /// Supply the maximum number of nodes.
-    Manual(usize),
-
-    /// Do not limit the number of nodes in each evaluation process
-    None,
-}
-
-impl Default for EvaluationNodeLimit {
-    fn default() -> Self {
-        Self::Heuristic
-    }
-}
-
-impl EvaluationNodeLimit {
-    /// Returns the maximum number of hosts in each evaluation.
-    ///
-    /// The result should be cached.
-    pub fn get_limit(&self) -> Option<usize> {
-        match self {
-            EvaluationNodeLimit::Heuristic => {
-                if let Ok(mem_info) = sys_info::mem_info() {
-                    let mut mb = mem_info.avail / 1024;
-
-                    if mb >= EVAL_RESERVE_MB {
-                        mb -= EVAL_RESERVE_MB;
-                    }
-
-                    let nodes = mb / EVAL_PER_HOST_MB;
-
-                    if nodes == 0 {
-                        Some(1)
-                    } else {
-                        Some(nodes as usize)
-                    }
-                } else {
-                    Some(10)
-                }
-            }
-            EvaluationNodeLimit::Manual(limit) => Some(*limit),
-            EvaluationNodeLimit::None => None,
-        }
-    }
-}
diff --git a/src/nix/deployment/goal.rs b/src/nix/deployment/goal.rs
new file mode 100644
index 0000000..d62cb1a
--- /dev/null
+++ b/src/nix/deployment/goal.rs
@@ -0,0 +1,100 @@
+//! Deployment goals.
+
+/// The goal of a deployment.
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub enum Goal {
+    /// Build the configurations only.
+    Build,
+
+    /// Push the closures only.
+    Push,
+
+    /// Make the configuration the boot default and activate now.
+    Switch,
+
+    /// Make the configuration the boot default.
+    Boot,
+
+    /// Activate the configuration, but don't make it the boot default.
+    Test,
+
+    /// Show what would be done if this configuration were activated.
+    DryActivate,
+
+    /// Only upload keys.
+    UploadKeys,
+}
+
+impl Goal {
+    pub fn from_str(s: &str) -> Option<Self> {
+        match s {
+            "build" => Some(Self::Build),
+            "push" => Some(Self::Push),
+            "switch" => Some(Self::Switch),
+            "boot" => Some(Self::Boot),
+            "test" => Some(Self::Test),
+            "dry-activate" => Some(Self::DryActivate),
+            "keys" => Some(Self::UploadKeys),
+            _ => None,
+        }
+    }
+
+    pub fn as_str(&self) -> Option<&'static str> {
+        use Goal::*;
+        match self {
+            Build => None,
+            Push => None,
+            Switch => Some("switch"),
+            Boot => Some("boot"),
+            Test => Some("test"),
+            DryActivate => Some("dry-activate"),
+            UploadKeys => Some("keys"),
+        }
+    }
+
+    pub fn success_str(&self) -> &'static str {
+        use Goal::*;
+        match self {
+            Build => "Configuration built",
+            Push => "Pushed",
+            Switch => "Activation successful",
+            Boot => "Will be activated next boot",
+            Test => "Activation successful (test)",
+            DryActivate => "Dry activation successful",
+            UploadKeys => "Uploaded keys",
+        }
+    }
+
+    pub fn should_switch_profile(&self) -> bool {
+        use Goal::*;
+        match self {
+            Boot | Switch => true,
+            _ => false,
+        }
+    }
+
+    pub fn requires_activation(&self) -> bool {
+        use Goal::*;
+        match self {
+            Build | UploadKeys | Push => false,
+            _ => true,
+        }
+    }
+
+    pub fn requires_target_host(&self) -> bool {
+        use Goal::*;
+        match self {
+            Build => false,
+            _ => true,
+        }
+    }
+
+    /// Is this a real goal supported by switch-to-configuration?
+    pub fn is_real_goal(&self) -> bool {
+        use Goal::*;
+        match self {
+            Build | UploadKeys | Push => false,
+            _ => true,
+        }
+    }
+}
diff --git a/src/nix/deployment/limits.rs b/src/nix/deployment/limits.rs
new file mode 100644
index 0000000..b099271
--- /dev/null
+++ b/src/nix/deployment/limits.rs
@@ -0,0 +1,109 @@
+//! Parallelism limits.
+
+use tokio::sync::Semaphore;
+
+/// Amount of RAM reserved for the system, in MB.
+const EVAL_RESERVE_MB: u64 = 1024;
+
+/// Estimated amount of RAM needed to evaluate one host, in MB.
+const EVAL_PER_HOST_MB: u64 = 512;
+
+/// The parallelism limit for a deployment.
+#[derive(Debug)]
+pub struct ParallelismLimit {
+    /// Limit of concurrent evaluation processes.
+    pub evaluation: Semaphore,
+
+    /// Limit of concurrent build processes.
+    pub build: Semaphore,
+
+    /// Limit of concurrent apply processes.
+    pub apply: Semaphore,
+}
+
+impl Default for ParallelismLimit {
+    fn default() -> Self {
+        Self {
+            evaluation: Semaphore::new(1),
+            build: Semaphore::new(2),
+            apply: Semaphore::new(10),
+        }
+    }
+}
+
+impl ParallelismLimit {
+    /// Sets the concurrent apply limit.
+    pub fn set_apply_limit(&mut self, limit: usize) {
+        self.apply = Semaphore::new(limit);
+    }
+}
+
+/// Limit of the number of nodes in each evaluation process.
+///
+/// The evaluation process is very RAM-intensive, with memory
+/// consumption scaling linearly with the number of nodes
+/// evaluated at the same time. This can be a problem if you
+/// are deploying to a large number of nodes at the same time,
+/// where `nix-instantiate` may consume too much RAM and get
+/// killed by the OS (`NixKilled` error).
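+///
+/// For a sense of scale (an illustrative calculation based on the
+/// constants above, not a measured guarantee): with
+/// `EVAL_RESERVE_MB = 1024` and `EVAL_PER_HOST_MB = 512`, a system
+/// reporting 9216 MB of available memory gets a chunk size of
+/// (9216 - 1024) / 512 = 16 nodes per `nix-instantiate` invocation.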
+///
+/// Evaluating each node on its own is not an efficient solution,
+/// with total CPU time and memory consumption vastly exceeding the
+/// case where we evaluate the same set of nodes at the same time
+/// (TODO: Provide statistics).
+///
+/// To overcome this problem, we split the evaluation process into
+/// chunks when necessary, with the maximum number of nodes in
+/// each `nix-instantiate` invocation determined with:
+///
+/// - A simple heuristic based on remaining memory in the system
+/// - A supplied number
+/// - No limit at all
+#[derive(Copy, Clone, Debug)]
+pub enum EvaluationNodeLimit {
+    /// Use a naive heuristic based on available memory.
+    Heuristic,
+
+    /// Supply the maximum number of nodes.
+    Manual(usize),
+
+    /// Do not limit the number of nodes in each evaluation process
+    None,
+}
+
+impl Default for EvaluationNodeLimit {
+    fn default() -> Self {
+        Self::Heuristic
+    }
+}
+
+impl EvaluationNodeLimit {
+    /// Returns the maximum number of hosts in each evaluation.
+    ///
+    /// The result should be cached.
+    pub fn get_limit(&self) -> Option<usize> {
+        match self {
+            EvaluationNodeLimit::Heuristic => {
+                if let Ok(mem_info) = sys_info::mem_info() {
+                    let mut mb = mem_info.avail / 1024;
+
+                    if mb >= EVAL_RESERVE_MB {
+                        mb -= EVAL_RESERVE_MB;
+                    }
+
+                    let nodes = mb / EVAL_PER_HOST_MB;
+
+                    if nodes == 0 {
+                        Some(1)
+                    } else {
+                        Some(nodes as usize)
+                    }
+                } else {
+                    Some(10)
+                }
+            }
+            EvaluationNodeLimit::Manual(limit) => Some(*limit),
+            EvaluationNodeLimit::None => None,
+        }
+    }
+}
diff --git a/src/nix/deployment/mod.rs b/src/nix/deployment/mod.rs
new file mode 100644
index 0000000..6222e76
--- /dev/null
+++ b/src/nix/deployment/mod.rs
@@ -0,0 +1,435 @@
+//! Deployment logic.
+
+pub mod goal;
+
+pub use goal::Goal;
+
+pub mod limits;
+pub use limits::{EvaluationNodeLimit, ParallelismLimit};
+
+pub mod options;
+pub use options::Options;
+
+use std::collections::HashMap;
+use std::mem;
+use std::sync::Arc;
+
+use futures::future::join_all;
+use itertools::Itertools;
+
+use crate::progress::Sender as ProgressSender;
+use crate::job::{JobMonitor, JobHandle, JobType, JobState};
+use crate::util;
+
+use super::{
+    Hive,
+    Host,
+    NodeName,
+    NodeConfig,
+    NixError,
+    NixResult,
+    Profile,
+    ProfileMap,
+    StoreDerivation,
+    CopyDirection,
+    key::{Key, UploadAt as UploadKeyAt},
+};
+use super::host;
+
+/// A shared handle to a deployment.
+pub type DeploymentHandle = Arc<Deployment>;
+
+/// A map of target nodes.
+pub type TargetNodeMap = HashMap<NodeName, TargetNode>;
+
+/// A deployment.
+#[derive(Debug)]
+pub struct Deployment {
+    /// The configuration.
+    hive: Hive,
+
+    /// The goal of this deployment.
+    goal: Goal,
+
+    /// Deployment options.
+    options: Options,
+
+    /// Handle to send messages to the ProgressOutput.
+    progress: Option<ProgressSender>,
+
+    /// Names of the target nodes.
+    nodes: Vec<NodeName>,
+
+    /// Handles to the deployment targets.
+    targets: HashMap<NodeName, TargetNode>,
+
+    /// Parallelism limit.
+    parallelism_limit: ParallelismLimit,
+
+    /// Evaluation limit.
+    evaluation_node_limit: EvaluationNodeLimit,
+
+    /// Whether it was executed.
+    executed: bool,
+}
+
+/// Handle to a target node.
+#[derive(Debug)]
+pub struct TargetNode {
+    /// Name of the node.
+    name: NodeName,
+
+    /// The host to deploy to.
+    host: Option<Box<dyn Host>>,
+
+    /// The config.deployment values of the node.
+    config: NodeConfig,
+}
+
+impl TargetNode {
+    pub fn new(name: NodeName, host: Option<Box<dyn Host>>, config: NodeConfig) -> Self {
+        Self { name, host, config }
+    }
+
+    pub fn into_host(self) -> Option<Box<dyn Host>> {
+        self.host
+    }
+}
+
+impl Deployment {
+    /// Creates a new deployment.
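+    ///
+    /// A hedged usage sketch (not a doctest; `hive`, `targets`, and
+    /// `options` are placeholders for values constructed elsewhere,
+    /// e.g., via `Hive::select_nodes`):
+    ///
+    /// ```ignore
+    /// let mut output = SimpleProgressOutput::new(verbose);
+    /// let sender = output.get_sender();
+    ///
+    /// let mut deployment = Deployment::new(hive, targets, Goal::Switch, sender);
+    /// deployment.set_options(options);
+    ///
+    /// // Run the deployment and the progress output concurrently.
+    /// let (result, _) = tokio::join!(
+    ///     deployment.execute(),
+    ///     output.run_until_completion(),
+    /// );
+    /// result?;
+    /// ```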
+    pub fn new(hive: Hive, targets: TargetNodeMap, goal: Goal, progress: Option<ProgressSender>) -> Self {
+        Self {
+            hive,
+            goal,
+            progress,
+            nodes: targets.keys().cloned().collect(),
+            targets,
+            parallelism_limit: ParallelismLimit::default(),
+            evaluation_node_limit: EvaluationNodeLimit::default(),
+            options: Options::default(),
+            executed: false,
+        }
+    }
+
+    /// Executes the deployment.
+    ///
+    /// If a ProgressSender is supplied, then this should be run in parallel
+    /// with its `run_until_completion()` future.
+    pub async fn execute(mut self) -> NixResult<()> {
+        if self.executed {
+            return Err(NixError::DeploymentAlreadyExecuted);
+        }
+
+        self.executed = true;
+
+        let (mut monitor, meta) = JobMonitor::new(self.progress.clone());
+
+        if let Some(width) = util::get_label_width(&self.targets) {
+            monitor.set_label_width(width);
+        }
+
+        if self.goal == Goal::UploadKeys {
+            // Just upload keys
+            let targets = mem::take(&mut self.targets);
+            let deployment = DeploymentHandle::new(self);
+            let meta_future = meta.run(|meta| async move {
+                let mut futures = Vec::new();
+
+                for target in targets.into_values() {
+                    futures.push(deployment.clone().upload_keys_to_node(meta.clone(), target));
+                }
+
+                let result: NixResult<Vec<()>> = join_all(futures).await.into_iter().collect();
+
+                result?;
+
+                Ok(())
+            });
+
+            let (result, _) = tokio::join!(
+                meta_future,
+                monitor.run_until_completion(),
+            );
+
+            result?;
+
+            Ok(())
+        } else {
+            // Do the whole eval-build-deploy flow
+            let chunks = self.get_chunks();
+            let deployment = DeploymentHandle::new(self);
+            let meta_future = meta.run(|meta| async move {
+                let mut futures = Vec::new();
+
+                for chunk in chunks.into_iter() {
+                    futures.push(deployment.clone().execute_chunk(meta.clone(), chunk));
+                }
+
+                let result: NixResult<Vec<()>> = join_all(futures).await.into_iter().collect();
+
+                result?;
+
+                Ok(())
+            });
+
+            let (result, _) = tokio::join!(
+                meta_future,
+                monitor.run_until_completion(),
+            );
+
+            result?;
+
+            Ok(())
+        }
+    }
+
+    pub fn set_options(&mut self, options: Options) {
+        self.options = options;
+    }
+
+    pub fn set_parallelism_limit(&mut self, limit: ParallelismLimit) {
+        self.parallelism_limit = limit;
+    }
+
+    pub fn set_evaluation_node_limit(&mut self, limit: EvaluationNodeLimit) {
+        self.evaluation_node_limit = limit;
+    }
+
+    fn get_chunks(&mut self) -> Vec<TargetNodeMap> {
+        let eval_limit = self.evaluation_node_limit.get_limit()
+            .unwrap_or(self.targets.len());
+        let mut result = Vec::new();
+
+        for chunk in self.targets.drain().chunks(eval_limit).into_iter() {
+            let mut map = HashMap::new();
+            for (name, host) in chunk {
+                map.insert(name, host);
+            }
+            result.push(map);
+        }
+
+        result
+    }
+
+    /// Executes the deployment against a portion of nodes.
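+    ///
+    /// A hedged sketch of what happens per chunk (the real logic is in
+    /// the methods below; error handling and job bookkeeping elided):
+    ///
+    /// ```ignore
+    /// let profiles = self.build_nodes(parent, nodes).await?; // evaluate + build
+    /// for (name, profile) in profiles.iter() {
+    ///     self.deploy_node(parent, target, profile.clone()).await?; // push + activate
+    /// }
+    /// ```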
+    async fn execute_chunk(self: DeploymentHandle, parent: JobHandle, mut chunk: TargetNodeMap) -> NixResult<()> {
+        if self.goal == Goal::UploadKeys {
+            unreachable!(); // Goal::UploadKeys is handled separately in execute()
+        }
+
+        let nodes: Vec<NodeName> = chunk.keys().cloned().collect();
+        let profiles = self.clone().build_nodes(parent.clone(), nodes.clone()).await?;
+
+        if self.goal == Goal::Build {
+            return Ok(());
+        }
+
+        for (name, profile) in profiles.iter() {
+            let target = chunk.remove(&name).unwrap();
+            self.clone().deploy_node(parent.clone(), target, profile.clone()).await?;
+        }
+
+        // Create GC root
+        if self.options.create_gc_roots {
+            let job = parent.create_job(JobType::CreateGcRoots, nodes.clone())?;
+            let arc_self = self.clone();
+            job.run_waiting(|job| async move {
+                if let Some(dir) = arc_self.hive.context_dir() {
+                    job.state(JobState::Running)?;
+                    let base = dir.join(".gcroots");
+
+                    profiles.create_gc_roots(&base).await?;
+                } else {
+                    job.noop("No context directory to create GC roots in".to_string())?;
+                }
+                Ok(())
+            }).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Evaluates a set of nodes, returning a store derivation.
+    async fn evaluate_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec<NodeName>)
+        -> NixResult<StoreDerivation<ProfileMap>>
+    {
+        let job = parent.create_job(JobType::Evaluate, nodes.clone())?;
+
+        job.run_waiting(|job| async move {
+            // Wait for eval limit
+            let permit = self.parallelism_limit.evaluation.acquire().await.unwrap();
+            job.state(JobState::Running)?;
+
+            let result = self.hive.eval_selected(&nodes, Some(job.clone())).await;
+
+            drop(permit);
+            result
+        }).await
+    }
+
+    /// Builds a set of nodes, returning a set of profiles.
+    async fn build_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec<NodeName>)
+        -> NixResult<ProfileMap>
+    {
+        let job = parent.create_job(JobType::Build, nodes.clone())?;
+
+        job.run_waiting(|job| async move {
+            let derivation = self.clone().evaluate_nodes(job.clone(), nodes.clone()).await?;
+
+            // Wait for build limit
+            let permit = self.parallelism_limit.build.acquire().await.unwrap();
+            job.state(JobState::Running)?;
+
+            // FIXME: Remote builder?
+            let nix_options = self.hive.nix_options().await?;
+            let mut builder = host::local(nix_options);
+
+            let map = derivation.realize(&mut *builder).await?;
+
+            job.profiles_built(map.clone())?;
+
+            drop(permit);
+            Ok(map)
+        }).await
+    }
+
+    /// Only uploads keys to a node.
+    async fn upload_keys_to_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode) -> NixResult<()> {
+        let nodes = vec![target.name.clone()];
+        let job = parent.create_job(JobType::UploadKeys, nodes)?;
+        job.run(|_| async move {
+            if target.host.is_none() {
+                return Err(NixError::NoTargetHost);
+            }
+
+            let host = target.host.as_mut().unwrap();
+            host.upload_keys(&target.config.keys, true).await?;
+
+            Ok(())
+        }).await
+    }
+
+    /// Pushes and optionally activates a system profile on a given node.
+    ///
+    /// This will also upload keys to the node.
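+    ///
+    /// A condensed sketch of the job sequence below (error handling
+    /// elided; each step is a child job of `parent`):
+    ///
+    /// ```ignore
+    /// let target = push_job.run_waiting(/* copy closure to the node */).await?;
+    /// let target = keys_job.run_waiting(/* pre-activation keys */).await?;
+    /// let target = activation_job.run(/* switch-to-configuration */).await?;
+    /// keys_job.run_waiting(/* post-activation keys */).await?;
+    /// ```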
+    async fn deploy_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile: Profile)
+        -> NixResult<()>
+    {
+        if self.goal == Goal::Build {
+            unreachable!(); // execute_chunk returns early for Goal::Build
+        }
+
+        let nodes = vec![target.name.clone()];
+
+        let push_job = parent.create_job(JobType::Push, nodes.clone())?;
+        let push_profile = profile.clone();
+        let arc_self = self.clone();
+        let mut target = push_job.run_waiting(|job| async move {
+            if target.host.is_none() {
+                return Err(NixError::NoTargetHost);
+            }
+
+            let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
+            job.state(JobState::Running)?;
+
+            let host = target.host.as_mut().unwrap();
+            host.copy_closure(
+                push_profile.as_store_path(),
+                CopyDirection::ToRemote,
+                arc_self.options.to_copy_options()).await?;
+
+            drop(permit);
+            Ok(target)
+        }).await?;
+
+        if !self.goal.requires_activation() {
+            // We are done here :)
+            return Ok(());
+        }
+
+        // Upload pre-activation keys
+        let mut target = if self.options.upload_keys {
+            let job = parent.create_job(JobType::UploadKeys, nodes.clone())?;
+            job.run_waiting(|job| async move {
+                let keys = target.config.keys.iter()
+                    .filter(|(_, v)| v.upload_at() == UploadKeyAt::PreActivation)
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect::<HashMap<String, Key>>();
+
+                if keys.is_empty() {
+                    job.noop("No pre-activation keys to upload".to_string())?;
+                    return Ok(target);
+                }
+
+                job.state(JobState::Running)?;
+                job.message("Uploading pre-activation keys...".to_string())?;
+
+                let host = target.host.as_mut().unwrap();
+                host.upload_keys(&keys, false).await?;
+
+                job.success_with_message("Uploaded keys (pre-activation)".to_string())?;
+                Ok(target)
+            }).await?
+        } else {
+            target
+        };
+
+        // Activate profile
+        let activation_job = parent.create_job(JobType::Activate, nodes.clone())?;
+        let arc_self = self.clone();
+        let profile_r = profile.clone();
+        let mut target = activation_job.run(|job| async move {
+            let host = target.host.as_mut().unwrap();
+
+            if !target.config.replace_unknown_profiles {
+                job.message("Checking remote profile...".to_string())?;
+                match host.active_derivation_known().await {
+                    Ok(_) => {
+                        job.message("Remote profile known".to_string())?;
+                    }
+                    Err(e) => {
+                        if arc_self.options.force_replace_unknown_profiles {
+                            job.message("warning: remote profile is unknown, but unknown profiles are being ignored".to_string())?;
+                        } else {
+                            return Err(e);
+                        }
+                    }
+                }
+            }
+
+            host.activate(&profile_r, arc_self.goal).await?;
+
+            job.success_with_message(arc_self.goal.success_str().to_string())?;
+
+            Ok(target)
+        }).await?;
+
+        // Upload post-activation keys
+        if self.options.upload_keys {
+            let job = parent.create_job(JobType::UploadKeys, nodes.clone())?;
+            job.run_waiting(|job| async move {
+                let keys = target.config.keys.iter()
+                    .filter(|(_, v)| v.upload_at() == UploadKeyAt::PostActivation)
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect::<HashMap<String, Key>>();
+
+                if keys.is_empty() {
+                    job.noop("No post-activation keys to upload".to_string())?;
+                    return Ok(());
+                }
+
+                job.state(JobState::Running)?;
+                job.message("Uploading post-activation keys...".to_string())?;
+
+                let host = target.host.as_mut().unwrap();
+                host.upload_keys(&keys, true).await?;
+
+                job.success_with_message("Uploaded keys (post-activation)".to_string())?;
+                Ok(())
+            }).await?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/nix/deployment/options.rs b/src/nix/deployment/options.rs
new file mode 100644
index 0000000..10941af
--- /dev/null
+++ b/src/nix/deployment/options.rs
@@ -0,0 +1,67 @@
+//! Deployment options.
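+//!
+//! A hedged sketch of typical wiring (`deployment` stands in for a
+//! `Deployment` from the parent module):
+//!
+//! ```ignore
+//! let mut options = Options::default();
+//! options.set_gzip(true);
+//! options.set_upload_keys(true);
+//! options.set_create_gc_roots(false);
+//! deployment.set_options(options);
+//! ```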
+
+use crate::nix::CopyOptions;
+
+/// Options for a deployment.
+#[derive(Clone, Debug)]
+pub struct Options {
+    /// Whether to use binary caches when copying closures to remote hosts.
+    pub(super) substituters_push: bool,
+
+    /// Whether to use gzip when copying closures to remote hosts.
+    pub(super) gzip: bool,
+
+    /// Whether to upload keys when deploying.
+    pub(super) upload_keys: bool,
+
+    /// Whether to create GC roots for node profiles.
+    ///
+    /// If true, .gcroots will be created under the hive's context
+    /// directory if it exists.
+    pub(super) create_gc_roots: bool,
+
+    /// Ignore the node-level `deployment.replaceUnknownProfiles` option.
+    pub(super) force_replace_unknown_profiles: bool,
+}
+
+impl Options {
+    pub fn set_substituters_push(&mut self, value: bool) {
+        self.substituters_push = value;
+    }
+
+    pub fn set_gzip(&mut self, value: bool) {
+        self.gzip = value;
+    }
+
+    pub fn set_upload_keys(&mut self, enable: bool) {
+        self.upload_keys = enable;
+    }
+
+    pub fn set_create_gc_roots(&mut self, enable: bool) {
+        self.create_gc_roots = enable;
+    }
+
+    pub fn set_force_replace_unknown_profiles(&mut self, enable: bool) {
+        self.force_replace_unknown_profiles = enable;
+    }
+
+    pub fn to_copy_options(&self) -> CopyOptions {
+        let options = CopyOptions::default();
+
+        options
+            .use_substitutes(self.substituters_push)
+            .gzip(self.gzip)
+    }
+}
+
+impl Default for Options {
+    fn default() -> Self {
+        Self {
+            substituters_push: true,
+            gzip: true,
+            upload_keys: true,
+            create_gc_roots: false,
+            force_replace_unknown_profiles: false,
+        }
+    }
+}
diff --git a/src/nix/eval.nix b/src/nix/hive/eval.nix
similarity index 96%
rename from src/nix/eval.nix
rename to src/nix/hive/eval.nix
index d810e17..fc1d1b4 100644
--- a/src/nix/eval.nix
+++ b/src/nix/hive/eval.nix
@@ -446,14 +446,15 @@ let
     value = evalNode name hive.${name};
   }) nodeNames);
 
-  deploymentConfigJson = toJSON (lib.attrsets.mapAttrs (name: eval: eval.config.deployment) nodes);
+  toplevel = lib.mapAttrs (name: eval: eval.config.system.build.toplevel) nodes;
 
-  toplevel = lib.attrsets.mapAttrs (name: eval: eval.config.system.build.toplevel) nodes;
+  deploymentConfigJson = toJSON (lib.mapAttrs (name: eval: eval.config.deployment) nodes);
 
-  buildAll = buildSelected {
-    names = nodeNames;
-  };
-  buildSelected = { names ? null }: let
+  deploymentConfigJsonSelected = names: toJSON
+    (listToAttrs (map (name: { inherit name; value = nodes.${name}.config.deployment; }) names));
+
+  buildAll = buildSelected nodeNames;
+  buildSelected = names: let
     # Change in the order of the names should not cause a derivation to be created
     selected = lib.attrsets.filterAttrs (name: _: elem name names) toplevel;
   in derivation rec {
@@ -470,7 +471,11 @@ let
     inherit pkgs lib nodes;
   };
 in {
-  inherit nodes deploymentConfigJson toplevel buildAll buildSelected introspect;
+  inherit
+    nodes toplevel
+    deploymentConfigJson deploymentConfigJsonSelected
+    buildAll buildSelected introspect;
+
   meta = hive.meta;
 
   docs = {
diff --git a/src/nix/hive.rs b/src/nix/hive/mod.rs
similarity index 67%
rename from src/nix/hive.rs
rename to src/nix/hive/mod.rs
index cda7880..b697201 100644
--- a/src/nix/hive.rs
+++ b/src/nix/hive/mod.rs
@@ -15,11 +15,13 @@ use super::{
     NixResult,
     NodeName,
     NodeConfig,
+    NodeFilter,
     ProfileMap,
 };
+use super::deployment::TargetNode;
 use super::NixCommand;
 use crate::util::CommandExecution;
-use crate::progress::TaskProgress;
+use crate::job::JobHandle;
 
 const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix");
 
@@ -116,6 +118,88 @@ impl Hive {
         Ok(options)
     }
 
+    /// Convenience wrapper to filter nodes for CLI actions.
+    pub async fn select_nodes(&self, filter: Option<NodeFilter>, ssh_config: Option<PathBuf>, ssh_only: bool) -> NixResult<HashMap<NodeName, TargetNode>> {
+        let mut node_configs = None;
+
+        log::info!("Enumerating nodes...");
+
+        let all_nodes = self.node_names().await?;
+        let selected_nodes = match filter {
+            Some(filter) => {
+                if filter.has_node_config_rules() {
+                    log::debug!("Retrieving deployment info for all nodes...");
+
+                    let all_node_configs = self.deployment_info().await?;
+                    let filtered = filter.filter_node_configs(all_node_configs.iter())
+                        .into_iter().collect();
+
+                    node_configs = Some(all_node_configs);
+
+                    filtered
+                } else {
+                    filter.filter_node_names(&all_nodes)?
+                        .into_iter().collect()
+                }
+            }
+            None => all_nodes.clone(),
+        };
+
+        let n_selected = selected_nodes.len();
+
+        let mut node_configs = if let Some(configs) = node_configs {
+            configs
+        } else {
+            log::debug!("Retrieving deployment info for selected nodes...");
+            self.deployment_info_selected(&selected_nodes).await?
+        };
+
+        let mut targets = HashMap::new();
+        let mut n_ssh = 0;
+        for node in selected_nodes.into_iter() {
+            let config = node_configs.remove(&node).unwrap();
+
+            let host = config.to_ssh_host().map(|mut host| {
+                n_ssh += 1;
+
+                if let Some(ssh_config) = &ssh_config {
+                    host.set_ssh_config(ssh_config.clone());
+                }
+                host.upcast()
+            });
+            let ssh_host = host.is_some();
+            let target = TargetNode::new(node.clone(), host, config);
+
+            if !ssh_only || ssh_host {
+                targets.insert(node, target);
+            }
+        }
+
+        let skipped = n_selected - n_ssh;
+
+        if targets.is_empty() {
+            if skipped != 0 {
+                log::warn!("No hosts selected ({} skipped).", skipped);
+            } else {
+                log::warn!("No hosts selected.");
+            }
+        } else if targets.len() == all_nodes.len() {
+            log::info!("Selected all {} nodes.", targets.len());
+        } else if !ssh_only || skipped == 0 {
+            log::info!("Selected {} out of {} hosts.", targets.len(), all_nodes.len());
+        } else {
+            log::info!("Selected {} out of {} hosts ({} skipped).", targets.len(), all_nodes.len(), skipped);
+        }
+
+        Ok(targets)
+    }
+
+    /// Returns a list of all node names.
+    pub async fn node_names(&self) -> NixResult<Vec<NodeName>> {
+        self.nix_instantiate("attrNames hive.nodes").eval()
+            .capture_json().await
+    }
+
     /// Retrieve deployment info for all nodes.
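+    ///
+    /// This evaluates the `deploymentConfigJson` attribute exported by
+    /// eval.nix. A hedged sketch of a call site (`hive` is a
+    /// placeholder for a constructed `Hive`):
+    ///
+    /// ```ignore
+    /// let configs = hive.deployment_info().await?;
+    /// for name in configs.keys() {
+    ///     println!("{}", name.as_str());
+    /// }
+    /// ```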
     pub async fn deployment_info(&self) -> NixResult<HashMap<NodeName, NodeConfig>> {
         // FIXME: Really ugly :(
@@ -133,7 +217,7 @@ impl Hive {
     }
 
     /// Retrieve deployment info for a single node.
-    pub async fn deployment_info_for(&self, node: &NodeName) -> NixResult<Option<NodeConfig>> {
+    pub async fn deployment_info_single(&self, node: &NodeName) -> NixResult<Option<NodeConfig>> {
         let expr = format!("toJSON (hive.nodes.\"{}\".config.deployment or null)", node.as_str());
         let s: String = self.nix_instantiate(&expr).eval_with_builders().await?
             .capture_json().await?;
 
         Ok(serde_json::from_str(&s).unwrap())
     }
 
+    /// Retrieve deployment info for a list of nodes.
+    pub async fn deployment_info_selected(&self, nodes: &[NodeName]) -> NixResult<HashMap<NodeName, NodeConfig>> {
+        let nodes_expr = SerializedNixExpresssion::new(nodes)?;
+
+        // FIXME: Really ugly :(
+        let s: String = self.nix_instantiate(&format!("hive.deploymentConfigJsonSelected {}", nodes_expr.expression()))
+            .eval_with_builders().await?
+            .capture_json().await?;
+
+        let configs: HashMap<NodeName, NodeConfig> = serde_json::from_str(&s).unwrap();
+        for config in configs.values() {
+            config.validate()?;
+            for key in config.keys.values() {
+                key.validate()?;
+            }
+        }
+
+        Ok(configs)
+    }
+
     /// Evaluates selected nodes.
     ///
     /// Evaluation may take up a lot of memory, so we make it possible
     /// to split up the evaluation process into chunks and run them
     /// concurrently with other processes (e.g., build and apply).
-    pub async fn eval_selected(&self, nodes: &Vec<NodeName>, progress_bar: TaskProgress) -> (NixResult<StoreDerivation<ProfileMap>>, Option<String>) {
-        // FIXME: The return type is ugly...
+    pub async fn eval_selected(&self, nodes: &Vec<NodeName>, job: Option<JobHandle>) -> NixResult<StoreDerivation<ProfileMap>> {
+        let nodes_expr = SerializedNixExpresssion::new(nodes)?;
 
-        let nodes_expr = SerializedNixExpresssion::new(nodes);
-        if let Err(e) = nodes_expr {
-            return (Err(e), None);
-        }
-        let nodes_expr = nodes_expr.unwrap();
-
-        let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression());
-
-        let command = match self.nix_instantiate(&expr).instantiate_with_builders().await {
-            Ok(command) => command,
-            Err(e) => {
-                return (Err(e), None);
-            }
-        };
+        let expr = format!("hive.buildSelected {}", nodes_expr.expression());
+        let command = self.nix_instantiate(&expr).instantiate_with_builders().await?;
 
         let mut execution = CommandExecution::new(command);
-        execution.set_progress_bar(progress_bar);
+        execution.set_job(job);
 
-        let eval = execution
-            .capture_store_path().await;
+        let path = execution.capture_store_path().await?;
+        let drv = path.to_derivation()
+            .expect("The result should be a store derivation");
 
-        let (_, stderr) = execution.get_logs();
-
-        match eval {
-            Ok(path) => {
-                let drv = path.to_derivation()
-                    .expect("The result should be a store derivation");
-
-                (Ok(drv), stderr.cloned())
-            }
-            Err(e) => {
-                (Err(e), stderr.cloned())
-            }
-        }
+        Ok(drv)
     }
 
     /// Evaluates an expression using values from the configuration
diff --git a/src/nix/host/key_uploader.rs b/src/nix/host/key_uploader.rs
index 252bc16..f400edc 100644
--- a/src/nix/host/key_uploader.rs
+++ b/src/nix/host/key_uploader.rs
@@ -12,8 +12,8 @@ use shell_escape::unix::escape;
 use tokio::io::{AsyncWriteExt, BufReader};
 use tokio::process::Child;
 
+use crate::job::JobHandle;
 use crate::nix::{Key, NixResult};
-use crate::progress::TaskProgress;
 use crate::util::capture_stream;
 
 const SCRIPT_TEMPLATE: &'static str = include_str!("./key_uploader.template.sh");
@@ -30,7 +30,7 @@ pub fn generate_script<'a>(key: &'a Key, destination: &'a Path, require_ownershi
     escape(key_script.into())
 }
 
-pub async fn feed_uploader(mut uploader: Child, key: &Key, progress: TaskProgress, logs: &mut String) -> NixResult<()> {
+pub async fn feed_uploader(mut uploader: Child, key: &Key, job: Option<JobHandle>) -> NixResult<()> {
     let mut reader = key.reader().await?;
     let mut stdin = uploader.stdin.take().unwrap();
 
@@ -42,13 +42,11 @@ pub async fn feed_uploader(mut uploader: Child, key: &Key, progress: TaskProgres
     let stderr = BufReader::new(uploader.stderr.take().unwrap());
 
     let futures = join3(
-        capture_stream(stdout, progress.clone()),
-        capture_stream(stderr, progress.clone()),
+        capture_stream(stdout, job.clone(), false),
+        capture_stream(stderr, job.clone(), true),
         uploader.wait(),
     );
-    let (stdout_str, stderr_str, exit) = futures.await;
-    logs.push_str(&stdout_str);
-    logs.push_str(&stderr_str);
+    let (_, _, exit) = futures.await;
 
     let exit = exit?;
diff --git a/src/nix/host/local.rs b/src/nix/host/local.rs
index 6402443..76b59b9 100644
--- a/src/nix/host/local.rs
+++ b/src/nix/host/local.rs
@@ -6,9 +6,9 @@ use async_trait::async_trait;
 use tokio::process::Command;
 
 use super::{CopyDirection, CopyOptions, Host, key_uploader};
-use crate::nix::{StorePath, Profile, Goal, NixResult, NixCommand, Key, SYSTEM_PROFILE};
+use crate::nix::{StorePath, Profile, Goal, NixError, NixResult, NixCommand, Key, SYSTEM_PROFILE};
 use crate::util::CommandExecution;
-use crate::progress::TaskProgress;
+use crate::job::JobHandle;
 
 /// The local machine running Colmena.
 ///
@@ -16,16 +16,14 @@ use crate::progress::TaskProgress;
 /// (e.g., building Linux derivations on macOS).
 #[derive(Debug)]
 pub struct Local {
-    progress_bar: TaskProgress,
-    logs: String,
+    job: Option<JobHandle>,
     nix_options: Vec<String>,
 }
 
 impl Local {
     pub fn new(nix_options: Vec<String>) -> Self {
         Self {
-            progress_bar: TaskProgress::default(),
-            logs: String::new(),
+            job: None,
             nix_options,
         }
     }
@@ -36,6 +34,7 @@ impl Host for Local {
     async fn copy_closure(&mut self, _closure: &StorePath, _direction: CopyDirection, _options: CopyOptions) -> NixResult<()> {
         Ok(())
     }
+
     async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
         let mut command = Command::new("nix-store");
 
@@ -47,20 +46,15 @@ impl Host for Local {
 
         let mut execution = CommandExecution::new(command);
 
-        execution.set_progress_bar(self.progress_bar.clone());
+        execution.set_job(self.job.clone());
 
-        let result = execution.run().await;
+        execution.run().await?;
+        let (stdout, _) = execution.get_logs();
 
-        let (stdout, stderr) = execution.get_logs();
-        self.logs += stderr.unwrap();
-
-        match result {
-            Ok(()) => {
-                stdout.unwrap().lines().map(|p| p.to_string().try_into()).collect()
-            }
-            Err(e) => Err(e),
-        }
+        stdout.unwrap().lines()
+            .map(|p| p.to_string().try_into()).collect()
     }
+
     async fn upload_keys(&mut self, keys: &HashMap<String, Key>, require_ownership: bool) -> NixResult<()> {
         for (name, key) in keys {
             self.upload_key(&name, &key, require_ownership).await?;
@@ -68,7 +62,12 @@ impl Host for Local {
         Ok(())
     }
+
     async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
+        if !goal.is_real_goal() {
+            return Err(NixError::Unsupported);
+        }
+
         if goal.should_switch_profile() {
             let path = profile.as_path().to_str().unwrap();
             Command::new("nix-env")
@@ -85,32 +84,23 @@ impl Host for Local {
 
         let mut execution = CommandExecution::new(command);
 
-        execution.set_progress_bar(self.progress_bar.clone());
+        execution.set_job(self.job.clone());
 
         let result = execution.run().await;
 
-        // FIXME: Bad - Order of lines is messed up
-        let (stdout, stderr) = execution.get_logs();
-        self.logs += stdout.unwrap();
-        self.logs += stderr.unwrap();
-
         result
     }
 
     async fn active_derivation_known(&mut self) -> NixResult<bool> {
         Ok(true)
     }
-    fn set_progress_bar(&mut self, bar: TaskProgress) {
-        self.progress_bar = bar;
-    }
-    async fn dump_logs(&self) -> Option<&str> {
-        Some(&self.logs)
-    }
 }
 
 impl Local {
     /// "Uploads" a single key.
     async fn upload_key(&mut self, name: &str, key: &Key, require_ownership: bool) -> NixResult<()> {
-        self.progress_bar.log(&format!("Deploying key {}", name));
+        if let Some(job) = &self.job {
+            job.message(format!("Deploying key {}", name))?;
+        }
 
         let dest_path = key.dest_dir().join(name);
         let key_script = format!("'{}'", key_uploader::generate_script(key, &dest_path, require_ownership));
@@ -123,6 +113,6 @@ impl Local {
         command.stdout(Stdio::piped());
 
         let uploader = command.spawn()?;
-        key_uploader::feed_uploader(uploader, key, self.progress_bar.clone(), &mut self.logs).await
+        key_uploader::feed_uploader(uploader, key, self.job.clone()).await
     }
 }
diff --git a/src/nix/host/mod.rs b/src/nix/host/mod.rs
index 985a989..2179e57 100644
--- a/src/nix/host/mod.rs
+++ b/src/nix/host/mod.rs
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use async_trait::async_trait;
 
 use super::{StorePath, Profile, Goal, NixResult, NixError, Key};
-use crate::progress::TaskProgress;
+use crate::job::JobHandle;
 
 mod ssh;
 pub use ssh::Ssh;
@@ -109,21 +109,22 @@ pub trait Host: Send + Sync + std::fmt::Debug {
     /// Check if the active profile is known to the host running Colmena
     async fn active_derivation_known(&mut self) -> NixResult<bool>;
 
-    #[allow(unused_variables)]
     /// Activates a system profile on the host, if it runs NixOS.
     ///
     /// The profile must already exist on the host. You should probably use deploy instead.
+    #[allow(unused_variables)]
     async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
         Err(NixError::Unsupported)
     }
 
+    /// Runs an arbitrary command on the host.
     #[allow(unused_variables)]
-    /// Provides a TaskProgress to use during operations.
-    fn set_progress_bar(&mut self, bar: TaskProgress) {
+    async fn run_command(&mut self, command: &[&str]) -> NixResult<()> {
+        Err(NixError::Unsupported)
     }
 
-    /// Dumps human-readable unstructured log messages related to the host.
-    async fn dump_logs(&self) -> Option<&str> {
-        None
+    /// Provides a JobHandle to use during operations.
+    #[allow(unused_variables)]
+    fn set_job(&mut self, bar: Option<JobHandle>) {
     }
 }
diff --git a/src/nix/host/ssh.rs b/src/nix/host/ssh.rs
index d143b63..6f298e6 100644
--- a/src/nix/host/ssh.rs
+++ b/src/nix/host/ssh.rs
@@ -9,7 +9,7 @@ use tokio::process::Command;
 use super::{CopyDirection, CopyOptions, Host, key_uploader};
 use crate::nix::{StorePath, Profile, Goal, NixResult, NixCommand, NixError, Key, SYSTEM_PROFILE};
 use crate::util::CommandExecution;
-use crate::progress::TaskProgress;
+use crate::job::JobHandle;
 
 /// A remote machine connected over SSH.
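+///
+/// A hedged sketch of how a deployment typically obtains and uses one
+/// of these (via `NodeConfig::to_ssh_host`, as in `Hive::select_nodes`;
+/// `job` and `options` are placeholders):
+///
+/// ```ignore
+/// let mut host = config.to_ssh_host().unwrap().upcast();
+/// host.set_job(Some(job));
+/// host.copy_closure(profile.as_store_path(), CopyDirection::ToRemote, options).await?;
+/// ```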
 #[derive(Debug)]
@@ -30,8 +30,7 @@ pub struct Ssh {
     privilege_escalation_command: Vec<String>,
 
     friendly_name: String,
-    progress_bar: TaskProgress,
-    logs: String,
+    job: Option<JobHandle>,
 }
 
 #[async_trait]
@@ -61,6 +60,10 @@ impl Host for Ssh {
         Ok(())
     }
     async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
+        if !goal.is_real_goal() {
+            return Err(NixError::Unsupported);
+        }
+
         if goal.should_switch_profile() {
             let path = profile.as_path().to_str().unwrap();
             let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", path]);
@@ -72,6 +75,10 @@ impl Host for Ssh {
         let command = self.ssh(&v);
         self.run_command(command).await
     }
+    async fn run_command(&mut self, command: &[&str]) -> NixResult<()> {
+        let command = self.ssh(&command);
+        self.run_command(command).await
+    }
     async fn active_derivation_known(&mut self) -> NixResult<bool> {
         let paths = self.ssh(&["realpath", SYSTEM_PROFILE])
             .capture_output()
@@ -93,11 +100,8 @@ impl Host for Ssh {
             Err(e) => Err(e),
         }
     }
-    fn set_progress_bar(&mut self, bar: TaskProgress) {
-        self.progress_bar = bar;
-    }
-    async fn dump_logs(&self) -> Option<&str> {
-        Some(&self.logs)
+    fn set_job(&mut self, job: Option<JobHandle>) {
+        self.job = job;
     }
 }
 
@@ -111,8 +115,7 @@ impl Ssh {
             ssh_config: None,
             friendly_name,
             privilege_escalation_command: Vec::new(),
-            progress_bar: TaskProgress::default(),
-            logs: String::new(),
+            job: None,
         }
     }
 
@@ -157,16 +160,10 @@ impl Ssh {
 
     async fn run_command(&mut self, command: Command) -> NixResult<()> {
         let mut execution = CommandExecution::new(command);
-
-        execution.set_progress_bar(self.progress_bar.clone());
+        execution.set_job(self.job.clone());
 
         let result = execution.run().await;
 
-        // FIXME: Bad - Order of lines is messed up
-        let (stdout, stderr) = execution.get_logs();
-        self.logs += stdout.unwrap();
-        self.logs += stderr.unwrap();
-
         result
     }
 
@@ -228,7 +225,9 @@ impl Ssh {
 
     /// Uploads a single key.
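+    ///
+    /// The key is piped to a small uploader script over stdin, so the
+    /// key data itself never appears on the command line. A hedged
+    /// sketch of the shape of the call (see `key_uploader` for the
+    /// real helpers):
+    ///
+    /// ```ignore
+    /// let key_script = key_uploader::generate_script(key, &dest_path, require_ownership);
+    /// let uploader = command.spawn()?; // e.g., `ssh ... sh -c '<script>'`
+    /// key_uploader::feed_uploader(uploader, key, self.job.clone()).await
+    /// ```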
     async fn upload_key(&mut self, name: &str, key: &Key, require_ownership: bool) -> NixResult<()> {
-        self.progress_bar.log(&format!("Deploying key {}", name));
+        if let Some(job) = &self.job {
+            job.message(format!("Uploading key {}", name))?;
+        }
 
         let dest_path = key.dest_dir().join(name);
         let key_script = key_uploader::generate_script(key, &dest_path, require_ownership);
@@ -240,6 +239,6 @@ impl Ssh {
         command.stdout(Stdio::piped());
 
         let uploader = command.spawn()?;
-        key_uploader::feed_uploader(uploader, key, self.progress_bar.clone(), &mut self.logs).await
+        key_uploader::feed_uploader(uploader, key, self.job.clone()).await
     }
 }
diff --git a/src/nix/mod.rs b/src/nix/mod.rs
index a8edf12..5318b8a 100644
--- a/src/nix/mod.rs
+++ b/src/nix/mod.rs
@@ -33,7 +33,7 @@ pub mod profile;
 pub use profile::{Profile, ProfileMap};
 
 pub mod deployment;
-pub use deployment::{Goal, Target, Deployment};
+pub use deployment::Goal;
 
 pub mod info;
 pub use info::NixCheck;
@@ -87,10 +87,19 @@ pub enum NixError {
     #[snafu(display("Current Nix version does not support Flakes"))]
     NoFlakesSupport,
 
+    #[snafu(display("Don't know how to connect to the node"))]
+    NoTargetHost,
+
     #[snafu(display("Node name cannot be empty"))]
     EmptyNodeName,
 
-    #[snafu(display("Nix Error: {}", message))]
+    #[snafu(display("Filter rule cannot be empty"))]
+    EmptyFilterRule,
+
+    #[snafu(display("Deployment already executed"))]
+    DeploymentAlreadyExecuted,
+
+    #[snafu(display("Unknown error: {}", message))]
     Unknown { message: String },
 }
 
@@ -121,6 +130,13 @@ impl From<std::io::Error> for NixError {
     }
 }
 
+impl NixError {
+    pub fn unknown(error: Box<dyn std::error::Error>) -> Self {
+        let message = error.to_string();
+        Self::Unknown { message }
+    }
+}
+
 /// A node's attribute name.
 #[derive(Serialize, Deserialize, Clone, Debug, Hash, Eq, PartialEq)]
 #[serde(transparent)]
@@ -154,6 +170,14 @@ pub struct NodeConfig {
     keys: HashMap<String, Key>,
 }
 
+#[async_trait]
+trait NixCommand {
+    async fn passthrough(&mut self) -> NixResult<()>;
+    async fn capture_output(&mut self) -> NixResult<String>;
+    async fn capture_json<T>(&mut self) -> NixResult<T> where T: DeserializeOwned;
+    async fn capture_store_path(&mut self) -> NixResult<StorePath>;
+}
+
 impl NodeName {
     /// Returns the string.
     pub fn as_str(&self) -> &str {
@@ -218,14 +242,6 @@ impl NodeConfig {
     }
 }
 
-#[async_trait]
-trait NixCommand {
-    async fn passthrough(&mut self) -> NixResult<()>;
-    async fn capture_output(&mut self) -> NixResult<String>;
-    async fn capture_json<T>(&mut self) -> NixResult<T> where T: DeserializeOwned;
-    async fn capture_store_path(&mut self) -> NixResult<StorePath>;
-}
-
 #[async_trait]
 impl NixCommand for Command {
     /// Runs the command with stdout and stderr passed through to the user.
diff --git a/src/nix/node_filter.rs b/src/nix/node_filter.rs
new file mode 100644
index 0000000..64bdf47
--- /dev/null
+++ b/src/nix/node_filter.rs
@@ -0,0 +1,260 @@
+//! Node filters.
+
+use std::collections::HashSet;
+use std::convert::AsRef;
+use std::iter::{Iterator, FromIterator};
+
+use glob::Pattern as GlobPattern;
+
+use super::{NixError, NixResult, NodeName, NodeConfig};
+
+/// A node filter containing a list of rules.
+pub struct NodeFilter {
+    rules: Vec<Rule>,
+}
+
+/// A filter rule.
+///
+/// The filter rules are OR'd together.
+#[derive(Debug, Eq, PartialEq)]
+enum Rule {
+    /// Matches a node's attribute name.
+    MatchName(GlobPattern),
+
+    /// Matches a node's `deployment.tags`.
+    MatchTag(GlobPattern),
+}
+
+impl NodeFilter {
+    /// Creates a new filter using an expression passed via `--on`.
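+    ///
+    /// Rules are comma-separated and OR'd together: a plain pattern
+    /// glob-matches node names, while a pattern prefixed with `@`
+    /// glob-matches `deployment.tags`. A hedged sketch (mirrors the
+    /// tests at the bottom of this file):
+    ///
+    /// ```ignore
+    /// // All nodes tagged "router", plus nodes whose names start with "gamma-":
+    /// let filter = NodeFilter::new("@router,gamma-*")?;
+    /// ```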
+    pub fn new<S: AsRef<str>>(filter: S) -> NixResult<Self> {
+        let filter = filter.as_ref();
+        let trimmed = filter.trim();
+
+        if trimmed.len() == 0 {
+            log::warn!("Filter \"{}\" is blank and will match nothing", filter);
+
+            return Ok(Self {
+                rules: Vec::new(),
+            });
+        }
+
+        let rules = trimmed.split(",").map(|pattern| {
+            let pattern = pattern.trim();
+
+            if pattern.len() == 0 {
+                return Err(NixError::EmptyFilterRule);
+            }
+
+            if let Some(tag_pattern) = pattern.strip_prefix("@") {
+                Ok(Rule::MatchTag(GlobPattern::new(tag_pattern).unwrap()))
+            } else {
+                Ok(Rule::MatchName(GlobPattern::new(pattern).unwrap()))
+            }
+        }).collect::<Vec<NixResult<Rule>>>();
+
+        let rules = Result::from_iter(rules)?;
+
+        Ok(Self {
+            rules,
+        })
+    }
+
+    /// Returns whether the filter has any rule matching NodeConfig information.
+    ///
+    /// Evaluating `config.deployment` can potentially be very expensive,
+    /// especially when its values (e.g., tags) depend on other parts of
+    /// the configuration.
+    pub fn has_node_config_rules(&self) -> bool {
+        self.rules.iter().find(|rule| rule.matches_node_config()).is_some()
+    }
+
+    /// Runs the filter against a set of NodeConfigs and returns the matched ones.
+    pub fn filter_node_configs<'a, I>(&self, nodes: I) -> HashSet<NodeName>
+        where I: Iterator<Item = (&'a NodeName, &'a NodeConfig)>
+    {
+        if self.rules.len() == 0 {
+            return HashSet::new();
+        }
+
+        nodes.filter_map(|(name, node)| {
+            for rule in self.rules.iter() {
+                match rule {
+                    Rule::MatchName(pat) => {
+                        if pat.matches(name.as_str()) {
+                            return Some(name);
+                        }
+                    }
+                    Rule::MatchTag(pat) => {
+                        for tag in node.tags() {
+                            if pat.matches(tag) {
+                                return Some(name);
+                            }
+                        }
+                    }
+                }
+            }
+
+            None
+        }).cloned().collect()
+    }
+
+    /// Runs the filter against a set of node names and returns the matched ones.
+    pub fn filter_node_names(&self, nodes: &[NodeName]) -> NixResult<HashSet<NodeName>> {
+        nodes.iter().filter_map(|name| -> Option<NixResult<NodeName>> {
+            for rule in self.rules.iter() {
+                match rule {
+                    Rule::MatchName(pat) => {
+                        if pat.matches(name.as_str()) {
+                            return Some(Ok(name.clone()));
+                        }
+                    }
+                    _ => {
+                        return Some(Err(NixError::Unknown {
+                            message: format!("Not enough information to run rule {:?} - We only have node names", rule),
+                        }));
+                    }
+                }
+            }
+            None
+        }).collect()
+    }
+}
+
+impl Rule {
+    /// Returns whether the rule matches against the NodeConfig (i.e., `config.deployment`).
+    pub fn matches_node_config(&self) -> bool {
+        match self {
+            Self::MatchTag(_) => true,
+            Self::MatchName(_) => false,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::collections::{HashMap, HashSet};
+
+    macro_rules! node {
+        ($n:expr) => {
+            NodeName::new($n.to_string()).unwrap()
+        }
+    }
+
+    #[test]
+    fn test_empty_filter() {
+        let filter = NodeFilter::new("").unwrap();
+        assert_eq!(0, filter.rules.len());
+
+        let filter = NodeFilter::new("\t").unwrap();
+        assert_eq!(0, filter.rules.len());
+
+        let filter = NodeFilter::new("   ").unwrap();
+        assert_eq!(0, filter.rules.len());
+    }
+
+    #[test]
+    fn test_empty_filter_rule() {
+        assert!(NodeFilter::new(",").is_err());
+        assert!(NodeFilter::new("a,,b").is_err());
+        assert!(NodeFilter::new("a,b,c,").is_err());
+    }
+
+    #[test]
+    fn test_filter_rule_mixed() {
+        let filter = NodeFilter::new("@router,gamma-*").unwrap();
+        assert_eq!(
+            vec![
+                Rule::MatchTag(GlobPattern::new("router").unwrap()),
+                Rule::MatchName(GlobPattern::new("gamma-*").unwrap()),
+            ],
+            filter.rules,
+        );
+
+        let filter = NodeFilter::new("a, \t@b ,  c-*").unwrap();
+        assert_eq!(
+            vec![
+                Rule::MatchName(GlobPattern::new("a").unwrap()),
+                Rule::MatchTag(GlobPattern::new("b").unwrap()),
+                Rule::MatchName(GlobPattern::new("c-*").unwrap()),
+            ],
+            filter.rules,
+        );
+    }
+
+    #[test]
+    fn test_filter_node_names() {
+        let nodes = vec![ node!("lax-alpha"), node!("lax-beta"), node!("sfo-gamma") ];
+
+        assert_eq!(
+            &HashSet::from_iter([ node!("lax-alpha") ]),
+            &NodeFilter::new("lax-alpha").unwrap().filter_node_names(&nodes).unwrap(),
+        );
+
+        assert_eq!(
+            &HashSet::from_iter([ node!("lax-alpha"), node!("lax-beta") ]),
+            &NodeFilter::new("lax-*").unwrap().filter_node_names(&nodes).unwrap(),
+        );
+    }
+
+    #[test]
+    fn test_filter_node_configs() {
+        // TODO: Better way to mock
+        let template = NodeConfig {
+            tags: vec![],
+            target_host: None,
+            target_user: None,
+            target_port: None,
+            allow_local_deployment: false,
+            replace_unknown_profiles: false,
+            privilege_escalation_command: vec![],
+            keys: HashMap::new(),
+        };
+
+        let mut nodes = HashMap::new();
+
+        nodes.insert(node!("alpha"), NodeConfig {
+            tags: vec![ "web".to_string(), "infra-lax".to_string() ],
+            ..template.clone()
+        });
+
+        nodes.insert(node!("beta"), NodeConfig {
+            tags: vec![ "router".to_string(), "infra-sfo".to_string() ],
+            ..template.clone()
+        });
+
+        nodes.insert(node!("gamma-a"), NodeConfig {
+            tags: vec![ "controller".to_string() ],
+            ..template.clone()
+        });
+
+        nodes.insert(node!("gamma-b"), NodeConfig {
+            tags: vec![ "ewaste".to_string() ],
+            ..template.clone()
+        });
+
+        assert_eq!(4, nodes.len());
+
+        assert_eq!(
+            &HashSet::from_iter([ node!("alpha") ]),
+            &NodeFilter::new("@web").unwrap().filter_node_configs(nodes.iter()),
+        );
+
+        assert_eq!(
+            &HashSet::from_iter([ node!("alpha"), node!("beta") ]),
+            &NodeFilter::new("@infra-*").unwrap().filter_node_configs(nodes.iter()),
+        );
+
+        assert_eq!(
+            &HashSet::from_iter([ node!("beta"), node!("gamma-a") ]),
+            &NodeFilter::new("@router,@controller").unwrap().filter_node_configs(nodes.iter()),
+        );
+
+        assert_eq!(
+            &HashSet::from_iter([ node!("beta"), node!("gamma-a"), node!("gamma-b") ]),
+            &NodeFilter::new("@router,gamma-*").unwrap().filter_node_configs(nodes.iter()),
+        );
+    }
+}
diff --git a/src/nix/profile.rs b/src/nix/profile.rs
index 11d9b61..d86fa94 100644
--- a/src/nix/profile.rs
+++ b/src/nix/profile.rs
@@ -65,7 +65,7 @@ impl Profile {
 }
 
 /// A map of names to their associated NixOS system profiles.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ProfileMap(HashMap<NodeName, Profile>);
 
 impl Deref for ProfileMap {
diff --git a/src/nix/tests/mod.rs b/src/nix/tests/mod.rs
index 7822f89..b4d50a3 100644
--- a/src/nix/tests/mod.rs
+++ b/src/nix/tests/mod.rs
@@ -1,7 +1,6 @@
 //! Integration-ish tests
 
 use super::*;
-use crate::progress::TaskProgress;
 
 use std::collections::HashSet;
 use std::hash::Hash;
@@ -69,16 +68,14 @@ impl TempHive {
     /// Asserts that the specified nodes can be fully evaluated.
     pub fn eval_success(text: &str, nodes: Vec<NodeName>) {
         let hive = Self::new(text);
-        let progress = TaskProgress::new("tests".to_string(), 5);
-        let (profiles, _) = block_on(hive.eval_selected(&nodes, progress));
+        let profiles = block_on(hive.eval_selected(&nodes, None));
         assert!(profiles.is_ok());
     }
 
     /// Asserts that the specified nodes will fail to evaluate.
     pub fn eval_failure(text: &str, nodes: Vec<NodeName>) {
         let hive = Self::new(text);
-        let progress = TaskProgress::new("tests".to_string(), 5);
-        let (profiles, _) = block_on(hive.eval_selected(&nodes, progress));
+        let profiles = block_on(hive.eval_selected(&nodes, None));
        assert!(profiles.is_err());
     }
 }
diff --git a/src/progress.rs b/src/progress.rs
deleted file mode 100644
index f731ae0..0000000
--- a/src/progress.rs
+++ /dev/null
@@ -1,227 +0,0 @@
-//! Progress display utilities.
-
-use std::future::Future;
-use std::sync::Arc;
-use std::time::Duration;
-
-use atty::Stream;
-use console::Style;
-
-use indicatif::{
-    MultiProgress,
-    ProgressStyle as IndicatifStyle,
-    ProgressBar as IndicatifBar,
-};
-
-pub fn get_spinner_styles(label_width: usize) -> (IndicatifStyle, IndicatifStyle) {
-    let template = format!("{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", label_width);
-
-    (
-        IndicatifStyle::default_spinner()
-            .tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅")
-            .template(&template),
-
-        IndicatifStyle::default_spinner()
-            .tick_chars("❌❌")
-            .template(&template),
-    )
-}
-
-pub enum OutputStyle {
-    /// Show condensed progress bars with fancy spinners.
-    ///
-    /// Not usable in a non-interactive environment.
-    Condensed,
-
-    /// Output log lines directly to console.
-    Plain,
-}
-
-/// Parallel progress display.
-///
-/// Currently a simple wrapper over MultiProgress.
-/// Sometimes we need to log directly to the console, in case
-/// stdout is not connected to a TTY or the user requests
-/// verbose logging via `--verbose`.
-///
-/// This is normally only usable as Arc<Progress>.
-pub struct Progress {
-    multi: Option<Arc<MultiProgress>>, // eww
-
-    /// Width of the labels for alignment
-    label_width: usize,
-}
-
-impl Progress {
-    pub fn with_style(output_style: OutputStyle) -> Self {
-        let multi = match output_style {
-            OutputStyle::Condensed => Some(Arc::new(Self::init_multi())),
-            OutputStyle::Plain => None,
-        };
-
-        Self {
-            multi,
-            label_width: 10,
-        }
-    }
-
-    pub fn set_label_width(&mut self, width: usize) {
-        self.label_width = width;
-    }
-
-    /// Returns a handle for a task to display progress information.
-    pub fn create_task_progress(&self, label: String) -> TaskProgress {
-        let mut progress = TaskProgress::new(label.clone(), self.label_width);
-
-        if let Some(multi) = self.multi.as_ref() {
-            let bar = multi.add(IndicatifBar::new(100));
-            let (style, _) = get_spinner_styles(self.label_width);
-            bar.set_prefix(label);
-            bar.set_style(style);
-            bar.enable_steady_tick(100);
-
-            progress.set_bar(bar);
-        }
-
-        progress
-    }
-
-    /// Runs code that may initate multiple tasks.
-    pub async fn run<F: Future, U>(self: Arc<Self>, func: U) -> F::Output
-        where U: FnOnce(Arc<Self>) -> F
-    {
-        // TODO: Remove this - Previous trick no longer required in indicatif 0.7
-        func(self.clone()).await
-    }
-
-    fn init_multi() -> MultiProgress {
-        let multi = MultiProgress::new();
-        multi
-    }
-
-    fn detect_output() -> OutputStyle {
-        if atty::is(Stream::Stdout) {
-            OutputStyle::Condensed
-        } else {
-            OutputStyle::Plain
-        }
-    }
-}
-
-impl Default for Progress {
-    fn default() -> Self {
-        let style = Self::detect_output();
-        Self::with_style(style)
-    }
-}
-
-/// Progress display for a single task.
-#[derive(Debug, Clone)]
-pub struct TaskProgress {
-    label: String,
-    label_width: usize,
-    bar: Option<IndicatifBar>,
-    quiet: bool,
-}
-
-impl TaskProgress {
-    pub fn new(label: String, label_width: usize) -> Self {
-        Self {
-            label,
-            label_width,
-            bar: None,
-            quiet: false,
-        }
-    }
-
-    fn set_bar(&mut self, bar: IndicatifBar) {
-        self.bar = Some(bar);
-    }
-
-    /// Displays a new line of log.
-    pub fn log(&mut self, message: &str) {
-        if self.quiet {
-            return;
-        }
-
-        if let Some(bar) = self.bar.as_ref() {
-            bar.set_message(message.to_owned());
-        } else {
-            let style = Style::new().bold();
-            self.plain_print(style, message);
-        }
-    }
-
-    /// Marks the task as successful and leave the spinner intact.
-    pub fn success(self, message: &str) {
-        if self.quiet {
-            return;
-        }
-
-        if let Some(bar) = self.bar.as_ref() {
-            bar.finish_with_message(message.to_owned());
-        } else {
-            let style = Style::new().bold().green();
-            self.plain_print(style, message);
-        }
-    }
-
-    /// Marks the task as successful and remove the spinner.
-    pub fn success_quiet(self) {
-        if self.quiet {
-            return;
-        }
-
-        if let Some(bar) = self.bar.as_ref() {
-            bar.finish_and_clear();
-        }
-    }
-
-    /// Marks the task as unsuccessful.
-    pub fn failure(self, message: &str) {
-        if self.quiet {
-            return;
-        }
-
-        if let Some(bar) = self.bar.as_ref() {
-            let (_, fail_style) = get_spinner_styles(self.label_width);
-            bar.set_style(fail_style);
-            bar.abandon_with_message(message.to_owned());
-        } else {
-            let style = Style::new().bold().red();
-            self.plain_print(style, message);
-        }
-    }
-
-    /// Returns the time spent on this task so far.
-    pub fn get_elapsed(&self) -> Option<Duration> {
-        self.bar.as_ref().map(|bar| bar.elapsed())
-    }
-
-    /// Sets the time spent on this task so far.
-    pub fn set_elapsed(&mut self, elapsed: Duration) {
-        if let Some(bar) = self.bar.take() {
-            self.bar.replace(bar.with_elapsed(elapsed));
-        }
-    }
-
-    pub fn failure_err<E: std::error::Error>(self, error: &E) {
-        self.failure(&error.to_string())
-    }
-
-    fn plain_print(&self, style: Style, line: &str) {
-        eprintln!("{:>width$} | {}", style.apply_to(&self.label), line, width = self.label_width);
-    }
-}
-
-impl Default for TaskProgress {
-    /// Creates a TaskProgress that does nothing.
-    fn default() -> Self {
-        Self {
-            label: String::new(),
-            label_width: 0,
-            bar: None,
-            quiet: true,
-        }
-    }
-}
diff --git a/src/progress/mod.rs b/src/progress/mod.rs
new file mode 100644
index 0000000..44893dc
--- /dev/null
+++ b/src/progress/mod.rs
@@ -0,0 +1,166 @@
+//! Progress output.
+//!
+//! Displaying of progress is handled through a ProgressOutput. Each
+//! ProgressOutput is minimally-stateful and receives already formatted
+//! text from a writer (e.g., a JobMonitor).
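+//!
+//! A hedged end-to-end sketch (the `verbose` flag is a placeholder,
+//! and `run_jobs` stands in for whatever driver sends `Message`s
+//! through the channel):
+//!
+//! ```ignore
+//! let mut output = SimpleProgressOutput::new(verbose);
+//! let sender = output.get_sender(); // hand this to a JobMonitor
+//!
+//! let (_, output) = tokio::join!(
+//!     run_jobs(sender),
+//!     output.run_until_completion(),
+//! );
+//! ```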
+
+pub mod plain;
+pub mod spinner;
+
+use async_trait::async_trait;
+use tokio::sync::mpsc::{self,
+    UnboundedReceiver as TokioReceiver,
+    UnboundedSender as TokioSender,
+};
+
+use crate::job::JobId;
+use crate::nix::NixResult;
+
+pub use plain::PlainOutput;
+pub use spinner::SpinnerOutput;
+
+pub type Sender = TokioSender<Message>;
+pub type Receiver = TokioReceiver<Message>;
+
+const DEFAULT_LABEL_WIDTH: usize = 5;
+
+pub enum SimpleProgressOutput {
+    Plain(PlainOutput),
+    Spinner(SpinnerOutput),
+}
+
+/// A progress display driver.
+#[async_trait]
+pub trait ProgressOutput : Sized {
+    /// Runs until a Message::Complete is received.
+    async fn run_until_completion(self) -> NixResult<Self>;
+
+    /// Returns a sender.
+    ///
+    /// This method can only be called once.
+    fn get_sender(&mut self) -> Option<Sender>;
+}
+
+/// A message.
+#[derive(Debug, Clone)]
+pub enum Message {
+    /// Prints a line of text to the screen.
+    Print(Line),
+
+    /// Prints a line of text related to the overall progress.
+    ///
+    /// For certain output types, this will be printed in a fixed,
+    /// prominent position with special styling.
+    PrintMeta(Line),
+
+    /// Hints about the maximum label width.
+    HintLabelWidth(usize),
+
+    /// Completes the progress output.
+    Complete,
+}
+
+/// A line of output.
+#[derive(Debug, Clone)]
+pub struct Line {
+    /// Identifier for elapsed time tracking.
+    job_id: JobId,
+
+    /// Style of the line.
+    style: LineStyle,
+
+    /// A label.
+    label: String,
+
+    /// The text.
+    text: String,
+
+    /// Whether this is a one-off output.
+    one_off: bool,
+
+    /// Whether this line is noisy.
+    noisy: bool,
+}
+
+/// Style of a line.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum LineStyle {
+    Normal,
+    Success,
+    SuccessNoop,
+    Failure,
+}
+
+impl SimpleProgressOutput {
+    pub fn new(verbose: bool) -> Self {
+        if verbose {
+            Self::Plain(PlainOutput::new())
+        } else {
+            Self::Spinner(SpinnerOutput::new())
+        }
+    }
+
+    pub fn get_sender(&mut self) -> Option<Sender> {
+        match self {
+            Self::Plain(ref mut o) => o.get_sender(),
+            Self::Spinner(ref mut o) => o.get_sender(),
+        }
+    }
+
+    pub async fn run_until_completion(self) -> NixResult<Self> {
+        match self {
+            Self::Plain(o) => {
+                o.run_until_completion().await
+                    .map(|o| Self::Plain(o))
+            }
+            Self::Spinner(o) => {
+                o.run_until_completion().await
+                    .map(|o| Self::Spinner(o))
+            }
+        }
+    }
+}
+
+impl Line {
+    pub fn new(job_id: JobId, text: String) -> Self {
+        Self {
+            job_id,
+            style: LineStyle::Normal,
+            label: String::new(),
+            text,
+            one_off: false,
+            noisy: false,
+        }
+    }
+
+    /// Builder-like interface to set the line as a one-off output.
+    ///
+    /// For SpinnerOutput, this will create a new bar that immediately
+    /// finishes with the style (success or failure).
+    pub fn one_off(mut self) -> Self {
+        self.one_off = true;
+        self
+    }
+
+    /// Builder-like interface to set the line as noisy.
+    pub fn noisy(mut self) -> Self {
+        self.noisy = true;
+        self
+    }
+
+    /// Builder-like interface to set the label.
+    pub fn label(mut self, label: String) -> Self {
+        self.label = label;
+        self
+    }
+
+    /// Builder-like interface to set the line style.
+    pub fn style(mut self, style: LineStyle) -> Self {
+        self.style = style;
+        self
+    }
+}
+
+fn create_channel() -> (Sender, Receiver) {
+    mpsc::unbounded_channel()
+}
diff --git a/src/progress/plain.rs b/src/progress/plain.rs
new file mode 100644
index 0000000..5de2a80
--- /dev/null
+++ b/src/progress/plain.rs
@@ -0,0 +1,113 @@
+//! Plain output.
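+//!
+//! Each line is rendered to stderr as a right-aligned label followed
+//! by the text, roughly (see `PlainOutput::print` below):
+//!
+//! ```ignore
+//! eprintln!("{:>width$} | {}", label, text, width = label_width);
+//! ```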
+
+use async_trait::async_trait;
+use console::Style as ConsoleStyle;
+
+use crate::nix::NixResult;
+use super::{
+    DEFAULT_LABEL_WIDTH,
+    ProgressOutput,
+    Sender,
+    Receiver,
+    Message,
+    Line,
+    LineStyle,
+    create_channel,
+};
+
+pub struct PlainOutput {
+    sender: Option<Sender>,
+    receiver: Receiver,
+    label_width: usize,
+}
+
+impl PlainOutput {
+    pub fn new() -> Self {
+        let (sender, receiver) = create_channel();
+
+        Self {
+            sender: Some(sender),
+            receiver,
+            label_width: DEFAULT_LABEL_WIDTH,
+        }
+    }
+
+    fn print(&self, line: Line) {
+        if line.noisy {
+            return;
+        }
+
+        let label_style = match line.style {
+            LineStyle::Normal => {
+                ConsoleStyle::new().bold()
+            }
+            LineStyle::Success => {
+                ConsoleStyle::new().bold().green()
+            }
+            LineStyle::SuccessNoop => {
+                ConsoleStyle::new().bold().green().dim()
+            }
+            LineStyle::Failure => {
+                ConsoleStyle::new().bold().red()
+            }
+        };
+
+        let text_style = match line.style {
+            LineStyle::Normal => {
+                ConsoleStyle::new()
+            }
+            LineStyle::Success => {
+                ConsoleStyle::new().green()
+            }
+            LineStyle::SuccessNoop => {
+                ConsoleStyle::new().dim()
+            }
+            LineStyle::Failure => {
+                ConsoleStyle::new().red()
+            }
+        };
+
+        eprintln!("{:>width$} | {}",
+            label_style.apply_to(line.label),
+            text_style.apply_to(line.text),
+            width = self.label_width,
+        );
+    }
+}
+
+#[async_trait]
+impl ProgressOutput for PlainOutput {
+    async fn run_until_completion(mut self) -> NixResult<Self> {
+        loop {
+            let message = self.receiver.recv().await;
+
+            if message.is_none() {
+                log::info!("All senders dropped");
+                return Ok(self);
+            }
+
+            let message = message.unwrap();
+
+            match message {
+                Message::Complete => {
+                    return Ok(self);
+                }
+                Message::Print(line) => {
+                    self.print(line);
+                }
+                Message::PrintMeta(line) => {
+                    self.print(line);
+                }
+                Message::HintLabelWidth(width) => {
+                    if width > self.label_width {
+                        self.label_width = width;
+                    }
+                }
+            }
+        }
+    }
+
+    fn get_sender(&mut self) -> Option<Sender> {
+        self.sender.take()
+    }
+}
diff --git a/src/progress/spinner.rs b/src/progress/spinner.rs
new file mode 100644
index 0000000..3d1ab08
--- /dev/null
+++ b/src/progress/spinner.rs
@@ -0,0 +1,242 @@
+//! Progress spinner output.
+
+use std::collections::HashMap;
+use std::time::Instant;
+
+use async_trait::async_trait;
+use indicatif::{MultiProgress, ProgressStyle, ProgressBar};
+
+use crate::job::JobId;
+use crate::nix::NixResult;
+use super::{
+    DEFAULT_LABEL_WIDTH,
+    ProgressOutput,
+    Sender,
+    Receiver,
+    Message,
+    Line,
+    LineStyle,
+    create_channel,
+};
+
+/// Progress spinner output.
+pub struct SpinnerOutput {
+    /// Job timekeeping.
+    job_state: HashMap<JobId, JobState>,
+
+    /// One-off progress bars.
+    one_off_bars: Vec<(ProgressBar, LineStyle)>,
+
+    /// Progress bar for the meta job.
+    meta_bar: ProgressBar,
+
+    /// Last style printed to the meta bar.
+    meta_style: LineStyle,
+
+    /// Maximum label width for alignment.
+    label_width: usize,
+
+    multi: MultiProgress,
+    sender: Option<Sender>,
+    receiver: Receiver,
+}
+
+#[derive(Clone)]
+struct JobState {
+    /// When the job started.
+    since: Instant,
+
+    /// Progress bar to draw to.
+    bar: ProgressBar,
+
+    /// Last style printed to the bar.
+    ///
+    /// This is used to regenerate the appropriate style when the
+    /// max label width changes.
+    style: LineStyle,
+}
+
+impl SpinnerOutput {
+    pub fn new() -> Self {
+        let meta_bar = {
+            let bar = ProgressBar::new(100)
+                .with_style(get_spinner_style(DEFAULT_LABEL_WIDTH, LineStyle::Normal));
+            bar
+        };
+
+        let (sender, receiver) = create_channel();
+
+        Self {
+            multi: MultiProgress::new(),
+            job_state: HashMap::new(),
+            one_off_bars: Vec::new(),
+            meta_bar,
+            meta_style: LineStyle::Normal,
+            label_width: DEFAULT_LABEL_WIDTH,
+            sender: Some(sender),
+            receiver,
+        }
+    }
+
+    /// Returns the tracking state of a job, creating it if necessary.
+    fn get_job_state(&mut self, job_id: JobId) -> JobState {
+        if let Some(state) = self.job_state.get(&job_id) {
+            state.clone()
+        } else {
+            let bar = self.create_bar(LineStyle::Normal);
+            let state = JobState::new(bar.clone());
+            self.job_state.insert(job_id, state.clone());
+            state
+        }
+    }
+
+    /// Creates a new bar.
+    fn create_bar(&self, style: LineStyle) -> ProgressBar {
+        let bar = ProgressBar::new(100)
+            .with_style(self.get_spinner_style(style));
+
+        let bar = self.multi.add(bar);
+        bar.enable_steady_tick(100);
+        bar
+    }
+
+    fn print(&mut self, line: Line, meta: bool) {
+        if line.label.len() > self.label_width {
+            self.label_width = line.label.len();
+            self.reset_styles();
+        }
+
+        let bar = if meta {
+            if self.meta_style != line.style {
+                self.meta_style = line.style;
+                self.meta_bar.set_style(self.get_spinner_style(line.style));
+            }
+
+            self.meta_bar.clone()
+        } else {
+            let mut state = self.get_job_state(line.job_id);
+
+            if line.one_off {
+                let bar = self.create_bar(line.style);
+                state.configure_one_off(&bar);
+                self.one_off_bars.push((bar.clone(), line.style));
+                bar
+            } else {
+                let bar = state.bar.clone();
+
+                if state.style != line.style {
+                    state.style = line.style;
+                    bar.set_style(self.get_spinner_style(line.style));
+                    self.job_state.insert(line.job_id, state);
+                }
+
+                bar
+            }
+        };
+
+        bar.set_prefix(line.label);
+
+        match line.style {
+            LineStyle::Success | LineStyle::Failure => {
+                bar.finish_with_message(line.text);
+            }
+            LineStyle::SuccessNoop => {
+                bar.finish_and_clear();
+            }
+            _ => {
+                bar.set_message(line.text);
+            }
+        }
+    }
+
+    /// Resets the styles of all known bars.
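+    ///
+    /// Called whenever the maximum label width grows, so that all
+    /// existing bars are re-rendered with the new alignment.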
+    fn reset_styles(&self) {
+        for (bar, style) in &self.one_off_bars {
+            let style = self.get_spinner_style(*style);
+            bar.set_style(style);
+        }
+
+        for state in self.job_state.values() {
+            let style = self.get_spinner_style(state.style);
+            state.bar.set_style(style);
+        }
+
+        let style = self.get_spinner_style(self.meta_style);
+        self.meta_bar.set_style(style);
+    }
+
+    fn get_spinner_style(&self, style: LineStyle) -> ProgressStyle {
+        get_spinner_style(self.label_width, style)
+    }
+}
+
+#[async_trait]
+impl ProgressOutput for SpinnerOutput {
+    async fn run_until_completion(mut self) -> NixResult<Self> {
+        let meta_bar = self.multi.add(self.meta_bar.clone());
+        meta_bar.enable_steady_tick(100);
+
+        loop {
+            let message = self.receiver.recv().await;
+
+            if message.is_none() {
+                return Ok(self);
+            }
+
+            let message = message.unwrap();
+
+            match message {
+                Message::Complete => {
+                    return Ok(self);
+                }
+                Message::Print(line) => {
+                    self.print(line, false);
+                }
+                Message::PrintMeta(line) => {
+                    self.print(line, true);
+                }
+                Message::HintLabelWidth(width) => {
+                    if width > self.label_width {
+                        self.label_width = width;
+                        self.reset_styles();
+                    }
+                }
+            }
+        }
+    }
+
+    fn get_sender(&mut self) -> Option<Sender> {
+        self.sender.take()
+    }
+}
+
+impl JobState {
+    fn new(bar: ProgressBar) -> Self {
+        Self {
+            since: Instant::now(),
+            bar,
+            style: LineStyle::Normal,
+        }
+    }
+
+    fn configure_one_off(&self, bar: &ProgressBar) {
+        bar.clone().with_elapsed(Instant::now().duration_since(self.since));
+    }
+}
+
+fn get_spinner_style(label_width: usize, style: LineStyle) -> ProgressStyle {
+    let template = format!("{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", label_width);
+
+    match style {
+        LineStyle::Normal | LineStyle::Success | LineStyle::SuccessNoop => {
+            ProgressStyle::default_spinner()
+                .tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅")
+                .template(&template)
+        }
+        LineStyle::Failure => {
+            ProgressStyle::default_spinner()
+                .tick_chars("❌❌")
+                .template(&template)
+        }
+    }
+}
diff --git a/src/util.rs b/src/util.rs
index 2330812..bb00820 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -1,25 +1,19 @@
-use std::collections::HashMap;
 use std::path::PathBuf;
 use std::process::Stdio;
 
 use clap::{App, Arg, ArgMatches};
 use futures::future::join3;
-use glob::Pattern as GlobPattern;
 use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader};
 use tokio::process::Command;
 
-use super::nix::{Flake, NodeName, NodeConfig, Hive, HivePath, NixResult};
-use super::progress::TaskProgress;
+use super::nix::{Flake, Hive, HivePath, NixResult};
+use super::nix::deployment::TargetNodeMap;
+use super::job::JobHandle;
 
-enum NodeFilter {
-    NameFilter(GlobPattern),
-    TagFilter(GlobPattern),
-}
-
-/// Non-interactive execution of an arbitrary Nix command.
+/// Non-interactive execution of an arbitrary command.
 pub struct CommandExecution {
     command: Command,
-    progress_bar: TaskProgress,
+    job: Option<JobHandle>,
     stdout: Option<String>,
     stderr: Option<String>,
 }
@@ -28,23 +22,26 @@ impl CommandExecution {
     pub fn new(command: Command) -> Self {
         Self {
             command,
-            progress_bar: TaskProgress::default(),
+            job: None,
             stdout: None,
             stderr: None,
         }
     }
 
-    /// Provides a TaskProgress to use to display output.
-    pub fn set_progress_bar(&mut self, bar: TaskProgress) {
-        self.progress_bar = bar;
+    /// Sets the job associated with this execution.
+    pub fn set_job(&mut self, job: Option<JobHandle>) {
+        self.job = job;
     }
 
-    /// Retrieve logs from the last invocation.
+    /// Returns logs from the last invocation.
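+    ///
+    /// The tuple is `(stdout, stderr)`; each is `None` until the
+    /// command has been run.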
     pub fn get_logs(&self) -> (Option<&String>, Option<&String>) {
         (self.stdout.as_ref(), self.stderr.as_ref())
     }
 
-    /// Run the command.
+    /// Runs the command.
     pub async fn run(&mut self) -> NixResult<()> {
         self.command.stdin(Stdio::null());
         self.command.stdout(Stdio::piped());
@@ -59,8 +53,8 @@ impl CommandExecution {
         let stderr = BufReader::new(child.stderr.take().unwrap());
 
         let futures = join3(
-            capture_stream(stdout, self.progress_bar.clone()),
-            capture_stream(stderr, self.progress_bar.clone()),
+            capture_stream(stdout, self.job.clone(), false),
+            capture_stream(stderr, self.job.clone(), true),
             child.wait(),
         );
@@ -145,44 +139,6 @@ pub async fn hive_from_args(args: &ArgMatches<'_>) -> NixResult<Hive> {
     Ok(hive)
 }
 
-pub fn filter_nodes(nodes: &HashMap<NodeName, NodeConfig>, filter: &str) -> Vec<NodeName> {
-    let filters: Vec<NodeFilter> = filter.split(",").map(|pattern| {
-        use NodeFilter::*;
-        if let Some(tag_pattern) = pattern.strip_prefix("@") {
-            TagFilter(GlobPattern::new(tag_pattern).unwrap())
-        } else {
-            NameFilter(GlobPattern::new(pattern).unwrap())
-        }
-    }).collect();
-
-    if filters.len() > 0 {
-        nodes.iter().filter_map(|(name, node)| {
-            for filter in filters.iter() {
-                use NodeFilter::*;
-                match filter {
-                    TagFilter(pat) => {
-                        // Welp
-                        for tag in node.tags() {
-                            if pat.matches(tag) {
-                                return Some(name);
-                            }
-                        }
-                    }
-                    NameFilter(pat) => {
-                        if pat.matches(name) {
-                            return Some(name)
-                        }
-                    }
-                }
-            }
-
-            None
-        }).cloned().collect()
-    } else {
-        nodes.keys().cloned().collect()
-    }
-}
-
 pub fn register_selector_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> {
     command
         .arg(Arg::with_name("on")
@@ -208,7 +164,7 @@ fn canonicalize_cli_path(path: &str) -> PathBuf {
     }
 }
 
-pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, mut progress_bar: TaskProgress) -> String {
+pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, job: Option<JobHandle>, stderr: bool) -> String {
     let mut log = String::new();
 
     loop {
@@ -220,7 +176,14 @@ pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, mut
         }
 
         let trimmed = line.trim_end();
-        progress_bar.log(trimmed);
+
+        if let Some(job) = &job {
+            if stderr {
+                job.stderr(trimmed.to_string()).unwrap();
+            } else {
+                job.stdout(trimmed.to_string()).unwrap();
+            }
+        }
 
         log += trimmed;
         log += "\n";
@@ -228,3 +191,7 @@ pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, mut
 
     log
 }
+
+pub fn get_label_width(targets: &TargetNodeMap) -> Option<usize> {
+    targets.keys().map(|n| n.len()).max()
+}