Refactoring and deployment.keys implementation

More refactoring of the deployment process, as well as an initial
implementation of `deployment.keys`.

Fixes #2.
This commit is contained in:
Zhaofeng Li 2021-02-08 18:38:14 -08:00
parent 21c2bef3ad
commit 84aa165aef
19 changed files with 1156 additions and 461 deletions

155
Cargo.lock generated
View file

@ -106,12 +106,14 @@ dependencies = [
"libc",
"log",
"quit",
"regex",
"serde",
"serde_json",
"snafu",
"sys-info",
"tempfile",
"tokio",
"validator",
]
[[package]]
@ -155,6 +157,16 @@ dependencies = [
"termcolor",
]
[[package]]
name = "form_urlencoded"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece68d15c92e84fa4f19d3780f1294e5ca82a78a6d515f1efaabcc144688be00"
dependencies = [
"matches",
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.8"
@ -293,6 +305,23 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c1ad908cc71012b7bea4d0c53ba96a8cba9962f048fa68d143376143d863b7a"
[[package]]
name = "idna"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de910d521f7cc3135c4de8db1cb910e0b5ed1dc6f57c381cd07e8e661ce10094"
dependencies = [
"matches",
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "if_chain"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f7280c75fb2e2fc47080ec80ccc481376923acb04501957fc38f935c3de5088"
[[package]]
name = "indicatif"
version = "0.15.0"
@ -356,6 +385,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matches"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "memchr"
version = "2.3.4"
@ -441,6 +476,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.2"
@ -479,6 +520,30 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@ -773,6 +838,21 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "tinyvec"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "317cca572a0e89c3ce0ca1f1bdc9369547fe318a683418e42ac8f59d14701023"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.0.0"
@ -804,6 +884,24 @@ dependencies = [
"syn",
]
[[package]]
name = "unicode-bidi"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5"
dependencies = [
"matches",
]
[[package]]
name = "unicode-normalization"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a13e63ab62dbe32aeee58d1c5408d35c36c392bba5d9d3142287219721afe606"
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-width"
version = "0.1.8"
@ -816,12 +914,69 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "url"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5909f2b0817350449ed73e8bcd81c8c3c8d9a7a5d8acba4b27db277f1868976e"
dependencies = [
"form_urlencoded",
"idna",
"matches",
"percent-encoding",
]
[[package]]
name = "validator"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d6937c33ec6039d8071bcf72933146b5bbe378d645d8fa59bdadabfc2a249"
dependencies = [
"idna",
"lazy_static",
"regex",
"serde",
"serde_derive",
"serde_json",
"url",
"validator_derive",
"validator_types",
]
[[package]]
name = "validator_derive"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4286b4497f270f59276a89ae0ad109d5f8f18c69b613e3fb22b61201aadb0c4d"
dependencies = [
"if_chain",
"lazy_static",
"proc-macro-error",
"proc-macro2",
"quote",
"regex",
"syn",
"validator_types",
]
[[package]]
name = "validator_types"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad9680608df133af2c1ddd5eaf1ddce91d60d61b6bc51494ef326458365a470a"
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"

View file

@ -18,9 +18,11 @@ indicatif = "0.15.0"
libc = "0.2.81"
log = "0.4.11"
quit = "1.1.2"
regex = "1"
serde = { version = "1.0.118", features = ["derive"] }
serde_json = "1.0"
sys-info = "0.7.0"
snafu = "0.6.10"
tempfile = "3.1.0"
tokio = { version = "1.0.0", features = ["full"] }
validator = { version = "0.12", features = ["derive"] }

View file

@ -150,6 +150,35 @@ As an example, the following `hive.nix` includes a node (`laptop`) that is meant
On `laptop`, run `colmena apply-local --sudo` to activate the configuration.
## Secrets
Colmena allows you to upload secret files to nodes that will not be stored in the Nix store.
It implements a subset of the `deployment.keys` options supported by NixOps.
For example, to deploy ACME credentials for use with `security.acme`:
```
{
shared-box = {
security.acme.certs."my-site.tld".credentialsFile = "/run/keys/acme-credentials.secret";
deployment.keys."acme-credentials.secret" = {
text = ''
PDNS_API_URL=https://dns.provider
PDNS_API_KEY=top-secret-api-key
'';
destDir = "/run/keys"; # Default: /run/keys
owner = "acme"; # Default: root
group = "nginx"; # Default: root
mode = "0640"; # Default: 0600
};
# Rest of configuration...
};
}
```
Take note that if you use the default path (`/run/keys`), the secret files are only stored in-memory and will not survive reboots.
To upload your secrets without performing a full deployment, use `colmena upload-keys`.
## Current limitations
- It's required to use SSH keys to log into the remote hosts, and interactive authentication will not work.

View file

@ -17,5 +17,5 @@ in rustPlatform.buildRustPackage {
src = ./.;
};
};
cargoSha256 = "0m35xjslm5gxr2cb5fw8pkqpm853hsznhsncry2kvicqzwh63ldm";
cargoSha256 = "0xxp6hklnpdidrydvahv26vvpdysa7x7sf19vll8fb8zgbhjfvjm";
}

View file

@ -6,6 +6,7 @@ use clap::{Arg, App, SubCommand, ArgMatches};
use crate::nix::deployment::{
Deployment,
DeploymentGoal,
DeploymentTarget,
DeploymentOptions,
EvaluationNodeLimit,
ParallelismLimit,
@ -76,6 +77,15 @@ Set to 0 to disable parallemism limit.
.help("Be verbose")
.long_help("Deactivates the progress spinner and prints every line of output.")
.takes_value(false))
.arg(Arg::with_name("no-keys")
.long("no-keys")
.help("Do not upload keys")
.long_help(r#"Do not upload secret keys set in `deployment.keys`.
By default, Colmena will upload keys set in `deployment.keys` before deploying the new profile on a node.
To upload keys without building or deploying the rest of the configuration, use `colmena upload-keys`.
"#)
.takes_value(false))
.arg(Arg::with_name("no-substitutes")
.long("no-substitutes")
.help("Do not use substitutes")
@ -96,7 +106,7 @@ pub fn subcommand() -> App<'static, 'static> {
.long_help("Same as the targets for switch-to-configuration.\n\"push\" means only copying the closures to remote nodes.")
.default_value("switch")
.index(1)
.possible_values(&["build", "push", "switch", "boot", "test", "dry-activate"]))
.possible_values(&["build", "push", "switch", "boot", "test", "dry-activate", "keys"]))
;
let command = register_deploy_args(command);
@ -121,17 +131,33 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
quit::with_code(2);
}
let goal = DeploymentGoal::from_str(local_args.value_of("goal").unwrap()).unwrap();
// FIXME: This is ugly :/ Make an enum wrapper for this fake "keys" goal
let goal_arg = local_args.value_of("goal").unwrap();
let goal = if goal_arg == "keys" {
DeploymentGoal::Build
} else {
DeploymentGoal::from_str(goal_arg).unwrap()
};
let build_only = goal == DeploymentGoal::Build && goal_arg != "keys";
let mut targets = HashMap::new();
for node in &selected_nodes {
let host = all_nodes.get(node).unwrap().to_ssh_host();
let config = all_nodes.get(node).unwrap();
let host = config.to_ssh_host();
match host {
Some(host) => {
targets.insert(node.clone(), host);
targets.insert(
node.clone(),
DeploymentTarget::new(host, config.clone()),
);
}
None => {
if goal == DeploymentGoal::Build {
targets.insert(node.clone(), localhost());
if build_only {
targets.insert(
node.clone(),
DeploymentTarget::new(localhost(), config.clone()),
);
}
}
}
@ -149,6 +175,7 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let mut options = DeploymentOptions::default();
options.set_substituters_push(!local_args.is_present("no-substitutes"));
options.set_upload_keys(!local_args.is_present("no-upload-keys"));
options.set_gzip(!local_args.is_present("no-gzip"));
options.set_progress_bar(!local_args.is_present("verbose"));
deployment.set_options(options);
@ -185,5 +212,10 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
deployment.set_evaluation_node_limit(evaluation_node_limit);
let deployment = Arc::new(deployment);
deployment.execute().await;
if goal_arg == "keys" {
deployment.upload_keys().await;
} else {
deployment.execute().await;
}
}

View file

@ -6,7 +6,12 @@ use clap::{Arg, App, SubCommand, ArgMatches};
use tokio::fs;
use tokio::process::Command;
use crate::nix::{Deployment, DeploymentGoal, Host};
use crate::nix::deployment::{
Deployment,
DeploymentGoal,
DeploymentTarget,
DeploymentOptions,
};
use crate::nix::host;
use crate::util;
@ -21,7 +26,20 @@ pub fn subcommand() -> App<'static, 'static> {
.possible_values(&["push", "switch", "boot", "test", "dry-activate"]))
.arg(Arg::with_name("sudo")
.long("sudo")
.help("Attempt to escalate privileges if not run as root")
.help("Attempt to escalate privileges if not run as root"))
.arg(Arg::with_name("verbose")
.short("v")
.long("verbose")
.help("Be verbose")
.long_help("Deactivates the progress spinner and prints every line of output.")
.takes_value(false))
.arg(Arg::with_name("no-keys")
.long("no-keys")
.help("Do not deploy keys")
.long_help(r#"Do not deploy secret keys set in `deployment.keys`.
By default, Colmena will deploy keys set in `deployment.keys` before activating the profile on this host.
"#)
.takes_value(false))
.arg(Arg::with_name("node")
.long("node")
@ -75,14 +93,17 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
log::info!("Enumerating nodes...");
let all_nodes = hive.deployment_info().await.unwrap();
let target: Box<dyn Host> = {
let target: DeploymentTarget = {
if let Some(info) = all_nodes.get(&hostname) {
if !info.allows_local_deployment() {
log::error!("Local deployment is not enabled for host {}.", hostname);
log::error!("Hint: Set deployment.allowLocalDeployment to true.");
quit::with_code(2);
}
host::local()
DeploymentTarget::new(
host::local(),
info.clone(),
)
} else {
log::error!("Host {} is not present in the Hive configuration.", hostname);
quit::with_code(2);
@ -92,8 +113,13 @@ pub async fn run(_global_args: &ArgMatches<'_>, local_args: &ArgMatches<'_>) {
let mut targets = HashMap::new();
targets.insert(hostname.clone(), target);
let deployment = Arc::new(Deployment::new(hive, targets, goal));
let mut deployment = Deployment::new(hive, targets, goal);
let mut options = DeploymentOptions::default();
options.set_upload_keys(!local_args.is_present("no-upload-keys"));
options.set_progress_bar(!local_args.is_present("verbose"));
deployment.set_options(options);
let deployment = Arc::new(deployment);
deployment.execute().await;
}

View file

@ -2,3 +2,4 @@ pub mod build;
pub mod apply;
pub mod introspect;
pub mod apply_local;
pub mod upload_keys;

View file

@ -0,0 +1,23 @@
use clap::{Arg, App, SubCommand};
use crate::util;
use super::apply;
pub use super::apply::run;
pub fn subcommand() -> App<'static, 'static> {
let command = SubCommand::with_name("upload-keys")
.about("Upload keys to remote hosts")
.long_about(r#"Upload keys to remote hosts
This subcommand behaves as if you invoked `apply` with the pseudo `keys` goal."#)
.arg(Arg::with_name("goal")
.hidden(true)
.default_value("keys")
.possible_values(&["keys"])
.takes_value(true));
let command = apply::register_deploy_args(command);
util::register_selector_args(command)
}

View file

@ -66,6 +66,7 @@ For a sample configuration, see <https://github.com/zhaofengli/colmena>.
bind_command!(apply_local, app);
bind_command!(build, app);
bind_command!(introspect, app);
bind_command!(upload_keys, app);
let matches = app.clone().get_matches();
@ -73,6 +74,7 @@ For a sample configuration, see <https://github.com/zhaofengli/colmena>.
command!("apply-local", apply_local, matches);
command!(build, matches);
command!(introspect, matches);
command!("upload-keys", upload_keys, matches);
app.print_long_help().unwrap();
}

View file

@ -4,10 +4,10 @@ use std::collections::HashMap;
use futures::future::join_all;
use futures::join;
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle, ProgressDrawTarget};
use tokio::sync::{Mutex, Semaphore};
use super::{Hive, Host, CopyOptions, host};
use super::{Hive, Host, CopyOptions, NodeConfig, Profile, StoreDerivation, ProfileMap, host};
use crate::progress::get_spinner_styles;
/// Amount of RAM reserved for the system, in MB.
@ -112,7 +112,7 @@ impl DeploymentGoal {
}
}
/// Internal deployment stages
/// Internal deployment stages.
#[derive(Debug)]
enum DeploymentStage {
Evaluate(Vec<String>),
@ -120,7 +120,7 @@ enum DeploymentStage {
Apply(String),
}
/// Results of a deployment to a node
/// Results of a deployment to a node.
#[derive(Debug)]
struct DeploymentResult {
/// Stage in which the deployment ended.
@ -195,12 +195,26 @@ impl DeploymentResult {
}
}
/// A deployment target.
#[derive(Debug)]
pub struct DeploymentTarget {
host: Box<dyn Host>,
config: NodeConfig,
}
impl DeploymentTarget {
pub fn new(host: Box<dyn Host>, config: NodeConfig) -> Self {
Self { host, config }
}
}
#[derive(Debug)]
pub struct Deployment {
hive: Hive,
goal: DeploymentGoal,
nodes: Vec<String>,
node_hosts: Mutex<HashMap<String, Box<dyn Host>>>,
target_names: Vec<String>,
targets: Mutex<HashMap<String, DeploymentTarget>>,
progress_alignment: usize,
parallelism_limit: ParallelismLimit,
evaluation_node_limit: EvaluationNodeLimit,
options: DeploymentOptions,
@ -208,14 +222,22 @@ pub struct Deployment {
}
impl Deployment {
pub fn new(hive: Hive, targets: HashMap<String, Box<dyn Host>>, goal: DeploymentGoal) -> Self {
let nodes: Vec<String> = targets.keys().cloned().collect();
pub fn new(hive: Hive, targets: HashMap<String, DeploymentTarget>, goal: DeploymentGoal) -> Self {
let target_names: Vec<String> = targets.keys().cloned().collect();
let progress_alignment = if let Some(len) = target_names.iter().map(|n| n.len()).max() {
max(BATCH_OPERATION_LABEL.len(), len)
} else {
BATCH_OPERATION_LABEL.len()
};
Self {
hive,
goal,
nodes,
node_hosts: Mutex::new(targets),
target_names,
targets: Mutex::new(targets),
progress_alignment,
parallelism_limit: ParallelismLimit::default(),
evaluation_node_limit: EvaluationNodeLimit::default(),
options: DeploymentOptions::default(),
@ -235,19 +257,96 @@ impl Deployment {
self.evaluation_node_limit = limit;
}
// FIXME: Duplication
/// Uploads keys only (user-facing)
pub async fn upload_keys(self: Arc<Self>) {
let multi = Arc::new(MultiProgress::new());
let root_bar = Arc::new(multi.add(ProgressBar::new(100)));
multi.set_draw_target(ProgressDrawTarget::stderr_nohz());
{
let (style, _) = self.spinner_styles();
root_bar.set_message("Uploading keys...");
root_bar.set_style(style);
root_bar.tick();
root_bar.enable_steady_tick(100);
}
let arc_self = self.clone();
let mut futures = Vec::new();
for node in self.target_names.iter() {
let node = node.to_owned();
let mut target = {
let mut targets = arc_self.targets.lock().await;
targets.remove(&node).unwrap()
};
let multi = multi.clone();
let arc_self = self.clone();
futures.push(tokio::spawn(async move {
let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
let bar = multi.add(ProgressBar::new(100));
let (style, fail_style) = arc_self.spinner_styles();
bar.set_style(style);
bar.set_prefix(&node);
bar.tick();
bar.enable_steady_tick(100);
if let Err(e) = target.host.upload_keys(&target.config.keys).await {
bar.set_style(fail_style);
bar.abandon_with_message(&format!("Failed to upload keys: {}", e));
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Apply(node.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
} else {
bar.finish_with_message("Keys uploaded");
}
drop(permit);
}));
}
let wait_for_tasks = tokio::spawn(async move {
join_all(futures).await;
root_bar.finish_with_message("Finished");
});
let tasks_result = if self.options.progress_bar {
let wait_for_bars = tokio::task::spawn_blocking(move || {
multi.join().unwrap();
});
let (tasks_result, _) = join!(wait_for_tasks, wait_for_bars);
tasks_result
} else {
wait_for_tasks.await
};
if let Err(e) = tasks_result {
log::error!("Deployment process failed: {}", e);
}
self.print_logs().await;
}
/// Executes the deployment (user-facing)
///
/// Self must be wrapped inside an Arc.
pub async fn execute(self: Arc<Self>) {
let multi = Arc::new(MultiProgress::new());
let root_bar = Arc::new(multi.add(ProgressBar::new(100)));
let alignment = self.node_name_alignment();
multi.set_draw_target(ProgressDrawTarget::stderr_nohz());
{
let (spinner_style, _) = get_spinner_styles(alignment);
let (style, _) = self.spinner_styles();
root_bar.set_message("Running...");
root_bar.set_style(spinner_style);
root_bar.set_style(style);
root_bar.tick();
root_bar.enable_steady_tick(100);
}
@ -258,10 +357,10 @@ impl Deployment {
// FIXME: Saner logging
let mut futures = Vec::new();
for chunk in self.nodes.chunks(eval_limit) {
for chunk in self.target_names.chunks(eval_limit) {
let arc_self = self.clone();
let multi = multi.clone();
let (spinner_style, failing_spinner_style) = get_spinner_styles(alignment);
let (style, _) = self.spinner_styles();
// FIXME: Eww
let chunk: Vec<String> = chunk.iter().map(|s| s.to_string()).collect();
@ -271,25 +370,15 @@ impl Deployment {
// Evaluation phase
let permit = arc_self.parallelism_limit.evaluation.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(multi, spinner_style, chunk,
let bar = set_up_batch_progress_bar!(multi, style, chunk,
"Evaluating configuration...",
"Evaluating configurations for {} nodes"
);
let (eval, logs) = arc_self.hive.eval_selected(&chunk, Some(bar.clone())).await;
let drv = match eval {
Ok(drv) => {
bar.finish_and_clear();
drv
}
Err(e) => {
bar.set_style(failing_spinner_style.clone());
bar.abandon_with_message(&format!("Evalation failed: {}", e));
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Evaluate(chunk.clone());
results.push(DeploymentResult::failure(stage, logs));
let arc_self = arc_self.clone();
let drv = match arc_self.eval_profiles(&chunk, bar).await {
Some(drv) => drv,
None => {
return;
}
};
@ -302,104 +391,50 @@ impl Deployment {
// Build phase
let permit = arc_self.parallelism_limit.build.acquire().await.unwrap();
let bar = set_up_batch_progress_bar!(multi, spinner_style, chunk,
let bar = set_up_batch_progress_bar!(multi, style, chunk,
"Building configuration...",
"Building configurations for {} nodes"
);
// FIXME: Remote build?
let mut builder = host::local();
let goal = arc_self.goal;
let arc_self = arc_self.clone();
let profiles = arc_self.build_profiles(&chunk, drv, bar.clone()).await;
if arc_self.options.progress_bar {
builder.set_progress_bar(bar.clone());
}
let profiles = match drv.realize(&mut *builder).await {
Ok(profiles) => {
let goal = arc_self.goal;
if goal == DeploymentGoal::Build {
bar.finish_with_message(goal.success_str().unwrap());
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
return;
} else {
bar.finish_and_clear();
profiles
}
}
Err(e) => {
bar.set_style(failing_spinner_style.clone());
bar.abandon_with_message(&format!("Build failed: {}", e));
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
let profiles = match profiles {
Some(profiles) => profiles,
None => {
return;
}
};
if goal != DeploymentGoal::Build {
bar.finish_and_clear();
}
drop(permit);
profiles
};
// Should we continue?
if arc_self.goal == DeploymentGoal::Build {
return;
}
// Apply phase
let mut futures = Vec::new();
for node in chunk {
let arc_self = arc_self.clone();
let multi = multi.clone();
let spinner_style = spinner_style.clone();
let failing_spinner_style = failing_spinner_style.clone();
let mut host = {
let mut node_hosts = arc_self.node_hosts.lock().await;
node_hosts.remove(&node).unwrap()
let target = {
let mut targets = arc_self.targets.lock().await;
targets.remove(&node).unwrap()
};
let profile = profiles.get(&node).cloned()
.expect(&format!("Somehow profile for {} was not built", node));
futures.push(tokio::spawn(async move {
let permit = arc_self.parallelism_limit.apply.acquire().await.unwrap();
let bar = multi.add(ProgressBar::new(100));
bar.set_style(spinner_style);
bar.set_prefix(&node);
bar.set_message("Starting...");
bar.tick();
bar.enable_steady_tick(100);
if arc_self.options.progress_bar {
host.set_progress_bar(bar.clone());
}
let copy_options = arc_self.options.to_copy_options()
.include_outputs(true);
let goal = arc_self.goal;
match host.deploy(&profile, goal, copy_options).await {
Ok(_) => {
bar.finish_with_message(goal.success_str().unwrap());
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Apply(node.clone());
let logs = host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
}
Err(e) => {
bar.set_style(failing_spinner_style);
bar.abandon_with_message(&format!("Failed: {}", e));
let mut results = arc_self.results.lock().await;
let stage = DeploymentStage::Apply(node.clone());
let logs = host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
}
}
drop(permit);
arc_self.apply_profile(&node, target, profile, multi).await
}));
}
@ -440,19 +475,126 @@ impl Deployment {
}
}
fn node_name_alignment(&self) -> usize {
if let Some(len) = self.nodes.iter().map(|n| n.len()).max() {
max(BATCH_OPERATION_LABEL.len(), len)
} else {
BATCH_OPERATION_LABEL.len()
async fn eval_profiles(self: Arc<Self>, chunk: &Vec<String>, progress: ProgressBar) -> Option<StoreDerivation<ProfileMap>> {
let (eval, logs) = self.hive.eval_selected(&chunk, Some(progress.clone())).await;
match eval {
Ok(drv) => {
progress.finish_and_clear();
Some(drv)
}
Err(e) => {
let (_, fail_style) = self.spinner_styles();
progress.set_style(fail_style.clone());
progress.abandon_with_message(&format!("Evalation failed: {}", e));
let mut results = self.results.lock().await;
let stage = DeploymentStage::Evaluate(chunk.clone());
results.push(DeploymentResult::failure(stage, logs));
None
}
}
}
async fn build_profiles(self: Arc<Self>, chunk: &Vec<String>, derivation: StoreDerivation<ProfileMap>, progress: ProgressBar) -> Option<ProfileMap> {
// FIXME: Remote build?
let mut builder = host::local();
if self.options.progress_bar {
builder.set_progress_bar(progress.clone());
}
match derivation.realize(&mut *builder).await {
Ok(profiles) => {
progress.finish_with_message("Successfully built");
let mut results = self.results.lock().await;
let stage = DeploymentStage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
Some(profiles)
}
Err(e) => {
let (_, fail_style) = self.spinner_styles();
progress.set_style(fail_style);
progress.abandon_with_message(&format!("Build failed: {}", e));
let mut results = self.results.lock().await;
let stage = DeploymentStage::Build(chunk.clone());
let logs = builder.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
None
}
}
}
async fn apply_profile(self: Arc<Self>, name: &str, mut target: DeploymentTarget, profile: Profile, multi: Arc<MultiProgress>) {
let (style, fail_style) = self.spinner_styles();
let permit = self.parallelism_limit.apply.acquire().await.unwrap();
let bar = multi.add(ProgressBar::new(100));
bar.set_style(style);
bar.set_prefix(name);
bar.tick();
bar.enable_steady_tick(100);
if self.options.upload_keys && !target.config.keys.is_empty() {
bar.set_message("Uploading keys...");
if let Err(e) = target.host.upload_keys(&target.config.keys).await {
bar.set_style(fail_style);
bar.abandon_with_message(&format!("Failed to upload keys: {}", e));
let mut results = self.results.lock().await;
let stage = DeploymentStage::Apply(name.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
return;
}
}
bar.set_message("Starting...");
if self.options.progress_bar {
target.host.set_progress_bar(bar.clone());
}
let copy_options = self.options.to_copy_options()
.include_outputs(true);
match target.host.deploy(&profile, self.goal, copy_options).await {
Ok(_) => {
bar.finish_with_message(self.goal.success_str().unwrap());
let mut results = self.results.lock().await;
let stage = DeploymentStage::Apply(name.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::success(stage, logs));
}
Err(e) => {
bar.set_style(fail_style);
bar.abandon_with_message(&format!("Failed: {}", e));
let mut results = self.results.lock().await;
let stage = DeploymentStage::Apply(name.to_string());
let logs = target.host.dump_logs().await.map(|s| s.to_string());
results.push(DeploymentResult::failure(stage, logs));
}
}
drop(permit);
}
fn spinner_styles(&self) -> (ProgressStyle, ProgressStyle) {
get_spinner_styles(self.progress_alignment)
}
fn eval_limit(&self) -> usize {
if let Some(limit) = self.evaluation_node_limit.get_limit() {
limit
} else {
self.nodes.len()
self.target_names.len()
}
}
}
@ -515,6 +657,9 @@ pub struct DeploymentOptions {
/// Whether to use gzip when copying closures to remote hosts.
gzip: bool,
/// Whether to upload keys when deploying.
upload_keys: bool,
}
impl Default for DeploymentOptions {
@ -523,6 +668,7 @@ impl Default for DeploymentOptions {
progress_bar: true,
substituters_push: true,
gzip: true,
upload_keys: true,
}
}
}
@ -540,6 +686,10 @@ impl DeploymentOptions {
self.gzip = value;
}
pub fn set_upload_keys(&mut self, enable: bool) {
self.upload_keys = enable;
}
fn to_copy_options(&self) -> CopyOptions {
let options = CopyOptions::default();

View file

@ -23,13 +23,12 @@ let
nodeNixpkgs = {};
};
types = lib.types;
# Colmena-specific options
#
# Largely compatible with NixOps/Morph.
deploymentOptions = { name, lib, ... }:
let
types = lib.types;
in {
deploymentOptions = { name, lib, ... }: {
options = {
deployment = {
targetHost = lib.mkOption {
@ -75,6 +74,55 @@ let
type = types.listOf types.str;
default = [];
};
keys = lib.mkOption {
description = ''
A set of secrets to be deployed to the node.
Secrets are transferred to the node out-of-band and
never ends up in the Nix store.
'';
type = types.attrsOf keyType;
default = {};
};
};
};
};
keyType = types.submodule {
options = {
text = lib.mkOption {
description = ''
Content of the key.
'';
type = types.str;
};
destDir = lib.mkOption {
description = ''
Destination directory on the host.
'';
default = "/run/keys";
type = types.str;
};
user = lib.mkOption {
description = ''
The group that will own the file.
'';
default = "root";
type = types.str;
};
group = lib.mkOption {
description = ''
The group that will own the file.
'';
default = "root";
type = types.str;
};
permissions = lib.mkOption {
description = ''
Permissions to set for the file.
'';
default = "0600";
type = types.str;
};
};
};

View file

@ -6,6 +6,7 @@ use indicatif::ProgressBar;
use tempfile::{NamedTempFile, TempPath};
use tokio::process::Command;
use serde::Serialize;
use validator::Validate;
use super::{
StoreDerivation,
@ -27,7 +28,7 @@ pub struct Hive {
impl Hive {
pub fn new<P: AsRef<Path>>(hive: P) -> NixResult<Self> {
let mut eval_nix = NamedTempFile::new().unwrap();
let mut eval_nix = NamedTempFile::new()?;
eval_nix.write_all(HIVE_EVAL).unwrap();
Ok(Self {
@ -51,7 +52,14 @@ impl Hive {
let s: String = self.nix_instantiate("hive.deploymentConfigJson").eval()
.capture_json().await?;
Ok(serde_json::from_str(&s).unwrap())
let configs: HashMap<String, NodeConfig> = serde_json::from_str(&s).unwrap();
for config in configs.values() {
config.validate()?;
for key in config.keys.values() {
key.validate()?;
}
}
Ok(configs)
}
/// Evaluates selected nodes.

View file

@ -1,328 +0,0 @@
use std::collections::HashSet;
use std::convert::TryInto;
use async_trait::async_trait;
use tokio::process::Command;
use indicatif::ProgressBar;
use super::{StorePath, Profile, DeploymentGoal, NixResult, NixError, NixCommand, SYSTEM_PROFILE};
use crate::util::CommandExecution;
pub(crate) fn local() -> Box<dyn Host + 'static> {
Box::new(Local::new())
}
#[derive(Copy, Clone, Debug)]
pub enum CopyDirection {
ToRemote,
FromRemote,
}
#[derive(Copy, Clone, Debug)]
pub struct CopyOptions {
include_outputs: bool,
use_substitutes: bool,
gzip: bool,
}
impl Default for CopyOptions {
fn default() -> Self {
Self {
include_outputs: true,
use_substitutes: true,
gzip: true,
}
}
}
impl CopyOptions {
pub fn include_outputs(mut self, val: bool) -> Self {
self.include_outputs = val;
self
}
pub fn use_substitutes(mut self, val: bool) -> Self {
self.use_substitutes = val;
self
}
pub fn gzip(mut self, val: bool) -> Self {
self.gzip = val;
self
}
}
/// A Nix(OS) host.
///
/// The underlying implementation must be Send and Sync.
#[async_trait]
pub trait Host: Send + Sync + std::fmt::Debug {
/// Sends or receives the specified closure to the host
///
/// The StorePath and its dependent paths will then exist on this host.
async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, options: CopyOptions) -> NixResult<()>;
/// Realizes the specified derivation on the host
///
/// The derivation must already exist on the host.
/// After realization, paths in the Vec<StorePath> will then
/// exist on the host.
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>>;
/// Realizes the specified local derivation on the host then retrieves the outputs.
async fn realize(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
let options = CopyOptions::default();
self.copy_closure(derivation, CopyDirection::ToRemote, options.include_outputs(false)).await?;
let paths = self.realize_remote(derivation).await?;
self.copy_closure(derivation, CopyDirection::FromRemote, options.include_outputs(true)).await?;
Ok(paths)
}
/// Pushes and optionally activates a profile to the host.
async fn deploy(&mut self, profile: &Profile, goal: DeploymentGoal, copy_options: CopyOptions) -> NixResult<()> {
self.copy_closure(profile.as_store_path(), CopyDirection::ToRemote, copy_options).await?;
if goal.requires_activation() {
self.activate(profile, goal).await?;
}
Ok(())
}
#[allow(unused_variables)]
/// Activates a system profile on the host, if it runs NixOS.
///
/// The profile must already exist on the host. You should probably use deploy instead.
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
Err(NixError::Unsupported)
}
#[allow(unused_variables)]
/// Provides a ProgressBar to use during operations.
fn set_progress_bar(&mut self, bar: ProgressBar) {
}
/// Dumps human-readable unstructured log messages related to the host.
async fn dump_logs(&self) -> Option<&str> {
None
}
}
/// The local machine running Colmena.
///
/// It may not be capable of realizing some derivations
/// (e.g., building Linux derivations on macOS).
#[derive(Debug)]
pub struct Local {
progress_bar: Option<ProgressBar>,
logs: String,
}
impl Local {
pub fn new() -> Self {
Self {
progress_bar: None,
logs: String::new(),
}
}
}
#[async_trait]
impl Host for Local {
async fn copy_closure(&mut self, _closure: &StorePath, _direction: CopyDirection, _options: CopyOptions) -> NixResult<()> {
Ok(())
}
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
let mut command = Command::new("nix-store");
command
.arg("--no-gc-warning")
.arg("--realise")
.arg(derivation.as_path());
let mut execution = CommandExecution::new("local", command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
let result = execution.run().await;
let (stdout, stderr) = execution.get_logs();
self.logs += stderr.unwrap();
match result {
Ok(()) => {
stdout.unwrap().lines().map(|p| p.to_string().try_into()).collect()
}
Err(e) => Err(e),
}
}
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap();
Command::new("nix-env")
.args(&["--profile", SYSTEM_PROFILE])
.args(&["--set", path])
.passthrough()
.await?;
}
let activation_command = profile.activation_command(goal).unwrap();
let mut command = Command::new(&activation_command[0]);
command
.args(&activation_command[1..]);
let mut execution = CommandExecution::new("local", command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
let result = execution.run().await;
// FIXME: Bad - Order of lines is messed up
let (stdout, stderr) = execution.get_logs();
self.logs += stdout.unwrap();
self.logs += stderr.unwrap();
result
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
}
}
/// A remote machine connected over SSH.
#[derive(Debug)]
pub struct SSH {
/// The username to use to connect.
user: String,
/// The hostname or IP address to connect to.
host: String,
friendly_name: String,
path_cache: HashSet<StorePath>,
progress_bar: Option<ProgressBar>,
logs: String,
}
#[async_trait]
impl Host for SSH {
async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, options: CopyOptions) -> NixResult<()> {
let command = self.nix_copy_closure(closure, direction, options);
self.run_command(command).await
}
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
// FIXME
let paths = self.ssh(&["nix-store", "--no-gc-warning", "--realise", derivation.as_path().to_str().unwrap()])
.capture_output()
.await;
match paths {
Ok(paths) => {
paths.lines().map(|p| p.to_string().try_into()).collect()
}
Err(e) => Err(e),
}
}
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap();
let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", path]);
self.run_command(set_profile).await?;
}
let activation_command = profile.activation_command(goal).unwrap();
let v: Vec<&str> = activation_command.iter().map(|s| &**s).collect();
let command = self.ssh(&v);
self.run_command(command).await
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
}
}
impl SSH {
pub fn new(user: String, host: String) -> SSH {
let friendly_name = host.clone();
Self {
user,
host,
friendly_name,
path_cache: HashSet::new(),
progress_bar: None,
logs: String::new(),
}
}
async fn run_command(&mut self, command: Command) -> NixResult<()> {
let mut execution = CommandExecution::new(&self.friendly_name, command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
let result = execution.run().await;
// FIXME: Bad - Order of lines is messed up
let (stdout, stderr) = execution.get_logs();
self.logs += stdout.unwrap();
self.logs += stderr.unwrap();
result
}
fn ssh_target(&self) -> String {
format!("{}@{}", self.user, self.host)
}
fn nix_copy_closure(&self, path: &StorePath, direction: CopyDirection, options: CopyOptions) -> Command {
let mut command = Command::new("nix-copy-closure");
match direction {
CopyDirection::ToRemote => {
command.arg("--to");
}
CopyDirection::FromRemote => {
command.arg("--from");
}
}
// FIXME: Host-agnostic abstraction
if options.include_outputs {
command.arg("--include-outputs");
}
if options.use_substitutes {
command.arg("--use-substitutes");
}
if options.gzip {
command.arg("--gzip");
}
command
.arg(&self.ssh_target())
.arg(path.as_path());
command
}
fn ssh(&self, command: &[&str]) -> Command {
// TODO: Allow configuation of SSH parameters
let mut cmd = Command::new("ssh");
cmd.arg(self.ssh_target())
.args(&["-o", "StrictHostKeyChecking=accept-new"])
.arg("--")
.args(command);
cmd
}
}

View file

@ -0,0 +1,15 @@
set -euo pipefail
destination=%DESTINATION%
tmp=$destination.tmp
user=%USER%
group=%GROUP%
permissions=%PERMISSIONS%
mkdir -p $(dirname "$destination")
touch "$tmp"
chown "$user:$group" $tmp
chmod "$permissions" $tmp
cat <&0 >$tmp
mv "$tmp" "$destination"

151
src/nix/host/local.rs Normal file
View file

@ -0,0 +1,151 @@
//!
use std::convert::TryInto;
use std::collections::HashMap;
use std::fs;
use std::io::Write;
use async_trait::async_trait;
use tokio::process::Command;
use indicatif::ProgressBar;
use tempfile::NamedTempFile;
use super::{CopyDirection, CopyOptions, Host};
use crate::nix::{StorePath, Profile, DeploymentGoal, NixResult, NixCommand, Key, SYSTEM_PROFILE};
use crate::util::CommandExecution;
/// The local machine running Colmena.
///
/// It may not be capable of realizing some derivations
/// (e.g., building Linux derivations on macOS).
#[derive(Debug)]
pub struct Local {
progress_bar: Option<ProgressBar>,
logs: String,
}
impl Local {
pub fn new() -> Self {
Self {
progress_bar: None,
logs: String::new(),
}
}
}
#[async_trait]
impl Host for Local {
async fn copy_closure(&mut self, _closure: &StorePath, _direction: CopyDirection, _options: CopyOptions) -> NixResult<()> {
Ok(())
}
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
let mut command = Command::new("nix-store");
command
.arg("--no-gc-warning")
.arg("--realise")
.arg(derivation.as_path());
let mut execution = CommandExecution::new("local", command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
let result = execution.run().await;
let (stdout, stderr) = execution.get_logs();
self.logs += stderr.unwrap();
match result {
Ok(()) => {
stdout.unwrap().lines().map(|p| p.to_string().try_into()).collect()
}
Err(e) => Err(e),
}
}
async fn upload_keys(&mut self, keys: &HashMap<String, Key>) -> NixResult<()> {
for (name, key) in keys {
self.upload_key(&name, &key).await?;
}
Ok(())
}
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap();
Command::new("nix-env")
.args(&["--profile", SYSTEM_PROFILE])
.args(&["--set", path])
.passthrough()
.await?;
}
let activation_command = profile.activation_command(goal).unwrap();
let mut command = Command::new(&activation_command[0]);
command
.args(&activation_command[1..]);
let mut execution = CommandExecution::new("local", command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
let result = execution.run().await;
// FIXME: Bad - Order of lines is messed up
let (stdout, stderr) = execution.get_logs();
self.logs += stdout.unwrap();
self.logs += stderr.unwrap();
result
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
}
}
impl Local {
/// "Uploads" a single key.
async fn upload_key(&mut self, name: &str, key: &Key) -> NixResult<()> {
if let Some(progress_bar) = self.progress_bar.as_ref() {
progress_bar.set_message(&format!("Deploying key {}", name));
}
let dest_path = key.dest_dir.join(name);
let mut temp = NamedTempFile::new()?;
temp.write_all(key.text.as_bytes())?;
let (_, temp_path) = temp.keep().map_err(|pe| pe.error)?;
// Well, we need the userspace chmod program to parse the
// permission, for NixOps compatibility
{
let mut command = Command::new("chmod");
command
.arg(&key.permissions)
.arg(&temp_path);
let mut execution = CommandExecution::new("local", command);
execution.run().await?;
}
{
let mut command = Command::new("chown");
command
.arg(&format!("{}:{}", key.user, key.group))
.arg(&temp_path);
let mut execution = CommandExecution::new("local", command);
execution.run().await?;
}
let parent_dir = dest_path.parent().unwrap();
fs::create_dir_all(parent_dir)?;
fs::rename(temp_path, dest_path)?;
Ok(())
}
}

120
src/nix/host/mod.rs Normal file
View file

@ -0,0 +1,120 @@
use std::collections::HashMap;
use async_trait::async_trait;
use indicatif::ProgressBar;
use super::{StorePath, Profile, DeploymentGoal, NixResult, NixError, Key};
mod ssh;
pub use ssh::Ssh;
mod local;
pub use local::Local;
pub(crate) fn local() -> Box<dyn Host + 'static> {
Box::new(Local::new())
}
#[derive(Copy, Clone, Debug)]
pub enum CopyDirection {
ToRemote,
FromRemote,
}
#[derive(Copy, Clone, Debug)]
pub struct CopyOptions {
include_outputs: bool,
use_substitutes: bool,
gzip: bool,
}
impl Default for CopyOptions {
fn default() -> Self {
Self {
include_outputs: true,
use_substitutes: true,
gzip: true,
}
}
}
impl CopyOptions {
pub fn include_outputs(mut self, val: bool) -> Self {
self.include_outputs = val;
self
}
pub fn use_substitutes(mut self, val: bool) -> Self {
self.use_substitutes = val;
self
}
pub fn gzip(mut self, val: bool) -> Self {
self.gzip = val;
self
}
}
/// A Nix(OS) host.
///
/// The underlying implementation must be Send and Sync.
#[async_trait]
pub trait Host: Send + Sync + std::fmt::Debug {
/// Sends or receives the specified closure to the host
///
/// The StorePath and its dependent paths will then exist on this host.
async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, options: CopyOptions) -> NixResult<()>;
/// Realizes the specified derivation on the host
///
/// The derivation must already exist on the host.
/// After realization, paths in the Vec<StorePath> will then
/// exist on the host.
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>>;
/// Realizes the specified local derivation on the host then retrieves the outputs.
async fn realize(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
let options = CopyOptions::default();
self.copy_closure(derivation, CopyDirection::ToRemote, options.include_outputs(false)).await?;
let paths = self.realize_remote(derivation).await?;
self.copy_closure(derivation, CopyDirection::FromRemote, options.include_outputs(true)).await?;
Ok(paths)
}
/// Pushes and optionally activates a profile to the host.
async fn deploy(&mut self, profile: &Profile, goal: DeploymentGoal, copy_options: CopyOptions) -> NixResult<()> {
self.copy_closure(profile.as_store_path(), CopyDirection::ToRemote, copy_options).await?;
if goal.requires_activation() {
self.activate(profile, goal).await?;
}
Ok(())
}
#[allow(unused_variables)]
/// Uploads a set of keys to the host.
async fn upload_keys(&mut self, keys: &HashMap<String, Key>) -> NixResult<()> {
Err(NixError::Unsupported)
}
#[allow(unused_variables)]
/// Activates a system profile on the host, if it runs NixOS.
///
/// The profile must already exist on the host. You should probably use deploy instead.
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
Err(NixError::Unsupported)
}
#[allow(unused_variables)]
/// Provides a ProgressBar to use during operations.
fn set_progress_bar(&mut self, bar: ProgressBar) {
}
/// Dumps human-readable unstructured log messages related to the host.
async fn dump_logs(&self) -> Option<&str> {
None
}
}

188
src/nix/host/ssh.rs Normal file
View file

@ -0,0 +1,188 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::process::Stdio;
use async_trait::async_trait;
use indicatif::ProgressBar;
use tokio::process::Command;
use tokio::io::AsyncWriteExt;
use super::{CopyDirection, CopyOptions, Host};
use crate::nix::{StorePath, Profile, DeploymentGoal, NixResult, NixCommand, NixError, Key, SYSTEM_PROFILE};
use crate::util::CommandExecution;
const DEPLOY_KEY_TEMPLATE: &'static str = include_str!("./deploy-key.template");
/// A remote machine connected over SSH.
#[derive(Debug)]
pub struct Ssh {
/// The username to use to connect.
user: String,
/// The hostname or IP address to connect to.
host: String,
friendly_name: String,
path_cache: HashSet<StorePath>,
progress_bar: Option<ProgressBar>,
logs: String,
}
#[async_trait]
impl Host for Ssh {
async fn copy_closure(&mut self, closure: &StorePath, direction: CopyDirection, options: CopyOptions) -> NixResult<()> {
let command = self.nix_copy_closure(closure, direction, options);
self.run_command(command).await
}
async fn realize_remote(&mut self, derivation: &StorePath) -> NixResult<Vec<StorePath>> {
// FIXME
let paths = self.ssh(&["nix-store", "--no-gc-warning", "--realise", derivation.as_path().to_str().unwrap()])
.capture_output()
.await;
match paths {
Ok(paths) => {
paths.lines().map(|p| p.to_string().try_into()).collect()
}
Err(e) => Err(e),
}
}
async fn upload_keys(&mut self, keys: &HashMap<String, Key>) -> NixResult<()> {
for (name, key) in keys {
self.upload_key(&name, &key).await?;
}
Ok(())
}
async fn activate(&mut self, profile: &Profile, goal: DeploymentGoal) -> NixResult<()> {
if goal.should_switch_profile() {
let path = profile.as_path().to_str().unwrap();
let set_profile = self.ssh(&["nix-env", "--profile", SYSTEM_PROFILE, "--set", path]);
self.run_command(set_profile).await?;
}
let activation_command = profile.activation_command(goal).unwrap();
let v: Vec<&str> = activation_command.iter().map(|s| &**s).collect();
let command = self.ssh(&v);
self.run_command(command).await
}
fn set_progress_bar(&mut self, bar: ProgressBar) {
self.progress_bar = Some(bar);
}
async fn dump_logs(&self) -> Option<&str> {
Some(&self.logs)
}
}
impl Ssh {
pub fn new(user: String, host: String) -> Self {
let friendly_name = host.clone();
Self {
user,
host,
friendly_name,
path_cache: HashSet::new(),
progress_bar: None,
logs: String::new(),
}
}
async fn run_command(&mut self, command: Command) -> NixResult<()> {
let mut execution = CommandExecution::new(&self.friendly_name, command);
if let Some(bar) = self.progress_bar.as_ref() {
execution.set_progress_bar(bar.clone());
}
let result = execution.run().await;
// FIXME: Bad - Order of lines is messed up
let (stdout, stderr) = execution.get_logs();
self.logs += stdout.unwrap();
self.logs += stderr.unwrap();
result
}
fn ssh_target(&self) -> String {
format!("{}@{}", self.user, self.host)
}
fn nix_copy_closure(&self, path: &StorePath, direction: CopyDirection, options: CopyOptions) -> Command {
let mut command = Command::new("nix-copy-closure");
match direction {
CopyDirection::ToRemote => {
command.arg("--to");
}
CopyDirection::FromRemote => {
command.arg("--from");
}
}
// FIXME: Host-agnostic abstraction
if options.include_outputs {
command.arg("--include-outputs");
}
if options.use_substitutes {
command.arg("--use-substitutes");
}
if options.gzip {
command.arg("--gzip");
}
command
.arg(&self.ssh_target())
.arg(path.as_path());
command
}
fn ssh(&self, command: &[&str]) -> Command {
// TODO: Allow configuation of SSH parameters
let mut cmd = Command::new("ssh");
cmd.arg(self.ssh_target())
.args(&["-o", "StrictHostKeyChecking=accept-new", "-T"])
.arg("--")
.args(command);
cmd
}
}
impl Ssh {
/// Uploads a single key.
async fn upload_key(&mut self, name: &str, key: &Key) -> NixResult<()> {
if let Some(progress_bar) = self.progress_bar.as_ref() {
progress_bar.set_message(&format!("Deploying key {}", name));
}
let dest_path = key.dest_dir.join(name);
let remote_command = DEPLOY_KEY_TEMPLATE.to_string()
.replace("%DESTINATION%", dest_path.to_str().unwrap())
.replace("%USER%", &key.user)
.replace("%GROUP%", &key.group)
.replace("%PERMISSIONS%", &key.permissions);
let mut command = self.ssh(&["sh", "-c", &remote_command]);
command.stdin(Stdio::piped());
command.stderr(Stdio::null());
command.stdout(Stdio::null());
let mut child = command.spawn()?;
let mut stdin = child.stdin.take().unwrap();
stdin.write_all(key.text.as_bytes()).await?;
stdin.flush().await?;
drop(stdin);
let exit = child.wait().await?;
if exit.success() {
Ok(())
} else {
Err(NixError::NixFailure { exit_code: exit.code().unwrap() })
}
}
}

35
src/nix/key.rs Normal file
View file

@ -0,0 +1,35 @@
use std::path::PathBuf;
use regex::Regex;
use serde::{Serialize, Deserialize};
use validator::{Validate, ValidationError};
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct Key {
pub(crate) text: String,
#[validate(custom = "validate_dest_dir")]
#[serde(rename = "destDir")]
pub(super) dest_dir: PathBuf,
#[validate(custom = "validate_unix_name")]
pub(super) user: String,
#[validate(custom = "validate_unix_name")]
pub(super) group: String,
pub(super) permissions: String,
}
fn validate_unix_name(name: &str) -> Result<(), ValidationError> {
let re = Regex::new(r"^[a-z][-a-z0-9]*$").unwrap();
if re.is_match(name) {
Ok(())
} else {
Err(ValidationError::new("Invalid user/group name"))
}
}
fn validate_dest_dir(dir: &PathBuf) -> Result<(), ValidationError> {
if dir.has_root() {
Ok(())
} else {
Err(ValidationError::new("Secret key destination directory must be absolute"))
}
}

View file

@ -1,4 +1,6 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::{Component as PathComponent, Path};
use std::process::Stdio;
use async_trait::async_trait;
@ -6,12 +8,13 @@ use serde::de::DeserializeOwned;
use serde::Deserialize;
use snafu::Snafu;
use tokio::process::Command;
use validator::{Validate, ValidationErrors, ValidationError as ValidationErrorType};
use crate::util::CommandExecution;
pub mod host;
pub use host::{Host, CopyDirection, CopyOptions};
use host::SSH;
use host::Ssh;
pub mod hive;
pub use hive::Hive;
@ -19,11 +22,14 @@ pub use hive::Hive;
pub mod store;
pub use store::{StorePath, StoreDerivation};
pub mod key;
pub use key::Key;
pub mod profile;
pub use profile::{Profile, ProfileMap};
pub mod deployment;
pub use deployment::{DeploymentGoal, Deployment};
pub use deployment::{DeploymentGoal, DeploymentTarget, Deployment};
pub const SYSTEM_PROFILE: &'static str = "/nix/var/nix/profiles/system";
@ -50,6 +56,9 @@ pub enum NixError {
#[snafu(display("Invalid Nix store path"))]
InvalidStorePath,
#[snafu(display("Validation error"))]
ValidationError { errors: ValidationErrors },
#[snafu(display("Invalid NixOS system profile"))]
InvalidProfile,
@ -63,7 +72,13 @@ impl From<std::io::Error> for NixError {
}
}
#[derive(Debug, Clone, Deserialize)]
impl From<ValidationErrors> for NixError {
fn from(errors: ValidationErrors) -> Self {
Self::ValidationError { errors }
}
}
#[derive(Debug, Clone, Validate, Deserialize)]
pub struct NodeConfig {
#[serde(rename = "targetHost")]
target_host: Option<String>,
@ -74,6 +89,9 @@ pub struct NodeConfig {
#[serde(rename = "allowLocalDeployment")]
allow_local_deployment: bool,
tags: Vec<String>,
#[validate(custom = "validate_keys")]
keys: HashMap<String, Key>,
}
impl NodeConfig {
@ -82,7 +100,7 @@ impl NodeConfig {
pub fn to_ssh_host(&self) -> Option<Box<dyn Host>> {
self.target_host.as_ref().map(|target_host| {
let host = SSH::new(self.target_user.clone(), target_host.clone());
let host = Ssh::new(self.target_user.clone(), target_host.clone());
let host: Box<dyn Host> = Box::new(host);
host
})
@ -182,3 +200,23 @@ impl NixCommand for CommandExecution {
StorePath::try_from(path)
}
}
fn validate_keys(keys: &HashMap<String, Key>) -> Result<(), ValidationErrorType> {
// Bad secret names:
// - /etc/passwd
// - ../../../../../etc/passwd
for name in keys.keys() {
let path = Path::new(name);
if path.has_root() {
return Err(ValidationErrorType::new("Secret key name cannot be absolute"));
}
for component in path.components() {
if component == PathComponent::ParentDir {
return Err(ValidationErrorType::new("Secret key name cannot refer to parent directory"));
}
}
}
Ok(())
}