Add exec command

This commit is contained in:
Zhaofeng Li 2021-02-09 22:07:10 -08:00
parent 1c9e7cdb83
commit 78a6825be6
5 changed files with 199 additions and 16 deletions

View file

@ -180,6 +180,11 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
log::info!("Selected {} out of {} hosts ({} skipped)", targets.len(), all_nodes.len(), selected_nodes.len() - targets.len());
}
if targets.len() == 0 {
log::warn!("No selected nodes are accessible over SSH. Exiting...");
quit::with_code(2);
}
let mut deployment = Deployment::new(hive, targets, goal);
let mut options = DeploymentOptions::default();

174
src/command/exec.rs Normal file
View file

@ -0,0 +1,174 @@
use std::collections::HashMap;
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use clap::{Arg, App, AppSettings, SubCommand, ArgMatches};
use futures::future::join_all;
use tokio::sync::Semaphore;
use crate::nix::NixError;
use crate::progress::{Progress, OutputStyle};
use crate::util::{self, CommandExecution};
pub fn subcommand() -> App<'static, 'static> {
let command = SubCommand::with_name("exec")
.about("Run a command against remote hosts")
.long_about(r#"Run a command on remote machines
By default, Colmena will print out all output lines. Use --oneline for a condensed status output with each node taking up a single line.
"#)
.setting(AppSettings::TrailingVarArg)
.arg(Arg::with_name("parallel")
.short("p")
.long("parallel")
.value_name("LIMIT")
.help("Deploy parallelism limit")
.long_help(r#"Limits the maximum number of hosts to run the command in parallel.
In `colmena exec`, the parallelism limit is disabled (0) by default.
"#)
.default_value("0")
.takes_value(true)
.validator(|s| {
match s.parse::<usize>() {
Ok(_) => Ok(()),
Err(_) => Err(String::from("The value must be a valid number")),
}
}))
.arg(Arg::with_name("verbose")
.short("v")
.long("verbose")
.help("Be verbose")
.long_help("Deactivates the progress spinner and prints every line of output.")
.takes_value(false))
.arg(Arg::with_name("command")
.value_name("COMMAND")
.last(true)
.help("Command")
.required(true)
.multiple(true)
.long_help(r#"Command to run
It's recommended to use -- to separate Colmena options from the command to run. For example:
colmena exec --on @routers -- tcpdump -vni any ip[9] == 89
"#));
util::register_selector_args(command)
}
pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let hive = util::hive_from_args(local_args).unwrap();
log::info!("Enumerating nodes...");
let all_nodes = hive.deployment_info().await.unwrap();
let selected_nodes = match local_args.value_of("on") {
Some(filter) => {
util::filter_nodes(&all_nodes, filter)
}
None => all_nodes.keys().cloned().collect(),
};
if selected_nodes.len() == 0 {
log::warn!("No hosts matched. Exiting...");
quit::with_code(2);
}
let ssh_config = env::var("SSH_CONFIG_FILE")
.ok().map(PathBuf::from);
let mut hosts = HashMap::new();
for node in &selected_nodes {
let config = all_nodes.get(node).unwrap();
let host = config.to_ssh_host();
match host {
Some(mut host) => {
if let Some(ssh_config) = ssh_config.as_ref() {
host.set_ssh_config(ssh_config.clone());
}
hosts.insert(node.clone(), host);
}
None => {},
}
}
if hosts.len() == all_nodes.len() {
log::info!("Selected all {} nodes.", hosts.len());
} else if hosts.len() == selected_nodes.len() {
log::info!("Selected {} out of {} hosts.", hosts.len(), all_nodes.len());
} else {
log::info!("Selected {} out of {} hosts ({} skipped)", hosts.len(), all_nodes.len(), selected_nodes.len() - hosts.len());
}
if hosts.len() == 0 {
log::warn!("No selected nodes are accessible over SSH. Exiting...");
quit::with_code(2);
}
let mut progress = if local_args.is_present("verbose") {
Progress::with_style(OutputStyle::Plain)
} else {
Progress::default()
};
let parallel_sp = Arc::new({
let limit = local_args.value_of("parallel").unwrap()
.parse::<usize>().unwrap();
if limit > 0 {
Some(Semaphore::new(limit))
} else {
None
}
});
let label_width = hosts.keys().map(|n| n.len()).max().unwrap();
progress.set_label_width(label_width);
let progress = Arc::new(progress);
let command: Arc<Vec<String>> = Arc::new(local_args.values_of("command").unwrap().map(|s| s.to_string()).collect());
progress.run(async move |progress| {
let mut futures = Vec::new();
for (name, host) in hosts.drain() {
let parallel_sp = parallel_sp.clone();
let command = command.clone();
let progress = progress.clone();
futures.push(tokio::spawn(async move {
let permit = match parallel_sp.as_ref() {
Some(sp) => Some(sp.acquire().await.unwrap()),
None => None,
};
let progress = progress.create_process_progress(name.clone());
let command_v: Vec<&str> = command.iter().map(|s| s.as_str()).collect();
let command = host.ssh(&command_v);
let mut execution = CommandExecution::new(command);
execution.set_progress_bar(progress.clone());
match execution.run().await {
Ok(()) => {
progress.success("Exited");
}
Err(e) => {
if let NixError::NixFailure { exit_code } = e {
progress.failure(&format!("Exited with code {}", exit_code));
} else {
progress.failure(&format!("Error during execution: {}", e));
}
}
}
drop(permit);
}));
}
join_all(futures).await;
}).await;
}

View file

@ -3,3 +3,4 @@ pub mod apply;
pub mod introspect;
pub mod apply_local;
pub mod upload_keys;
pub mod exec;

View file

@ -69,6 +69,7 @@ For a sample configuration, see <https://github.com/zhaofengli/colmena>.
bind_command!(build, app);
bind_command!(introspect, app);
bind_command!(upload_keys, app);
bind_command!(exec, app);
let matches = app.clone().get_matches();
@ -77,6 +78,7 @@ For a sample configuration, see <https://github.com/zhaofengli/colmena>.
command!(build, matches);
command!(introspect, matches);
command!("upload-keys", upload_keys, matches);
command!(exec, matches);
app.print_long_help().unwrap();
}

View file

@ -109,6 +109,23 @@ impl Ssh {
Box::new(self)
}
/// Returns a Tokio Command to run an arbitrary command on the host.
pub fn ssh(&self, command: &[&str]) -> Command {
let options = self.ssh_options();
let options_str = options.join(" ");
let mut cmd = Command::new("ssh");
cmd
.arg(self.ssh_target())
.args(&options)
.arg("--")
.args(command)
.env("NIX_SSHOPTS", options_str);
cmd
}
async fn run_command(&mut self, command: Command) -> NixResult<()> {
let mut execution = CommandExecution::new(command);
@ -176,22 +193,6 @@ impl Ssh {
options
}
fn ssh(&self, command: &[&str]) -> Command {
let options = self.ssh_options();
let options_str = options.join(" ");
let mut cmd = Command::new("ssh");
cmd
.arg(self.ssh_target())
.args(&options)
.arg("--")
.args(command)
.env("NIX_SSHOPTS", options_str);
cmd
}
/// Uploads a single key.
async fn upload_key(&mut self, name: &str, key: &Key) -> NixResult<()> {
self.progress_bar.log(&format!("Deploying key {}", name));