From 62a3d1e6f80effcc01b0c676a4d17196b59bc51c Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Fri, 29 Jul 2022 22:13:09 -0700 Subject: [PATCH] Run rustfmt --- flake.nix | 2 +- src/cli.rs | 57 +++-- src/command/apply.rs | 54 ++-- src/command/apply_local.rs | 41 ++-- src/command/build.rs | 18 +- src/command/eval.rs | 17 +- src/command/exec.rs | 92 ++++--- src/command/mod.rs | 4 +- src/command/nix_info.rs | 7 +- src/command/test_progress.rs | 11 +- src/command/upload_keys.rs | 18 +- src/error.rs | 6 +- src/job.rs | 132 ++++++---- src/main.rs | 6 +- src/nix/deployment/mod.rs | 381 +++++++++++++++++------------ src/nix/deployment/options.rs | 2 +- src/nix/evaluator/mod.rs | 13 +- src/nix/evaluator/nix_eval_jobs.rs | 84 ++++--- src/nix/flake.rs | 16 +- src/nix/hive/assets.rs | 12 +- src/nix/hive/mod.rs | 163 +++++++----- src/nix/hive/tests/mod.rs | 181 +++++++++----- src/nix/host/key_uploader.rs | 35 ++- src/nix/host/local.rs | 60 +++-- src/nix/host/mod.rs | 38 ++- src/nix/host/ssh.rs | 90 +++++-- src/nix/info.rs | 31 ++- src/nix/key.rs | 64 ++--- src/nix/mod.rs | 44 ++-- src/nix/node_filter.rs | 157 +++++++----- src/nix/profile.rs | 32 +-- src/nix/store.rs | 25 +- src/progress/mod.rs | 17 +- src/progress/plain.rs | 46 +--- src/progress/spinner.rs | 38 ++- src/troubleshooter.rs | 28 ++- src/util.rs | 66 +++-- 37 files changed, 1241 insertions(+), 847 deletions(-) diff --git a/flake.nix b/flake.nix index fcd6054..4caf7c0 100644 --- a/flake.nix +++ b/flake.nix @@ -74,7 +74,7 @@ packages = with pkgs; [ bashInteractive editorconfig-checker - clippy rust-analyzer cargo-outdated + clippy rust-analyzer cargo-outdated rustfmt python3 python3Packages.flake8 ]; }; diff --git a/src/cli.rs b/src/cli.rs index fb87e17..d852b6f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -2,7 +2,7 @@ use std::env; -use clap::{Command as ClapCommand, Arg, ArgMatches, ColorChoice}; +use clap::{Arg, ArgMatches, ColorChoice, Command as ClapCommand}; use clap_complete::Shell; use const_format::concatcp; use env_logger::fmt::WriteStyle; @@ -18,7 +18,13 @@ const MANUAL_URL_BASE: &str = "https://colmena.cli.rs"; /// We maintain CLI and Nix API stability for each minor version. /// This ensures that the user always sees accurate documentations, and we can /// easily perform updates to the manual after a release. -const MANUAL_URL: &str = concatcp!(MANUAL_URL_BASE, "/", env!("CARGO_PKG_VERSION_MAJOR"), ".", env!("CARGO_PKG_VERSION_MINOR")); +const MANUAL_URL: &str = concatcp!( + MANUAL_URL_BASE, + "/", + env!("CARGO_PKG_VERSION_MAJOR"), + ".", + env!("CARGO_PKG_VERSION_MINOR") +); /// The note shown when the user is using a pre-release version. /// @@ -29,12 +35,15 @@ const MANUAL_DISCREPANCY_NOTE: &str = "Note: You are using a pre-release version lazy_static! { static ref LONG_ABOUT: String = { - let mut message = format!(r#"NixOS deployment tool + let mut message = format!( + r#"NixOS deployment tool Colmena helps you deploy to multiple hosts running NixOS. For more details, read the manual at <{}>. -"#, MANUAL_URL); +"#, + MANUAL_URL + ); if !env!("CARGO_PKG_VERSION_PRE").is_empty() { message += MANUAL_DISCREPANCY_NOTE; @@ -42,12 +51,14 @@ For more details, read the manual at <{}>. message }; - static ref CONFIG_HELP: String = { - format!(r#"If this argument is not specified, Colmena will search upwards from the current working directory for a file named "flake.nix" or "hive.nix". This behavior is disabled if --config/-f is given explicitly. + format!( + r#"If this argument is not specified, Colmena will search upwards from the current working directory for a file named "flake.nix" or "hive.nix". This behavior is disabled if --config/-f is given explicitly. For a sample configuration, check the manual at <{}>. -"#, MANUAL_URL) +"#, + MANUAL_URL + ) }; } @@ -68,19 +79,15 @@ macro_rules! register_command { macro_rules! handle_command { ($module:ident, $matches:ident) => { if let Some(sub_matches) = $matches.subcommand_matches(stringify!($module)) { - crate::troubleshooter::run_wrapped( - &$matches, &sub_matches, - command::$module::run, - ).await; + crate::troubleshooter::run_wrapped(&$matches, &sub_matches, command::$module::run) + .await; return; } }; ($name:expr, $module:ident, $matches:ident) => { if let Some(sub_matches) = $matches.subcommand_matches($name) { - crate::troubleshooter::run_wrapped( - &$matches, &sub_matches, - command::$module::run, - ).await; + crate::troubleshooter::run_wrapped(&$matches, &sub_matches, command::$module::run) + .await; return; } }; @@ -131,14 +138,18 @@ It's also possible to specify the preference using environment variables. See ClapCommand { @@ -145,27 +140,26 @@ pub fn subcommand() -> ClapCommand<'static> { pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<(), ColmenaError> { let hive = util::hive_from_args(local_args).await?; - let ssh_config = env::var("SSH_CONFIG_FILE") - .ok().map(PathBuf::from); + let ssh_config = env::var("SSH_CONFIG_FILE").ok().map(PathBuf::from); let goal_arg = local_args.value_of("goal").unwrap(); let goal = Goal::from_str(goal_arg).unwrap(); - let filter = local_args.value_of("on") - .map(NodeFilter::new) - .transpose()?; + let filter = local_args.value_of("on").map(NodeFilter::new).transpose()?; if !filter.is_some() && goal != Goal::Build { - // User did not specify node, we should check meta and see rules - let meta = hive.get_meta_config().await?; - if !meta.allow_apply_all { - log::error!("No node filter is specified and meta.allowApplyAll is set to false."); - log::error!("Hint: Filter the nodes with --on."); - quit::with_code(1); - } + // User did not specify node, we should check meta and see rules + let meta = hive.get_meta_config().await?; + if !meta.allow_apply_all { + log::error!("No node filter is specified and meta.allowApplyAll is set to false."); + log::error!("Hint: Filter the nodes with --on."); + quit::with_code(1); + } } - let targets = hive.select_nodes(filter, ssh_config, goal.requires_target_host()).await?; + let targets = hive + .select_nodes(filter, ssh_config, goal.requires_target_host()) + .await?; let n_targets = targets.len(); let verbose = local_args.is_present("verbose") || goal == Goal::DryActivate; @@ -181,7 +175,9 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( options.set_gzip(!local_args.is_present("no-gzip")); options.set_upload_keys(!local_args.is_present("no-keys")); options.set_reboot(local_args.is_present("reboot")); - options.set_force_replace_unknown_profiles(local_args.is_present("force-replace-unknown-profiles")); + options.set_force_replace_unknown_profiles( + local_args.is_present("force-replace-unknown-profiles"), + ); options.set_evaluator(local_args.value_of_t("evaluator").unwrap()); if local_args.is_present("keep-result") { @@ -207,7 +203,11 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( let parallelism_limit = { let mut limit = ParallelismLimit::default(); limit.set_apply_limit({ - let limit = local_args.value_of("parallel").unwrap().parse::().unwrap(); + let limit = local_args + .value_of("parallel") + .unwrap() + .parse::() + .unwrap(); if limit == 0 { n_targets } else { @@ -232,12 +232,10 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( deployment.set_parallelism_limit(parallelism_limit); deployment.set_evaluation_node_limit(evaluation_node_limit); - let (deployment, output) = tokio::join!( - deployment.execute(), - output.run_until_completion(), - ); + let (deployment, output) = tokio::join!(deployment.execute(), output.run_until_completion(),); - deployment?; output?; + deployment?; + output?; Ok(()) } diff --git a/src/command/apply_local.rs b/src/command/apply_local.rs index ac7340a..4171b0f 100644 --- a/src/command/apply_local.rs +++ b/src/command/apply_local.rs @@ -1,17 +1,12 @@ use regex::Regex; use std::collections::HashMap; -use clap::{Arg, Command as ClapCommand, ArgMatches}; +use clap::{Arg, ArgMatches, Command as ClapCommand}; use tokio::fs; use crate::error::ColmenaError; -use crate::nix::deployment::{ - Deployment, - Goal, - TargetNode, - Options, -}; -use crate::nix::{NodeName, host::Local as LocalHost}; +use crate::nix::deployment::{Deployment, Goal, Options, TargetNode}; +use crate::nix::{host::Local as LocalHost, NodeName}; use crate::progress::SimpleProgressOutput; use crate::util; @@ -89,8 +84,10 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( let s = if local_args.is_present("node") { local_args.value_of("node").unwrap().to_owned() } else { - hostname::get().expect("Could not get hostname") - .to_string_lossy().into_owned() + hostname::get() + .expect("Could not get hostname") + .to_string_lossy() + .into_owned() }; NodeName::new(s)? @@ -101,7 +98,10 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( if let Some(info) = hive.deployment_info_single(&hostname).await.unwrap() { let nix_options = hive.nix_options_with_builders().await.unwrap(); if !info.allows_local_deployment() { - log::error!("Local deployment is not enabled for host {}.", hostname.as_str()); + log::error!( + "Local deployment is not enabled for host {}.", + hostname.as_str() + ); log::error!("Hint: Set deployment.allowLocalDeployment to true."); quit::with_code(2); } @@ -111,13 +111,12 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( host.set_privilege_escalation_command(Some(command)); } - TargetNode::new( - hostname.clone(), - Some(host.upcast()), - info.clone(), - ) + TargetNode::new(hostname.clone(), Some(host.upcast()), info.clone()) } else { - log::error!("Host \"{}\" is not present in the Hive configuration.", hostname.as_str()); + log::error!( + "Host \"{}\" is not present in the Hive configuration.", + hostname.as_str() + ); quit::with_code(2); } }; @@ -138,12 +137,10 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( deployment.set_options(options); - let (deployment, output) = tokio::join!( - deployment.execute(), - output.run_until_completion(), - ); + let (deployment, output) = tokio::join!(deployment.execute(), output.run_until_completion(),); - deployment?; output?; + deployment?; + output?; Ok(()) } diff --git a/src/command/build.rs b/src/command/build.rs index d499e3c..140ea7a 100644 --- a/src/command/build.rs +++ b/src/command/build.rs @@ -8,14 +8,18 @@ pub use super::apply::run; pub fn subcommand() -> ClapCommand<'static> { let command = ClapCommand::new("build") .about("Build configurations but not push to remote machines") - .long_about(r#"Build configurations but not push to remote machines + .long_about( + r#"Build configurations but not push to remote machines -This subcommand behaves as if you invoked `apply` with the `build` goal."#) - .arg(Arg::new("goal") - .hide(true) - .default_value("build") - .possible_values(&["build"]) - .takes_value(true)); +This subcommand behaves as if you invoked `apply` with the `build` goal."#, + ) + .arg( + Arg::new("goal") + .hide(true) + .default_value("build") + .possible_values(&["build"]) + .takes_value(true), + ); let command = apply::register_deploy_args(command); diff --git a/src/command/eval.rs b/src/command/eval.rs index 5ffec44..301583d 100644 --- a/src/command/eval.rs +++ b/src/command/eval.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; -use clap::{Arg, Command as ClapCommand, ArgMatches}; +use clap::{Arg, ArgMatches, Command as ClapCommand}; use crate::error::ColmenaError; use crate::util; @@ -10,8 +10,7 @@ pub fn subcommand() -> ClapCommand<'static> { } pub fn deprecated_alias() -> ClapCommand<'static> { - subcommand_gen("introspect") - .hide(true) + subcommand_gen("introspect").hide(true) } fn subcommand_gen(name: &str) -> ClapCommand<'static> { @@ -43,7 +42,9 @@ For example, to retrieve the configuration of one node, you may write something pub async fn run(global_args: &ArgMatches, local_args: &ArgMatches) -> Result<(), ColmenaError> { if let Some("introspect") = global_args.subcommand_name() { - log::warn!("`colmena introspect` has been renamed to `colmena eval`. Please update your scripts."); + log::warn!( + "`colmena introspect` has been renamed to `colmena eval`. Please update your scripts." + ); } let hive = util::hive_from_args(local_args).await?; @@ -57,7 +58,13 @@ pub async fn run(global_args: &ArgMatches, local_args: &ArgMatches) -> Result<() local_args.value_of("expression").unwrap().to_string() } else { let path: PathBuf = local_args.value_of("expression_file").unwrap().into(); - format!("import {}", path.canonicalize().expect("Could not generate absolute path to expression file.").to_str().unwrap()) + format!( + "import {}", + path.canonicalize() + .expect("Could not generate absolute path to expression file.") + .to_str() + .unwrap() + ) }; let instantiate = local_args.is_present("instantiate"); diff --git a/src/command/exec.rs b/src/command/exec.rs index e2deda8..31f8cfe 100644 --- a/src/command/exec.rs +++ b/src/command/exec.rs @@ -2,7 +2,7 @@ use std::env; use std::path::PathBuf; use std::sync::Arc; -use clap::{Arg, Command as ClapCommand, ArgMatches}; +use clap::{Arg, ArgMatches, Command as ClapCommand}; use futures::future::join_all; use tokio::sync::Semaphore; @@ -16,59 +16,67 @@ pub fn subcommand() -> ClapCommand<'static> { let command = ClapCommand::new("exec") .about("Run a command on remote machines") .trailing_var_arg(true) - .arg(Arg::new("parallel") - .short('p') - .long("parallel") - .value_name("LIMIT") - .help("Deploy parallelism limit") - .long_help(r#"Limits the maximum number of hosts to run the command in parallel. + .arg( + Arg::new("parallel") + .short('p') + .long("parallel") + .value_name("LIMIT") + .help("Deploy parallelism limit") + .long_help( + r#"Limits the maximum number of hosts to run the command in parallel. In `colmena exec`, the parallelism limit is disabled (0) by default. -"#) - .default_value("0") - .takes_value(true) - .validator(|s| { - match s.parse::() { +"#, + ) + .default_value("0") + .takes_value(true) + .validator(|s| match s.parse::() { Ok(_) => Ok(()), Err(_) => Err(String::from("The value must be a valid number")), - } - })) - .arg(Arg::new("verbose") - .short('v') - .long("verbose") - .help("Be verbose") - .long_help("Deactivates the progress spinner and prints every line of output.") - .takes_value(false)) - .arg(Arg::new("command") - .value_name("COMMAND") - .last(true) - .help("Command") - .required(true) - .multiple_occurrences(true) - .long_help(r#"Command to run + }), + ) + .arg( + Arg::new("verbose") + .short('v') + .long("verbose") + .help("Be verbose") + .long_help("Deactivates the progress spinner and prints every line of output.") + .takes_value(false), + ) + .arg( + Arg::new("command") + .value_name("COMMAND") + .last(true) + .help("Command") + .required(true) + .multiple_occurrences(true) + .long_help( + r#"Command to run It's recommended to use -- to separate Colmena options from the command to run. For example: colmena exec --on @routers -- tcpdump -vni any ip[9] == 89 -"#)); +"#, + ), + ); util::register_selector_args(command) } pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<(), ColmenaError> { let hive = util::hive_from_args(local_args).await?; - let ssh_config = env::var("SSH_CONFIG_FILE") - .ok().map(PathBuf::from); + let ssh_config = env::var("SSH_CONFIG_FILE").ok().map(PathBuf::from); - let filter = local_args.value_of("on") - .map(NodeFilter::new) - .transpose()?; + let filter = local_args.value_of("on").map(NodeFilter::new).transpose()?; let mut targets = hive.select_nodes(filter, ssh_config, true).await?; let parallel_sp = Arc::new({ - let limit = local_args.value_of("parallel").unwrap() - .parse::().unwrap(); + let limit = local_args + .value_of("parallel") + .unwrap() + .parse::() + .unwrap(); if limit > 0 { Some(Semaphore::new(limit)) @@ -77,7 +85,13 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( } }); - let command: Arc> = Arc::new(local_args.values_of("command").unwrap().map(|s| s.to_string()).collect()); + let command: Arc> = Arc::new( + local_args + .values_of("command") + .unwrap() + .map(|s| s.to_string()) + .collect(), + ); let mut output = SimpleProgressOutput::new(local_args.is_present("verbose")); @@ -91,7 +105,7 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( let mut host = target.into_host().unwrap(); - let job = meta.create_job(JobType::Execute, vec![ name.clone() ])?; + let job = meta.create_job(JobType::Execute, vec![name.clone()])?; futures.push(job.run_waiting(|job| async move { let permit = match parallel_sp.as_ref() { @@ -122,7 +136,9 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( output.run_until_completion(), ); - meta?; monitor?; output?; + meta?; + monitor?; + output?; Ok(()) } diff --git a/src/command/mod.rs b/src/command/mod.rs index 2f8371d..af279bf 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -1,9 +1,9 @@ -pub mod build; pub mod apply; +pub mod build; pub mod eval; -pub mod upload_keys; pub mod exec; pub mod nix_info; +pub mod upload_keys; #[cfg(target_os = "linux")] pub mod apply_local; diff --git a/src/command/nix_info.rs b/src/command/nix_info.rs index 1c3b618..ae32a17 100644 --- a/src/command/nix_info.rs +++ b/src/command/nix_info.rs @@ -1,12 +1,11 @@ -use clap::{Command as ClapCommand, ArgMatches}; +use clap::{ArgMatches, Command as ClapCommand}; use crate::error::ColmenaError; -use crate::nix::NixCheck; use crate::nix::evaluator::nix_eval_jobs::get_pinned_nix_eval_jobs; +use crate::nix::NixCheck; pub fn subcommand() -> ClapCommand<'static> { - ClapCommand::new("nix-info") - .about("Show information about the current Nix installation") + ClapCommand::new("nix-info").about("Show information about the current Nix installation") } pub async fn run(_global_args: &ArgMatches, _local_args: &ArgMatches) -> Result<(), ColmenaError> { diff --git a/src/command/test_progress.rs b/src/command/test_progress.rs index bf2d558..36b57cb 100644 --- a/src/command/test_progress.rs +++ b/src/command/test_progress.rs @@ -1,17 +1,17 @@ use std::time::Duration; -use clap::{Command as ClapCommand, ArgMatches}; +use clap::{ArgMatches, Command as ClapCommand}; use tokio::time; use crate::error::{ColmenaError, ColmenaResult}; use crate::job::{JobMonitor, JobType}; use crate::nix::NodeName; -use crate::progress::{ProgressOutput, spinner::SpinnerOutput}; +use crate::progress::{spinner::SpinnerOutput, ProgressOutput}; macro_rules! node { ($n:expr) => { NodeName::new($n.to_string()).unwrap() - } + }; } pub fn subcommand() -> ClapCommand<'static> { @@ -44,7 +44,7 @@ pub async fn run(_global_args: &ArgMatches, _local_args: &ArgMatches) -> Result< Ok(()) }); - let build = meta.create_job(JobType::Build, vec![ node!("alpha"), node!("beta") ])?; + let build = meta.create_job(JobType::Build, vec![node!("alpha"), node!("beta")])?; let build = build.run(|_| async move { time::sleep(Duration::from_secs(5)).await; @@ -62,7 +62,8 @@ pub async fn run(_global_args: &ArgMatches, _local_args: &ArgMatches) -> Result< meta_future, ); - monitor?; output?; + monitor?; + output?; println!("Return Value -> {:?}", ret); diff --git a/src/command/upload_keys.rs b/src/command/upload_keys.rs index 0c39dc8..b0ca167 100644 --- a/src/command/upload_keys.rs +++ b/src/command/upload_keys.rs @@ -8,14 +8,18 @@ pub use super::apply::run; pub fn subcommand() -> ClapCommand<'static> { let command = ClapCommand::new("upload-keys") .about("Upload keys to remote hosts") - .long_about(r#"Upload keys to remote hosts + .long_about( + r#"Upload keys to remote hosts -This subcommand behaves as if you invoked `apply` with the pseudo `keys` goal."#) - .arg(Arg::new("goal") - .hide(true) - .default_value("keys") - .possible_values(&["keys"]) - .takes_value(true)); +This subcommand behaves as if you invoked `apply` with the pseudo `keys` goal."#, + ) + .arg( + Arg::new("goal") + .hide(true) + .default_value("keys") + .possible_values(&["keys"]) + .takes_value(true), + ); let command = apply::register_deploy_args(command); diff --git a/src/error.rs b/src/error.rs index 64b4d0b..619c14d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,7 +6,7 @@ use std::process::ExitStatus; use snafu::Snafu; use validator::ValidationErrors; -use crate::nix::{key, StorePath, Profile}; +use crate::nix::{key, Profile, StorePath}; pub type ColmenaResult = Result; @@ -87,7 +87,9 @@ impl From for ColmenaError { fn from(status: ExitStatus) -> Self { match status.code() { Some(exit_code) => Self::ChildFailure { exit_code }, - None => Self::ChildKilled { signal: status.signal().unwrap() }, + None => Self::ChildKilled { + signal: status.signal().unwrap(), + }, } } } diff --git a/src/job.rs b/src/job.rs index 0624daf..ac5f483 100644 --- a/src/job.rs +++ b/src/job.rs @@ -13,9 +13,9 @@ use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::time; use uuid::Uuid; -use crate::error::{ColmenaResult, ColmenaError}; +use crate::error::{ColmenaError, ColmenaResult}; use crate::nix::NodeName; -use crate::progress::{Sender as ProgressSender, Message as ProgressMessage, Line, LineStyle}; +use crate::progress::{Line, LineStyle, Message as ProgressMessage, Sender as ProgressSender}; pub type Sender = UnboundedSender; pub type Receiver = UnboundedReceiver; @@ -311,7 +311,12 @@ impl JobMonitor { } EventPayload::SuccessWithMessage(custom_message) => { let custom_message = Some(custom_message.clone()); - self.update_job_state(message.job_id, JobState::Succeeded, custom_message, false); + self.update_job_state( + message.job_id, + JobState::Succeeded, + custom_message, + false, + ); if message.job_id != self.meta_job_id { self.print_job_stats(); @@ -319,7 +324,12 @@ impl JobMonitor { } EventPayload::Noop(custom_message) => { let custom_message = Some(custom_message.clone()); - self.update_job_state(message.job_id, JobState::Succeeded, custom_message, true); + self.update_job_state( + message.job_id, + JobState::Succeeded, + custom_message, + true, + ); if message.job_id != self.meta_job_id { self.print_job_stats(); @@ -333,7 +343,9 @@ impl JobMonitor { self.print_job_stats(); } } - EventPayload::ChildStdout(m) | EventPayload::ChildStderr(m) | EventPayload::Message(m) => { + EventPayload::ChildStdout(m) + | EventPayload::ChildStderr(m) + | EventPayload::Message(m) => { if let Some(sender) = &self.progress { let metadata = &self.jobs[&message.job_id]; let line = metadata.get_line(m.clone()); @@ -348,7 +360,8 @@ impl JobMonitor { } /// Updates the state of a job. - fn update_job_state(&mut self, + fn update_job_state( + &mut self, job_id: JobId, new_state: JobState, message: Option, @@ -373,7 +386,9 @@ impl JobMonitor { if new_state != JobState::Waiting { if let Some(sender) = &self.progress { let text = if new_state == JobState::Succeeded { - metadata.custom_message.clone() + metadata + .custom_message + .clone() .or_else(|| metadata.describe_state_transition()) } else { metadata.describe_state_transition() @@ -401,8 +416,7 @@ impl JobMonitor { if let Some(sender) = &self.progress { let stats = self.get_job_stats(); let text = format!("{}", stats); - let line = self.jobs[&self.meta_job_id].get_line(text) - .noisy(); + let line = self.jobs[&self.meta_job_id].get_line(text).noisy(); let message = ProgressMessage::PrintMeta(line); sender.send(message).unwrap(); } @@ -463,10 +477,23 @@ impl JobMonitor { for job in self.jobs.values() { if job.state == JobState::Failed { - let logs: Vec<&Event> = self.events.iter().filter(|e| e.job_id == job.job_id).collect(); - let last_logs: Vec<&Event> = logs.into_iter().rev().take(LOG_CONTEXT_LINES).rev().collect(); + let logs: Vec<&Event> = self + .events + .iter() + .filter(|e| e.job_id == job.job_id) + .collect(); + let last_logs: Vec<&Event> = logs + .into_iter() + .rev() + .take(LOG_CONTEXT_LINES) + .rev() + .collect(); - log::error!("{} - Last {} lines of logs:", job.get_failure_summary(), last_logs.len()); + log::error!( + "{} - Last {} lines of logs:", + job.get_failure_summary(), + last_logs.len() + ); for event in last_logs { log::error!("{}", event.payload); } @@ -498,13 +525,12 @@ impl JobHandleInner { /// This sends out a Creation message with the metadata. pub fn create_job(&self, job_type: JobType, nodes: Vec) -> ColmenaResult { let job_id = JobId::new(); - let creation = JobCreation { - job_type, - nodes, - }; + let creation = JobCreation { job_type, nodes }; if job_type == JobType::Meta { - return Err(ColmenaError::Unknown { message: "Cannot create a meta job!".to_string() }); + return Err(ColmenaError::Unknown { + message: "Cannot create a meta job!".to_string(), + }); } let new_handle = Arc::new(Self { @@ -521,8 +547,9 @@ impl JobHandleInner { /// /// This immediately transitions the state to Running. pub async fn run(self: Arc, f: U) -> ColmenaResult - where U: FnOnce(Arc) -> F, - F: Future>, + where + U: FnOnce(Arc) -> F, + F: Future>, { self.run_internal(f, true).await } @@ -531,8 +558,9 @@ impl JobHandleInner { /// /// This does not immediately transition the state to Running. pub async fn run_waiting(self: Arc, f: U) -> ColmenaResult - where U: FnOnce(Arc) -> F, - F: Future>, + where + U: FnOnce(Arc) -> F, + F: Future>, { self.run_internal(f, false).await } @@ -574,8 +602,9 @@ impl JobHandleInner { /// Runs a closure, automatically updating the job monitor based on the result. async fn run_internal(self: Arc, f: U, report_running: bool) -> ColmenaResult - where U: FnOnce(Arc) -> F, - F: Future>, + where + U: FnOnce(Arc) -> F, + F: Future>, { if report_running { // Tell monitor we are starting @@ -606,7 +635,8 @@ impl JobHandleInner { let event = Event::new(self.job_id, payload); if let Some(sender) = &self.sender { - sender.send(event) + sender + .send(event) .map_err(|e| ColmenaError::unknown(Box::new(e)))?; } else { log::debug!("Sending event: {:?}", event); @@ -619,8 +649,9 @@ impl JobHandleInner { impl MetaJobHandle { /// Runs a closure, automatically updating the job monitor based on the result. pub async fn run(self, f: U) -> ColmenaResult - where U: FnOnce(JobHandle) -> F, - F: Future>, + where + U: FnOnce(JobHandle) -> F, + F: Future>, { let normal_handle = Arc::new(JobHandleInner { job_id: self.job_id, @@ -647,7 +678,8 @@ impl MetaJobHandle { fn send_payload(&self, payload: EventPayload) -> ColmenaResult<()> { let event = Event::new(self.job_id, payload); - self.sender.send(event) + self.sender + .send(event) .map_err(|e| ColmenaError::unknown(Box::new(e)))?; Ok(()) @@ -685,11 +717,10 @@ impl JobMetadata { return None; } - let node_list = describe_node_list(&self.nodes) - .unwrap_or_else(|| "some node(s)".to_string()); + let node_list = + describe_node_list(&self.nodes).unwrap_or_else(|| "some node(s)".to_string()); - let message = self.custom_message.as_deref() - .unwrap_or("No message"); + let message = self.custom_message.as_deref().unwrap_or("No message"); Some(match (self.job_type, self.state) { (JobType::Meta, JobState::Succeeded) => "All done!".to_string(), @@ -725,8 +756,8 @@ impl JobMetadata { /// Returns a human-readable string describing a failed job for use in the summary. fn get_failure_summary(&self) -> String { - let node_list = describe_node_list(&self.nodes) - .unwrap_or_else(|| "some node(s)".to_string()); + let node_list = + describe_node_list(&self.nodes).unwrap_or_else(|| "some node(s)".to_string()); match self.job_type { JobType::Evaluate => format!("Failed to evaluate {}", node_list), @@ -757,15 +788,15 @@ impl EventPayload { impl Display for EventPayload { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - EventPayload::ChildStdout(o) => write!(f, " stdout) {}", o)?, - EventPayload::ChildStderr(o) => write!(f, " stderr) {}", o)?, - EventPayload::Message(m) => write!(f, " message) {}", m)?, - EventPayload::Creation(_) => write!(f, " created)")?, - EventPayload::NewState(s) => write!(f, " state) {:?}", s)?, + EventPayload::ChildStdout(o) => write!(f, " stdout) {}", o)?, + EventPayload::ChildStderr(o) => write!(f, " stderr) {}", o)?, + EventPayload::Message(m) => write!(f, " message) {}", m)?, + EventPayload::Creation(_) => write!(f, " created)")?, + EventPayload::NewState(s) => write!(f, " state) {:?}", s)?, EventPayload::SuccessWithMessage(m) => write!(f, " success) {}", m)?, - EventPayload::Noop(m) => write!(f, " noop) {}", m)?, - EventPayload::Failure(e) => write!(f, " failure) {}", e)?, - EventPayload::ShutdownMonitor => write!(f, "shutdown)")?, + EventPayload::Noop(m) => write!(f, " noop) {}", m)?, + EventPayload::Failure(e) => write!(f, " failure) {}", e)?, + EventPayload::ShutdownMonitor => write!(f, "shutdown)")?, } Ok(()) @@ -865,7 +896,7 @@ mod tests { macro_rules! node { ($n:expr) => { NodeName::new($n.to_string()).unwrap() - } + }; } #[test] @@ -876,21 +907,20 @@ mod tests { let meta = meta.run(|job: JobHandle| async move { job.message("hello world".to_string())?; - let eval_job = job.create_job(JobType::Evaluate, vec![ node!("alpha") ])?; - eval_job.run(|job| async move { - job.stdout("child stdout".to_string())?; + let eval_job = job.create_job(JobType::Evaluate, vec![node!("alpha")])?; + eval_job + .run(|job| async move { + job.stdout("child stdout".to_string())?; - Ok(()) - }).await?; + Ok(()) + }) + .await?; Err(ColmenaError::Unsupported) as ColmenaResult<()> }); // Run until completion - let (ret, monitor) = tokio::join!( - meta, - monitor.run_until_completion(), - ); + let (ret, monitor) = tokio::join!(meta, monitor.run_until_completion(),); match ret { Err(ColmenaError::Unsupported) => (), diff --git a/src/main.rs b/src/main.rs index 2886487..a915ebd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ #![deny(unused_must_use)] -mod error; -mod nix; mod cli; mod command; -mod progress; +mod error; mod job; +mod nix; +mod progress; mod troubleshooter; mod util; diff --git a/src/nix/deployment/mod.rs b/src/nix/deployment/mod.rs index 1223b94..494829b 100644 --- a/src/nix/deployment/mod.rs +++ b/src/nix/deployment/mod.rs @@ -8,7 +8,7 @@ pub mod limits; pub use limits::{EvaluationNodeLimit, ParallelismLimit}; pub mod options; -pub use options::{Options, Evaluator}; +pub use options::{Evaluator, Options}; use std::collections::HashMap; use std::mem; @@ -18,30 +18,17 @@ use futures::future::join_all; use itertools::Itertools; use tokio_stream::StreamExt; -use crate::progress::Sender as ProgressSender; -use crate::job::{JobMonitor, JobHandle, JobType, JobState}; -use crate::util; use super::NixOptions; +use crate::job::{JobHandle, JobMonitor, JobState, JobType}; +use crate::progress::Sender as ProgressSender; +use crate::util; use super::{ - Hive, - Host, - NodeName, - NodeConfig, - ColmenaError, - ColmenaResult, - Profile, - ProfileDerivation, - CopyDirection, - CopyOptions, - RebootOptions, + evaluator::{DrvSetEvaluator, EvalError, NixEvalJobs}, host::Local as LocalHost, key::{Key, UploadAt as UploadKeyAt}, - evaluator::{ - DrvSetEvaluator, - NixEvalJobs, - EvalError, - }, + ColmenaError, ColmenaResult, CopyDirection, CopyOptions, Hive, Host, NodeConfig, NodeName, + Profile, ProfileDerivation, RebootOptions, }; /// A deployment. @@ -106,7 +93,12 @@ impl TargetNode { impl Deployment { /// Creates a new deployment. - pub fn new(hive: Hive, targets: TargetNodeMap, goal: Goal, progress: Option) -> Self { + pub fn new( + hive: Hive, + targets: TargetNodeMap, + goal: Goal, + progress: Option, + ) -> Self { Self { hive, goal, @@ -151,16 +143,15 @@ impl Deployment { futures.push(deployment.upload_keys_to_node(meta.clone(), target)); } - join_all(futures).await - .into_iter().collect::>>()?; + join_all(futures) + .await + .into_iter() + .collect::>>()?; Ok(()) }); - let (result, _) = tokio::join!( - meta_future, - monitor.run_until_completion(), - ); + let (result, _) = tokio::join!(meta_future, monitor.run_until_completion(),); result?; @@ -183,10 +174,7 @@ impl Deployment { Ok(()) }); - let (result, _) = tokio::join!( - meta_future, - monitor.run_until_completion(), - ); + let (result, _) = tokio::join!(meta_future, monitor.run_until_completion(),); result?; @@ -207,10 +195,14 @@ impl Deployment { } /// Executes the deployment on selected nodes, evaluating a chunk at a time. - async fn execute_chunked(self: &DeploymentHandle, parent: JobHandle, mut targets: TargetNodeMap) - -> ColmenaResult<()> - { - let eval_limit = self.evaluation_node_limit.get_limit() + async fn execute_chunked( + self: &DeploymentHandle, + parent: JobHandle, + mut targets: TargetNodeMap, + ) -> ColmenaResult<()> { + let eval_limit = self + .evaluation_node_limit + .get_limit() .unwrap_or(self.targets.len()); let mut futures = Vec::new(); @@ -224,7 +216,8 @@ impl Deployment { futures.push(self.execute_one_chunk(parent.clone(), map)); } - join_all(futures).await + join_all(futures) + .await .into_iter() .collect::>>()?; @@ -232,9 +225,11 @@ impl Deployment { } /// Executes the deployment on selected nodes using a streaming evaluator. - async fn execute_streaming(self: &DeploymentHandle, parent: JobHandle, mut targets: TargetNodeMap) - -> ColmenaResult<()> - { + async fn execute_streaming( + self: &DeploymentHandle, + parent: JobHandle, + mut targets: TargetNodeMap, + ) -> ColmenaResult<()> { if self.goal == Goal::UploadKeys { unreachable!(); // some logic is screwed up } @@ -244,81 +239,101 @@ impl Deployment { let job = parent.create_job(JobType::Evaluate, nodes.clone())?; - let futures = job.run(|job| async move { - let mut evaluator = NixEvalJobs::default(); - let eval_limit = self.evaluation_node_limit.get_limit().unwrap_or(self.targets.len()); - evaluator.set_eval_limit(eval_limit); - evaluator.set_job(job.clone()); + let futures = job + .run(|job| async move { + let mut evaluator = NixEvalJobs::default(); + let eval_limit = self + .evaluation_node_limit + .get_limit() + .unwrap_or(self.targets.len()); + evaluator.set_eval_limit(eval_limit); + evaluator.set_job(job.clone()); - // FIXME: nix-eval-jobs currently does not support IFD with builders - let options = self.hive.nix_options(); - let mut stream = evaluator.evaluate(&expr, options).await?; + // FIXME: nix-eval-jobs currently does not support IFD with builders + let options = self.hive.nix_options(); + let mut stream = evaluator.evaluate(&expr, options).await?; - let mut futures: Vec>> = Vec::new(); + let mut futures: Vec>> = Vec::new(); - while let Some(item) = stream.next().await { - match item { - Ok(attr) => { - let node_name = NodeName::new(attr.attribute().to_owned())?; - let profile_drv: ProfileDerivation = attr.into_derivation()?; + while let Some(item) = stream.next().await { + match item { + Ok(attr) => { + let node_name = NodeName::new(attr.attribute().to_owned())?; + let profile_drv: ProfileDerivation = attr.into_derivation()?; - // FIXME: Consolidate - let mut target = targets.remove(&node_name).unwrap(); + // FIXME: Consolidate + let mut target = targets.remove(&node_name).unwrap(); - if let Some(force_build_on_target) = self.options.force_build_on_target { - target.config.set_build_on_target(force_build_on_target); - } + if let Some(force_build_on_target) = self.options.force_build_on_target + { + target.config.set_build_on_target(force_build_on_target); + } - let job_handle = job.clone(); - let arc_self = self.clone(); - futures.push(tokio::spawn(async move { - let (target, profile) = { - if target.config.build_on_target() { - arc_self.build_on_node(job_handle.clone(), target, profile_drv.clone()).await? + let job_handle = job.clone(); + let arc_self = self.clone(); + futures.push(tokio::spawn(async move { + let (target, profile) = { + if target.config.build_on_target() { + arc_self + .build_on_node( + job_handle.clone(), + target, + profile_drv.clone(), + ) + .await? + } else { + arc_self + .build_and_push_node( + job_handle.clone(), + target, + profile_drv.clone(), + ) + .await? + } + }; + + if arc_self.goal.requires_activation() { + arc_self.activate_node(job_handle, target, profile).await } else { - arc_self.build_and_push_node(job_handle.clone(), target, profile_drv.clone()).await? + Ok(()) } - }; - - if arc_self.goal.requires_activation() { - arc_self.activate_node(job_handle, target, profile).await - } else { - Ok(()) - } - })); - } - Err(e) => { - match e { - EvalError::Global(e) => { - // Global error - Abort immediately - return Err(e); - } - EvalError::Attribute(e) => { - // Attribute-level error - // - // Here the eventual non-zero exit code of the evaluator - // will translate into an `EvalError::Global`, causing - // the entire future to resolve to an Err. - - let node_name = NodeName::new(e.attribute().to_string()).unwrap(); - let nodes = vec![ node_name ]; - let job = parent.create_job(JobType::Evaluate, nodes)?; - - job.state(JobState::Running)?; - for line in e.error().lines() { - job.stderr(line.to_string())?; + })); + } + Err(e) => { + match e { + EvalError::Global(e) => { + // Global error - Abort immediately + return Err(e); + } + EvalError::Attribute(e) => { + // Attribute-level error + // + // Here the eventual non-zero exit code of the evaluator + // will translate into an `EvalError::Global`, causing + // the entire future to resolve to an Err. + + let node_name = + NodeName::new(e.attribute().to_string()).unwrap(); + let nodes = vec![node_name]; + let job = parent.create_job(JobType::Evaluate, nodes)?; + + job.state(JobState::Running)?; + for line in e.error().lines() { + job.stderr(line.to_string())?; + } + job.state(JobState::Failed)?; } - job.state(JobState::Failed)?; } } } } - } - Ok(futures) - }).await?; + Ok(futures) + }) + .await?; - join_all(futures).await + join_all(futures) + .await .into_iter() .map(|r| r.unwrap()) // panic on JoinError (future panicked) .collect::>>()?; @@ -327,7 +342,11 @@ impl Deployment { } /// Executes the deployment against a portion of nodes. - async fn execute_one_chunk(self: &DeploymentHandle, parent: JobHandle, mut chunk: TargetNodeMap) -> ColmenaResult<()> { + async fn execute_one_chunk( + self: &DeploymentHandle, + parent: JobHandle, + mut chunk: TargetNodeMap, + ) -> ColmenaResult<()> { if self.goal == Goal::UploadKeys { unreachable!(); // some logic is screwed up } @@ -349,9 +368,13 @@ impl Deployment { futures.push(async move { let (target, profile) = { if target.config.build_on_target() { - arc_self.build_on_node(job_handle.clone(), target, profile_drv.clone()).await? + arc_self + .build_on_node(job_handle.clone(), target, profile_drv.clone()) + .await? } else { - arc_self.build_and_push_node(job_handle.clone(), target, profile_drv.clone()).await? + arc_self + .build_and_push_node(job_handle.clone(), target, profile_drv.clone()) + .await? } }; @@ -363,16 +386,20 @@ impl Deployment { }); } - join_all(futures).await - .into_iter().collect::>>()?; + join_all(futures) + .await + .into_iter() + .collect::>>()?; Ok(()) } /// Evaluates a set of nodes, returning their corresponding store derivations. - async fn evaluate_nodes(self: &DeploymentHandle, parent: JobHandle, nodes: Vec) - -> ColmenaResult> - { + async fn evaluate_nodes( + self: &DeploymentHandle, + parent: JobHandle, + nodes: Vec, + ) -> ColmenaResult> { let job = parent.create_job(JobType::Evaluate, nodes.clone())?; job.run_waiting(|job| async move { @@ -384,11 +411,16 @@ impl Deployment { drop(permit); result - }).await + }) + .await } /// Only uploads keys to a node. - async fn upload_keys_to_node(self: &DeploymentHandle, parent: JobHandle, mut target: TargetNode) -> ColmenaResult<()> { + async fn upload_keys_to_node( + self: &DeploymentHandle, + parent: JobHandle, + mut target: TargetNode, + ) -> ColmenaResult<()> { let nodes = vec![target.name.clone()]; let job = parent.create_job(JobType::UploadKeys, nodes)?; job.run(|_| async move { @@ -400,37 +432,44 @@ impl Deployment { host.upload_keys(&target.config.keys, true).await?; Ok(()) - }).await + }) + .await } /// Builds a system profile directly on the node itself. - async fn build_on_node(self: &DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile_drv: ProfileDerivation) - -> ColmenaResult<(TargetNode, Profile)> - { + async fn build_on_node( + self: &DeploymentHandle, + parent: JobHandle, + mut target: TargetNode, + profile_drv: ProfileDerivation, + ) -> ColmenaResult<(TargetNode, Profile)> { let nodes = vec![target.name.clone()]; let permit = self.parallelism_limit.apply.acquire().await.unwrap(); let build_job = parent.create_job(JobType::Build, nodes.clone())?; - let (target, profile) = build_job.run(|job| async move { - if target.host.is_none() { - return Err(ColmenaError::Unsupported); - } + let (target, profile) = build_job + .run(|job| async move { + if target.host.is_none() { + return Err(ColmenaError::Unsupported); + } - let host = target.host.as_mut().unwrap(); - host.set_job(Some(job.clone())); + let host = target.host.as_mut().unwrap(); + host.set_job(Some(job.clone())); - host.copy_closure( - profile_drv.as_store_path(), - CopyDirection::ToRemote, - CopyOptions::default().include_outputs(true), - ).await?; + host.copy_closure( + profile_drv.as_store_path(), + CopyDirection::ToRemote, + CopyOptions::default().include_outputs(true), + ) + .await?; - let profile = profile_drv.realize_remote(host).await?; + let profile = profile_drv.realize_remote(host).await?; - job.success_with_message(format!("Built {:?} on target node", profile.as_path()))?; - Ok((target, profile)) - }).await?; + job.success_with_message(format!("Built {:?} on target node", profile.as_path()))?; + Ok((target, profile)) + }) + .await?; drop(permit); @@ -438,9 +477,12 @@ impl Deployment { } /// Builds and pushes a system profile on a node. - async fn build_and_push_node(self: &DeploymentHandle, parent: JobHandle, target: TargetNode, profile_drv: ProfileDerivation) - -> ColmenaResult<(TargetNode, Profile)> - { + async fn build_and_push_node( + self: &DeploymentHandle, + parent: JobHandle, + target: TargetNode, + profile_drv: ProfileDerivation, + ) -> ColmenaResult<(TargetNode, Profile)> { let nodes = vec![target.name.clone()]; let permit = self.parallelism_limit.apply.acquire().await.unwrap(); @@ -448,16 +490,18 @@ impl Deployment { // Build system profile let build_job = parent.create_job(JobType::Build, nodes.clone())?; let arc_self = self.clone(); - let profile: Profile = build_job.run(|job| async move { - // FIXME: Remote builder? - let mut builder = LocalHost::new(arc_self.nix_options.clone()).upcast(); - builder.set_job(Some(job.clone())); + let profile: Profile = build_job + .run(|job| async move { + // FIXME: Remote builder? + let mut builder = LocalHost::new(arc_self.nix_options.clone()).upcast(); + builder.set_job(Some(job.clone())); - let profile = profile_drv.realize(&mut builder).await?; + let profile = profile_drv.realize(&mut builder).await?; - job.success_with_message(format!("Built {:?}", profile.as_path()))?; - Ok(profile) - }).await?; + job.success_with_message(format!("Built {:?}", profile.as_path()))?; + Ok(profile) + }) + .await?; // Create GC root let profile_r = profile.clone(); @@ -474,7 +518,8 @@ impl Deployment { job.noop("No context directory to create GC roots in".to_string())?; } Ok(target) - }).await? + }) + .await? } else { target }; @@ -487,20 +532,24 @@ impl Deployment { let push_job = parent.create_job(JobType::Push, nodes.clone())?; let push_profile = profile.clone(); let arc_self = self.clone(); - let target = push_job.run(|job| async move { - if target.host.is_none() { - return Err(ColmenaError::Unsupported); - } + let target = push_job + .run(|job| async move { + if target.host.is_none() { + return Err(ColmenaError::Unsupported); + } - let host = target.host.as_mut().unwrap(); - host.set_job(Some(job.clone())); - host.copy_closure( - push_profile.as_store_path(), - CopyDirection::ToRemote, - arc_self.options.to_copy_options()).await?; + let host = target.host.as_mut().unwrap(); + host.set_job(Some(job.clone())); + host.copy_closure( + push_profile.as_store_path(), + CopyDirection::ToRemote, + arc_self.options.to_copy_options(), + ) + .await?; - Ok(target) - }).await?; + Ok(target) + }) + .await?; drop(permit); @@ -510,9 +559,12 @@ impl Deployment { /// Activates a system profile on a node. /// /// This will also upload keys to the node. - async fn activate_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile: Profile) - -> ColmenaResult<()> - { + async fn activate_node( + self: DeploymentHandle, + parent: JobHandle, + mut target: TargetNode, + profile: Profile, + ) -> ColmenaResult<()> { let nodes = vec![target.name.clone()]; let permit = self.parallelism_limit.apply.acquire().await.unwrap(); @@ -521,7 +573,10 @@ impl Deployment { let mut target = if self.options.upload_keys { let job = parent.create_job(JobType::UploadKeys, nodes.clone())?; job.run_waiting(|job| async move { - let keys = target.config.keys.iter() + let keys = target + .config + .keys + .iter() .filter(|(_, v)| v.upload_at() == UploadKeyAt::PreActivation) .map(|(k, v)| (k.clone(), v.clone())) .collect::>(); @@ -540,7 +595,8 @@ impl Deployment { job.success_with_message("Uploaded keys (pre-activation)".to_string())?; Ok(target) - }).await? + }) + .await? } else { target }; @@ -580,7 +636,10 @@ impl Deployment { let mut target = if self.options.upload_keys { let job = parent.create_job(JobType::UploadKeys, nodes.clone())?; job.run_waiting(|job| async move { - let keys = target.config.keys.iter() + let keys = target + .config + .keys + .iter() .filter(|(_, v)| v.upload_at() == UploadKeyAt::PostActivation) .map(|(k, v)| (k.clone(), v.clone())) .collect::>(); @@ -599,7 +658,8 @@ impl Deployment { job.success_with_message("Uploaded keys (post-activation)".to_string())?; Ok(target) - }).await? + }) + .await? } else { target }; @@ -625,7 +685,8 @@ impl Deployment { host.reboot(options).await?; Ok(()) - }).await?; + }) + .await?; } drop(permit); diff --git a/src/nix/deployment/options.rs b/src/nix/deployment/options.rs index 7301add..7798285 100644 --- a/src/nix/deployment/options.rs +++ b/src/nix/deployment/options.rs @@ -116,6 +116,6 @@ impl FromStr for Evaluator { impl Evaluator { pub fn possible_values() -> &'static [&'static str] { - &[ "chunked", "streaming" ] + &["chunked", "streaming"] } } diff --git a/src/nix/evaluator/mod.rs b/src/nix/evaluator/mod.rs index 5c88ad5..468ed91 100644 --- a/src/nix/evaluator/mod.rs +++ b/src/nix/evaluator/mod.rs @@ -17,9 +17,9 @@ use std::result::Result as StdResult; use async_trait::async_trait; use futures::Stream; +use super::{BuildResult, NixExpression, NixOptions, StoreDerivation, StorePath}; +use crate::error::{ColmenaError, ColmenaResult}; use crate::job::JobHandle; -use crate::error::{ColmenaResult, ColmenaError}; -use super::{BuildResult, StorePath, StoreDerivation, NixExpression, NixOptions}; /// The result of an evaluation. /// @@ -58,7 +58,11 @@ pub struct AttributeError { #[async_trait] pub trait DrvSetEvaluator { /// Evaluates an attribute set of derivation, returning results as they come in. - async fn evaluate(&self, expression: &dyn NixExpression, options: NixOptions) -> ColmenaResult>>>; + async fn evaluate( + &self, + expression: &dyn NixExpression, + options: NixOptions, + ) -> ColmenaResult>>>; /// Sets the maximum number of attributes to evaluate at the same time. #[allow(unused_variables)] @@ -77,7 +81,8 @@ impl AttributeOutput { /// Returns the derivation for this attribute. pub fn into_derivation(self) -> ColmenaResult> - where T: TryFrom>, + where + T: TryFrom>, { self.drv_path.into_derivation() } diff --git a/src/nix/evaluator/nix_eval_jobs.rs b/src/nix/evaluator/nix_eval_jobs.rs index 918a914..98f5ac4 100644 --- a/src/nix/evaluator/nix_eval_jobs.rs +++ b/src/nix/evaluator/nix_eval_jobs.rs @@ -18,11 +18,11 @@ use tempfile::NamedTempFile; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; -use crate::error::{ColmenaResult, ColmenaError}; -use crate::job::{JobHandle, null_job_handle}; -use crate::nix::{StorePath, NixExpression, NixOptions}; +use super::{AttributeError, AttributeOutput, DrvSetEvaluator, EvalError, EvalResult}; +use crate::error::{ColmenaError, ColmenaResult}; +use crate::job::{null_job_handle, JobHandle}; +use crate::nix::{NixExpression, NixOptions, StorePath}; use crate::util::capture_stream; -use super::{DrvSetEvaluator, EvalResult, EvalError, AttributeOutput, AttributeError}; /// The pinned nix-eval-jobs binary. pub const NIX_EVAL_JOBS: Option<&str> = option_env!("NIX_EVAL_JOBS"); @@ -73,7 +73,11 @@ struct EvalLineGlobalError { #[async_trait] impl DrvSetEvaluator for NixEvalJobs { - async fn evaluate(&self, expression: &dyn NixExpression, options: NixOptions) -> ColmenaResult>>> { + async fn evaluate( + &self, + expression: &dyn NixExpression, + options: NixOptions, + ) -> ColmenaResult>>> { let expr_file = { let mut f = NamedTempFile::new()?; f.write_all(expression.expression().as_bytes())?; @@ -83,7 +87,8 @@ impl DrvSetEvaluator for NixEvalJobs { let mut command = Command::new(&self.executable); command .arg("--impure") - .arg("--workers").arg(self.workers.to_string()) + .arg("--workers") + .arg(self.workers.to_string()) .arg(&expr_file); command.args(options.to_args()); @@ -101,9 +106,7 @@ impl DrvSetEvaluator for NixEvalJobs { let stderr = BufReader::new(child.stderr.take().unwrap()); let job = self.job.clone(); - tokio::spawn(async move { - capture_stream(stderr, Some(job), true).await - }); + tokio::spawn(async move { capture_stream(stderr, Some(job), true).await }); Ok(Box::pin(stream! { loop { @@ -206,9 +209,7 @@ impl From for AttributeError { impl From for ColmenaError { fn from(ele: EvalLineGlobalError) -> Self { - ColmenaError::Unknown { - message: ele.error, - } + ColmenaError::Unknown { message: ele.error } } } @@ -234,8 +235,8 @@ mod tests { use super::*; use ntest::timeout; - use tokio_test::block_on; use tokio_stream::StreamExt; + use tokio_test::block_on; #[test] #[timeout(30000)] @@ -244,7 +245,10 @@ mod tests { let expr = r#"with import {}; { a = pkgs.hello; b = pkgs.bash; }"#.to_string(); block_on(async move { - let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap(); + let mut stream = evaluator + .evaluate(&expr, NixOptions::default()) + .await + .unwrap(); let mut count = 0; while let Some(value) = stream.next().await { @@ -265,7 +269,10 @@ mod tests { let expr = r#"gibberish"#.to_string(); block_on(async move { - let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap(); + let mut stream = evaluator + .evaluate(&expr, NixOptions::default()) + .await + .unwrap(); let mut count = 0; while let Some(value) = stream.next().await { @@ -282,10 +289,14 @@ mod tests { #[timeout(30000)] fn test_attribute_error() { let evaluator = NixEvalJobs::default(); - let expr = r#"with import {}; { a = pkgs.hello; b = throw "an error"; }"#.to_string(); + let expr = + r#"with import {}; { a = pkgs.hello; b = throw "an error"; }"#.to_string(); block_on(async move { - let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap(); + let mut stream = evaluator + .evaluate(&expr, NixOptions::default()) + .await + .unwrap(); let mut count = 0; while let Some(value) = stream.next().await { @@ -295,16 +306,14 @@ mod tests { Ok(v) => { assert_eq!("a", v.attribute); } - Err(e) => { - match e { - EvalError::Attribute(a) => { - assert_eq!("b", a.attribute); - } - _ => { - panic!("Expected an attribute error, got {:?}", e); - } + Err(e) => match e { + EvalError::Attribute(a) => { + assert_eq!("b", a.attribute); } - } + _ => { + panic!("Expected an attribute error, got {:?}", e); + } + }, } count += 1; } @@ -324,7 +333,10 @@ mod tests { let expr = r#"with import {}; { a = pkgs.hello; b = pkgs.writeText "x" (import /sys/nonexistentfile); }"#.to_string(); block_on(async move { - let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap(); + let mut stream = evaluator + .evaluate(&expr, NixOptions::default()) + .await + .unwrap(); let mut count = 0; while let Some(value) = stream.next().await { @@ -334,17 +346,15 @@ mod tests { Ok(v) => { assert_eq!("a", v.attribute); } - Err(e) => { - match e { - EvalError::Global(e) => { - let message = format!("{}", e); - assert!(message.find("No such file or directory").is_some()); - } - _ => { - panic!("Expected a global error, got {:?}", e); - } + Err(e) => match e { + EvalError::Global(e) => { + let message = format!("{}", e); + assert!(message.find("No such file or directory").is_some()); } - } + _ => { + panic!("Expected a global error, got {:?}", e); + } + }, } count += 1; } diff --git a/src/nix/flake.rs b/src/nix/flake.rs index b897e2a..01f2bfe 100644 --- a/src/nix/flake.rs +++ b/src/nix/flake.rs @@ -7,7 +7,7 @@ use std::process::Stdio; use serde::Deserialize; use tokio::process::Command; -use super::{NixCheck, ColmenaError, ColmenaResult}; +use super::{ColmenaError, ColmenaResult, NixCheck}; /// A Nix Flake. #[derive(Debug)] @@ -27,7 +27,10 @@ impl Flake { pub async fn from_dir>(dir: P) -> ColmenaResult { NixCheck::require_flake_support().await?; - let flake = dir.as_ref().as_os_str().to_str() + let flake = dir + .as_ref() + .as_os_str() + .to_str() .expect("Flake directory path contains non-UTF-8 characters"); let info = FlakeMetadata::resolve(flake).await?; @@ -83,10 +86,9 @@ impl FlakeMetadata { return Err(output.status.into()); } - serde_json::from_slice::(&output.stdout) - .map_err(|_| { - let output = String::from_utf8_lossy(&output.stdout).to_string(); - ColmenaError::BadOutput { output } - }) + serde_json::from_slice::(&output.stdout).map_err(|_| { + let output = String::from_utf8_lossy(&output.stdout).to_string(); + ColmenaError::BadOutput { output } + }) } } diff --git a/src/nix/hive/assets.rs b/src/nix/hive/assets.rs index 5e1e6b3..a341695 100644 --- a/src/nix/hive/assets.rs +++ b/src/nix/hive/assets.rs @@ -32,9 +32,7 @@ impl Assets { create_file(&temp_dir, "options.nix", false, OPTIONS_NIX); create_file(&temp_dir, "modules.nix", false, MODULES_NIX); - Self { - temp_dir, - } + Self { temp_dir } } /// Returns the base expression from which the evaluated Hive can be used. @@ -62,8 +60,12 @@ impl Assets { } fn get_path(&self, name: &str) -> String { - self.temp_dir.path().join(name) - .to_str().unwrap().to_string() + self.temp_dir + .path() + .join(name) + .to_str() + .unwrap() + .to_string() } } diff --git a/src/nix/hive/mod.rs b/src/nix/hive/mod.rs index 3e8ce7e..2d2c3da 100644 --- a/src/nix/hive/mod.rs +++ b/src/nix/hive/mod.rs @@ -4,30 +4,24 @@ mod assets; mod tests; use std::collections::HashMap; +use std::convert::AsRef; use std::io::Write; use std::path::{Path, PathBuf}; -use std::convert::AsRef; +use serde::Serialize; use tempfile::{NamedTempFile, TempPath}; use tokio::process::Command; use tokio::sync::OnceCell; -use serde::Serialize; use validator::Validate; -use super::{ - Flake, - NixOptions, - NodeName, - NodeConfig, - NodeFilter, - NixExpression, - ProfileDerivation, - StorePath, MetaConfig, -}; use super::deployment::TargetNode; +use super::{ + Flake, MetaConfig, NixExpression, NixOptions, NodeConfig, NodeFilter, NodeName, + ProfileDerivation, StorePath, +}; use crate::error::ColmenaResult; -use crate::util::{CommandExecution, CommandExt}; use crate::job::JobHandle; +use crate::util::{CommandExecution, CommandExt}; use assets::Assets; #[derive(Debug)] @@ -98,12 +92,8 @@ impl HivePath { fn context_dir(&self) -> Option { match self { - Self::Legacy(p) => { - p.parent().map(|d| d.to_owned()) - } - Self::Flake(flake) => { - flake.local_dir().map(|d| d.to_owned()) - } + Self::Legacy(p) => p.parent().map(|d| d.to_owned()), + Self::Flake(flake) => flake.local_dir().map(|d| d.to_owned()), } } } @@ -112,7 +102,7 @@ impl Hive { pub fn new(path: HivePath) -> ColmenaResult { let context_dir = path.context_dir(); - Ok(Self{ + Ok(Self { path, context_dir, assets: Assets::new(), @@ -126,10 +116,14 @@ impl Hive { } pub async fn get_meta_config(&self) -> ColmenaResult<&MetaConfig> { - self.meta_config.get_or_try_init(||async { - self.nix_instantiate("hive.metaConfig").eval() - .capture_json().await - }).await + self.meta_config + .get_or_try_init(|| async { + self.nix_instantiate("hive.metaConfig") + .eval() + .capture_json() + .await + }) + .await } pub fn set_show_trace(&mut self, value: bool) { @@ -156,7 +150,12 @@ impl Hive { } /// Convenience wrapper to filter nodes for CLI actions. - pub async fn select_nodes(&self, filter: Option, ssh_config: Option, ssh_only: bool) -> ColmenaResult> { + pub async fn select_nodes( + &self, + filter: Option, + ssh_config: Option, + ssh_only: bool, + ) -> ColmenaResult> { let mut node_configs = None; log::info!("Enumerating nodes..."); @@ -168,15 +167,16 @@ impl Hive { log::debug!("Retrieving deployment info for all nodes..."); let all_node_configs = self.deployment_info().await?; - let filtered = filter.filter_node_configs(all_node_configs.iter()) - .into_iter().collect(); + let filtered = filter + .filter_node_configs(all_node_configs.iter()) + .into_iter() + .collect(); node_configs = Some(all_node_configs); filtered } else { - filter.filter_node_names(&all_nodes)? - .into_iter().collect() + filter.filter_node_names(&all_nodes)?.into_iter().collect() } } None => all_nodes.clone(), @@ -223,9 +223,18 @@ impl Hive { } else if targets.len() == all_nodes.len() { log::info!("Selected all {} nodes.", targets.len()); } else if !ssh_only || skipped == 0 { - log::info!("Selected {} out of {} hosts.", targets.len(), all_nodes.len()); + log::info!( + "Selected {} out of {} hosts.", + targets.len(), + all_nodes.len() + ); } else { - log::info!("Selected {} out of {} hosts ({} skipped).", targets.len(), all_nodes.len(), skipped); + log::info!( + "Selected {} out of {} hosts ({} skipped).", + targets.len(), + all_nodes.len(), + skipped + ); } Ok(targets) @@ -233,14 +242,20 @@ impl Hive { /// Returns a list of all node names. pub async fn node_names(&self) -> ColmenaResult> { - self.nix_instantiate("attrNames hive.nodes").eval() - .capture_json().await + self.nix_instantiate("attrNames hive.nodes") + .eval() + .capture_json() + .await } /// Retrieve deployment info for all nodes. pub async fn deployment_info(&self) -> ColmenaResult> { - let configs: HashMap = self.nix_instantiate("hive.deploymentConfig").eval_with_builders().await? - .capture_json().await?; + let configs: HashMap = self + .nix_instantiate("hive.deploymentConfig") + .eval_with_builders() + .await? + .capture_json() + .await?; for config in configs.values() { config.validate()?; @@ -253,19 +268,34 @@ impl Hive { /// Retrieve deployment info for a single node. #[cfg_attr(not(target_os = "linux"), allow(dead_code))] - pub async fn deployment_info_single(&self, node: &NodeName) -> ColmenaResult> { + pub async fn deployment_info_single( + &self, + node: &NodeName, + ) -> ColmenaResult> { let expr = format!("hive.nodes.\"{}\".config.deployment or null", node.as_str()); - self.nix_instantiate(&expr).eval_with_builders().await? - .capture_json().await + self.nix_instantiate(&expr) + .eval_with_builders() + .await? + .capture_json() + .await } /// Retrieve deployment info for a list of nodes. - pub async fn deployment_info_selected(&self, nodes: &[NodeName]) -> ColmenaResult> { + pub async fn deployment_info_selected( + &self, + nodes: &[NodeName], + ) -> ColmenaResult> { let nodes_expr = SerializedNixExpression::new(nodes)?; - let configs: HashMap = self.nix_instantiate(&format!("hive.deploymentConfigSelected {}", nodes_expr.expression())) - .eval_with_builders().await? - .capture_json().await?; + let configs: HashMap = self + .nix_instantiate(&format!( + "hive.deploymentConfigSelected {}", + nodes_expr.expression() + )) + .eval_with_builders() + .await? + .capture_json() + .await?; for config in configs.values() { config.validate()?; @@ -282,20 +312,25 @@ impl Hive { /// Evaluation may take up a lot of memory, so we make it possible /// to split up the evaluation process into chunks and run them /// concurrently with other processes (e.g., build and apply). - pub async fn eval_selected(&self, nodes: &[NodeName], job: Option) -> ColmenaResult> { + pub async fn eval_selected( + &self, + nodes: &[NodeName], + job: Option, + ) -> ColmenaResult> { let nodes_expr = SerializedNixExpression::new(nodes)?; let expr = format!("hive.evalSelectedDrvPaths {}", nodes_expr.expression()); - let command = self.nix_instantiate(&expr) - .eval_with_builders().await?; + let command = self.nix_instantiate(&expr).eval_with_builders().await?; let mut execution = CommandExecution::new(command); execution.set_job(job); execution.set_hide_stdout(true); execution - .capture_json::>().await? - .into_iter().map(|(name, path)| { + .capture_json::>() + .await? + .into_iter() + .map(|(name, path)| { let path = path.into_derivation()?; Ok((name, path)) }) @@ -316,12 +351,18 @@ impl Hive { pub async fn introspect(&self, expression: String, instantiate: bool) -> ColmenaResult { if instantiate { let expression = format!("hive.introspect ({})", expression); - self.nix_instantiate(&expression).instantiate_with_builders().await? - .capture_output().await + self.nix_instantiate(&expression) + .instantiate_with_builders() + .await? + .capture_output() + .await } else { let expression = format!("toJSON (hive.introspect ({}))", expression); - self.nix_instantiate(&expression).eval_with_builders().await? - .capture_json().await + self.nix_instantiate(&expression) + .eval_with_builders() + .await? + .capture_json() + .await } } @@ -346,10 +387,7 @@ impl Hive { impl<'hive> NixInstantiate<'hive> { fn new(hive: &'hive Hive, expression: String) -> Self { - Self { - hive, - expression, - } + Self { hive, expression } } fn instantiate(&self) -> Command { @@ -373,7 +411,10 @@ impl<'hive> NixInstantiate<'hive> { fn eval(self) -> Command { let mut command = self.instantiate(); let options = self.hive.nix_options(); - command.arg("--eval").arg("--json").arg("--strict") + command + .arg("--eval") + .arg("--json") + .arg("--strict") // Ensures the derivations are instantiated // Required for system profile evaluation and IFD .arg("--read-write-mode") @@ -401,7 +442,10 @@ impl<'hive> NixInstantiate<'hive> { } impl SerializedNixExpression { - pub fn new(data: T) -> ColmenaResult where T: Serialize { + pub fn new(data: T) -> ColmenaResult + where + T: Serialize, + { let mut tmp = NamedTempFile::new()?; let json = serde_json::to_vec(&data).expect("Could not serialize data"); tmp.write_all(&json)?; @@ -414,7 +458,10 @@ impl SerializedNixExpression { impl NixExpression for SerializedNixExpression { fn expression(&self) -> String { - format!("(builtins.fromJSON (builtins.readFile {}))", self.json_file.to_str().unwrap()) + format!( + "(builtins.fromJSON (builtins.readFile {}))", + self.json_file.to_str().unwrap() + ) } } diff --git a/src/nix/hive/tests/mod.rs b/src/nix/hive/tests/mod.rs index c653777..858170f 100644 --- a/src/nix/hive/tests/mod.rs +++ b/src/nix/hive/tests/mod.rs @@ -15,7 +15,7 @@ use tokio_test::block_on; macro_rules! node { ($n:expr) => { NodeName::new($n.to_string()).unwrap() - } + }; } fn set_eq(a: &[T], b: &[T]) -> bool @@ -92,7 +92,8 @@ impl Deref for TempHive { #[test] fn test_parse_simple() { - let hive = TempHive::new(r#" + let hive = TempHive::new( + r#" { defaults = { pkgs, ... }: { environment.systemPackages = with pkgs; [ @@ -123,7 +124,8 @@ fn test_parse_simple() { time.timeZone = "America/Los_Angeles"; }; } - "#); + "#, + ); let nodes = block_on(hive.deployment_info()).unwrap(); assert!(set_eq( @@ -135,7 +137,11 @@ fn test_parse_simple() { let host_a = &nodes[&node!("host-a")]; assert!(set_eq( &["common-tag", "a-tag"], - &host_a.tags.iter().map(String::as_str).collect::>(), + &host_a + .tags + .iter() + .map(String::as_str) + .collect::>(), )); assert_eq!(Some("host-a"), host_a.target_host.as_deref()); assert_eq!(None, host_a.target_port); @@ -145,7 +151,11 @@ fn test_parse_simple() { let host_b = &nodes[&node!("host-b")]; assert!(set_eq( &["common-tag"], - &host_b.tags.iter().map(String::as_str).collect::>(), + &host_b + .tags + .iter() + .map(String::as_str) + .collect::>(), )); assert_eq!(Some("somehost.tld"), host_b.target_host.as_deref()); assert_eq!(Some(1234), host_b.target_port); @@ -171,7 +181,8 @@ fn test_parse_flake() { #[test] fn test_parse_node_references() { - TempHive::valid(r#" + TempHive::valid( + r#" with builtins; { host-a = { name, nodes, ... }: @@ -186,23 +197,27 @@ fn test_parse_node_references() { assert nodes.host-a.config.time.timeZone == "America/Los_Angeles"; {}; } - "#); + "#, + ); } #[test] fn test_parse_unknown_option() { - TempHive::invalid(r#" + TempHive::invalid( + r#" { bad = { deployment.noSuchOption = "not kidding"; }; } - "#); + "#, + ); } #[test] fn test_config_list() { - TempHive::valid(r#" + TempHive::valid( + r#" with builtins; { host-a = [ @@ -219,12 +234,14 @@ fn test_config_list() { assert elem "some-tag" nodes.host-a.config.deployment.tags; {}; } - "#); + "#, + ); } #[test] fn test_parse_key_text() { - TempHive::valid(r#" + TempHive::valid( + r#" { test = { deployment.keys.topSecret = { @@ -232,12 +249,14 @@ fn test_parse_key_text() { }; }; } - "#); + "#, + ); } #[test] fn test_parse_key_command_good() { - TempHive::valid(r#" + TempHive::valid( + r#" { test = { deployment.keys.elohim = { @@ -245,12 +264,14 @@ fn test_parse_key_command_good() { }; }; } - "#); + "#, + ); } #[test] fn test_parse_key_command_bad() { - TempHive::invalid(r#" + TempHive::invalid( + r#" { test = { deployment.keys.elohim = { @@ -258,12 +279,14 @@ fn test_parse_key_command_bad() { }; }; } - "#); + "#, + ); } #[test] fn test_parse_key_file() { - TempHive::valid(r#" + TempHive::valid( + r#" { test = { deployment.keys.l337hax0rwow = { @@ -271,27 +294,32 @@ fn test_parse_key_file() { }; }; } - "#); + "#, + ); } #[test] fn test_eval_non_existent_pkg() { // Sanity check - TempHive::eval_failure(r#" + TempHive::eval_failure( + r#" { test = { pkgs, ... }: { boot.isContainer = true; environment.systemPackages = with pkgs; [ thisPackageDoesNotExist ]; }; } - "#, vec![ node!("test") ]); + "#, + vec![node!("test")], + ); } // Nixpkgs config tests #[test] fn test_nixpkgs_system() { - TempHive::valid(r#" + TempHive::valid( + r#" { meta = { nixpkgs = import { @@ -302,9 +330,11 @@ fn test_nixpkgs_system() { boot.isContainer = assert pkgs.system == "armv5tel-linux"; true; }; } - "#); + "#, + ); - TempHive::valid(r#" + TempHive::valid( + r#" { meta = { nixpkgs = import { @@ -316,12 +346,14 @@ fn test_nixpkgs_system() { boot.isContainer = assert pkgs.system == "armv5tel-linux"; true; }; } - "#); + "#, + ); } #[test] fn test_nixpkgs_path_like() { - TempHive::valid(r#" + TempHive::valid( + r#" { meta = { nixpkgs = { @@ -332,13 +364,15 @@ fn test_nixpkgs_path_like() { boot.isContainer = true; }; } - "#); + "#, + ); } #[test] fn test_nixpkgs_overlay_meta_nixpkgs() { // Only set overlays in meta.nixpkgs - TempHive::eval_success(r#" + TempHive::eval_success( + r#" { meta = { nixpkgs = import { @@ -352,13 +386,16 @@ fn test_nixpkgs_overlay_meta_nixpkgs() { environment.systemPackages = with pkgs; [ my-coreutils ]; }; } - "#, vec![ node!("test") ]); + "#, + vec![node!("test")], + ); } #[test] fn test_nixpkgs_overlay_node_config() { // Only set overlays in node config - TempHive::eval_success(r#" + TempHive::eval_success( + r#" { test = { pkgs, ... }: { boot.isContainer = true; @@ -368,13 +405,16 @@ fn test_nixpkgs_overlay_node_config() { environment.systemPackages = with pkgs; [ my-coreutils ]; }; } - "#, vec![ node!("test") ]); + "#, + vec![node!("test")], + ); } #[test] fn test_nixpkgs_overlay_both() { // Set overlays both in meta.nixpkgs and in node config - TempHive::eval_success(r#" + TempHive::eval_success( + r#" { meta = { nixpkgs = import { @@ -391,13 +431,16 @@ fn test_nixpkgs_overlay_both() { environment.systemPackages = with pkgs; [ meta-coreutils node-busybox ]; }; } - "#, vec![ node!("test") ]); + "#, + vec![node!("test")], + ); } #[test] fn test_nixpkgs_config_meta_nixpkgs() { // Set config in meta.nixpkgs - TempHive::eval_success(r#" + TempHive::eval_success( + r#" { meta = { nixpkgs = import { @@ -413,13 +456,16 @@ fn test_nixpkgs_config_meta_nixpkgs() { boot.isContainer = assert pkgs.config.allowUnfree; true; }; } - "#, vec![ node!("test") ]); + "#, + vec![node!("test")], + ); } #[test] fn test_nixpkgs_config_node_config() { // Set config in node config - TempHive::eval_success(r#" + TempHive::eval_success( + r#" { test = { pkgs, ... }: { nixpkgs.config = { @@ -428,7 +474,9 @@ fn test_nixpkgs_config_node_config() { boot.isContainer = assert pkgs.config.allowUnfree; true; }; } - "#, vec![ node!("test") ]); + "#, + vec![node!("test")], + ); } #[test] @@ -457,7 +505,7 @@ fn test_nixpkgs_config_override() { .replace("META_VAL", "true") .replace("NODE_VAL", "false") .replace("EXPECTED_VAL", "false"), - vec![ node!("test") ] + vec![node!("test")], ); TempHive::eval_success( @@ -465,13 +513,14 @@ fn test_nixpkgs_config_override() { .replace("META_VAL", "false") .replace("NODE_VAL", "true") .replace("EXPECTED_VAL", "true"), - vec![ node!("test") ] + vec![node!("test")], ); } #[test] fn test_meta_special_args() { - TempHive::valid(r#" + TempHive::valid( + r#" { meta.specialArgs = { undine = "assimilated"; @@ -483,12 +532,14 @@ fn test_meta_special_args() { boot.isContainer = true; }; } - "#); + "#, + ); } #[test] fn test_meta_node_special_args() { - TempHive::valid(r#" + TempHive::valid( + r#" { meta.specialArgs = { someArg = "global"; @@ -510,12 +561,14 @@ fn test_meta_node_special_args() { boot.isContainer = true; }; } - "#); + "#, + ); } #[test] fn test_hive_autocall() { - TempHive::valid(r#" + TempHive::valid( + r#" { argument ? "with default value" }: { @@ -523,9 +576,11 @@ fn test_hive_autocall() { boot.isContainer = true; }; } - "#); + "#, + ); - TempHive::valid(r#" + TempHive::valid( + r#" { some = "value"; __functor = self: { argument ? "with default value" }: { @@ -534,9 +589,11 @@ fn test_hive_autocall() { }; }; } - "#); + "#, + ); - TempHive::invalid(r#" + TempHive::invalid( + r#" { thisWontWork }: { @@ -544,47 +601,51 @@ fn test_hive_autocall() { boot.isContainer = true; }; } - "#); + "#, + ); } #[test] fn test_hive_introspect() { - let hive = TempHive::new(r#" + let hive = TempHive::new( + r#" { test = { ... }: { boot.isContainer = true; }; } - "#); + "#, + ); let expr = r#" { pkgs, lib, nodes }: assert pkgs ? hello; assert lib ? versionAtLeast; nodes.test.config.boot.isContainer - "#.to_string(); + "# + .to_string(); - let eval = block_on(hive.introspect(expr, false)) - .unwrap(); + let eval = block_on(hive.introspect(expr, false)).unwrap(); assert_eq!("true", eval); } #[test] fn test_hive_get_meta() { - let hive = TempHive::new(r#" + let hive = TempHive::new( + r#" { meta.allowApplyAll = false; meta.specialArgs = { this_is_new = false; }; } - "#); + "#, + ); - let eval = block_on(hive.get_meta_config()) - .unwrap(); + let eval = block_on(hive.get_meta_config()).unwrap(); - eprintln!("{:?}", eval); + eprintln!("{:?}", eval); - assert!(!eval.allow_apply_all); + assert!(!eval.allow_apply_all); } diff --git a/src/nix/host/key_uploader.rs b/src/nix/host/key_uploader.rs index 6feaeb8..df07d15 100644 --- a/src/nix/host/key_uploader.rs +++ b/src/nix/host/key_uploader.rs @@ -19,24 +19,36 @@ use crate::util::capture_stream; const SCRIPT_TEMPLATE: &str = include_str!("./key_uploader.template.sh"); -pub fn generate_script<'a>(key: &'a Key, destination: &'a Path, require_ownership: bool) -> Cow<'a, str> { - let key_script = SCRIPT_TEMPLATE.to_string() +pub fn generate_script<'a>( + key: &'a Key, + destination: &'a Path, + require_ownership: bool, +) -> Cow<'a, str> { + let key_script = SCRIPT_TEMPLATE + .to_string() .replace("%DESTINATION%", destination.to_str().unwrap()) .replace("%USER%", &escape(key.user().into())) .replace("%GROUP%", &escape(key.group().into())) .replace("%PERMISSIONS%", &escape(key.permissions().into())) - .replace("%REQUIRE_OWNERSHIP%", if require_ownership { "1" } else { "" }) - .trim_end_matches('\n').to_string(); + .replace( + "%REQUIRE_OWNERSHIP%", + if require_ownership { "1" } else { "" }, + ) + .trim_end_matches('\n') + .to_string(); escape(key_script.into()) } -pub async fn feed_uploader(mut uploader: Child, key: &Key, job: Option) -> ColmenaResult<()> { - let mut reader = key.reader().await - .map_err(|error| ColmenaError::KeyError { - name: key.name().to_owned(), - error, - })?; +pub async fn feed_uploader( + mut uploader: Child, + key: &Key, + job: Option, +) -> ColmenaResult<()> { + let mut reader = key.reader().await.map_err(|error| ColmenaError::KeyError { + name: key.name().to_owned(), + error, + })?; let mut stdin = uploader.stdin.take().unwrap(); tokio::io::copy(reader.as_mut(), &mut stdin).await?; @@ -52,7 +64,8 @@ pub async fn feed_uploader(mut uploader: Child, key: &Key, job: Option ColmenaResult<()> { + async fn copy_closure( + &mut self, + _closure: &StorePath, + _direction: CopyDirection, + _options: CopyOptions, + ) -> ColmenaResult<()> { Ok(()) } @@ -54,11 +59,18 @@ impl Host for Local { execution.run().await?; let (stdout, _) = execution.get_logs(); - stdout.unwrap().lines() - .map(|p| p.to_string().try_into()).collect() + stdout + .unwrap() + .lines() + .map(|p| p.to_string().try_into()) + .collect() } - async fn upload_keys(&mut self, keys: &HashMap, require_ownership: bool) -> ColmenaResult<()> { + async fn upload_keys( + &mut self, + keys: &HashMap, + require_ownership: bool, + ) -> ColmenaResult<()> { for (name, key) in keys { self.upload_key(name, key, require_ownership).await?; } @@ -98,7 +110,10 @@ impl Host for Local { .capture_output() .await?; - let path = paths.lines().into_iter().next() + let path = paths + .lines() + .into_iter() + .next() .ok_or(ColmenaError::FailedToGetCurrentProfile)? .to_string() .try_into()?; @@ -108,11 +123,20 @@ impl Host for Local { async fn get_main_system_profile(&mut self) -> ColmenaResult { let paths = Command::new("sh") - .args(&["-c", &format!("readlink -e {} || readlink -e {}", SYSTEM_PROFILE, CURRENT_PROFILE)]) + .args(&[ + "-c", + &format!( + "readlink -e {} || readlink -e {}", + SYSTEM_PROFILE, CURRENT_PROFILE + ), + ]) .capture_output() .await?; - let path = paths.lines().into_iter().next() + let path = paths + .lines() + .into_iter() + .next() .ok_or(ColmenaError::FailedToGetCurrentProfile)? .to_string() .try_into()?; @@ -135,13 +159,21 @@ impl Local { } /// "Uploads" a single key. - async fn upload_key(&mut self, name: &str, key: &Key, require_ownership: bool) -> ColmenaResult<()> { + async fn upload_key( + &mut self, + name: &str, + key: &Key, + require_ownership: bool, + ) -> ColmenaResult<()> { if let Some(job) = &self.job { job.message(format!("Deploying key {}", name))?; } let path = key.path(); - let key_script = format!("'{}'", key_uploader::generate_script(key, path, require_ownership)); + let key_script = format!( + "'{}'", + key_uploader::generate_script(key, path, require_ownership) + ); let mut command = self.make_privileged_command(&["sh", "-c", &key_script]); command.stdin(Stdio::piped()); diff --git a/src/nix/host/mod.rs b/src/nix/host/mod.rs index 62ad50d..8ff44ad 100644 --- a/src/nix/host/mod.rs +++ b/src/nix/host/mod.rs @@ -2,9 +2,9 @@ use std::collections::HashMap; use async_trait::async_trait; +use super::{Goal, Key, Profile, StorePath}; use crate::error::{ColmenaError, ColmenaResult}; use crate::job::JobHandle; -use super::{StorePath, Profile, Goal, Key}; mod ssh; pub use ssh::Ssh; @@ -92,7 +92,12 @@ pub trait Host: Send + Sync + std::fmt::Debug { /// Sends or receives the specified closure to the host /// /// The StorePath and its dependent paths will then exist on this host. - async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, options: CopyOptions) -> ColmenaResult<()>; + async fn copy_closure( + &mut self, + closure: &StorePath, + direction: CopyDirection, + options: CopyOptions, + ) -> ColmenaResult<()>; /// Realizes the specified derivation on the host /// @@ -106,19 +111,30 @@ pub trait Host: Send + Sync + std::fmt::Debug { /// Realizes the specified local derivation on the host then retrieves the outputs. async fn realize(&mut self, derivation: &StorePath) -> ColmenaResult> { - let options = CopyOptions::default() - .include_outputs(true); + let options = CopyOptions::default().include_outputs(true); - self.copy_closure(derivation, CopyDirection::ToRemote, options).await?; + self.copy_closure(derivation, CopyDirection::ToRemote, options) + .await?; let paths = self.realize_remote(derivation).await?; - self.copy_closure(derivation, CopyDirection::FromRemote, options).await?; + self.copy_closure(derivation, CopyDirection::FromRemote, options) + .await?; Ok(paths) } /// Pushes and optionally activates a profile to the host. - async fn deploy(&mut self, profile: &Profile, goal: Goal, copy_options: CopyOptions) -> ColmenaResult<()> { - self.copy_closure(profile.as_store_path(), CopyDirection::ToRemote, copy_options).await?; + async fn deploy( + &mut self, + profile: &Profile, + goal: Goal, + copy_options: CopyOptions, + ) -> ColmenaResult<()> { + self.copy_closure( + profile.as_store_path(), + CopyDirection::ToRemote, + copy_options, + ) + .await?; if goal.requires_activation() { self.activate(profile, goal).await?; @@ -133,7 +149,11 @@ pub trait Host: Send + Sync + std::fmt::Debug { /// will not be applied if the specified user/group does not /// exist. #[allow(unused_variables)] - async fn upload_keys(&mut self, keys: &HashMap, require_ownership: bool) -> ColmenaResult<()> { + async fn upload_keys( + &mut self, + keys: &HashMap, + require_ownership: bool, + ) -> ColmenaResult<()> { Err(ColmenaError::Unsupported) } diff --git a/src/nix/host/ssh.rs b/src/nix/host/ssh.rs index c20abe8..0dffbda 100644 --- a/src/nix/host/ssh.rs +++ b/src/nix/host/ssh.rs @@ -8,11 +8,11 @@ use async_trait::async_trait; use tokio::process::Command; use tokio::time::sleep; -use crate::error::{ColmenaResult, ColmenaError}; -use crate::nix::{StorePath, Profile, Goal, Key, SYSTEM_PROFILE, CURRENT_PROFILE}; -use crate::util::{CommandExecution, CommandExt}; +use super::{key_uploader, CopyDirection, CopyOptions, Host, RebootOptions}; +use crate::error::{ColmenaError, ColmenaResult}; use crate::job::JobHandle; -use super::{CopyDirection, CopyOptions, RebootOptions, Host, key_uploader}; +use crate::nix::{Goal, Key, Profile, StorePath, CURRENT_PROFILE, SYSTEM_PROFILE}; +use crate::util::{CommandExecution, CommandExt}; /// A remote machine connected over SSH. #[derive(Debug)] @@ -41,20 +41,28 @@ struct BootId(String); #[async_trait] impl Host for Ssh { - async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, options: CopyOptions) -> ColmenaResult<()> { + async fn copy_closure( + &mut self, + closure: &StorePath, + direction: CopyDirection, + options: CopyOptions, + ) -> ColmenaResult<()> { let command = self.nix_copy_closure(closure, direction, options); self.run_command(command).await } async fn realize_remote(&mut self, derivation: &StorePath) -> ColmenaResult> { - let command = self.ssh(&["nix-store", "--no-gc-warning", "--realise", derivation.as_path().to_str().unwrap()]); + let command = self.ssh(&[ + "nix-store", + "--no-gc-warning", + "--realise", + derivation.as_path().to_str().unwrap(), + ]); let mut execution = CommandExecution::new(command); execution.set_job(self.job.clone()); - let paths = execution - .capture_output() - .await?; + let paths = execution.capture_output().await?; paths.lines().map(|p| p.to_string().try_into()).collect() } @@ -63,7 +71,11 @@ impl Host for Ssh { self.job = job; } - async fn upload_keys(&mut self, keys: &HashMap, require_ownership: bool) -> ColmenaResult<()> { + async fn upload_keys( + &mut self, + keys: &HashMap, + require_ownership: bool, + ) -> ColmenaResult<()> { for (name, key) in keys { self.upload_key(name, key, require_ownership).await?; } @@ -89,11 +101,15 @@ impl Host for Ssh { } async fn get_current_system_profile(&mut self) -> ColmenaResult { - let paths = self.ssh(&["readlink", "-e", CURRENT_PROFILE]) + let paths = self + .ssh(&["readlink", "-e", CURRENT_PROFILE]) .capture_output() .await?; - let path = paths.lines().into_iter().next() + let path = paths + .lines() + .into_iter() + .next() .ok_or(ColmenaError::FailedToGetCurrentProfile)? .to_string() .try_into()?; @@ -102,13 +118,17 @@ impl Host for Ssh { } async fn get_main_system_profile(&mut self) -> ColmenaResult { - let command = format!("\"readlink -e {} || readlink -e {}\"", SYSTEM_PROFILE, CURRENT_PROFILE); + let command = format!( + "\"readlink -e {} || readlink -e {}\"", + SYSTEM_PROFILE, CURRENT_PROFILE + ); - let paths = self.ssh(&["sh", "-c", &command]) - .capture_output() - .await?; + let paths = self.ssh(&["sh", "-c", &command]).capture_output().await?; - let path = paths.lines().into_iter().next() + let path = paths + .lines() + .into_iter() + .next() .ok_or(ColmenaError::FailedToGetCurrentProfile)? .to_string() .try_into()?; @@ -151,9 +171,7 @@ impl Host for Ssh { let profile = self.get_current_system_profile().await?; if new_profile != profile { - return Err(ColmenaError::ActiveProfileUnexpected { - profile, - }); + return Err(ColmenaError::ActiveProfileUnexpected { profile }); } } @@ -201,8 +219,7 @@ impl Ssh { let mut cmd = Command::new("ssh"); - cmd - .arg(self.ssh_target()) + cmd.arg(self.ssh_target()) .args(&options) .arg("--") .args(privilege_escalation_command) @@ -226,7 +243,12 @@ impl Ssh { } } - fn nix_copy_closure(&self, path: &StorePath, direction: CopyDirection, options: CopyOptions) -> Command { + fn nix_copy_closure( + &self, + path: &StorePath, + direction: CopyDirection, + options: CopyOptions, + ) -> Command { let ssh_options = self.ssh_options(); let ssh_options_str = ssh_options.join(" "); @@ -262,8 +284,16 @@ impl Ssh { fn ssh_options(&self) -> Vec { // TODO: Allow configuation of SSH parameters - let mut options: Vec = ["-o", "StrictHostKeyChecking=accept-new", "-o", "BatchMode=yes", "-T"] - .iter().map(|s| s.to_string()).collect(); + let mut options: Vec = [ + "-o", + "StrictHostKeyChecking=accept-new", + "-o", + "BatchMode=yes", + "-T", + ] + .iter() + .map(|s| s.to_string()) + .collect(); if let Some(port) = self.port { options.push("-p".to_string()); @@ -279,7 +309,12 @@ impl Ssh { } /// Uploads a single key. - async fn upload_key(&mut self, name: &str, key: &Key, require_ownership: bool) -> ColmenaResult<()> { + async fn upload_key( + &mut self, + name: &str, + key: &Key, + require_ownership: bool, + ) -> ColmenaResult<()> { if let Some(job) = &self.job { job.message(format!("Uploading key {}", name))?; } @@ -299,7 +334,8 @@ impl Ssh { /// Returns the current Boot ID. async fn get_boot_id(&mut self) -> ColmenaResult { - let boot_id = self.ssh(&["cat", "/proc/sys/kernel/random/boot_id"]) + let boot_id = self + .ssh(&["cat", "/proc/sys/kernel/random/boot_id"]) .capture_output() .await?; diff --git a/src/nix/info.rs b/src/nix/info.rs index 24e28e8..906e658 100644 --- a/src/nix/info.rs +++ b/src/nix/info.rs @@ -20,7 +20,11 @@ impl NixVersion { let major = caps.name("major").unwrap().as_str().parse().unwrap(); let minor = caps.name("minor").unwrap().as_str().parse().unwrap(); - Self { major, minor, string } + Self { + major, + minor, + string, + } } else { Self { major: 0, @@ -61,20 +65,23 @@ impl NixCheck { pub async fn detect() -> Self { let version_cmd = Command::new("nix-instantiate") .arg("--version") - .output().await; + .output() + .await; if version_cmd.is_err() { return Self::NO_NIX; } - let version = NixVersion::parse(String::from_utf8_lossy(&version_cmd.unwrap().stdout).to_string()); + let version = + NixVersion::parse(String::from_utf8_lossy(&version_cmd.unwrap().stdout).to_string()); let flakes_supported = version.has_flakes(); let flake_cmd = Command::new("nix-instantiate") .args(&["--eval", "-E", "builtins.getFlake"]) .stdout(Stdio::null()) .stderr(Stdio::null()) - .status().await; + .status() + .await; if flake_cmd.is_err() { return Self::NO_NIX; @@ -121,16 +128,18 @@ impl NixCheck { log::warn!("Colmena will automatically enable Flakes for its operations, but you should enable it in your Nix configuration:"); log::warn!(" experimental-features = nix-command flakes"); } else { - let level = if required { - Level::Error - } else { - Level::Warn - }; - log::log!(level, "The Nix version you are using does not support Flakes."); + let level = if required { Level::Error } else { Level::Warn }; + log::log!( + level, + "The Nix version you are using does not support Flakes." + ); log::log!(level, "If you are using a Nixpkgs version before 21.11, please install nixUnstable for a version that includes Flakes support."); if required { - log::log!(level, "Cannot continue since Flakes support is required for this operation."); + log::log!( + level, + "Cannot continue since Flakes support is required for this operation." + ); } } } diff --git a/src/nix/key.rs b/src/nix/key.rs index 7bede72..b0a6fd0 100644 --- a/src/nix/key.rs +++ b/src/nix/key.rs @@ -8,11 +8,7 @@ use std::{ use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::Snafu; -use tokio::{ - fs::File, - io::AsyncRead, - process::Command, -}; +use tokio::{fs::File, io::AsyncRead, process::Command}; use validator::{Validate, ValidationError}; #[non_exhaustive] @@ -48,18 +44,13 @@ impl TryFrom for KeySource { fn try_from(ks: KeySources) -> Result { match (ks.text, ks.command, ks.file) { - (Some(text), None, None) => { - Ok(KeySource::Text(text)) - } - (None, Some(command), None) => { - Ok(KeySource::Command(command)) - } - (None, None, Some(file)) => { - Ok(KeySource::File(file)) - } - x => { - Err(format!("Somehow 0 or more than 1 key source was specified: {:?}", x)) - } + (Some(text), None, None) => Ok(KeySource::Text(text)), + (None, Some(command), None) => Ok(KeySource::Command(command)), + (None, None, Some(file)) => Ok(KeySource::File(file)), + x => Err(format!( + "Somehow 0 or more than 1 key source was specified: {:?}", + x + )), } } } @@ -115,9 +106,7 @@ pub struct Key { impl Key { pub async fn reader(&'_ self) -> Result, KeyError> { match &self.source { - KeySource::Text(content) => { - Ok(Box::new(Cursor::new(content))) - } + KeySource::Text(content) => Ok(Box::new(Cursor::new(content))), KeySource::Command(command) => { let pathname = &command[0]; let argv = &command[1..]; @@ -128,7 +117,8 @@ impl Key { .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()? - .wait_with_output().await?; + .wait_with_output() + .await?; if output.status.success() { Ok(Box::new(Cursor::new(output.stdout))) @@ -142,18 +132,28 @@ impl Key { }) } } - KeySource::File(path) => { - Ok(Box::new(File::open(path).await?)) - } + KeySource::File(path) => Ok(Box::new(File::open(path).await?)), } } - pub fn name(&self) -> &str { &self.name } - pub fn path(&self) -> &Path { &self.path } - pub fn user(&self) -> &str { &self.user } - pub fn group(&self) -> &str { &self.group } - pub fn permissions(&self) -> &str { &self.permissions } - pub fn upload_at(&self) -> UploadAt { self.upload_at } + pub fn name(&self) -> &str { + &self.name + } + pub fn path(&self) -> &Path { + &self.path + } + pub fn user(&self) -> &str { + &self.user + } + pub fn group(&self) -> &str { + &self.group + } + pub fn permissions(&self) -> &str { + &self.permissions + } + pub fn upload_at(&self) -> UploadAt { + self.upload_at + } } fn validate_unix_name(name: &str) -> Result<(), ValidationError> { @@ -169,6 +169,8 @@ fn validate_dest_dir(dir: &Path) -> Result<(), ValidationError> { if dir.has_root() { Ok(()) } else { - Err(ValidationError::new("Secret key destination directory must be absolute")) + Err(ValidationError::new( + "Secret key destination directory must be absolute", + )) } } diff --git a/src/nix/mod.rs b/src/nix/mod.rs index 0a43548..ca017e5 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -7,17 +7,17 @@ use serde::de; use serde::{Deserialize, Deserializer, Serialize}; use validator::{Validate, ValidationError as ValidationErrorType}; -use crate::error::{ColmenaResult, ColmenaError}; +use crate::error::{ColmenaError, ColmenaResult}; pub mod host; -pub use host::{Host, CopyDirection, CopyOptions, RebootOptions}; use host::Ssh; +pub use host::{CopyDirection, CopyOptions, Host, RebootOptions}; pub mod hive; pub use hive::{Hive, HivePath}; pub mod store; -pub use store::{StorePath, StoreDerivation, BuildResult}; +pub use store::{BuildResult, StoreDerivation, StorePath}; pub mod key; pub use key::Key; @@ -48,10 +48,7 @@ pub const CURRENT_PROFILE: &str = "/run/current-system"; /// A node's attribute name. #[derive(Serialize, Deserialize, Clone, Debug, Hash, Eq, PartialEq)] #[serde(transparent)] -pub struct NodeName ( - #[serde(deserialize_with = "NodeName::deserialize")] - String -); +pub struct NodeName(#[serde(deserialize_with = "NodeName::deserialize")] String); #[derive(Debug, Clone, Validate, Deserialize)] pub struct NodeConfig { @@ -108,7 +105,7 @@ pub struct NixOptions { } /// A Nix expression. -pub trait NixExpression : Send + Sync { +pub trait NixExpression: Send + Sync { /// Returns the full Nix expression to be evaluated. fn expression(&self) -> String; @@ -132,13 +129,12 @@ impl NodeName { /// Deserializes a potentially-invalid node name. fn deserialize<'de, D>(deserializer: D) -> Result - where D: Deserializer<'de> + where + D: Deserializer<'de>, { use de::Error; String::deserialize(deserializer) - .and_then(|s| { - Self::validate(s).map_err(|e| Error::custom(e.to_string())) - }) + .and_then(|s| Self::validate(s).map_err(|e| Error::custom(e.to_string()))) } fn validate(s: String) -> ColmenaResult { @@ -160,14 +156,22 @@ impl Deref for NodeName { } impl NodeConfig { - pub fn tags(&self) -> &[String] { &self.tags } + pub fn tags(&self) -> &[String] { + &self.tags + } #[cfg_attr(not(target_os = "linux"), allow(dead_code))] - pub fn allows_local_deployment(&self) -> bool { self.allow_local_deployment } + pub fn allows_local_deployment(&self) -> bool { + self.allow_local_deployment + } - pub fn privilege_escalation_command(&self) -> &Vec { &self.privilege_escalation_command } + pub fn privilege_escalation_command(&self) -> &Vec { + &self.privilege_escalation_command + } - pub fn build_on_target(&self) -> bool { self.build_on_target } + pub fn build_on_target(&self) -> bool { + self.build_on_target + } pub fn set_build_on_target(&mut self, enable: bool) { self.build_on_target = enable; } @@ -228,11 +232,15 @@ fn validate_keys(keys: &HashMap) -> Result<(), ValidationErrorType> for name in keys.keys() { let path = Path::new(name); if path.has_root() { - return Err(ValidationErrorType::new("Secret key name cannot be absolute")); + return Err(ValidationErrorType::new( + "Secret key name cannot be absolute", + )); } if path.components().count() != 1 { - return Err(ValidationErrorType::new("Secret key name cannot contain path separators")); + return Err(ValidationErrorType::new( + "Secret key name cannot contain path separators", + )); } } Ok(()) diff --git a/src/nix/node_filter.rs b/src/nix/node_filter.rs index 71c3a18..6f645ca 100644 --- a/src/nix/node_filter.rs +++ b/src/nix/node_filter.rs @@ -2,11 +2,11 @@ use std::collections::HashSet; use std::convert::AsRef; -use std::iter::{Iterator, FromIterator}; +use std::iter::{FromIterator, Iterator}; use glob::Pattern as GlobPattern; -use super::{ColmenaError, ColmenaResult, NodeName, NodeConfig}; +use super::{ColmenaError, ColmenaResult, NodeConfig, NodeName}; /// A node filter containing a list of rules. pub struct NodeFilter { @@ -34,30 +34,29 @@ impl NodeFilter { if trimmed.is_empty() { log::warn!("Filter \"{}\" is blank and will match nothing", filter); - return Ok(Self { - rules: Vec::new(), - }); + return Ok(Self { rules: Vec::new() }); } - let rules = trimmed.split(',').map(|pattern| { - let pattern = pattern.trim(); + let rules = trimmed + .split(',') + .map(|pattern| { + let pattern = pattern.trim(); - if pattern.is_empty() { - return Err(ColmenaError::EmptyFilterRule); - } + if pattern.is_empty() { + return Err(ColmenaError::EmptyFilterRule); + } - if let Some(tag_pattern) = pattern.strip_prefix('@') { - Ok(Rule::MatchTag(GlobPattern::new(tag_pattern).unwrap())) - } else { - Ok(Rule::MatchName(GlobPattern::new(pattern).unwrap())) - } - }).collect::>>(); + if let Some(tag_pattern) = pattern.strip_prefix('@') { + Ok(Rule::MatchTag(GlobPattern::new(tag_pattern).unwrap())) + } else { + Ok(Rule::MatchName(GlobPattern::new(pattern).unwrap())) + } + }) + .collect::>>(); let rules = Result::from_iter(rules)?; - Ok(Self { - rules, - }) + Ok(Self { rules }) } /// Returns whether the filter has any rule matching NodeConfig information. @@ -71,32 +70,36 @@ impl NodeFilter { /// Runs the filter against a set of NodeConfigs and returns the matched ones. pub fn filter_node_configs<'a, I>(&self, nodes: I) -> HashSet - where I: Iterator + where + I: Iterator, { if self.rules.is_empty() { return HashSet::new(); } - nodes.filter_map(|(name, node)| { - for rule in self.rules.iter() { - match rule { - Rule::MatchName(pat) => { - if pat.matches(name.as_str()) { - return Some(name); - } - } - Rule::MatchTag(pat) => { - for tag in node.tags() { - if pat.matches(tag) { + nodes + .filter_map(|(name, node)| { + for rule in self.rules.iter() { + match rule { + Rule::MatchName(pat) => { + if pat.matches(name.as_str()) { return Some(name); } } + Rule::MatchTag(pat) => { + for tag in node.tags() { + if pat.matches(tag) { + return Some(name); + } + } + } } } - } - None - }).cloned().collect() + None + }) + .cloned() + .collect() } /// Runs the filter against a set of node names and returns the matched ones. @@ -140,7 +143,7 @@ mod tests { macro_rules! node { ($n:expr) => { NodeName::new($n.to_string()).unwrap() - } + }; } #[test] @@ -186,16 +189,22 @@ mod tests { #[test] fn test_filter_node_names() { - let nodes = vec![ node!("lax-alpha"), node!("lax-beta"), node!("sfo-gamma") ]; + let nodes = vec![node!("lax-alpha"), node!("lax-beta"), node!("sfo-gamma")]; assert_eq!( - &HashSet::from_iter([ node!("lax-alpha") ]), - &NodeFilter::new("lax-alpha").unwrap().filter_node_names(&nodes).unwrap(), + &HashSet::from_iter([node!("lax-alpha")]), + &NodeFilter::new("lax-alpha") + .unwrap() + .filter_node_names(&nodes) + .unwrap(), ); assert_eq!( - &HashSet::from_iter([ node!("lax-alpha"), node!("lax-beta") ]), - &NodeFilter::new("lax-*").unwrap().filter_node_names(&nodes).unwrap(), + &HashSet::from_iter([node!("lax-alpha"), node!("lax-beta")]), + &NodeFilter::new("lax-*") + .unwrap() + .filter_node_names(&nodes) + .unwrap(), ); } @@ -216,46 +225,66 @@ mod tests { let mut nodes = HashMap::new(); - nodes.insert(node!("alpha"), NodeConfig { - tags: vec![ "web".to_string(), "infra-lax".to_string() ], - ..template.clone() - }); + nodes.insert( + node!("alpha"), + NodeConfig { + tags: vec!["web".to_string(), "infra-lax".to_string()], + ..template.clone() + }, + ); - nodes.insert(node!("beta"), NodeConfig { - tags: vec![ "router".to_string(), "infra-sfo".to_string() ], - ..template.clone() - }); + nodes.insert( + node!("beta"), + NodeConfig { + tags: vec!["router".to_string(), "infra-sfo".to_string()], + ..template.clone() + }, + ); - nodes.insert(node!("gamma-a"), NodeConfig { - tags: vec![ "controller".to_string() ], - ..template.clone() - }); + nodes.insert( + node!("gamma-a"), + NodeConfig { + tags: vec!["controller".to_string()], + ..template.clone() + }, + ); - nodes.insert(node!("gamma-b"), NodeConfig { - tags: vec![ "ewaste".to_string() ], - ..template - }); + nodes.insert( + node!("gamma-b"), + NodeConfig { + tags: vec!["ewaste".to_string()], + ..template + }, + ); assert_eq!(4, nodes.len()); assert_eq!( - &HashSet::from_iter([ node!("alpha") ]), - &NodeFilter::new("@web").unwrap().filter_node_configs(nodes.iter()), + &HashSet::from_iter([node!("alpha")]), + &NodeFilter::new("@web") + .unwrap() + .filter_node_configs(nodes.iter()), ); assert_eq!( - &HashSet::from_iter([ node!("alpha"), node!("beta") ]), - &NodeFilter::new("@infra-*").unwrap().filter_node_configs(nodes.iter()), + &HashSet::from_iter([node!("alpha"), node!("beta")]), + &NodeFilter::new("@infra-*") + .unwrap() + .filter_node_configs(nodes.iter()), ); assert_eq!( - &HashSet::from_iter([ node!("beta"), node!("gamma-a") ]), - &NodeFilter::new("@router,@controller").unwrap().filter_node_configs(nodes.iter()), + &HashSet::from_iter([node!("beta"), node!("gamma-a")]), + &NodeFilter::new("@router,@controller") + .unwrap() + .filter_node_configs(nodes.iter()), ); assert_eq!( - &HashSet::from_iter([ node!("beta"), node!("gamma-a"), node!("gamma-b") ]), - &NodeFilter::new("@router,gamma-*").unwrap().filter_node_configs(nodes.iter()), + &HashSet::from_iter([node!("beta"), node!("gamma-a"), node!("gamma-b")]), + &NodeFilter::new("@router,gamma-*") + .unwrap() + .filter_node_configs(nodes.iter()), ); } } diff --git a/src/nix/profile.rs b/src/nix/profile.rs index 70dd1d6..75e1361 100644 --- a/src/nix/profile.rs +++ b/src/nix/profile.rs @@ -4,14 +4,7 @@ use std::process::Stdio; use tokio::process::Command; -use super::{ - Goal, - ColmenaResult, - ColmenaError, - StorePath, - StoreDerivation, - BuildResult, -}; +use super::{BuildResult, ColmenaError, ColmenaResult, Goal, StoreDerivation, StorePath}; pub type ProfileDerivation = StoreDerivation; @@ -21,10 +14,7 @@ pub struct Profile(StorePath); impl Profile { pub fn from_store_path(path: StorePath) -> ColmenaResult { - if - !path.is_dir() || - !path.join("bin/switch-to-configuration").exists() - { + if !path.is_dir() || !path.join("bin/switch-to-configuration").exists() { return Err(ColmenaError::InvalidProfile); } @@ -39,14 +29,12 @@ impl Profile { pub fn activation_command(&self, goal: Goal) -> Option> { if let Some(goal) = goal.as_str() { let path = self.as_path().join("bin/switch-to-configuration"); - let switch_to_configuration = path.to_str() + let switch_to_configuration = path + .to_str() .expect("The string should be UTF-8 valid") .to_string(); - Some(vec![ - switch_to_configuration, - goal.to_string(), - ]) + Some(vec![switch_to_configuration, goal.to_string()]) } else { None } @@ -65,7 +53,12 @@ impl Profile { /// Create a GC root for this profile. pub async fn create_gc_root(&self, path: &Path) -> ColmenaResult<()> { let mut command = Command::new("nix-store"); - command.args(&["--no-build-output", "--indirect", "--add-root", path.to_str().unwrap()]); + command.args(&[ + "--no-build-output", + "--indirect", + "--add-root", + path.to_str().unwrap(), + ]); command.args(&["--realise", self.as_path().to_str().unwrap()]); command.stdout(Stdio::null()); @@ -100,8 +93,7 @@ impl TryFrom> for Profile { }); } - let path = paths.iter().next() - .unwrap().to_owned(); + let path = paths.iter().next().unwrap().to_owned(); Ok(Self::from_store_path_unchecked(path)) } diff --git a/src/nix/store.rs b/src/nix/store.rs index d49f7d4..75c69b1 100644 --- a/src/nix/store.rs +++ b/src/nix/store.rs @@ -1,15 +1,15 @@ use std::convert::{TryFrom, TryInto}; -use std::marker::PhantomData; -use std::path::{Path, PathBuf}; -use std::ops::Deref; use std::fmt; +use std::marker::PhantomData; +use std::ops::Deref; +use std::path::{Path, PathBuf}; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use tokio::process::Command; +use super::Host; use crate::error::{ColmenaError, ColmenaResult}; use crate::util::CommandExt; -use super::Host; /// A Nix store path. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -17,7 +17,7 @@ pub struct StorePath(PathBuf); /// A store derivation (.drv) that will result in a T when built. #[derive(Debug)] -pub struct StoreDerivation>>{ +pub struct StoreDerivation>> { path: StorePath, _target: PhantomData, } @@ -48,9 +48,12 @@ impl StorePath { let references = Command::new("nix-store") .args(&["--query", "--references"]) .arg(&self.0) - .capture_output().await? - .trim_end().split('\n') - .map(|p| StorePath(PathBuf::from(p))).collect(); + .capture_output() + .await? + .trim_end() + .split('\n') + .map(|p| StorePath(PathBuf::from(p))) + .collect(); Ok(references) } @@ -114,7 +117,7 @@ impl>> StoreDerivation { } } -impl, Error=ColmenaError>> StoreDerivation { +impl, Error = ColmenaError>> StoreDerivation { /// Builds the store derivation on a host, resulting in a T. pub async fn realize(&self, host: &mut Box) -> ColmenaResult { let paths: Vec = host.realize(&self.path).await?; @@ -144,7 +147,7 @@ impl>> fmt::Display for StoreDerivation { } } -impl, Error=ColmenaError>> BuildResult { +impl, Error = ColmenaError>> BuildResult { pub fn paths(&self) -> &[StorePath] { self.results.as_slice() } diff --git a/src/progress/mod.rs b/src/progress/mod.rs index bbf9768..e034d35 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -8,10 +8,7 @@ pub mod plain; pub mod spinner; use async_trait::async_trait; -use tokio::sync::mpsc::{self, - UnboundedReceiver as TokioReceiver, - UnboundedSender as TokioSender, -}; +use tokio::sync::mpsc::{self, UnboundedReceiver as TokioReceiver, UnboundedSender as TokioSender}; use crate::error::ColmenaResult; use crate::job::JobId; @@ -31,7 +28,7 @@ pub enum SimpleProgressOutput { /// A progress display driver. #[async_trait] -pub trait ProgressOutput : Sized { +pub trait ProgressOutput: Sized { /// Runs until a Message::Complete is received. async fn run_until_completion(self) -> ColmenaResult; @@ -111,14 +108,8 @@ impl SimpleProgressOutput { pub async fn run_until_completion(self) -> ColmenaResult { match self { - Self::Plain(o) => { - o.run_until_completion().await - .map(Self::Plain) - } - Self::Spinner(o) => { - o.run_until_completion().await - .map(Self::Spinner) - } + Self::Plain(o) => o.run_until_completion().await.map(Self::Plain), + Self::Spinner(o) => o.run_until_completion().await.map(Self::Spinner), } } } diff --git a/src/progress/plain.rs b/src/progress/plain.rs index 1a9b321..4bd5a96 100644 --- a/src/progress/plain.rs +++ b/src/progress/plain.rs @@ -3,17 +3,10 @@ use async_trait::async_trait; use console::Style as ConsoleStyle; -use crate::error::ColmenaResult; use super::{ - DEFAULT_LABEL_WIDTH, - ProgressOutput, - Sender, - Receiver, - Message, - Line, - LineStyle, - create_channel, + create_channel, Line, LineStyle, Message, ProgressOutput, Receiver, Sender, DEFAULT_LABEL_WIDTH, }; +use crate::error::ColmenaResult; pub struct PlainOutput { sender: Option, @@ -42,36 +35,21 @@ impl PlainOutput { } let label_style = match line.style { - LineStyle::Normal => { - ConsoleStyle::new().bold() - } - LineStyle::Success => { - ConsoleStyle::new().bold().green() - } - LineStyle::SuccessNoop => { - ConsoleStyle::new().bold().green().dim() - } - LineStyle::Failure => { - ConsoleStyle::new().bold().red() - } + LineStyle::Normal => ConsoleStyle::new().bold(), + LineStyle::Success => ConsoleStyle::new().bold().green(), + LineStyle::SuccessNoop => ConsoleStyle::new().bold().green().dim(), + LineStyle::Failure => ConsoleStyle::new().bold().red(), }; let text_style = match line.style { - LineStyle::Normal => { - ConsoleStyle::new() - } - LineStyle::Success => { - ConsoleStyle::new().green() - } - LineStyle::SuccessNoop => { - ConsoleStyle::new().dim() - } - LineStyle::Failure => { - ConsoleStyle::new().red() - } + LineStyle::Normal => ConsoleStyle::new(), + LineStyle::Success => ConsoleStyle::new().green(), + LineStyle::SuccessNoop => ConsoleStyle::new().dim(), + LineStyle::Failure => ConsoleStyle::new().red(), }; - eprintln!("{:>width$} | {}", + eprintln!( + "{:>width$} | {}", label_style.apply_to(line.label), text_style.apply_to(line.text), width = self.label_width, diff --git a/src/progress/spinner.rs b/src/progress/spinner.rs index 3a43539..530851a 100644 --- a/src/progress/spinner.rs +++ b/src/progress/spinner.rs @@ -4,20 +4,13 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; use async_trait::async_trait; -use indicatif::{MultiProgress, ProgressStyle, ProgressBar}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use super::{ + create_channel, Line, LineStyle, Message, ProgressOutput, Receiver, Sender, DEFAULT_LABEL_WIDTH, +}; use crate::error::ColmenaResult; use crate::job::JobId; -use super::{ - DEFAULT_LABEL_WIDTH, - ProgressOutput, - Sender, - Receiver, - Message, - Line, - LineStyle, - create_channel, -}; /// Progress spinner output. pub struct SpinnerOutput { @@ -91,8 +84,7 @@ impl SpinnerOutput { /// Creates a new bar. fn create_bar(&self, style: LineStyle) -> ProgressBar { - let bar = ProgressBar::new(100) - .with_style(self.get_spinner_style(style)); + let bar = ProgressBar::new(100).with_style(self.get_spinner_style(style)); let bar = self.multi.add(bar); bar.enable_steady_tick(Duration::from_millis(100)); @@ -222,25 +214,27 @@ impl JobState { } fn configure_one_off(&self, bar: &ProgressBar) { - bar.clone().with_elapsed(Instant::now().duration_since(self.since)); + bar.clone() + .with_elapsed(Instant::now().duration_since(self.since)); } } fn get_spinner_style(label_width: usize, style: LineStyle) -> ProgressStyle { - let template = format!("{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", label_width); + let template = format!( + "{{prefix:>{}.bold.dim}} {{spinner}} {{elapsed}} {{wide_msg}}", + label_width + ); match style { LineStyle::Normal | LineStyle::Success | LineStyle::SuccessNoop => { ProgressStyle::default_spinner() - .tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅") - .template(&template) - .unwrap() + .tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅") + .template(&template) + .unwrap() } - LineStyle::Failure => { - ProgressStyle::default_spinner() + LineStyle::Failure => ProgressStyle::default_spinner() .tick_chars("❌❌") .template(&template) - .unwrap() - } + .unwrap(), } } diff --git a/src/troubleshooter.rs b/src/troubleshooter.rs index 80d7b5b..9eb189c 100644 --- a/src/troubleshooter.rs +++ b/src/troubleshooter.rs @@ -10,9 +10,14 @@ use clap::ArgMatches; use crate::error::ColmenaError; /// Runs a closure and tries to troubleshoot if it returns an error. -pub async fn run_wrapped<'a, F, U, T>(global_args: &'a ArgMatches, local_args: &'a ArgMatches, f: U) -> T - where U: FnOnce(&'a ArgMatches, &'a ArgMatches) -> F, - F: Future>, +pub async fn run_wrapped<'a, F, U, T>( + global_args: &'a ArgMatches, + local_args: &'a ArgMatches, + f: U, +) -> T +where + U: FnOnce(&'a ArgMatches, &'a ArgMatches) -> F, + F: Future>, { match f(global_args, local_args).await { Ok(r) => r, @@ -21,16 +26,23 @@ pub async fn run_wrapped<'a, F, U, T>(global_args: &'a ArgMatches, local_args: & log::error!("Operation failed with error: {}", error); if let Err(own_error) = troubleshoot(global_args, local_args, &error) { - log::error!("Error occurred while trying to troubleshoot another error: {}", own_error); + log::error!( + "Error occurred while trying to troubleshoot another error: {}", + own_error + ); } // Ensure we exit with a code quit::with_code(1); - }, + } } } -fn troubleshoot(global_args: &ArgMatches, _local_args: &ArgMatches, error: &ColmenaError) -> Result<(), ColmenaError> { +fn troubleshoot( + global_args: &ArgMatches, + _local_args: &ArgMatches, + error: &ColmenaError, +) -> Result<(), ColmenaError> { if let ColmenaError::NoFlakesSupport = error { // People following the tutorial might put hive.nix directly // in their Colmena checkout, and encounter NoFlakesSupport @@ -39,7 +51,9 @@ fn troubleshoot(global_args: &ArgMatches, _local_args: &ArgMatches, error: &Colm if global_args.occurrences_of("config") == 0 { let cwd = env::current_dir()?; if cwd.join("flake.nix").is_file() && cwd.join("hive.nix").is_file() { - eprintln!("Hint: You have both flake.nix and hive.nix in the current directory, and"); + eprintln!( + "Hint: You have both flake.nix and hive.nix in the current directory, and" + ); eprintln!(" Colmena will always prefer flake.nix if it exists."); eprintln!(); eprintln!(" Try passing `-f hive.nix` explicitly if this is what you want."); diff --git a/src/util.rs b/src/util.rs index a9289a8..cb45fbf 100644 --- a/src/util.rs +++ b/src/util.rs @@ -3,16 +3,16 @@ use std::path::PathBuf; use std::process::Stdio; use async_trait::async_trait; -use clap::{Command as ClapCommand, Arg, ArgMatches}; +use clap::{Arg, ArgMatches, Command as ClapCommand}; use futures::future::join3; use serde::de::DeserializeOwned; -use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; use tokio::process::Command; -use super::error::{ColmenaResult, ColmenaError}; -use super::nix::{Flake, Hive, HivePath, StorePath}; -use super::nix::deployment::TargetNodeMap; +use super::error::{ColmenaError, ColmenaResult}; use super::job::JobHandle; +use super::nix::deployment::TargetNodeMap; +use super::nix::{Flake, Hive, HivePath, StorePath}; const NEWLINE: u8 = 0xa; @@ -35,7 +35,9 @@ pub trait CommandExt { async fn capture_output(&mut self) -> ColmenaResult; /// Runs the command, capturing deserialized output from JSON. - async fn capture_json(&mut self) -> ColmenaResult where T: DeserializeOwned; + async fn capture_json(&mut self) -> ColmenaResult + where + T: DeserializeOwned; /// Runs the command, capturing a single store path. async fn capture_store_path(&mut self) -> ColmenaResult; @@ -81,7 +83,11 @@ impl CommandExecution { let stdout = BufReader::new(child.stdout.take().unwrap()); let stderr = BufReader::new(child.stderr.take().unwrap()); - let stdout_job = if self.hide_stdout { None } else { self.job.clone() }; + let stdout_job = if self.hide_stdout { + None + } else { + self.job.clone() + }; let futures = join3( capture_stream(stdout, stdout_job, false), @@ -107,10 +113,7 @@ impl CommandExecution { impl CommandExt for Command { /// Runs the command with stdout and stderr passed through to the user. async fn passthrough(&mut self) -> ColmenaResult<()> { - let exit = self - .spawn()? - .wait() - .await?; + let exit = self.spawn()?.wait().await?; if exit.success() { Ok(()) @@ -138,10 +141,13 @@ impl CommandExt for Command { } /// Captures deserialized output from JSON. - async fn capture_json(&mut self) -> ColmenaResult where T: DeserializeOwned { + async fn capture_json(&mut self) -> ColmenaResult + where + T: DeserializeOwned, + { let output = self.capture_output().await?; serde_json::from_str(&output).map_err(|_| ColmenaError::BadOutput { - output: output.clone() + output: output.clone(), }) } @@ -168,10 +174,13 @@ impl CommandExt for CommandExecution { } /// Captures deserialized output from JSON. - async fn capture_json(&mut self) -> ColmenaResult where T: DeserializeOwned { + async fn capture_json(&mut self) -> ColmenaResult + where + T: DeserializeOwned, + { let output = self.capture_output().await?; serde_json::from_str(&output).map_err(|_| ColmenaError::BadOutput { - output: output.clone() + output: output.clone(), }) } @@ -214,13 +223,19 @@ pub async fn hive_from_args(args: &ArgMatches) -> ColmenaResult { } if file_path.is_none() { - log::error!("Could not find `hive.nix` or `flake.nix` in {:?} or any parent directory", std::env::current_dir()?); + log::error!( + "Could not find `hive.nix` or `flake.nix` in {:?} or any parent directory", + std::env::current_dir()? + ); } file_path.unwrap() } _ => { - let path = args.value_of("config").expect("The config arg should exist").to_owned(); + let path = args + .value_of("config") + .expect("The config arg should exist") + .to_owned(); let fpath = PathBuf::from(&path); if !fpath.exists() && path.contains(':') { @@ -278,8 +293,13 @@ The list is comma-separated and globs are supported. To match tags, prepend the .takes_value(true)) } -pub async fn capture_stream(mut stream: BufReader, job: Option, stderr: bool) -> ColmenaResult - where R: AsyncRead + Unpin +pub async fn capture_stream( + mut stream: BufReader, + job: Option, + stderr: bool, +) -> ColmenaResult +where + R: AsyncRead + Unpin, { let mut log = String::new(); @@ -325,9 +345,7 @@ mod tests { let expected = "Hello\nWorld\n"; let stream = BufReader::new(expected.as_bytes()); - let captured = block_on(async { - capture_stream(stream, None, false).await.unwrap() - }); + let captured = block_on(async { capture_stream(stream, None, false).await.unwrap() }); assert_eq!(expected, captured); } @@ -335,9 +353,7 @@ mod tests { #[test] fn test_capture_stream_with_invalid_utf8() { let stream = BufReader::new([0x80, 0xa].as_slice()); - let captured = block_on(async { - capture_stream(stream, None, false).await.unwrap() - }); + let captured = block_on(async { capture_stream(stream, None, false).await.unwrap() }); assert_eq!("\u{fffd}\n", captured); }