From 78a6825be6e347050fbb8008afaaa38a8469db51 Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Tue, 9 Feb 2021 22:07:10 -0800 Subject: [PATCH] Add exec command --- src/command/apply.rs | 5 ++ src/command/exec.rs | 174 +++++++++++++++++++++++++++++++++++++++++++ src/command/mod.rs | 1 + src/main.rs | 2 + src/nix/host/ssh.rs | 33 ++++---- 5 files changed, 199 insertions(+), 16 deletions(-) create mode 100644 src/command/exec.rs diff --git a/src/command/apply.rs b/src/command/apply.rs index 4bb9383..2034b40 100644 --- a/src/command/apply.rs +++ b/src/command/apply.rs @@ -180,6 +180,11 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { log::info!("Selected {} out of {} hosts ({} skipped)", targets.len(), all_nodes.len(), selected_nodes.len() - targets.len()); } + if targets.len() == 0 { + log::warn!("No selected nodes are accessible over SSH. Exiting..."); + quit::with_code(2); + } + let mut deployment = Deployment::new(hive, targets, goal); let mut options = DeploymentOptions::default(); diff --git a/src/command/exec.rs b/src/command/exec.rs new file mode 100644 index 0000000..5368796 --- /dev/null +++ b/src/command/exec.rs @@ -0,0 +1,174 @@ +use std::collections::HashMap; +use std::env; +use std::path::PathBuf; +use std::sync::Arc; + +use clap::{Arg, App, AppSettings, SubCommand, ArgMatches}; +use futures::future::join_all; +use tokio::sync::Semaphore; + +use crate::nix::NixError; +use crate::progress::{Progress, OutputStyle}; +use crate::util::{self, CommandExecution}; + +pub fn subcommand() -> App<'static, 'static> { + let command = SubCommand::with_name("exec") + .about("Run a command against remote hosts") + .long_about(r#"Run a command on remote machines + +By default, Colmena will print out all output lines. Use --oneline for a condensed status output with each node taking up a single line. +"#) + .setting(AppSettings::TrailingVarArg) + .arg(Arg::with_name("parallel") + .short("p") + .long("parallel") + .value_name("LIMIT") + .help("Deploy parallelism limit") + .long_help(r#"Limits the maximum number of hosts to run the command in parallel. + +In `colmena exec`, the parallelism limit is disabled (0) by default. +"#) + .default_value("0") + .takes_value(true) + .validator(|s| { + match s.parse::() { + Ok(_) => Ok(()), + Err(_) => Err(String::from("The value must be a valid number")), + } + })) + .arg(Arg::with_name("verbose") + .short("v") + .long("verbose") + .help("Be verbose") + .long_help("Deactivates the progress spinner and prints every line of output.") + .takes_value(false)) + .arg(Arg::with_name("command") + .value_name("COMMAND") + .last(true) + .help("Command") + .required(true) + .multiple(true) + .long_help(r#"Command to run + +It's recommended to use -- to separate Colmena options from the command to run. For example: + + colmena exec --on @routers -- tcpdump -vni any ip[9] == 89 +"#)); + + util::register_selector_args(command) +} + +pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) { + let hive = util::hive_from_args(local_args).unwrap(); + + log::info!("Enumerating nodes..."); + let all_nodes = hive.deployment_info().await.unwrap(); + + let selected_nodes = match local_args.value_of("on") { + Some(filter) => { + util::filter_nodes(&all_nodes, filter) + } + None => all_nodes.keys().cloned().collect(), + }; + + if selected_nodes.len() == 0 { + log::warn!("No hosts matched. Exiting..."); + quit::with_code(2); + } + + let ssh_config = env::var("SSH_CONFIG_FILE") + .ok().map(PathBuf::from); + + let mut hosts = HashMap::new(); + for node in &selected_nodes { + let config = all_nodes.get(node).unwrap(); + let host = config.to_ssh_host(); + match host { + Some(mut host) => { + if let Some(ssh_config) = ssh_config.as_ref() { + host.set_ssh_config(ssh_config.clone()); + } + + hosts.insert(node.clone(), host); + } + None => {}, + } + } + + if hosts.len() == all_nodes.len() { + log::info!("Selected all {} nodes.", hosts.len()); + } else if hosts.len() == selected_nodes.len() { + log::info!("Selected {} out of {} hosts.", hosts.len(), all_nodes.len()); + } else { + log::info!("Selected {} out of {} hosts ({} skipped)", hosts.len(), all_nodes.len(), selected_nodes.len() - hosts.len()); + } + + if hosts.len() == 0 { + log::warn!("No selected nodes are accessible over SSH. Exiting..."); + quit::with_code(2); + } + + let mut progress = if local_args.is_present("verbose") { + Progress::with_style(OutputStyle::Plain) + } else { + Progress::default() + }; + + let parallel_sp = Arc::new({ + let limit = local_args.value_of("parallel").unwrap() + .parse::().unwrap(); + + if limit > 0 { + Some(Semaphore::new(limit)) + } else { + None + } + }); + + let label_width = hosts.keys().map(|n| n.len()).max().unwrap(); + progress.set_label_width(label_width); + + let progress = Arc::new(progress); + let command: Arc> = Arc::new(local_args.values_of("command").unwrap().map(|s| s.to_string()).collect()); + + progress.run(async move |progress| { + let mut futures = Vec::new(); + + for (name, host) in hosts.drain() { + let parallel_sp = parallel_sp.clone(); + let command = command.clone(); + let progress = progress.clone(); + + futures.push(tokio::spawn(async move { + let permit = match parallel_sp.as_ref() { + Some(sp) => Some(sp.acquire().await.unwrap()), + None => None, + }; + + let progress = progress.create_process_progress(name.clone()); + + let command_v: Vec<&str> = command.iter().map(|s| s.as_str()).collect(); + let command = host.ssh(&command_v); + let mut execution = CommandExecution::new(command); + execution.set_progress_bar(progress.clone()); + + match execution.run().await { + Ok(()) => { + progress.success("Exited"); + } + Err(e) => { + if let NixError::NixFailure { exit_code } = e { + progress.failure(&format!("Exited with code {}", exit_code)); + } else { + progress.failure(&format!("Error during execution: {}", e)); + } + } + } + + drop(permit); + })); + } + + join_all(futures).await; + }).await; +} diff --git a/src/command/mod.rs b/src/command/mod.rs index 02bc206..fdace5a 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -3,3 +3,4 @@ pub mod apply; pub mod introspect; pub mod apply_local; pub mod upload_keys; +pub mod exec; diff --git a/src/main.rs b/src/main.rs index ed77035..16783f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,6 +69,7 @@ For a sample configuration, see . bind_command!(build, app); bind_command!(introspect, app); bind_command!(upload_keys, app); + bind_command!(exec, app); let matches = app.clone().get_matches(); @@ -77,6 +78,7 @@ For a sample configuration, see . command!(build, matches); command!(introspect, matches); command!("upload-keys", upload_keys, matches); + command!(exec, matches); app.print_long_help().unwrap(); } diff --git a/src/nix/host/ssh.rs b/src/nix/host/ssh.rs index 5d15d8b..2ced2b3 100644 --- a/src/nix/host/ssh.rs +++ b/src/nix/host/ssh.rs @@ -109,6 +109,23 @@ impl Ssh { Box::new(self) } + /// Returns a Tokio Command to run an arbitrary command on the host. + pub fn ssh(&self, command: &[&str]) -> Command { + let options = self.ssh_options(); + let options_str = options.join(" "); + + let mut cmd = Command::new("ssh"); + + cmd + .arg(self.ssh_target()) + .args(&options) + .arg("--") + .args(command) + .env("NIX_SSHOPTS", options_str); + + cmd + } + async fn run_command(&mut self, command: Command) -> NixResult<()> { let mut execution = CommandExecution::new(command); @@ -176,22 +193,6 @@ impl Ssh { options } - fn ssh(&self, command: &[&str]) -> Command { - let options = self.ssh_options(); - let options_str = options.join(" "); - - let mut cmd = Command::new("ssh"); - - cmd - .arg(self.ssh_target()) - .args(&options) - .arg("--") - .args(command) - .env("NIX_SSHOPTS", options_str); - - cmd - } - /// Uploads a single key. async fn upload_key(&mut self, name: &str, key: &Key) -> NixResult<()> { self.progress_bar.log(&format!("Deploying key {}", name));