From 506b894be6c81ae721a1639d2635e6f11718839a Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Sun, 24 Jan 2021 14:08:48 -0800 Subject: [PATCH] Redesign deployment process Now evaluation can be automatically split into chunks based on available RAM. All three stages of the deployment process (evaluate, build, apply) can happen concurrently. Fixes #1. --- Cargo.lock | 17 + Cargo.toml | 1 + default.nix | 11 +- src/command/apply.rs | 176 ++++++++--- src/command/apply_local.rs | 20 +- src/command/build.rs | 52 +-- src/command/introspect.rs | 2 +- src/deployment.rs | 131 -------- src/main.rs | 1 - src/nix/deployment.rs | 626 +++++++++++++++++++++++++++++++++++++ src/nix/hive.rs | 160 ++++++++++ src/nix/host.rs | 177 ++++++----- src/nix/mod.rs | 355 +++------------------ src/nix/profile.rs | 112 +++++++ src/nix/store.rs | 86 +++++ src/util.rs | 103 +++++- 16 files changed, 1415 insertions(+), 615 deletions(-) delete mode 100644 src/deployment.rs create mode 100644 src/nix/deployment.rs create mode 100644 src/nix/hive.rs create mode 100644 src/nix/profile.rs create mode 100644 src/nix/store.rs diff --git a/Cargo.lock b/Cargo.lock index 34ca21e..d8dbc9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,6 +58,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad1f8e949d755f9d79112b5bb46938e0ef9d3804a0b16dfab13aafcaa5f0fa72" +[[package]] +name = "cc" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48" + [[package]] name = "cfg-if" version = "0.1.10" @@ -103,6 +109,7 @@ dependencies = [ "serde", "serde_json", "snafu", + "sys-info", "tempfile", "tokio", ] @@ -705,6 +712,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sys-info" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5cfbd84f86389198ade41b439f72a5b1b3a8ba728e61cd589e1720d0df44c39" +dependencies = [ 
+ "cc", + "libc", +] + [[package]] name = "tempfile" version = "3.1.0" diff --git a/Cargo.toml b/Cargo.toml index 45fbb41..a6f9025 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ log = "0.4.11" quit = "1.1.2" serde = { version = "1.0.118", features = ["derive"] } serde_json = "1.0" +sys-info = "0.7.0" snafu = "0.6.10" tempfile = "3.1.0" tokio = { version = "1.0.0", features = ["full"] } diff --git a/default.nix b/default.nix index a8fe355..c686295 100644 --- a/default.nix +++ b/default.nix @@ -1,6 +1,7 @@ { pkgs ? import ./pkgs.nix {}, }: let + lib = pkgs.lib; rustPlatform = if pkgs ? pinnedRust then pkgs.makeRustPlatform { rustc = pkgs.pinnedRust; cargo = pkgs.pinnedRust; @@ -9,6 +10,12 @@ in rustPlatform.buildRustPackage { name = "colmena-dev"; version = "0.1.0"; - src = ./.; - cargoSha256 = "1ai046vbvydyqhwiy8qz0d28dch5jpxg3rzk7nrh2sdwcvxirmvm"; + src = lib.cleanSourceWith { + filter = name: type: !(type == "directory" && baseNameOf name == "target"); + src = lib.cleanSourceWith { + filter = lib.cleanSourceFilter; + src = ./.; + }; + }; + cargoSha256 = "0m35xjslm5gxr2cb5fw8pkqpm853hsznhsncry2kvicqzwh63ldm"; } diff --git a/src/command/apply.rs b/src/command/apply.rs index cb0515d..56e740b 100644 --- a/src/command/apply.rs +++ b/src/command/apply.rs @@ -1,24 +1,47 @@ +use std::collections::HashMap; +use std::sync::Arc; + use clap::{Arg, App, SubCommand, ArgMatches}; -use crate::nix::{DeploymentTask, DeploymentGoal}; -use crate::nix::host::CopyOptions; -use crate::deployment::deploy; +use crate::nix::deployment::{ + Deployment, + DeploymentGoal, + DeploymentOptions, + EvaluationNodeLimit, + ParallelismLimit, +}; +use crate::nix::host::local as localhost; use crate::util; -pub fn subcommand() -> App<'static, 'static> { - let command = SubCommand::with_name("apply") - .about("Apply configurations on remote machines") - .arg(Arg::with_name("goal") - .help("Deployment goal") - .long_help("Same as the targets for switch-to-configuration.\n\"push\" means only 
copying the closures to remote nodes.") - .default_value("switch") - .index(1) - .possible_values(&["push", "switch", "boot", "test", "dry-activate"])) +pub fn register_deploy_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> { + command + .arg(Arg::with_name("eval-node-limit") + .long("eval-node-limit") + .value_name("LIMIT") + .help("Evaluation node limit") + .long_help(r#"Limits the maximum number of hosts to be evaluated at once. + +The evaluation process is RAM-intensive. The default behavior is to limit the maximum number of host evaluated at the same time based on naive heuristics. + +Set to 0 to disable the limit. +"#) + .default_value("auto") + .takes_value(true) + .validator(|s| { + if s == "auto" { + return Ok(()); + } + + match s.parse::() { + Ok(_) => Ok(()), + Err(_) => Err(String::from("The value must be a valid number")), + } + })) .arg(Arg::with_name("parallel") .short("p") .long("parallel") .value_name("LIMIT") - .help("Parallelism limit") + .help("Deploy parallelism limit") .long_help(r#"Limits the maximum number of hosts to be deployed in parallel. Set to 0 to disable parallemism limit. @@ -31,12 +54,33 @@ Set to 0 to disable parallemism limit. 
Err(_) => Err(String::from("The value must be a valid number")), } })) + .arg(Arg::with_name("parallel-build") + .long("parallel-build") + .value_name("LIMIT") + .help("Build parallelism limit") + .long_help("Limits the maximum number of parallel build processes.") + .default_value("2") + .takes_value(true) + .validator(|s| { + if s == "0" { + return Err(String::from("The value must be non-zero")); + } + match s.parse::() { + Ok(_) => Ok(()), + Err(_) => Err(String::from("The value must be a valid number")), + } + })) .arg(Arg::with_name("verbose") .short("v") .long("verbose") .help("Be verbose") .long_help("Deactivates the progress spinner and prints every line of output.") .takes_value(false)) + .arg(Arg::with_name("no-build-substitutes") + .long("no-build-substitutes") + .help("Do not use substitutes during build") + .long_help("Disables the use of substituters when building.") + .takes_value(false)) .arg(Arg::with_name("no-substitutes") .long("no-substitutes") .help("Do not use substitutes") @@ -47,13 +91,25 @@ Set to 0 to disable parallemism limit. 
.help("Do not use gzip") .long_help("Disables the use of gzip when copying closures to the remote host.") .takes_value(false)) +} + +pub fn subcommand() -> App<'static, 'static> { + let command = SubCommand::with_name("apply") + .about("Apply configurations on remote machines") + .arg(Arg::with_name("goal") + .help("Deployment goal") + .long_help("Same as the targets for switch-to-configuration.\n\"push\" means only copying the closures to remote nodes.") + .default_value("switch") + .index(1) + .possible_values(&["build", "push", "switch", "boot", "test", "dry-activate"])) ; + let command = register_deploy_args(command); util::register_selector_args(command) } pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { - let mut hive = util::hive_from_args(local_args).unwrap(); + let hive = util::hive_from_args(local_args).unwrap(); log::info!("Enumerating nodes..."); let all_nodes = hive.deployment_info().await.unwrap(); @@ -70,50 +126,70 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { quit::with_code(2); } - if selected_nodes.len() == all_nodes.len() { - log::info!("Building all node configurations..."); - } else { - log::info!("Selected {} out of {} hosts. 
Building node configurations...", selected_nodes.len(), all_nodes.len()); - } - - // Some ugly argument mangling :/ - let mut profiles = hive.build_selected(selected_nodes).await.unwrap(); let goal = DeploymentGoal::from_str(local_args.value_of("goal").unwrap()).unwrap(); - let verbose = local_args.is_present("verbose"); - - let max_parallelism = local_args.value_of("parallel").unwrap().parse::().unwrap(); - let max_parallelism = match max_parallelism { - 0 => None, - _ => Some(max_parallelism), - }; - - let mut task_list: Vec = Vec::new(); - let mut skip_list: Vec = Vec::new(); - for (name, profile) in profiles.drain() { - let target = all_nodes.get(&name).unwrap().to_ssh_host(); - - match target { - Some(target) => { - let mut task = DeploymentTask::new(name, target, profile, goal); - let options = CopyOptions::default() - .gzip(!local_args.is_present("no-gzip")) - .use_substitutes(!local_args.is_present("no-substitutes")) - ; - - task.set_copy_options(options); - task_list.push(task); + let mut targets = HashMap::new(); + for node in &selected_nodes { + let host = all_nodes.get(node).unwrap().to_ssh_host(); + match host { + Some(host) => { + targets.insert(node.clone(), host); } None => { - skip_list.push(name); + if goal == DeploymentGoal::Build { + targets.insert(node.clone(), localhost()); + } } } } - if skip_list.len() != 0 { - log::info!("Applying configurations ({} skipped)...", skip_list.len()); + if targets.len() == all_nodes.len() { + log::info!("Selected all {} nodes.", targets.len()); + } else if targets.len() == selected_nodes.len() { + log::info!("Selected {} out of {} hosts.", targets.len(), all_nodes.len()); } else { - log::info!("Applying configurations..."); + log::info!("Selected {} out of {} hosts ({} skipped)", targets.len(), all_nodes.len(), selected_nodes.len() - targets.len()); } - deploy(task_list, max_parallelism, !verbose).await; + let mut deployment = Deployment::new(hive, targets, goal); + + let mut options = 
DeploymentOptions::default(); + options.set_substituters_build(!local_args.is_present("no-build-substitutes")); + options.set_substituters_push(!local_args.is_present("no-substitutes")); + options.set_gzip(!local_args.is_present("no-gzip")); + options.set_progress_bar(!local_args.is_present("verbose")); + deployment.set_options(options); + + let mut parallelism_limit = ParallelismLimit::default(); + parallelism_limit.set_apply_limit({ + let limit = local_args.value_of("parallel").unwrap().parse::().unwrap(); + if limit == 0 { + selected_nodes.len() // HACK + } else { + limit + } + }); + parallelism_limit.set_build_limit({ + let limit = local_args.value_of("parallel-build").unwrap().parse::().unwrap(); + if limit == 0 { + panic!("The build parallelism limit must not be 0"); + } + limit + }); + deployment.set_parallelism_limit(parallelism_limit); + + let evaluation_node_limit = match local_args.value_of("eval-node-limit").unwrap() { + "auto" => EvaluationNodeLimit::Heuristic, + number => { + let number = number.parse::().unwrap(); + if number == 0 { + EvaluationNodeLimit::None + } else { + EvaluationNodeLimit::Manual(number) + } + } + }; + deployment.set_evaluation_node_limit(evaluation_node_limit); + + let deployment = Arc::new(deployment); + deployment.execute().await; } diff --git a/src/command/apply_local.rs b/src/command/apply_local.rs index 324007d..a4c0e1b 100644 --- a/src/command/apply_local.rs +++ b/src/command/apply_local.rs @@ -1,10 +1,12 @@ use std::env; +use std::collections::HashMap; +use std::sync::Arc; use clap::{Arg, App, SubCommand, ArgMatches}; use tokio::fs; use tokio::process::Command; -use crate::nix::{DeploymentTask, DeploymentGoal, Host}; +use crate::nix::{Deployment, DeploymentGoal, Host}; use crate::nix::host; use crate::util; @@ -57,7 +59,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { } } - let mut hive = util::hive_from_args(local_args).unwrap(); + let 
hive = util::hive_from_args(local_args).unwrap(); let hostname = hostname::get().expect("Could not get hostname") .to_string_lossy().into_owned(); let goal = DeploymentGoal::from_str(local_args.value_of("goal").unwrap()).unwrap(); @@ -79,16 +81,12 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { } }; - log::info!("Building local node configuration..."); - let profile = { - let selected_nodes: Vec = vec![hostname.clone()]; - let mut profiles = hive.build_selected(selected_nodes).await - .expect("Failed to build local configurations"); - profiles.remove(&hostname).unwrap() - }; + let mut targets = HashMap::new(); + targets.insert(hostname.clone(), target); - let mut task = DeploymentTask::new(hostname, target, profile, goal); - task.execute().await.unwrap(); + let deployment = Arc::new(Deployment::new(hive, targets, goal)); + + deployment.execute().await; } async fn escalate() -> ! { diff --git a/src/command/build.rs b/src/command/build.rs index a59593a..14906de 100644 --- a/src/command/build.rs +++ b/src/command/build.rs @@ -1,45 +1,23 @@ -use clap::{Arg, App, SubCommand, ArgMatches}; +use clap::{Arg, App, SubCommand}; use crate::util; +use super::apply; +pub use super::apply::run; + pub fn subcommand() -> App<'static, 'static> { let command = SubCommand::with_name("build") - .about("Build the configuration") - .arg(Arg::with_name("verbose") - .short("v") - .long("verbose") - .help("Be verbose") - .long_help("Deactivates the progress spinner and prints every line of output.") - .takes_value(false)); + .about("Build the configuration but not push to remote machines") + .long_about(r#"Build the configuration but not push to remote machines + +This subcommand behaves as if you invoked `apply` with the `build` goal."#) + .arg(Arg::with_name("goal") + .hidden(true) + .default_value("build") + .possible_values(&["build"]) + .takes_value(true)); + + let command = apply::register_deploy_args(command); util::register_selector_args(command) } 
- -pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { - let mut hive = util::hive_from_args(local_args).unwrap(); - - log::info!("Enumerating nodes..."); - let all_nodes = hive.deployment_info().await.unwrap(); - - let selected_nodes = match local_args.value_of("on") { - Some(filter) => { - util::filter_nodes(&all_nodes, filter) - } - None => all_nodes.keys().cloned().collect(), - }; - - if selected_nodes.len() == 0 { - log::warn!("No hosts matched. Exiting..."); - quit::with_code(2); - } - - if selected_nodes.len() == all_nodes.len() { - log::info!("Building all node configurations..."); - } else { - log::info!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), all_nodes.len()); - } - - hive.build_selected(selected_nodes).await.unwrap(); - - log::info!("Success!"); -} diff --git a/src/command/introspect.rs b/src/command/introspect.rs index 9a5f98b..f0595b9 100644 --- a/src/command/introspect.rs +++ b/src/command/introspect.rs @@ -24,7 +24,7 @@ For example, to retrieve the configuration of one node, you may write something } pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { - let mut hive = util::hive_from_args(local_args).unwrap(); + let hive = util::hive_from_args(local_args).unwrap(); if !(local_args.is_present("expression") ^ local_args.is_present("expression_file")) { log::error!("Either an expression (-E) xor a .nix file containing an expression should be specified, not both."); diff --git a/src/deployment.rs b/src/deployment.rs deleted file mode 100644 index cf59c35..0000000 --- a/src/deployment.rs +++ /dev/null @@ -1,131 +0,0 @@ -use std::cmp::min; -use std::sync::Arc; - -use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget}; -use futures::future::join_all; -use tokio::sync::Mutex; - -use crate::nix::DeploymentTask; -use crate::progress::get_spinner_styles; - -/// User-facing deploy routine -pub async fn deploy(tasks: Vec, max_parallelism: Option, 
progress_bar: bool) { - let parallelism = match max_parallelism { - Some(limit) => { - min(limit, tasks.len()) - } - None => { - tasks.len() - } - }; - - let node_name_alignment = tasks.iter().map(|task| task.name().len()).max().unwrap(); - - let multi = Arc::new(MultiProgress::new()); - let root_bar = Arc::new(multi.add(ProgressBar::new(tasks.len() as u64))); - multi.set_draw_target(ProgressDrawTarget::stderr_nohz()); - - { - let (spinner_style, _) = get_spinner_styles(node_name_alignment); - root_bar.set_message("Running..."); - root_bar.set_style(spinner_style); - root_bar.inc(0); - } - - let tasks = Arc::new(Mutex::new(tasks)); - let result_list: Arc>> = Arc::new(Mutex::new(Vec::new())); - - let mut futures = Vec::new(); - - for _ in 0..parallelism { - let tasks = tasks.clone(); - let result_list = result_list.clone(); - let multi = multi.clone(); - let (spinner_style, failing_spinner_style) = get_spinner_styles(node_name_alignment); - - let root_bar = root_bar.clone(); - - let future = tokio::spawn(async move { - // Perform tasks until there's none - loop { - let (task, remaining) = { - let mut tasks = tasks.lock().await; - let task = tasks.pop(); - let remaining = tasks.len(); - (task, remaining) - }; - - if task.is_none() { - // We are donzo! 
- return; - } - - let mut task = task.unwrap(); - - let bar = multi.add(ProgressBar::new(100)); - bar.set_style(spinner_style.clone()); - bar.set_prefix(task.name()); - bar.set_message("Starting..."); - bar.inc(0); - - if progress_bar { - task.set_progress_bar(bar.clone()).await; - } - - match task.execute().await { - Ok(_) => { - bar.finish_with_message(task.goal().success_str().unwrap()); - - let mut result_list = result_list.lock().await; - result_list.push((task, true)); - }, - Err(_) => { - bar.set_style(failing_spinner_style.clone()); - bar.abandon_with_message("Failed"); - - let mut result_list = result_list.lock().await; - result_list.push((task, false)); - }, - } - - root_bar.inc(1); - - if remaining == 0 { - root_bar.finish_with_message("Finished"); - } - } - }); - - futures.push(future); - } - - if progress_bar { - futures.push(tokio::task::spawn_blocking(move || { - multi.join().unwrap(); - })); - } - - join_all(futures).await; - - let mut result_list = result_list.lock().await; - for (task, success) in result_list.drain(..) { - if !success { - let name = task.name().to_owned(); - let host = task.to_host().await; - - print!("Failed to deploy to {}. ", name); - if let Some(logs) = host.dump_logs().await { - if let Some(lines) = logs.chunks(10).rev().next() { - println!("Last {} lines of logs:", lines.len()); - for line in lines { - println!("{}", line.trim_end()); - } - } else { - println!("The log is empty."); - } - } else { - println!("Logs are not available for this target."); - } - } - } -} diff --git a/src/main.rs b/src/main.rs index 3461bc0..6a48a36 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,6 @@ use clap::{App, AppSettings, Arg}; mod nix; mod command; mod progress; -mod deployment; mod util; macro_rules! 
command { diff --git a/src/nix/deployment.rs b/src/nix/deployment.rs new file mode 100644 index 0000000..6656ab6 --- /dev/null +++ b/src/nix/deployment.rs @@ -0,0 +1,626 @@ +use std::cmp::max; +use std::sync::Arc; +use std::collections::HashMap; + +use futures::future::join_all; +use futures::join; +use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget}; +use tokio::sync::{Mutex, Semaphore}; + +use super::{Hive, Host, CopyOptions, host}; +use crate::progress::get_spinner_styles; + +/// Amount of RAM reserved for the system, in MB. +const EVAL_RESERVE_MB: u64 = 1024; + +/// Estimated amount of RAM needed to evaluate one host, in MB. +const EVAL_PER_HOST_MB: u64 = 512; + +const BATCH_OPERATION_LABEL: &'static str = "(...)"; + +macro_rules! set_up_batch_progress_bar { + ($multi:ident, $style:ident, $chunk:ident, $single_text:expr, $batch_text:expr) => {{ + let bar = $multi.add(ProgressBar::new(100)); + bar.set_style($style.clone()); + bar.enable_steady_tick(100); + + if $chunk.len() == 1 { + bar.set_prefix(&$chunk[0]); + bar.set_message($single_text); + } else { + bar.set_prefix(BATCH_OPERATION_LABEL); + bar.set_message(&format!($batch_text, $chunk.len())); + } + bar.inc(0); + + bar + }}; +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum DeploymentGoal { + /// Build the configurations only. + Build, + + /// Push the closures only. + Push, + + /// Make the configuration the boot default and activate now. + Switch, + + /// Make the configuration the boot default. + Boot, + + /// Activate the configuration, but don't make it the boot default. + Test, + + /// Show what would be done if this configuration were activated. 
+ DryActivate, +} + +impl DeploymentGoal { + pub fn from_str(s: &str) -> Option { + match s { + "build" => Some(Self::Build), + "push" => Some(Self::Push), + "switch" => Some(Self::Switch), + "boot" => Some(Self::Boot), + "test" => Some(Self::Test), + "dry-activate" => Some(Self::DryActivate), + _ => None, + } + } + + pub fn as_str(&self) -> Option<&'static str> { + use DeploymentGoal::*; + match self { + Build => None, + Push => None, + Switch => Some("switch"), + Boot => Some("boot"), + Test => Some("test"), + DryActivate => Some("dry-activate"), + } + } + + pub fn success_str(&self) -> Option<&'static str> { + use DeploymentGoal::*; + match self { + Build => Some("Configuration built"), + Push => Some("Pushed"), + Switch => Some("Activation successful"), + Boot => Some("Will be activated next boot"), + Test => Some("Activation successful (test)"), + DryActivate => Some("Dry activation successful"), + } + } + + pub fn should_switch_profile(&self) -> bool { + use DeploymentGoal::*; + match self { + Boot | Switch => true, + _ => false, + } + } + + pub fn requires_activation(&self) -> bool { + use DeploymentGoal::*; + match self { + Build | Push => false, + _ => true, + } + } +} + +/// Internal deployment stages +#[derive(Debug)] +enum DeploymentStage { + Evaluate(Vec), + Build(Vec), + Apply(String), +} + +/// Results of a deployment to a node +#[derive(Debug)] +struct DeploymentResult { + /// Stage in which the deployment ended. + stage: DeploymentStage, + + /// Whether the deployment succeeded or not. + success: bool, + + /// Unstructured logs of the deployment. 
+ logs: Option, +} + +impl DeploymentResult { + fn success(stage: DeploymentStage, logs: Option) -> Self { + Self { + stage, + success: true, + logs, + } + } + + fn failure(stage: DeploymentStage, logs: Option) -> Self { + Self { + stage, + success: false, // a failure must be recorded as unsuccessful, otherwise print_logs() never reports it + logs, + } + } + + fn is_successful(&self) -> bool { + self.success + } + + fn print(&self) { + use DeploymentStage::*; + + if self.is_successful() { + unimplemented!(); + } + + match &self.stage { + Evaluate(nodes) => { + self.print_failed_nodes("Evaluation of", &nodes); + } + Build(nodes) => { + self.print_failed_nodes("Build of", &nodes); + } + Apply(node) => { + self.print_failed_nodes("Deployment to", &vec![node.clone()]); + } + } + } + + fn print_failed_nodes(&self, prefix: &'static str, nodes: &Vec) { + let last_lines: Option> = self.logs.as_ref().map(|logs| { + logs.split("\n").collect::>().iter().rev().take(10).rev() + .map(|line| line.to_string()).collect() + }); + + let msg = if nodes.len() == 1 { + format!("{} {} failed.", prefix, nodes[0]) + } else { + format!("{} {} nodes failed.", prefix, nodes.len()) + }; + + if let Some(lines) = last_lines { + log::error!("{} Last {} lines of logs:", msg, lines.len()); + for line in lines { + log::error!("{}", line); + } + } + } +} + +#[derive(Debug)] +pub struct Deployment { + hive: Hive, + goal: DeploymentGoal, + nodes: Vec, + node_hosts: Mutex>>, + parallelism_limit: ParallelismLimit, + evaluation_node_limit: EvaluationNodeLimit, + options: DeploymentOptions, + results: Mutex>, +} + +impl Deployment { + pub fn new(hive: Hive, targets: HashMap>, goal: DeploymentGoal) -> Self { + let nodes: Vec = targets.keys().cloned().collect(); + + Self { + hive, + goal, + nodes, + node_hosts: Mutex::new(targets), + parallelism_limit: ParallelismLimit::default(), + evaluation_node_limit: EvaluationNodeLimit::default(), + options: DeploymentOptions::default(), + results: Mutex::new(Vec::new()), + } + } + + pub fn set_options(&mut self, options: DeploymentOptions) { + 
self.options = options; + } + + pub fn set_parallelism_limit(&mut self, limit: ParallelismLimit) { + self.parallelism_limit = limit; + } + + pub fn set_evaluation_node_limit(&mut self, limit: EvaluationNodeLimit) { + self.evaluation_node_limit = limit; + } + + /// Executes the deployment (user-facing) + /// + /// Self must be wrapped inside an Arc. + pub async fn execute(self: Arc) { + let multi = Arc::new(MultiProgress::new()); + let root_bar = Arc::new(multi.add(ProgressBar::new(100))); + let alignment = self.node_name_alignment(); + multi.set_draw_target(ProgressDrawTarget::stderr_nohz()); + + { + let (spinner_style, _) = get_spinner_styles(alignment); + root_bar.set_message("Running..."); + root_bar.set_style(spinner_style); + root_bar.tick(); + root_bar.enable_steady_tick(100); + } + + let arc_self = self.clone(); + let eval_limit = arc_self.clone().eval_limit(); + + // FIXME: Saner logging + let mut futures = Vec::new(); + + for chunk in self.nodes.chunks(max(eval_limit, 1)) { // chunks() panics on a zero chunk size + let arc_self = self.clone(); + let multi = multi.clone(); + let (spinner_style, failing_spinner_style) = get_spinner_styles(alignment); + + // FIXME: Eww + let chunk: Vec = chunk.iter().map(|s| s.to_string()).collect(); + + futures.push(tokio::spawn(async move { + let drv = { + // Evaluation phase + let permit = arc_self.parallelism_limit.evaluation.acquire().await.unwrap(); + + let bar = set_up_batch_progress_bar!(multi, spinner_style, chunk, + "Evaluating configuration...", + "Evaluating configurations for {} nodes" + ); + + let drv = match arc_self.hive.eval_selected(&chunk, Some(bar.clone())).await { + Ok(drv) => { + bar.finish_and_clear(); + drv + } + Err(e) => { + bar.set_style(failing_spinner_style.clone()); + bar.abandon_with_message(&format!("Evaluation failed: {}", e)); + + let mut results = arc_self.results.lock().await; + let stage = DeploymentStage::Evaluate(chunk.clone()); + results.push(DeploymentResult::failure(stage, None)); + return; + } + }; + + drop(permit); + drv + }; + + 
let profiles = { + // Build phase + let permit = arc_self.parallelism_limit.build.acquire().await.unwrap(); + + let bar = set_up_batch_progress_bar!(multi, spinner_style, chunk, + "Building configuration...", + "Building configurations for {} nodes" + ); + + // FIXME: Remote build? + let mut builder = host::local(); + + if arc_self.options.progress_bar { + builder.set_progress_bar(bar.clone()); + } + + let profiles = match drv.realize(&mut *builder).await { + Ok(profiles) => { + let goal = arc_self.goal; + if goal == DeploymentGoal::Build { + bar.finish_with_message(goal.success_str().unwrap()); + + let mut results = arc_self.results.lock().await; + let stage = DeploymentStage::Build(chunk.clone()); + let logs = builder.dump_logs().await.map(|s| s.to_string()); + results.push(DeploymentResult::success(stage, logs)); + return; + } else { + bar.finish_and_clear(); + profiles + } + } + Err(e) => { + bar.set_style(failing_spinner_style.clone()); + bar.abandon_with_message(&format!("Build failed: {}", e)); + + let mut results = arc_self.results.lock().await; + let stage = DeploymentStage::Build(chunk.clone()); + let logs = builder.dump_logs().await.map(|s| s.to_string()); + results.push(DeploymentResult::failure(stage, logs)); + return; + } + }; + + drop(permit); + profiles + }; + + // Apply phase + let mut futures = Vec::new(); + for node in chunk { + let arc_self = arc_self.clone(); + let multi = multi.clone(); + let spinner_style = spinner_style.clone(); + let failing_spinner_style = failing_spinner_style.clone(); + + let mut host = { + let mut node_hosts = arc_self.node_hosts.lock().await; + node_hosts.remove(&node).unwrap() + }; + let profile = profiles.get(&node).cloned() + .expect(&format!("Somehow profile for {} was not built", node)); + + futures.push(tokio::spawn(async move { + let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap(); + + let bar = multi.add(ProgressBar::new(100)); + bar.set_style(spinner_style); + bar.set_prefix(&node); + 
bar.set_message("Starting..."); + bar.tick(); + bar.enable_steady_tick(100); + + if arc_self.options.progress_bar { + host.set_progress_bar(bar.clone()); + } + + let copy_options = arc_self.options.to_copy_options() + .include_outputs(true); + + let goal = arc_self.goal; + match host.deploy(&profile, goal, copy_options).await { + Ok(_) => { + bar.finish_with_message(goal.success_str().unwrap()); + + let mut results = arc_self.results.lock().await; + let stage = DeploymentStage::Apply(node.clone()); + let logs = host.dump_logs().await.map(|s| s.to_string()); + results.push(DeploymentResult::success(stage, logs)); + } + Err(e) => { + bar.set_style(failing_spinner_style); + bar.abandon_with_message(&format!("Failed: {}", e)); + + let mut results = arc_self.results.lock().await; + let stage = DeploymentStage::Apply(node.clone()); + let logs = host.dump_logs().await.map(|s| s.to_string()); + results.push(DeploymentResult::failure(stage, logs)); + } + } + + drop(permit); + })); + } + + join_all(futures).await; + })); + } + + let wait_for_tasks = tokio::spawn(async move { + join_all(futures).await; + root_bar.finish_with_message("Finished"); + }); + + let tasks_result = if self.options.progress_bar { + let wait_for_bars = tokio::task::spawn_blocking(move || { + multi.join().unwrap(); + }); + + let (tasks_result, _) = join!(wait_for_tasks, wait_for_bars); + + tasks_result + } else { + wait_for_tasks.await + }; + + if let Err(e) = tasks_result { + log::error!("Deployment process failed: {}", e); + } + + self.print_logs().await; + } + + async fn print_logs(&self) { + let results = self.results.lock().await; + for result in results.iter() { + if !result.is_successful() { + result.print(); + } + } + } + + fn node_name_alignment(&self) -> usize { + if let Some(len) = self.nodes.iter().map(|n| n.len()).max() { + max(BATCH_OPERATION_LABEL.len(), len) + } else { + BATCH_OPERATION_LABEL.len() + } + } + + fn eval_limit(&self) -> usize { + if let Some(limit) = 
self.evaluation_node_limit.get_limit() { + limit + } else { + self.nodes.len() + } + } +} + +#[derive(Debug)] +pub struct ParallelismLimit { + /// Limit of concurrent evaluation processes. + evaluation: Semaphore, + + /// Limit of concurrent build processes. + build: Semaphore, + + /// Limit of concurrent apply processes. + apply: Semaphore, +} + +impl Default for ParallelismLimit { + fn default() -> Self { + Self { + evaluation: Semaphore::new(1), + build: Semaphore::new(2), + apply: Semaphore::new(10), + } + } +} + +impl ParallelismLimit { + // Do we actually want this to be configurable? + /* + /// Sets the concurrent evaluation limit. + /// + /// This limits the number of evaluation processes, not + /// the number of nodes in each evaluation process. + /// The latter is controlled in DeploymentOptions. + pub fn set_evaluation_limit(&mut self, limit: usize) { + self.evaluation = Semaphore::new(limit); + } + */ + + /// Sets the concurrent build limit. + pub fn set_build_limit(&mut self, limit: usize) { + self.build = Semaphore::new(limit); + } + + /// Sets the concurrent apply limit. + pub fn set_apply_limit(&mut self, limit: usize) { + self.apply = Semaphore::new(limit); + } +} + +#[derive(Copy, Clone, Debug)] +pub struct DeploymentOptions { + /// Whether to show condensed progress bars. + /// + /// If set to false, verbose logs will be displayed instead. + progress_bar: bool, + + /// Whether to use binary caches when building. + substituters_build: bool, + + /// Whether to use binary caches when copying closures to remote hosts. + substituters_push: bool, + + /// Whether to use gzip when copying closures to remote hosts. 
+ gzip: bool, +} + +impl Default for DeploymentOptions { + fn default() -> Self { + Self { + progress_bar: true, + substituters_build: true, + substituters_push: true, + gzip: true, + } + } +} + +impl DeploymentOptions { + pub fn set_progress_bar(&mut self, value: bool) { + self.progress_bar = value; + } + + pub fn set_substituters_build(&mut self, value: bool) { + self.substituters_build = value; + } + + pub fn set_substituters_push(&mut self, value: bool) { + self.substituters_push = value; + } + + pub fn set_gzip(&mut self, value: bool) { + self.gzip = value; + } + + fn to_copy_options(&self) -> CopyOptions { + let options = CopyOptions::default(); + + options + .use_substitutes(self.substituters_push) + .gzip(self.gzip) + } +} + +/// Limit of the number of nodes in each evaluation process. +/// +/// The evaluation process is very RAM-intensive, with memory +/// consumption scaling linearly with the number of nodes +/// evaluated at the same time. This can be a problem if you +/// are deploying to a large number of nodes at the same time, +/// where `nix-instantiate` may consume too much RAM and get +/// killed by the OS (`NixKilled` error). +/// +/// Evaluating each node on its own is not an efficient solution, +/// with total CPU time and memory consumption vastly exceeding the +/// case where we evaluate the same set of nodes at the same time +/// (TODO: Provide statistics). +/// +/// To overcome this problem, we split the evaluation process into +/// chunks when necessary, with the maximum number of nodes in +/// each `nix-instantiate` invocation determined with: +/// +/// - A simple heuristic based on remaining memory in the system +/// - A supplied number +/// - No limit at all +#[derive(Copy, Clone, Debug)] +pub enum EvaluationNodeLimit { + /// Use a naive heuristic based on available memory. + Heuristic, + + /// Supply the maximum number of nodes. 
+ Manual(usize), + + /// Do not limit the number of nodes in each evaluation process + None, +} + +impl Default for EvaluationNodeLimit { + fn default() -> Self { + Self::Heuristic + } +} + +impl EvaluationNodeLimit { + /// Returns the maximum number of hosts in each evaluation. + /// + /// The result should be cached. + pub fn get_limit(&self) -> Option { + match self { + EvaluationNodeLimit::Heuristic => { + if let Ok(mem_info) = sys_info::mem_info() { + let mut mb = mem_info.avail / 1024; + + if mb >= EVAL_RESERVE_MB { + mb -= EVAL_RESERVE_MB; + } + + let nodes = mb / EVAL_PER_HOST_MB; + + if nodes == 0 { + Some(1) + } else { + Some(nodes as usize) + } + } else { + Some(10) + } + } + EvaluationNodeLimit::Manual(limit) => Some(*limit), + EvaluationNodeLimit::None => None, + } + } +} diff --git a/src/nix/hive.rs b/src/nix/hive.rs new file mode 100644 index 0000000..8d8f8fb --- /dev/null +++ b/src/nix/hive.rs @@ -0,0 +1,160 @@ +use std::collections::HashMap; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use indicatif::ProgressBar; +use tempfile::{NamedTempFile, TempPath}; +use tokio::process::Command; +use serde::Serialize; + +use super::{ + StoreDerivation, + NixResult, + NodeConfig, + ProfileMap, +}; +use super::NixCommand; +use crate::util::CommandExecution; + +const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix"); + +#[derive(Debug)] +pub struct Hive { + hive: PathBuf, + eval_nix: TempPath, + show_trace: bool, +} + +impl Hive { + pub fn new>(hive: P) -> NixResult { + let mut eval_nix = NamedTempFile::new().unwrap(); + eval_nix.write_all(HIVE_EVAL).unwrap(); + + Ok(Self { + hive: hive.as_ref().to_owned(), + eval_nix: eval_nix.into_temp_path(), + show_trace: false, + }) + } + + pub fn show_trace(&mut self, value: bool) { + self.show_trace = value; + } + + pub fn as_path(&self) -> &Path { + &self.hive + } + + /// Retrieve deployment info for all nodes + pub async fn deployment_info(&self) -> NixResult> { + // FIXME: Really ugly :( + let s: String 
= self.nix_instantiate("hive.deploymentConfigJson").eval() + .capture_json().await?; + + Ok(serde_json::from_str(&s).unwrap()) + } + + /// Evaluates selected nodes. + /// + /// Evaluation may take up a lot of memory, so we make it possible + /// to split up the evaluation process into chunks and run them + /// concurrently with other processes (e.g., build and apply). + pub async fn eval_selected(&self, nodes: &Vec, progress_bar: Option) -> NixResult> { + let nodes_expr = SerializedNixExpresssion::new(nodes)?; + let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression()); + + let command = self.nix_instantiate(&expr).instantiate(); + let mut execution = CommandExecution::new("(eval)", command); + + if let Some(bar) = progress_bar { + execution.set_progress_bar(bar); + } + + let eval = execution + .capture_store_path().await?; + let drv = eval.to_derivation() + .expect("The result should be a store derivation"); + + Ok(drv) + } + + /// Evaluates an expression using values from the configuration + pub async fn introspect(&self, expression: String) -> NixResult { + let expression = format!("toJSON (hive.introspect ({}))", expression); + self.nix_instantiate(&expression).eval() + .capture_json().await + } + + fn nix_instantiate(&self, expression: &str) -> NixInstantiate { + NixInstantiate::new(&self, expression.to_owned()) + } +} + +struct NixInstantiate<'hive> { + hive: &'hive Hive, + expression: String, +} + +impl<'hive> NixInstantiate<'hive> { + fn new(hive: &'hive Hive, expression: String) -> Self { + Self { + hive, + expression, + } + } + + fn instantiate(self) -> Command { + // FIXME: unwrap + // Technically filenames can be arbitrary byte strings (OsStr), + // but Nix may not like it... 
+ + let mut command = Command::new("nix-instantiate"); + command + .arg("--no-gc-warning") + .arg("-E") + .arg(format!( + "with builtins; let eval = import {}; hive = eval {{ rawHive = import {}; }}; in {}", + self.hive.eval_nix.to_str().unwrap(), + self.hive.as_path().to_str().unwrap(), + self.expression, + )); + + if self.hive.show_trace { + command.arg("--show-trace"); + } + + command + } + + fn eval(self) -> Command { + let mut command = self.instantiate(); + command.arg("--eval").arg("--json"); + command + } +} + +/// A serialized Nix expression. +/// +/// Very hacky and involves an Import From Derivation, so should be +/// avoided as much as possible. But I suppose it's more robust than attempting +/// to generate Nix expressions directly or escaping a JSON string to strip +/// off Nix interpolation. +struct SerializedNixExpresssion { + json_file: TempPath, +} + +impl SerializedNixExpresssion { + pub fn new<'de, T>(data: T) -> NixResult where T: Serialize { + let mut tmp = NamedTempFile::new()?; + let json = serde_json::to_vec(&data).expect("Could not serialize data"); + tmp.write_all(&json)?; + + Ok(Self { + json_file: tmp.into_temp_path(), + }) + } + + pub fn expression(&self) -> String { + format!("(builtins.fromJSON (builtins.readFile {}))", self.json_file.to_str().unwrap()) + } +} diff --git a/src/nix/host.rs b/src/nix/host.rs index 58a5778..8ff10e5 100644 --- a/src/nix/host.rs +++ b/src/nix/host.rs @@ -1,16 +1,15 @@ -use std::process::Stdio; use std::collections::HashSet; +use std::convert::TryInto; -use console::style; use async_trait::async_trait; -use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; use indicatif::ProgressBar; -use super::{StorePath, DeploymentGoal, NixResult, NixError, NixCommand, SYSTEM_PROFILE}; +use super::{StorePath, Profile, DeploymentGoal, NixResult, NixError, NixCommand, SYSTEM_PROFILE}; +use crate::util::CommandExecution; pub(crate) fn local() -> Box { - Box::new(Local {}) + Box::new(Local::new()) } 
#[derive(Copy, Clone, Debug)] @@ -81,9 +80,22 @@ pub trait Host: Send + Sync + std::fmt::Debug { Ok(paths) } + /// Pushes and optionally activates a profile to the host. + async fn deploy(&mut self, profile: &Profile, goal: DeploymentGoal, copy_options: CopyOptions) -> NixResult<()> { + self.copy_closure(profile.as_store_path(), CopyDirection::ToRemote, copy_options).await?; + + if goal.requires_activation() { + self.activate(profile, goal).await?; + } + + Ok(()) + } + #[allow(unused_variables)] /// Activates a system profile on the host, if it runs NixOS. - async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> { + /// + /// The profile must already exist on the host. You should probably use deploy instead. + async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> { Err(NixError::Unsupported) } @@ -93,7 +105,7 @@ pub trait Host: Send + Sync + std::fmt::Debug { } /// Dumps human-readable unstructured log messages related to the host. - async fn dump_logs(&self) -> Option<&[String]> { + async fn dump_logs(&self) -> Option<&str> { None } } @@ -103,7 +115,19 @@ pub trait Host: Send + Sync + std::fmt::Debug { /// It may not be capable of realizing some derivations /// (e.g., building Linux derivations on macOS). 
#[derive(Debug)] -pub struct Local {} +pub struct Local { + progress_bar: Option, + logs: String, +} + +impl Local { + pub fn new() -> Self { + Self { + progress_bar: None, + logs: String::new(), + } + } +} #[async_trait] impl Host for Local { @@ -111,31 +135,58 @@ impl Host for Local { Ok(()) } async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult> { - Command::new("nix-store") + let mut command = Command::new("nix-store"); + command .arg("--no-gc-warning") .arg("--realise") - .arg(derivation.as_path()) - .capture_output() - .await - .map(|paths| { - paths.lines().map(|p| p.to_string().into()).collect() - }) + .arg(derivation.as_path()); + + let mut execution = CommandExecution::new("local", command); + + if let Some(bar) = self.progress_bar.as_ref() { + execution.set_progress_bar(bar.clone()); + } + + execution.run().await?; + + let (stdout, _) = execution.get_logs(); + stdout.unwrap().lines().map(|p| p.to_string().try_into()).collect() } - async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> { - let profile = profile.as_path().to_str().unwrap(); + async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> { if goal.should_switch_profile() { + let path = profile.as_path().to_str().unwrap(); Command::new("nix-env") .args(&["--profile", SYSTEM_PROFILE]) - .args(&["--set", profile]) + .args(&["--set", path]) .passthrough() .await?; } - let activation_command = format!("{}/bin/switch-to-configuration", profile); - Command::new(activation_command) - .arg(goal.as_str().unwrap()) - .passthrough() - .await + let activation_command = profile.activation_command(goal).unwrap(); + let mut command = Command::new(&activation_command[0]); + command + .args(&activation_command[1..]); + + let mut execution = CommandExecution::new("local", command); + + if let Some(bar) = self.progress_bar.as_ref() { + execution.set_progress_bar(bar.clone()); + } + + let result = execution.run().await; + + // FIXME: 
Bad - Order of lines is messed up + let (stdout, stderr) = execution.get_logs(); + self.logs += stdout.unwrap(); + self.logs += stderr.unwrap(); + + result + } + fn set_progress_bar(&mut self, bar: ProgressBar) { + self.progress_bar = Some(bar); + } + async fn dump_logs(&self) -> Option<&str> { + Some(&self.logs) } } @@ -150,8 +201,8 @@ pub struct SSH { friendly_name: String, path_cache: HashSet, - progress: Option, - logs: Vec, + progress_bar: Option, + logs: String, } #[async_trait] @@ -162,29 +213,33 @@ impl Host for SSH { } async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult> { // FIXME - self.ssh(&["nix-store", "--no-gc-warning", "--realise", derivation.as_path().to_str().unwrap()]) + let paths = self.ssh(&["nix-store", "--no-gc-warning", "--realise", derivation.as_path().to_str().unwrap()]) .capture_output() - .await - .map(|paths| { - paths.lines().map(|p| p.to_string().into()).collect() - }) - } - async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> { - let profile = profile.as_path().to_str().unwrap(); + .await; + match paths { + Ok(paths) => { + paths.lines().map(|p| p.to_string().try_into()).collect() + } + Err(e) => Err(e), + } + } + async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> { if goal.should_switch_profile() { - let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", profile]); + let path = profile.as_path().to_str().unwrap(); + let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", path]); self.run_command(set_profile).await?; } - let activation_command = format!("{}/bin/switch-to-configuration", profile); - let command = self.ssh(&[&activation_command, goal.as_str().unwrap()]); + let activation_command = profile.activation_command(goal).unwrap(); + let v: Vec<&str> = activation_command.iter().map(|s| &**s).collect(); + let command = self.ssh(&v); self.run_command(command).await } fn set_progress_bar(&mut 
self, bar: ProgressBar) { - self.progress = Some(bar); + self.progress_bar = Some(bar); } - async fn dump_logs(&self) -> Option<&[String]> { + async fn dump_logs(&self) -> Option<&str> { Some(&self.logs) } } @@ -197,44 +252,26 @@ impl SSH { host, friendly_name, path_cache: HashSet::new(), - progress: None, - logs: Vec::new(), + progress_bar: None, + logs: String::new(), } } - async fn run_command(&mut self, mut command: Command) -> NixResult<()> { - command.stdin(Stdio::null()); - command.stdout(Stdio::piped()); - command.stderr(Stdio::piped()); + async fn run_command(&mut self, command: Command) -> NixResult<()> { + let mut execution = CommandExecution::new(&self.friendly_name, command); - let mut child = command.spawn()?; - - let mut stderr = BufReader::new(child.stderr.as_mut().unwrap()); - - loop { - let mut line = String::new(); - let len = stderr.read_line(&mut line).await.unwrap(); - - if len == 0 { - break; - } - - let trimmed = line.trim_end(); - if let Some(progress) = self.progress.as_mut() { - progress.set_message(trimmed); - progress.inc(0); - } else { - eprintln!("{} | {}", style(&self.friendly_name).cyan(), trimmed); - } - self.logs.push(line); + if let Some(bar) = self.progress_bar.as_ref() { + execution.set_progress_bar(bar.clone()); } - let exit = child.wait().await?; - if exit.success() { - Ok(()) - } else { - Err(NixError::NixFailure { exit_code: exit.code().unwrap() }) - } + let result = execution.run().await; + + // FIXME: Bad - Order of lines is messed up + let (stdout, stderr) = execution.get_logs(); + self.logs += stdout.unwrap(); + self.logs += stderr.unwrap(); + + result } fn ssh_target(&self) -> String { diff --git a/src/nix/mod.rs b/src/nix/mod.rs index c30d511..5f857f7 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -1,29 +1,35 @@ -use std::path::{Path, PathBuf}; -use std::convert::AsRef; -use std::io::Write; +use std::convert::TryFrom; use std::process::Stdio; -use std::collections::HashMap; -use std::fs; use 
async_trait::async_trait; -use indicatif::ProgressBar; use serde::de::DeserializeOwned; -use serde::{Serialize, Deserialize}; +use serde::Deserialize; use snafu::Snafu; -use tempfile::{NamedTempFile, TempPath}; use tokio::process::Command; -use tokio::sync::Mutex; + +use crate::util::CommandExecution; pub mod host; pub use host::{Host, CopyDirection, CopyOptions}; use host::SSH; -const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix"); +pub mod hive; +pub use hive::Hive; + +pub mod store; +pub use store::{StorePath, StoreDerivation}; + +pub mod profile; +pub use profile::{Profile, ProfileMap}; + +pub mod deployment; +pub use deployment::{DeploymentGoal, Deployment}; pub const SYSTEM_PROFILE: &'static str = "/nix/var/nix/profiles/system"; pub type NixResult = Result; +#[non_exhaustive] #[derive(Debug, Snafu)] pub enum NixError { #[snafu(display("I/O Error: {}", error))] @@ -41,6 +47,12 @@ pub enum NixError { #[snafu(display("This operation is not supported"))] Unsupported, + #[snafu(display("Invalid Nix store path"))] + InvalidStorePath, + + #[snafu(display("Invalid NixOS system profile"))] + InvalidProfile, + #[snafu(display("Nix Error: {}", message))] Unknown { message: String }, } @@ -51,85 +63,8 @@ impl From for NixError { } } -pub struct Hive { - hive: PathBuf, - eval_nix: TempPath, - builder: Box, - show_trace: bool, -} - -impl Hive { - pub fn new>(hive: P) -> NixResult { - let mut eval_nix = NamedTempFile::new()?; - eval_nix.write_all(HIVE_EVAL)?; - - Ok(Self { - hive: hive.as_ref().to_owned(), - eval_nix: eval_nix.into_temp_path(), - builder: host::local(), - show_trace: false, - }) - } - - pub fn show_trace(&mut self, value: bool) { - self.show_trace = value; - } - - /// Retrieve deployment info for all nodes - pub async fn deployment_info(&self) -> NixResult> { - // FIXME: Really ugly :( - let s: String = self.nix_instantiate("hive.deploymentConfigJson").eval() - .capture_json().await?; - - Ok(serde_json::from_str(&s).unwrap()) - } - - /// Builds 
selected nodes - pub async fn build_selected(&mut self, nodes: Vec) -> NixResult> { - let nodes_expr = SerializedNixExpresssion::new(&nodes)?; - let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression()); - - self.build_common(&expr).await - } - - #[allow(dead_code)] - /// Builds all node configurations - pub async fn build_all(&mut self) -> NixResult> { - self.build_common("hive.buildAll").await - } - - /// Evaluates an expression using values from the configuration - pub async fn introspect(&mut self, expression: String) -> NixResult { - let expression = format!("toJSON (hive.introspect ({}))", expression); - self.nix_instantiate(&expression).eval() - .capture_json().await - } - - /// Builds node configurations - /// - /// Expects the resulting store path to point to a JSON file containing - /// a map of node name -> store path. - async fn build_common(&mut self, expression: &str) -> NixResult> { - let build: StorePath = self.nix_instantiate(expression).instantiate() - .capture_store_path().await?; - - let realization = self.builder.realize(&build).await?; - assert!(realization.len() == 1); - - let json = fs::read_to_string(&realization[0].as_path())?; - let result_map = serde_json::from_str(&json) - .expect("Bad result from our own build routine"); - - Ok(result_map) - } - - fn nix_instantiate(&self, expression: &str) -> NixInstantiate { - NixInstantiate::new(&self, expression.to_owned()) - } -} - #[derive(Debug, Clone, Deserialize)] -pub struct DeploymentConfig { +pub struct NodeConfig { #[serde(rename = "targetHost")] target_host: Option, @@ -141,7 +76,7 @@ pub struct DeploymentConfig { tags: Vec, } -impl DeploymentConfig { +impl NodeConfig { pub fn tags(&self) -> &[String] { &self.tags } pub fn allows_local_deployment(&self) -> bool { self.allow_local_deployment } @@ -154,110 +89,6 @@ impl DeploymentConfig { } } -#[derive(Debug, Copy, Clone)] -pub enum DeploymentGoal { - /// Push the closures only. 
- Push, - - /// Make the configuration the boot default and activate now. - Switch, - - /// Make the configuration the boot default. - Boot, - - /// Activate the configuration, but don't make it the boot default. - Test, - - /// Show what would be done if this configuration were activated. - DryActivate, -} - -impl DeploymentGoal { - pub fn from_str(s: &str) -> Option { - match s { - "push" => Some(Self::Push), - "switch" => Some(Self::Switch), - "boot" => Some(Self::Boot), - "test" => Some(Self::Test), - "dry-activate" => Some(Self::DryActivate), - _ => None, - } - } - - pub fn as_str(&self) -> Option<&'static str> { - use DeploymentGoal::*; - match self { - Push => None, - Switch => Some("switch"), - Boot => Some("boot"), - Test => Some("test"), - DryActivate => Some("dry-activate"), - } - } - - pub fn success_str(&self) -> Option<&'static str> { - use DeploymentGoal::*; - match self { - Push => Some("Pushed"), - Switch => Some("Activation successful"), - Boot => Some("Will be activated next boot"), - Test => Some("Activation successful (test)"), - DryActivate => Some("Dry activation successful"), - } - } - - pub fn should_switch_profile(&self) -> bool { - use DeploymentGoal::*; - match self { - Boot | Switch => true, - _ => false, - } - } -} - -struct NixInstantiate<'hive> { - hive: &'hive Hive, - expression: String, -} - -impl<'hive> NixInstantiate<'hive> { - fn new(hive: &'hive Hive, expression: String) -> Self { - Self { - hive, - expression, - } - } - - fn instantiate(self) -> Command { - // FIXME: unwrap - // Technically filenames can be arbitrary byte strings (OsStr), - // but Nix may not like it... 
- - let mut command = Command::new("nix-instantiate"); - command - .arg("--no-gc-warning") - .arg("-E") - .arg(format!( - "with builtins; let eval = import {}; hive = eval {{ rawHive = import {}; }}; in {}", - self.hive.eval_nix.to_str().unwrap(), - self.hive.hive.to_str().unwrap(), - self.expression, - )); - - if self.hive.show_trace { - command.arg("--show-trace"); - } - - command - } - - fn eval(self) -> Command { - let mut command = self.instantiate(); - command.arg("--eval").arg("--json"); - command - } -} - #[async_trait] trait NixCommand { async fn passthrough(&mut self) -> NixResult<()>; @@ -317,131 +148,37 @@ impl NixCommand for Command { /// Captures a single store path. async fn capture_store_path(&mut self) -> NixResult { let output = self.capture_output().await?; - Ok(StorePath(output.trim_end().into())) + let path = output.trim_end().to_owned(); + StorePath::try_from(path) } } -/// A Nix store path. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct StorePath(PathBuf); - -impl StorePath { - /// Returns the store path - pub fn as_path(&self) -> &Path { - &self.0 +#[async_trait] +impl NixCommand for CommandExecution { + async fn passthrough(&mut self) -> NixResult<()> { + self.run().await } -} -impl From for StorePath { - fn from(s: String) -> Self { - Self(s.into()) + /// Captures output as a String. + async fn capture_output(&mut self) -> NixResult { + self.run().await?; + let (stdout, _) = self.get_logs(); + + Ok(stdout.unwrap().to_owned()) } -} -impl Into for StorePath { - fn into(self) -> PathBuf { - self.0 - } -} - -/// A serialized Nix expression. -/// -/// Very hacky and involves an Import From Derivation, so should be -/// avoided as much as possible. But I suppose it's more robust than attempting -/// to generate Nix expressions directly or escaping a JSON string to strip -/// off Nix interpolation. 
-struct SerializedNixExpresssion { - json_file: TempPath, -} - -impl SerializedNixExpresssion { - pub fn new<'de, T>(data: T) -> NixResult where T: Serialize { - let mut tmp = NamedTempFile::new()?; - let json = serde_json::to_vec(&data).expect("Could not serialize data"); - tmp.write_all(&json)?; - - Ok(Self { - json_file: tmp.into_temp_path(), + /// Captures deserialized output from JSON. + async fn capture_json(&mut self) -> NixResult where T: DeserializeOwned { + let output = self.capture_output().await?; + serde_json::from_str(&output).map_err(|_| NixError::BadOutput { + output: output.clone() }) } - pub fn expression(&self) -> String { - format!("(builtins.fromJSON (builtins.readFile {}))", self.json_file.to_str().unwrap()) - } -} - -#[derive(Debug)] -pub struct DeploymentTask { - /// Name of the target. - name: String, - - /// The target to deploy to. - target: Mutex>, - - /// Nix store path to the system profile to deploy. - profile: StorePath, - - /// The goal of this deployment. - goal: DeploymentGoal, - - /// Options used for copying closures to the remote host. - copy_options: CopyOptions, -} - -impl DeploymentTask { - pub fn new(name: String, target: Box, profile: StorePath, goal: DeploymentGoal) -> Self { - Self { - name, - target: Mutex::new(target), - profile, - goal, - copy_options: CopyOptions::default(), - } - } - - pub fn name(&self) -> &str { &self.name } - pub fn goal(&self) -> DeploymentGoal { self.goal } - - /// Set options used for copying closures to the remote host. - pub fn set_copy_options(&mut self, options: CopyOptions) { - self.copy_options = options; - } - - /// Set the progress bar used during deployment. - pub async fn set_progress_bar(&mut self, progress: ProgressBar) { - let mut target = self.target.lock().await; - target.set_progress_bar(progress); - } - - /// Executes the deployment. 
- pub async fn execute(&mut self) -> NixResult<()> { - match self.goal { - DeploymentGoal::Push => { - self.push().await - } - _ => { - self.push_and_activate().await - } - } - } - - /// Takes the Host out, consuming the DeploymentTask. - pub async fn to_host(self) -> Box { - self.target.into_inner() - } - - async fn push(&mut self) -> NixResult<()> { - let mut target = self.target.lock().await; - let options = self.copy_options.include_outputs(true); - - target.copy_closure(&self.profile, CopyDirection::ToRemote, options).await - } - - async fn push_and_activate(&mut self) -> NixResult<()> { - self.push().await?; - { - let mut target = self.target.lock().await; - target.activate(&self.profile, self.goal).await - } + /// Captures a single store path. + async fn capture_store_path(&mut self) -> NixResult { + let output = self.capture_output().await?; + let path = output.trim_end().to_owned(); + StorePath::try_from(path) } } diff --git a/src/nix/profile.rs b/src/nix/profile.rs new file mode 100644 index 0000000..2bc5634 --- /dev/null +++ b/src/nix/profile.rs @@ -0,0 +1,112 @@ +use std::collections::HashMap; +use std::convert::TryFrom; +use std::ops::{Deref, DerefMut}; +use std::fs; +use std::path::Path; + +use super::{ + DeploymentGoal, + NixResult, + NixError, + StorePath, +}; + +/// A NixOS system profile. +#[derive(Clone, Debug)] +pub struct Profile(StorePath); + +impl Profile { + pub fn from_store_path(path: StorePath) -> NixResult { + if + !path.is_dir() || + !path.join("bin/switch-to-configuration").exists() + { + return Err(NixError::InvalidProfile); + } + + if let None = path.to_str() { + Err(NixError::InvalidProfile) + } else { + Ok(Self(path)) + } + } + + /// Returns the command to activate this profile. 
+ pub fn activation_command(&self, goal: DeploymentGoal) -> Option> { + if let Some(goal) = goal.as_str() { + let path = self.as_path().join("bin/switch-to-configuration"); + let switch_to_configuration = path.to_str() + .expect("The string should be UTF-8 valid") + .to_string(); + + let mut v = Vec::new(); + v.push(switch_to_configuration); + v.push(goal.to_string()); + + Some(v) + } else { + None + } + } + + /// Returns the store path. + pub fn as_store_path(&self) -> &StorePath { + &self.0 + } + + /// Returns the raw store path. + pub fn as_path(&self) -> &Path { + &self.0.as_path() + } +} + +/// A map of names to their associated NixOS system profiles. +#[derive(Debug)] +pub struct ProfileMap(HashMap); + +impl Deref for ProfileMap { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for ProfileMap { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl TryFrom> for ProfileMap { + type Error = NixError; + + fn try_from(paths: Vec) -> NixResult { + match paths.len() { + 0 => Err(NixError::BadOutput { + output: String::from("Build produced no outputs"), + }), + l if l > 1 => Err(NixError::BadOutput { + output: String::from("Build produced multiple outputs"), + }), + _ => { + // We expect a JSON file containing a + // HashMap + + let path = paths[0].as_path(); + let json: String = fs::read_to_string(path)?; + let mut raw_map: HashMap = serde_json::from_str(&json).map_err(|_| NixError::BadOutput { + output: String::from("The returned profile map is invalid"), + })?; + + let mut checked_map = HashMap::new(); + for (node, profile) in raw_map.drain() { + let profile = Profile::from_store_path(profile)?; + checked_map.insert(node, profile); + } + + Ok(Self(checked_map)) + } + } + } +} diff --git a/src/nix/store.rs b/src/nix/store.rs new file mode 100644 index 0000000..853cf39 --- /dev/null +++ b/src/nix/store.rs @@ -0,0 +1,86 @@ +use std::convert::{TryFrom, TryInto}; +use std::marker::PhantomData; 
+use std::path::{Path, PathBuf}; +use std::ops::Deref; + +use serde::{Serialize, Deserialize}; + +use super::{Host, NixResult, NixError}; + +/// A Nix store path. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StorePath(PathBuf); + +impl StorePath { + /// Returns the raw store path. + pub fn as_path(&self) -> &Path { + &self.0 + } + + /// Determines whether the path points to a derivation. + pub fn is_derivation(&self) -> bool { + if let Some(ext) = self.0.extension() { + ext == "drv" + } else { + false + } + } + + /// Converts the store path into a store derivation. + pub fn to_derivation>>(self) -> Option> { + if self.is_derivation() { + Some(StoreDerivation::::from_store_path_unchecked(self)) + } else { + None + } + } +} + +impl Deref for StorePath { + type Target = Path; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl TryFrom for StorePath { + type Error = NixError; + + fn try_from(s: String) -> NixResult { + if s.starts_with("/nix/store/") { + Ok(Self(s.into())) + } else { + Err(NixError::InvalidStorePath) + } + } +} + +impl Into for StorePath { + fn into(self) -> PathBuf { + self.0 + } +} + +/// A store derivation (.drv) that will result in a T when built. +pub struct StoreDerivation>>{ + path: StorePath, + _target: PhantomData, +} + +impl>> StoreDerivation { + fn from_store_path_unchecked(path: StorePath) -> Self { + Self { + path, + _target: PhantomData, + } + } +} + +impl, Error=NixError>> StoreDerivation { + /// Builds the store derivation on a host, resulting in a T. 
+ pub async fn realize(&self, host: &mut dyn Host) -> NixResult { + let paths: Vec = host.realize(&self.path).await?; + paths.try_into() + } +} diff --git a/src/util.rs b/src/util.rs index e1646cd..4937e63 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,17 +1,114 @@ use std::collections::HashMap; -use std::path::PathBuf; +use std::convert::AsRef; use std::fs; +use std::path::PathBuf; +use std::process::Stdio; use clap::{App, Arg, ArgMatches}; +use console::style; +use futures::future::join3; use glob::Pattern as GlobPattern; +use indicatif::ProgressBar; +use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader}; +use tokio::process::Command; -use super::nix::{DeploymentConfig, Hive, NixResult}; +use super::nix::{NodeConfig, Hive, NixResult, NixError}; enum NodeFilter { NameFilter(GlobPattern), TagFilter(GlobPattern), } +/// Non-interactive execution of an arbitrary Nix command. +pub struct CommandExecution { + label: String, + command: Command, + progress_bar: Option, + stdout: Option, + stderr: Option, +} + +impl CommandExecution { + pub fn new>(label: S, command: Command) -> Self { + Self { + label: label.as_ref().to_string(), + command, + progress_bar: None, + stdout: None, + stderr: None, + } + } + + /// Provides a ProgressBar to use to display output. + pub fn set_progress_bar(&mut self, bar: ProgressBar) { + self.progress_bar = Some(bar); + } + + /// Retrieve logs from the last invocation. + pub fn get_logs(&self) -> (Option<&String>, Option<&String>) { + (self.stdout.as_ref(), self.stderr.as_ref()) + } + + /// Run the command. 
+ pub async fn run(&mut self) -> NixResult<()> { + self.command.stdin(Stdio::null()); + self.command.stdout(Stdio::piped()); + self.command.stderr(Stdio::piped()); + + self.stdout = Some(String::new()); + self.stderr = Some(String::new()); + + let mut child = self.command.spawn()?; + + let stdout = BufReader::new(child.stdout.take().unwrap()); + let stderr = BufReader::new(child.stderr.take().unwrap()); + + async fn capture_stream(mut stream: BufReader, label: &str, mut progress_bar: Option) -> String { + let mut log = String::new(); + + loop { + let mut line = String::new(); + let len = stream.read_line(&mut line).await.unwrap(); + + if len == 0 { + break; + } + + let trimmed = line.trim_end(); + if let Some(progress_bar) = progress_bar.as_mut() { + progress_bar.set_message(trimmed); + } else { + eprintln!("{} | {}", style(label).cyan(), trimmed); + } + + log += trimmed; + log += "\n"; + } + + log + } + + let futures = join3( + capture_stream(stdout, &self.label, self.progress_bar.clone()), + capture_stream(stderr, &self.label, self.progress_bar.clone()), + child.wait(), + ); + + let (stdout_str, stderr_str, wait) = futures.await; + self.stdout = Some(stdout_str); + self.stderr = Some(stderr_str); + + let exit = wait?; + + if exit.success() { + Ok(()) + } else { + Err(NixError::NixFailure { exit_code: exit.code().unwrap() }) + } + } +} + + pub fn hive_from_args(args: &ArgMatches<'_>) -> NixResult { let path = match args.occurrences_of("config") { 0 => { @@ -84,7 +181,7 @@ pub fn hive_from_args(args: &ArgMatches<'_>) -> NixResult { Ok(hive) } -pub fn filter_nodes(nodes: &HashMap, filter: &str) -> Vec { +pub fn filter_nodes(nodes: &HashMap, filter: &str) -> Vec { let filters: Vec = filter.split(",").map(|pattern| { use NodeFilter::*; if let Some(tag_pattern) = pattern.strip_prefix("@") {