diff --git a/Cargo.lock b/Cargo.lock index 0cd768f..86670c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,7 @@ dependencies = [ name = "colmena" version = "0.3.0-pre" dependencies = [ + "async-stream", "async-trait", "atty", "clap", @@ -149,6 +150,7 @@ dependencies = [ "sys-info", "tempfile", "tokio", + "tokio-stream", "tokio-test", "users", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 926c06c..c568b7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-stream = "0.3.2" async-trait = "0.1.42" atty = "0.2" clap = "3.0.0" @@ -31,6 +32,7 @@ shell-escape = "0.1.5" sys-info = "0.9.0" snafu = "0.6.10" tempfile = "3.1.0" +tokio-stream = "0.1.8" users = "0.11.0" uuid = { version = "0.8.2", features = ["serde", "v4"] } validator = { version = "0.14", features = ["derive"] } diff --git a/package.nix b/package.nix index 4c005ee..dca39a4 100644 --- a/package.nix +++ b/package.nix @@ -1,4 +1,4 @@ -{ lib, stdenv, rustPlatform, installShellFiles }: +{ lib, stdenv, rustPlatform, installShellFiles, nix-eval-jobs ? null }: rustPlatform.buildRustPackage rec { pname = "colmena"; @@ -9,10 +9,14 @@ rustPlatform.buildRustPackage rec { src = lib.cleanSource ./.; }; - cargoSha256 = "sha256-UUDKAY5zOm/okdRN9Bh+1OgPJFnd1Gw5VMd/+/P46jQ="; + cargoSha256 = "sha256-glLQwj9K6efC2cXoOx7oVMOfrFDhFTgm1C+Pghn5hGo="; nativeBuildInputs = [ installShellFiles ]; + propagatedBuildInputs = lib.optional (nix-eval-jobs != null) nix-eval-jobs; + + NIX_EVAL_JOBS = lib.optionalString (nix-eval-jobs != null) "${nix-eval-jobs}/bin/nix-eval-jobs"; + postInstall = lib.optionalString (stdenv.hostPlatform == stdenv.buildPlatform) '' installShellCompletion --cmd colmena \ --bash <($out/bin/colmena gen-completions bash) \ diff --git a/src/command/apply.rs b/src/command/apply.rs index 9d72739..3b293c6 100644 --- a/src/command/apply.rs +++ b/src/command/apply.rs @@ -9,6 +9,7 @@ use crate::nix::deployment::{ Goal, Options, EvaluationNodeLimit, + Evaluator, ParallelismLimit, }; use crate::progress::SimpleProgressOutput; @@ -110,6 +111,14 @@ To temporarily disable remote build on all nodes, use `--no-build-on-target`. .long_help(r#"If `deployment.replaceUnknownProfiles` is set for a target, using this switch will treat deployment.replaceUnknownProfiles as though it was set true and perform unknown profile replacement."#) .takes_value(false)) + .arg(Arg::new("evaluator") + .long("evaluator") + .help("The evaluator to use (experimental)") + .long_help(r#"If set to `chunked` (default), evaluation of nodes will happen in batches. If set to `streaming`, the experimental streaming evaluator (nix-eval-jobs) will be used and nodes will be evaluated in parallel. + +This is an experimental feature."#) + .default_value("chunked") + .possible_values(Evaluator::possible_values())) } pub fn subcommand() -> App<'static> { @@ -156,6 +165,7 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<( options.set_gzip(!local_args.is_present("no-gzip")); options.set_upload_keys(!local_args.is_present("no-keys")); options.set_force_replace_unknown_profiles(local_args.is_present("force-replace-unknown-profiles")); + options.set_evaluator(local_args.value_of_t("evaluator").unwrap()); if local_args.is_present("keep-result") { options.set_create_gc_roots(true); diff --git a/src/nix/deployment/mod.rs b/src/nix/deployment/mod.rs index 283efd8..fe310a4 100644 --- a/src/nix/deployment/mod.rs +++ b/src/nix/deployment/mod.rs @@ -8,7 +8,7 @@ pub mod limits; pub use limits::{EvaluationNodeLimit, ParallelismLimit}; pub mod options; -pub use options::Options; +pub use options::{Options, Evaluator}; use std::collections::HashMap; use std::mem; @@ -16,6 +16,7 @@ use std::sync::Arc; use futures::future::join_all; use itertools::Itertools; +use tokio_stream::StreamExt; use crate::progress::Sender as ProgressSender; use crate::job::{JobMonitor, JobHandle, JobType, JobState}; @@ -34,6 +35,11 @@ use super::{ CopyDirection, CopyOptions, key::{Key, UploadAt as UploadKeyAt}, + evaluator::{ + DrvSetEvaluator, + NixEvalJobs, + EvalError, + }, }; use super::host; @@ -163,7 +169,15 @@ impl Deployment { let targets = mem::replace(&mut self.targets, HashMap::new()); let deployment = DeploymentHandle::new(self); let meta_future = meta.run(|meta| async move { - deployment.execute_chunked(meta.clone(), targets).await?; + match deployment.options.evaluator { + Evaluator::Chunked => { + deployment.execute_chunked(meta.clone(), targets).await?; + } + Evaluator::Streaming => { + log::warn!("Streaming evaluation is an experimental feature"); + deployment.execute_streaming(meta.clone(), targets).await?; + } + } Ok(()) }); @@ -216,6 +230,99 @@ impl Deployment { Ok(()) } + /// Executes the deployment on selected nodes using a streaming evaluator. + async fn execute_streaming(self: &DeploymentHandle, parent: JobHandle, mut targets: TargetNodeMap) + -> ColmenaResult<()> + { + if self.goal == Goal::UploadKeys { + unreachable!(); // some logic is screwed up + } + + let nodes: Vec = targets.keys().cloned().collect(); + let expr = self.hive.eval_selected_expr(&nodes)?; + + let job = parent.create_job(JobType::Evaluate, nodes.clone())?; + + let futures = job.run(|job| async move { + let mut evaluator = NixEvalJobs::default(); + evaluator.set_job(job.clone()); + + // FIXME: nix-eval-jobs currently does not support IFD with builders + let options = self.hive.nix_options(); + let mut stream = evaluator.evaluate(&expr, options).await?; + + let mut futures: Vec>> = Vec::new(); + + while let Some(item) = stream.next().await { + match item { + Ok(attr) => { + let node_name = NodeName::new(attr.attribute().to_owned())?; + let profile_drv: ProfileDerivation = attr.into_derivation()?; + + // FIXME: Consolidate + let mut target = targets.remove(&node_name).unwrap(); + + if let Some(force_build_on_target) = self.options.force_build_on_target { + target.config.set_build_on_target(force_build_on_target); + } + + let job_handle = job.clone(); + let arc_self = self.clone(); + futures.push(tokio::spawn(async move { + let (target, profile) = { + if target.config.build_on_target() { + arc_self.build_on_node(job_handle.clone(), target, profile_drv.clone()).await? + } else { + arc_self.build_and_push_node(job_handle.clone(), target, profile_drv.clone()).await? + } + }; + + if arc_self.goal.requires_activation() { + arc_self.activate_node(job_handle, target, profile).await + } else { + Ok(()) + } + })); + } + Err(e) => { + match e { + EvalError::Global(e) => { + // Global error - Abort immediately + return Err(e); + } + EvalError::Attribute(e) => { + // Attribute-level error + // + // Here the eventual non-zero exit code of the evaluator + // will translate into an `EvalError::Global`, causing + // the entire future to resolve to an Err. + + let node_name = NodeName::new(e.attribute().to_string()).unwrap(); + let nodes = vec![ node_name ]; + let job = parent.create_job(JobType::Evaluate, nodes)?; + + job.state(JobState::Running)?; + for line in e.error().lines() { + job.stderr(line.to_string())?; + } + job.state(JobState::Failed)?; + } + } + } + } + } + + Ok(futures) + }).await?; + + join_all(futures).await + .into_iter() + .map(|r| r.unwrap()) // panic on JoinError (future panicked) + .collect::>>()?; + + Ok(()) + } + /// Executes the deployment against a portion of nodes. async fn execute_one_chunk(self: &DeploymentHandle, parent: JobHandle, mut chunk: TargetNodeMap) -> ColmenaResult<()> { if self.goal == Goal::UploadKeys { diff --git a/src/nix/deployment/options.rs b/src/nix/deployment/options.rs index e9c577f..8de5582 100644 --- a/src/nix/deployment/options.rs +++ b/src/nix/deployment/options.rs @@ -1,5 +1,8 @@ //! Deployment options. +use std::str::FromStr; + +use crate::error::{ColmenaError, ColmenaResult}; use crate::nix::CopyOptions; /// Options for a deployment. @@ -25,6 +28,16 @@ pub struct Options { /// Ignore the node-level `deployment.replaceUnknownProfiles` option. pub(super) force_replace_unknown_profiles: bool, + + /// Which evaluator to use (experimental). + pub(super) evaluator: Evaluator, +} + +/// Which evaluator to use. +#[derive(Clone, Debug, PartialEq)] +pub enum Evaluator { + Chunked, + Streaming, } impl Options { @@ -52,6 +65,10 @@ impl Options { self.force_replace_unknown_profiles = enable; } + pub fn set_evaluator(&mut self, evaluator: Evaluator) { + self.evaluator = evaluator; + } + pub fn to_copy_options(&self) -> CopyOptions { let options = CopyOptions::default(); @@ -70,6 +87,27 @@ impl Default for Options { create_gc_roots: false, force_build_on_target: None, force_replace_unknown_profiles: false, + evaluator: Evaluator::Chunked, } } } + +impl FromStr for Evaluator { + type Err = ColmenaError; + + fn from_str(s: &str) -> ColmenaResult { + match s { + "chunked" => Ok(Self::Chunked), + "streaming" => Ok(Self::Streaming), + _ => Err(ColmenaError::Unknown { + message: "Valid values are 'chunked' and 'streaming'".to_string(), + }), + } + } +} + +impl Evaluator { + pub fn possible_values() -> &'static [&'static str] { + &[ "chunked", "streaming" ] + } +} diff --git a/src/nix/evaluator/mod.rs b/src/nix/evaluator/mod.rs new file mode 100644 index 0000000..70263bf --- /dev/null +++ b/src/nix/evaluator/mod.rs @@ -0,0 +1,92 @@ +//! Nix evaluator. +//! +//! A `DrvSetEvaluator` evaluates an attribute set of derivations. Such an +//! implementation may be able to parallelize the evaluation +//! (e.g., with [nix-eval-jobs](https://github.com/nix-community/nix-eval-jobs)) +//! and emit results as soon as individual attributes finish evaluating. +//! +//! TODO: Port the chunked evaluator to DrvSetEvaluator + +pub mod nix_eval_jobs; +pub use nix_eval_jobs::NixEvalJobs; + +use std::convert::TryFrom; +use std::pin::Pin; +use std::result::Result as StdResult; + +use async_trait::async_trait; +use futures::Stream; + +use crate::job::JobHandle; +use crate::error::{ColmenaResult, ColmenaError}; +use super::{BuildResult, StorePath, StoreDerivation, NixExpression, NixOptions}; + +/// The result of an evaluation. +/// +/// The `Ok` variant always correspond to one attribute. +/// The `Err` variant may apply to a single attribute or to the entire +/// evaluation. +pub type EvalResult = StdResult; + +/// An evaluation error. +#[derive(Debug)] +pub enum EvalError { + /// An attribute-level error. + Attribute(AttributeError), + + /// A global error. + Global(ColmenaError), +} + +/// The evaluation output of an attribute. +#[derive(Debug)] +pub struct AttributeOutput { + attribute: String, + drv_path: StorePath, +} + +/// An attribute-level error. +#[derive(Debug)] +pub struct AttributeError { + attribute: String, + error: String, +} + +/// A derivation set evaluator. +/// +/// Such an evaluator can evaluate an attribute set of derivations. +#[async_trait] +pub trait DrvSetEvaluator { + /// Evaluates an attribute set of derivation, returning results as they come in. + async fn evaluate(&self, expression: &dyn NixExpression, options: NixOptions) -> ColmenaResult>>>; + + /// Provides a JobHandle to use during operations. + #[allow(unused_variables)] + fn set_job(&mut self, job: JobHandle) {} +} + +impl AttributeOutput { + /// Returns the attribute name. + pub fn attribute(&self) -> &str { + &self.attribute + } + + /// Returns the derivation for this attribute. + pub fn into_derivation(self) -> ColmenaResult> + where T: TryFrom>, + { + self.drv_path.into_derivation() + } +} + +impl AttributeError { + /// Returns the attribute name. + pub fn attribute(&self) -> &str { + &self.attribute + } + + /// Returns the error. + pub fn error(&self) -> &str { + &self.error + } +} diff --git a/src/nix/evaluator/nix_eval_jobs.rs b/src/nix/evaluator/nix_eval_jobs.rs new file mode 100644 index 0000000..f2609b4 --- /dev/null +++ b/src/nix/evaluator/nix_eval_jobs.rs @@ -0,0 +1,286 @@ +//! nix-eval-jobs evaluator. +//! +//! This evaluator can evaluate attributes in parallel. +//! +//! During build time, the nix-eval-jobs binary may be pinned by setting +//! the `NIX_EVAL_JOBS` environment variable. + +use std::io::Write; +use std::path::PathBuf; +use std::pin::Pin; +use std::process::Stdio; + +use async_stream::stream; +use async_trait::async_trait; +use futures::Stream; +use serde::Deserialize; +use tempfile::NamedTempFile; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; + +use crate::error::{ColmenaResult, ColmenaError}; +use crate::job::{JobHandle, null_job_handle}; +use crate::nix::{StorePath, NixExpression, NixOptions}; +use crate::util::capture_stream; +use super::{DrvSetEvaluator, EvalResult, EvalError, AttributeOutput, AttributeError}; + +/// The pinned nix-eval-jobs binary. +pub const NIX_EVAL_JOBS: Option<&str> = option_env!("NIX_EVAL_JOBS"); + +pub struct NixEvalJobs { + executable: PathBuf, + job: JobHandle, +} + +/// A line in the eval output. +#[derive(Deserialize)] +#[serde(untagged)] +enum EvalLine { + Derivation(EvalLineDerivation), + Error(EvalLineAttributeError), +} + +/// An output from nix-eval-jobs. +/// +/// This is nix-eval-jobs's version of `AttributeOutput`. +#[derive(Deserialize)] +struct EvalLineDerivation { + #[serde(rename = "attr")] + attribute: String, + + #[serde(rename = "drvPath")] + drv_path: StorePath, +} + +/// An error from nix-eval-jobs. +/// +/// This is nix-eval-jobs's version of `AttributeError`. +#[derive(Deserialize)] +struct EvalLineAttributeError { + #[serde(rename = "attr")] + attribute: String, + + #[serde(rename = "error")] + error: String, +} + +#[async_trait] +impl DrvSetEvaluator for NixEvalJobs { + async fn evaluate(&self, expression: &dyn NixExpression, options: NixOptions) -> ColmenaResult>>> { + let expr_file = { + let mut f = NamedTempFile::new()?; + f.write_all(expression.expression().as_bytes())?; + f.into_temp_path() + }; + + let mut command = Command::new(&self.executable); + command + .arg("--impure") + .args(&["--workers", "10"]) // FIXME: Configurable + .arg(&expr_file); + + command.args(options.to_args()); + + if expression.requires_flakes() { + command.args(&["--experimental-features", "flakes"]); + } + + let mut child = command + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + + let mut stdout = BufReader::new(child.stdout.take().unwrap()); + let stderr = BufReader::new(child.stderr.take().unwrap()); + + let job = self.job.clone(); + tokio::spawn(async move { + capture_stream(stderr, Some(job), true).await + }); + + Ok(Box::pin(stream! { + loop { + let mut line = String::new(); + let len = { + let r = stdout.read_line(&mut line).await + .map_err(|e| EvalError::Global(e.into())); + + match r { + Ok(v) => v, + Err(e) => { + yield Err(e); + break; + } + } + }; + + if len == 0 { + // Stream ended, wait for exit code + let r = child.wait().await + .map_err(|e| EvalError::Global(e.into())); + + let status = match r { + Ok(v) => v, + Err(e) => { + yield Err(e); + break; + } + }; + + if !status.success() { + yield Err(EvalError::Global(status.into())); + } + + break; + } + + let trimmed = line.trim(); + match serde_json::from_str::(trimmed) { + Ok(el) => { + yield el.into(); + } + Err(e) => { + let bad_output = ColmenaError::BadOutput { + output: e.to_string(), + }; + + yield Err(EvalError::Global(bad_output)); + break; + } + } + } + + drop(expr_file); + })) + } + + fn set_job(&mut self, job: JobHandle) { + self.job = job; + } +} + +impl Default for NixEvalJobs { + fn default() -> Self { + let binary = NIX_EVAL_JOBS.unwrap_or("nix-eval-jobs"); + + Self { + executable: PathBuf::from(binary), + job: null_job_handle(), + } + } +} + +impl From for AttributeOutput { + fn from(eld: EvalLineDerivation) -> Self { + Self { + attribute: eld.attribute, + drv_path: eld.drv_path, + } + } +} + +impl From for AttributeError { + fn from(ele: EvalLineAttributeError) -> Self { + Self { + attribute: ele.attribute, + error: ele.error, + } + } +} + +impl From for EvalResult { + fn from(el: EvalLine) -> Self { + match el { + EvalLine::Derivation(eld) => Ok(eld.into()), + EvalLine::Error(ele) => Err(EvalError::Attribute(ele.into())), + } + } +} + +/// Returns the pinned nix-eval-jobs executable. +/// +/// This is used for informational purposes in `colmena nix-info`. +pub fn get_pinned_nix_eval_jobs() -> Option<&'static str> { + NIX_EVAL_JOBS +} + +#[cfg(test)] +mod tests { + use super::*; + + use tokio_test::block_on; + use tokio_stream::StreamExt; + + #[test] + fn test_eval() { + let evaluator = NixEvalJobs::default(); + let expr = r#"with import {}; { a = pkgs.hello; b = pkgs.bash; }"#.to_string(); + + block_on(async move { + let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap(); + let mut count = 0; + + while let Some(value) = stream.next().await { + eprintln!("Got {:?}", value); + assert!(value.is_ok()); + + count += 1; + } + + assert_eq!(2, count); + }); + } + + #[test] + fn test_global_error() { + let evaluator = NixEvalJobs::default(); + let expr = r#"gibberish"#.to_string(); + + block_on(async move { + let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap(); + let mut count = 0; + + while let Some(value) = stream.next().await { + eprintln!("Got {:?}", value); + assert!(value.is_err()); + count += 1; + } + + assert_eq!(1, count); + }); + } + + #[test] + fn test_attribute_error() { + let evaluator = NixEvalJobs::default(); + let expr = r#"with import {}; { a = pkgs.hello; b = throw "an error"; }"#.to_string(); + + block_on(async move { + let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap(); + let mut count = 0; + + while let Some(value) = stream.next().await { + eprintln!("Got {:?}", value); + + match value { + Ok(v) => { + assert_eq!("a", v.attribute); + } + Err(e) => { + match e { + EvalError::Attribute(a) => { + assert_eq!("b", a.attribute); + } + _ => { + panic!("Expected an attribute error, got {:?}", e); + } + } + } + } + count += 1; + } + + assert_eq!(2, count); + }); + } +} diff --git a/src/nix/mod.rs b/src/nix/mod.rs index 90066a7..e82e154 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -38,6 +38,8 @@ pub use flake::Flake; pub mod node_filter; pub use node_filter::NodeFilter; +pub mod evaluator; + /// Path to the main system profile. pub const SYSTEM_PROFILE: &str = "/nix/var/nix/profiles/system"; @@ -47,7 +49,7 @@ pub const CURRENT_PROFILE: &str = "/run/current-system"; /// A node's attribute name. #[derive(Serialize, Deserialize, Clone, Debug, Hash, Eq, PartialEq)] #[serde(transparent)] -pub struct NodeName( +pub struct NodeName ( #[serde(deserialize_with = "NodeName::deserialize")] String );