From 5c84134af30bd7a330feadcaa348cd76c41f7e0b Mon Sep 17 00:00:00 2001
From: Zhaofeng Li <hello@zhaofeng.li>
Date: Sat, 20 Nov 2021 23:34:52 -0800
Subject: [PATCH] Refactor node names

---
 src/command/apply_local.rs | 20 ++++++++-----
 src/command/exec.rs        |  2 +-
 src/nix/deployment.rs      | 42 +++++++++++++-------------
 src/nix/hive.rs            | 11 +++----
 src/nix/mod.rs             | 61 ++++++++++++++++++++++++++++++++++++--
 src/nix/profile.rs         |  9 +++---
 src/nix/tests/mod.rs       | 48 +++++++++++++++++-------------
 src/util.rs                |  4 +--
 8 files changed, 134 insertions(+), 63 deletions(-)

diff --git a/src/command/apply_local.rs b/src/command/apply_local.rs
index e6ec0c3..8b62e14 100644
--- a/src/command/apply_local.rs
+++ b/src/command/apply_local.rs
@@ -12,7 +12,7 @@ use crate::nix::deployment::{
     Target,
     DeploymentOptions,
 };
-use crate::nix::{NixError, host};
+use crate::nix::{NixError, NodeName, host};
 use crate::util;
 
 pub fn subcommand() -> App<'static, 'static> {
@@ -90,11 +90,15 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> {
     }
 
     let hive = util::hive_from_args(local_args).await.unwrap();
-    let hostname = if local_args.is_present("node") {
-        local_args.value_of("node").unwrap().to_owned()
-    } else {
-        hostname::get().expect("Could not get hostname")
-            .to_string_lossy().into_owned()
+    let hostname = {
+        let s = if local_args.is_present("node") {
+            local_args.value_of("node").unwrap().to_owned()
+        } else {
+            hostname::get().expect("Could not get hostname")
+                .to_string_lossy().into_owned()
+        };
+
+        NodeName::new(s)?
     };
 
     let goal = Goal::from_str(local_args.value_of("goal").unwrap()).unwrap();
@@ -102,7 +106,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> {
     if let Some(info) = hive.deployment_info_for(&hostname).await.unwrap() {
         let nix_options = hive.nix_options().await.unwrap();
         if !info.allows_local_deployment() {
-            log::error!("Local deployment is not enabled for host {}.", hostname);
+            log::error!("Local deployment is not enabled for host {}.", hostname.as_str());
            log::error!("Hint: Set deployment.allowLocalDeployment to true.");
             quit::with_code(2);
         }
@@ -111,7 +115,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> {
             info.clone(),
         )
     } else {
-        log::error!("Host {} is not present in the Hive configuration.", hostname);
+        log::error!("Host {} is not present in the Hive configuration.", hostname.as_str());
         quit::with_code(2);
     }
 };
diff --git a/src/command/exec.rs b/src/command/exec.rs
index 3c692f5..50d16bb 100644
--- a/src/command/exec.rs
+++ b/src/command/exec.rs
@@ -141,7 +141,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) -> Result<(), NixError> {
             None => None,
         };
 
-        let progress = progress.create_task_progress(name.clone());
+        let progress = progress.create_task_progress(name.to_string());
 
         let command_v: Vec<&str> = command.iter().map(|s| s.as_str()).collect();
         let command = host.ssh(&command_v);
diff --git a/src/nix/deployment.rs b/src/nix/deployment.rs
index 34c88fe..eda3227 100644
--- a/src/nix/deployment.rs
+++ b/src/nix/deployment.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
 use futures::future::join_all;
 use tokio::sync::{Mutex, Semaphore};
 
-use super::{Hive, Host, CopyOptions, NodeConfig, Profile, StoreDerivation, ProfileMap, host};
+use super::{Hive, Host, CopyOptions, NodeName, NodeConfig, Profile, StoreDerivation, ProfileMap, host};
 use super::key::{Key, UploadAt};
 use crate::progress::{Progress, TaskProgress, OutputStyle};
 
@@ -109,9 +109,9 @@ impl Goal {
 /// Internal deployment stages.
 #[derive(Debug)]
 enum Stage {
-    Evaluate(Vec<String>),
-    Build(Vec<String>),
-    Apply(String),
+    Evaluate(Vec<NodeName>),
+    Build(Vec<NodeName>),
+    Apply(NodeName),
 }
 
 /// Results of a deployment to a node.
@@ -168,9 +168,9 @@ impl DeploymentResult {
         }
     }
 
-    fn print_failed_nodes(&self, prefix: &'static str, nodes: &Vec<String>, full_logs: bool) {
+    fn print_failed_nodes(&self, prefix: &'static str, nodes: &Vec<NodeName>, full_logs: bool) {
         let msg = if nodes.len() == 1 {
-            format!("{} {} failed.", prefix, nodes[0])
+            format!("{} {} failed.", prefix, nodes[0].as_str())
         } else {
             format!("{} {} nodes failed.", prefix, nodes.len())
         };
@@ -209,8 +209,8 @@ impl Target {
 pub struct Deployment {
     hive: Hive,
     goal: Goal,
-    target_names: Vec<String>,
-    targets: Mutex<HashMap<String, Target>>,
+    target_names: Vec<NodeName>,
+    targets: Mutex<HashMap<NodeName, Target>>,
     label_width: usize,
     parallelism_limit: ParallelismLimit,
     evaluation_node_limit: EvaluationNodeLimit,
@@ -219,8 +219,8 @@ pub struct Deployment {
 }
 
 impl Deployment {
-    pub fn new(hive: Hive, targets: HashMap<String, Target>, goal: Goal) -> Self {
-        let target_names: Vec<String> = targets.keys().cloned().collect();
+    pub fn new(hive: Hive, targets: HashMap<NodeName, Target>, goal: Goal) -> Self {
+        let target_names: Vec<NodeName> = targets.keys().cloned().collect();
 
         let label_width = if let Some(len) = target_names.iter().map(|n| n.len()).max() {
             max(BATCH_OPERATION_LABEL.len(), len)
@@ -280,7 +280,7 @@ impl Deployment {
             let progress = progress.clone();
             futures.push(async move {
                 let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
-                let mut task = progress.create_task_progress(node.clone());
+                let mut task = progress.create_task_progress(node.to_string());
 
                 task.log("Uploading keys...");
 
@@ -288,7 +288,7 @@ impl Deployment {
                     task.failure_err(&e);
 
                     let mut results = arc_self.results.lock().await;
-                    let stage = Stage::Apply(node.to_string());
+                    let stage = Stage::Apply(node);
                     let logs = target.host.dump_logs().await.map(|s| s.to_string());
                     results.push(DeploymentResult::failure(stage, logs));
                     return;
@@ -337,7 +337,7 @@ impl Deployment {
             let progress = progress.clone();
 
             // FIXME: Eww
-            let chunk: Vec<String> = chunk.iter().map(|s| s.to_string()).collect();
+            let chunk: Vec<NodeName> = chunk.iter().map(|s| s.clone()).collect();
 
             futures.push(async move {
                 let drv = {
@@ -424,7 +424,7 @@ impl Deployment {
                     targets.remove(&node).unwrap()
                 };
                 let profile = profiles.get(&node).cloned()
-                    .expect(&format!("Somehow profile for {} was not built", node));
+                    .expect(&format!("Somehow profile for {} was not built", node.as_str()));
 
                 futures.push(async move {
                     arc_self.apply_profile(&node, target, profile, progress).await
@@ -457,7 +457,7 @@ impl Deployment {
         }
     }
 
-    async fn eval_profiles(self: Arc<Self>, chunk: &Vec<String>, progress: TaskProgress) -> Option<StoreDerivation<ProfileMap>> {
+    async fn eval_profiles(self: Arc<Self>, chunk: &Vec<NodeName>, progress: TaskProgress) -> Option<StoreDerivation<ProfileMap>> {
         let (eval, logs) = self.hive.eval_selected(&chunk, progress.clone()).await;
 
         match eval {
@@ -476,7 +476,7 @@ impl Deployment {
         }
     }
 
-    async fn build_profiles(self: Arc<Self>, chunk: &Vec<String>, derivation: StoreDerivation<ProfileMap>, progress: TaskProgress) -> Option<ProfileMap> {
+    async fn build_profiles(self: Arc<Self>, chunk: &Vec<NodeName>, derivation: StoreDerivation<ProfileMap>, progress: TaskProgress) -> Option<ProfileMap> {
         let nix_options = self.hive.nix_options().await.unwrap();
         // FIXME: Remote build?
         let mut builder = host::local(nix_options);
@@ -506,7 +506,7 @@ impl Deployment {
         }
     }
 
-    async fn apply_profile(self: Arc<Self>, name: &str, mut target: Target, profile: Profile, multi: Arc<Progress>) {
+    async fn apply_profile(self: Arc<Self>, name: &NodeName, mut target: Target, profile: Profile, multi: Arc<Progress>) {
         let permit = self.parallelism_limit.apply.acquire().await.unwrap();
 
         let mut bar = multi.create_task_progress(name.to_string());
@@ -546,7 +546,7 @@ impl Deployment {
                 bar.failure_err(&e);
 
                 let mut results = self.results.lock().await;
-                let stage = Stage::Apply(name.to_string());
+                let stage = Stage::Apply(name.clone());
                 let logs = target.host.dump_logs().await.map(|s| s.to_string());
                 results.push(DeploymentResult::failure(stage, logs));
                 return;
@@ -570,7 +570,7 @@ impl Deployment {
                 bar.failure_err(&e);
 
                 let mut results = self.results.lock().await;
-                let stage = Stage::Apply(name.to_string());
+                let stage = Stage::Apply(name.clone());
                 let logs = target.host.dump_logs().await.map(|s| s.to_string());
                 results.push(DeploymentResult::failure(stage, logs));
                 return;
@@ -580,7 +580,7 @@ impl Deployment {
                 bar.success(self.goal.success_str().unwrap());
 
                 let mut results = self.results.lock().await;
-                let stage = Stage::Apply(name.to_string());
+                let stage = Stage::Apply(name.clone());
                 let logs = target.host.dump_logs().await.map(|s| s.to_string());
                 results.push(DeploymentResult::success(stage, logs));
             }
@@ -588,7 +588,7 @@ impl Deployment {
                 bar.failure(&format!("Failed: {}", e));
 
                 let mut results = self.results.lock().await;
-                let stage = Stage::Apply(name.to_string());
+                let stage = Stage::Apply(name.clone());
                 let logs = target.host.dump_logs().await.map(|s| s.to_string());
                 results.push(DeploymentResult::failure(stage, logs));
             }
diff --git a/src/nix/hive.rs b/src/nix/hive.rs
index 2473d98..cda7880 100644
--- a/src/nix/hive.rs
+++ b/src/nix/hive.rs
@@ -13,6 +13,7 @@ use super::{
     Flake,
     StoreDerivation,
     NixResult,
+    NodeName,
     NodeConfig,
     ProfileMap,
 };
@@ -116,12 +117,12 @@ impl Hive {
     }
 
     /// Retrieve deployment info for all nodes.
-    pub async fn deployment_info(&self) -> NixResult<HashMap<String, NodeConfig>> {
+    pub async fn deployment_info(&self) -> NixResult<HashMap<NodeName, NodeConfig>> {
         // FIXME: Really ugly :(
         let s: String = self.nix_instantiate("hive.deploymentConfigJson").eval_with_builders().await?
             .capture_json().await?;
 
-        let configs: HashMap<String, NodeConfig> = serde_json::from_str(&s).unwrap();
+        let configs: HashMap<NodeName, NodeConfig> = serde_json::from_str(&s).unwrap();
         for config in configs.values() {
             config.validate()?;
             for key in config.keys.values() {
@@ -132,8 +133,8 @@ impl Hive {
     }
 
     /// Retrieve deployment info for a single node.
-    pub async fn deployment_info_for(&self, node: &str) -> NixResult<Option<NodeConfig>> {
-        let expr = format!("toJSON (hive.nodes.\"{}\".config.deployment or null)", node);
+    pub async fn deployment_info_for(&self, node: &NodeName) -> NixResult<Option<NodeConfig>> {
+        let expr = format!("toJSON (hive.nodes.\"{}\".config.deployment or null)", node.as_str());
         let s: String = self.nix_instantiate(&expr).eval_with_builders().await?
             .capture_json().await?;
 
@@ -145,7 +146,7 @@ impl Hive {
     /// Evaluation may take up a lot of memory, so we make it possible
     /// to split up the evaluation process into chunks and run them
     /// concurrently with other processes (e.g., build and apply).
-    pub async fn eval_selected(&self, nodes: &Vec<String>, progress_bar: TaskProgress) -> (NixResult<StoreDerivation<ProfileMap>>, Option<String>) {
+    pub async fn eval_selected(&self, nodes: &Vec<NodeName>, progress_bar: TaskProgress) -> (NixResult<StoreDerivation<ProfileMap>>, Option<String>) {
         // FIXME: The return type is ugly...
         let nodes_expr = SerializedNixExpresssion::new(nodes);
 
diff --git a/src/nix/mod.rs b/src/nix/mod.rs
index 413a6b5..a8edf12 100644
--- a/src/nix/mod.rs
+++ b/src/nix/mod.rs
@@ -1,12 +1,14 @@
 use std::collections::HashMap;
 use std::convert::TryFrom;
+use std::hash::Hash;
+use std::ops::Deref;
 use std::os::unix::process::ExitStatusExt;
 use std::path::Path;
 use std::process::{ExitStatus, Stdio};
 
 use async_trait::async_trait;
-use serde::de::DeserializeOwned;
-use serde::Deserialize;
+use serde::de::{self, DeserializeOwned};
+use serde::{Deserialize, Deserializer, Serialize};
 use snafu::Snafu;
 use tokio::process::Command;
 use users::get_current_username;
@@ -39,6 +41,9 @@ pub use info::NixCheck;
 pub mod flake;
 pub use flake::Flake;
 
+pub mod node_filter;
+pub use node_filter::NodeFilter;
+
 #[cfg(test)]
 mod tests;
 
@@ -82,6 +87,9 @@ pub enum NixError {
     #[snafu(display("Current Nix version does not support Flakes"))]
     NoFlakesSupport,
 
+    #[snafu(display("Node name cannot be empty"))]
+    EmptyNodeName,
+
     #[snafu(display("Nix Error: {}", message))]
     Unknown { message: String },
 }
@@ -113,6 +121,14 @@ impl From<std::io::Error> for NixError {
     }
 }
 
+/// A node's attribute name.
+#[derive(Serialize, Deserialize, Clone, Debug, Hash, Eq, PartialEq)]
+#[serde(transparent)]
+pub struct NodeName(
+    #[serde(deserialize_with = "NodeName::deserialize")]
+    String
+);
+
 #[derive(Debug, Clone, Validate, Deserialize)]
 pub struct NodeConfig {
     #[serde(rename = "targetHost")]
@@ -138,6 +154,47 @@ pub struct NodeConfig {
     keys: HashMap<String, Key>,
 }
 
+impl NodeName {
+    /// Returns the string.
+    pub fn as_str(&self) -> &str {
+        &self.0
+    }
+
+    /// Creates a NodeName from a String.
+    pub fn new(name: String) -> NixResult<Self> {
+        let validated = Self::validate(name)?;
+        Ok(Self(validated))
+    }
+
+    /// Deserializes a potentially-invalid node name.
+    fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
+        where D: Deserializer<'de>
+    {
+        use de::Error;
+        String::deserialize(deserializer)
+            .and_then(|s| {
+                Self::validate(s).map_err(|e| Error::custom(e.to_string()))
+            })
+    }
+
+    fn validate(s: String) -> NixResult<String> {
+        // FIXME: Elaborate
+        if s.len() == 0 {
+            return Err(NixError::EmptyNodeName);
+        }
+
+        Ok(s)
+    }
+}
+
+impl Deref for NodeName {
+    type Target = str;
+
+    fn deref(&self) -> &str {
+        self.0.as_str()
+    }
+}
+
 impl NodeConfig {
     pub fn tags(&self) -> &[String] { &self.tags }
     pub fn allows_local_deployment(&self) -> bool { self.allow_local_deployment }
diff --git a/src/nix/profile.rs b/src/nix/profile.rs
index 769f5e0..11d9b61 100644
--- a/src/nix/profile.rs
+++ b/src/nix/profile.rs
@@ -11,6 +11,7 @@ use super::{
     Goal,
     NixResult,
     NixError,
+    NodeName,
     StorePath,
 };
 
@@ -65,10 +66,10 @@ impl Profile {
 
 /// A map of names to their associated NixOS system profiles.
 #[derive(Debug)]
-pub struct ProfileMap(HashMap<String, Profile>);
+pub struct ProfileMap(HashMap<NodeName, Profile>);
 
 impl Deref for ProfileMap {
-    type Target = HashMap<String, Profile>;
+    type Target = HashMap<NodeName, Profile>;
 
     fn deref(&self) -> &Self::Target {
         &self.0
@@ -98,7 +99,7 @@ impl TryFrom<Vec<StorePath>> for ProfileMap {
             let path = paths[0].as_path();
             let json: String = fs::read_to_string(path)?;
 
-            let mut raw_map: HashMap<String, StorePath> = serde_json::from_str(&json).map_err(|_| NixError::BadOutput {
+            let mut raw_map: HashMap<NodeName, StorePath> = serde_json::from_str(&json).map_err(|_| NixError::BadOutput {
                 output: String::from("The returned profile map is invalid"),
             })?;
 
@@ -122,7 +123,7 @@ impl ProfileMap {
         // This will actually try to build all profiles, but since they
         // already exist only the GC roots will be created.
         for (node, profile) in self.0.iter() {
-            let path = base.join(format!("node-{}", node));
+            let path = base.join(format!("node-{}", node.as_str()));
 
             let mut command = Command::new("nix-store");
             command.args(&["--no-build-output", "--indirect", "--add-root", path.to_str().unwrap()]);
diff --git a/src/nix/tests/mod.rs b/src/nix/tests/mod.rs
index 5694851..7822f89 100644
--- a/src/nix/tests/mod.rs
+++ b/src/nix/tests/mod.rs
@@ -13,6 +13,12 @@ use std::path::PathBuf;
 use tempfile::NamedTempFile;
 use tokio_test::block_on;
 
+macro_rules! node {
+    ($n:expr) => {
+        NodeName::new($n.to_string()).unwrap()
+    }
+}
+
 fn set_eq<T>(a: &[T], b: &[T]) -> bool
 where
     T: Eq + Hash,
@@ -61,7 +67,7 @@ impl TempHive {
     }
 
     /// Asserts that the specified nodes can be fully evaluated.
-    pub fn eval_success(text: &str, nodes: Vec<String>) {
+    pub fn eval_success(text: &str, nodes: Vec<NodeName>) {
         let hive = Self::new(text);
         let progress = TaskProgress::new("tests".to_string(), 5);
         let (profiles, _) = block_on(hive.eval_selected(&nodes, progress));
@@ -69,7 +75,7 @@ impl TempHive {
     }
 
     /// Asserts that the specified nodes will fail to evaluate.
-    pub fn eval_failure(text: &str, nodes: Vec<String>) {
+    pub fn eval_failure(text: &str, nodes: Vec<NodeName>) {
         let hive = Self::new(text);
         let progress = TaskProgress::new("tests".to_string(), 5);
         let (profiles, _) = block_on(hive.eval_selected(&nodes, progress));
@@ -125,26 +131,28 @@ fn test_parse_simple() {
 
     assert!(set_eq(
         &["host-a", "host-b"],
-        &nodes.keys().map(String::as_str).collect::<Vec<&str>>(),
+        &nodes.keys().map(NodeName::as_str).collect::<Vec<&str>>(),
     ));
 
     // host-a
+    let host_a = &nodes[&node!("host-a")];
     assert!(set_eq(
         &["common-tag", "a-tag"],
-        &nodes["host-a"].tags.iter().map(String::as_str).collect::<Vec<&str>>(),
+        &host_a.tags.iter().map(String::as_str).collect::<Vec<&str>>(),
     ));
-    assert_eq!(Some("host-a"), nodes["host-a"].target_host.as_deref());
-    assert_eq!(None, nodes["host-a"].target_port);
-    assert_eq!(Some("root"), nodes["host-a"].target_user.as_deref());
+    assert_eq!(Some("host-a"), host_a.target_host.as_deref());
+    assert_eq!(None, host_a.target_port);
+    assert_eq!(Some("root"), host_a.target_user.as_deref());
 
     // host-b
+    let host_b = &nodes[&node!("host-b")];
     assert!(set_eq(
         &["common-tag"],
-        &nodes["host-b"].tags.iter().map(String::as_str).collect::<Vec<&str>>(),
+        &host_b.tags.iter().map(String::as_str).collect::<Vec<&str>>(),
     ));
-    assert_eq!(Some("somehost.tld"), nodes["host-b"].target_host.as_deref());
-    assert_eq!(Some(1234), nodes["host-b"].target_port);
-    assert_eq!(Some("luser"), nodes["host-b"].target_user.as_deref());
+    assert_eq!(Some("somehost.tld"), host_b.target_host.as_deref());
+    assert_eq!(Some(1234), host_b.target_port);
+    assert_eq!(Some("luser"), host_b.target_user.as_deref());
 }
 
 #[test]
@@ -160,7 +168,7 @@ fn test_parse_flake() {
     let nodes = block_on(hive.deployment_info()).unwrap();
     assert!(set_eq(
         &["host-a", "host-b"],
-        &nodes.keys().map(String::as_str).collect::<Vec<&str>>(),
+        &nodes.keys().map(NodeName::as_str).collect::<Vec<&str>>(),
     ));
 }
 
@@ -257,7 +265,7 @@ fn test_eval_non_existent_pkg() {
             environment.systemPackages = with pkgs; [ thisPackageDoesNotExist ];
         };
     }
-    "#, vec![ "test".to_string() ]);
+    "#, vec![ node!("test") ]);
 }
 
 // Nixpkgs config tests
@@ -279,7 +287,7 @@ fn test_nixpkgs_overlay_meta_nixpkgs() {
             environment.systemPackages = with pkgs; [ my-coreutils ];
         };
     }
-    "#, vec![ "test".to_string() ]);
+    "#, vec![ node!("test") ]);
 }
 
 #[test]
@@ -295,7 +303,7 @@ fn test_nixpkgs_overlay_node_config() {
             environment.systemPackages = with pkgs; [ my-coreutils ];
         };
     }
-    "#, vec![ "test".to_string() ]);
+    "#, vec![ node!("test") ]);
 }
 
 #[test]
@@ -318,7 +326,7 @@ fn test_nixpkgs_overlay_both() {
             environment.systemPackages = with pkgs; [ meta-coreutils node-busybox ];
         };
     }
-    "#, vec![ "test".to_string() ]);
+    "#, vec![ node!("test") ]);
 }
 
 #[test]
@@ -337,7 +345,7 @@ fn test_nixpkgs_config_meta_nixpkgs() {
             boot.isContainer = assert pkgs.config.allowUnfree; true;
         };
     }
-    "#, vec![ "test".to_string() ]);
+    "#, vec![ node!("test") ]);
 }
 
 #[test]
@@ -352,7 +360,7 @@ fn test_nixpkgs_config_node_config() {
             boot.isContainer = assert pkgs.config.allowUnfree; true;
         };
     }
-    "#, vec![ "test".to_string() ]);
+    "#, vec![ node!("test") ]);
 }
 
 #[test]
@@ -381,7 +389,7 @@ fn test_nixpkgs_config_override() {
             .replace("META_VAL", "true")
            .replace("NODE_VAL", "false")
             .replace("EXPECTED_VAL", "false"),
-        vec![ "test".to_string() ]
+        vec![ node!("test") ]
     );
 
     TempHive::eval_success(
@@ -389,7 +397,7 @@ fn test_nixpkgs_config_override() {
             .replace("META_VAL", "false")
             .replace("NODE_VAL", "true")
             .replace("EXPECTED_VAL", "true"),
-        vec![ "test".to_string() ]
+        vec![ node!("test") ]
     );
 }
diff --git a/src/util.rs b/src/util.rs
index 8735962..2330812 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -8,7 +8,7 @@ use glob::Pattern as GlobPattern;
 use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader};
 use tokio::process::Command;
 
-use super::nix::{Flake, NodeConfig, Hive, HivePath, NixResult};
+use super::nix::{Flake, NodeName, NodeConfig, Hive, HivePath, NixResult};
 use super::progress::TaskProgress;
 
 enum NodeFilter {
@@ -145,7 +145,7 @@ pub async fn hive_from_args(args: &ArgMatches<'_>) -> NixResult<Hive> {
     Ok(hive)
 }
 
-pub fn filter_nodes(nodes: &HashMap<String, NodeConfig>, filter: &str) -> Vec<String> {
+pub fn filter_nodes(nodes: &HashMap<NodeName, NodeConfig>, filter: &str) -> Vec<NodeName> {
     let filters: Vec<NodeFilter> = filter.split(",").map(|pattern| {
         use NodeFilter::*;
         if let Some(tag_pattern) = pattern.strip_prefix("@") {
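
Annotation (not part of the patch): the refactor above replaces bare String node names with a validating NodeName newtype. Below is a minimal standalone sketch of the same pattern, assuming serde and serde_json as dependencies (with serde's derive feature); the EmptyNodeName error and the main() usage are illustrative stand-ins for the crate's NixError plumbing, not code from the patch.

use std::collections::HashMap;
use std::ops::Deref;

use serde::{de, Deserialize, Deserializer};

// Stand-in for the patch's NixError::EmptyNodeName variant.
#[derive(Debug)]
struct EmptyNodeName;

// Same shape as the patch: a transparent newtype whose inner String
// is routed through a validating deserializer.
#[derive(Deserialize, Clone, Debug, Hash, Eq, PartialEq)]
#[serde(transparent)]
struct NodeName(
    #[serde(deserialize_with = "NodeName::deserialize")]
    String
);

impl NodeName {
    // Explicit construction goes through the same validation.
    fn new(name: String) -> Result<Self, EmptyNodeName> {
        Ok(Self(Self::validate(name)?))
    }

    // Called by serde for the inner field; rejects invalid names
    // at the deserialization boundary.
    fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
        where D: Deserializer<'de>
    {
        String::deserialize(deserializer)
            .and_then(|s| Self::validate(s)
                .map_err(|_| de::Error::custom("node name cannot be empty")))
    }

    fn validate(s: String) -> Result<String, EmptyNodeName> {
        if s.is_empty() {
            return Err(EmptyNodeName);
        }
        Ok(s)
    }
}

// Deref to str keeps read-only String call sites working unchanged.
impl Deref for NodeName {
    type Target = str;

    fn deref(&self) -> &str {
        &self.0
    }
}

fn main() {
    // Explicit construction is validated...
    assert!(NodeName::new("host-a".to_string()).is_ok());
    assert!(NodeName::new(String::new()).is_err());

    // ...and so is anything deserialized into a NodeName-keyed map.
    let nodes: HashMap<NodeName, u32> =
        serde_json::from_str(r#"{"host-a": 1}"#).unwrap();
    let key = NodeName::new("host-a".to_string()).unwrap();
    assert_eq!(Some(&1), nodes.get(&key));

    // str methods like len() still work via Deref.
    assert_eq!(6, key.len());
}

The design point this mirrors: because the inner String is deserialized through NodeName::deserialize, every NodeName in a map such as HashMap<NodeName, NodeConfig> has already been validated when it leaves serde, so bad names are rejected at the parsing boundary rather than deep inside deployment logic, while Deref<Target = str> keeps existing str-based call sites (like the label-width computation in Deployment::new) compiling unchanged.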