Refactoring and other stuff

This commit is contained in:
Zhaofeng Li 2020-12-18 01:27:44 -08:00
parent 65cb370bef
commit d0ed138ef0
14 changed files with 595 additions and 392 deletions

16
Cargo.lock generated
View file

@ -87,6 +87,7 @@ dependencies = [
"glob", "glob",
"indicatif", "indicatif",
"log", "log",
"quit",
"serde", "serde",
"serde_json", "serde_json",
"snafu", "snafu",
@ -444,6 +445,21 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "quit"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94f4bb36a4259c52bb35ae60baf6242b9b33f7505e834d8d36f6227d21840756"
dependencies = [
"quit_macros",
]
[[package]]
name = "quit_macros"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff6923e4138faf97a0aaabf4a7b5ff1b0df28370786bead6200d2202c16e1b4e"
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.7" version = "1.0.7"

View file

@ -14,6 +14,7 @@ futures = "0.3.8"
glob = "0.3.0" glob = "0.3.0"
indicatif = "0.15.0" indicatif = "0.15.0"
log = "0.4.11" log = "0.4.11"
quit = "1.1.2"
serde = { version = "1.0.118", features = ["derive"] } serde = { version = "1.0.118", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
snafu = "0.6.10" snafu = "0.6.10"

View file

@ -66,11 +66,27 @@ Here is a sample `hive.nix` with two nodes, with some common configurations appl
The full set of options can be found at `src/eval.nix`. The full set of options can be found at `src/eval.nix`.
Run `colmena build` in the same directory to build the configuration, or do `colmena apply` to deploy it to all nodes. Run `colmena build` in the same directory to build the configuration, or do `colmena apply` to deploy it to all nodes.
## `colmena introspect`
Sometimes you may want to extract values from your Hive configuration for consumption in another program (e.g., [OctoDNS](https://github.com/octodns/octodns)).
To do that, create a `.nix` file with a lambda:
```nix
{ nodes, pkgs, lib, ... }:
# Feels like a NixOS module - But you can return any JSON-serializable value
lib.attrsets.mapAttrs (k: v: v.config.deployment.targetHost) nodes
```
Then you can evaluate with:
```
colmena introspect your-lambda.nix
```
## Current limitations ## Current limitations
- It's required to use SSH keys to log into the remote hosts, and interactive authentication will not work. - It's required to use SSH keys to log into the remote hosts, and interactive authentication will not work.
- There is no option to override SSH or `nix-copy-closure` options. - There is no option to override SSH or `nix-copy-closure` options.
- Node tagging is not yet implemented.
- Error reporting is lacking. - Error reporting is lacking.
## Licensing ## Licensing

View file

@ -1,5 +1,5 @@
{ {
pkgs ? import ./pkgs.nix, pkgs ? import ./pkgs.nix {},
}: let }: let
cargo = pkgs.callPackage ./Cargo.nix {}; cargo = pkgs.callPackage ./Cargo.nix {};
in cargo.rootCrate.build in cargo.rootCrate.build

View file

@ -37,28 +37,27 @@ pub fn subcommand() -> App<'static, 'static> {
} }
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let hive = Hive::from_config_arg(local_args).unwrap(); let mut hive = Hive::from_config_arg(local_args).unwrap();
println!("Enumerating nodes..."); println!("Enumerating nodes...");
let deployment_info = hive.deployment_info().await.unwrap(); let all_nodes = hive.deployment_info().await.unwrap();
let all_nodes: Vec<String> = deployment_info.keys().cloned().collect();
let selected_nodes = match local_args.value_of("on") { let selected_nodes = match local_args.value_of("on") {
Some(filter) => { Some(filter) => {
util::filter_nodes(&all_nodes, filter) util::filter_nodes(&all_nodes, filter)
} }
None => all_nodes.clone(), None => all_nodes.keys().cloned().collect(),
}; };
if selected_nodes.len() == 0 { if selected_nodes.len() == 0 {
println!("No hosts matched. Exiting..."); println!("No hosts matched. Exiting...");
return; quit::with_code(2);
} }
if selected_nodes.len() == all_nodes.len() { if selected_nodes.len() == all_nodes.len() {
println!("Building all node configurations..."); println!("Building all node configurations...");
} else { } else {
println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), deployment_info.len()); println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), all_nodes.len());
} }
// Some ugly argument mangling :/ // Some ugly argument mangling :/
@ -76,7 +75,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
for (name, profile) in profiles.iter() { for (name, profile) in profiles.iter() {
let task = DeploymentTask::new( let task = DeploymentTask::new(
name.clone(), name.clone(),
deployment_info.get(name).unwrap().clone(), all_nodes.get(name).unwrap().to_host(),
profile.clone(), profile.clone(),
goal, goal,
); );

View file

@ -17,28 +17,27 @@ pub fn subcommand() -> App<'static, 'static> {
} }
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let hive = Hive::from_config_arg(local_args).unwrap(); let mut hive = Hive::from_config_arg(local_args).unwrap();
println!("Enumerating nodes..."); println!("Enumerating nodes...");
let deployment_info = hive.deployment_info().await.unwrap(); let all_nodes = hive.deployment_info().await.unwrap();
let all_nodes: Vec<String> = deployment_info.keys().cloned().collect();
let selected_nodes = match local_args.value_of("on") { let selected_nodes = match local_args.value_of("on") {
Some(filter) => { Some(filter) => {
util::filter_nodes(&all_nodes, filter) util::filter_nodes(&all_nodes, filter)
} }
None => all_nodes.clone(), None => all_nodes.keys().cloned().collect(),
}; };
if selected_nodes.len() == 0 { if selected_nodes.len() == 0 {
println!("No hosts matched. Exiting..."); println!("No hosts matched. Exiting...");
return; quit::with_code(2);
} }
if selected_nodes.len() == all_nodes.len() { if selected_nodes.len() == all_nodes.len() {
println!("Building all node configurations..."); println!("Building all node configurations...");
} else { } else {
println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), deployment_info.len()); println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), all_nodes.len());
} }
hive.build_selected(selected_nodes).await.unwrap(); hive.build_selected(selected_nodes).await.unwrap();

52
src/command/introspect.rs Normal file
View file

@ -0,0 +1,52 @@
use std::path::PathBuf;
use clap::{Arg, App, SubCommand, ArgMatches};
use crate::nix::Hive;
pub fn subcommand() -> App<'static, 'static> {
let command = SubCommand::with_name("introspect")
.about("Evaluate expressions using the complete configuration.")
.long_about(r#"Your expression should take an attribute set with keys `pkgs`, `lib` and `nodes` (like a NixOS module) and return a JSON-serializable value.
For example, to retrieve the configuration of one node, you may write something like:
{ nodes, ... }: nodes.node-a.config.networking.hostName
"#)
.arg(Arg::with_name("expression_file")
.index(1)
.help("The .nix file containing the expression")
.takes_value(true))
.arg(Arg::with_name("expression")
.short("E")
.help("The Nix expression")
.takes_value(true))
.arg(Arg::with_name("config")
.short("f")
.long("config")
.help("Path to a Hive expression")
.default_value("hive.nix")
.required(true))
;
command
}
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let mut hive = Hive::from_config_arg(local_args).unwrap();
if !(local_args.is_present("expression") ^ local_args.is_present("expression_file")) {
eprintln!("Either an expression (-E) xor a .nix file containing an expression should be specified, not both.");
quit::with_code(1);
}
let expression = if local_args.is_present("expression") {
local_args.value_of("expression").unwrap().to_string()
} else {
let path: PathBuf = local_args.value_of("expression_file").unwrap().into();
format!("import {}", path.canonicalize().expect("Could not generate absolute path to expression file.").to_str().unwrap())
};
let result = hive.introspect(expression).await.unwrap();
println!("{}", result);
}

View file

@ -1,2 +1,3 @@
pub mod build; pub mod build;
pub mod apply; pub mod apply;
pub mod introspect;

View file

@ -1,14 +1,15 @@
use std::cmp::min; use std::cmp::min;
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget}; use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
use futures::future::join_all; use futures::future::join_all;
use tokio::sync::Mutex;
use crate::nix::{DeploymentTask, DeploymentResult}; use crate::nix::DeploymentTask;
use crate::progress::get_spinner_styles; use crate::progress::get_spinner_styles;
/// User-facing deploy routine /// User-facing deploy routine
pub async fn deploy(tasks: Vec<DeploymentTask<'static>>, max_parallelism: Option<usize>, progress_bar: bool) { pub async fn deploy(tasks: Vec<DeploymentTask>, max_parallelism: Option<usize>, progress_bar: bool) {
let parallelism = match max_parallelism { let parallelism = match max_parallelism {
Some(limit) => { Some(limit) => {
min(limit, tasks.len()) min(limit, tasks.len())
@ -32,7 +33,7 @@ pub async fn deploy(tasks: Vec<DeploymentTask<'static>>, max_parallelism: Option
} }
let tasks = Arc::new(Mutex::new(tasks)); let tasks = Arc::new(Mutex::new(tasks));
let result_list: Arc<Mutex<Vec<DeploymentResult>>> = Arc::new(Mutex::new(Vec::new())); let result_list: Arc<Mutex<Vec<(DeploymentTask, bool)>>> = Arc::new(Mutex::new(Vec::new()));
let mut futures = Vec::new(); let mut futures = Vec::new();
@ -48,7 +49,7 @@ pub async fn deploy(tasks: Vec<DeploymentTask<'static>>, max_parallelism: Option
// Perform tasks until there's none // Perform tasks until there's none
loop { loop {
let (task, remaining) = { let (task, remaining) = {
let mut tasks = tasks.lock().unwrap(); let mut tasks = tasks.lock().await;
let task = tasks.pop(); let task = tasks.pop();
let remaining = tasks.len(); let remaining = tasks.len();
(task, remaining) (task, remaining)
@ -68,25 +69,23 @@ pub async fn deploy(tasks: Vec<DeploymentTask<'static>>, max_parallelism: Option
bar.inc(0); bar.inc(0);
if progress_bar { if progress_bar {
task.set_progress_bar(&bar); task.set_progress_bar(bar.clone()).await;
task.set_failing_spinner_style(failing_spinner_style.clone());
} }
match task.execute().await { match task.execute().await {
Ok(result) => { Ok(_) => {
if !result.success() {
bar.abandon_with_message("Failed")
} else {
bar.finish_with_message(task.goal().success_str().unwrap()); bar.finish_with_message(task.goal().success_str().unwrap());
}
bar.inc(0); let mut result_list = result_list.lock().await;
let mut result_list = result_list.lock().unwrap(); result_list.push((task, true));
result_list.push(result);
}, },
Err(e) => { Err(e) => {
println!("An error occurred while pushing to {}: {:?}", task.name(), e); println!("An error occurred while pushing to {}: {:?}", task.name(), e);
bar.set_style(failing_spinner_style.clone()); bar.set_style(failing_spinner_style.clone());
bar.abandon_with_message("Internal error"); bar.abandon_with_message("Internal error");
let mut result_list = result_list.lock().await;
result_list.push((task, false));
}, },
} }
@ -109,10 +108,25 @@ pub async fn deploy(tasks: Vec<DeploymentTask<'static>>, max_parallelism: Option
join_all(futures).await; join_all(futures).await;
let result_list = result_list.lock().unwrap(); let mut result_list = result_list.lock().await;
for result in result_list.iter() { for (task, success) in result_list.drain(..) {
if !result.success() { if !success {
println!("{}", result); let name = task.name().to_owned();
let host = task.to_host().await;
print!("Failed to deploy to {}. ", name);
if let Some(logs) = host.dump_logs().await {
if let Some(lines) = logs.chunks(10).rev().next() {
println!("Last {} lines of logs:", lines.len());
for line in lines {
println!("{}", line);
}
} else {
println!("The log is empty.");
}
} else {
println!("Logs are not available for this target.");
}
} }
} }
} }

View file

@ -6,26 +6,37 @@ mod progress;
mod deployment; mod deployment;
mod util; mod util;
macro_rules! command {
($name:ident, $matches:ident) => {
if let Some(sub_matches) = $matches.subcommand_matches(stringify!($name)) {
command::$name::run(&$matches, &sub_matches).await;
return;
}
}
}
macro_rules! bind_command {
($name:ident, $app:ident) => {
$app = $app.subcommand(command::$name::subcommand());
}
}
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>>{ async fn main() {
let matches = App::new("Colmena") let mut app = App::new("Colmena")
.version("0.1.0") .version("0.1.0")
.author("Zhaofeng Li <hello@zhaofeng.li>") .author("Zhaofeng Li <hello@zhaofeng.li>")
.about("NixOS deployment tool") .about("NixOS deployment tool")
.global_setting(AppSettings::ColoredHelp) .global_setting(AppSettings::ColoredHelp)
.setting(AppSettings::ArgRequiredElseHelp) .setting(AppSettings::ArgRequiredElseHelp);
.subcommand(command::apply::subcommand())
.subcommand(command::build::subcommand())
.get_matches();
if let Some(sub_matches) = matches.subcommand_matches("build") { bind_command!(apply, app);
command::build::run(&matches, &sub_matches).await; bind_command!(build, app);
return Ok(()); bind_command!(introspect, app);
}
if let Some(sub_matches) = matches.subcommand_matches("apply") {
command::apply::run(&matches, &sub_matches).await;
return Ok(());
}
Ok(()) let matches = app.get_matches();
command!(apply, matches);
command!(build, matches);
command!(introspect, matches);
} }

View file

@ -107,7 +107,7 @@ let
value = evalNode name hive.${name}; value = evalNode name hive.${name};
}) nodeNames); }) nodeNames);
deploymentInfoJson = toJSON (lib.attrsets.mapAttrs (name: eval: eval.config.deployment) nodes); deploymentConfigJson = toJSON (lib.attrsets.mapAttrs (name: eval: eval.config.deployment) nodes);
toplevel = lib.attrsets.mapAttrs (name: eval: eval.config.system.build.toplevel) nodes; toplevel = lib.attrsets.mapAttrs (name: eval: eval.config.system.build.toplevel) nodes;
@ -126,6 +126,10 @@ let
echo "$json" > $out echo "$json" > $out
''; '';
}; };
introspect = function: function {
inherit pkgs lib nodes;
};
in { in {
inherit nodes deploymentInfoJson toplevel buildAll buildSelected; inherit nodes deploymentConfigJson toplevel buildAll buildSelected introspect;
} }

218
src/nix/host.rs Normal file
View file

@ -0,0 +1,218 @@
use std::process::Stdio;
use std::collections::HashSet;
use console::style;
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use indicatif::ProgressBar;
use super::{StorePath, DeploymentGoal, NixResult, NixError, NixCommand};
pub(crate) fn local() -> Box<dyn Host + 'static> {
Box::new(Local {})
}
#[derive(Copy, Clone, Debug)]
pub enum CopyDirection {
ToRemote,
FromRemote,
}
/// A Nix(OS) host.
///
/// The underlying implementation must be Send and Sync.
#[async_trait]
pub trait Host: Send + Sync + std::fmt::Debug {
/// Sends or receives the specified closure to the host
///
/// The StorePath and its dependent paths will then exist on this host.
async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, include_outputs: bool) -> NixResult<()>;
/// Realizes the specified derivation on the host
///
/// The derivation must already exist on the host.
/// After realization, paths in the Vec<StorePath> will then
/// exist on the host.
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>>;
/// Realizes the specified local derivation on the host then retrieves the outputs.
async fn realize(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
self.copy_closure(derivation, CopyDirection::ToRemote, false).await?;
let paths = self.realize_remote(derivation).await?;
self.copy_closure(derivation, CopyDirection::FromRemote, true).await?;
Ok(paths)
}
#[allow(unused_variables)]
/// Activates a system profile on the host, if it runs NixOS.
async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> {
Err(NixError::Unsupported)
}
#[allow(unused_variables)]
/// Provides a ProgressBar to use during operations.
fn set_progress_bar(&mut self, bar: ProgressBar) {
}
/// Dumps human-readable unstructured log messages related to the host.
async fn dump_logs(&self) -> Option<&[String]> {
None
}
}
/// The local machine running Colmena.
///
/// It may not be capable of realizing some derivations
/// (e.g., building Linux derivations on macOS).
#[derive(Debug)]
pub struct Local {}
#[async_trait]
impl Host for Local {
async fn copy_closure(&mut self, _closure: &StorePath, _direction: CopyDirection, _include_outputs: bool) -> NixResult<()> {
Ok(())
}
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
Command::new("nix-store")
.arg("--realise")
.arg(derivation.as_path())
.capture_output()
.await
.map(|paths| {
paths.lines().map(|p| p.to_string().into()).collect()
})
}
}
/// A remote machine connected over SSH.
#[derive(Debug)]
pub struct SSH {
/// The username to use to connect.
user: String,
/// The hostname or IP address to connect to.
host: String,
friendly_name: String,
path_cache: HashSet<StorePath>,
progress: Option<ProgressBar>,
logs: Vec<String>,
}
#[async_trait]
impl Host for SSH {
async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, include_outputs: bool) -> NixResult<()> {
let command = self.nix_copy_closure(closure, direction, include_outputs);
self.run_command(command).await
}
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
// FIXME
self.ssh(&["nix-store", "--realise", derivation.as_path().to_str().unwrap()])
.capture_output()
.await
.map(|paths| {
paths.lines().map(|p| p.to_string().into()).collect()
})
}
async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> {
let activation_command = format!("{}/bin/switch-to-configuration", profile.as_path().to_str().unwrap());
let command = self.ssh(&[&activation_command, goal.as_str().unwrap()]);
self.run_command(command).await
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress = Some(bar);
}
async fn dump_logs(&self) -> Option<&[String]> {
Some(&self.logs)
}
}
impl SSH {
pub fn new(user: String, host: String) -> SSH {
let friendly_name = host.clone();
Self {
user,
host,
friendly_name,
path_cache: HashSet::new(),
progress: None,
logs: Vec::new(),
}
}
async fn run_command(&mut self, mut command: Command) -> NixResult<()> {
command.stdin(Stdio::null());
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
let mut child = command.spawn()?;
let mut stderr = BufReader::new(child.stderr.as_mut().unwrap());
loop {
let mut line = String::new();
let len = stderr.read_line(&mut line).await.unwrap();
if len == 0 {
break;
}
let trimmed = line.trim_end();
if let Some(progress) = self.progress.as_mut() {
progress.set_message(trimmed);
progress.inc(0);
} else {
println!("{} | {}", style(&self.friendly_name).cyan(), trimmed);
}
self.logs.push(line);
}
let exit = child.wait().await?;
if exit.success() {
Ok(())
} else {
Err(NixError::NixFailure { exit_code: exit.code().unwrap() })
}
}
fn ssh_target(&self) -> String {
format!("{}@{}", self.user, self.host)
}
fn nix_copy_closure(&self, path: &StorePath, direction: CopyDirection, include_outputs: bool) -> Command {
let mut command = Command::new("nix-copy-closure");
match direction {
CopyDirection::ToRemote => {
command.arg("--to");
}
CopyDirection::FromRemote => {
command.arg("--from");
}
}
if include_outputs {
command.arg("--include-outputs");
}
command
.arg("--gzip")
.arg("--use-substitutes")
.arg(&self.ssh_target())
.arg(path.as_path());
command
}
fn ssh(&self, command: &[&str]) -> Command {
// TODO: Allow configuation of SSH parameters
let mut cmd = Command::new("ssh");
cmd.arg(self.ssh_target())
.args(&["-o", "StrictHostKeyChecking=accept-new"])
.arg("--")
.args(command);
cmd
}
}

View file

@ -1,28 +1,135 @@
//! A Colmena Hive.
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::convert::AsRef; use std::convert::AsRef;
use std::io::Write; use std::io::Write;
use std::process::{ExitStatus, Stdio}; use std::process::Stdio;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs; use std::fs;
use std::fmt;
use console::style;
use async_trait::async_trait; use async_trait::async_trait;
use clap::ArgMatches; use clap::ArgMatches;
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::ProgressBar;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use snafu::Snafu; use snafu::Snafu;
use tempfile::{NamedTempFile, TempPath}; use tempfile::{NamedTempFile, TempPath};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command; use tokio::process::Command;
use tokio::sync::Mutex;
mod host;
pub use host::{Host, CopyDirection};
use host::SSH;
const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix"); const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix");
pub type NixResult<T> = Result<T, NixError>;
#[derive(Debug, Snafu)]
pub enum NixError {
#[snafu(display("I/O Error: {}", error))]
IoError { error: std::io::Error },
#[snafu(display("Nix returned invalid response: {}", output))]
BadOutput { output: String },
#[snafu(display("Nix exited with error code: {}", exit_code))]
NixFailure { exit_code: i32 },
#[snafu(display("Nix was interrupted"))]
NixKilled,
#[snafu(display("This operation is not supported"))]
Unsupported,
#[snafu(display("Nix Error: {}", message))]
Unknown { message: String },
}
impl From<std::io::Error> for NixError {
fn from(error: std::io::Error) -> Self {
Self::IoError { error }
}
}
pub struct Hive {
hive: PathBuf,
eval_nix: TempPath,
builder: Box<dyn Host>,
}
impl Hive {
pub fn new<P: AsRef<Path>>(hive: P) -> NixResult<Self> {
let mut eval_nix = NamedTempFile::new()?;
eval_nix.write_all(HIVE_EVAL)?;
Ok(Self {
hive: hive.as_ref().to_owned(),
eval_nix: eval_nix.into_temp_path(),
builder: host::local(),
})
}
pub fn from_config_arg(args: &ArgMatches<'_>) -> NixResult<Self> {
let path = args.value_of("config").expect("The config arg should exist").to_owned();
let path = canonicalize_path(path);
Self::new(path)
}
/// Retrieve deployment info for all nodes
pub async fn deployment_info(&self) -> NixResult<HashMap<String, DeploymentConfig>> {
// FIXME: Really ugly :(
let s: String = self.nix_instantiate("hive.deploymentConfigJson").eval()
.capture_json().await?;
Ok(serde_json::from_str(&s).unwrap())
}
/// Builds selected nodes
pub async fn build_selected(&mut self, nodes: Vec<String>) -> NixResult<HashMap<String, StorePath>> {
let nodes_expr = SerializedNixExpresssion::new(&nodes)?;
let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression());
self.build_common(&expr).await
}
#[allow(dead_code)]
/// Builds all node configurations
pub async fn build_all(&mut self) -> NixResult<HashMap<String, StorePath>> {
self.build_common("hive.buildAll").await
}
/// Evaluates an expression using values from the configuration
pub async fn introspect(&mut self, expression: String) -> NixResult<String> {
let expression = format!("toJSON (hive.introspect ({}))", expression);
self.nix_instantiate(&expression).eval()
.capture_json().await
}
/// Builds node configurations
///
/// Expects the resulting store path to point to a JSON file containing
/// a map of node name -> store path.
async fn build_common(&mut self, expression: &str) -> NixResult<HashMap<String, StorePath>> {
let build: StorePath = self.nix_instantiate(expression).instantiate()
.capture_store_path().await?;
let realization = self.builder.realize(&build).await?;
assert!(realization.len() == 1);
let json = fs::read_to_string(&realization[0].as_path())?;
let result_map = serde_json::from_str(&json)
.expect("Bad result from our own build routine");
Ok(result_map)
}
fn nix_instantiate(&self, expression: &str) -> NixInstantiate {
NixInstantiate::new(&self.eval_nix, &self.hive, expression.to_owned())
}
}
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct DeploymentInfo { pub struct DeploymentConfig {
#[serde(rename = "targetHost")] #[serde(rename = "targetHost")]
target_host: String, target_host: String,
@ -31,25 +138,12 @@ pub struct DeploymentInfo {
tags: Vec<String>, tags: Vec<String>,
} }
#[derive(Debug)] impl DeploymentConfig {
pub struct DeploymentTask<'task> { pub fn tags(&self) -> &[String] { &self.tags }
/// Name of the target. pub fn to_host(&self) -> Box<dyn Host> {
name: String, let host = SSH::new(self.target_user.clone(), self.target_host.clone());
Box::new(host)
/// The target to deploy to. }
target: DeploymentInfo,
/// Nix store path to the system profile to deploy.
profile: PathBuf,
/// The goal of this deployment.
goal: DeploymentGoal,
/// A ProgressBar to show the deployment progress to the user.
progress: Option<&'task ProgressBar>,
/// The ProgressStyle to set when the deployment is failing.
failing_spinner_style: Option<ProgressStyle>,
} }
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
@ -105,112 +199,6 @@ impl DeploymentGoal {
} }
} }
/// Results of a DeploymentTask to show to the user.
pub struct DeploymentResult {
name: String,
push_output: Option<String>,
push_successful: Option<bool>,
activate_output: Option<String>,
activate_successful: Option<bool>,
}
impl DeploymentResult {
fn new(name: String) -> Self {
Self {
name,
push_output: None,
push_successful: None,
activate_output: None,
activate_successful: None,
}
}
/// Whether the deployment was successful overall.
pub fn success(&self) -> bool {
if let Some(push_successful) = self.push_successful {
if !push_successful {
return false;
}
}
if let Some(activate_successful) = self.activate_successful {
if !activate_successful {
return false;
}
}
true
}
fn dump_log(f: &mut fmt::Formatter<'_>, output: Option<&String>) -> fmt::Result {
if let Some(output) = output {
writeln!(f, "Last 10 lines of log:")?;
writeln!(f, "~~~~~~~~~~")?;
let lines: Vec<&str> = output.split("\n").collect();
let start = if lines.len() < 10 {
0
} else {
lines.len() - 10
};
for i in start..lines.len() {
writeln!(f, "{}", lines[i])?;
}
writeln!(f, "~~~~~~~~~~")?;
}
writeln!(f)
}
}
impl fmt::Display for DeploymentResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(push_successful) = self.push_successful {
if push_successful {
writeln!(f, "Deployment on node {} succeeded.", self.name)?;
} else {
write!(f, "Deployment on node {} failed. ", self.name)?;
Self::dump_log(f, self.push_output.as_ref())?;
}
}
if let Some(activate_successful) = self.activate_successful {
if activate_successful {
writeln!(f, "Activation on node {} succeeded.", self.name)?;
} else {
write!(f, "Activation on node {} failed.", self.name)?;
Self::dump_log(f, self.activate_output.as_ref())?;
}
}
Ok(())
}
}
#[derive(Debug, Snafu)]
pub enum NixError {
#[snafu(display("I/O Error: {}", error))]
IoError { error: std::io::Error },
#[snafu(display("Nix returned invalid response: {}", output))]
BadOutput { output: String },
#[snafu(display("Nix exited with error code: {}", exit_code))]
NixFailure { exit_code: i32 },
#[snafu(display("Nix was interrupted"))]
NixKilled,
#[snafu(display("Nix Error: {}", message))]
Unknown { message: String },
}
pub type NixResult<T> = Result<T, NixError>;
pub struct Hive {
hive: PathBuf,
eval_nix: TempPath,
}
struct NixInstantiate<'hive> { struct NixInstantiate<'hive> {
eval_nix: &'hive Path, eval_nix: &'hive Path,
hive: &'hive Path, hive: &'hive Path,
@ -263,11 +251,9 @@ impl NixCommand for Command {
/// Runs the command with stdout and stderr passed through to the user. /// Runs the command with stdout and stderr passed through to the user.
async fn passthrough(&mut self) -> NixResult<()> { async fn passthrough(&mut self) -> NixResult<()> {
let exit = self let exit = self
.spawn() .spawn()?
.map_err(map_io_error)?
.wait() .wait()
.await .await?;
.map_err(map_io_error)?;
if exit.success() { if exit.success() {
Ok(()) Ok(())
@ -285,11 +271,9 @@ impl NixCommand for Command {
let output = self let output = self
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::inherit()) .stderr(Stdio::inherit())
.spawn() .spawn()?
.map_err(map_io_error)?
.wait_with_output() .wait_with_output()
.await .await?;
.map_err(map_io_error)?;
if output.status.success() { if output.status.success() {
// FIXME: unwrap // FIXME: unwrap
@ -318,20 +302,25 @@ impl NixCommand for Command {
} }
/// A Nix store path. /// A Nix store path.
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
struct StorePath(PathBuf); pub struct StorePath(PathBuf);
impl StorePath { impl StorePath {
/// Builds the store path. /// Returns the store path
pub async fn realise(&self) -> NixResult<Vec<PathBuf>> { pub fn as_path(&self) -> &Path {
Command::new("nix-store") &self.0
.arg("--realise") }
.arg(&self.0) }
.capture_output()
.await impl From<String> for StorePath {
.map(|paths| { fn from(s: String) -> Self {
paths.lines().map(|p| p.into()).collect() Self(s.into())
}) }
}
impl Into<PathBuf> for StorePath {
fn into(self) -> PathBuf {
self.0
} }
} }
@ -347,9 +336,9 @@ struct SerializedNixExpresssion {
impl SerializedNixExpresssion { impl SerializedNixExpresssion {
pub fn new<'de, T>(data: T) -> NixResult<Self> where T: Serialize { pub fn new<'de, T>(data: T) -> NixResult<Self> where T: Serialize {
let mut tmp = NamedTempFile::new().map_err(map_io_error)?; let mut tmp = NamedTempFile::new()?;
let json = serde_json::to_vec(&data).expect("Could not serialize data"); let json = serde_json::to_vec(&data).expect("Could not serialize data");
tmp.write_all(&json).map_err(map_io_error)?; tmp.write_all(&json)?;
Ok(Self { Ok(Self {
json_file: tmp.into_temp_path(), json_file: tmp.into_temp_path(),
@ -361,84 +350,28 @@ impl SerializedNixExpresssion {
} }
} }
impl Hive { #[derive(Debug)]
pub fn new<P: AsRef<Path>>(hive: P) -> NixResult<Self> { pub struct DeploymentTask {
let mut eval_nix = NamedTempFile::new().map_err(map_io_error)?; /// Name of the target.
eval_nix.write_all(HIVE_EVAL).map_err(map_io_error)?; name: String,
Ok(Self { /// The target to deploy to.
hive: hive.as_ref().to_owned(), target: Mutex<Box<dyn Host>>,
eval_nix: eval_nix.into_temp_path(),
})
}
pub fn from_config_arg(args: &ArgMatches<'_>) -> NixResult<Self> { /// Nix store path to the system profile to deploy.
let path = args.value_of("config").expect("The config arg should exist").to_owned(); profile: StorePath,
let path = canonicalize_path(path);
Self::new(path) /// The goal of this deployment.
} goal: DeploymentGoal,
/// Retrieve a list of nodes in the hive
pub async fn nodes(&self) -> NixResult<Vec<String>> {
self.nix_instantiate("attrNames hive.nodes").eval()
.capture_json().await
}
/// Retrieve deployment info for all nodes
pub async fn deployment_info(&self) -> NixResult<HashMap<String, DeploymentInfo>> {
// FIXME: Really ugly :(
let s: String = self.nix_instantiate("hive.deploymentInfoJson").eval()
.capture_json().await?;
Ok(serde_json::from_str(&s).unwrap())
}
/// Builds selected nodes
pub async fn build_selected(&self, nodes: Vec<String>) -> NixResult<HashMap<String, PathBuf>> {
let nodes_expr = SerializedNixExpresssion::new(&nodes)?;
let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression());
self.build_common(&expr).await
}
/// Builds all node configurations
pub async fn build_all(&self) -> NixResult<HashMap<String, PathBuf>> {
self.build_common("hive.buildAll").await
}
/// Builds node configurations
///
/// Expects the resulting store path to point to a JSON file containing
/// a map of node name -> store path.
async fn build_common(&self, expression: &str) -> NixResult<HashMap<String, PathBuf>> {
let build: StorePath = self.nix_instantiate(expression).instantiate()
.capture_store_path().await?;
let realization = build.realise().await?;
assert!(realization.len() == 1);
let json = fs::read_to_string(&realization[0]).map_err(map_io_error)?;
let result_map: HashMap<String, PathBuf> = serde_json::from_str(&json)
.expect("Bad result from our own build routine");
Ok(result_map)
}
fn nix_instantiate(&self, expression: &str) -> NixInstantiate {
NixInstantiate::new(&self.eval_nix, &self.hive, expression.to_owned())
}
} }
impl<'task> DeploymentTask<'task> { impl DeploymentTask {
pub fn new(name: String, target: DeploymentInfo, profile: PathBuf, goal: DeploymentGoal) -> Self { pub fn new(name: String, target: Box<dyn Host>, profile: StorePath, goal: DeploymentGoal) -> Self {
Self { Self {
name, name,
target, target: Mutex::new(target),
profile, profile,
goal, goal,
progress: None,
failing_spinner_style: None,
} }
} }
@ -446,16 +379,13 @@ impl<'task> DeploymentTask<'task> {
pub fn goal(&self) -> DeploymentGoal { self.goal } pub fn goal(&self) -> DeploymentGoal { self.goal }
/// Set the progress bar used during deployment. /// Set the progress bar used during deployment.
pub fn set_progress_bar(&mut self, progress: &'task ProgressBar) { pub async fn set_progress_bar(&mut self, progress: ProgressBar) {
self.progress = Some(progress); let mut target = self.target.lock().await;
target.set_progress_bar(progress);
} }
/// Set a spinner style to switch to when the deployment is failing. /// Executes the deployment.
pub fn set_failing_spinner_style(&mut self, style: ProgressStyle) { pub async fn execute(&mut self) -> NixResult<()> {
self.failing_spinner_style = Some(style);
}
pub async fn execute(&mut self) -> NixResult<DeploymentResult> {
match self.goal { match self.goal {
DeploymentGoal::Push => { DeploymentGoal::Push => {
self.push().await self.push().await
@ -466,111 +396,23 @@ impl<'task> DeploymentTask<'task> {
} }
} }
async fn push(&mut self) -> NixResult<DeploymentResult> { /// Takes the Host out, consuming the DeploymentTask.
let mut result = DeploymentResult::new(self.name.clone()); pub async fn to_host(self) -> Box<dyn Host> {
self.target.into_inner()
// Issue of interest:
// https://github.com/NixOS/nix/issues?q=ipv6
let target = format!("{}@{}", self.target.target_user, self.target.target_host);
let mut command = Command::new("nix-copy-closure");
command
.arg("--to")
.arg("--gzip")
.arg("--include-outputs")
.arg("--use-substitutes")
.arg(&target)
.arg(&self.profile);
let (exit, output) = self.run_command(&mut command, false).await?;
if let Some(progress) = self.progress.as_mut() {
if !exit.success() {
if self.failing_spinner_style.is_some() {
let style = self.failing_spinner_style.as_ref().unwrap().clone();
progress.set_style(style);
}
}
} }
result.push_successful = Some(exit.success()); async fn push(&mut self) -> NixResult<()> {
result.push_output = output; let mut target = self.target.lock().await;
target.copy_closure(&self.profile, CopyDirection::ToRemote, true).await
Ok(result)
} }
async fn push_and_activate(&mut self) -> NixResult<DeploymentResult> { async fn push_and_activate(&mut self) -> NixResult<()> {
let mut result = self.push().await?; self.push().await?;
{
if !result.success() { let mut target = self.target.lock().await;
// Don't go any further target.activate(&self.profile, self.goal).await
return Ok(result);
}
let target = format!("{}@{}", self.target.target_user, self.target.target_host);
let activation_command = format!("{}/bin/switch-to-configuration", self.profile.to_str().unwrap());
let mut command = Command::new("ssh");
command
.arg("-o")
.arg("StrictHostKeyChecking=accept-new")
.arg(&target)
.arg("--")
.arg(activation_command)
.arg(self.goal.as_str().unwrap());
let (exit, output) = self.run_command(&mut command, true).await?;
if let Some(progress) = self.progress.as_mut() {
if !exit.success() {
if self.failing_spinner_style.is_some() {
let style = self.failing_spinner_style.as_ref().unwrap().clone();
progress.set_style(style);
} }
} }
}
result.activate_successful = Some(exit.success());
result.activate_output = output;
Ok(result)
}
async fn run_command(&mut self, command: &mut Command, capture_stdout: bool) -> NixResult<(ExitStatus, Option<String>)> {
command.stdin(Stdio::null());
command.stderr(Stdio::piped());
if capture_stdout {
command.stdout(Stdio::piped());
}
let mut child = command.spawn().map_err(map_io_error)?;
let mut stderr = BufReader::new(child.stderr.as_mut().unwrap());
let mut output = String::new();
loop {
let mut line = String::new();
let len = stderr.read_line(&mut line).await.unwrap();
if len == 0 {
break;
}
let trimmed = line.trim_end();
if let Some(progress) = self.progress.as_mut() {
progress.set_message(trimmed);
progress.inc(0);
} else {
println!("{} | {}", style(&self.name).cyan(), trimmed);
}
output += &line;
}
let exit = child.wait().await.map_err(map_io_error)?;
Ok((exit, Some(output)))
}
}
fn map_io_error(error: std::io::Error) -> NixError {
NixError::IoError { error }
} }
fn canonicalize_path(path: String) -> PathBuf { fn canonicalize_path(path: String) -> PathBuf {

View file

@ -1,21 +1,50 @@
use std::collections::HashMap;
use clap::{Arg, App}; use clap::{Arg, App};
use glob::Pattern as GlobPattern; use glob::Pattern as GlobPattern;
pub fn filter_nodes(nodes: &Vec<String>, filter: &str) -> Vec<String> { use super::nix::DeploymentConfig;
let filters: Vec<GlobPattern> = filter.split(",").map(|pattern| GlobPattern::new(pattern).unwrap()).collect();
enum NodeFilter {
NameFilter(GlobPattern),
TagFilter(GlobPattern),
}
pub fn filter_nodes(nodes: &HashMap<String, DeploymentConfig>, filter: &str) -> Vec<String> {
let filters: Vec<NodeFilter> = filter.split(",").map(|pattern| {
use NodeFilter::*;
if let Some(tag_pattern) = pattern.strip_prefix("@") {
TagFilter(GlobPattern::new(tag_pattern).unwrap())
} else {
NameFilter(GlobPattern::new(pattern).unwrap())
}
}).collect();
if filters.len() > 0 { if filters.len() > 0 {
nodes.iter().filter(|name| { nodes.iter().filter_map(|(name, node)| {
for filter in filters.iter() { for filter in filters.iter() {
if filter.matches(name) { use NodeFilter::*;
return true; match filter {
TagFilter(pat) => {
// Welp
for tag in node.tags() {
if pat.matches(tag) {
return Some(name);
}
}
}
NameFilter(pat) => {
if pat.matches(name) {
return Some(name)
}
}
} }
} }
false None
}).cloned().collect() }).cloned().collect()
} else { } else {
nodes.to_owned() nodes.keys().cloned().collect()
} }
} }
@ -30,11 +59,12 @@ pub fn register_common_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> {
.arg(Arg::with_name("on") .arg(Arg::with_name("on")
.long("on") .long("on")
.help("Select a list of machines") .help("Select a list of machines")
.long_help(r#"The list is comma-separated and globs are supported. .long_help(r#"The list is comma-separated and globs are supported. To match tags, prepend the filter by @.
Valid examples: Valid examples:
- host1,host2,host3 - host1,host2,host3
- edge-* - edge-*
- edge-*,core-*"#) - edge-*,core-*
- @a-tag,@tags-can-have-*"#)
.takes_value(true)) .takes_value(true))
} }