forked from DGNum/colmena
Initial commit
Commit e092ba5bb1
14 changed files with 1869 additions and 0 deletions
89  src/command/apply.rs  Normal file
@@ -0,0 +1,89 @@
use clap::{Arg, App, SubCommand, ArgMatches};

use crate::nix::{Hive, DeploymentTask, DeploymentGoal};
use crate::deployment::deploy;
use crate::util;

pub fn subcommand() -> App<'static, 'static> {
    let command = SubCommand::with_name("apply")
        .about("Apply the configuration")
        .arg(Arg::with_name("goal")
            .help("Deployment goal")
            .long_help("Same as the targets for switch-to-configuration.\n\"push\" means only copying the closures to remote nodes.")
            .default_value("switch")
            .index(1)
            .possible_values(&["push", "switch", "boot", "test", "dry-activate"]))
        .arg(Arg::with_name("parallel")
            .short("p")
            .long("parallel")
            .help("Parallelism limit")
            .long_help("Set to 0 to disable the parallelism limit.")
            .default_value("10")
            .takes_value(true)
            .validator(|s| {
                match s.parse::<usize>() {
                    Ok(_) => Ok(()),
                    Err(_) => Err(String::from("The value must be a valid number")),
                }
            }))
        .arg(Arg::with_name("verbose")
            .short("v")
            .long("verbose")
            .help("Be verbose")
            .long_help("Deactivates the progress spinner and prints every line of output.")
            .takes_value(false));

    util::register_common_args(command)
}

pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
    let hive = Hive::from_config_arg(local_args).unwrap();

    println!("Enumerating nodes...");
    let deployment_info = hive.deployment_info().await.unwrap();
    let all_nodes: Vec<String> = deployment_info.keys().cloned().collect();

    let selected_nodes = match local_args.value_of("on") {
        Some(filter) => {
            util::filter_nodes(&all_nodes, filter)
        }
        None => all_nodes.clone(),
    };

    if selected_nodes.len() == 0 {
        println!("No hosts matched. Exiting...");
        return;
    }

    if selected_nodes.len() == all_nodes.len() {
        println!("Building all node configurations...");
    } else {
        println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), deployment_info.len());
    }

    // Some ugly argument mangling :/
    let profiles = hive.build_selected(selected_nodes).await.unwrap();
    let goal = DeploymentGoal::from_str(local_args.value_of("goal").unwrap()).unwrap();
    let verbose = local_args.is_present("verbose");

    let max_parallelism = local_args.value_of("parallel").unwrap().parse::<usize>().unwrap();
    let max_parallelism = match max_parallelism {
        0 => None,
        _ => Some(max_parallelism),
    };

    let mut task_list: Vec<DeploymentTask> = Vec::new();
    for (name, profile) in profiles.iter() {
        let task = DeploymentTask::new(
            name.clone(),
            deployment_info.get(name).unwrap().clone(),
            profile.clone(),
            goal,
        );
        task_list.push(task);
    }

    println!("Applying configurations...");

    deploy(task_list, max_parallelism, !verbose).await;
}

47  src/command/build.rs  Normal file
@@ -0,0 +1,47 @@
use clap::{Arg, App, SubCommand, ArgMatches};

use crate::nix::Hive;
use crate::util;

pub fn subcommand() -> App<'static, 'static> {
    let command = SubCommand::with_name("build")
        .about("Build the configuration")
        .arg(Arg::with_name("verbose")
            .short("v")
            .long("verbose")
            .help("Be verbose")
            .long_help("Deactivates the progress spinner and prints every line of output.")
            .takes_value(false));

    util::register_common_args(command)
}

pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
    let hive = Hive::from_config_arg(local_args).unwrap();

    println!("Enumerating nodes...");
    let deployment_info = hive.deployment_info().await.unwrap();
    let all_nodes: Vec<String> = deployment_info.keys().cloned().collect();

    let selected_nodes = match local_args.value_of("on") {
        Some(filter) => {
            util::filter_nodes(&all_nodes, filter)
        }
        None => all_nodes.clone(),
    };

    if selected_nodes.len() == 0 {
        println!("No hosts matched. Exiting...");
        return;
    }

    if selected_nodes.len() == all_nodes.len() {
        println!("Building all node configurations...");
    } else {
        println!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), deployment_info.len());
    }

    hive.build_selected(selected_nodes).await.unwrap();

    println!("Success!");
}

2  src/command/mod.rs  Normal file
@@ -0,0 +1,2 @@
pub mod build;
pub mod apply;

118  src/deployment.rs  Normal file
@@ -0,0 +1,118 @@
use std::cmp::min;
use std::sync::{Arc, Mutex};

use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
use futures::future::join_all;

use crate::nix::{DeploymentTask, DeploymentResult};
use crate::progress::get_spinner_styles;

/// User-facing deploy routine
pub async fn deploy(tasks: Vec<DeploymentTask<'static>>, max_parallelism: Option<usize>, progress_bar: bool) {
    let parallelism = match max_parallelism {
        Some(limit) => {
            min(limit, tasks.len())
        }
        None => {
            tasks.len()
        }
    };

    let node_name_alignment = tasks.iter().map(|task| task.name().len()).max().unwrap();

    let multi = Arc::new(MultiProgress::new());
    let root_bar = Arc::new(multi.add(ProgressBar::new(tasks.len() as u64)));
    multi.set_draw_target(ProgressDrawTarget::stderr_nohz());

    {
        let (spinner_style, _) = get_spinner_styles(node_name_alignment);
        root_bar.set_message("Running...");
        root_bar.set_style(spinner_style);
        root_bar.inc(0);
    }

    let tasks = Arc::new(Mutex::new(tasks));
    let result_list: Arc<Mutex<Vec<DeploymentResult>>> = Arc::new(Mutex::new(Vec::new()));

    let mut futures = Vec::new();

    for _ in 0..parallelism {
        let tasks = tasks.clone();
        let result_list = result_list.clone();
        let multi = multi.clone();
        let (spinner_style, failing_spinner_style) = get_spinner_styles(node_name_alignment);

        let root_bar = root_bar.clone();

        let future = tokio::spawn(async move {
            // Perform tasks until there's none
            loop {
                let (task, remaining) = {
                    let mut tasks = tasks.lock().unwrap();
                    let task = tasks.pop();
                    let remaining = tasks.len();
                    (task, remaining)
                };

                if task.is_none() {
                    // We are donzo!
                    return;
                }

                let mut task = task.unwrap();

                let bar = multi.add(ProgressBar::new(100));
                bar.set_style(spinner_style.clone());
                bar.set_prefix(task.name());
                bar.set_message("Starting...");
                bar.inc(0);

                if progress_bar {
                    task.set_progress_bar(&bar);
                    task.set_failing_spinner_style(failing_spinner_style.clone());
                }

                match task.execute().await {
                    Ok(result) => {
                        if !result.success() {
                            bar.abandon_with_message("Failed")
                        } else {
                            bar.finish_with_message(task.goal().success_str().unwrap());
                        }
                        bar.inc(0);
                        let mut result_list = result_list.lock().unwrap();
                        result_list.push(result);
                    },
                    Err(e) => {
                        println!("An error occurred while pushing to {}: {:?}", task.name(), e);
                        bar.set_style(failing_spinner_style.clone());
                        bar.abandon_with_message("Internal error");
                    },
                }

                root_bar.inc(1);

                if remaining == 0 {
                    root_bar.finish_with_message("Finished");
                }
            }
        });

        futures.push(future);
    }

    if progress_bar {
        futures.push(tokio::task::spawn_blocking(move || {
            multi.join().unwrap();
        }));
    }

    join_all(futures).await;

    let result_list = result_list.lock().unwrap();
    for result in result_list.iter() {
        if !result.success() {
            println!("{}", result);
        }
    }
}

131  src/eval.nix  Normal file
@@ -0,0 +1,131 @@
{ rawHive }:
with builtins;
let
  defaultHive = {
    # Will be set in defaultHiveMeta
    network = {};

    # Like in NixOps, there is a special host named `defaults`
    # containing configurations that will be applied to all
    # hosts.
    defaults = {};
  };

  defaultHiveMeta = {
    name = "hive";
    description = "A Colmena Hive";

    # Can be a path, a lambda, or an initialized Nixpkgs attrset
    nixpkgs = <nixpkgs>;
  };

  # Colmena-specific options
  #
  # Largely compatible with NixOps/Morph.
  deploymentOptions = { name, lib, ... }:
    let
      types = lib.types;
    in {
      options = {
        deployment = {
          targetHost = lib.mkOption {
            description = ''
              The target SSH node for deployment.

              If not specified, the node's attribute name will be used.
            '';
            type = types.str;
            default = name;
          };
          targetUser = lib.mkOption {
            description = ''
              The user to use to log into the remote node.
            '';
            type = types.str;
            default = "root";
          };
          tags = lib.mkOption {
            description = ''
              A list of tags for the node.

              Can be used to select a group of nodes for deployment.
            '';
            type = types.listOf types.str;
            default = [];
          };
        };
      };
    };

  hiveMeta = {
    network = defaultHiveMeta // (if rawHive ? network then rawHive.network else {});
  };
  hive = defaultHive // rawHive // hiveMeta;

  pkgs = let
    pkgConf = hive.network.nixpkgs;
  in if typeOf pkgConf == "path" then
    import pkgConf {}
  else if typeOf pkgConf == "lambda" then
    pkgConf {}
  else if typeOf pkgConf == "set" then
    pkgConf
  else throw ''
    network.nixpkgs must be one of:

    - A path to Nixpkgs (e.g., <nixpkgs>)
    - A Nixpkgs lambda (e.g., import <nixpkgs>)
    - A Nixpkgs attribute set
  '';

  lib = pkgs.lib;
  reservedNames = [ "defaults" "network" "meta" ];

  evalNode = name: config: let
    evalConfig = import (pkgs.path + "/nixos/lib/eval-config.nix");
  in evalConfig {
    system = currentSystem;
    modules = [
      deploymentOptions
      hive.defaults
      config
    ] ++ (import (pkgs.path + "/nixos/modules/module-list.nix"));
    specialArgs = {
      inherit name nodes;
      modulesPath = pkgs.path + "/nixos/modules";
    };
  };

  nodeNames = filter (name: ! elem name reservedNames) (attrNames hive);

  # Exported attributes
  #
  # Functions are intended to be called with `nix-instantiate --eval --json`

  nodes = listToAttrs (map (name: {
    inherit name;
    value = evalNode name hive.${name};
  }) nodeNames);

  deploymentInfoJson = toJSON (lib.attrsets.mapAttrs (name: eval: eval.config.deployment) nodes);

  toplevel = lib.attrsets.mapAttrs (name: eval: eval.config.system.build.toplevel) nodes;

  buildAll = buildSelected {
    names = nodeNames;
  };
  buildSelected = { names ? null }: let
    # Change in the order of the names should not cause a derivation to be created
    selected = lib.attrsets.filterAttrs (name: _: elem name names) toplevel;
  in derivation rec {
    name = "colmena-${hive.network.name}";
    system = currentSystem;
    json = toJSON (lib.attrsets.mapAttrs (k: v: toString v) selected);
    builder = pkgs.writeScript "${name}.sh" ''
      #!/bin/sh
      echo "$json" > $out
    '';
  };
in {
  inherit nodes deploymentInfoJson toplevel buildAll buildSelected;
}

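For orientation, a minimal hive.nix accepted by the evaluator above might look like the sketch below. The host name, target host, tags, and NixOS options are made-up examples; only the network and defaults attributes and the deployment.* options are defined by eval.nix itself.

    {
      network = {
        # A path, a lambda, or an already-imported Nixpkgs attribute set
        nixpkgs = <nixpkgs>;
        # Used in the name of the aggregate "colmena-${name}" build derivation
        name = "example";
      };

      # Merged into every node's module list before its own configuration
      defaults = { pkgs, ... }: {
        environment.systemPackages = [ pkgs.vim ];
      };

      # Every attribute other than "defaults", "network", and "meta" is a node
      host-a = { name, nodes, ... }: {
        deployment.targetHost = "host-a.example.com";  # defaults to the attribute name
        deployment.targetUser = "root";                # the default
        deployment.tags = [ "web" ];
        boot.loader.grub.device = "/dev/sda";
      };
    }
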
31  src/main.rs  Normal file
@@ -0,0 +1,31 @@
use clap::{App, AppSettings};

mod nix;
mod command;
mod progress;
mod deployment;
mod util;

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let matches = App::new("Colmena")
        .version("0.1.0")
        .author("Zhaofeng Li <hello@zhaofeng.li>")
        .about("NixOS deployment tool")
        .global_setting(AppSettings::ColoredHelp)
        .setting(AppSettings::ArgRequiredElseHelp)
        .subcommand(command::apply::subcommand())
        .subcommand(command::build::subcommand())
        .get_matches();

    if let Some(sub_matches) = matches.subcommand_matches("build") {
        command::build::run(&matches, &sub_matches).await;
        return Ok(());
    }
    if let Some(sub_matches) = matches.subcommand_matches("apply") {
        command::apply::run(&matches, &sub_matches).await;
        return Ok(());
    }

    Ok(())
}

583  src/nix.rs  Normal file
@@ -0,0 +1,583 @@
//! A Colmena Hive.

use std::path::{Path, PathBuf};
use std::convert::AsRef;
use std::io::Write;
use std::process::{ExitStatus, Stdio};
use std::collections::HashMap;
use std::fs;
use std::fmt;

use console::style;
use async_trait::async_trait;
use clap::ArgMatches;
use indicatif::{ProgressBar, ProgressStyle};
use serde::de::DeserializeOwned;
use serde::{Serialize, Deserialize};
use snafu::Snafu;
use tempfile::{NamedTempFile, TempPath};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix");

#[derive(Debug, Clone, Deserialize)]
pub struct DeploymentInfo {
    #[serde(rename = "targetHost")]
    target_host: String,

    #[serde(rename = "targetUser")]
    target_user: String,
    tags: Vec<String>,
}

#[derive(Debug)]
pub struct DeploymentTask<'task> {
    /// Name of the target.
    name: String,

    /// The target to deploy to.
    target: DeploymentInfo,

    /// Nix store path to the system profile to deploy.
    profile: PathBuf,

    /// The goal of this deployment.
    goal: DeploymentGoal,

    /// A ProgressBar to show the deployment progress to the user.
    progress: Option<&'task ProgressBar>,

    /// The ProgressStyle to set when the deployment is failing.
    failing_spinner_style: Option<ProgressStyle>,
}

#[derive(Debug, Copy, Clone)]
pub enum DeploymentGoal {
    /// Push the closures only.
    Push,

    /// Make the configuration the boot default and activate now.
    Switch,

    /// Make the configuration the boot default.
    Boot,

    /// Activate the configuration, but don't make it the boot default.
    Test,

    /// Show what would be done if this configuration were activated.
    DryActivate,
}

impl DeploymentGoal {
    pub fn from_str(s: &str) -> Option<Self> {
        match s {
            "push" => Some(Self::Push),
            "switch" => Some(Self::Switch),
            "boot" => Some(Self::Boot),
            "test" => Some(Self::Test),
            "dry-activate" => Some(Self::DryActivate),
            _ => None,
        }
    }

    pub fn as_str(&self) -> Option<&'static str> {
        use DeploymentGoal::*;
        match self {
            Push => None,
            Switch => Some("switch"),
            Boot => Some("boot"),
            Test => Some("test"),
            DryActivate => Some("dry-activate"),
        }
    }

    pub fn success_str(&self) -> Option<&'static str> {
        use DeploymentGoal::*;
        match self {
            Push => Some("Pushed"),
            Switch => Some("Activation successful"),
            Boot => Some("Will be activated next boot"),
            Test => Some("Activation successful (test)"),
            DryActivate => Some("Dry activation successful"),
        }
    }
}

/// Results of a DeploymentTask to show to the user.
pub struct DeploymentResult {
    name: String,
    push_output: Option<String>,
    push_successful: Option<bool>,
    activate_output: Option<String>,
    activate_successful: Option<bool>,
}

impl DeploymentResult {
    fn new(name: String) -> Self {
        Self {
            name,
            push_output: None,
            push_successful: None,
            activate_output: None,
            activate_successful: None,
        }
    }

    /// Whether the deployment was successful overall.
    pub fn success(&self) -> bool {
        if let Some(push_successful) = self.push_successful {
            if !push_successful {
                return false;
            }
        }

        if let Some(activate_successful) = self.activate_successful {
            if !activate_successful {
                return false;
            }
        }

        true
    }

    fn dump_log(f: &mut fmt::Formatter<'_>, output: Option<&String>) -> fmt::Result {
        if let Some(output) = output {
            writeln!(f, "Last 10 lines of log:")?;
            writeln!(f, "~~~~~~~~~~")?;
            let lines: Vec<&str> = output.split("\n").collect();

            let start = if lines.len() < 10 {
                0
            } else {
                lines.len() - 10
            };

            for i in start..lines.len() {
                writeln!(f, "{}", lines[i])?;
            }
            writeln!(f, "~~~~~~~~~~")?;
        }

        writeln!(f)
    }
}

impl fmt::Display for DeploymentResult {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(push_successful) = self.push_successful {
            if push_successful {
                writeln!(f, "Deployment on node {} succeeded.", self.name)?;
            } else {
                write!(f, "Deployment on node {} failed. ", self.name)?;
                Self::dump_log(f, self.push_output.as_ref())?;
            }
        }
        if let Some(activate_successful) = self.activate_successful {
            if activate_successful {
                writeln!(f, "Activation on node {} succeeded.", self.name)?;
            } else {
                write!(f, "Activation on node {} failed.", self.name)?;
                Self::dump_log(f, self.activate_output.as_ref())?;
            }
        }
        Ok(())
    }
}

#[derive(Debug, Snafu)]
pub enum NixError {
    #[snafu(display("I/O Error: {}", error))]
    IoError { error: std::io::Error },

    #[snafu(display("Nix returned invalid response: {}", output))]
    BadOutput { output: String },

    #[snafu(display("Nix exited with error code: {}", exit_code))]
    NixFailure { exit_code: i32 },

    #[snafu(display("Nix was interrupted"))]
    NixKilled,

    #[snafu(display("Nix Error: {}", message))]
    Unknown { message: String },
}

pub type NixResult<T> = Result<T, NixError>;

pub struct Hive {
    hive: PathBuf,
    eval_nix: TempPath,
}

struct NixInstantiate<'hive> {
    eval_nix: &'hive Path,
    hive: &'hive Path,
    expression: String,
}

impl<'hive> NixInstantiate<'hive> {
    fn new(eval_nix: &'hive Path, hive: &'hive Path, expression: String) -> Self {
        Self {
            eval_nix,
            expression,
            hive,
        }
    }

    fn instantiate(self) -> Command {
        // FIXME: unwrap
        // Technically filenames can be arbitrary byte strings (OsStr),
        // but Nix may not like it...

        let mut command = Command::new("nix-instantiate");
        command
            .arg("-E")
            .arg(format!(
                "with builtins; let eval = import {}; hive = eval {{ rawHive = import {}; }}; in {}",
                self.eval_nix.to_str().unwrap(),
                self.hive.to_str().unwrap(),
                self.expression,
            ));
        command
    }

    fn eval(self) -> Command {
        let mut command = self.instantiate();
        command.arg("--eval").arg("--json");
        command
    }
}

#[async_trait]
trait NixCommand {
    async fn passthrough(&mut self) -> NixResult<()>;
    async fn capture_output(&mut self) -> NixResult<String>;
    async fn capture_json<T>(&mut self) -> NixResult<T> where T: DeserializeOwned;
    async fn capture_store_path(&mut self) -> NixResult<StorePath>;
}

#[async_trait]
impl NixCommand for Command {
    /// Runs the command with stdout and stderr passed through to the user.
    async fn passthrough(&mut self) -> NixResult<()> {
        let exit = self
            .spawn()
            .map_err(map_io_error)?
            .wait()
            .await
            .map_err(map_io_error)?;

        if exit.success() {
            Ok(())
        } else {
            Err(match exit.code() {
                Some(exit_code) => NixError::NixFailure { exit_code },
                None => NixError::NixKilled,
            })
        }
    }

    /// Captures output as a String.
    async fn capture_output(&mut self) -> NixResult<String> {
        // We want the user to see the raw errors
        let output = self
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .map_err(map_io_error)?
            .wait_with_output()
            .await
            .map_err(map_io_error)?;

        if output.status.success() {
            // FIXME: unwrap
            Ok(String::from_utf8(output.stdout).unwrap())
        } else {
            Err(match output.status.code() {
                Some(exit_code) => NixError::NixFailure { exit_code },
                None => NixError::NixKilled,
            })
        }
    }

    /// Captures deserialized output from JSON.
    async fn capture_json<T>(&mut self) -> NixResult<T> where T: DeserializeOwned {
        let output = self.capture_output().await?;
        serde_json::from_str(&output).map_err(|_| NixError::BadOutput {
            output: output.clone()
        })
    }

    /// Captures a single store path.
    async fn capture_store_path(&mut self) -> NixResult<StorePath> {
        let output = self.capture_output().await?;
        Ok(StorePath(output.trim_end().into()))
    }
}

/// A Nix store path.
#[derive(Debug, Serialize, Deserialize)]
struct StorePath(PathBuf);

impl StorePath {
    /// Builds the store path.
    pub async fn realise(&self) -> NixResult<Vec<PathBuf>> {
        Command::new("nix-store")
            .arg("--realise")
            .arg(&self.0)
            .capture_output()
            .await
            .map(|paths| {
                paths.lines().map(|p| p.into()).collect()
            })
    }
}

/// A serialized Nix expression.
///
/// Very hacky and involves an Import From Derivation, so should be
/// avoided as much as possible. But I suppose it's more robust than attempting
/// to generate Nix expressions directly or escaping a JSON string to strip
/// off Nix interpolation.
struct SerializedNixExpresssion {
    json_file: TempPath,
}

impl SerializedNixExpresssion {
    pub fn new<'de, T>(data: T) -> NixResult<Self> where T: Serialize {
        let mut tmp = NamedTempFile::new().map_err(map_io_error)?;
        let json = serde_json::to_vec(&data).expect("Could not serialize data");
        tmp.write_all(&json).map_err(map_io_error)?;

        Ok(Self {
            json_file: tmp.into_temp_path(),
        })
    }

    pub fn expression(&self) -> String {
        format!("(builtins.fromJSON (builtins.readFile {}))", self.json_file.to_str().unwrap())
    }
}

impl Hive {
    pub fn new<P: AsRef<Path>>(hive: P) -> NixResult<Self> {
        let mut eval_nix = NamedTempFile::new().map_err(map_io_error)?;
        eval_nix.write_all(HIVE_EVAL).map_err(map_io_error)?;

        Ok(Self {
            hive: hive.as_ref().to_owned(),
            eval_nix: eval_nix.into_temp_path(),
        })
    }

    pub fn from_config_arg(args: &ArgMatches<'_>) -> NixResult<Self> {
        let path = args.value_of("config").expect("The config arg should exist").to_owned();
        let path = canonicalize_path(path);

        Self::new(path)
    }

    /// Retrieve a list of nodes in the hive
    pub async fn nodes(&self) -> NixResult<Vec<String>> {
        self.nix_instantiate("attrNames hive.nodes").eval()
            .capture_json().await
    }

    /// Retrieve deployment info for all nodes
    pub async fn deployment_info(&self) -> NixResult<HashMap<String, DeploymentInfo>> {
        // FIXME: Really ugly :(
        let s: String = self.nix_instantiate("hive.deploymentInfoJson").eval()
            .capture_json().await?;

        Ok(serde_json::from_str(&s).unwrap())
    }

    /// Builds selected nodes
    pub async fn build_selected(&self, nodes: Vec<String>) -> NixResult<HashMap<String, PathBuf>> {
        let nodes_expr = SerializedNixExpresssion::new(&nodes)?;
        let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression());

        self.build_common(&expr).await
    }

    /// Builds all node configurations
    pub async fn build_all(&self) -> NixResult<HashMap<String, PathBuf>> {
        self.build_common("hive.buildAll").await
    }

    /// Builds node configurations
    ///
    /// Expects the resulting store path to point to a JSON file containing
    /// a map of node name -> store path.
    async fn build_common(&self, expression: &str) -> NixResult<HashMap<String, PathBuf>> {
        let build: StorePath = self.nix_instantiate(expression).instantiate()
            .capture_store_path().await?;

        let realization = build.realise().await?;
        assert!(realization.len() == 1);

        let json = fs::read_to_string(&realization[0]).map_err(map_io_error)?;
        let result_map: HashMap<String, PathBuf> = serde_json::from_str(&json)
            .expect("Bad result from our own build routine");

        Ok(result_map)
    }

    fn nix_instantiate(&self, expression: &str) -> NixInstantiate {
        NixInstantiate::new(&self.eval_nix, &self.hive, expression.to_owned())
    }
}

impl<'task> DeploymentTask<'task> {
    pub fn new(name: String, target: DeploymentInfo, profile: PathBuf, goal: DeploymentGoal) -> Self {
        Self {
            name,
            target,
            profile,
            goal,
            progress: None,
            failing_spinner_style: None,
        }
    }

    pub fn name(&self) -> &str { &self.name }
    pub fn goal(&self) -> DeploymentGoal { self.goal }

    /// Set the progress bar used during deployment.
    pub fn set_progress_bar(&mut self, progress: &'task ProgressBar) {
        self.progress = Some(progress);
    }

    /// Set a spinner style to switch to when the deployment is failing.
    pub fn set_failing_spinner_style(&mut self, style: ProgressStyle) {
        self.failing_spinner_style = Some(style);
    }

    pub async fn execute(&mut self) -> NixResult<DeploymentResult> {
        match self.goal {
            DeploymentGoal::Push => {
                self.push().await
            }
            _ => {
                self.push_and_activate().await
            }
        }
    }

    async fn push(&mut self) -> NixResult<DeploymentResult> {
        let mut result = DeploymentResult::new(self.name.clone());

        // Issue of interest:
        // https://github.com/NixOS/nix/issues?q=ipv6
        let target = format!("{}@{}", self.target.target_user, self.target.target_host);
        let mut command = Command::new("nix-copy-closure");
        command
            .arg("--to")
            .arg("--gzip")
            .arg("--include-outputs")
            .arg("--use-substitutes")
            .arg(&target)
            .arg(&self.profile);

        let (exit, output) = self.run_command(&mut command, false).await?;

        if let Some(progress) = self.progress.as_mut() {
            if !exit.success() {
                if self.failing_spinner_style.is_some() {
                    let style = self.failing_spinner_style.as_ref().unwrap().clone();
                    progress.set_style(style);
                }
            }
        }

        result.push_successful = Some(exit.success());
        result.push_output = output;

        Ok(result)
    }

    async fn push_and_activate(&mut self) -> NixResult<DeploymentResult> {
        let mut result = self.push().await?;

        if !result.success() {
            // Don't go any further
            return Ok(result);
        }

        let target = format!("{}@{}", self.target.target_user, self.target.target_host);
        let activation_command = format!("{}/bin/switch-to-configuration", self.profile.to_str().unwrap());
        let mut command = Command::new("ssh");
        command
            .arg("-o")
            .arg("StrictHostKeyChecking=accept-new")
            .arg(&target)
            .arg("--")
            .arg(activation_command)
            .arg(self.goal.as_str().unwrap());

        let (exit, output) = self.run_command(&mut command, true).await?;

        if let Some(progress) = self.progress.as_mut() {
            if !exit.success() {
                if self.failing_spinner_style.is_some() {
                    let style = self.failing_spinner_style.as_ref().unwrap().clone();
                    progress.set_style(style);
                }
            }
        }

        result.activate_successful = Some(exit.success());
        result.activate_output = output;

        Ok(result)
    }

    async fn run_command(&mut self, command: &mut Command, capture_stdout: bool) -> NixResult<(ExitStatus, Option<String>)> {
        command.stdin(Stdio::null());
        command.stderr(Stdio::piped());

        if capture_stdout {
            command.stdout(Stdio::piped());
        }

        let mut child = command.spawn().map_err(map_io_error)?;

        let mut stderr = BufReader::new(child.stderr.as_mut().unwrap());
        let mut output = String::new();

        loop {
            let mut line = String::new();
            let len = stderr.read_line(&mut line).await.unwrap();

            if len == 0 {
                break;
            }

            let trimmed = line.trim_end();
            if let Some(progress) = self.progress.as_mut() {
                progress.set_message(trimmed);
                progress.inc(0);
            } else {
                println!("{} | {}", style(&self.name).cyan(), trimmed);
            }
            output += &line;
        }
        let exit = child.wait().await.map_err(map_io_error)?;
        Ok((exit, Some(output)))
    }
}

fn map_io_error(error: std::io::Error) -> NixError {
    NixError::IoError { error }
}

fn canonicalize_path(path: String) -> PathBuf {
    if !path.starts_with("/") {
        format!("./{}", path).into()
    } else {
        path.into()
    }
}

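To make the wiring concrete: NixInstantiate::instantiate splices together the embedded eval.nix (written out to a temporary file), the user's hive file, and the requested attribute, then hands the result to nix-instantiate -E (with --eval --json appended by eval()). Assuming the temporary file landed at /tmp/eval.nix (a made-up path for illustration) and the hive is ./hive.nix, the generated expression is roughly:

    with builtins;
    let
      eval = import /tmp/eval.nix;                   # the embedded src/eval.nix
      hive = eval { rawHive = import ./hive.nix; };
    in
      hive.deploymentInfoJson                        # or whatever expression was passed in
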
15  src/progress.rs  Normal file
@@ -0,0 +1,15 @@
use indicatif::ProgressStyle;

pub fn get_spinner_styles(node_name_alignment: usize) -> (ProgressStyle, ProgressStyle) {
    let template = format!("{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", node_name_alignment);

    (
        ProgressStyle::default_spinner()
            .tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅")
            .template(&template),

        ProgressStyle::default_spinner()
            .tick_chars("❌❌")
            .template(&template),
    )
}

40  src/util.rs  Normal file
@@ -0,0 +1,40 @@
use clap::{Arg, App};
use glob::Pattern as GlobPattern;

pub fn filter_nodes(nodes: &Vec<String>, filter: &str) -> Vec<String> {
    let filters: Vec<GlobPattern> = filter.split(",").map(|pattern| GlobPattern::new(pattern).unwrap()).collect();

    if filters.len() > 0 {
        nodes.iter().filter(|name| {
            for filter in filters.iter() {
                if filter.matches(name) {
                    return true;
                }
            }

            false
        }).cloned().collect()
    } else {
        nodes.to_owned()
    }
}

pub fn register_common_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> {
    command
        .arg(Arg::with_name("config")
            .short("f")
            .long("config")
            .help("Path to a Hive expression")
            .default_value("hive.nix")
            .required(true))
        .arg(Arg::with_name("on")
            .long("on")
            .help("Select a list of machines")
            .long_help(r#"The list is comma-separated and globs are supported.
Valid examples:

- host1,host2,host3
- edge-*
- edge-*,core-*"#)
            .takes_value(true))
}