Refactor node names

This commit is contained in:
Zhaofeng Li 2021-11-20 23:34:52 -08:00
parent de397dfc60
commit 5c84134af3
8 changed files with 134 additions and 63 deletions

View file

@ -12,7 +12,7 @@ use crate::nix::deployment::{
Target,
DeploymentOptions,
};
use crate::nix::{NixError, host};
use crate::nix::{NixError, NodeName, host};
use crate::util;
pub fn subcommand() -> App<'static, 'static> {
@ -90,11 +90,15 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
}
let hive = util::hive_from_args(local_args).await.unwrap();
let hostname = if local_args.is_present("node") {
local_args.value_of("node").unwrap().to_owned()
} else {
hostname::get().expect("Could not get hostname")
.to_string_lossy().into_owned()
let hostname = {
let s = if local_args.is_present("node") {
local_args.value_of("node").unwrap().to_owned()
} else {
hostname::get().expect("Could not get hostname")
.to_string_lossy().into_owned()
};
NodeName::new(s)?
};
let goal = Goal::from_str(local_args.value_of("goal").unwrap()).unwrap();
@ -102,7 +106,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
if let Some(info) = hive.deployment_info_for(&hostname).await.unwrap() {
let nix_options = hive.nix_options().await.unwrap();
if !info.allows_local_deployment() {
log::error!("Local deployment is not enabled for host {}.", hostname);
log::error!("Local deployment is not enabled for host {}.", hostname.as_str());
log::error!("Hint: Set deployment.allowLocalDeployment to true.");
quit::with_code(2);
}
@ -111,7 +115,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
info.clone(),
)
} else {
log::error!("Host {} is not present in the Hive configuration.", hostname);
log::error!("Host {} is not present in the Hive configuration.", hostname.as_str());
quit::with_code(2);
}
};

View file

@ -141,7 +141,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) ->
None => None,
};
let progress = progress.create_task_progress(name.clone());
let progress = progress.create_task_progress(name.to_string());
let command_v: Vec<&str> = command.iter().map(|s| s.as_str()).collect();
let command = host.ssh(&command_v);

View file

@ -5,7 +5,7 @@ use std::sync::Arc;
use futures::future::join_all;
use tokio::sync::{Mutex, Semaphore};
use super::{Hive, Host, CopyOptions, NodeConfig, Profile, StoreDerivation, ProfileMap, host};
use super::{Hive, Host, CopyOptions, NodeName, NodeConfig, Profile, StoreDerivation, ProfileMap, host};
use super::key::{Key, UploadAt};
use crate::progress::{Progress, TaskProgress, OutputStyle};
@ -109,9 +109,9 @@ impl Goal {
/// Internal deployment stages.
#[derive(Debug)]
enum Stage {
Evaluate(Vec<String>),
Build(Vec<String>),
Apply(String),
Evaluate(Vec<NodeName>),
Build(Vec<NodeName>),
Apply(NodeName),
}
/// Results of a deployment to a node.
@ -168,9 +168,9 @@ impl DeploymentResult {
}
}
fn print_failed_nodes(&self, prefix: &'static str, nodes: &Vec<String>, full_logs: bool) {
fn print_failed_nodes(&self, prefix: &'static str, nodes: &Vec<NodeName>, full_logs: bool) {
let msg = if nodes.len() == 1 {
format!("{} {} failed.", prefix, nodes[0])
format!("{} {} failed.", prefix, nodes[0].as_str())
} else {
format!("{} {} nodes failed.", prefix, nodes.len())
};
@ -209,8 +209,8 @@ impl Target {
pub struct Deployment {
hive: Hive,
goal: Goal,
target_names: Vec<String>,
targets: Mutex<HashMap<String, Target>>,
target_names: Vec<NodeName>,
targets: Mutex<HashMap<NodeName, Target>>,
label_width: usize,
parallelism_limit: ParallelismLimit,
evaluation_node_limit: EvaluationNodeLimit,
@ -219,8 +219,8 @@ pub struct Deployment {
}
impl Deployment {
pub fn new(hive: Hive, targets: HashMap<String, Target>, goal: Goal) -> Self {
let target_names: Vec<String> = targets.keys().cloned().collect();
pub fn new(hive: Hive, targets: HashMap<NodeName, Target>, goal: Goal) -> Self {
let target_names: Vec<NodeName> = targets.keys().cloned().collect();
let label_width = if let Some(len) = target_names.iter().map(|n| n.len()).max() {
max(BATCH_OPERATION_LABEL.len(), len)
@ -280,7 +280,7 @@ impl Deployment {
let progress = progress.clone();
futures.push(async move {
let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
let mut task = progress.create_task_progress(node.clone());
let mut task = progress.create_task_progress(node.to_string());
task.log("Uploading keys...");
@ -288,7 +288,7 @@ impl Deployment {
task.failure_err(&e);
let mut results = arc_self.results.lock().await;
let stage = Stage::Apply(node.to_string());
let stage = Stage::Apply(node);
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
@ -337,7 +337,7 @@ impl Deployment {
let progress = progress.clone();
// FIXME: Eww
let chunk: Vec<String> = chunk.iter().map(|s| s.to_string()).collect();
let chunk: Vec<NodeName> = chunk.iter().map(|s| s.clone()).collect();
futures.push(async move {
let drv = {
@ -424,7 +424,7 @@ impl Deployment {
targets.remove(&node).unwrap()
};
let profile = profiles.get(&node).cloned()
.expect(&format!("Somehow profile for {} was not built", node));
.expect(&format!("Somehow profile for {} was not built", node.as_str()));
futures.push(async move {
arc_self.apply_profile(&node, target, profile, progress).await
@ -457,7 +457,7 @@ impl Deployment {
}
}
async fn eval_profiles(self: Arc<Self>, chunk: &Vec<String>, progress: TaskProgress) -> Option<StoreDerivation<ProfileMap>> {
async fn eval_profiles(self: Arc<Self>, chunk: &Vec<NodeName>, progress: TaskProgress) -> Option<StoreDerivation<ProfileMap>> {
let (eval, logs) = self.hive.eval_selected(&chunk, progress.clone()).await;
match eval {
@ -476,7 +476,7 @@ impl Deployment {
}
}
async fn build_profiles(self: Arc<Self>, chunk: &Vec<String>, derivation: StoreDerivation<ProfileMap>, progress: TaskProgress) -> Option<ProfileMap> {
async fn build_profiles(self: Arc<Self>, chunk: &Vec<NodeName>, derivation: StoreDerivation<ProfileMap>, progress: TaskProgress) -> Option<ProfileMap> {
let nix_options = self.hive.nix_options().await.unwrap();
// FIXME: Remote build?
let mut builder = host::local(nix_options);
@ -506,7 +506,7 @@ impl Deployment {
}
}
async fn apply_profile(self: Arc<Self>, name: &str, mut target: Target, profile: Profile, multi: Arc<Progress>) {
async fn apply_profile(self: Arc<Self>, name: &NodeName, mut target: Target, profile: Profile, multi: Arc<Progress>) {
let permit = self.parallelism_limit.apply.acquire().await.unwrap();
let mut bar = multi.create_task_progress(name.to_string());
@ -546,7 +546,7 @@ impl Deployment {
bar.failure_err(&e);
let mut results = self.results.lock().await;
let stage = Stage::Apply(name.to_string());
let stage = Stage::Apply(name.clone());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
@ -570,7 +570,7 @@ impl Deployment {
bar.failure_err(&e);
let mut results = self.results.lock().await;
let stage = Stage::Apply(name.to_string());
let stage = Stage::Apply(name.clone());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
@ -580,7 +580,7 @@ impl Deployment {
bar.success(self.goal.success_str().unwrap());
let mut results = self.results.lock().await;
let stage = Stage::Apply(name.to_string());
let stage = Stage::Apply(name.clone());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
}
@ -588,7 +588,7 @@ impl Deployment {
bar.failure(&format!("Failed: {}", e));
let mut results = self.results.lock().await;
let stage = Stage::Apply(name.to_string());
let stage = Stage::Apply(name.clone());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
}

View file

@ -13,6 +13,7 @@ use super::{
Flake,
StoreDerivation,
NixResult,
NodeName,
NodeConfig,
ProfileMap,
};
@ -116,12 +117,12 @@ impl Hive {
}
/// Retrieve deployment info for all nodes.
pub async fn deployment_info(&self) -> NixResult<HashMap<String, NodeConfig>> {
pub async fn deployment_info(&self) -> NixResult<HashMap<NodeName, NodeConfig>> {
// FIXME: Really ugly :(
let s: String = self.nix_instantiate("hive.deploymentConfigJson").eval_with_builders().await?
.capture_json().await?;
let configs: HashMap<String, NodeConfig> = serde_json::from_str(&s).unwrap();
let configs: HashMap<NodeName, NodeConfig> = serde_json::from_str(&s).unwrap();
for config in configs.values() {
config.validate()?;
for key in config.keys.values() {
@ -132,8 +133,8 @@ impl Hive {
}
/// Retrieve deployment info for a single node.
pub async fn deployment_info_for(&self, node: &str) -> NixResult<Option<NodeConfig>> {
let expr = format!("toJSON (hive.nodes.\"{}\".config.deployment or null)", node);
pub async fn deployment_info_for(&self, node: &NodeName) -> NixResult<Option<NodeConfig>> {
let expr = format!("toJSON (hive.nodes.\"{}\".config.deployment or null)", node.as_str());
let s: String = self.nix_instantiate(&expr).eval_with_builders().await?
.capture_json().await?;
@ -145,7 +146,7 @@ impl Hive {
/// Evaluation may take up a lot of memory, so we make it possible
/// to split up the evaluation process into chunks and run them
/// concurrently with other processes (e.g., build and apply).
pub async fn eval_selected(&self, nodes: &Vec<String>, progress_bar: TaskProgress) -> (NixResult<StoreDerivation<ProfileMap>>, Option<String>) {
pub async fn eval_selected(&self, nodes: &Vec<NodeName>, progress_bar: TaskProgress) -> (NixResult<StoreDerivation<ProfileMap>>, Option<String>) {
// FIXME: The return type is ugly...
let nodes_expr = SerializedNixExpresssion::new(nodes);

View file

@ -1,12 +1,14 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::hash::Hash;
use std::ops::Deref;
use std::os::unix::process::ExitStatusExt;
use std::path::Path;
use std::process::{ExitStatus, Stdio};
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::de::{self, DeserializeOwned};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::Snafu;
use tokio::process::Command;
use users::get_current_username;
@ -39,6 +41,9 @@ pub use info::NixCheck;
pub mod flake;
pub use flake::Flake;
pub mod node_filter;
pub use node_filter::NodeFilter;
#[cfg(test)]
mod tests;
@ -82,6 +87,9 @@ pub enum NixError {
#[snafu(display("Current Nix version does not support Flakes"))]
NoFlakesSupport,
#[snafu(display("Node name cannot be empty"))]
EmptyNodeName,
#[snafu(display("Nix Error: {}", message))]
Unknown { message: String },
}
@ -113,6 +121,14 @@ impl From<ExitStatus> for NixError {
}
}
/// A node's attribute name.
#[derive(Serialize, Deserialize, Clone, Debug, Hash, Eq, PartialEq)]
#[serde(transparent)]
pub struct NodeName(
#[serde(deserialize_with = "NodeName::deserialize")]
String
);
#[derive(Debug, Clone, Validate, Deserialize)]
pub struct NodeConfig {
#[serde(rename = "targetHost")]
@ -138,6 +154,47 @@ pub struct NodeConfig {
keys: HashMap<String, Key>,
}
impl NodeName {
/// Returns the string.
pub fn as_str(&self) -> &str {
&self.0
}
/// Creates a NodeName from a String.
pub fn new(name: String) -> NixResult<Self> {
let validated = Self::validate(name)?;
Ok(Self(validated))
}
/// Deserializes a potentially-invalid node name.
fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
where D: Deserializer<'de>
{
use de::Error;
String::deserialize(deserializer)
.and_then(|s| {
Self::validate(s).map_err(|e| Error::custom(e.to_string()))
})
}
fn validate(s: String) -> NixResult<String> {
// FIXME: Elaborate
if s.len() == 0 {
return Err(NixError::EmptyNodeName);
}
Ok(s)
}
}
impl Deref for NodeName {
type Target = str;
fn deref(&self) -> &str {
self.0.as_str()
}
}
impl NodeConfig {
pub fn tags(&self) -> &[String] { &self.tags }
pub fn allows_local_deployment(&self) -> bool { self.allow_local_deployment }

View file

@ -11,6 +11,7 @@ use super::{
Goal,
NixResult,
NixError,
NodeName,
StorePath,
};
@ -65,10 +66,10 @@ impl Profile {
/// A map of names to their associated NixOS system profiles.
#[derive(Debug)]
pub struct ProfileMap(HashMap<String, Profile>);
pub struct ProfileMap(HashMap<NodeName, Profile>);
impl Deref for ProfileMap {
type Target = HashMap<String, Profile>;
type Target = HashMap<NodeName, Profile>;
fn deref(&self) -> &Self::Target {
&self.0
@ -98,7 +99,7 @@ impl TryFrom<Vec<StorePath>> for ProfileMap {
let path = paths[0].as_path();
let json: String = fs::read_to_string(path)?;
let mut raw_map: HashMap<String, StorePath> = serde_json::from_str(&json).map_err(|_| NixError::BadOutput {
let mut raw_map: HashMap<NodeName, StorePath> = serde_json::from_str(&json).map_err(|_| NixError::BadOutput {
output: String::from("The returned profile map is invalid"),
})?;
@ -122,7 +123,7 @@ impl ProfileMap {
// This will actually try to build all profiles, but since they
// already exist only the GC roots will be created.
for (node, profile) in self.0.iter() {
let path = base.join(format!("node-{}", node));
let path = base.join(format!("node-{}", node.as_str()));
let mut command = Command::new("nix-store");
command.args(&["--no-build-output", "--indirect", "--add-root", path.to_str().unwrap()]);

View file

@ -13,6 +13,12 @@ use std::path::PathBuf;
use tempfile::NamedTempFile;
use tokio_test::block_on;
macro_rules! node {
($n:expr) => {
NodeName::new($n.to_string()).unwrap()
}
}
fn set_eq<T>(a: &[T], b: &[T]) -> bool
where
T: Eq + Hash,
@ -61,7 +67,7 @@ impl TempHive {
}
/// Asserts that the specified nodes can be fully evaluated.
pub fn eval_success(text: &str, nodes: Vec<String>) {
pub fn eval_success(text: &str, nodes: Vec<NodeName>) {
let hive = Self::new(text);
let progress = TaskProgress::new("tests".to_string(), 5);
let (profiles, _) = block_on(hive.eval_selected(&nodes, progress));
@ -69,7 +75,7 @@ impl TempHive {
}
/// Asserts that the specified nodes will fail to evaluate.
pub fn eval_failure(text: &str, nodes: Vec<String>) {
pub fn eval_failure(text: &str, nodes: Vec<NodeName>) {
let hive = Self::new(text);
let progress = TaskProgress::new("tests".to_string(), 5);
let (profiles, _) = block_on(hive.eval_selected(&nodes, progress));
@ -125,26 +131,28 @@ fn test_parse_simple() {
assert!(set_eq(
&["host-a", "host-b"],
&nodes.keys().map(String::as_str).collect::<Vec<&str>>(),
&nodes.keys().map(NodeName::as_str).collect::<Vec<&str>>(),
));
// host-a
let host_a = &nodes[&node!("host-a")];
assert!(set_eq(
&["common-tag", "a-tag"],
&nodes["host-a"].tags.iter().map(String::as_str).collect::<Vec<&str>>(),
&host_a.tags.iter().map(String::as_str).collect::<Vec<&str>>(),
));
assert_eq!(Some("host-a"), nodes["host-a"].target_host.as_deref());
assert_eq!(None, nodes["host-a"].target_port);
assert_eq!(Some("root"), nodes["host-a"].target_user.as_deref());
assert_eq!(Some("host-a"), host_a.target_host.as_deref());
assert_eq!(None, host_a.target_port);
assert_eq!(Some("root"), host_a.target_user.as_deref());
// host-b
let host_b = &nodes[&node!("host-b")];
assert!(set_eq(
&["common-tag"],
&nodes["host-b"].tags.iter().map(String::as_str).collect::<Vec<&str>>(),
&host_b.tags.iter().map(String::as_str).collect::<Vec<&str>>(),
));
assert_eq!(Some("somehost.tld"), nodes["host-b"].target_host.as_deref());
assert_eq!(Some(1234), nodes["host-b"].target_port);
assert_eq!(Some("luser"), nodes["host-b"].target_user.as_deref());
assert_eq!(Some("somehost.tld"), host_b.target_host.as_deref());
assert_eq!(Some(1234), host_b.target_port);
assert_eq!(Some("luser"), host_b.target_user.as_deref());
}
#[test]
@ -160,7 +168,7 @@ fn test_parse_flake() {
let nodes = block_on(hive.deployment_info()).unwrap();
assert!(set_eq(
&["host-a", "host-b"],
&nodes.keys().map(String::as_str).collect::<Vec<&str>>(),
&nodes.keys().map(NodeName::as_str).collect::<Vec<&str>>(),
));
}
@ -257,7 +265,7 @@ fn test_eval_non_existent_pkg() {
environment.systemPackages = with pkgs; [ thisPackageDoesNotExist ];
};
}
"#, vec![ "test".to_string() ]);
"#, vec![ node!("test") ]);
}
// Nixpkgs config tests
@ -279,7 +287,7 @@ fn test_nixpkgs_overlay_meta_nixpkgs() {
environment.systemPackages = with pkgs; [ my-coreutils ];
};
}
"#, vec![ "test".to_string() ]);
"#, vec![ node!("test") ]);
}
#[test]
@ -295,7 +303,7 @@ fn test_nixpkgs_overlay_node_config() {
environment.systemPackages = with pkgs; [ my-coreutils ];
};
}
"#, vec![ "test".to_string() ]);
"#, vec![ node!("test") ]);
}
#[test]
@ -318,7 +326,7 @@ fn test_nixpkgs_overlay_both() {
environment.systemPackages = with pkgs; [ meta-coreutils node-busybox ];
};
}
"#, vec![ "test".to_string() ]);
"#, vec![ node!("test") ]);
}
#[test]
@ -337,7 +345,7 @@ fn test_nixpkgs_config_meta_nixpkgs() {
boot.isContainer = assert pkgs.config.allowUnfree; true;
};
}
"#, vec![ "test".to_string() ]);
"#, vec![ node!("test") ]);
}
#[test]
@ -352,7 +360,7 @@ fn test_nixpkgs_config_node_config() {
boot.isContainer = assert pkgs.config.allowUnfree; true;
};
}
"#, vec![ "test".to_string() ]);
"#, vec![ node!("test") ]);
}
#[test]
@ -381,7 +389,7 @@ fn test_nixpkgs_config_override() {
.replace("META_VAL", "true")
.replace("NODE_VAL", "false")
.replace("EXPECTED_VAL", "false"),
vec![ "test".to_string() ]
vec![ node!("test") ]
);
TempHive::eval_success(
@ -389,7 +397,7 @@ fn test_nixpkgs_config_override() {
.replace("META_VAL", "false")
.replace("NODE_VAL", "true")
.replace("EXPECTED_VAL", "true"),
vec![ "test".to_string() ]
vec![ node!("test") ]
);
}

View file

@ -8,7 +8,7 @@ use glob::Pattern as GlobPattern;
use tokio::io::{AsyncRead, AsyncBufReadExt, BufReader};
use tokio::process::Command;
use super::nix::{Flake, NodeConfig, Hive, HivePath, NixResult};
use super::nix::{Flake, NodeName, NodeConfig, Hive, HivePath, NixResult};
use super::progress::TaskProgress;
enum NodeFilter {
@ -145,7 +145,7 @@ pub async fn hive_from_args(args: &ArgMatches<'_>) -> NixResult<Hive> {
Ok(hive)
}
pub fn filter_nodes(nodes: &HashMap<String, NodeConfig>, filter: &str) -> Vec<String> {
pub fn filter_nodes(nodes: &HashMap<NodeName, NodeConfig>, filter: &str) -> Vec<NodeName> {
let filters: Vec<NodeFilter> = filter.split(",").map(|pattern| {
use NodeFilter::*;
if let Some(tag_pattern) = pattern.strip_prefix("@") {