From d0ed138ef0688fbad1614b2ba8d2e7583644019a Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Fri, 18 Dec 2020 01:27:44 -0800 Subject: [PATCH] Refactoring and other stuff --- Cargo.lock | 16 ++ Cargo.toml | 1 + README.md | 18 +- default.nix | 2 +- src/command/apply.rs | 13 +- src/command/build.rs | 11 +- src/command/introspect.rs | 52 ++++ src/command/mod.rs | 1 + src/deployment.rs | 54 ++-- src/main.rs | 41 +-- src/{ => nix}/eval.nix | 8 +- src/nix/host.rs | 218 ++++++++++++++++ src/{nix.rs => nix/mod.rs} | 504 +++++++++++++------------------------ src/util.rs | 48 +++- 14 files changed, 595 insertions(+), 392 deletions(-) create mode 100644 src/command/introspect.rs rename src/{ => nix}/eval.nix (92%) create mode 100644 src/nix/host.rs rename src/{nix.rs => nix/mod.rs} (51%) diff --git a/Cargo.lock b/Cargo.lock index 2fa7a4b..264703f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -87,6 +87,7 @@ dependencies = [ "glob", "indicatif", "log", + "quit", "serde", "serde_json", "snafu", @@ -444,6 +445,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "quit" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f4bb36a4259c52bb35ae60baf6242b9b33f7505e834d8d36f6227d21840756" +dependencies = [ + "quit_macros", +] + +[[package]] +name = "quit_macros" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6923e4138faf97a0aaabf4a7b5ff1b0df28370786bead6200d2202c16e1b4e" + [[package]] name = "quote" version = "1.0.7" diff --git a/Cargo.toml b/Cargo.toml index 9bc231b..5adddfa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ futures = "0.3.8" glob = "0.3.0" indicatif = "0.15.0" log = "0.4.11" +quit = "1.1.2" serde = { version = "1.0.118", features = ["derive"] } serde_json = "1.0" snafu = "0.6.10" diff --git a/README.md b/README.md index ca741d5..731e004 100644 --- a/README.md +++ b/README.md @@ -66,11 +66,27 @@ Here is a sample `hive.nix` with two nodes, with some common configurations appl The full set of options can be found at `src/eval.nix`. Run `colmena build` in the same directory to build the configuration, or do `colmena apply` to deploy it to all nodes. +## `colmena introspect` + +Sometimes you may want to extract values from your Hive configuration for consumption in another program (e.g., [OctoDNS](https://github.com/octodns/octodns)). +To do that, create a `.nix` file with a lambda: + +```nix +{ nodes, pkgs, lib, ... }: +# Feels like a NixOS module - But you can return any JSON-serializable value +lib.attrsets.mapAttrs (k: v: v.config.deployment.targetHost) nodes +``` + +Then you can evaluate with: + +``` +colmena introspect your-lambda.nix +``` + ## Current limitations - It's required to use SSH keys to log into the remote hosts, and interactive authentication will not work. - There is no option to override SSH or `nix-copy-closure` options. -- Node tagging is not yet implemented. - Error reporting is lacking. ## Licensing diff --git a/default.nix b/default.nix index f930ad2..07888cd 100644 --- a/default.nix +++ b/default.nix @@ -1,5 +1,5 @@ { - pkgs ? import ./pkgs.nix, + pkgs ? import ./pkgs.nix {}, }: let cargo = pkgs.callPackage ./Cargo.nix {}; in cargo.rootCrate.build diff --git a/src/command/apply.rs b/src/command/apply.rs index f9d1756..77ceeec 100644 --- a/src/command/apply.rs +++ b/src/command/apply.rs @@ -37,28 +37,27 @@ pub fn subcommand() -> App<'static, 'static> { } pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { - let hive = Hive::from_config_arg(local_args).unwrap(); + let mut hive = Hive::from_config_arg(local_args).unwrap(); println!("Enumerating nodes..."); - let deployment_info = hive.deployment_info().await.unwrap(); - let all_nodes: Vec = deployment_info.keys().cloned().collect(); + let all_nodes = hive.deployment_info().await.unwrap(); let selected_nodes = match local_args.value_of("on") { Some(filter) => { util::filter_nodes(&all_nodes, filter) } - None => all_nodes.clone(), + None => all_nodes.keys().cloned().collect(), }; if selected_nodes.len() == 0 { println!("No hosts matched. Exiting..."); - return; + quit::with_code(2); } if selected_nodes.len() == all_nodes.len() { println!("Building all node configurations..."); } else { - println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), deployment_info.len()); + println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), all_nodes.len()); } // Some ugly argument mangling :/ @@ -76,7 +75,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { for (name, profile) in profiles.iter() { let task = DeploymentTask::new( name.clone(), - deployment_info.get(name).unwrap().clone(), + all_nodes.get(name).unwrap().to_host(), profile.clone(), goal, ); diff --git a/src/command/build.rs b/src/command/build.rs index b75b499..215a637 100644 --- a/src/command/build.rs +++ b/src/command/build.rs @@ -17,28 +17,27 @@ pub fn subcommand() -> App<'static, 'static> { } pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { - let hive = Hive::from_config_arg(local_args).unwrap(); + let mut hive = Hive::from_config_arg(local_args).unwrap(); println!("Enumerating nodes..."); - let deployment_info = hive.deployment_info().await.unwrap(); - let all_nodes: Vec = deployment_info.keys().cloned().collect(); + let all_nodes = hive.deployment_info().await.unwrap(); let selected_nodes = match local_args.value_of("on") { Some(filter) => { util::filter_nodes(&all_nodes, filter) } - None => all_nodes.clone(), + None => all_nodes.keys().cloned().collect(), }; if selected_nodes.len() == 0 { println!("No hosts matched. Exiting..."); - return; + quit::with_code(2); } if selected_nodes.len() == all_nodes.len() { println!("Building all node configurations..."); } else { - println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), deployment_info.len()); + println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), all_nodes.len()); } hive.build_selected(selected_nodes).await.unwrap(); diff --git a/src/command/introspect.rs b/src/command/introspect.rs new file mode 100644 index 0000000..d38bbb7 --- /dev/null +++ b/src/command/introspect.rs @@ -0,0 +1,52 @@ +use std::path::PathBuf; + +use clap::{Arg, App, SubCommand, ArgMatches}; + +use crate::nix::Hive; + +pub fn subcommand() -> App<'static, 'static> { + let command = SubCommand::with_name("introspect") + .about("Evaluate expressions using the complete configuration.") + .long_about(r#"Your expression should take an attribute set with keys `pkgs`, `lib` and `nodes` (like a NixOS module) and return a JSON-serializable value. + +For example, to retrieve the configuration of one node, you may write something like: + + { nodes, ... }: nodes.node-a.config.networking.hostName +"#) + .arg(Arg::with_name("expression_file") + .index(1) + .help("The .nix file containing the expression") + .takes_value(true)) + .arg(Arg::with_name("expression") + .short("E") + .help("The Nix expression") + .takes_value(true)) + .arg(Arg::with_name("config") + .short("f") + .long("config") + .help("Path to a Hive expression") + .default_value("hive.nix") + .required(true)) + ; + + command +} + +pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { + let mut hive = Hive::from_config_arg(local_args).unwrap(); + + if !(local_args.is_present("expression") ^ local_args.is_present("expression_file")) { + eprintln!("Either an expression (-E) xor a .nix file containing an expression should be specified, not both."); + quit::with_code(1); + } + + let expression = if local_args.is_present("expression") { + local_args.value_of("expression").unwrap().to_string() + } else { + let path: PathBuf = local_args.value_of("expression_file").unwrap().into(); + format!("import {}", path.canonicalize().expect("Could not generate absolute path to expression file.").to_str().unwrap()) + }; + + let result = hive.introspect(expression).await.unwrap(); + println!("{}", result); +} diff --git a/src/command/mod.rs b/src/command/mod.rs index 4c05a5b..4704f26 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -1,2 +1,3 @@ pub mod build; pub mod apply; +pub mod introspect; diff --git a/src/deployment.rs b/src/deployment.rs index 878e432..a48feb4 100644 --- a/src/deployment.rs +++ b/src/deployment.rs @@ -1,14 +1,15 @@ use std::cmp::min; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget}; use futures::future::join_all; +use tokio::sync::Mutex; -use crate::nix::{DeploymentTask, DeploymentResult}; +use crate::nix::DeploymentTask; use crate::progress::get_spinner_styles; /// User-facing deploy routine -pub async fn deploy(tasks: Vec>, max_parallelism: Option, progress_bar: bool) { +pub async fn deploy(tasks: Vec, max_parallelism: Option, progress_bar: bool) { let parallelism = match max_parallelism { Some(limit) => { min(limit, tasks.len()) @@ -32,7 +33,7 @@ pub async fn deploy(tasks: Vec>, max_parallelism: Option } let tasks = Arc::new(Mutex::new(tasks)); - let result_list: Arc>> = Arc::new(Mutex::new(Vec::new())); + let result_list: Arc>> = Arc::new(Mutex::new(Vec::new())); let mut futures = Vec::new(); @@ -48,7 +49,7 @@ pub async fn deploy(tasks: Vec>, max_parallelism: Option // Perform tasks until there's none loop { let (task, remaining) = { - let mut tasks = tasks.lock().unwrap(); + let mut tasks = tasks.lock().await; let task = tasks.pop(); let remaining = tasks.len(); (task, remaining) @@ -68,25 +69,23 @@ pub async fn deploy(tasks: Vec>, max_parallelism: Option bar.inc(0); if progress_bar { - task.set_progress_bar(&bar); - task.set_failing_spinner_style(failing_spinner_style.clone()); + task.set_progress_bar(bar.clone()).await; } match task.execute().await { - Ok(result) => { - if !result.success() { - bar.abandon_with_message("Failed") - } else { - bar.finish_with_message(task.goal().success_str().unwrap()); - } - bar.inc(0); - let mut result_list = result_list.lock().unwrap(); - result_list.push(result); + Ok(_) => { + bar.finish_with_message(task.goal().success_str().unwrap()); + + let mut result_list = result_list.lock().await; + result_list.push((task, true)); }, Err(e) => { println!("An error occurred while pushing to {}: {:?}", task.name(), e); bar.set_style(failing_spinner_style.clone()); bar.abandon_with_message("Internal error"); + + let mut result_list = result_list.lock().await; + result_list.push((task, false)); }, } @@ -109,10 +108,25 @@ pub async fn deploy(tasks: Vec>, max_parallelism: Option join_all(futures).await; - let result_list = result_list.lock().unwrap(); - for result in result_list.iter() { - if !result.success() { - println!("{}", result); + let mut result_list = result_list.lock().await; + for (task, success) in result_list.drain(..) { + if !success { + let name = task.name().to_owned(); + let host = task.to_host().await; + + print!("Failed to deploy to {}. ", name); + if let Some(logs) = host.dump_logs().await { + if let Some(lines) = logs.chunks(10).rev().next() { + println!("Last {} lines of logs:", lines.len()); + for line in lines { + println!("{}", line); + } + } else { + println!("The log is empty."); + } + } else { + println!("Logs are not available for this target."); + } } } } diff --git a/src/main.rs b/src/main.rs index 6ebe0f9..5ace10a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,26 +6,37 @@ mod progress; mod deployment; mod util; +macro_rules! command { + ($name:ident, $matches:ident) => { + if let Some(sub_matches) = $matches.subcommand_matches(stringify!($name)) { + command::$name::run(&$matches, &sub_matches).await; + return; + } + } +} + +macro_rules! bind_command { + ($name:ident, $app:ident) => { + $app = $app.subcommand(command::$name::subcommand()); + } +} + #[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<(), Box>{ - let matches = App::new("Colmena") +async fn main() { + let mut app = App::new("Colmena") .version("0.1.0") .author("Zhaofeng Li ") .about("NixOS deployment tool") .global_setting(AppSettings::ColoredHelp) - .setting(AppSettings::ArgRequiredElseHelp) - .subcommand(command::apply::subcommand()) - .subcommand(command::build::subcommand()) - .get_matches(); + .setting(AppSettings::ArgRequiredElseHelp); - if let Some(sub_matches) = matches.subcommand_matches("build") { - command::build::run(&matches, &sub_matches).await; - return Ok(()); - } - if let Some(sub_matches) = matches.subcommand_matches("apply") { - command::apply::run(&matches, &sub_matches).await; - return Ok(()); - } + bind_command!(apply, app); + bind_command!(build, app); + bind_command!(introspect, app); - Ok(()) + let matches = app.get_matches(); + + command!(apply, matches); + command!(build, matches); + command!(introspect, matches); } diff --git a/src/eval.nix b/src/nix/eval.nix similarity index 92% rename from src/eval.nix rename to src/nix/eval.nix index 4ee9f19..c6cf133 100644 --- a/src/eval.nix +++ b/src/nix/eval.nix @@ -107,7 +107,7 @@ let value = evalNode name hive.${name}; }) nodeNames); - deploymentInfoJson = toJSON (lib.attrsets.mapAttrs (name: eval: eval.config.deployment) nodes); + deploymentConfigJson = toJSON (lib.attrsets.mapAttrs (name: eval: eval.config.deployment) nodes); toplevel = lib.attrsets.mapAttrs (name: eval: eval.config.system.build.toplevel) nodes; @@ -126,6 +126,10 @@ let echo "$json" > $out ''; }; + + introspect = function: function { + inherit pkgs lib nodes; + }; in { - inherit nodes deploymentInfoJson toplevel buildAll buildSelected; + inherit nodes deploymentConfigJson toplevel buildAll buildSelected introspect; } diff --git a/src/nix/host.rs b/src/nix/host.rs new file mode 100644 index 0000000..9bbabac --- /dev/null +++ b/src/nix/host.rs @@ -0,0 +1,218 @@ +use std::process::Stdio; +use std::collections::HashSet; + +use console::style; +use async_trait::async_trait; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; +use indicatif::ProgressBar; + +use super::{StorePath, DeploymentGoal, NixResult, NixError, NixCommand}; + +pub(crate) fn local() -> Box { + Box::new(Local {}) +} + +#[derive(Copy, Clone, Debug)] +pub enum CopyDirection { + ToRemote, + FromRemote, +} + +/// A Nix(OS) host. +/// +/// The underlying implementation must be Send and Sync. +#[async_trait] +pub trait Host: Send + Sync + std::fmt::Debug { + /// Sends or receives the specified closure to the host + /// + /// The StorePath and its dependent paths will then exist on this host. + async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, include_outputs: bool) -> NixResult<()>; + + /// Realizes the specified derivation on the host + /// + /// The derivation must already exist on the host. + /// After realization, paths in the Vec will then + /// exist on the host. + async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult>; + + /// Realizes the specified local derivation on the host then retrieves the outputs. + async fn realize(&mut self, derivation: &StorePath) -> NixResult> { + self.copy_closure(derivation, CopyDirection::ToRemote, false).await?; + let paths = self.realize_remote(derivation).await?; + self.copy_closure(derivation, CopyDirection::FromRemote, true).await?; + + Ok(paths) + } + + #[allow(unused_variables)] + /// Activates a system profile on the host, if it runs NixOS. + async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> { + Err(NixError::Unsupported) + } + + #[allow(unused_variables)] + /// Provides a ProgressBar to use during operations. + fn set_progress_bar(&mut self, bar: ProgressBar) { + } + + /// Dumps human-readable unstructured log messages related to the host. + async fn dump_logs(&self) -> Option<&[String]> { + None + } +} + +/// The local machine running Colmena. +/// +/// It may not be capable of realizing some derivations +/// (e.g., building Linux derivations on macOS). +#[derive(Debug)] +pub struct Local {} + +#[async_trait] +impl Host for Local { + async fn copy_closure(&mut self, _closure: &StorePath, _direction: CopyDirection, _include_outputs: bool) -> NixResult<()> { + Ok(()) + } + async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult> { + Command::new("nix-store") + .arg("--realise") + .arg(derivation.as_path()) + .capture_output() + .await + .map(|paths| { + paths.lines().map(|p| p.to_string().into()).collect() + }) + } +} + +/// A remote machine connected over SSH. +#[derive(Debug)] +pub struct SSH { + /// The username to use to connect. + user: String, + + /// The hostname or IP address to connect to. + host: String, + + friendly_name: String, + path_cache: HashSet, + progress: Option, + logs: Vec, +} + +#[async_trait] +impl Host for SSH { + async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, include_outputs: bool) -> NixResult<()> { + let command = self.nix_copy_closure(closure, direction, include_outputs); + self.run_command(command).await + } + async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult> { + // FIXME + self.ssh(&["nix-store", "--realise", derivation.as_path().to_str().unwrap()]) + .capture_output() + .await + .map(|paths| { + paths.lines().map(|p| p.to_string().into()).collect() + }) + } + async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> { + let activation_command = format!("{}/bin/switch-to-configuration", profile.as_path().to_str().unwrap()); + let command = self.ssh(&[&activation_command, goal.as_str().unwrap()]); + self.run_command(command).await + } + fn set_progress_bar(&mut self, bar: ProgressBar) { + self.progress = Some(bar); + } + async fn dump_logs(&self) -> Option<&[String]> { + Some(&self.logs) + } +} + +impl SSH { + pub fn new(user: String, host: String) -> SSH { + let friendly_name = host.clone(); + Self { + user, + host, + friendly_name, + path_cache: HashSet::new(), + progress: None, + logs: Vec::new(), + } + } + + async fn run_command(&mut self, mut command: Command) -> NixResult<()> { + command.stdin(Stdio::null()); + command.stdout(Stdio::piped()); + command.stderr(Stdio::piped()); + + let mut child = command.spawn()?; + + let mut stderr = BufReader::new(child.stderr.as_mut().unwrap()); + + loop { + let mut line = String::new(); + let len = stderr.read_line(&mut line).await.unwrap(); + + if len == 0 { + break; + } + + let trimmed = line.trim_end(); + if let Some(progress) = self.progress.as_mut() { + progress.set_message(trimmed); + progress.inc(0); + } else { + println!("{} | {}", style(&self.friendly_name).cyan(), trimmed); + } + self.logs.push(line); + } + let exit = child.wait().await?; + + if exit.success() { + Ok(()) + } else { + Err(NixError::NixFailure { exit_code: exit.code().unwrap() }) + } + } + + fn ssh_target(&self) -> String { + format!("{}@{}", self.user, self.host) + } + + fn nix_copy_closure(&self, path: &StorePath, direction: CopyDirection, include_outputs: bool) -> Command { + let mut command = Command::new("nix-copy-closure"); + match direction { + CopyDirection::ToRemote => { + command.arg("--to"); + } + CopyDirection::FromRemote => { + command.arg("--from"); + } + } + if include_outputs { + command.arg("--include-outputs"); + } + + command + .arg("--gzip") + .arg("--use-substitutes") + .arg(&self.ssh_target()) + .arg(path.as_path()); + + command + } + + fn ssh(&self, command: &[&str]) -> Command { + // TODO: Allow configuation of SSH parameters + + let mut cmd = Command::new("ssh"); + cmd.arg(self.ssh_target()) + .args(&["-o", "StrictHostKeyChecking=accept-new"]) + .arg("--") + .args(command); + + cmd + } +} diff --git a/src/nix.rs b/src/nix/mod.rs similarity index 51% rename from src/nix.rs rename to src/nix/mod.rs index 2f86aad..1fcf075 100644 --- a/src/nix.rs +++ b/src/nix/mod.rs @@ -1,28 +1,135 @@ -//! A Colmena Hive. - use std::path::{Path, PathBuf}; use std::convert::AsRef; use std::io::Write; -use std::process::{ExitStatus, Stdio}; +use std::process::Stdio; use std::collections::HashMap; use std::fs; -use std::fmt; -use console::style; use async_trait::async_trait; use clap::ArgMatches; -use indicatif::{ProgressBar, ProgressStyle}; +use indicatif::ProgressBar; use serde::de::DeserializeOwned; use serde::{Serialize, Deserialize}; use snafu::Snafu; use tempfile::{NamedTempFile, TempPath}; -use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; +use tokio::sync::Mutex; + +mod host; +pub use host::{Host, CopyDirection}; +use host::SSH; const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix"); +pub type NixResult = Result; + +#[derive(Debug, Snafu)] +pub enum NixError { + #[snafu(display("I/O Error: {}", error))] + IoError { error: std::io::Error }, + + #[snafu(display("Nix returned invalid response: {}", output))] + BadOutput { output: String }, + + #[snafu(display("Nix exited with error code: {}", exit_code))] + NixFailure { exit_code: i32 }, + + #[snafu(display("Nix was interrupted"))] + NixKilled, + + #[snafu(display("This operation is not supported"))] + Unsupported, + + #[snafu(display("Nix Error: {}", message))] + Unknown { message: String }, +} + +impl From for NixError { + fn from(error: std::io::Error) -> Self { + Self::IoError { error } + } +} + +pub struct Hive { + hive: PathBuf, + eval_nix: TempPath, + builder: Box, +} + +impl Hive { + pub fn new>(hive: P) -> NixResult { + let mut eval_nix = NamedTempFile::new()?; + eval_nix.write_all(HIVE_EVAL)?; + + Ok(Self { + hive: hive.as_ref().to_owned(), + eval_nix: eval_nix.into_temp_path(), + builder: host::local(), + }) + } + + pub fn from_config_arg(args: &ArgMatches<'_>) -> NixResult { + let path = args.value_of("config").expect("The config arg should exist").to_owned(); + let path = canonicalize_path(path); + + Self::new(path) + } + + /// Retrieve deployment info for all nodes + pub async fn deployment_info(&self) -> NixResult> { + // FIXME: Really ugly :( + let s: String = self.nix_instantiate("hive.deploymentConfigJson").eval() + .capture_json().await?; + + Ok(serde_json::from_str(&s).unwrap()) + } + + /// Builds selected nodes + pub async fn build_selected(&mut self, nodes: Vec) -> NixResult> { + let nodes_expr = SerializedNixExpresssion::new(&nodes)?; + let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression()); + + self.build_common(&expr).await + } + + #[allow(dead_code)] + /// Builds all node configurations + pub async fn build_all(&mut self) -> NixResult> { + self.build_common("hive.buildAll").await + } + + /// Evaluates an expression using values from the configuration + pub async fn introspect(&mut self, expression: String) -> NixResult { + let expression = format!("toJSON (hive.introspect ({}))", expression); + self.nix_instantiate(&expression).eval() + .capture_json().await + } + + /// Builds node configurations + /// + /// Expects the resulting store path to point to a JSON file containing + /// a map of node name -> store path. + async fn build_common(&mut self, expression: &str) -> NixResult> { + let build: StorePath = self.nix_instantiate(expression).instantiate() + .capture_store_path().await?; + + let realization = self.builder.realize(&build).await?; + assert!(realization.len() == 1); + + let json = fs::read_to_string(&realization[0].as_path())?; + let result_map = serde_json::from_str(&json) + .expect("Bad result from our own build routine"); + + Ok(result_map) + } + + fn nix_instantiate(&self, expression: &str) -> NixInstantiate { + NixInstantiate::new(&self.eval_nix, &self.hive, expression.to_owned()) + } +} + #[derive(Debug, Clone, Deserialize)] -pub struct DeploymentInfo { +pub struct DeploymentConfig { #[serde(rename = "targetHost")] target_host: String, @@ -31,25 +138,12 @@ pub struct DeploymentInfo { tags: Vec, } -#[derive(Debug)] -pub struct DeploymentTask<'task> { - /// Name of the target. - name: String, - - /// The target to deploy to. - target: DeploymentInfo, - - /// Nix store path to the system profile to deploy. - profile: PathBuf, - - /// The goal of this deployment. - goal: DeploymentGoal, - - /// A ProgressBar to show the deployment progress to the user. - progress: Option<&'task ProgressBar>, - - /// The ProgressStyle to set when the deployment is failing. - failing_spinner_style: Option, +impl DeploymentConfig { + pub fn tags(&self) -> &[String] { &self.tags } + pub fn to_host(&self) -> Box { + let host = SSH::new(self.target_user.clone(), self.target_host.clone()); + Box::new(host) + } } #[derive(Debug, Copy, Clone)] @@ -105,112 +199,6 @@ impl DeploymentGoal { } } -/// Results of a DeploymentTask to show to the user. -pub struct DeploymentResult { - name: String, - push_output: Option, - push_successful: Option, - activate_output: Option, - activate_successful: Option, -} - -impl DeploymentResult { - fn new(name: String) -> Self { - Self { - name, - push_output: None, - push_successful: None, - activate_output: None, - activate_successful: None, - } - } - - /// Whether the deployment was successful overall. - pub fn success(&self) -> bool { - if let Some(push_successful) = self.push_successful { - if !push_successful { - return false; - } - } - - if let Some(activate_successful) = self.activate_successful { - if !activate_successful { - return false; - } - } - - true - } - - fn dump_log(f: &mut fmt::Formatter<'_>, output: Option<&String>) -> fmt::Result { - if let Some(output) = output { - writeln!(f, "Last 10 lines of log:")?; - writeln!(f, "~~~~~~~~~~")?; - let lines: Vec<&str> = output.split("\n").collect(); - - let start = if lines.len() < 10 { - 0 - } else { - lines.len() - 10 - }; - - for i in start..lines.len() { - writeln!(f, "{}", lines[i])?; - } - writeln!(f, "~~~~~~~~~~")?; - } - - writeln!(f) - } -} - -impl fmt::Display for DeploymentResult { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(push_successful) = self.push_successful { - if push_successful { - writeln!(f, "Deployment on node {} succeeded.", self.name)?; - } else { - write!(f, "Deployment on node {} failed. ", self.name)?; - Self::dump_log(f, self.push_output.as_ref())?; - } - } - if let Some(activate_successful) = self.activate_successful { - if activate_successful { - writeln!(f, "Activation on node {} succeeded.", self.name)?; - } else { - write!(f, "Activation on node {} failed.", self.name)?; - Self::dump_log(f, self.activate_output.as_ref())?; - } - } - Ok(()) - } -} - -#[derive(Debug, Snafu)] -pub enum NixError { - #[snafu(display("I/O Error: {}", error))] - IoError { error: std::io::Error }, - - #[snafu(display("Nix returned invalid response: {}", output))] - BadOutput { output: String }, - - #[snafu(display("Nix exited with error code: {}", exit_code))] - NixFailure { exit_code: i32 }, - - #[snafu(display("Nix was interrupted"))] - NixKilled, - - #[snafu(display("Nix Error: {}", message))] - Unknown { message: String }, -} - -pub type NixResult = Result; - -pub struct Hive { - hive: PathBuf, - eval_nix: TempPath, -} - struct NixInstantiate<'hive> { eval_nix: &'hive Path, hive: &'hive Path, @@ -263,11 +251,9 @@ impl NixCommand for Command { /// Runs the command with stdout and stderr passed through to the user. async fn passthrough(&mut self) -> NixResult<()> { let exit = self - .spawn() - .map_err(map_io_error)? + .spawn()? .wait() - .await - .map_err(map_io_error)?; + .await?; if exit.success() { Ok(()) @@ -285,11 +271,9 @@ impl NixCommand for Command { let output = self .stdout(Stdio::piped()) .stderr(Stdio::inherit()) - .spawn() - .map_err(map_io_error)? + .spawn()? .wait_with_output() - .await - .map_err(map_io_error)?; + .await?; if output.status.success() { // FIXME: unwrap @@ -318,20 +302,25 @@ impl NixCommand for Command { } /// A Nix store path. -#[derive(Debug, Serialize, Deserialize)] -struct StorePath(PathBuf); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StorePath(PathBuf); impl StorePath { - /// Builds the store path. - pub async fn realise(&self) -> NixResult> { - Command::new("nix-store") - .arg("--realise") - .arg(&self.0) - .capture_output() - .await - .map(|paths| { - paths.lines().map(|p| p.into()).collect() - }) + /// Returns the store path + pub fn as_path(&self) -> &Path { + &self.0 + } +} + +impl From for StorePath { + fn from(s: String) -> Self { + Self(s.into()) + } +} + +impl Into for StorePath { + fn into(self) -> PathBuf { + self.0 } } @@ -347,9 +336,9 @@ struct SerializedNixExpresssion { impl SerializedNixExpresssion { pub fn new<'de, T>(data: T) -> NixResult where T: Serialize { - let mut tmp = NamedTempFile::new().map_err(map_io_error)?; + let mut tmp = NamedTempFile::new()?; let json = serde_json::to_vec(&data).expect("Could not serialize data"); - tmp.write_all(&json).map_err(map_io_error)?; + tmp.write_all(&json)?; Ok(Self { json_file: tmp.into_temp_path(), @@ -361,84 +350,28 @@ impl SerializedNixExpresssion { } } -impl Hive { - pub fn new>(hive: P) -> NixResult { - let mut eval_nix = NamedTempFile::new().map_err(map_io_error)?; - eval_nix.write_all(HIVE_EVAL).map_err(map_io_error)?; +#[derive(Debug)] +pub struct DeploymentTask { + /// Name of the target. + name: String, - Ok(Self { - hive: hive.as_ref().to_owned(), - eval_nix: eval_nix.into_temp_path(), - }) - } + /// The target to deploy to. + target: Mutex>, - pub fn from_config_arg(args: &ArgMatches<'_>) -> NixResult { - let path = args.value_of("config").expect("The config arg should exist").to_owned(); - let path = canonicalize_path(path); + /// Nix store path to the system profile to deploy. + profile: StorePath, - Self::new(path) - } - - /// Retrieve a list of nodes in the hive - pub async fn nodes(&self) -> NixResult> { - self.nix_instantiate("attrNames hive.nodes").eval() - .capture_json().await - } - - /// Retrieve deployment info for all nodes - pub async fn deployment_info(&self) -> NixResult> { - // FIXME: Really ugly :( - let s: String = self.nix_instantiate("hive.deploymentInfoJson").eval() - .capture_json().await?; - - Ok(serde_json::from_str(&s).unwrap()) - } - - /// Builds selected nodes - pub async fn build_selected(&self, nodes: Vec) -> NixResult> { - let nodes_expr = SerializedNixExpresssion::new(&nodes)?; - let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression()); - - self.build_common(&expr).await - } - - /// Builds all node configurations - pub async fn build_all(&self) -> NixResult> { - self.build_common("hive.buildAll").await - } - - /// Builds node configurations - /// - /// Expects the resulting store path to point to a JSON file containing - /// a map of node name -> store path. - async fn build_common(&self, expression: &str) -> NixResult> { - let build: StorePath = self.nix_instantiate(expression).instantiate() - .capture_store_path().await?; - - let realization = build.realise().await?; - assert!(realization.len() == 1); - - let json = fs::read_to_string(&realization[0]).map_err(map_io_error)?; - let result_map: HashMap = serde_json::from_str(&json) - .expect("Bad result from our own build routine"); - - Ok(result_map) - } - - fn nix_instantiate(&self, expression: &str) -> NixInstantiate { - NixInstantiate::new(&self.eval_nix, &self.hive, expression.to_owned()) - } + /// The goal of this deployment. + goal: DeploymentGoal, } -impl<'task> DeploymentTask<'task> { - pub fn new(name: String, target: DeploymentInfo, profile: PathBuf, goal: DeploymentGoal) -> Self { +impl DeploymentTask { + pub fn new(name: String, target: Box, profile: StorePath, goal: DeploymentGoal) -> Self { Self { name, - target, + target: Mutex::new(target), profile, goal, - progress: None, - failing_spinner_style: None, } } @@ -446,16 +379,13 @@ impl<'task> DeploymentTask<'task> { pub fn goal(&self) -> DeploymentGoal { self.goal } /// Set the progress bar used during deployment. - pub fn set_progress_bar(&mut self, progress: &'task ProgressBar) { - self.progress = Some(progress); + pub async fn set_progress_bar(&mut self, progress: ProgressBar) { + let mut target = self.target.lock().await; + target.set_progress_bar(progress); } - /// Set a spinner style to switch to when the deployment is failing. - pub fn set_failing_spinner_style(&mut self, style: ProgressStyle) { - self.failing_spinner_style = Some(style); - } - - pub async fn execute(&mut self) -> NixResult { + /// Executes the deployment. + pub async fn execute(&mut self) -> NixResult<()> { match self.goal { DeploymentGoal::Push => { self.push().await @@ -466,113 +396,25 @@ impl<'task> DeploymentTask<'task> { } } - async fn push(&mut self) -> NixResult { - let mut result = DeploymentResult::new(self.name.clone()); - - // Issue of interest: - // https://github.com/NixOS/nix/issues?q=ipv6 - let target = format!("{}@{}", self.target.target_user, self.target.target_host); - let mut command = Command::new("nix-copy-closure"); - command - .arg("--to") - .arg("--gzip") - .arg("--include-outputs") - .arg("--use-substitutes") - .arg(&target) - .arg(&self.profile); - - let (exit, output) = self.run_command(&mut command, false).await?; - - if let Some(progress) = self.progress.as_mut() { - if !exit.success() { - if self.failing_spinner_style.is_some() { - let style = self.failing_spinner_style.as_ref().unwrap().clone(); - progress.set_style(style); - } - } - } - - result.push_successful = Some(exit.success()); - result.push_output = output; - - Ok(result) + /// Takes the Host out, consuming the DeploymentTask. + pub async fn to_host(self) -> Box { + self.target.into_inner() } - async fn push_and_activate(&mut self) -> NixResult { - let mut result = self.push().await?; - - if !result.success() { - // Don't go any further - return Ok(result); - } - - let target = format!("{}@{}", self.target.target_user, self.target.target_host); - let activation_command = format!("{}/bin/switch-to-configuration", self.profile.to_str().unwrap()); - let mut command = Command::new("ssh"); - command - .arg("-o") - .arg("StrictHostKeyChecking=accept-new") - .arg(&target) - .arg("--") - .arg(activation_command) - .arg(self.goal.as_str().unwrap()); - - let (exit, output) = self.run_command(&mut command, true).await?; - - if let Some(progress) = self.progress.as_mut() { - if !exit.success() { - if self.failing_spinner_style.is_some() { - let style = self.failing_spinner_style.as_ref().unwrap().clone(); - progress.set_style(style); - } - } - } - - result.activate_successful = Some(exit.success()); - result.activate_output = output; - - Ok(result) + async fn push(&mut self) -> NixResult<()> { + let mut target = self.target.lock().await; + target.copy_closure(&self.profile, CopyDirection::ToRemote, true).await } - async fn run_command(&mut self, command: &mut Command, capture_stdout: bool) -> NixResult<(ExitStatus, Option)> { - command.stdin(Stdio::null()); - command.stderr(Stdio::piped()); - - if capture_stdout { - command.stdout(Stdio::piped()); + async fn push_and_activate(&mut self) -> NixResult<()> { + self.push().await?; + { + let mut target = self.target.lock().await; + target.activate(&self.profile, self.goal).await } - - let mut child = command.spawn().map_err(map_io_error)?; - - let mut stderr = BufReader::new(child.stderr.as_mut().unwrap()); - let mut output = String::new(); - - loop { - let mut line = String::new(); - let len = stderr.read_line(&mut line).await.unwrap(); - - if len == 0 { - break; - } - - let trimmed = line.trim_end(); - if let Some(progress) = self.progress.as_mut() { - progress.set_message(trimmed); - progress.inc(0); - } else { - println!("{} | {}", style(&self.name).cyan(), trimmed); - } - output += &line; - } - let exit = child.wait().await.map_err(map_io_error)?; - Ok((exit, Some(output))) } } -fn map_io_error(error: std::io::Error) -> NixError { - NixError::IoError { error } -} - fn canonicalize_path(path: String) -> PathBuf { if !path.starts_with("/") { format!("./{}", path).into() diff --git a/src/util.rs b/src/util.rs index bc14501..4db0b1f 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,21 +1,50 @@ +use std::collections::HashMap; + use clap::{Arg, App}; use glob::Pattern as GlobPattern; -pub fn filter_nodes(nodes: &Vec, filter: &str) -> Vec { - let filters: Vec = filter.split(",").map(|pattern| GlobPattern::new(pattern).unwrap()).collect(); +use super::nix::DeploymentConfig; + +enum NodeFilter { + NameFilter(GlobPattern), + TagFilter(GlobPattern), +} + +pub fn filter_nodes(nodes: &HashMap, filter: &str) -> Vec { + let filters: Vec = filter.split(",").map(|pattern| { + use NodeFilter::*; + if let Some(tag_pattern) = pattern.strip_prefix("@") { + TagFilter(GlobPattern::new(tag_pattern).unwrap()) + } else { + NameFilter(GlobPattern::new(pattern).unwrap()) + } + }).collect(); if filters.len() > 0 { - nodes.iter().filter(|name| { + nodes.iter().filter_map(|(name, node)| { for filter in filters.iter() { - if filter.matches(name) { - return true; + use NodeFilter::*; + match filter { + TagFilter(pat) => { + // Welp + for tag in node.tags() { + if pat.matches(tag) { + return Some(name); + } + } + } + NameFilter(pat) => { + if pat.matches(name) { + return Some(name) + } + } } } - false + None }).cloned().collect() } else { - nodes.to_owned() + nodes.keys().cloned().collect() } } @@ -30,11 +59,12 @@ pub fn register_common_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> { .arg(Arg::with_name("on") .long("on") .help("Select a list of machines") - .long_help(r#"The list is comma-separated and globs are supported. + .long_help(r#"The list is comma-separated and globs are supported. To match tags, prepend the filter by @. Valid examples: - host1,host2,host3 - edge-* -- edge-*,core-*"#) +- edge-*,core-* +- @a-tag,@tags-can-have-*"#) .takes_value(true)) }