diff --git a/Cargo.toml b/Cargo.toml index 745af53..c5b8610 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,9 +44,18 @@ version = "1.28.1" features = [ "fs", "io-util", + "io-std", "macros", "process", "rt", "rt-multi-thread", "sync", ] + +[[bin]] +name = "colmena" +path = "src/main.rs" + +[lib] +name = "colmena" +path = "src/lib.rs" diff --git a/src/error.rs b/src/error.rs index a9855e6..40093b5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -55,6 +55,9 @@ pub enum ColmenaError { #[snafu(display("Unexpected active profile: {:?}", profile))] ActiveProfileUnexpected { profile: Profile }, + #[snafu(display("Invalid JSON from activation program: {}", error))] + InvalidActivationProgramJson { error: serde_json::Error }, + #[snafu(display("Could not determine current profile"))] FailedToGetCurrentProfile, @@ -64,6 +67,12 @@ pub enum ColmenaError { #[snafu(display("Don't know how to connect to the node"))] NoTargetHost, + #[snafu(display( + "Don't know how to deploy node: {} -- does your system type support deployment?", + node_name + ))] + UndeployableHost { node_name: String }, + #[snafu(display("Node name cannot be empty"))] EmptyNodeName, diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..f1ac46a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,16 @@ +#![allow(dead_code)] + +mod cli; +mod command; +mod error; +mod job; +mod nix; +mod progress; +mod troubleshooter; +mod util; + +pub use nix::host::generic::Request as GenericRequest; +pub use nix::host::generic::Response as GenericResponse; +pub use nix::host::CopyOptions; +pub use nix::key::{Key, UploadAt}; +pub use nix::store::StorePath; diff --git a/src/nix/hive/eval.nix b/src/nix/hive/eval.nix index 3572907..3e0ebbb 100644 --- a/src/nix/hive/eval.nix +++ b/src/nix/hive/eval.nix @@ -38,19 +38,25 @@ let else if uncheckedHive ? network then uncheckedHive.network else {}; + uncheckedRegistries = if uncheckedHive ? registry then uncheckedHive.registry else {}; + # The final hive will always have the meta key instead of network. hive = let userMeta = (lib.modules.evalModules { modules = [ colmenaOptions.metaOptions uncheckedUserMeta ]; }).config; + registry = (lib.modules.evalModules { + modules = [ colmenaOptions.registryOptions { registry = uncheckedRegistries; } ]; + }).config.registry; + mergedHive = assert lib.assertMsg (!(uncheckedHive ? __schema)) '' You cannot pass in an already-evaluated Hive into the evaluator. Hint: Use the `colmenaHive` output instead of `colmena`. ''; - removeAttrs (defaultHive // uncheckedHive) [ "meta" "network" ]; + removeAttrs (defaultHive // uncheckedHive) [ "meta" "network" "registry" ]; meta = { meta = @@ -58,7 +64,7 @@ let then userMeta // { nixpkgs = ; } else userMeta; }; - in mergedHive // meta; + in mergedHive // meta // { inherit registry; }; configsFor = node: let nodeConfig = hive.${node}; @@ -112,14 +118,23 @@ let in mkNixpkgs "meta.nixpkgs" nixpkgsConf; lib = nixpkgs.lib; - reservedNames = [ "defaults" "network" "meta" ]; + reservedNames = [ "defaults" "network" "meta" "registry" ]; - evalNode = name: configs: let + evalNode = name: configs: + # Some help on error messages. + assert (lib.assertMsg (lib.hasAttrByPath [ "deployment" "systemType" ] hive.${name}) + "${name} does not have a deployment system type!"); + assert (lib.assertMsg (builtins.typeOf hive.registry == "set")) + "The hive's registry is not a set, but of type '${builtins.typeOf hive.registry}'"; + assert (lib.assertMsg (lib.hasAttr hive.${name}.deployment.systemType hive.registry) + "${builtins.toJSON (hive.${name}.deployment.systemType)} does not exist in the registry of systems!"); + let + # We cannot use `configs` because we need to access to the raw configuration fragment. + inherit (hive.registry.${hive.${name}.deployment.systemType}) evalConfig; npkgs = if hasAttr name hive.meta.nodeNixpkgs then mkNixpkgs "meta.nodeNixpkgs.${name}" hive.meta.nodeNixpkgs.${name} else nixpkgs; - evalConfig = import (npkgs.path + "/nixos/lib/eval-config.nix"); # Here we need to merge the configurations in meta.nixpkgs # and in machine config. @@ -139,17 +154,19 @@ let in lib.optional (!hasTypedConfig && length remainingKeys != 0) "The following Nixpkgs configuration keys set in meta.nixpkgs will be ignored: ${toString remainingKeys}"; - }; + } // lib.optionalAttrs (builtins.hasAttr "localSystem" npkgs || builtins.hasAttr "crossSystem" npkgs) { + nixpkgs.localSystem = lib.mkBefore npkgs.localSystem; + nixpkgs.crossSystem = lib.mkBefore npkgs.crossSystem; + }; in evalConfig { - inherit (npkgs) system; + # This doesn't exist for `evalModules` the generic way. + # inherit (npkgs) system; modules = [ nixpkgsModule colmenaModules.assertionModule - colmenaModules.keyChownModule - colmenaModules.keyServiceModule colmenaOptions.deploymentOptions - hive.defaults + (hive.registry.${hive.${name}.deployment.systemType}.defaults or hive.defaults) ] ++ configs; specialArgs = { inherit name; @@ -179,6 +196,10 @@ let "allowApplyAll" ]; + serializableSystemTypeConfigKeys = [ + "supportsDeployment" "activationProgram" "protocol" + ]; + in rec { # Exported attributes __schema = "v0"; @@ -190,5 +211,9 @@ in rec { evalSelected = names: lib.filterAttrs (name: _: elem name names) toplevel; evalSelectedDrvPaths = names: lib.mapAttrs (_: v: v.drvPath) (evalSelected names); metaConfig = lib.filterAttrs (n: v: elem n metaConfigKeys) hive.meta; - introspect = f: f { inherit lib; pkgs = nixpkgs; nodes = uncheckedNodes; }; + # We cannot perform a `metaConfigKeys`-style simple check here + # because registry is arbitrarily deep and may evaluate nixpkgs indirectly. + registryConfig = lib.mapAttrs (systemTypeName: systemType: + lib.filterAttrs (n: v: elem n serializableSystemTypeConfigKeys) systemType) hive.registry; + introspect = f: f { inherit lib; pkgs = nixpkgs; inherit nodes; }; } diff --git a/src/nix/hive/mod.rs b/src/nix/hive/mod.rs index f9d9227..5a385c2 100644 --- a/src/nix/hive/mod.rs +++ b/src/nix/hive/mod.rs @@ -15,7 +15,7 @@ use validator::Validate; use super::deployment::TargetNode; use super::{ Flake, MetaConfig, NixExpression, NixFlags, NodeConfig, NodeFilter, NodeName, - ProfileDerivation, SerializedNixExpression, StorePath, + ProfileDerivation, RegistryConfig, SerializedNixExpression, StorePath, }; use crate::error::{ColmenaError, ColmenaResult}; use crate::job::JobHandle; @@ -87,6 +87,8 @@ pub struct Hive { nix_options: HashMap, meta_config: OnceCell, + + registry_config: OnceCell, } struct NixInstantiate<'hive> { @@ -140,6 +142,7 @@ impl Hive { impure: false, nix_options: HashMap::new(), meta_config: OnceCell::new(), + registry_config: OnceCell::new(), }) } @@ -158,6 +161,17 @@ impl Hive { .await } + pub async fn get_registry_config(&self) -> ColmenaResult<&RegistryConfig> { + self.registry_config + .get_or_try_init(|| async { + self.nix_instantiate("hive.registryConfig") + .eval() + .capture_json() + .await + }) + .await + } + pub fn set_show_trace(&mut self, value: bool) { self.show_trace = value; } @@ -201,6 +215,9 @@ impl Hive { ) -> ColmenaResult> { let mut node_configs = None; + log::info!("Enumerating systems..."); + let registry = self.get_registry_config().await?; + log::info!("Enumerating nodes..."); let all_nodes = self.node_names().await?; @@ -234,29 +251,61 @@ impl Hive { self.deployment_info_selected(&selected_nodes).await? }; + for node_config in &node_configs { + if let Some(system_type) = node_config.1.system_type.as_ref() { + let Some(system_config) = registry.systems.get(system_type) else { + // TODO: convert me to proper error? + log::warn!("'{:?}' is not a known system type in the registry, double check your expressions!", system_type); + return Err(ColmenaError::Unknown { + message: "unknown system type".to_string(), + }); + }; + + if !system_config.supports_deployment { + return Err(ColmenaError::UndeployableHost { + node_name: node_config.0.to_string(), + }); + } + } + } + let mut targets = HashMap::new(); let mut n_ssh = 0; for node in selected_nodes.into_iter() { let config = node_configs.remove(&node).unwrap(); - let host = config.to_ssh_host().map(|mut host| { - n_ssh += 1; - - if let Some(ssh_config) = &ssh_config { - host.set_ssh_config(ssh_config.clone()); - } - - if self.is_flake() { - host.set_use_nix3_copy(true); - } - - host.upcast() - }); - let ssh_host = host.is_some(); - let target = TargetNode::new(node.clone(), host, config); - - if !ssh_only || ssh_host { + if let Some(system_type) = config.system_type.as_ref() { + log::debug!( + "Using generic host (system_type: {}) for node {}", + system_type, + node.0 + ); + let system_config = registry.systems.get(system_type).unwrap(); + let mut generic_host = config.to_generic_host(system_config)?; + generic_host.connect().await?; + let target = TargetNode::new(node.clone(), Some(Box::new(generic_host)), config); targets.insert(node, target); + } else { + log::debug!("Using SSH host for node {}", node.0); + let host = config.to_ssh_host().map(|mut host| { + n_ssh += 1; + + if let Some(ssh_config) = &ssh_config { + host.set_ssh_config(ssh_config.clone()); + } + + if self.is_flake() { + host.set_use_nix3_copy(true); + } + + host.upcast() + }); + let ssh_host = host.is_some(); + let target = TargetNode::new(node.clone(), host, config); + + if !ssh_only || ssh_host { + targets.insert(node, target); + } } } diff --git a/src/nix/hive/options.nix b/src/nix/hive/options.nix index 95dcf1a..c1c28c6 100644 --- a/src/nix/hive/options.nix +++ b/src/nix/hive/options.nix @@ -98,6 +98,14 @@ with builtins; rec { in { options = { deployment = { + systemType = lib.mkOption { + description = mdDoc '' + System type used for this node, e.g. NixOS. + ''; + default = "nixos"; + # TODO: enum among all registered systems? + type = types.str; + }; targetHost = lib.mkOption { description = mdDoc '' The target SSH node for deployment. @@ -126,6 +134,13 @@ with builtins; rec { type = types.nullOr types.str; default = "root"; }; + connectionUri = lib.mkOption { + description = mdDoc '' + Connection options given to the activation program. + ''; + type = types.str; + default = "ssh://localhost"; + }; allowLocalDeployment = lib.mkOption { description = mdDoc '' Allow the configuration to be applied locally on the host running @@ -218,6 +233,58 @@ with builtins; rec { }; }; }; + # Options for a registered system type + systemTypeOptions = { name, lib, ... }: let + inherit (lib) types; + mdDoc = lib.mdDoc or lib.id; + in + { + options = { + evalConfig = lib.mkOption { + description = mdDoc '' + Evaluation function which share the same interface as `nixos/lib/eval-config.nix` + which can be tailored to your own usecases or to target another type of system, + e.g. nix-darwin. + ''; + type = types.functionTo types.unspecified; + }; + supportsDeployment = lib.mkOption { + description = mdDoc '' + Whether this system type supports deployment or not. + + If it supports deployment, it needs to have appropriate activation code, + refer to how to write custom activators. + ''; + default = name == "nixos"; + defaultText = "If a NixOS system, then true, otherwise false by default"; + }; + defaults = lib.mkOption { + description = mdDoc '' + Default configuration for that system type. + ''; + type = types.functionTo types.unspecified; + default = _: {}; + }; + activationProgram = lib.mkOption { + description = mdDoc '' + Program to execute at activation time. + ''; + type = types.path; + }; + }; + }; + registryOptions = { lib, ... }: let + inherit (lib) types; + mdDoc = lib.mdDoc or lib.id; + in + { + options.registry = lib.mkOption { + description = mdDoc '' + A registry of all system types. + ''; + type = types.attrsOf (types.submodule systemTypeOptions); + }; + }; # Hive-wide options metaOptions = { lib, ... }: let inherit (lib) types; diff --git a/src/nix/host/generic.rs b/src/nix/host/generic.rs new file mode 100644 index 0000000..48c52bf --- /dev/null +++ b/src/nix/host/generic.rs @@ -0,0 +1,287 @@ +use std::collections::HashMap; +use std::process::Stdio; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; + +use super::{CopyDirection, CopyOptions, Host, RebootOptions}; +use crate::error::{ColmenaError, ColmenaResult}; +use crate::job::JobHandle; +use crate::nix::{self, Key, Profile, StorePath, SystemTypeConfig}; + +pub type TransportId = String; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Transport { + id: TransportId, + long_name: String, + description: String, +} + +pub type GoalId = String; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Goal { + id: GoalId, + long_name: String, + description: String, +} + +/// A request to the activation program +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Request { + Connection { + connection_uri: String, + }, + + /// Copy closure to/from host + CopyClosure { + path: StorePath, + to_host: bool, + options: CopyOptions, + }, + + /// Deploys the profile to host + Deploy { + goal: GoalId, + toplevel: StorePath, + options: CopyOptions, + }, + + /// Realizes the derivation + Realize { + path: StorePath, + remote: bool, + }, + + /// Uploads keys to host + UploadKeys { + keys: HashMap, + require_ownership: bool, + }, + + Activate { + profile: StorePath, + goal: GoalId, + }, + + Reboot { + wait_for_boot: bool, + }, +} + +/// A response from the activation program +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum Response { + ConnectionFailed { error: String }, + ConnectionSucceded { supported_goals: Vec }, + Progress { phase: String }, + NewStorePath { store_path: StorePath }, + Failed { error: String }, + Unsupported, +} + +#[derive(Debug)] +pub struct ActivationCommand { + command: Child, + stdin: BufWriter, + stdout: BufReader, +} + +#[derive(Debug)] +pub struct GenericHost { + activation_program: ActivationCommand, + connection_uri: String, +} + +#[async_trait] +impl Host for GenericHost { + async fn copy_closure( + &mut self, + closure: &StorePath, + direction: CopyDirection, + options: CopyOptions, + ) -> ColmenaResult<()> { + self.call_default_handler(Request::CopyClosure { + path: closure.clone(), + to_host: direction == CopyDirection::ToRemote, + options, + }) + .await + } + + async fn realize_remote(&mut self, derivation: &StorePath) -> ColmenaResult> { + Ok(self + .call( + Request::Realize { + path: derivation.clone(), + remote: true + }, + move |response, mut store_paths| { + match response { + Response::Progress { phase } => println!("{}", phase), + Response::NewStorePath { store_path } => store_paths.push(store_path), + _ => (), + }; + store_paths + }, + Vec::new(), + ) + .await?) + } + + async fn realize(&mut self, derivation: &StorePath) -> ColmenaResult> { + Ok(self + .call( + Request::Realize { + path: derivation.clone(), + remote: false + }, + move |response, mut store_paths| { + match response { + Response::Progress { phase } => println!("{}", phase), + Response::NewStorePath { store_path } => store_paths.push(store_path), + _ => (), + }; + store_paths + }, + Vec::new(), + ) + .await?) + } + + fn set_job(&mut self, _: Option) {} + + async fn deploy( + &mut self, + profile: &Profile, + goal: nix::Goal, + copy_options: CopyOptions, + ) -> ColmenaResult<()> { + self.call_default_handler(Request::Deploy { + goal: goal.to_string(), + toplevel: profile.as_store_path().clone(), + options: copy_options, + }) + .await + } + + async fn upload_keys( + &mut self, + keys: &HashMap, + require_ownership: bool, + ) -> ColmenaResult<()> { + self.call_default_handler(Request::UploadKeys { + keys: keys.clone(), + require_ownership, + }) + .await + } + + async fn get_current_system_profile(&mut self) -> ColmenaResult { + Err(ColmenaError::Unsupported) + } + + async fn get_main_system_profile(&mut self) -> ColmenaResult { + Err(ColmenaError::Unsupported) + } + + async fn activate(&mut self, profile: &Profile, goal: nix::Goal) -> ColmenaResult<()> { + self.call_default_handler(Request::Activate { + profile: profile.as_store_path().clone(), + goal: goal.to_string(), + }) + .await + } + + async fn reboot(&mut self, options: RebootOptions) -> ColmenaResult<()> { + self.call_default_handler(Request::Reboot { + wait_for_boot: options.wait_for_boot, + }) + .await + } +} + +impl GenericHost { + pub fn new(system: &SystemTypeConfig, connection_uri: String) -> ColmenaResult { + let mut command = Command::new(system.activation_program.as_path()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn()?; + + let stdin = BufWriter::new(command.stdin.take().unwrap()); + let stdout = BufReader::new(command.stdout.take().unwrap()); + + let activation_program = ActivationCommand { + command, + stdin, + stdout, + }; + + Ok(Self { + activation_program, + connection_uri, + }) + } + + pub async fn connect(&mut self) -> ColmenaResult<()> { + self.call_default_handler(Request::Connection { + connection_uri: self.connection_uri.clone(), + }) + .await + } + + async fn call( + &mut self, + request: Request, + handler: F, + initial_value: T, + ) -> ColmenaResult + where + F: Fn(Response, T) -> T, + { + let json = serde_json::to_string(&request) + .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; + + log::trace!("giving to activation program stdin {}", json.as_str()); + + let stdin = &mut self.activation_program.stdin; + let stdout = &mut self.activation_program.stdout; + + stdin.write_all(json.as_bytes()).await?; + stdin.write_all(b"\n").await?; + stdin.flush().await?; + + let mut line = String::new(); + let mut value = initial_value; + + // We're reading JSONL, so we can read all the line and parse it + while stdout.read_line(&mut line).await.is_ok_and(|x| x != 0) { + if line == "\n" { + log::trace!("finished receiving responses from activation program"); + break; + } + log::trace!("receiving from activation program:\n{}", line); + let response: Response = serde_json::from_str(line.as_str()) + .map_err(|error| ColmenaError::InvalidActivationProgramJson { error })?; + + match response { + Response::Progress { phase } => { + log::info!("{phase}"); + break; + } + Response::Unsupported => return Err(ColmenaError::Unsupported), + _ => value = handler(response, value), + } + } + + Ok(value) + } + + async fn call_default_handler(&mut self, request: Request) -> ColmenaResult<()> { + self.call(request, |_, _| {}, ()).await + } +} diff --git a/src/nix/host/mod.rs b/src/nix/host/mod.rs index 8ff44ad..c7cf046 100644 --- a/src/nix/host/mod.rs +++ b/src/nix/host/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use super::{Goal, Key, Profile, StorePath}; use crate::error::{ColmenaError, ColmenaResult}; @@ -14,13 +15,16 @@ pub use local::Local; mod key_uploader; -#[derive(Copy, Clone, Debug)] +pub mod generic; +pub use generic::GenericHost; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum CopyDirection { ToRemote, FromRemote, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct CopyOptions { include_outputs: bool, use_substitutes: bool, @@ -61,6 +65,18 @@ impl CopyOptions { self.gzip = val; self } + + pub fn get_include_outputs(&self) -> bool { + self.include_outputs + } + + pub fn get_use_substitutes(&self) -> bool { + self.use_substitutes + } + + pub fn get_gzip(&self) -> bool { + self.gzip + } } impl Default for RebootOptions { diff --git a/src/nix/mod.rs b/src/nix/mod.rs index 8479ce6..fd70c9c 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -10,8 +10,8 @@ use validator::{Validate, ValidationError as ValidationErrorType}; use crate::error::{ColmenaError, ColmenaResult}; pub mod host; -use host::Ssh; pub use host::{CopyDirection, CopyOptions, Host, RebootOptions}; +use host::{GenericHost, Ssh}; pub mod hive; pub use hive::{Hive, HivePath}; @@ -55,6 +55,9 @@ pub struct NodeName(#[serde(deserialize_with = "NodeName::deserialize")] String) #[derive(Debug, Clone, Validate, Deserialize)] pub struct NodeConfig { + #[serde(rename = "systemType")] + system_type: Option, + #[serde(rename = "targetHost")] target_host: Option, @@ -64,6 +67,9 @@ pub struct NodeConfig { #[serde(rename = "targetPort")] target_port: Option, + #[serde(rename = "connectionUri")] + connection_uri: String, + #[serde(rename = "allowLocalDeployment")] allow_local_deployment: bool, @@ -94,6 +100,21 @@ pub struct MetaConfig { pub machines_file: Option, } +#[derive(Debug, Clone, Validate, Deserialize)] +pub struct SystemTypeConfig { + #[serde(rename = "supportsDeployment")] + pub supports_deployment: bool, + + #[serde(rename = "activationProgram")] + pub activation_program: StorePath, +} + +#[derive(Debug, Clone, Validate, Deserialize)] +pub struct RegistryConfig { + #[serde(flatten)] + pub systems: HashMap, +} + /// Nix CLI flags. #[derive(Debug, Clone, Default)] pub struct NixFlags { @@ -193,6 +214,10 @@ impl NodeConfig { host }) } + + pub fn to_generic_host(&self, system_config: &SystemTypeConfig) -> ColmenaResult { + GenericHost::new(system_config, self.connection_uri.clone()) + } } impl NixFlags {