From e956ae403b925dccb1561bf7c9a178354f3f816c Mon Sep 17 00:00:00 2001 From: soyouzpanda Date: Sun, 6 Oct 2024 01:32:25 +0200 Subject: [PATCH] feat: connectionOptions -> connectionUri & removed useless options --- src/nix/hive/options.nix | 15 +++-------- src/nix/host/generic.rs | 58 +++++++++++++++++----------------------- src/nix/mod.rs | 8 +++--- 3 files changed, 30 insertions(+), 51 deletions(-) diff --git a/src/nix/hive/options.nix b/src/nix/hive/options.nix index 9f97d25..c1c28c6 100644 --- a/src/nix/hive/options.nix +++ b/src/nix/hive/options.nix @@ -134,12 +134,12 @@ with builtins; rec { type = types.nullOr types.str; default = "root"; }; - connectionOptions = lib.mkOption { + connectionUri = lib.mkOption { description = mdDoc '' Connection options given to the activation program. ''; - type = types.attrsOf types.str; - default = { }; + type = types.str; + default = "ssh://localhost"; }; allowLocalDeployment = lib.mkOption { description = mdDoc '' @@ -271,15 +271,6 @@ with builtins; rec { ''; type = types.path; }; - protocol = lib.mkOption { - type = types.str; - default = "local"; - description = mdDoc '' - Which protocol to use to deploy. - - It can be ssh, local, etc. but strongly depends on `activationProgram` - ''; - }; }; }; registryOptions = { lib, ... }: let diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs index c203402..e58b6ba 100644 --- a/src/nix/host/generic.rs +++ b/src/nix/host/generic.rs @@ -3,7 +3,7 @@ use std::process::Stdio; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use super::{CopyDirection, CopyOptions, Host, RebootOptions}; @@ -32,11 +32,8 @@ pub struct Goal { /// A request to the activation program #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Request { - /// Asks for activation program's capabilities - Capabilities, - Connection { - connection_options: HashMap, + connection_uri: String, }, /// Copy closure to/from host @@ -74,25 +71,19 @@ pub enum Request { }, } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub enum CapabilityResponse { - V1 { - supported_transports: Vec, - supported_goals: Vec, - }, -} - /// A response from the activation program #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum Response { - Capabilities(CapabilityResponse), + ConnectionFailed { error: String }, + ConnectionSucceded { supported_goals: Vec }, Progress { phase: String }, NewStorePath { store_path: StorePath }, Failed { error: String }, + Unsupported, } #[derive(Debug)] -pub struct OpenedCommand { +pub struct ActivationCommand { command: Child, stdin: BufWriter, stdout: BufReader, @@ -100,8 +91,8 @@ pub struct OpenedCommand { #[derive(Debug)] pub struct GenericHost { - activation_program: OpenedCommand, - connection_options: HashMap, + activation_program: ActivationCommand, + connection_uri: String, } #[async_trait] @@ -192,10 +183,7 @@ impl Host for GenericHost { } impl GenericHost { - pub fn new( - system: &SystemTypeConfig, - connection_options: &HashMap, - ) -> ColmenaResult { + pub fn new(system: &SystemTypeConfig, connection_uri: String) -> ColmenaResult { let mut command = Command::new(system.activation_program.as_path()) .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -205,7 +193,7 @@ impl GenericHost { let stdin = BufWriter::new(command.stdin.take().unwrap()); let stdout = BufReader::new(command.stdout.take().unwrap()); - let activation_program = OpenedCommand { + let activation_program = ActivationCommand { command, stdin, stdout, @@ -213,13 +201,13 @@ impl GenericHost { Ok(Self { activation_program, - connection_options: connection_options.clone(), + connection_uri, }) } pub async fn connect(&mut self) -> ColmenaResult<()> { self.call_default_handler(Request::Connection { - connection_options: self.connection_options.clone(), + connection_uri: self.connection_uri.clone(), }) .await } @@ -248,28 +236,30 @@ impl GenericHost { let mut line = String::new(); let mut value = initial_value; + // We're reading JSONL, so we can read all the line and parse it while stdout.read_line(&mut line).await.is_ok_and(|x| x != 0) { if line == "\n" { + log::trace!("finished receiving responses from activation program"); break; } log::trace!("receiving from activation program:\n{}", line); let response: Response = serde_json::from_str(line.as_str()) .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; - value = handler(response, value); + + match response { + Response::Progress { phase } => { + log::info!("{phase}"); + break; + } + Response::Unsupported => return Err(ColmenaError::Unsupported), + _ => value = handler(response, value), + } } Ok(value) } async fn call_default_handler(&mut self, request: Request) -> ColmenaResult<()> { - self.call( - request, - |response, _| match response { - Response::Progress { phase } => println!("{phase}"), - _ => (), - }, - (), - ) - .await + self.call(request, |_, _| {}, ()).await } } diff --git a/src/nix/mod.rs b/src/nix/mod.rs index b2a8204..fd70c9c 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -67,8 +67,8 @@ pub struct NodeConfig { #[serde(rename = "targetPort")] target_port: Option, - #[serde(rename = "connectionOptions")] - connection_options: HashMap, + #[serde(rename = "connectionUri")] + connection_uri: String, #[serde(rename = "allowLocalDeployment")] allow_local_deployment: bool, @@ -107,8 +107,6 @@ pub struct SystemTypeConfig { #[serde(rename = "activationProgram")] pub activation_program: StorePath, - - pub protocol: String, } #[derive(Debug, Clone, Validate, Deserialize)] @@ -218,7 +216,7 @@ impl NodeConfig { } pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> ColmenaResult { - GenericHost::new(system_config, &self.connection_options) + GenericHost::new(system_config, self.connection_uri.clone()) } }