Redesign deployment process

Now evaluation can be automatically split into chunks based on available
RAM. All three stages of the deployment process (evaluate, build,
apply) can happen concurrently.

Fixes #1.
This commit is contained in:
Zhaofeng Li 2021-01-24 14:08:48 -08:00
parent f53ebef41c
commit 506b894be6
16 changed files with 1415 additions and 615 deletions

17
Cargo.lock generated
View file

@ -58,6 +58,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad1f8e949d755f9d79112b5bb46938e0ef9d3804a0b16dfab13aafcaa5f0fa72"
[[package]]
name = "cc"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48"
[[package]]
name = "cfg-if"
version = "0.1.10"
@ -103,6 +109,7 @@ dependencies = [
"serde",
"serde_json",
"snafu",
"sys-info",
"tempfile",
"tokio",
]
@ -705,6 +712,16 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "sys-info"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5cfbd84f86389198ade41b439f72a5b1b3a8ba728e61cd589e1720d0df44c39"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "tempfile"
version = "3.1.0"

View file

@ -20,6 +20,7 @@ log = "0.4.11"
quit = "1.1.2"
serde = { version = "1.0.118", features = ["derive"] }
serde_json = "1.0"
sys-info = "0.7.0"
snafu = "0.6.10"
tempfile = "3.1.0"
tokio = { version = "1.0.0", features = ["full"] }

View file

@ -1,6 +1,7 @@
{
pkgs ? import ./pkgs.nix {},
}: let
lib = pkgs.lib;
rustPlatform = if pkgs ? pinnedRust then pkgs.makeRustPlatform {
rustc = pkgs.pinnedRust;
cargo = pkgs.pinnedRust;
@ -9,6 +10,12 @@ in rustPlatform.buildRustPackage {
name = "colmena-dev";
version = "0.1.0";
src = lib.cleanSourceWith {
filter = name: type: !(type == "directory" && baseNameOf name == "target");
src = lib.cleanSourceWith {
filter = lib.cleanSourceFilter;
src = ./.;
cargoSha256 = "1ai046vbvydyqhwiy8qz0d28dch5jpxg3rzk7nrh2sdwcvxirmvm";
};
};
cargoSha256 = "0m35xjslm5gxr2cb5fw8pkqpm853hsznhsncry2kvicqzwh63ldm";
}

View file

@ -1,24 +1,47 @@
use std::collections::HashMap;
use std::sync::Arc;
use clap::{Arg, App, SubCommand, ArgMatches};
use crate::nix::{DeploymentTask, DeploymentGoal};
use crate::nix::host::CopyOptions;
use crate::deployment::deploy;
use crate::nix::deployment::{
Deployment,
DeploymentGoal,
DeploymentOptions,
EvaluationNodeLimit,
ParallelismLimit,
};
use crate::nix::host::local as localhost;
use crate::util;
pub fn subcommand() -> App<'static, 'static> {
let command = SubCommand::with_name("apply")
.about("Apply configurations on remote machines")
.arg(Arg::with_name("goal")
.help("Deployment goal")
.long_help("Same as the targets for switch-to-configuration.\n\"push\" means only copying the closures to remote nodes.")
.default_value("switch")
.index(1)
.possible_values(&["push", "switch", "boot", "test", "dry-activate"]))
pub fn register_deploy_args<'a, 'b>(command: App<'a, 'b>) -> App<'a, 'b> {
command
.arg(Arg::with_name("eval-node-limit")
.long("eval-node-limit")
.value_name("LIMIT")
.help("Evaluation node limit")
.long_help(r#"Limits the maximum number of hosts to be evaluated at once.
The evaluation process is RAM-intensive. The default behavior is to limit the maximum number of host evaluated at the same time based on naive heuristics.
Set to 0 to disable the limit.
"#)
.default_value("auto")
.takes_value(true)
.validator(|s| {
if s == "auto" {
return Ok(());
}
match s.parse::<usize>() {
Ok(_) => Ok(()),
Err(_) => Err(String::from("The value must be a valid number")),
}
}))
.arg(Arg::with_name("parallel")
.short("p")
.long("parallel")
.value_name("LIMIT")
.help("Parallelism limit")
.help("Deploy parallelism limit")
.long_help(r#"Limits the maximum number of hosts to be deployed in parallel.
Set to 0 to disable parallemism limit.
@ -31,12 +54,33 @@ Set to 0 to disable parallemism limit.
Err(_) => Err(String::from("The value must be a valid number")),
}
}))
.arg(Arg::with_name("parallel-build")
.long("parallel-build")
.value_name("LIMIT")
.help("Build parallelism limit")
.long_help("Limits the maximum number of parallel build processes.")
.default_value("2")
.takes_value(true)
.validator(|s| {
if s == "0" {
return Err(String::from("The value must be non-zero"));
}
match s.parse::<usize>() {
Ok(_) => Ok(()),
Err(_) => Err(String::from("The value must be a valid number")),
}
}))
.arg(Arg::with_name("verbose")
.short("v")
.long("verbose")
.help("Be verbose")
.long_help("Deactivates the progress spinner and prints every line of output.")
.takes_value(false))
.arg(Arg::with_name("no-build-substitutes")
.long("no-build-substitutes")
.help("Do not use substitutes during build")
.long_help("Disables the use of substituters when building.")
.takes_value(false))
.arg(Arg::with_name("no-substitutes")
.long("no-substitutes")
.help("Do not use substitutes")
@ -47,13 +91,25 @@ Set to 0 to disable parallemism limit.
.help("Do not use gzip")
.long_help("Disables the use of gzip when copying closures to the remote host.")
.takes_value(false))
}
pub fn subcommand() -> App<'static, 'static> {
let command = SubCommand::with_name("apply")
.about("Apply configurations on remote machines")
.arg(Arg::with_name("goal")
.help("Deployment goal")
.long_help("Same as the targets for switch-to-configuration.\n\"push\" means only copying the closures to remote nodes.")
.default_value("switch")
.index(1)
.possible_values(&["build", "push", "switch", "boot", "test", "dry-activate"]))
;
let command = register_deploy_args(command);
util::register_selector_args(command)
}
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let mut hive = util::hive_from_args(local_args).unwrap();
let hive = util::hive_from_args(local_args).unwrap();
log::info!("Enumerating nodes...");
let all_nodes = hive.deployment_info().await.unwrap();
@ -70,50 +126,70 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
quit::with_code(2);
}
if selected_nodes.len() == all_nodes.len() {
log::info!("Building all node configurations...");
} else {
log::info!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), all_nodes.len());
}
// Some ugly argument mangling :/
let mut profiles = hive.build_selected(selected_nodes).await.unwrap();
let goal = DeploymentGoal::from_str(local_args.value_of("goal").unwrap()).unwrap();
let verbose = local_args.is_present("verbose");
let max_parallelism = local_args.value_of("parallel").unwrap().parse::<usize>().unwrap();
let max_parallelism = match max_parallelism {
0 => None,
_ => Some(max_parallelism),
};
let mut task_list: Vec<DeploymentTask> = Vec::new();
let mut skip_list: Vec<String> = Vec::new();
for (name, profile) in profiles.drain() {
let target = all_nodes.get(&name).unwrap().to_ssh_host();
match target {
Some(target) => {
let mut task = DeploymentTask::new(name, target, profile, goal);
let options = CopyOptions::default()
.gzip(!local_args.is_present("no-gzip"))
.use_substitutes(!local_args.is_present("no-substitutes"))
;
task.set_copy_options(options);
task_list.push(task);
let mut targets = HashMap::new();
for node in &selected_nodes {
let host = all_nodes.get(node).unwrap().to_ssh_host();
match host {
Some(host) => {
targets.insert(node.clone(), host);
}
None => {
skip_list.push(name);
if goal == DeploymentGoal::Build {
targets.insert(node.clone(), localhost());
}
}
}
}
if skip_list.len() != 0 {
log::info!("Applying configurations ({} skipped)...", skip_list.len());
if targets.len() == all_nodes.len() {
log::info!("Selected all {} nodes.", targets.len());
} else if targets.len() == selected_nodes.len() {
log::info!("Selected {} out of {} hosts.", targets.len(), all_nodes.len());
} else {
log::info!("Applying configurations...");
log::info!("Selected {} out of {} hosts ({} skipped)", targets.len(), all_nodes.len(), selected_nodes.len() - targets.len());
}
deploy(task_list, max_parallelism, !verbose).await;
let mut deployment = Deployment::new(hive, targets, goal);
let mut options = DeploymentOptions::default();
options.set_substituters_build(!local_args.is_present("no-build-substitutes"));
options.set_substituters_push(!local_args.is_present("no-substitutes"));
options.set_gzip(!local_args.is_present("no-gzip"));
options.set_progress_bar(!local_args.is_present("verbose"));
deployment.set_options(options);
let mut parallelism_limit = ParallelismLimit::default();
parallelism_limit.set_apply_limit({
let limit = local_args.value_of("parallel").unwrap().parse::<usize>().unwrap();
if limit == 0 {
selected_nodes.len() // HACK
} else {
local_args.value_of("parallel").unwrap().parse::<usize>().unwrap()
}
});
parallelism_limit.set_build_limit({
let limit = local_args.value_of("parallel").unwrap().parse::<usize>().unwrap();
if limit == 0 {
panic!("The build parallelism limit must not be 0");
}
limit
});
deployment.set_parallelism_limit(parallelism_limit);
let evaluation_node_limit = match local_args.value_of("eval-node-limit").unwrap() {
"auto" => EvaluationNodeLimit::Heuristic,
number => {
let number = number.parse::<usize>().unwrap();
if number == 0 {
EvaluationNodeLimit::None
} else {
EvaluationNodeLimit::Manual(number)
}
}
};
deployment.set_evaluation_node_limit(evaluation_node_limit);
let deployment = Arc::new(deployment);
deployment.execute().await;
}

View file

@ -1,10 +1,12 @@
use std::env;
use std::collections::HashMap;
use std::sync::Arc;
use clap::{Arg, App, SubCommand, ArgMatches};
use tokio::fs;
use tokio::process::Command;
use crate::nix::{DeploymentTask, DeploymentGoal, Host};
use crate::nix::{Deployment, DeploymentGoal, Host};
use crate::nix::host;
use crate::util;
@ -57,7 +59,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
}
}
let mut hive = util::hive_from_args(local_args).unwrap();
let hive = util::hive_from_args(local_args).unwrap();
let hostname = hostname::get().expect("Could not get hostname")
.to_string_lossy().into_owned();
let goal = DeploymentGoal::from_str(local_args.value_of("goal").unwrap()).unwrap();
@ -79,16 +81,12 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
}
};
log::info!("Building local node configuration...");
let profile = {
let selected_nodes: Vec<String> = vec![hostname.clone()];
let mut profiles = hive.build_selected(selected_nodes).await
.expect("Failed to build local configurations");
profiles.remove(&hostname).unwrap()
};
let mut targets = HashMap::new();
targets.insert(hostname.clone(), target);
let mut task = DeploymentTask::new(hostname, target, profile, goal);
task.execute().await.unwrap();
let deployment = Arc::new(Deployment::new(hive, targets, goal));
deployment.execute().await;
}
async fn escalate() -> ! {

View file

@ -1,45 +1,23 @@
use clap::{Arg, App, SubCommand, ArgMatches};
use clap::{Arg, App, SubCommand};
use crate::util;
use super::apply;
pub use super::apply::run;
pub fn subcommand() -> App<'static, 'static> {
let command = SubCommand::with_name("build")
.about("Build the configuration")
.arg(Arg::with_name("verbose")
.short("v")
.long("verbose")
.help("Be verbose")
.long_help("Deactivates the progress spinner and prints every line of output.")
.takes_value(false));
.about("Build the configuration but not push to remote machines")
.long_about(r#"Build the configuration but not push to remote machines
This subcommand behaves as if you invoked `apply` with the `build` goal."#)
.arg(Arg::with_name("goal")
.hidden(true)
.default_value("build")
.possible_values(&["build"])
.takes_value(true));
let command = apply::register_deploy_args(command);
util::register_selector_args(command)
}
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let mut hive = util::hive_from_args(local_args).unwrap();
log::info!("Enumerating nodes...");
let all_nodes = hive.deployment_info().await.unwrap();
let selected_nodes = match local_args.value_of("on") {
Some(filter) => {
util::filter_nodes(&all_nodes, filter)
}
None => all_nodes.keys().cloned().collect(),
};
if selected_nodes.len() == 0 {
log::warn!("No hosts matched. Exiting...");
quit::with_code(2);
}
if selected_nodes.len() == all_nodes.len() {
log::info!("Building all node configurations...");
} else {
log::info!("Selected {} out of {} hosts. Building node configurations...", selected_nodes.len(), all_nodes.len());
}
hive.build_selected(selected_nodes).await.unwrap();
log::info!("Success!");
}

View file

@ -24,7 +24,7 @@ For example, to retrieve the configuration of one node, you may write something
}
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let mut hive = util::hive_from_args(local_args).unwrap();
let hive = util::hive_from_args(local_args).unwrap();
if !(local_args.is_present("expression") ^ local_args.is_present("expression_file")) {
log::error!("Either an expression (-E) xor a .nix file containing an expression should be specified, not both.");

View file

@ -1,131 +0,0 @@
use std::cmp::min;
use std::sync::Arc;
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
use futures::future::join_all;
use tokio::sync::Mutex;
use crate::nix::DeploymentTask;
use crate::progress::get_spinner_styles;
/// User-facing deploy routine
pub async fn deploy(tasks: Vec<DeploymentTask>, max_parallelism: Option<usize>, progress_bar: bool) {
let parallelism = match max_parallelism {
Some(limit) => {
min(limit, tasks.len())
}
None => {
tasks.len()
}
};
let node_name_alignment = tasks.iter().map(|task| task.name().len()).max().unwrap();
let multi = Arc::new(MultiProgress::new());
let root_bar = Arc::new(multi.add(ProgressBar::new(tasks.len() as u64)));
multi.set_draw_target(ProgressDrawTarget::stderr_nohz());
{
let (spinner_style, _) = get_spinner_styles(node_name_alignment);
root_bar.set_message("Running...");
root_bar.set_style(spinner_style);
root_bar.inc(0);
}
let tasks = Arc::new(Mutex::new(tasks));
let result_list: Arc<Mutex<Vec<(DeploymentTask, bool)>>> = Arc::new(Mutex::new(Vec::new()));
let mut futures = Vec::new();
for _ in 0..parallelism {
let tasks = tasks.clone();
let result_list = result_list.clone();
let multi = multi.clone();
let (spinner_style, failing_spinner_style) = get_spinner_styles(node_name_alignment);
let root_bar = root_bar.clone();
let future = tokio::spawn(async move {
// Perform tasks until there's none
loop {
let (task, remaining) = {
let mut tasks = tasks.lock().await;
let task = tasks.pop();
let remaining = tasks.len();
(task, remaining)
};
if task.is_none() {
// We are donzo!
return;
}
let mut task = task.unwrap();
let bar = multi.add(ProgressBar::new(100));
bar.set_style(spinner_style.clone());
bar.set_prefix(task.name());
bar.set_message("Starting...");
bar.inc(0);
if progress_bar {
task.set_progress_bar(bar.clone()).await;
}
match task.execute().await {
Ok(_) => {
bar.finish_with_message(task.goal().success_str().unwrap());
let mut result_list = result_list.lock().await;
result_list.push((task, true));
},
Err(_) => {
bar.set_style(failing_spinner_style.clone());
bar.abandon_with_message("Failed");
let mut result_list = result_list.lock().await;
result_list.push((task, false));
},
}
root_bar.inc(1);
if remaining == 0 {
root_bar.finish_with_message("Finished");
}
}
});
futures.push(future);
}
if progress_bar {
futures.push(tokio::task::spawn_blocking(move || {
multi.join().unwrap();
}));
}
join_all(futures).await;
let mut result_list = result_list.lock().await;
for (task, success) in result_list.drain(..) {
if !success {
let name = task.name().to_owned();
let host = task.to_host().await;
print!("Failed to deploy to {}. ", name);
if let Some(logs) = host.dump_logs().await {
if let Some(lines) = logs.chunks(10).rev().next() {
println!("Last {} lines of logs:", lines.len());
for line in lines {
println!("{}", line.trim_end());
}
} else {
println!("The log is empty.");
}
} else {
println!("Logs are not available for this target.");
}
}
}
}

View file

@ -4,7 +4,6 @@ use clap::{App, AppSettings, Arg};
mod nix;
mod command;
mod progress;
mod deployment;
mod util;
macro_rules! command {

626
src/nix/deployment.rs Normal file
View file

@ -0,0 +1,626 @@
use std::cmp::max;
use std::sync::Arc;
use std::collections::HashMap;
use futures::future::join_all;
use futures::join;
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
use tokio::sync::{Mutex, Semaphore};
use super::{Hive, Host, CopyOptions, host};
use crate::progress::get_spinner_styles;
/// Amount of RAM reserved for the system, in MB.
const EVAL_RESERVE_MB: u64 = 1024;
/// Estimated amount of RAM needed to evaluate one host, in MB.
const EVAL_PER_HOST_MB: u64 = 512;
const BATCH_OPERATION_LABEL: &'static str = "(...)";
macro_rules! set_up_batch_progress_bar {
($multi:ident, $style:ident, $chunk:ident, $single_text:expr, $batch_text:expr) => {{
let bar = $multi.add(ProgressBar::new(100));
bar.set_style($style.clone());
bar.enable_steady_tick(100);
if $chunk.len() == 1 {
bar.set_prefix(&$chunk[0]);
bar.set_message($single_text);
} else {
bar.set_prefix(BATCH_OPERATION_LABEL);
bar.set_message(&format!($batch_text, $chunk.len()));
}
bar.inc(0);
bar
}};
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum DeploymentGoal {
/// Build the configurations only.
Build,
/// Push the closures only.
Push,
/// Make the configuration the boot default and activate now.
Switch,
/// Make the configuration the boot default.
Boot,
/// Activate the configuration, but don't make it the boot default.
Test,
/// Show what would be done if this configuration were activated.
DryActivate,
}
impl DeploymentGoal {
pub fn from_str(s: &str) -> Option<Self> {
match s {
"build" => Some(Self::Build),
"push" => Some(Self::Push),
"switch" => Some(Self::Switch),
"boot" => Some(Self::Boot),
"test" => Some(Self::Test),
"dry-activate" => Some(Self::DryActivate),
_ => None,
}
}
pub fn as_str(&self) -> Option<&'static str> {
use DeploymentGoal::*;
match self {
Build => None,
Push => None,
Switch => Some("switch"),
Boot => Some("boot"),
Test => Some("test"),
DryActivate => Some("dry-activate"),
}
}
pub fn success_str(&self) -> Option<&'static str> {
use DeploymentGoal::*;
match self {
Build => Some("Configuration built"),
Push => Some("Pushed"),
Switch => Some("Activation successful"),
Boot => Some("Will be activated next boot"),
Test => Some("Activation successful (test)"),
DryActivate => Some("Dry activation successful"),
}
}
pub fn should_switch_profile(&self) -> bool {
use DeploymentGoal::*;
match self {
Boot | Switch => true,
_ => false,
}
}
pub fn requires_activation(&self) -> bool {
use DeploymentGoal::*;
match self {
Build | Push => false,
_ => true,
}
}
}
/// Internal deployment stages
#[derive(Debug)]
enum DeploymentStage {
Evaluate(Vec<String>),
Build(Vec<String>),
Apply(String),
}
/// Results of a deployment to a node
#[derive(Debug)]
struct DeploymentResult {
/// Stage in which the deployment ended.
stage: DeploymentStage,
/// Whether the deployment succeeded or not.
success: bool,
/// Unstructured logs of the deployment.
logs: Option<String>,
}
impl DeploymentResult {
fn success(stage: DeploymentStage, logs: Option<String>) -> Self {
Self {
stage,
success: true,
logs,
}
}
fn failure(stage: DeploymentStage, logs: Option<String>) -> Self {
Self {
stage,
success: true,
logs,
}
}
fn is_successful(&self) -> bool {
self.success
}
fn print(&self) {
use DeploymentStage::*;
if self.is_successful() {
unimplemented!();
}
match &self.stage {
Evaluate(nodes) => {
self.print_failed_nodes("Evaluation of", &nodes);
}
Build(nodes) => {
self.print_failed_nodes("Build of", &nodes);
}
Apply(node) => {
self.print_failed_nodes("Deployment to", &vec![node.clone()]);
}
}
}
fn print_failed_nodes(&self, prefix: &'static str, nodes: &Vec<String>) {
let last_lines: Option<Vec<String>> = self.logs.as_ref().map(|logs| {
logs.split("\n").collect::<Vec<&str>>().iter().rev().take(10).rev()
.map(|line| line.to_string()).collect()
});
let msg = if nodes.len() == 1 {
format!("{} {} failed.", prefix, nodes[0])
} else {
format!("{} {} nodes failed.", prefix, nodes.len())
};
if let Some(lines) = last_lines {
log::error!("{} Last {} lines of logs:", msg, lines.len());
for line in lines {
log::error!("{}", line);
}
}
}
}
#[derive(Debug)]
pub struct Deployment {
hive: Hive,
goal: DeploymentGoal,
nodes: Vec<String>,
node_hosts: Mutex<HashMap<String, Box<dyn Host>>>,
parallelism_limit: ParallelismLimit,
evaluation_node_limit: EvaluationNodeLimit,
options: DeploymentOptions,
results: Mutex<Vec<DeploymentResult>>,
}
impl Deployment {
pub fn new(hive: Hive, targets: HashMap<String, Box<dyn Host>>, goal: DeploymentGoal) -> Self {
let nodes: Vec<String> = targets.keys().cloned().collect();
Self {
hive,
goal,
nodes,
node_hosts: Mutex::new(targets),
parallelism_limit: ParallelismLimit::default(),
evaluation_node_limit: EvaluationNodeLimit::default(),
options: DeploymentOptions::default(),
results: Mutex::new(Vec::new()),
}
}
pub fn set_options(&mut self, options: DeploymentOptions) {
self.options = options;
}
pub fn set_parallelism_limit(&mut self, limit: ParallelismLimit) {
self.parallelism_limit = limit;
}
pub fn set_evaluation_node_limit(&mut self, limit: EvaluationNodeLimit) {
self.evaluation_node_limit = limit;
}
/// Executes the deployment (user-facing)
///
/// Self must be wrapped inside an Arc.
pub async fn execute(self: Arc<Self>) {
let multi = Arc::new(MultiProgress::new());
let root_bar = Arc::new(multi.add(ProgressBar::new(100)));
let alignment = self.node_name_alignment();
multi.set_draw_target(ProgressDrawTarget::stderr_nohz());
{
let (spinner_style, _) = get_spinner_styles(alignment);
root_bar.set_message("Running...");
root_bar.set_style(spinner_style);
root_bar.tick();
root_bar.enable_steady_tick(100);
}
let arc_self = self.clone();
let eval_limit = arc_self.clone().eval_limit();
// FIXME: Saner logging
let mut futures = Vec::new();
for chunk in self.nodes.chunks(eval_limit) {
let arc_self = self.clone();
let multi = multi.clone();
let (spinner_style, failing_spinner_style) = get_spinner_styles(alignment);
// FIXME: Eww
let chunk: Vec<String> = chunk.iter().map(|s| s.to_string()).collect();
futures.push(tokio::spawn(async move {
let drv = {
// Evaluation phase
let permit = arc_self.parallelism_limit.evaluation.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(multi, spinner_style, chunk,
"Evaluating configuration...",
"Evaluating configurations for {} nodes"
);
let drv = match arc_self.hive.eval_selected(&chunk, Some(bar.clone())).await {
Ok(drv) => {
bar.finish_and_clear();
drv
}
Err(e) => {
bar.set_style(failing_spinner_style.clone());
bar.abandon_with_message(&format!("Evalation failed: {}", e));
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Evaluate(chunk.clone());
results.push(DeploymentResult::failure(stage, None));
return;
}
};
drop(permit);
drv
};
let profiles = {
// Build phase
let permit = arc_self.parallelism_limit.build.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(multi, spinner_style, chunk,
"Building configuration...",
"Building configurations for {} nodes"
);
// FIXME: Remote build?
let mut builder = host::local();
if arc_self.options.progress_bar {
builder.set_progress_bar(bar.clone());
}
let profiles = match drv.realize(&mut *builder).await {
Ok(profiles) => {
let goal = arc_self.goal;
if goal == DeploymentGoal::Build {
bar.finish_with_message(goal.success_str().unwrap());
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
return;
} else {
bar.finish_and_clear();
profiles
}
}
Err(e) => {
bar.set_style(failing_spinner_style.clone());
bar.abandon_with_message(&format!("Build failed: {}", e));
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
}
};
drop(permit);
profiles
};
// Apply phase
let mut futures = Vec::new();
for node in chunk {
let arc_self = arc_self.clone();
let multi = multi.clone();
let spinner_style = spinner_style.clone();
let failing_spinner_style = failing_spinner_style.clone();
let mut host = {
let mut node_hosts = arc_self.node_hosts.lock().await;
node_hosts.remove(&node).unwrap()
};
let profile = profiles.get(&node).cloned()
.expect(&format!("Somehow profile for {} was not built", node));
futures.push(tokio::spawn(async move {
let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
let bar = multi.add(ProgressBar::new(100));
bar.set_style(spinner_style);
bar.set_prefix(&node);
bar.set_message("Starting...");
bar.tick();
bar.enable_steady_tick(100);
if arc_self.options.progress_bar {
host.set_progress_bar(bar.clone());
}
let copy_options = arc_self.options.to_copy_options()
.include_outputs(true);
let goal = arc_self.goal;
match host.deploy(&profile, goal, copy_options).await {
Ok(_) => {
bar.finish_with_message(goal.success_str().unwrap());
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Apply(node.clone());
let logs = host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
}
Err(e) => {
bar.set_style(failing_spinner_style);
bar.abandon_with_message(&format!("Failed: {}", e));
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Apply(node.clone());
let logs = host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
}
}
drop(permit);
}));
}
join_all(futures).await;
}));
}
let wait_for_tasks = tokio::spawn(async move {
join_all(futures).await;
root_bar.finish_with_message("Finished");
});
let tasks_result = if self.options.progress_bar {
let wait_for_bars = tokio::task::spawn_blocking(move || {
multi.join().unwrap();
});
let (tasks_result, _) = join!(wait_for_tasks, wait_for_bars);
tasks_result
} else {
wait_for_tasks.await
};
if let Err(e) = tasks_result {
log::error!("Deployment process failed: {}", e);
}
self.print_logs().await;
}
async fn print_logs(&self) {
let results = self.results.lock().await;
for result in results.iter() {
if !result.is_successful() {
result.print();
}
}
}
fn node_name_alignment(&self) -> usize {
if let Some(len) = self.nodes.iter().map(|n| n.len()).max() {
max(BATCH_OPERATION_LABEL.len(), len)
} else {
BATCH_OPERATION_LABEL.len()
}
}
fn eval_limit(&self) -> usize {
if let Some(limit) = self.evaluation_node_limit.get_limit() {
limit
} else {
self.nodes.len()
}
}
}
#[derive(Debug)]
pub struct ParallelismLimit {
/// Limit of concurrent evaluation processes.
evaluation: Semaphore,
/// Limit of concurrent build processes.
build: Semaphore,
/// Limit of concurrent apply processes.
apply: Semaphore,
}
impl Default for ParallelismLimit {
fn default() -> Self {
Self {
evaluation: Semaphore::new(1),
build: Semaphore::new(2),
apply: Semaphore::new(10),
}
}
}
impl ParallelismLimit {
// Do we actually want this to be configurable?
/*
/// Sets the concurrent evaluation limit.
///
/// This limits the number of evaluation processes, not
/// the number of nodes in each evaluation process.
/// The latter is controlled in DeploymentOptions.
pub fn set_evaluation_limit(&mut self, limit: usize) {
self.evaluation = Semaphore::new(limit);
}
*/
/// Sets the concurrent build limit.
pub fn set_build_limit(&mut self, limit: usize) {
self.build = Semaphore::new(limit);
}
/// Sets the concurrent apply limit.
pub fn set_apply_limit(&mut self, limit: usize) {
self.apply = Semaphore::new(limit);
}
}
#[derive(Copy, Clone, Debug)]
pub struct DeploymentOptions {
/// Whether to show condensed progress bars.
///
/// If set to false, verbose logs will be displayed instead.
progress_bar: bool,
/// Whether to use binary caches when building.
substituters_build: bool,
/// Whether to use binary caches when copying closures to remote hosts.
substituters_push: bool,
/// Whether to use gzip when copying closures to remote hosts.
gzip: bool,
}
impl Default for DeploymentOptions {
fn default() -> Self {
Self {
progress_bar: true,
substituters_build: true,
substituters_push: true,
gzip: true,
}
}
}
impl DeploymentOptions {
pub fn set_progress_bar(&mut self, value: bool) {
self.progress_bar = value;
}
pub fn set_substituters_build(&mut self, value: bool) {
self.substituters_build = value;
}
pub fn set_substituters_push(&mut self, value: bool) {
self.substituters_push = value;
}
pub fn set_gzip(&mut self, value: bool) {
self.gzip = value;
}
fn to_copy_options(&self) -> CopyOptions {
let options = CopyOptions::default();
options
.use_substitutes(self.substituters_push)
.gzip(self.gzip)
}
}
/// Limit of the number of nodes in each evaluation process.
///
/// The evaluation process is very RAM-intensive, with memory
/// consumption scaling linearly with the number of nodes
/// evaluated at the same time. This can be a problem if you
/// are deploying to a large number of nodes at the same time,
/// where `nix-instantiate` may consume too much RAM and get
/// killed by the OS (`NixKilled` error).
///
/// Evaluating each node on its own is not an efficient solution,
/// with total CPU time and memory consumption vastly exceeding the
/// case where we evaluate the same set of nodes at the same time
/// (TODO: Provide statistics).
///
/// To overcome this problem, we split the evaluation process into
/// chunks when necessary, with the maximum number of nodes in
/// each `nix-instantiate` invocation determined with:
///
/// - A simple heuristic based on remaining memory in the system
/// - A supplied number
/// - No limit at all
#[derive(Copy, Clone, Debug)]
pub enum EvaluationNodeLimit {
/// Use a naive heuristic based on available memory.
Heuristic,
/// Supply the maximum number of nodes.
Manual(usize),
/// Do not limit the number of nodes in each evaluation process
None,
}
impl Default for EvaluationNodeLimit {
fn default() -> Self {
Self::Heuristic
}
}
impl EvaluationNodeLimit {
/// Returns the maximum number of hosts in each evaluation.
///
/// The result should be cached.
pub fn get_limit(&self) -> Option<usize> {
match self {
EvaluationNodeLimit::Heuristic => {
if let Ok(mem_info) = sys_info::mem_info() {
let mut mb = mem_info.avail / 1024;
if mb >= EVAL_RESERVE_MB {
mb -= EVAL_RESERVE_MB;
}
let nodes = mb / EVAL_PER_HOST_MB;
if nodes == 0 {
Some(1)
} else {
Some(nodes as usize)
}
} else {
Some(10)
}
}
EvaluationNodeLimit::Manual(limit) => Some(*limit),
EvaluationNodeLimit::None => None,
}
}
}

160
src/nix/hive.rs Normal file
View file

@ -0,0 +1,160 @@
use std::collections::HashMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use indicatif::ProgressBar;
use tempfile::{NamedTempFile, TempPath};
use tokio::process::Command;
use serde::Serialize;
use super::{
StoreDerivation,
NixResult,
NodeConfig,
ProfileMap,
};
use super::NixCommand;
use crate::util::CommandExecution;
const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix");
#[derive(Debug)]
pub struct Hive {
hive: PathBuf,
eval_nix: TempPath,
show_trace: bool,
}
impl Hive {
pub fn new<P: AsRef<Path>>(hive: P) -> NixResult<Self> {
let mut eval_nix = NamedTempFile::new().unwrap();
eval_nix.write_all(HIVE_EVAL).unwrap();
Ok(Self {
hive: hive.as_ref().to_owned(),
eval_nix: eval_nix.into_temp_path(),
show_trace: false,
})
}
pub fn show_trace(&mut self, value: bool) {
self.show_trace = value;
}
pub fn as_path(&self) -> &Path {
&self.hive
}
/// Retrieve deployment info for all nodes
pub async fn deployment_info(&self) -> NixResult<HashMap<String, NodeConfig>> {
// FIXME: Really ugly :(
let s: String = self.nix_instantiate("hive.deploymentConfigJson").eval()
.capture_json().await?;
Ok(serde_json::from_str(&s).unwrap())
}
/// Evaluates selected nodes.
///
/// Evaluation may take up a lot of memory, so we make it possible
/// to split up the evaluation process into chunks and run them
/// concurrently with other processes (e.g., build and apply).
pub async fn eval_selected(&self, nodes: &Vec<String>, progress_bar: Option<ProgressBar>) -> NixResult<StoreDerivation<ProfileMap>> {
let nodes_expr = SerializedNixExpresssion::new(nodes)?;
let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression());
let command = self.nix_instantiate(&expr).instantiate();
let mut execution = CommandExecution::new("(eval)", command);
if let Some(bar) = progress_bar {
execution.set_progress_bar(bar);
}
let eval = execution
.capture_store_path().await?;
let drv = eval.to_derivation()
.expect("The result should be a store derivation");
Ok(drv)
}
/// Evaluates an expression using values from the configuration
pub async fn introspect(&self, expression: String) -> NixResult<String> {
let expression = format!("toJSON (hive.introspect ({}))", expression);
self.nix_instantiate(&expression).eval()
.capture_json().await
}
fn nix_instantiate(&self, expression: &str) -> NixInstantiate {
NixInstantiate::new(&self, expression.to_owned())
}
}
struct NixInstantiate<'hive> {
hive: &'hive Hive,
expression: String,
}
impl<'hive> NixInstantiate<'hive> {
fn new(hive: &'hive Hive, expression: String) -> Self {
Self {
hive,
expression,
}
}
fn instantiate(self) -> Command {
// FIXME: unwrap
// Technically filenames can be arbitrary byte strings (OsStr),
// but Nix may not like it...
let mut command = Command::new("nix-instantiate");
command
.arg("--no-gc-warning")
.arg("-E")
.arg(format!(
"with builtins; let eval = import {}; hive = eval {{ rawHive = import {}; }}; in {}",
self.hive.eval_nix.to_str().unwrap(),
self.hive.as_path().to_str().unwrap(),
self.expression,
));
if self.hive.show_trace {
command.arg("--show-trace");
}
command
}
fn eval(self) -> Command {
let mut command = self.instantiate();
command.arg("--eval").arg("--json");
command
}
}
/// A serialized Nix expression.
///
/// Very hacky and involves an Import From Derivation, so should be
/// avoided as much as possible. But I suppose it's more robust than attempting
/// to generate Nix expressions directly or escaping a JSON string to strip
/// off Nix interpolation.
struct SerializedNixExpresssion {
json_file: TempPath,
}
impl SerializedNixExpresssion {
pub fn new<'de, T>(data: T) -> NixResult<Self> where T: Serialize {
let mut tmp = NamedTempFile::new()?;
let json = serde_json::to_vec(&data).expect("Could not serialize data");
tmp.write_all(&json)?;
Ok(Self {
json_file: tmp.into_temp_path(),
})
}
pub fn expression(&self) -> String {
format!("(builtins.fromJSON (builtins.readFile {}))", self.json_file.to_str().unwrap())
}
}

View file

@ -1,16 +1,15 @@
use std::process::Stdio;
use std::collections::HashSet;
use std::convert::TryInto;
use console::style;
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use indicatif::ProgressBar;
use super::{StorePath, DeploymentGoal, NixResult, NixError, NixCommand, SYSTEM_PROFILE};
use super::{StorePath, Profile, DeploymentGoal, NixResult, NixError, NixCommand, SYSTEM_PROFILE};
use crate::util::CommandExecution;
pub(crate) fn local() -> Box<dyn Host + 'static> {
Box::new(Local {})
Box::new(Local::new())
}
#[derive(Copy, Clone, Debug)]
@ -81,9 +80,22 @@ pub trait Host: Send + Sync + std::fmt::Debug {
Ok(paths)
}
/// Pushes and optionally activates a profile to the host.
async fn deploy(&mut self, profile: &Profile, goal: DeploymentGoal, copy_options: CopyOptions) -> NixResult<()> {
self.copy_closure(profile.as_store_path(), CopyDirection::ToRemote, copy_options).await?;
if goal.requires_activation() {
self.activate(profile, goal).await?;
}
Ok(())
}
#[allow(unused_variables)]
/// Activates a system profile on the host, if it runs NixOS.
async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> {
///
/// The profile must already exist on the host. You should probably use deploy instead.
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
Err(NixError::Unsupported)
}
@ -93,7 +105,7 @@ pub trait Host: Send + Sync + std::fmt::Debug {
}
/// Dumps human-readable unstructured log messages related to the host.
async fn dump_logs(&self) -> Option<&[String]> {
async fn dump_logs(&self) -> Option<&str> {
None
}
}
@ -103,7 +115,19 @@ pub trait Host: Send + Sync + std::fmt::Debug {
/// It may not be capable of realizing some derivations
/// (e.g., building Linux derivations on macOS).
#[derive(Debug)]
pub struct Local {}
pub struct Local {
progress_bar: Option<ProgressBar>,
logs: String,
}
impl Local {
pub fn new() -> Self {
Self {
progress_bar: None,
logs: String::new(),
}
}
}
#[async_trait]
impl Host for Local {
@ -111,31 +135,58 @@ impl Host for Local {
Ok(())
}
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
Command::new("nix-store")
let mut command = Command::new("nix-store");
command
.arg("--no-gc-warning")
.arg("--realise")
.arg(derivation.as_path())
.capture_output()
.await
.map(|paths| {
paths.lines().map(|p| p.to_string().into()).collect()
})
.arg(derivation.as_path());
let mut execution = CommandExecution::new("local", command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> {
let profile = profile.as_path().to_str().unwrap();
execution.run().await?;
let (stdout, _) = execution.get_logs();
stdout.unwrap().lines().map(|p| p.to_string().try_into()).collect()
}
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap();
Command::new("nix-env")
.args(&["--profile", SYSTEM_PROFILE])
.args(&["--set", profile])
.args(&["--set", path])
.passthrough()
.await?;
}
let activation_command = format!("{}/bin/switch-to-configuration", profile);
Command::new(activation_command)
.arg(goal.as_str().unwrap())
.passthrough()
.await
let activation_command = profile.activation_command(goal).unwrap();
let mut command = Command::new(&activation_command[0]);
command
.args(&activation_command[1..]);
let mut execution = CommandExecution::new("local", command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
let result = execution.run().await;
// FIXME: Bad - Order of lines is messed up
let (stdout, stderr) = execution.get_logs();
self.logs += stdout.unwrap();
self.logs += stderr.unwrap();
result
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
}
}
@ -150,8 +201,8 @@ pub struct SSH {
friendly_name: String,
path_cache: HashSet<StorePath>,
progress: Option<ProgressBar>,
logs: Vec<String>,
progress_bar: Option<ProgressBar>,
logs: String,
}
#[async_trait]
@ -162,29 +213,33 @@ impl Host for SSH {
}
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
// FIXME
self.ssh(&["nix-store", "--no-gc-warning", "--realise", derivation.as_path().to_str().unwrap()])
let paths = self.ssh(&["nix-store", "--no-gc-warning", "--realise", derivation.as_path().to_str().unwrap()])
.capture_output()
.await
.map(|paths| {
paths.lines().map(|p| p.to_string().into()).collect()
})
}
async fn activate(&mut self, profile: &StorePath, goal: DeploymentGoal) -> NixResult<()> {
let profile = profile.as_path().to_str().unwrap();
.await;
match paths {
Ok(paths) => {
paths.lines().map(|p| p.to_string().try_into()).collect()
}
Err(e) => Err(e),
}
}
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
if goal.should_switch_profile() {
let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", profile]);
let path = profile.as_path().to_str().unwrap();
let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", path]);
self.run_command(set_profile).await?;
}
let activation_command = format!("{}/bin/switch-to-configuration", profile);
let command = self.ssh(&[&activation_command, goal.as_str().unwrap()]);
let activation_command = profile.activation_command(goal).unwrap();
let v: Vec<&str> = activation_command.iter().map(|s| &**s).collect();
let command = self.ssh(&v);
self.run_command(command).await
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress = Some(bar);
self.progress_bar = Some(bar);
}
async fn dump_logs(&self) -> Option<&[String]> {
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
}
}
@ -197,44 +252,26 @@ impl SSH {
host,
friendly_name,
path_cache: HashSet::new(),
progress: None,
logs: Vec::new(),
progress_bar: None,
logs: String::new(),
}
}
async fn run_command(&mut self, mut command: Command) -> NixResult<()> {
command.stdin(Stdio::null());
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
async fn run_command(&mut self, command: Command) -> NixResult<()> {
let mut execution = CommandExecution::new(&self.friendly_name, command);
let mut child = command.spawn()?;
let mut stderr = BufReader::new(child.stderr.as_mut().unwrap());
loop {
let mut line = String::new();
let len = stderr.read_line(&mut line).await.unwrap();
if len == 0 {
break;
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
let trimmed = line.trim_end();
if let Some(progress) = self.progress.as_mut() {
progress.set_message(trimmed);
progress.inc(0);
} else {
eprintln!("{} | {}", style(&self.friendly_name).cyan(), trimmed);
}
self.logs.push(line);
}
let exit = child.wait().await?;
let result = execution.run().await;
if exit.success() {
Ok(())
} else {
Err(NixError::NixFailure { exit_code: exit.code().unwrap() })
}
// FIXME: Bad - Order of lines is messed up
let (stdout, stderr) = execution.get_logs();
self.logs += stdout.unwrap();
self.logs += stderr.unwrap();
result
}
fn ssh_target(&self) -> String {

View file

@ -1,29 +1,35 @@
use std::path::{Path, PathBuf};
use std::convert::AsRef;
use std::io::Write;
use std::convert::TryFrom;
use std::process::Stdio;
use std::collections::HashMap;
use std::fs;
use async_trait::async_trait;
use indicatif::ProgressBar;
use serde::de::DeserializeOwned;
use serde::{Serialize, Deserialize};
use serde::Deserialize;
use snafu::Snafu;
use tempfile::{NamedTempFile, TempPath};
use tokio::process::Command;
use tokio::sync::Mutex;
use crate::util::CommandExecution;
pub mod host;
pub use host::{Host, CopyDirection, CopyOptions};
use host::SSH;
const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix");
pub mod hive;
pub use hive::Hive;
pub mod store;
pub use store::{StorePath, StoreDerivation};
pub mod profile;
pub use profile::{Profile, ProfileMap};
pub mod deployment;
pub use deployment::{DeploymentGoal, Deployment};
pub const SYSTEM_PROFILE: &'static str = "/nix/var/nix/profiles/system";
pub type NixResult<T> = Result<T, NixError>;
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum NixError {
#[snafu(display("I/O Error: {}", error))]
@ -41,6 +47,12 @@ pub enum NixError {
#[snafu(display("This operation is not supported"))]
Unsupported,
#[snafu(display("Invalid Nix store path"))]
InvalidStorePath,
#[snafu(display("Invalid NixOS system profile"))]
InvalidProfile,
#[snafu(display("Nix Error: {}", message))]
Unknown { message: String },
}
@ -51,85 +63,8 @@ impl From<std::io::Error> for NixError {
}
}
pub struct Hive {
hive: PathBuf,
eval_nix: TempPath,
builder: Box<dyn Host>,
show_trace: bool,
}
impl Hive {
pub fn new<P: AsRef<Path>>(hive: P) -> NixResult<Self> {
let mut eval_nix = NamedTempFile::new()?;
eval_nix.write_all(HIVE_EVAL)?;
Ok(Self {
hive: hive.as_ref().to_owned(),
eval_nix: eval_nix.into_temp_path(),
builder: host::local(),
show_trace: false,
})
}
pub fn show_trace(&mut self, value: bool) {
self.show_trace = value;
}
/// Retrieve deployment info for all nodes
pub async fn deployment_info(&self) -> NixResult<HashMap<String, DeploymentConfig>> {
// FIXME: Really ugly :(
let s: String = self.nix_instantiate("hive.deploymentConfigJson").eval()
.capture_json().await?;
Ok(serde_json::from_str(&s).unwrap())
}
/// Builds selected nodes
pub async fn build_selected(&mut self, nodes: Vec<String>) -> NixResult<HashMap<String, StorePath>> {
let nodes_expr = SerializedNixExpresssion::new(&nodes)?;
let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression());
self.build_common(&expr).await
}
#[allow(dead_code)]
/// Builds all node configurations
pub async fn build_all(&mut self) -> NixResult<HashMap<String, StorePath>> {
self.build_common("hive.buildAll").await
}
/// Evaluates an expression using values from the configuration
pub async fn introspect(&mut self, expression: String) -> NixResult<String> {
let expression = format!("toJSON (hive.introspect ({}))", expression);
self.nix_instantiate(&expression).eval()
.capture_json().await
}
/// Builds node configurations
///
/// Expects the resulting store path to point to a JSON file containing
/// a map of node name -> store path.
async fn build_common(&mut self, expression: &str) -> NixResult<HashMap<String, StorePath>> {
let build: StorePath = self.nix_instantiate(expression).instantiate()
.capture_store_path().await?;
let realization = self.builder.realize(&build).await?;
assert!(realization.len() == 1);
let json = fs::read_to_string(&realization[0].as_path())?;
let result_map = serde_json::from_str(&json)
.expect("Bad result from our own build routine");
Ok(result_map)
}
fn nix_instantiate(&self, expression: &str) -> NixInstantiate {
NixInstantiate::new(&self, expression.to_owned())
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct DeploymentConfig {
pub struct NodeConfig {
#[serde(rename = "targetHost")]
target_host: Option<String>,
@ -141,7 +76,7 @@ pub struct DeploymentConfig {
tags: Vec<String>,
}
impl DeploymentConfig {
impl NodeConfig {
pub fn tags(&self) -> &[String] { &self.tags }
pub fn allows_local_deployment(&self) -> bool { self.allow_local_deployment }
@ -154,110 +89,6 @@ impl DeploymentConfig {
}
}
#[derive(Debug, Copy, Clone)]
pub enum DeploymentGoal {
/// Push the closures only.
Push,
/// Make the configuration the boot default and activate now.
Switch,
/// Make the configuration the boot default.
Boot,
/// Activate the configuration, but don't make it the boot default.
Test,
/// Show what would be done if this configuration were activated.
DryActivate,
}
impl DeploymentGoal {
pub fn from_str(s: &str) -> Option<Self> {
match s {
"push" => Some(Self::Push),
"switch" => Some(Self::Switch),
"boot" => Some(Self::Boot),
"test" => Some(Self::Test),
"dry-activate" => Some(Self::DryActivate),
_ => None,
}
}
pub fn as_str(&self) -> Option<&'static str> {
use DeploymentGoal::*;
match self {
Push => None,
Switch => Some("switch"),
Boot => Some("boot"),
Test => Some("test"),
DryActivate => Some("dry-activate"),
}
}
pub fn success_str(&self) -> Option<&'static str> {
use DeploymentGoal::*;
match self {
Push => Some("Pushed"),
Switch => Some("Activation successful"),
Boot => Some("Will be activated next boot"),
Test => Some("Activation successful (test)"),
DryActivate => Some("Dry activation successful"),
}
}
pub fn should_switch_profile(&self) -> bool {
use DeploymentGoal::*;
match self {
Boot | Switch => true,
_ => false,
}
}
}
struct NixInstantiate<'hive> {
hive: &'hive Hive,
expression: String,
}
impl<'hive> NixInstantiate<'hive> {
fn new(hive: &'hive Hive, expression: String) -> Self {
Self {
hive,
expression,
}
}
fn instantiate(self) -> Command {
// FIXME: unwrap
// Technically filenames can be arbitrary byte strings (OsStr),
// but Nix may not like it...
let mut command = Command::new("nix-instantiate");
command
.arg("--no-gc-warning")
.arg("-E")
.arg(format!(
"with builtins; let eval = import {}; hive = eval {{ rawHive = import {}; }}; in {}",
self.hive.eval_nix.to_str().unwrap(),
self.hive.hive.to_str().unwrap(),
self.expression,
));
if self.hive.show_trace {
command.arg("--show-trace");
}
command
}
fn eval(self) -> Command {
let mut command = self.instantiate();
command.arg("--eval").arg("--json");
command
}
}
#[async_trait]
trait NixCommand {
async fn passthrough(&mut self) -> NixResult<()>;
@ -317,131 +148,37 @@ impl NixCommand for Command {
/// Captures a single store path.
async fn capture_store_path(&mut self) -> NixResult<StorePath> {
let output = self.capture_output().await?;
Ok(StorePath(output.trim_end().into()))
let path = output.trim_end().to_owned();
StorePath::try_from(path)
}
}
/// A Nix store path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorePath(PathBuf);
impl StorePath {
/// Returns the store path
pub fn as_path(&self) -> &Path {
&self.0
}
#[async_trait]
impl NixCommand for CommandExecution {
async fn passthrough(&mut self) -> NixResult<()> {
self.run().await
}
impl From<String> for StorePath {
fn from(s: String) -> Self {
Self(s.into())
}
/// Captures output as a String.
async fn capture_output(&mut self) -> NixResult<String> {
self.run().await?;
let (stdout, _) = self.get_logs();
Ok(stdout.unwrap().to_owned())
}
impl Into<PathBuf> for StorePath {
fn into(self) -> PathBuf {
self.0
}
}
/// A serialized Nix expression.
///
/// Very hacky and involves an Import From Derivation, so should be
/// avoided as much as possible. But I suppose it's more robust than attempting
/// to generate Nix expressions directly or escaping a JSON string to strip
/// off Nix interpolation.
struct SerializedNixExpresssion {
json_file: TempPath,
}
impl SerializedNixExpresssion {
pub fn new<'de, T>(data: T) -> NixResult<Self> where T: Serialize {
let mut tmp = NamedTempFile::new()?;
let json = serde_json::to_vec(&data).expect("Could not serialize data");
tmp.write_all(&json)?;
Ok(Self {
json_file: tmp.into_temp_path(),
/// Captures deserialized output from JSON.
async fn capture_json<T>(&mut self) -> NixResult<T> where T: DeserializeOwned {
let output = self.capture_output().await?;
serde_json::from_str(&output).map_err(|_| NixError::BadOutput {
output: output.clone()
})
}
pub fn expression(&self) -> String {
format!("(builtins.fromJSON (builtins.readFile {}))", self.json_file.to_str().unwrap())
}
}
#[derive(Debug)]
pub struct DeploymentTask {
/// Name of the target.
name: String,
/// The target to deploy to.
target: Mutex<Box<dyn Host>>,
/// Nix store path to the system profile to deploy.
profile: StorePath,
/// The goal of this deployment.
goal: DeploymentGoal,
/// Options used for copying closures to the remote host.
copy_options: CopyOptions,
}
impl DeploymentTask {
pub fn new(name: String, target: Box<dyn Host>, profile: StorePath, goal: DeploymentGoal) -> Self {
Self {
name,
target: Mutex::new(target),
profile,
goal,
copy_options: CopyOptions::default(),
}
}
pub fn name(&self) -> &str { &self.name }
pub fn goal(&self) -> DeploymentGoal { self.goal }
/// Set options used for copying closures to the remote host.
pub fn set_copy_options(&mut self, options: CopyOptions) {
self.copy_options = options;
}
/// Set the progress bar used during deployment.
pub async fn set_progress_bar(&mut self, progress: ProgressBar) {
let mut target = self.target.lock().await;
target.set_progress_bar(progress);
}
/// Executes the deployment.
pub async fn execute(&mut self) -> NixResult<()> {
match self.goal {
DeploymentGoal::Push => {
self.push().await
}
_ => {
self.push_and_activate().await
}
}
}
/// Takes the Host out, consuming the DeploymentTask.
pub async fn to_host(self) -> Box<dyn Host> {
self.target.into_inner()
}
async fn push(&mut self) -> NixResult<()> {
let mut target = self.target.lock().await;
let options = self.copy_options.include_outputs(true);
target.copy_closure(&self.profile, CopyDirection::ToRemote, options).await
}
async fn push_and_activate(&mut self) -> NixResult<()> {
self.push().await?;
{
let mut target = self.target.lock().await;
target.activate(&self.profile, self.goal).await
}
/// Captures a single store path.
async fn capture_store_path(&mut self) -> NixResult<StorePath> {
let output = self.capture_output().await?;
let path = output.trim_end().to_owned();
StorePath::try_from(path)
}
}

112
src/nix/profile.rs Normal file
View file

@ -0,0 +1,112 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::ops::{Deref, DerefMut};
use std::fs;
use std::path::Path;
use super::{
DeploymentGoal,
NixResult,
NixError,
StorePath,
};
/// A NixOS system profile.
#[derive(Clone, Debug)]
pub struct Profile(StorePath);
impl Profile {
pub fn from_store_path(path: StorePath) -> NixResult<Self> {
if
!path.is_dir() ||
!path.join("bin/switch-to-configuration").exists()
{
return Err(NixError::InvalidProfile);
}
if let None = path.to_str() {
Err(NixError::InvalidProfile)
} else {
Ok(Self(path))
}
}
/// Returns the command to activate this profile.
pub fn activation_command(&self, goal: DeploymentGoal) -> Option<Vec<String>> {
if let Some(goal) = goal.as_str() {
let path = self.as_path().join("bin/switch-to-configuration");
let switch_to_configuration = path.to_str()
.expect("The string should be UTF-8 valid")
.to_string();
let mut v = Vec::new();
v.push(switch_to_configuration);
v.push(goal.to_string());
Some(v)
} else {
None
}
}
/// Returns the store path.
pub fn as_store_path(&self) -> &StorePath {
&self.0
}
/// Returns the raw store path.
pub fn as_path(&self) -> &Path {
&self.0.as_path()
}
}
/// A map of names to their associated NixOS system profiles.
#[derive(Debug)]
pub struct ProfileMap(HashMap<String, Profile>);
impl Deref for ProfileMap {
type Target = HashMap<String, Profile>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for ProfileMap {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl TryFrom<Vec<StorePath>> for ProfileMap {
type Error = NixError;
fn try_from(paths: Vec<StorePath>) -> NixResult<Self> {
match paths.len() {
0 => Err(NixError::BadOutput {
output: String::from("Build produced no outputs"),
}),
l if l > 1 => Err(NixError::BadOutput {
output: String::from("Build produced multiple outputs"),
}),
_ => {
// We expect a JSON file containing a
// HashMap<String, StorePath>
let path = paths[0].as_path();
let json: String = fs::read_to_string(path)?;
let mut raw_map: HashMap<String, StorePath> = serde_json::from_str(&json).map_err(|_| NixError::BadOutput {
output: String::from("The returned profile map is invalid"),
})?;
let mut checked_map = HashMap::new();
for (node, profile) in raw_map.drain() {
let profile = Profile::from_store_path(profile)?;
checked_map.insert(node, profile);
}
Ok(Self(checked_map))
}
}
}
}

86
src/nix/store.rs Normal file
View file

@ -0,0 +1,86 @@
use std::convert::{TryFrom, TryInto};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::ops::Deref;
use serde::{Serialize, Deserialize};
use super::{Host, NixResult, NixError};
/// A Nix store path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorePath(PathBuf);
impl StorePath {
/// Returns the raw store path.
pub fn as_path(&self) -> &Path {
&self.0
}
/// Determines whether the path points to a derivation.
pub fn is_derivation(&self) -> bool {
if let Some(ext) = self.0.extension() {
ext == "drv"
} else {
false
}
}
/// Converts the store path into a store derivation.
pub fn to_derivation<T: TryFrom<Vec<StorePath>>>(self) -> Option<StoreDerivation<T>> {
if self.is_derivation() {
Some(StoreDerivation::<T>::from_store_path_unchecked(self))
} else {
None
}
}
}
impl Deref for StorePath {
type Target = Path;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl TryFrom<String> for StorePath {
type Error = NixError;
fn try_from(s: String) -> NixResult<Self> {
if s.starts_with("/nix/store/") {
Ok(Self(s.into()))
} else {
Err(NixError::InvalidStorePath)
}
}
}
impl Into<PathBuf> for StorePath {
fn into(self) -> PathBuf {
self.0
}
}
/// A store derivation (.drv) that will result in a T when built.
pub struct StoreDerivation<T: TryFrom<Vec<StorePath>>>{
path: StorePath,
_target: PhantomData<T>,
}
impl<T: TryFrom<Vec<StorePath>>> StoreDerivation<T> {
fn from_store_path_unchecked(path: StorePath) -> Self {
Self {
path,
_target: PhantomData,
}
}
}
impl<T: TryFrom<Vec<StorePath>, Error=NixError>> StoreDerivation<T> {
/// Builds the store derivation on a host, resulting in a T.
pub async fn realize(&self, host: &mut dyn Host) -> NixResult<T> {
let paths: Vec<StorePath> = host.realize(&self.path).await?;
paths.try_into()
}
}

View file

@ -1,17 +1,114 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::convert::AsRef;
use std::fs;
use std::path::PathBuf;
use std::process::Stdio;
use clap::{App, Arg, ArgMatches};
use console::style;
use futures::future::join3;
use glob::Pattern as GlobPattern;
use indicatif::ProgressBar;
use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader};
use tokio::process::Command;
use super::nix::{DeploymentConfig, Hive, NixResult};
use super::nix::{NodeConfig, Hive, NixResult, NixError};
enum NodeFilter {
NameFilter(GlobPattern),
TagFilter(GlobPattern),
}
/// Non-interactive execution of an arbitrary Nix command.
pub struct CommandExecution {
label: String,
command: Command,
progress_bar: Option<ProgressBar>,
stdout: Option<String>,
stderr: Option<String>,
}
impl CommandExecution {
pub fn new<S: AsRef<str>>(label: S, command: Command) -> Self {
Self {
label: label.as_ref().to_string(),
command,
progress_bar: None,
stdout: None,
stderr: None,
}
}
/// Provides a ProgressBar to use to display output.
pub fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
}
/// Retrieve logs from the last invocation.
pub fn get_logs(&self) -> (Option<&String>, Option<&String>) {
(self.stdout.as_ref(), self.stderr.as_ref())
}
/// Run the command.
pub async fn run(&mut self) -> NixResult<()> {
self.command.stdin(Stdio::null());
self.command.stdout(Stdio::piped());
self.command.stderr(Stdio::piped());
self.stdout = Some(String::new());
self.stderr = Some(String::new());
let mut child = self.command.spawn()?;
let stdout = BufReader::new(child.stdout.take().unwrap());
let stderr = BufReader::new(child.stderr.take().unwrap());
async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, label: &str, mut progress_bar: Option<ProgressBar>) -> String {
let mut log = String::new();
loop {
let mut line = String::new();
let len = stream.read_line(&mut line).await.unwrap();
if len == 0 {
break;
}
let trimmed = line.trim_end();
if let Some(progress_bar) = progress_bar.as_mut() {
progress_bar.set_message(trimmed);
} else {
eprintln!("{} | {}", style(label).cyan(), trimmed);
}
log += trimmed;
log += "\n";
}
log
}
let futures = join3(
capture_stream(stdout, &self.label, self.progress_bar.clone()),
capture_stream(stderr, &self.label, self.progress_bar.clone()),
child.wait(),
);
let (stdout_str, stderr_str, wait) = futures.await;
self.stdout = Some(stdout_str);
self.stderr = Some(stderr_str);
let exit = wait?;
if exit.success() {
Ok(())
} else {
Err(NixError::NixFailure { exit_code: exit.code().unwrap() })
}
}
}
pub fn hive_from_args(args: &ArgMatches<'_>) -> NixResult<Hive> {
let path = match args.occurrences_of("config") {
0 => {
@ -84,7 +181,7 @@ pub fn hive_from_args(args: &ArgMatches<'_>) -> NixResult<Hive> {
Ok(hive)
}
pub fn filter_nodes(nodes: &HashMap<String, DeploymentConfig>, filter: &str) -> Vec<String> {
pub fn filter_nodes(nodes: &HashMap<String, NodeConfig>, filter: &str) -> Vec<String> {
let filters: Vec<NodeFilter> = filter.split(",").map(|pattern| {
use NodeFilter::*;
if let Some(tag_pattern) = pattern.strip_prefix("@") {