Clean up logging / progress display

This commit is contained in:
Zhaofeng Li 2021-02-09 19:28:45 -08:00
parent 8934726664
commit a2fa8f1da7
15 changed files with 486 additions and 346 deletions

1
Cargo.lock generated
View file

@ -96,6 +96,7 @@ name = "colmena"
version = "0.1.0"
dependencies = [
"async-trait",
"atty",
"clap",
"console",
"env_logger",

View file

@ -8,6 +8,7 @@ edition = "2018"
[dependencies]
async-trait = "0.1.42"
atty = "0.2"
clap = "2.33.3"
console = "0.13.0"
env_logger = "0.8.2"

View file

@ -17,5 +17,5 @@ in rustPlatform.buildRustPackage {
src = ./.;
};
};
cargoSha256 = "0xxp6hklnpdidrydvahv26vvpdysa7x7sf19vll8fb8zgbhjfvjm";
cargoSha256 = "0imalrw8im6zl5lq8k5j05msykax85lya39vq0fxagifdckcdfsb";
}

View file

@ -5,8 +5,8 @@ use clap::{Arg, App, SubCommand, ArgMatches};
use crate::nix::deployment::{
Deployment,
DeploymentGoal,
DeploymentTarget,
Goal,
Target,
DeploymentOptions,
EvaluationNodeLimit,
ParallelismLimit,
@ -134,12 +134,12 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
// FIXME: This is ugly :/ Make an enum wrapper for this fake "keys" goal
let goal_arg = local_args.value_of("goal").unwrap();
let goal = if goal_arg == "keys" {
DeploymentGoal::Build
Goal::Build
} else {
DeploymentGoal::from_str(goal_arg).unwrap()
Goal::from_str(goal_arg).unwrap()
};
let build_only = goal == DeploymentGoal::Build && goal_arg != "keys";
let build_only = goal == Goal::Build && goal_arg != "keys";
let mut targets = HashMap::new();
for node in &selected_nodes {
@ -149,14 +149,14 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
Some(host) => {
targets.insert(
node.clone(),
DeploymentTarget::new(host, config.clone()),
Target::new(host, config.clone()),
);
}
None => {
if build_only {
targets.insert(
node.clone(),
DeploymentTarget::new(localhost(), config.clone()),
Target::new(localhost(), config.clone()),
);
}
}

View file

@ -8,8 +8,8 @@ use tokio::process::Command;
use crate::nix::deployment::{
Deployment,
DeploymentGoal,
DeploymentTarget,
Goal,
Target,
DeploymentOptions,
};
use crate::nix::host;
@ -88,19 +88,19 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
hostname::get().expect("Could not get hostname")
.to_string_lossy().into_owned()
};
let goal = DeploymentGoal::from_str(local_args.value_of("goal").unwrap()).unwrap();
let goal = Goal::from_str(local_args.value_of("goal").unwrap()).unwrap();
log::info!("Enumerating nodes...");
let all_nodes = hive.deployment_info().await.unwrap();
let target: DeploymentTarget = {
let target: Target = {
if let Some(info) = all_nodes.get(&hostname) {
if !info.allows_local_deployment() {
log::error!("Local deployment is not enabled for host {}.", hostname);
log::error!("Hint: Set deployment.allowLocalDeployment to true.");
quit::with_code(2);
}
DeploymentTarget::new(
Target::new(
host::local(),
info.clone(),
)

View file

@ -1,3 +1,5 @@
#![feature(async_closure)]
use std::env;
use clap::{App, AppSettings, Arg};

View file

@ -3,12 +3,10 @@ use std::sync::Arc;
use std::collections::HashMap;
use futures::future::join_all;
use futures::join;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle, ProgressDrawTarget};
use tokio::sync::{Mutex, Semaphore};
use super::{Hive, Host, CopyOptions, NodeConfig, Profile, StoreDerivation, ProfileMap, host};
use crate::progress::get_spinner_styles;
use crate::progress::{Progress, ProcessProgress, OutputStyle};
/// Amount of RAM reserved for the system, in MB.
const EVAL_RESERVE_MB: u64 = 1024;
@ -19,26 +17,21 @@ const EVAL_PER_HOST_MB: u64 = 512;
const BATCH_OPERATION_LABEL: &'static str = "(...)";
macro_rules! set_up_batch_progress_bar {
($multi:ident, $style:ident, $chunk:ident, $single_text:expr, $batch_text:expr) => {{
let bar = $multi.add(ProgressBar::new(100));
bar.set_style($style.clone());
bar.enable_steady_tick(100);
($progress:ident, $style:ident, $chunk:ident, $single_text:expr, $batch_text:expr) => {{
if $chunk.len() == 1 {
bar.set_prefix(&$chunk[0]);
bar.set_message($single_text);
let mut bar = $progress.create_process_progress($chunk[0].to_string());
bar.log($single_text);
bar
} else {
bar.set_prefix(BATCH_OPERATION_LABEL);
bar.set_message(&format!($batch_text, $chunk.len()));
let mut bar = $progress.create_process_progress(BATCH_OPERATION_LABEL.to_string());
bar.log(&format!($batch_text, $chunk.len()));
bar
}
bar.inc(0);
bar
}};
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum DeploymentGoal {
pub enum Goal {
/// Build the configurations only.
Build,
@ -58,7 +51,7 @@ pub enum DeploymentGoal {
DryActivate,
}
impl DeploymentGoal {
impl Goal {
pub fn from_str(s: &str) -> Option<Self> {
match s {
"build" => Some(Self::Build),
@ -72,7 +65,7 @@ impl DeploymentGoal {
}
pub fn as_str(&self) -> Option<&'static str> {
use DeploymentGoal::*;
use Goal::*;
match self {
Build => None,
Push => None,
@ -84,7 +77,7 @@ impl DeploymentGoal {
}
pub fn success_str(&self) -> Option<&'static str> {
use DeploymentGoal::*;
use Goal::*;
match self {
Build => Some("Configuration built"),
Push => Some("Pushed"),
@ -96,7 +89,7 @@ impl DeploymentGoal {
}
pub fn should_switch_profile(&self) -> bool {
use DeploymentGoal::*;
use Goal::*;
match self {
Boot | Switch => true,
_ => false,
@ -104,7 +97,7 @@ impl DeploymentGoal {
}
pub fn requires_activation(&self) -> bool {
use DeploymentGoal::*;
use Goal::*;
match self {
Build | Push => false,
_ => true,
@ -114,7 +107,7 @@ impl DeploymentGoal {
/// Internal deployment stages.
#[derive(Debug)]
enum DeploymentStage {
enum Stage {
Evaluate(Vec<String>),
Build(Vec<String>),
Apply(String),
@ -124,7 +117,7 @@ enum DeploymentStage {
#[derive(Debug)]
struct DeploymentResult {
/// Stage in which the deployment ended.
stage: DeploymentStage,
stage: Stage,
/// Whether the deployment succeeded or not.
success: bool,
@ -134,7 +127,7 @@ struct DeploymentResult {
}
impl DeploymentResult {
fn success(stage: DeploymentStage, logs: Option<String>) -> Self {
fn success(stage: Stage, logs: Option<String>) -> Self {
Self {
stage,
success: true,
@ -142,7 +135,7 @@ impl DeploymentResult {
}
}
fn failure(stage: DeploymentStage, logs: Option<String>) -> Self {
fn failure(stage: Stage, logs: Option<String>) -> Self {
Self {
stage,
success: false,
@ -155,7 +148,7 @@ impl DeploymentResult {
}
fn print(&self) {
use DeploymentStage::*;
use Stage::*;
if self.is_successful() {
unimplemented!();
@ -197,12 +190,12 @@ impl DeploymentResult {
/// A deployment target.
#[derive(Debug)]
pub struct DeploymentTarget {
pub struct Target {
host: Box<dyn Host>,
config: NodeConfig,
}
impl DeploymentTarget {
impl Target {
pub fn new(host: Box<dyn Host>, config: NodeConfig) -> Self {
Self { host, config }
}
@ -211,10 +204,10 @@ impl DeploymentTarget {
#[derive(Debug)]
pub struct Deployment {
hive: Hive,
goal: DeploymentGoal,
goal: Goal,
target_names: Vec<String>,
targets: Mutex<HashMap<String, DeploymentTarget>>,
progress_alignment: usize,
targets: Mutex<HashMap<String, Target>>,
label_width: usize,
parallelism_limit: ParallelismLimit,
evaluation_node_limit: EvaluationNodeLimit,
options: DeploymentOptions,
@ -222,11 +215,10 @@ pub struct Deployment {
}
impl Deployment {
pub fn new(hive: Hive, targets: HashMap<String, DeploymentTarget>, goal: DeploymentGoal) -> Self {
pub fn new(hive: Hive, targets: HashMap<String, Target>, goal: Goal) -> Self {
let target_names: Vec<String> = targets.keys().cloned().collect();
let progress_alignment = if let Some(len) = target_names.iter().map(|n| n.len()).max() {
let label_width = if let Some(len) = target_names.iter().map(|n| n.len()).max() {
max(BATCH_OPERATION_LABEL.len(), len)
} else {
BATCH_OPERATION_LABEL.len()
@ -237,7 +229,7 @@ impl Deployment {
goal,
target_names,
targets: Mutex::new(targets),
progress_alignment,
label_width,
parallelism_limit: ParallelismLimit::default(),
evaluation_node_limit: EvaluationNodeLimit::default(),
options: DeploymentOptions::default(),
@ -257,213 +249,171 @@ impl Deployment {
self.evaluation_node_limit = limit;
}
// FIXME: Duplication
/// Uploads keys only (user-facing)
pub async fn upload_keys(self: Arc<Self>) {
let multi = Arc::new(MultiProgress::new());
let root_bar = Arc::new(multi.add(ProgressBar::new(100)));
multi.set_draw_target(ProgressDrawTarget::stderr_nohz());
{
let (style, _) = self.spinner_styles();
root_bar.set_message("Uploading keys...");
root_bar.set_style(style);
root_bar.tick();
root_bar.enable_steady_tick(100);
}
let arc_self = self.clone();
let mut futures = Vec::new();
for node in self.target_names.iter() {
let node = node.to_owned();
let mut target = {
let mut targets = arc_self.targets.lock().await;
targets.remove(&node).unwrap()
};
let multi = multi.clone();
let arc_self = self.clone();
futures.push(tokio::spawn(async move {
let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
let bar = multi.add(ProgressBar::new(100));
let (style, fail_style) = arc_self.spinner_styles();
bar.set_style(style);
bar.set_prefix(&node);
bar.tick();
bar.enable_steady_tick(100);
if let Err(e) = target.host.upload_keys(&target.config.keys).await {
bar.set_style(fail_style);
bar.abandon_with_message(&format!("Failed to upload keys: {}", e));
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Apply(node.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
} else {
bar.finish_with_message("Keys uploaded");
}
drop(permit);
}));
}
let wait_for_tasks = tokio::spawn(async move {
join_all(futures).await;
root_bar.finish_with_message("Finished");
});
let tasks_result = if self.options.progress_bar {
let wait_for_bars = tokio::task::spawn_blocking(move || {
multi.join().unwrap();
});
let (tasks_result, _) = join!(wait_for_tasks, wait_for_bars);
tasks_result
} else {
wait_for_tasks.await
let progress = {
let mut progress = Progress::default();
progress.set_label_width(self.label_width);
Arc::new(progress)
};
if let Err(e) = tasks_result {
log::error!("Deployment process failed: {}", e);
let arc_self = self.clone();
{
let arc_self = self.clone();
progress.run(async move |progress| {
let mut futures = Vec::new();
for node in self.target_names.iter() {
let node = node.to_owned();
let mut target = {
let mut targets = arc_self.targets.lock().await;
targets.remove(&node).unwrap()
};
let arc_self = self.clone();
let progress = progress.clone();
// how come the bars show up only when initialized here???
futures.push(tokio::spawn(async move {
let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
let mut process = progress.create_process_progress(node.clone());
process.log("Uploading keys...");
if let Err(e) = target.host.upload_keys(&target.config.keys).await {
process.failure(&format!("Failed to upload keys: {}", e));
let mut results = arc_self.results.lock().await;
let stage = Stage::Apply(node.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
} else {
process.success("Keys uploaded");
}
drop(permit);
}));
}
join_all(futures).await
}).await;
}
self.print_logs().await;
arc_self.print_logs().await;
}
/// Executes the deployment (user-facing)
///
/// Self must be wrapped inside an Arc.
pub async fn execute(self: Arc<Self>) {
let multi = Arc::new(MultiProgress::new());
let root_bar = Arc::new(multi.add(ProgressBar::new(100)));
multi.set_draw_target(ProgressDrawTarget::stderr_nohz());
{
let (style, _) = self.spinner_styles();
root_bar.set_message("Running...");
root_bar.set_style(style);
root_bar.tick();
root_bar.enable_steady_tick(100);
}
let progress = {
let mut progress = if !self.options.progress_bar {
Progress::with_style(OutputStyle::Plain)
} else {
Progress::default()
};
progress.set_label_width(self.label_width);
Arc::new(progress)
};
let arc_self = self.clone();
let eval_limit = arc_self.clone().eval_limit();
// FIXME: Saner logging
let mut futures = Vec::new();
for chunk in self.target_names.chunks(eval_limit) {
{
let arc_self = self.clone();
let multi = multi.clone();
let (style, _) = self.spinner_styles();
let eval_limit = arc_self.clone().eval_limit();
// FIXME: Eww
let chunk: Vec<String> = chunk.iter().map(|s| s.to_string()).collect();
futures.push(tokio::spawn(async move {
let drv = {
// Evaluation phase
let permit = arc_self.parallelism_limit.evaluation.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(multi, style, chunk,
"Evaluating configuration...",
"Evaluating configurations for {} nodes"
);
let arc_self = arc_self.clone();
let drv = match arc_self.eval_profiles(&chunk, bar).await {
Some(drv) => drv,
None => {
return;
}
};
drop(permit);
drv
};
let profiles = {
// Build phase
let permit = arc_self.parallelism_limit.build.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(multi, style, chunk,
"Building configuration...",
"Building configurations for {} nodes"
);
let goal = arc_self.goal;
let arc_self = arc_self.clone();
let profiles = arc_self.build_profiles(&chunk, drv, bar.clone()).await;
let profiles = match profiles {
Some(profiles) => profiles,
None => {
return;
}
};
if goal != DeploymentGoal::Build {
bar.finish_and_clear();
}
drop(permit);
profiles
};
// Should we continue?
if arc_self.goal == DeploymentGoal::Build {
return;
}
// Apply phase
progress.run(async move |progress| {
let mut futures = Vec::new();
for node in chunk {
let arc_self = arc_self.clone();
let multi = multi.clone();
let target = {
let mut targets = arc_self.targets.lock().await;
targets.remove(&node).unwrap()
};
let profile = profiles.get(&node).cloned()
.expect(&format!("Somehow profile for {} was not built", node));
for chunk in self.target_names.chunks(eval_limit) {
let arc_self = arc_self.clone();
let progress = progress.clone();
// FIXME: Eww
let chunk: Vec<String> = chunk.iter().map(|s| s.to_string()).collect();
futures.push(tokio::spawn(async move {
arc_self.apply_profile(&node, target, profile, multi).await
let drv = {
// Evaluation phase
let permit = arc_self.parallelism_limit.evaluation.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(progress, style, chunk,
"Evaluating configuration...",
"Evaluating configurations for {} nodes"
);
let arc_self = arc_self.clone();
let drv = match arc_self.eval_profiles(&chunk, bar).await {
Some(drv) => drv,
None => {
return;
}
};
drop(permit);
drv
};
let profiles = {
// Build phase
let permit = arc_self.parallelism_limit.build.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(progress, style, chunk,
"Building configuration...",
"Building configurations for {} nodes"
);
let goal = arc_self.goal;
let arc_self = arc_self.clone();
let profiles = arc_self.build_profiles(&chunk, drv, bar.clone()).await;
let profiles = match profiles {
Some(profiles) => profiles,
None => {
return;
}
};
if goal != Goal::Build {
bar.success_quiet();
}
drop(permit);
profiles
};
// Should we continue?
if arc_self.goal == Goal::Build {
return;
}
// Apply phase
let mut futures = Vec::new();
for node in chunk {
let arc_self = arc_self.clone();
let progress = progress.clone();
let target = {
let mut targets = arc_self.targets.lock().await;
targets.remove(&node).unwrap()
};
let profile = profiles.get(&node).cloned()
.expect(&format!("Somehow profile for {} was not built", node));
futures.push(tokio::spawn(async move {
arc_self.apply_profile(&node, target, profile, progress).await
}));
}
}));
}
join_all(futures).await;
}));
}).await;
}
let wait_for_tasks = tokio::spawn(async move {
join_all(futures).await;
root_bar.finish_with_message("Finished");
});
let tasks_result = if self.options.progress_bar {
let wait_for_bars = tokio::task::spawn_blocking(move || {
multi.join().unwrap();
});
let (tasks_result, _) = join!(wait_for_tasks, wait_for_bars);
tasks_result
} else {
wait_for_tasks.await
};
if let Err(e) = tasks_result {
log::error!("Deployment process failed: {}", e);
}
self.print_logs().await;
arc_self.print_logs().await;
}
async fn print_logs(&self) {
@ -475,53 +425,47 @@ impl Deployment {
}
}
async fn eval_profiles(self: Arc<Self>, chunk: &Vec<String>, progress: ProgressBar) -> Option<StoreDerivation<ProfileMap>> {
let (eval, logs) = self.hive.eval_selected(&chunk, Some(progress.clone())).await;
async fn eval_profiles(self: Arc<Self>, chunk: &Vec<String>, progress: ProcessProgress) -> Option<StoreDerivation<ProfileMap>> {
let (eval, logs) = self.hive.eval_selected(&chunk, progress.clone()).await;
match eval {
Ok(drv) => {
progress.finish_and_clear();
progress.success_quiet();
Some(drv)
}
Err(e) => {
let (_, fail_style) = self.spinner_styles();
progress.set_style(fail_style.clone());
progress.abandon_with_message(&format!("Evalation failed: {}", e));
progress.failure(&format!("Evalation failed: {}", e));
let mut results = self.results.lock().await;
let stage = DeploymentStage::Evaluate(chunk.clone());
let stage = Stage::Evaluate(chunk.clone());
results.push(DeploymentResult::failure(stage, logs));
None
}
}
}
async fn build_profiles(self: Arc<Self>, chunk: &Vec<String>, derivation: StoreDerivation<ProfileMap>, progress: ProgressBar) -> Option<ProfileMap> {
async fn build_profiles(self: Arc<Self>, chunk: &Vec<String>, derivation: StoreDerivation<ProfileMap>, progress: ProcessProgress) -> Option<ProfileMap> {
// FIXME: Remote build?
let mut builder = host::local();
if self.options.progress_bar {
builder.set_progress_bar(progress.clone());
}
builder.set_progress_bar(progress.clone());
match derivation.realize(&mut *builder).await {
Ok(profiles) => {
progress.finish_with_message("Successfully built");
progress.success("Successfully built");
let mut results = self.results.lock().await;
let stage = DeploymentStage::Build(chunk.clone());
let stage = Stage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
Some(profiles)
}
Err(e) => {
let (_, fail_style) = self.spinner_styles();
progress.set_style(fail_style);
progress.abandon_with_message(&format!("Build failed: {}", e));
progress.failure(&format!("Build failed: {}", e));
let mut results = self.results.lock().await;
let stage = DeploymentStage::Build(chunk.clone());
let stage = Stage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
None
@ -529,55 +473,46 @@ impl Deployment {
}
}
async fn apply_profile(self: Arc<Self>, name: &str, mut target: DeploymentTarget, profile: Profile, multi: Arc<MultiProgress>) {
let (style, fail_style) = self.spinner_styles();
async fn apply_profile(self: Arc<Self>, name: &str, mut target: Target, profile: Profile, multi: Arc<Progress>) {
let permit = self.parallelism_limit.apply.acquire().await.unwrap();
let bar = multi.add(ProgressBar::new(100));
bar.set_style(style);
bar.set_prefix(name);
bar.tick();
bar.enable_steady_tick(100);
let mut bar = multi.create_process_progress(name.to_string());
if self.options.upload_keys && !target.config.keys.is_empty() {
bar.set_message("Uploading keys...");
bar.log("Uploading keys...");
if let Err(e) = target.host.upload_keys(&target.config.keys).await {
bar.set_style(fail_style);
bar.abandon_with_message(&format!("Failed to upload keys: {}", e));
bar.failure(&format!("Failed to upload keys: {}", e));
let mut results = self.results.lock().await;
let stage = DeploymentStage::Apply(name.to_string());
let stage = Stage::Apply(name.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
}
}
bar.set_message("Starting...");
bar.log("Starting...");
if self.options.progress_bar {
target.host.set_progress_bar(bar.clone());
}
target.host.set_progress_bar(bar.clone());
let copy_options = self.options.to_copy_options()
.include_outputs(true);
match target.host.deploy(&profile, self.goal, copy_options).await {
Ok(_) => {
bar.finish_with_message(self.goal.success_str().unwrap());
bar.success(self.goal.success_str().unwrap());
let mut results = self.results.lock().await;
let stage = DeploymentStage::Apply(name.to_string());
let stage = Stage::Apply(name.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
}
Err(e) => {
bar.set_style(fail_style);
bar.abandon_with_message(&format!("Failed: {}", e));
bar.failure(&format!("Failed: {}", e));
let mut results = self.results.lock().await;
let stage = DeploymentStage::Apply(name.to_string());
let stage = Stage::Apply(name.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
}
@ -586,10 +521,6 @@ impl Deployment {
drop(permit);
}
fn spinner_styles(&self) -> (ProgressStyle, ProgressStyle) {
get_spinner_styles(self.progress_alignment)
}
fn eval_limit(&self) -> usize {
if let Some(limit) = self.evaluation_node_limit.get_limit() {
limit

View file

@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use indicatif::ProgressBar;
use tempfile::{NamedTempFile, TempPath};
use tokio::process::Command;
use serde::Serialize;
@ -16,6 +15,7 @@ use super::{
};
use super::NixCommand;
use crate::util::CommandExecution;
use crate::progress::ProcessProgress;
const HIVE_EVAL: &'static [u8] = include_bytes!("eval.nix");
@ -67,7 +67,7 @@ impl Hive {
/// Evaluation may take up a lot of memory, so we make it possible
/// to split up the evaluation process into chunks and run them
/// concurrently with other processes (e.g., build and apply).
pub async fn eval_selected(&self, nodes: &Vec<String>, progress_bar: Option<ProgressBar>) -> (NixResult<StoreDerivation<ProfileMap>>, Option<String>) {
pub async fn eval_selected(&self, nodes: &Vec<String>, progress_bar: ProcessProgress) -> (NixResult<StoreDerivation<ProfileMap>>, Option<String>) {
// FIXME: The return type is ugly...
let nodes_expr = SerializedNixExpresssion::new(nodes);
@ -79,11 +79,8 @@ impl Hive {
let expr = format!("hive.buildSelected {{ names = {}; }}", nodes_expr.expression());
let command = self.nix_instantiate(&expr).instantiate();
let mut execution = CommandExecution::new("(eval)", command);
if let Some(bar) = progress_bar {
execution.set_progress_bar(bar);
}
let mut execution = CommandExecution::new(command);
execution.set_progress_bar(progress_bar);
let eval = execution
.capture_store_path().await;

View file

@ -1,4 +1,3 @@
//!
use std::convert::TryInto;
use std::collections::HashMap;
use std::fs;
@ -6,12 +5,12 @@ use std::io::Write;
use async_trait::async_trait;
use tokio::process::Command;
use indicatif::ProgressBar;
use tempfile::NamedTempFile;
use super::{CopyDirection, CopyOptions, Host};
use crate::nix::{StorePath, Profile, DeploymentGoal, NixResult, NixCommand, Key, SYSTEM_PROFILE};
use crate::nix::{StorePath, Profile, Goal, NixResult, NixCommand, Key, SYSTEM_PROFILE};
use crate::util::CommandExecution;
use crate::progress::ProcessProgress;
/// The local machine running Colmena.
///
@ -19,14 +18,14 @@ use crate::util::CommandExecution;
/// (e.g., building Linux derivations on macOS).
#[derive(Debug)]
pub struct Local {
progress_bar: Option<ProgressBar>,
progress_bar: ProcessProgress,
logs: String,
}
impl Local {
pub fn new() -> Self {
Self {
progress_bar: None,
progress_bar: ProcessProgress::default(),
logs: String::new(),
}
}
@ -44,11 +43,9 @@ impl Host for Local {
.arg("--realise")
.arg(derivation.as_path());
let mut execution = CommandExecution::new("local", command);
let mut execution = CommandExecution::new(command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
execution.set_progress_bar(self.progress_bar.clone());
let result = execution.run().await;
@ -69,7 +66,7 @@ impl Host for Local {
Ok(())
}
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap();
Command::new("nix-env")
@ -84,11 +81,9 @@ impl Host for Local {
command
.args(&activation_command[1..]);
let mut execution = CommandExecution::new("local", command);
let mut execution = CommandExecution::new(command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
execution.set_progress_bar(self.progress_bar.clone());
let result = execution.run().await;
@ -99,8 +94,8 @@ impl Host for Local {
result
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
fn set_progress_bar(&mut self, bar: ProcessProgress) {
self.progress_bar = bar;
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
@ -110,9 +105,7 @@ impl Host for Local {
impl Local {
/// "Uploads" a single key.
async fn upload_key(&mut self, name: &str, key: &Key) -> NixResult<()> {
if let Some(progress_bar) = self.progress_bar.as_ref() {
progress_bar.set_message(&format!("Deploying key {}", name));
}
self.progress_bar.log(&format!("Deploying key {}", name));
let dest_path = key.dest_dir.join(name);
@ -129,7 +122,7 @@ impl Local {
.arg(&key.permissions)
.arg(&temp_path);
let mut execution = CommandExecution::new("local", command);
let mut execution = CommandExecution::new(command);
let exit = execution.run().await;
let (stdout, stderr) = execution.get_logs();
@ -144,7 +137,7 @@ impl Local {
.arg(&format!("{}:{}", key.user, key.group))
.arg(&temp_path);
let mut execution = CommandExecution::new("local", command);
let mut execution = CommandExecution::new(command);
let exit = execution.run().await;
let (stdout, stderr) = execution.get_logs();

View file

@ -1,9 +1,9 @@
use std::collections::HashMap;
use async_trait::async_trait;
use indicatif::ProgressBar;
use super::{StorePath, Profile, DeploymentGoal, NixResult, NixError, Key};
use super::{StorePath, Profile, Goal, NixResult, NixError, Key};
use crate::progress::ProcessProgress;
mod ssh;
pub use ssh::Ssh;
@ -84,7 +84,7 @@ pub trait Host: Send + Sync + std::fmt::Debug {
}
/// Pushes and optionally activates a profile to the host.
async fn deploy(&mut self, profile: &Profile, goal: DeploymentGoal, copy_options: CopyOptions) -> NixResult<()> {
async fn deploy(&mut self, profile: &Profile, goal: Goal, copy_options: CopyOptions) -> NixResult<()> {
self.copy_closure(profile.as_store_path(), CopyDirection::ToRemote, copy_options).await?;
if goal.requires_activation() {
@ -104,13 +104,13 @@ pub trait Host: Send + Sync + std::fmt::Debug {
/// Activates a system profile on the host, if it runs NixOS.
///
/// The profile must already exist on the host. You should probably use deploy instead.
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
Err(NixError::Unsupported)
}
#[allow(unused_variables)]
/// Provides a ProgressBar to use during operations.
fn set_progress_bar(&mut self, bar: ProgressBar) {
/// Provides a ProcessProgress to use during operations.
fn set_progress_bar(&mut self, bar: ProcessProgress) {
}
/// Dumps human-readable unstructured log messages related to the host.

View file

@ -4,13 +4,13 @@ use std::process::Stdio;
use async_trait::async_trait;
use futures::future::join3;
use indicatif::ProgressBar;
use tokio::process::Command;
use tokio::io::{AsyncWriteExt, BufReader};
use super::{CopyDirection, CopyOptions, Host};
use crate::nix::{StorePath, Profile, DeploymentGoal, NixResult, NixCommand, NixError, Key, SYSTEM_PROFILE};
use crate::nix::{StorePath, Profile, Goal, NixResult, NixCommand, NixError, Key, SYSTEM_PROFILE};
use crate::util::{CommandExecution, capture_stream};
use crate::progress::ProcessProgress;
const DEPLOY_KEY_TEMPLATE: &'static str = include_str!("./deploy-key.template");
@ -25,7 +25,7 @@ pub struct Ssh {
friendly_name: String,
path_cache: HashSet<StorePath>,
progress_bar: Option<ProgressBar>,
progress_bar: ProcessProgress,
logs: String,
}
@ -55,7 +55,7 @@ impl Host for Ssh {
Ok(())
}
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
async fn activate(&mut self, profile: &Profile, goal: Goal) -> NixResult<()> {
if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap();
let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", path]);
@ -67,8 +67,8 @@ impl Host for Ssh {
let command = self.ssh(&v);
self.run_command(command).await
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
fn set_progress_bar(&mut self, bar: ProcessProgress) {
self.progress_bar = bar;
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
@ -83,17 +83,15 @@ impl Ssh {
host,
friendly_name,
path_cache: HashSet::new(),
progress_bar: None,
progress_bar: ProcessProgress::default(),
logs: String::new(),
}
}
async fn run_command(&mut self, command: Command) -> NixResult<()> {
let mut execution = CommandExecution::new(&self.friendly_name, command);
let mut execution = CommandExecution::new(command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
execution.set_progress_bar(self.progress_bar.clone());
let result = execution.run().await;
@ -154,9 +152,7 @@ impl Ssh {
impl Ssh {
/// Uploads a single key.
async fn upload_key(&mut self, name: &str, key: &Key) -> NixResult<()> {
if let Some(progress_bar) = self.progress_bar.as_ref() {
progress_bar.set_message(&format!("Deploying key {}", name));
}
self.progress_bar.log(&format!("Deploying key {}", name));
let dest_path = key.dest_dir.join(name);
@ -183,8 +179,8 @@ impl Ssh {
let stderr = BufReader::new(child.stderr.take().unwrap());
let futures = join3(
capture_stream(stdout, &self.friendly_name, self.progress_bar.clone()),
capture_stream(stderr, &self.friendly_name, self.progress_bar.clone()),
capture_stream(stdout, self.progress_bar.clone()),
capture_stream(stderr, self.progress_bar.clone()),
child.wait(),
);
let (stdout_str, stderr_str, exit) = futures.await;

View file

@ -29,7 +29,7 @@ pub mod profile;
pub use profile::{Profile, ProfileMap};
pub mod deployment;
pub use deployment::{DeploymentGoal, DeploymentTarget, Deployment};
pub use deployment::{Goal, Target, Deployment};
pub const SYSTEM_PROFILE: &'static str = "/nix/var/nix/profiles/system";

View file

@ -5,7 +5,7 @@ use std::fs;
use std::path::Path;
use super::{
DeploymentGoal,
Goal,
NixResult,
NixError,
StorePath,
@ -32,7 +32,7 @@ impl Profile {
}
/// Returns the command to activate this profile.
pub fn activation_command(&self, goal: DeploymentGoal) -> Option<Vec<String>> {
pub fn activation_command(&self, goal: Goal) -> Option<Vec<String>> {
if let Some(goal) = goal.as_str() {
let path = self.as_path().join("bin/switch-to-configuration");
let switch_to_configuration = path.to_str()

View file

@ -1,15 +1,242 @@
use indicatif::ProgressStyle;
//! Progress display utilities.
pub fn get_spinner_styles(node_name_alignment: usize) -> (ProgressStyle, ProgressStyle) {
let template = format!("{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", node_name_alignment);
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
use atty::Stream;
use console::Style;
use futures::join;
use indicatif::{
MultiProgress,
ProgressStyle as IndicatifStyle,
ProgressBar as IndicatifBar,
};
pub fn get_spinner_styles(label_width: usize) -> (IndicatifStyle, IndicatifStyle) {
let template = format!("{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", label_width);
(
ProgressStyle::default_spinner()
IndicatifStyle::default_spinner()
.tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅")
.template(&template),
ProgressStyle::default_spinner()
IndicatifStyle::default_spinner()
.tick_chars("❌❌")
.template(&template),
)
}
pub enum OutputStyle {
/// Show condensed progress bars with fancy spinners.
///
/// Not usable in a non-interactive environment.
Condensed,
/// Output log lines directly to console.
Plain,
}
/// Parallel progress display.
///
/// Currently a simple wrapper over MultiProgress.
/// Sometimes we need to log directly to the console, in case
/// stdout is not connected to a TTY or the user requests
/// verbose logging via `--verbose`.
///
/// This is normally only usable as Arc<Progress>.
pub struct Progress {
multi: Option<Arc<MultiProgress>>, // eww
/// Width of the labels for alignment
label_width: usize,
}
impl Progress {
pub fn with_style(output_style: OutputStyle) -> Self {
let multi = match output_style {
OutputStyle::Condensed => Some(Arc::new(Self::init_multi())),
OutputStyle::Plain => None,
};
Self {
multi,
label_width: 10,
}
}
pub fn set_label_width(&mut self, width: usize) {
self.label_width = width;
}
/// Returns a handle for a process to display progress information.
pub fn create_process_progress(&self, label: String) -> ProcessProgress {
let mut progress = ProcessProgress::new(label.clone(), self.label_width);
if let Some(multi) = self.multi.as_ref() {
let bar = multi.add(IndicatifBar::new(100));
let (style, _) = get_spinner_styles(self.label_width);
bar.set_prefix(&label);
bar.set_style(style);
bar.enable_steady_tick(100);
progress.set_bar(bar);
}
progress
}
/// Runs code that may initate multiple processes.
pub async fn run<F: Future, U>(self: Arc<Self>, func: U) -> F::Output
where U: FnOnce(Arc<Progress>) -> F
{
if let Some(multi) = self.multi.as_ref() {
let multi = multi.clone();
let finished = Arc::new(AtomicBool::new(false));
let redraw_future = {
let finished = finished.clone();
tokio::task::spawn_blocking(move || {
while !finished.load(Ordering::SeqCst) {
multi.join().unwrap();
thread::sleep(Duration::from_millis(100));
}
multi.join().unwrap();
})
};
let func_future = {
let finished = finished.clone();
async move {
let result = func(self).await;
finished.store(true, Ordering::SeqCst);
// root_bar.finish_and_clear();
result
}
};
let (func_result, _) = join!(func_future, redraw_future);
func_result
} else {
// Plain output. Simple.
func(self.clone()).await
}
}
fn init_multi() -> MultiProgress {
let multi = MultiProgress::new();
multi
}
fn detect_output() -> OutputStyle {
if atty::is(Stream::Stdout) {
OutputStyle::Condensed
} else {
OutputStyle::Plain
}
}
}
impl Default for Progress {
fn default() -> Self {
let style = Self::detect_output();
Self::with_style(style)
}
}
/// Progress display for a single process.
#[derive(Debug, Clone)]
pub struct ProcessProgress {
label: String,
label_width: usize,
bar: Option<IndicatifBar>,
quiet: bool,
}
impl ProcessProgress {
fn new(label: String, label_width: usize) -> Self {
Self {
label,
label_width,
bar: None,
quiet: false,
}
}
fn set_bar(&mut self, bar: IndicatifBar) {
self.bar = Some(bar);
}
/// Displays a new line of log.
pub fn log(&mut self, message: &str) {
if self.quiet {
return;
}
if let Some(bar) = self.bar.as_ref() {
bar.set_message(message);
} else {
let style = Style::new().bold();
self.plain_print(style, message);
}
}
/// Marks the process as successful, consuming the spinner.
pub fn success(self, message: &str) {
if self.quiet {
return;
}
if let Some(bar) = self.bar.as_ref() {
bar.finish_with_message(message);
} else {
let style = Style::new().bold().green();
self.plain_print(style, message);
}
}
/// Marks the process as successful, consuming the spinner.
pub fn success_quiet(self) {
if self.quiet {
return;
}
if let Some(bar) = self.bar.as_ref() {
bar.finish_and_clear();
}
}
/// Marks the process as unsuccessful, consuming the spinner.
pub fn failure(self, message: &str) {
if self.quiet {
return;
}
if let Some(bar) = self.bar.as_ref() {
let (_, fail_style) = get_spinner_styles(self.label_width);
bar.set_style(fail_style);
bar.abandon_with_message(message);
} else {
let style = Style::new().bold().red();
self.plain_print(style, message);
}
}
fn plain_print(&self, style: Style, line: &str) {
eprintln!("{:>width$} | {}", style.apply_to(&self.label), line, width = self.label_width);
}
}
impl Default for ProcessProgress {
/// Creates a ProcessProgress that does nothing.
fn default() -> Self {
Self {
label: String::new(),
label_width: 0,
bar: None,
quiet: true,
}
}
}

View file

@ -1,18 +1,16 @@
use std::collections::HashMap;
use std::convert::AsRef;
use std::fs;
use std::path::PathBuf;
use std::process::Stdio;
use clap::{App, Arg, ArgMatches};
use console::style;
use futures::future::join3;
use glob::Pattern as GlobPattern;
use indicatif::ProgressBar;
use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader};
use tokio::process::Command;
use super::nix::{NodeConfig, Hive, NixResult, NixError};
use super::progress::ProcessProgress;
enum NodeFilter {
NameFilter(GlobPattern),
@ -21,27 +19,25 @@ enum NodeFilter {
/// Non-interactive execution of an arbitrary Nix command.
pub struct CommandExecution {
label: String,
command: Command,
progress_bar: Option<ProgressBar>,
progress_bar: ProcessProgress,
stdout: Option<String>,
stderr: Option<String>,
}
impl CommandExecution {
pub fn new<S: AsRef<str>>(label: S, command: Command) -> Self {
pub fn new(command: Command) -> Self {
Self {
label: label.as_ref().to_string(),
command,
progress_bar: None,
progress_bar: ProcessProgress::default(),
stdout: None,
stderr: None,
}
}
/// Provides a ProgressBar to use to display output.
pub fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
/// Provides a ProcessProgress to use to display output.
pub fn set_progress_bar(&mut self, bar: ProcessProgress) {
self.progress_bar = bar;
}
/// Retrieve logs from the last invocation.
@ -64,8 +60,8 @@ impl CommandExecution {
let stderr = BufReader::new(child.stderr.take().unwrap());
let futures = join3(
capture_stream(stdout, &self.label, self.progress_bar.clone()),
capture_stream(stderr, &self.label, self.progress_bar.clone()),
capture_stream(stdout, self.progress_bar.clone()),
capture_stream(stderr, self.progress_bar.clone()),
child.wait(),
);
@ -219,7 +215,7 @@ fn canonicalize_cli_path(path: String) -> PathBuf {
}
}
pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, label: &str, mut progress_bar: Option<ProgressBar>) -> String {
pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, mut progress_bar: ProcessProgress) -> String {
let mut log = String::new();
loop {
@ -231,11 +227,7 @@ pub async fn capture_stream<R: AsyncRead + Unpin>(mut stream: BufReader<R>, labe
}
let trimmed = line.trim_end();
if let Some(progress_bar) = progress_bar.as_mut() {
progress_bar.set_message(trimmed);
} else {
eprintln!("{} | {}", style(label).cyan(), trimmed);
}
progress_bar.log(trimmed);
log += trimmed;
log += "\n";