diff --git a/Cargo.toml b/Cargo.toml index 745af53..dad0f43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ version = "1.28.1" features = [ "fs", "io-util", + "io-std", "macros", "process", "rt", diff --git a/src/nix/hive/mod.rs b/src/nix/hive/mod.rs index c1cd44c..5a385c2 100644 --- a/src/nix/hive/mod.rs +++ b/src/nix/hive/mod.rs @@ -281,7 +281,8 @@ impl Hive { node.0 ); let system_config = registry.systems.get(system_type).unwrap(); - let generic_host = config.to_generic_host(system_config); + let mut generic_host = config.to_generic_host(system_config)?; + generic_host.connect().await?; let target = TargetNode::new(node.clone(), Some(Box::new(generic_host)), config); targets.insert(node, target); } else { diff --git a/src/nix/hive/options.nix b/src/nix/hive/options.nix index bba966c..9f97d25 100644 --- a/src/nix/hive/options.nix +++ b/src/nix/hive/options.nix @@ -134,6 +134,13 @@ with builtins; rec { type = types.nullOr types.str; default = "root"; }; + connectionOptions = lib.mkOption { + description = mdDoc '' + Connection options given to the activation program. + ''; + type = types.attrsOf types.str; + default = { }; + }; allowLocalDeployment = lib.mkOption { description = mdDoc '' Allow the configuration to be applied locally on the host running diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs index 3cb25d2..c203402 100644 --- a/src/nix/host/generic.rs +++ b/src/nix/host/generic.rs @@ -4,7 +4,7 @@ use std::process::Stdio; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; -use tokio::process::Command; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use super::{CopyDirection, CopyOptions, Host, RebootOptions}; use crate::error::{ColmenaError, ColmenaResult}; @@ -35,9 +35,12 @@ pub enum Request { /// Asks for activation program's capabilities Capabilities, + Connection { + connection_options: HashMap, + }, + /// Copy closure to/from host CopyClosure { - transport: TransportId, path: StorePath, to_host: bool, options: CopyOptions, @@ -45,7 +48,6 @@ pub enum Request { /// Deploys the profile to host Deploy { - transport: TransportId, goal: GoalId, toplevel: StorePath, options: CopyOptions, @@ -53,25 +55,21 @@ pub enum Request { /// Realizes the derivation Realize { - transport: TransportId, path: StorePath, }, /// Uploads keys to host UploadKeys { - transport: TransportId, keys: HashMap, require_ownership: bool, }, Activate { - transport: TransportId, profile: StorePath, goal: GoalId, }, Reboot { - transport: TransportId, wait_for_boot: bool, }, } @@ -90,12 +88,20 @@ pub enum Response { Capabilities(CapabilityResponse), Progress { phase: String }, NewStorePath { store_path: StorePath }, + Failed { error: String }, +} + +#[derive(Debug)] +pub struct OpenedCommand { + command: Child, + stdin: BufWriter, + stdout: BufReader, } #[derive(Debug)] pub struct GenericHost { - activation_program: StorePath, - transport: TransportId, + activation_program: OpenedCommand, + connection_options: HashMap, } #[async_trait] @@ -107,7 +113,6 @@ impl Host for GenericHost { options: CopyOptions, ) -> ColmenaResult<()> { self.call_default_handler(Request::CopyClosure { - transport: self.transport.clone(), path: closure.clone(), to_host: direction == CopyDirection::ToRemote, options, @@ -119,7 +124,6 @@ impl Host for GenericHost { Ok(self .call( Request::Realize { - transport: self.transport.clone(), path: derivation.clone(), }, move |response, mut store_paths| { @@ -144,7 +148,6 @@ impl Host for GenericHost { copy_options: CopyOptions, ) -> ColmenaResult<()> { self.call_default_handler(Request::Deploy { - transport: self.transport.clone(), goal: goal.to_string(), toplevel: profile.as_store_path().clone(), options: copy_options, @@ -158,7 +161,6 @@ impl Host for GenericHost { require_ownership: bool, ) -> ColmenaResult<()> { self.call_default_handler(Request::UploadKeys { - transport: self.transport.clone(), keys: keys.clone(), require_ownership, }) @@ -175,7 +177,6 @@ impl Host for GenericHost { async fn activate(&mut self, profile: &Profile, goal: nix::Goal) -> ColmenaResult<()> { self.call_default_handler(Request::Activate { - transport: self.transport.clone(), profile: profile.as_store_path().clone(), goal: goal.to_string(), }) @@ -184,7 +185,6 @@ impl Host for GenericHost { async fn reboot(&mut self, options: RebootOptions) -> ColmenaResult<()> { self.call_default_handler(Request::Reboot { - transport: self.transport.clone(), wait_for_boot: options.wait_for_boot, }) .await @@ -192,11 +192,36 @@ impl Host for GenericHost { } impl GenericHost { - pub fn new(system: &SystemTypeConfig) -> GenericHost { - Self { - activation_program: system.activation_program.clone(), - transport: system.protocol.clone(), - } + pub fn new( + system: &SystemTypeConfig, + connection_options: &HashMap, + ) -> ColmenaResult { + let mut command = Command::new(system.activation_program.as_path()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn()?; + + let stdin = BufWriter::new(command.stdin.take().unwrap()); + let stdout = BufReader::new(command.stdout.take().unwrap()); + + let activation_program = OpenedCommand { + command, + stdin, + stdout, + }; + + Ok(Self { + activation_program, + connection_options: connection_options.clone(), + }) + } + + pub async fn connect(&mut self) -> ColmenaResult<()> { + self.call_default_handler(Request::Connection { + connection_options: self.connection_options.clone(), + }) + .await } async fn call( @@ -208,30 +233,26 @@ impl GenericHost { where F: Fn(Response, T) -> T, { - let mut command = Command::new(self.activation_program.as_path()) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - let json = serde_json::to_string(&request) .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; - let mut stdin = BufWriter::new(command.stdin.take().unwrap()); - let mut stdout = BufReader::new(command.stdout.take().unwrap()).lines(); + log::trace!("giving to activation program stdin {}", json.as_str()); + + let stdin = &mut self.activation_program.stdin; + let stdout = &mut self.activation_program.stdout; stdin.write_all(json.as_bytes()).await?; + stdin.write_all(b"\n").await?; + stdin.flush().await?; - tokio::spawn(async move { - let _status = command - .wait() - .await - .expect("child process encountered an error"); - }); - + let mut line = String::new(); let mut value = initial_value; - while let Some(line) = stdout.next_line().await? { + while stdout.read_line(&mut line).await.is_ok_and(|x| x != 0) { + if line == "\n" { + break; + } + log::trace!("receiving from activation program:\n{}", line); let response: Response = serde_json::from_str(line.as_str()) .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; value = handler(response, value); diff --git a/src/nix/mod.rs b/src/nix/mod.rs index 2ff3307..b2a8204 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -67,6 +67,9 @@ pub struct NodeConfig { #[serde(rename = "targetPort")] target_port: Option, + #[serde(rename = "connectionOptions")] + connection_options: HashMap, + #[serde(rename = "allowLocalDeployment")] allow_local_deployment: bool, @@ -214,8 +217,8 @@ impl NodeConfig { }) } - pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> GenericHost { - GenericHost::new(system_config) + pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> ColmenaResult { + GenericHost::new(system_config, &self.connection_options) } }