From 71200e5c1780891ae543d8c09d5bfe6b56306e2d Mon Sep 17 00:00:00 2001 From: soyouzpanda Date: Sat, 24 Aug 2024 15:26:36 +0200 Subject: [PATCH] feat(generic): Implemented every function of Host to GenericHost --- src/error.rs | 3 + src/nix/deployment/mod.rs | 15 +--- src/nix/hive/eval.nix | 2 +- src/nix/hive/mod.rs | 47 +++++++----- src/nix/hive/options.nix | 9 +++ src/nix/host/generic.rs | 152 ++++++++++++++++++++++++++++---------- src/nix/host/local.rs | 11 +-- src/nix/host/mod.rs | 21 ++---- src/nix/host/ssh.rs | 11 +-- src/nix/mod.rs | 8 +- src/nix/profile.rs | 6 +- 11 files changed, 175 insertions(+), 110 deletions(-) diff --git a/src/error.rs b/src/error.rs index f0fbe11..40093b5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -55,6 +55,9 @@ pub enum ColmenaError { #[snafu(display("Unexpected active profile: {:?}", profile))] ActiveProfileUnexpected { profile: Profile }, + #[snafu(display("Invalid JSON from activation program: {}", error))] + InvalidActivationProgramJson { error: serde_json::Error }, + #[snafu(display("Could not determine current profile"))] FailedToGetCurrentProfile, diff --git a/src/nix/deployment/mod.rs b/src/nix/deployment/mod.rs index da25324..7b19dca 100644 --- a/src/nix/deployment/mod.rs +++ b/src/nix/deployment/mod.rs @@ -28,7 +28,7 @@ use super::{ host::Local as LocalHost, key::{Key, UploadAt as UploadKeyAt}, ColmenaError, ColmenaResult, CopyDirection, CopyOptions, Hive, Host, NodeConfig, NodeName, - Profile, ProfileDerivation, RebootOptions, StorePath, + Profile, ProfileDerivation, RebootOptions, }; /// A deployment. @@ -637,18 +637,7 @@ impl Deployment { } } - let activation_program: &StorePath = - &arc_self.hive - .get_registry_config() - .await? - .systems - .get(target.config.system_type.as_ref().unwrap_or(&"nixos".to_owned())) - .expect(&format!("System type {} does not exists", - target.config.system_type.as_ref().unwrap_or(&"nixos".to_owned()) - )) - .activation_program; - - host.activate(&profile_r, arc_self.goal, activation_program).await?; + host.activate(&profile_r, arc_self.goal).await?; job.success_with_message(arc_self.goal.success_str().to_string())?; diff --git a/src/nix/hive/eval.nix b/src/nix/hive/eval.nix index e06fe45..3e0ebbb 100644 --- a/src/nix/hive/eval.nix +++ b/src/nix/hive/eval.nix @@ -197,7 +197,7 @@ let ]; serializableSystemTypeConfigKeys = [ - "supportsDeployment" + "supportsDeployment" "activationProgram" "protocol" ]; in rec { diff --git a/src/nix/hive/mod.rs b/src/nix/hive/mod.rs index 8e093f3..c1cd44c 100644 --- a/src/nix/hive/mod.rs +++ b/src/nix/hive/mod.rs @@ -274,24 +274,37 @@ impl Hive { for node in selected_nodes.into_iter() { let config = node_configs.remove(&node).unwrap(); - let host = config.to_ssh_host().map(|mut host| { - n_ssh += 1; - - if let Some(ssh_config) = &ssh_config { - host.set_ssh_config(ssh_config.clone()); - } - - if self.is_flake() { - host.set_use_nix3_copy(true); - } - - host.upcast() - }); - let ssh_host = host.is_some(); - let target = TargetNode::new(node.clone(), host, config); - - if !ssh_only || ssh_host { + if let Some(system_type) = config.system_type.as_ref() { + log::debug!( + "Using generic host (system_type: {}) for node {}", + system_type, + node.0 + ); + let system_config = registry.systems.get(system_type).unwrap(); + let generic_host = config.to_generic_host(system_config); + let target = TargetNode::new(node.clone(), Some(Box::new(generic_host)), config); targets.insert(node, target); + } else { + log::debug!("Using SSH host for node {}", node.0); + let host = config.to_ssh_host().map(|mut host| { + n_ssh += 1; + + if let Some(ssh_config) = &ssh_config { + host.set_ssh_config(ssh_config.clone()); + } + + if self.is_flake() { + host.set_use_nix3_copy(true); + } + + host.upcast() + }); + let ssh_host = host.is_some(); + let target = TargetNode::new(node.clone(), host, config); + + if !ssh_only || ssh_host { + targets.insert(node, target); + } } } diff --git a/src/nix/hive/options.nix b/src/nix/hive/options.nix index 2b65de3..bba966c 100644 --- a/src/nix/hive/options.nix +++ b/src/nix/hive/options.nix @@ -264,6 +264,15 @@ with builtins; rec { ''; type = types.path; }; + protocol = lib.mkOption { + type = types.str; + default = "local"; + description = mdDoc '' + Which protocol to use to deploy. + + It can be ssh, local, etc. but strongly depends on `activationProgram` + ''; + }; }; }; registryOptions = { lib, ... }: let diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs index 5b1075a..3cb25d2 100644 --- a/src/nix/host/generic.rs +++ b/src/nix/host/generic.rs @@ -6,35 +6,31 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::process::Command; -use super::{CopyDirection, CopyOptions, Host}; +use super::{CopyDirection, CopyOptions, Host, RebootOptions}; use crate::error::{ColmenaError, ColmenaResult}; use crate::job::JobHandle; -use crate::nix::{self, Profile, StorePath}; +use crate::nix::{self, Key, Profile, StorePath, SystemTypeConfig}; -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct TransportId(String); +pub type TransportId = String; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Transport { id: TransportId, - name: String, long_name: String, description: String, } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct GoalId(String); +pub type GoalId = String; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Goal { id: GoalId, - name: String, long_name: String, description: String, } /// A request to the activation program -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum Request { /// Asks for activation program's capabilities Capabilities, @@ -44,7 +40,7 @@ pub enum Request { transport: TransportId, path: StorePath, to_host: bool, - options: HashMap, + options: CopyOptions, }, /// Deploys the profile to host @@ -52,18 +48,32 @@ pub enum Request { transport: TransportId, goal: GoalId, toplevel: StorePath, - options: HashMap, + options: CopyOptions, }, /// Realizes the derivation Realize { transport: TransportId, path: StorePath, - options: HashMap, }, /// Uploads keys to host - UploadKeys { transport: TransportId }, + UploadKeys { + transport: TransportId, + keys: HashMap, + require_ownership: bool, + }, + + Activate { + transport: TransportId, + profile: StorePath, + goal: GoalId, + }, + + Reboot { + transport: TransportId, + wait_for_boot: bool, + }, } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -96,47 +106,61 @@ impl Host for GenericHost { direction: CopyDirection, options: CopyOptions, ) -> ColmenaResult<()> { - self.call(Request::CopyClosure { + self.call_default_handler(Request::CopyClosure { transport: self.transport.clone(), path: closure.clone(), to_host: direction == CopyDirection::ToRemote, - options: HashMap::from([( - "include_outputs".to_owned(), - if options.include_outputs { - "true".to_owned() - } else { - "false".to_owned() - }, - )]), + options, }) .await } async fn realize_remote(&mut self, derivation: &StorePath) -> ColmenaResult> { - self.call(Request::Realize { - transport: self.transport.clone(), - path: derivation.clone(), - options: HashMap::new(), - }) - .await?; - - Ok(Vec::new()) + Ok(self + .call( + Request::Realize { + transport: self.transport.clone(), + path: derivation.clone(), + }, + move |response, mut store_paths| { + match response { + Response::Progress { phase } => println!("{}", phase), + Response::NewStorePath { store_path } => store_paths.push(store_path), + _ => (), + }; + store_paths + }, + Vec::new(), + ) + .await?) } - fn set_job(&mut self, bar: Option) {} + fn set_job(&mut self, _: Option) {} async fn deploy( &mut self, profile: &Profile, goal: nix::Goal, copy_options: CopyOptions, - activation_program: Option<&StorePath>, ) -> ColmenaResult<()> { - self.call(Request::Deploy { + self.call_default_handler(Request::Deploy { transport: self.transport.clone(), - goal: GoalId(goal.to_string()), + goal: goal.to_string(), toplevel: profile.as_store_path().clone(), - options: HashMap::new(), + options: copy_options, + }) + .await + } + + async fn upload_keys( + &mut self, + keys: &HashMap, + require_ownership: bool, + ) -> ColmenaResult<()> { + self.call_default_handler(Request::UploadKeys { + transport: self.transport.clone(), + keys: keys.clone(), + require_ownership, }) .await } @@ -148,17 +172,50 @@ impl Host for GenericHost { async fn get_main_system_profile(&mut self) -> ColmenaResult { Err(ColmenaError::Unsupported) } + + async fn activate(&mut self, profile: &Profile, goal: nix::Goal) -> ColmenaResult<()> { + self.call_default_handler(Request::Activate { + transport: self.transport.clone(), + profile: profile.as_store_path().clone(), + goal: goal.to_string(), + }) + .await + } + + async fn reboot(&mut self, options: RebootOptions) -> ColmenaResult<()> { + self.call_default_handler(Request::Reboot { + transport: self.transport.clone(), + wait_for_boot: options.wait_for_boot, + }) + .await + } } impl GenericHost { - async fn call(&mut self, request: Request) -> ColmenaResult<()> { + pub fn new(system: &SystemTypeConfig) -> GenericHost { + Self { + activation_program: system.activation_program.clone(), + transport: system.protocol.clone(), + } + } + + async fn call( + &mut self, + request: Request, + handler: F, + initial_value: T, + ) -> ColmenaResult + where + F: Fn(Response, T) -> T, + { let mut command = Command::new(self.activation_program.as_path()) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; - let json = serde_json::to_string(&request).unwrap(); + let json = serde_json::to_string(&request) + .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; let mut stdin = BufWriter::new(command.stdin.take().unwrap()); let mut stdout = BufReader::new(command.stdout.take().unwrap()).lines(); @@ -172,11 +229,26 @@ impl GenericHost { .expect("child process encountered an error"); }); + let mut value = initial_value; + while let Some(line) = stdout.next_line().await? { - let response: Response = serde_json::from_str(line.as_str()).unwrap(); - println!("{:?}", response); + let response: Response = serde_json::from_str(line.as_str()) + .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; + value = handler(response, value); } - Ok(()) + Ok(value) + } + + async fn call_default_handler(&mut self, request: Request) -> ColmenaResult<()> { + self.call( + request, + |response, _| match response { + Response::Progress { phase } => println!("{phase}"), + _ => (), + }, + (), + ) + .await } } diff --git a/src/nix/host/local.rs b/src/nix/host/local.rs index 681394b..a5bf0df 100644 --- a/src/nix/host/local.rs +++ b/src/nix/host/local.rs @@ -78,12 +78,7 @@ impl Host for Local { Ok(()) } - async fn activate( - &mut self, - profile: &Profile, - goal: Goal, - activation_program: &StorePath, - ) -> ColmenaResult<()> { + async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { if !goal.requires_activation() { return Err(ColmenaError::Unsupported); } @@ -96,9 +91,7 @@ impl Host for Local { } let command = { - let activation_command = profile - .activation_command(goal, activation_program) - .unwrap(); + let activation_command = profile.activation_command(goal).unwrap(); self.make_privileged_command(&activation_command) }; diff --git a/src/nix/host/mod.rs b/src/nix/host/mod.rs index 69c9b90..8d8a40b 100644 --- a/src/nix/host/mod.rs +++ b/src/nix/host/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use super::{Goal, Key, Profile, StorePath}; use crate::error::{ColmenaError, ColmenaResult}; @@ -15,14 +16,15 @@ pub use local::Local; mod key_uploader; mod generic; +pub use generic::GenericHost; -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum CopyDirection { ToRemote, FromRemote, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct CopyOptions { include_outputs: bool, use_substitutes: bool, @@ -130,7 +132,6 @@ pub trait Host: Send + Sync + std::fmt::Debug { profile: &Profile, goal: Goal, copy_options: CopyOptions, - activation_program: Option<&StorePath>, ) -> ColmenaResult<()> { self.copy_closure( profile.as_store_path(), @@ -140,12 +141,7 @@ pub trait Host: Send + Sync + std::fmt::Debug { .await?; if goal.requires_activation() { - self.activate( - profile, - goal, - activation_program.expect("Unknown activation program"), - ) - .await?; + self.activate(profile, goal).await?; } Ok(()) @@ -179,12 +175,7 @@ pub trait Host: Send + Sync + std::fmt::Debug { /// /// The profile must already exist on the host. You should probably use deploy instead. #[allow(unused_variables)] - async fn activate( - &mut self, - profile: &Profile, - goal: Goal, - activation_program: &StorePath, - ) -> ColmenaResult<()> { + async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { Err(ColmenaError::Unsupported) } diff --git a/src/nix/host/ssh.rs b/src/nix/host/ssh.rs index ae3952c..64ebb68 100644 --- a/src/nix/host/ssh.rs +++ b/src/nix/host/ssh.rs @@ -90,12 +90,7 @@ impl Host for Ssh { Ok(()) } - async fn activate( - &mut self, - profile: &Profile, - goal: Goal, - activation_program: &StorePath, - ) -> ColmenaResult<()> { + async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { if !goal.requires_activation() { return Err(ColmenaError::Unsupported); } @@ -106,9 +101,7 @@ impl Host for Ssh { self.run_command(set_profile).await?; } - let activation_command = profile - .activation_command(goal, activation_program) - .unwrap(); + let activation_command = profile.activation_command(goal).unwrap(); let v: Vec<&str> = activation_command.iter().map(|s| &**s).collect(); let command = self.ssh(&v); self.run_command(command).await diff --git a/src/nix/mod.rs b/src/nix/mod.rs index 000135a..2ff3307 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -10,8 +10,8 @@ use validator::{Validate, ValidationError as ValidationErrorType}; use crate::error::{ColmenaError, ColmenaResult}; pub mod host; -use host::Ssh; pub use host::{CopyDirection, CopyOptions, Host, RebootOptions}; +use host::{GenericHost, Ssh}; pub mod hive; pub use hive::{Hive, HivePath}; @@ -104,6 +104,8 @@ pub struct SystemTypeConfig { #[serde(rename = "activationProgram")] pub activation_program: StorePath, + + pub protocol: String, } #[derive(Debug, Clone, Validate, Deserialize)] @@ -211,6 +213,10 @@ impl NodeConfig { host }) } + + pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> GenericHost { + GenericHost::new(system_config) + } } impl NixFlags { diff --git a/src/nix/profile.rs b/src/nix/profile.rs index 752e3a4..b71c637 100644 --- a/src/nix/profile.rs +++ b/src/nix/profile.rs @@ -26,11 +26,7 @@ impl Profile { } /// Returns the command to activate this profile. - pub fn activation_command( - &self, - goal: Goal, - activation_program: &StorePath, - ) -> Option> { + pub fn activation_command(&self, goal: Goal) -> Option> { if let Some(goal) = goal.as_str() { let path = self.as_path().join("bin/switch-to-configuration"); let switch_to_configuration = path