From 596a40020bd960a9bc8ee859683c9feb5e7abc87 Mon Sep 17 00:00:00 2001 From: soyouzpanda Date: Fri, 23 Aug 2024 14:22:42 +0200 Subject: [PATCH 1/6] feat(activation): Added activation program TODO: customize activation command to launch activation program --- src/nix/deployment/mod.rs | 15 +++++++++++++-- src/nix/hive/options.nix | 6 ++++++ src/nix/host/local.rs | 11 +++++++++-- src/nix/host/mod.rs | 15 +++++++++++++-- src/nix/host/ssh.rs | 11 +++++++++-- src/nix/mod.rs | 3 +++ src/nix/profile.rs | 6 +++++- 7 files changed, 58 insertions(+), 9 deletions(-) diff --git a/src/nix/deployment/mod.rs b/src/nix/deployment/mod.rs index 7b19dca..da25324 100644 --- a/src/nix/deployment/mod.rs +++ b/src/nix/deployment/mod.rs @@ -28,7 +28,7 @@ use super::{ host::Local as LocalHost, key::{Key, UploadAt as UploadKeyAt}, ColmenaError, ColmenaResult, CopyDirection, CopyOptions, Hive, Host, NodeConfig, NodeName, - Profile, ProfileDerivation, RebootOptions, + Profile, ProfileDerivation, RebootOptions, StorePath, }; /// A deployment. @@ -637,7 +637,18 @@ impl Deployment { } } - host.activate(&profile_r, arc_self.goal).await?; + let activation_program: &StorePath = + &arc_self.hive + .get_registry_config() + .await? + .systems + .get(target.config.system_type.as_ref().unwrap_or(&"nixos".to_owned())) + .expect(&format!("System type {} does not exists", + target.config.system_type.as_ref().unwrap_or(&"nixos".to_owned()) + )) + .activation_program; + + host.activate(&profile_r, arc_self.goal, activation_program).await?; job.success_with_message(arc_self.goal.success_str().to_string())?; diff --git a/src/nix/hive/options.nix b/src/nix/hive/options.nix index a35ef81..2b65de3 100644 --- a/src/nix/hive/options.nix +++ b/src/nix/hive/options.nix @@ -258,6 +258,12 @@ with builtins; rec { type = types.functionTo types.unspecified; default = _: {}; }; + activationProgram = lib.mkOption { + description = mdDoc '' + Program to execute at activation time. + ''; + type = types.path; + }; }; }; registryOptions = { lib, ... }: let diff --git a/src/nix/host/local.rs b/src/nix/host/local.rs index a5bf0df..681394b 100644 --- a/src/nix/host/local.rs +++ b/src/nix/host/local.rs @@ -78,7 +78,12 @@ impl Host for Local { Ok(()) } - async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { + async fn activate( + &mut self, + profile: &Profile, + goal: Goal, + activation_program: &StorePath, + ) -> ColmenaResult<()> { if !goal.requires_activation() { return Err(ColmenaError::Unsupported); } @@ -91,7 +96,9 @@ impl Host for Local { } let command = { - let activation_command = profile.activation_command(goal).unwrap(); + let activation_command = profile + .activation_command(goal, activation_program) + .unwrap(); self.make_privileged_command(&activation_command) }; diff --git a/src/nix/host/mod.rs b/src/nix/host/mod.rs index 8ff44ad..531c26c 100644 --- a/src/nix/host/mod.rs +++ b/src/nix/host/mod.rs @@ -128,6 +128,7 @@ pub trait Host: Send + Sync + std::fmt::Debug { profile: &Profile, goal: Goal, copy_options: CopyOptions, + activation_program: Option<&StorePath>, ) -> ColmenaResult<()> { self.copy_closure( profile.as_store_path(), @@ -137,7 +138,12 @@ pub trait Host: Send + Sync + std::fmt::Debug { .await?; if goal.requires_activation() { - self.activate(profile, goal).await?; + self.activate( + profile, + goal, + activation_program.expect("Unknown activation program"), + ) + .await?; } Ok(()) @@ -171,7 +177,12 @@ pub trait Host: Send + Sync + std::fmt::Debug { /// /// The profile must already exist on the host. You should probably use deploy instead. #[allow(unused_variables)] - async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { + async fn activate( + &mut self, + profile: &Profile, + goal: Goal, + activation_program: &StorePath, + ) -> ColmenaResult<()> { Err(ColmenaError::Unsupported) } diff --git a/src/nix/host/ssh.rs b/src/nix/host/ssh.rs index 64ebb68..ae3952c 100644 --- a/src/nix/host/ssh.rs +++ b/src/nix/host/ssh.rs @@ -90,7 +90,12 @@ impl Host for Ssh { Ok(()) } - async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { + async fn activate( + &mut self, + profile: &Profile, + goal: Goal, + activation_program: &StorePath, + ) -> ColmenaResult<()> { if !goal.requires_activation() { return Err(ColmenaError::Unsupported); } @@ -101,7 +106,9 @@ impl Host for Ssh { self.run_command(set_profile).await?; } - let activation_command = profile.activation_command(goal).unwrap(); + let activation_command = profile + .activation_command(goal, activation_program) + .unwrap(); let v: Vec<&str> = activation_command.iter().map(|s| &**s).collect(); let command = self.ssh(&v); self.run_command(command).await diff --git a/src/nix/mod.rs b/src/nix/mod.rs index a1a0b6b..000135a 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -101,6 +101,9 @@ pub struct MetaConfig { pub struct SystemTypeConfig { #[serde(rename = "supportsDeployment")] pub supports_deployment: bool, + + #[serde(rename = "activationProgram")] + pub activation_program: StorePath, } #[derive(Debug, Clone, Validate, Deserialize)] diff --git a/src/nix/profile.rs b/src/nix/profile.rs index b71c637..752e3a4 100644 --- a/src/nix/profile.rs +++ b/src/nix/profile.rs @@ -26,7 +26,11 @@ impl Profile { } /// Returns the command to activate this profile. - pub fn activation_command(&self, goal: Goal) -> Option> { + pub fn activation_command( + &self, + goal: Goal, + activation_program: &StorePath, + ) -> Option> { if let Some(goal) = goal.as_str() { let path = self.as_path().join("bin/switch-to-configuration"); let switch_to_configuration = path -- 2.47.0 From ab8d8b032145e054f5ecbb508b8d33e42db6cef2 Mon Sep 17 00:00:00 2001 From: soyouzpanda Date: Fri, 23 Aug 2024 17:57:14 +0200 Subject: [PATCH 2/6] feat(generic): Added first generic host --- src/nix/host/generic.rs | 182 ++++++++++++++++++++++++++++++++++++++++ src/nix/host/mod.rs | 4 +- 2 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 src/nix/host/generic.rs diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs new file mode 100644 index 0000000..5b1075a --- /dev/null +++ b/src/nix/host/generic.rs @@ -0,0 +1,182 @@ +use std::collections::HashMap; +use std::process::Stdio; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::process::Command; + +use super::{CopyDirection, CopyOptions, Host}; +use crate::error::{ColmenaError, ColmenaResult}; +use crate::job::JobHandle; +use crate::nix::{self, Profile, StorePath}; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TransportId(String); + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Transport { + id: TransportId, + name: String, + long_name: String, + description: String, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct GoalId(String); + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Goal { + id: GoalId, + name: String, + long_name: String, + description: String, +} + +/// A request to the activation program +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum Request { + /// Asks for activation program's capabilities + Capabilities, + + /// Copy closure to/from host + CopyClosure { + transport: TransportId, + path: StorePath, + to_host: bool, + options: HashMap, + }, + + /// Deploys the profile to host + Deploy { + transport: TransportId, + goal: GoalId, + toplevel: StorePath, + options: HashMap, + }, + + /// Realizes the derivation + Realize { + transport: TransportId, + path: StorePath, + options: HashMap, + }, + + /// Uploads keys to host + UploadKeys { transport: TransportId }, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum CapabilityResponse { + V1 { + supported_transports: Vec, + supported_goals: Vec, + }, +} + +/// A response from the activation program +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum Response { + Capabilities(CapabilityResponse), + Progress { phase: String }, + NewStorePath { store_path: StorePath }, +} + +#[derive(Debug)] +pub struct GenericHost { + activation_program: StorePath, + transport: TransportId, +} + +#[async_trait] +impl Host for GenericHost { + async fn copy_closure( + &mut self, + closure: &StorePath, + direction: CopyDirection, + options: CopyOptions, + ) -> ColmenaResult<()> { + self.call(Request::CopyClosure { + transport: self.transport.clone(), + path: closure.clone(), + to_host: direction == CopyDirection::ToRemote, + options: HashMap::from([( + "include_outputs".to_owned(), + if options.include_outputs { + "true".to_owned() + } else { + "false".to_owned() + }, + )]), + }) + .await + } + + async fn realize_remote(&mut self, derivation: &StorePath) -> ColmenaResult> { + self.call(Request::Realize { + transport: self.transport.clone(), + path: derivation.clone(), + options: HashMap::new(), + }) + .await?; + + Ok(Vec::new()) + } + + fn set_job(&mut self, bar: Option) {} + + async fn deploy( + &mut self, + profile: &Profile, + goal: nix::Goal, + copy_options: CopyOptions, + activation_program: Option<&StorePath>, + ) -> ColmenaResult<()> { + self.call(Request::Deploy { + transport: self.transport.clone(), + goal: GoalId(goal.to_string()), + toplevel: profile.as_store_path().clone(), + options: HashMap::new(), + }) + .await + } + + async fn get_current_system_profile(&mut self) -> ColmenaResult { + Err(ColmenaError::Unsupported) + } + + async fn get_main_system_profile(&mut self) -> ColmenaResult { + Err(ColmenaError::Unsupported) + } +} + +impl GenericHost { + async fn call(&mut self, request: Request) -> ColmenaResult<()> { + let mut command = Command::new(self.activation_program.as_path()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let json = serde_json::to_string(&request).unwrap(); + + let mut stdin = BufWriter::new(command.stdin.take().unwrap()); + let mut stdout = BufReader::new(command.stdout.take().unwrap()).lines(); + + stdin.write_all(json.as_bytes()).await?; + + tokio::spawn(async move { + let _status = command + .wait() + .await + .expect("child process encountered an error"); + }); + + while let Some(line) = stdout.next_line().await? { + let response: Response = serde_json::from_str(line.as_str()).unwrap(); + println!("{:?}", response); + } + + Ok(()) + } +} diff --git a/src/nix/host/mod.rs b/src/nix/host/mod.rs index 531c26c..69c9b90 100644 --- a/src/nix/host/mod.rs +++ b/src/nix/host/mod.rs @@ -14,7 +14,9 @@ pub use local::Local; mod key_uploader; -#[derive(Copy, Clone, Debug)] +mod generic; + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum CopyDirection { ToRemote, FromRemote, -- 2.47.0 From 71200e5c1780891ae543d8c09d5bfe6b56306e2d Mon Sep 17 00:00:00 2001 From: soyouzpanda Date: Sat, 24 Aug 2024 15:26:36 +0200 Subject: [PATCH 3/6] feat(generic): Implemented every function of Host to GenericHost --- src/error.rs | 3 + src/nix/deployment/mod.rs | 15 +--- src/nix/hive/eval.nix | 2 +- src/nix/hive/mod.rs | 47 +++++++----- src/nix/hive/options.nix | 9 +++ src/nix/host/generic.rs | 152 ++++++++++++++++++++++++++++---------- src/nix/host/local.rs | 11 +-- src/nix/host/mod.rs | 21 ++---- src/nix/host/ssh.rs | 11 +-- src/nix/mod.rs | 8 +- src/nix/profile.rs | 6 +- 11 files changed, 175 insertions(+), 110 deletions(-) diff --git a/src/error.rs b/src/error.rs index f0fbe11..40093b5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -55,6 +55,9 @@ pub enum ColmenaError { #[snafu(display("Unexpected active profile: {:?}", profile))] ActiveProfileUnexpected { profile: Profile }, + #[snafu(display("Invalid JSON from activation program: {}", error))] + InvalidActivationProgramJson { error: serde_json::Error }, + #[snafu(display("Could not determine current profile"))] FailedToGetCurrentProfile, diff --git a/src/nix/deployment/mod.rs b/src/nix/deployment/mod.rs index da25324..7b19dca 100644 --- a/src/nix/deployment/mod.rs +++ b/src/nix/deployment/mod.rs @@ -28,7 +28,7 @@ use super::{ host::Local as LocalHost, key::{Key, UploadAt as UploadKeyAt}, ColmenaError, ColmenaResult, CopyDirection, CopyOptions, Hive, Host, NodeConfig, NodeName, - Profile, ProfileDerivation, RebootOptions, StorePath, + Profile, ProfileDerivation, RebootOptions, }; /// A deployment. @@ -637,18 +637,7 @@ impl Deployment { } } - let activation_program: &StorePath = - &arc_self.hive - .get_registry_config() - .await? - .systems - .get(target.config.system_type.as_ref().unwrap_or(&"nixos".to_owned())) - .expect(&format!("System type {} does not exists", - target.config.system_type.as_ref().unwrap_or(&"nixos".to_owned()) - )) - .activation_program; - - host.activate(&profile_r, arc_self.goal, activation_program).await?; + host.activate(&profile_r, arc_self.goal).await?; job.success_with_message(arc_self.goal.success_str().to_string())?; diff --git a/src/nix/hive/eval.nix b/src/nix/hive/eval.nix index e06fe45..3e0ebbb 100644 --- a/src/nix/hive/eval.nix +++ b/src/nix/hive/eval.nix @@ -197,7 +197,7 @@ let ]; serializableSystemTypeConfigKeys = [ - "supportsDeployment" + "supportsDeployment" "activationProgram" "protocol" ]; in rec { diff --git a/src/nix/hive/mod.rs b/src/nix/hive/mod.rs index 8e093f3..c1cd44c 100644 --- a/src/nix/hive/mod.rs +++ b/src/nix/hive/mod.rs @@ -274,24 +274,37 @@ impl Hive { for node in selected_nodes.into_iter() { let config = node_configs.remove(&node).unwrap(); - let host = config.to_ssh_host().map(|mut host| { - n_ssh += 1; - - if let Some(ssh_config) = &ssh_config { - host.set_ssh_config(ssh_config.clone()); - } - - if self.is_flake() { - host.set_use_nix3_copy(true); - } - - host.upcast() - }); - let ssh_host = host.is_some(); - let target = TargetNode::new(node.clone(), host, config); - - if !ssh_only || ssh_host { + if let Some(system_type) = config.system_type.as_ref() { + log::debug!( + "Using generic host (system_type: {}) for node {}", + system_type, + node.0 + ); + let system_config = registry.systems.get(system_type).unwrap(); + let generic_host = config.to_generic_host(system_config); + let target = TargetNode::new(node.clone(), Some(Box::new(generic_host)), config); targets.insert(node, target); + } else { + log::debug!("Using SSH host for node {}", node.0); + let host = config.to_ssh_host().map(|mut host| { + n_ssh += 1; + + if let Some(ssh_config) = &ssh_config { + host.set_ssh_config(ssh_config.clone()); + } + + if self.is_flake() { + host.set_use_nix3_copy(true); + } + + host.upcast() + }); + let ssh_host = host.is_some(); + let target = TargetNode::new(node.clone(), host, config); + + if !ssh_only || ssh_host { + targets.insert(node, target); + } } } diff --git a/src/nix/hive/options.nix b/src/nix/hive/options.nix index 2b65de3..bba966c 100644 --- a/src/nix/hive/options.nix +++ b/src/nix/hive/options.nix @@ -264,6 +264,15 @@ with builtins; rec { ''; type = types.path; }; + protocol = lib.mkOption { + type = types.str; + default = "local"; + description = mdDoc '' + Which protocol to use to deploy. + + It can be ssh, local, etc. but strongly depends on `activationProgram` + ''; + }; }; }; registryOptions = { lib, ... }: let diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs index 5b1075a..3cb25d2 100644 --- a/src/nix/host/generic.rs +++ b/src/nix/host/generic.rs @@ -6,35 +6,31 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::process::Command; -use super::{CopyDirection, CopyOptions, Host}; +use super::{CopyDirection, CopyOptions, Host, RebootOptions}; use crate::error::{ColmenaError, ColmenaResult}; use crate::job::JobHandle; -use crate::nix::{self, Profile, StorePath}; +use crate::nix::{self, Key, Profile, StorePath, SystemTypeConfig}; -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct TransportId(String); +pub type TransportId = String; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Transport { id: TransportId, - name: String, long_name: String, description: String, } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct GoalId(String); +pub type GoalId = String; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Goal { id: GoalId, - name: String, long_name: String, description: String, } /// A request to the activation program -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum Request { /// Asks for activation program's capabilities Capabilities, @@ -44,7 +40,7 @@ pub enum Request { transport: TransportId, path: StorePath, to_host: bool, - options: HashMap, + options: CopyOptions, }, /// Deploys the profile to host @@ -52,18 +48,32 @@ pub enum Request { transport: TransportId, goal: GoalId, toplevel: StorePath, - options: HashMap, + options: CopyOptions, }, /// Realizes the derivation Realize { transport: TransportId, path: StorePath, - options: HashMap, }, /// Uploads keys to host - UploadKeys { transport: TransportId }, + UploadKeys { + transport: TransportId, + keys: HashMap, + require_ownership: bool, + }, + + Activate { + transport: TransportId, + profile: StorePath, + goal: GoalId, + }, + + Reboot { + transport: TransportId, + wait_for_boot: bool, + }, } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -96,47 +106,61 @@ impl Host for GenericHost { direction: CopyDirection, options: CopyOptions, ) -> ColmenaResult<()> { - self.call(Request::CopyClosure { + self.call_default_handler(Request::CopyClosure { transport: self.transport.clone(), path: closure.clone(), to_host: direction == CopyDirection::ToRemote, - options: HashMap::from([( - "include_outputs".to_owned(), - if options.include_outputs { - "true".to_owned() - } else { - "false".to_owned() - }, - )]), + options, }) .await } async fn realize_remote(&mut self, derivation: &StorePath) -> ColmenaResult> { - self.call(Request::Realize { - transport: self.transport.clone(), - path: derivation.clone(), - options: HashMap::new(), - }) - .await?; - - Ok(Vec::new()) + Ok(self + .call( + Request::Realize { + transport: self.transport.clone(), + path: derivation.clone(), + }, + move |response, mut store_paths| { + match response { + Response::Progress { phase } => println!("{}", phase), + Response::NewStorePath { store_path } => store_paths.push(store_path), + _ => (), + }; + store_paths + }, + Vec::new(), + ) + .await?) } - fn set_job(&mut self, bar: Option) {} + fn set_job(&mut self, _: Option) {} async fn deploy( &mut self, profile: &Profile, goal: nix::Goal, copy_options: CopyOptions, - activation_program: Option<&StorePath>, ) -> ColmenaResult<()> { - self.call(Request::Deploy { + self.call_default_handler(Request::Deploy { transport: self.transport.clone(), - goal: GoalId(goal.to_string()), + goal: goal.to_string(), toplevel: profile.as_store_path().clone(), - options: HashMap::new(), + options: copy_options, + }) + .await + } + + async fn upload_keys( + &mut self, + keys: &HashMap, + require_ownership: bool, + ) -> ColmenaResult<()> { + self.call_default_handler(Request::UploadKeys { + transport: self.transport.clone(), + keys: keys.clone(), + require_ownership, }) .await } @@ -148,17 +172,50 @@ impl Host for GenericHost { async fn get_main_system_profile(&mut self) -> ColmenaResult { Err(ColmenaError::Unsupported) } + + async fn activate(&mut self, profile: &Profile, goal: nix::Goal) -> ColmenaResult<()> { + self.call_default_handler(Request::Activate { + transport: self.transport.clone(), + profile: profile.as_store_path().clone(), + goal: goal.to_string(), + }) + .await + } + + async fn reboot(&mut self, options: RebootOptions) -> ColmenaResult<()> { + self.call_default_handler(Request::Reboot { + transport: self.transport.clone(), + wait_for_boot: options.wait_for_boot, + }) + .await + } } impl GenericHost { - async fn call(&mut self, request: Request) -> ColmenaResult<()> { + pub fn new(system: &SystemTypeConfig) -> GenericHost { + Self { + activation_program: system.activation_program.clone(), + transport: system.protocol.clone(), + } + } + + async fn call( + &mut self, + request: Request, + handler: F, + initial_value: T, + ) -> ColmenaResult + where + F: Fn(Response, T) -> T, + { let mut command = Command::new(self.activation_program.as_path()) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; - let json = serde_json::to_string(&request).unwrap(); + let json = serde_json::to_string(&request) + .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; let mut stdin = BufWriter::new(command.stdin.take().unwrap()); let mut stdout = BufReader::new(command.stdout.take().unwrap()).lines(); @@ -172,11 +229,26 @@ impl GenericHost { .expect("child process encountered an error"); }); + let mut value = initial_value; + while let Some(line) = stdout.next_line().await? { - let response: Response = serde_json::from_str(line.as_str()).unwrap(); - println!("{:?}", response); + let response: Response = serde_json::from_str(line.as_str()) + .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; + value = handler(response, value); } - Ok(()) + Ok(value) + } + + async fn call_default_handler(&mut self, request: Request) -> ColmenaResult<()> { + self.call( + request, + |response, _| match response { + Response::Progress { phase } => println!("{phase}"), + _ => (), + }, + (), + ) + .await } } diff --git a/src/nix/host/local.rs b/src/nix/host/local.rs index 681394b..a5bf0df 100644 --- a/src/nix/host/local.rs +++ b/src/nix/host/local.rs @@ -78,12 +78,7 @@ impl Host for Local { Ok(()) } - async fn activate( - &mut self, - profile: &Profile, - goal: Goal, - activation_program: &StorePath, - ) -> ColmenaResult<()> { + async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { if !goal.requires_activation() { return Err(ColmenaError::Unsupported); } @@ -96,9 +91,7 @@ impl Host for Local { } let command = { - let activation_command = profile - .activation_command(goal, activation_program) - .unwrap(); + let activation_command = profile.activation_command(goal).unwrap(); self.make_privileged_command(&activation_command) }; diff --git a/src/nix/host/mod.rs b/src/nix/host/mod.rs index 69c9b90..8d8a40b 100644 --- a/src/nix/host/mod.rs +++ b/src/nix/host/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use super::{Goal, Key, Profile, StorePath}; use crate::error::{ColmenaError, ColmenaResult}; @@ -15,14 +16,15 @@ pub use local::Local; mod key_uploader; mod generic; +pub use generic::GenericHost; -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum CopyDirection { ToRemote, FromRemote, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct CopyOptions { include_outputs: bool, use_substitutes: bool, @@ -130,7 +132,6 @@ pub trait Host: Send + Sync + std::fmt::Debug { profile: &Profile, goal: Goal, copy_options: CopyOptions, - activation_program: Option<&StorePath>, ) -> ColmenaResult<()> { self.copy_closure( profile.as_store_path(), @@ -140,12 +141,7 @@ pub trait Host: Send + Sync + std::fmt::Debug { .await?; if goal.requires_activation() { - self.activate( - profile, - goal, - activation_program.expect("Unknown activation program"), - ) - .await?; + self.activate(profile, goal).await?; } Ok(()) @@ -179,12 +175,7 @@ pub trait Host: Send + Sync + std::fmt::Debug { /// /// The profile must already exist on the host. You should probably use deploy instead. #[allow(unused_variables)] - async fn activate( - &mut self, - profile: &Profile, - goal: Goal, - activation_program: &StorePath, - ) -> ColmenaResult<()> { + async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { Err(ColmenaError::Unsupported) } diff --git a/src/nix/host/ssh.rs b/src/nix/host/ssh.rs index ae3952c..64ebb68 100644 --- a/src/nix/host/ssh.rs +++ b/src/nix/host/ssh.rs @@ -90,12 +90,7 @@ impl Host for Ssh { Ok(()) } - async fn activate( - &mut self, - profile: &Profile, - goal: Goal, - activation_program: &StorePath, - ) -> ColmenaResult<()> { + async fn activate(&mut self, profile: &Profile, goal: Goal) -> ColmenaResult<()> { if !goal.requires_activation() { return Err(ColmenaError::Unsupported); } @@ -106,9 +101,7 @@ impl Host for Ssh { self.run_command(set_profile).await?; } - let activation_command = profile - .activation_command(goal, activation_program) - .unwrap(); + let activation_command = profile.activation_command(goal).unwrap(); let v: Vec<&str> = activation_command.iter().map(|s| &**s).collect(); let command = self.ssh(&v); self.run_command(command).await diff --git a/src/nix/mod.rs b/src/nix/mod.rs index 000135a..2ff3307 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -10,8 +10,8 @@ use validator::{Validate, ValidationError as ValidationErrorType}; use crate::error::{ColmenaError, ColmenaResult}; pub mod host; -use host::Ssh; pub use host::{CopyDirection, CopyOptions, Host, RebootOptions}; +use host::{GenericHost, Ssh}; pub mod hive; pub use hive::{Hive, HivePath}; @@ -104,6 +104,8 @@ pub struct SystemTypeConfig { #[serde(rename = "activationProgram")] pub activation_program: StorePath, + + pub protocol: String, } #[derive(Debug, Clone, Validate, Deserialize)] @@ -211,6 +213,10 @@ impl NodeConfig { host }) } + + pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> GenericHost { + GenericHost::new(system_config) + } } impl NixFlags { diff --git a/src/nix/profile.rs b/src/nix/profile.rs index 752e3a4..b71c637 100644 --- a/src/nix/profile.rs +++ b/src/nix/profile.rs @@ -26,11 +26,7 @@ impl Profile { } /// Returns the command to activate this profile. - pub fn activation_command( - &self, - goal: Goal, - activation_program: &StorePath, - ) -> Option> { + pub fn activation_command(&self, goal: Goal) -> Option> { if let Some(goal) = goal.as_str() { let path = self.as_path().join("bin/switch-to-configuration"); let switch_to_configuration = path -- 2.47.0 From c53d80aa081567a71b328f2ab4d6ad0b8d8aecfb Mon Sep 17 00:00:00 2001 From: soyouzpanda Date: Thu, 3 Oct 2024 18:42:52 +0200 Subject: [PATCH 4/6] feat: Oui --- Cargo.toml | 1 + src/nix/hive/mod.rs | 3 +- src/nix/hive/options.nix | 7 +++ src/nix/host/generic.rs | 93 ++++++++++++++++++++++++---------------- src/nix/mod.rs | 7 ++- 5 files changed, 72 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 745af53..dad0f43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ version = "1.28.1" features = [ "fs", "io-util", + "io-std", "macros", "process", "rt", diff --git a/src/nix/hive/mod.rs b/src/nix/hive/mod.rs index c1cd44c..5a385c2 100644 --- a/src/nix/hive/mod.rs +++ b/src/nix/hive/mod.rs @@ -281,7 +281,8 @@ impl Hive { node.0 ); let system_config = registry.systems.get(system_type).unwrap(); - let generic_host = config.to_generic_host(system_config); + let mut generic_host = config.to_generic_host(system_config)?; + generic_host.connect().await?; let target = TargetNode::new(node.clone(), Some(Box::new(generic_host)), config); targets.insert(node, target); } else { diff --git a/src/nix/hive/options.nix b/src/nix/hive/options.nix index bba966c..9f97d25 100644 --- a/src/nix/hive/options.nix +++ b/src/nix/hive/options.nix @@ -134,6 +134,13 @@ with builtins; rec { type = types.nullOr types.str; default = "root"; }; + connectionOptions = lib.mkOption { + description = mdDoc '' + Connection options given to the activation program. + ''; + type = types.attrsOf types.str; + default = { }; + }; allowLocalDeployment = lib.mkOption { description = mdDoc '' Allow the configuration to be applied locally on the host running diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs index 3cb25d2..c203402 100644 --- a/src/nix/host/generic.rs +++ b/src/nix/host/generic.rs @@ -4,7 +4,7 @@ use std::process::Stdio; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; -use tokio::process::Command; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use super::{CopyDirection, CopyOptions, Host, RebootOptions}; use crate::error::{ColmenaError, ColmenaResult}; @@ -35,9 +35,12 @@ pub enum Request { /// Asks for activation program's capabilities Capabilities, + Connection { + connection_options: HashMap, + }, + /// Copy closure to/from host CopyClosure { - transport: TransportId, path: StorePath, to_host: bool, options: CopyOptions, @@ -45,7 +48,6 @@ pub enum Request { /// Deploys the profile to host Deploy { - transport: TransportId, goal: GoalId, toplevel: StorePath, options: CopyOptions, @@ -53,25 +55,21 @@ pub enum Request { /// Realizes the derivation Realize { - transport: TransportId, path: StorePath, }, /// Uploads keys to host UploadKeys { - transport: TransportId, keys: HashMap, require_ownership: bool, }, Activate { - transport: TransportId, profile: StorePath, goal: GoalId, }, Reboot { - transport: TransportId, wait_for_boot: bool, }, } @@ -90,12 +88,20 @@ pub enum Response { Capabilities(CapabilityResponse), Progress { phase: String }, NewStorePath { store_path: StorePath }, + Failed { error: String }, +} + +#[derive(Debug)] +pub struct OpenedCommand { + command: Child, + stdin: BufWriter, + stdout: BufReader, } #[derive(Debug)] pub struct GenericHost { - activation_program: StorePath, - transport: TransportId, + activation_program: OpenedCommand, + connection_options: HashMap, } #[async_trait] @@ -107,7 +113,6 @@ impl Host for GenericHost { options: CopyOptions, ) -> ColmenaResult<()> { self.call_default_handler(Request::CopyClosure { - transport: self.transport.clone(), path: closure.clone(), to_host: direction == CopyDirection::ToRemote, options, @@ -119,7 +124,6 @@ impl Host for GenericHost { Ok(self .call( Request::Realize { - transport: self.transport.clone(), path: derivation.clone(), }, move |response, mut store_paths| { @@ -144,7 +148,6 @@ impl Host for GenericHost { copy_options: CopyOptions, ) -> ColmenaResult<()> { self.call_default_handler(Request::Deploy { - transport: self.transport.clone(), goal: goal.to_string(), toplevel: profile.as_store_path().clone(), options: copy_options, @@ -158,7 +161,6 @@ impl Host for GenericHost { require_ownership: bool, ) -> ColmenaResult<()> { self.call_default_handler(Request::UploadKeys { - transport: self.transport.clone(), keys: keys.clone(), require_ownership, }) @@ -175,7 +177,6 @@ impl Host for GenericHost { async fn activate(&mut self, profile: &Profile, goal: nix::Goal) -> ColmenaResult<()> { self.call_default_handler(Request::Activate { - transport: self.transport.clone(), profile: profile.as_store_path().clone(), goal: goal.to_string(), }) @@ -184,7 +185,6 @@ impl Host for GenericHost { async fn reboot(&mut self, options: RebootOptions) -> ColmenaResult<()> { self.call_default_handler(Request::Reboot { - transport: self.transport.clone(), wait_for_boot: options.wait_for_boot, }) .await @@ -192,11 +192,36 @@ impl Host for GenericHost { } impl GenericHost { - pub fn new(system: &SystemTypeConfig) -> GenericHost { - Self { - activation_program: system.activation_program.clone(), - transport: system.protocol.clone(), - } + pub fn new( + system: &SystemTypeConfig, + connection_options: &HashMap, + ) -> ColmenaResult { + let mut command = Command::new(system.activation_program.as_path()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn()?; + + let stdin = BufWriter::new(command.stdin.take().unwrap()); + let stdout = BufReader::new(command.stdout.take().unwrap()); + + let activation_program = OpenedCommand { + command, + stdin, + stdout, + }; + + Ok(Self { + activation_program, + connection_options: connection_options.clone(), + }) + } + + pub async fn connect(&mut self) -> ColmenaResult<()> { + self.call_default_handler(Request::Connection { + connection_options: self.connection_options.clone(), + }) + .await } async fn call( @@ -208,30 +233,26 @@ impl GenericHost { where F: Fn(Response, T) -> T, { - let mut command = Command::new(self.activation_program.as_path()) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - let json = serde_json::to_string(&request) .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; - let mut stdin = BufWriter::new(command.stdin.take().unwrap()); - let mut stdout = BufReader::new(command.stdout.take().unwrap()).lines(); + log::trace!("giving to activation program stdin {}", json.as_str()); + + let stdin = &mut self.activation_program.stdin; + let stdout = &mut self.activation_program.stdout; stdin.write_all(json.as_bytes()).await?; + stdin.write_all(b"\n").await?; + stdin.flush().await?; - tokio::spawn(async move { - let _status = command - .wait() - .await - .expect("child process encountered an error"); - }); - + let mut line = String::new(); let mut value = initial_value; - while let Some(line) = stdout.next_line().await? { + while stdout.read_line(&mut line).await.is_ok_and(|x| x != 0) { + if line == "\n" { + break; + } + log::trace!("receiving from activation program:\n{}", line); let response: Response = serde_json::from_str(line.as_str()) .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; value = handler(response, value); diff --git a/src/nix/mod.rs b/src/nix/mod.rs index 2ff3307..b2a8204 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -67,6 +67,9 @@ pub struct NodeConfig { #[serde(rename = "targetPort")] target_port: Option, + #[serde(rename = "connectionOptions")] + connection_options: HashMap, + #[serde(rename = "allowLocalDeployment")] allow_local_deployment: bool, @@ -214,8 +217,8 @@ impl NodeConfig { }) } - pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> GenericHost { - GenericHost::new(system_config) + pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> ColmenaResult { + GenericHost::new(system_config, &self.connection_options) } } -- 2.47.0 From e956ae403b925dccb1561bf7c9a178354f3f816c Mon Sep 17 00:00:00 2001 From: soyouzpanda Date: Sun, 6 Oct 2024 01:32:25 +0200 Subject: [PATCH 5/6] feat: connectionOptions -> connectionUri & removed useless options --- src/nix/hive/options.nix | 15 +++-------- src/nix/host/generic.rs | 58 +++++++++++++++++----------------------- src/nix/mod.rs | 8 +++--- 3 files changed, 30 insertions(+), 51 deletions(-) diff --git a/src/nix/hive/options.nix b/src/nix/hive/options.nix index 9f97d25..c1c28c6 100644 --- a/src/nix/hive/options.nix +++ b/src/nix/hive/options.nix @@ -134,12 +134,12 @@ with builtins; rec { type = types.nullOr types.str; default = "root"; }; - connectionOptions = lib.mkOption { + connectionUri = lib.mkOption { description = mdDoc '' Connection options given to the activation program. ''; - type = types.attrsOf types.str; - default = { }; + type = types.str; + default = "ssh://localhost"; }; allowLocalDeployment = lib.mkOption { description = mdDoc '' @@ -271,15 +271,6 @@ with builtins; rec { ''; type = types.path; }; - protocol = lib.mkOption { - type = types.str; - default = "local"; - description = mdDoc '' - Which protocol to use to deploy. - - It can be ssh, local, etc. but strongly depends on `activationProgram` - ''; - }; }; }; registryOptions = { lib, ... }: let diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs index c203402..e58b6ba 100644 --- a/src/nix/host/generic.rs +++ b/src/nix/host/generic.rs @@ -3,7 +3,7 @@ use std::process::Stdio; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use super::{CopyDirection, CopyOptions, Host, RebootOptions}; @@ -32,11 +32,8 @@ pub struct Goal { /// A request to the activation program #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Request { - /// Asks for activation program's capabilities - Capabilities, - Connection { - connection_options: HashMap, + connection_uri: String, }, /// Copy closure to/from host @@ -74,25 +71,19 @@ pub enum Request { }, } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub enum CapabilityResponse { - V1 { - supported_transports: Vec, - supported_goals: Vec, - }, -} - /// A response from the activation program #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum Response { - Capabilities(CapabilityResponse), + ConnectionFailed { error: String }, + ConnectionSucceded { supported_goals: Vec }, Progress { phase: String }, NewStorePath { store_path: StorePath }, Failed { error: String }, + Unsupported, } #[derive(Debug)] -pub struct OpenedCommand { +pub struct ActivationCommand { command: Child, stdin: BufWriter, stdout: BufReader, @@ -100,8 +91,8 @@ pub struct OpenedCommand { #[derive(Debug)] pub struct GenericHost { - activation_program: OpenedCommand, - connection_options: HashMap, + activation_program: ActivationCommand, + connection_uri: String, } #[async_trait] @@ -192,10 +183,7 @@ impl Host for GenericHost { } impl GenericHost { - pub fn new( - system: &SystemTypeConfig, - connection_options: &HashMap, - ) -> ColmenaResult { + pub fn new(system: &SystemTypeConfig, connection_uri: String) -> ColmenaResult { let mut command = Command::new(system.activation_program.as_path()) .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -205,7 +193,7 @@ impl GenericHost { let stdin = BufWriter::new(command.stdin.take().unwrap()); let stdout = BufReader::new(command.stdout.take().unwrap()); - let activation_program = OpenedCommand { + let activation_program = ActivationCommand { command, stdin, stdout, @@ -213,13 +201,13 @@ impl GenericHost { Ok(Self { activation_program, - connection_options: connection_options.clone(), + connection_uri, }) } pub async fn connect(&mut self) -> ColmenaResult<()> { self.call_default_handler(Request::Connection { - connection_options: self.connection_options.clone(), + connection_uri: self.connection_uri.clone(), }) .await } @@ -248,28 +236,30 @@ impl GenericHost { let mut line = String::new(); let mut value = initial_value; + // We're reading JSONL, so we can read all the line and parse it while stdout.read_line(&mut line).await.is_ok_and(|x| x != 0) { if line == "\n" { + log::trace!("finished receiving responses from activation program"); break; } log::trace!("receiving from activation program:\n{}", line); let response: Response = serde_json::from_str(line.as_str()) .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; - value = handler(response, value); + + match response { + Response::Progress { phase } => { + log::info!("{phase}"); + break; + } + Response::Unsupported => return Err(ColmenaError::Unsupported), + _ => value = handler(response, value), + } } Ok(value) } async fn call_default_handler(&mut self, request: Request) -> ColmenaResult<()> { - self.call( - request, - |response, _| match response { - Response::Progress { phase } => println!("{phase}"), - _ => (), - }, - (), - ) - .await + self.call(request, |_, _| {}, ()).await } } diff --git a/src/nix/mod.rs b/src/nix/mod.rs index b2a8204..fd70c9c 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -67,8 +67,8 @@ pub struct NodeConfig { #[serde(rename = "targetPort")] target_port: Option, - #[serde(rename = "connectionOptions")] - connection_options: HashMap, + #[serde(rename = "connectionUri")] + connection_uri: String, #[serde(rename = "allowLocalDeployment")] allow_local_deployment: bool, @@ -107,8 +107,6 @@ pub struct SystemTypeConfig { #[serde(rename = "activationProgram")] pub activation_program: StorePath, - - pub protocol: String, } #[derive(Debug, Clone, Validate, Deserialize)] @@ -218,7 +216,7 @@ impl NodeConfig { } pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> ColmenaResult { - GenericHost::new(system_config, &self.connection_options) + GenericHost::new(system_config, self.connection_uri.clone()) } } -- 2.47.0 From 94b1a11326ad21b11e3b21c038daa498d8bb215b Mon Sep 17 00:00:00 2001 From: soyouzpanda Date: Thu, 10 Oct 2024 21:04:01 +0200 Subject: [PATCH 6/6] feat: create colmena library --- Cargo.toml | 8 ++++++++ src/lib.rs | 16 ++++++++++++++++ src/nix/host/generic.rs | 2 +- src/nix/host/mod.rs | 2 +- 4 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index dad0f43..c5b8610 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,3 +51,11 @@ features = [ "rt-multi-thread", "sync", ] + +[[bin]] +name = "colmena" +path = "src/main.rs" + +[lib] +name = "colmena" +path = "src/lib.rs" diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..f1ac46a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,16 @@ +#![allow(dead_code)] + +mod cli; +mod command; +mod error; +mod job; +mod nix; +mod progress; +mod troubleshooter; +mod util; + +pub use nix::host::generic::Request as GenericRequest; +pub use nix::host::generic::Response as GenericResponse; +pub use nix::host::CopyOptions; +pub use nix::key::{Key, UploadAt}; +pub use nix::store::StorePath; diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs index e58b6ba..29d8516 100644 --- a/src/nix/host/generic.rs +++ b/src/nix/host/generic.rs @@ -3,7 +3,7 @@ use std::process::Stdio; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use super::{CopyDirection, CopyOptions, Host, RebootOptions}; diff --git a/src/nix/host/mod.rs b/src/nix/host/mod.rs index 8d8a40b..3cc35f6 100644 --- a/src/nix/host/mod.rs +++ b/src/nix/host/mod.rs @@ -15,7 +15,7 @@ pub use local::Local; mod key_uploader; -mod generic; +pub mod generic; pub use generic::GenericHost; #[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -- 2.47.0