nix: Add initial nix-eval-job integration

A DrvSetEvaluator is able to evaluate attribute sets of derivations,
streaming results as they come in.
This commit is contained in:
Zhaofeng Li 2022-01-22 17:50:53 -08:00
parent 3e40e84e19
commit cf9a72a1d4
9 changed files with 548 additions and 5 deletions

2
Cargo.lock generated
View file

@ -124,6 +124,7 @@ dependencies = [
name = "colmena" name = "colmena"
version = "0.3.0-pre" version = "0.3.0-pre"
dependencies = [ dependencies = [
"async-stream",
"async-trait", "async-trait",
"atty", "atty",
"clap", "clap",
@ -149,6 +150,7 @@ dependencies = [
"sys-info", "sys-info",
"tempfile", "tempfile",
"tokio", "tokio",
"tokio-stream",
"tokio-test", "tokio-test",
"users", "users",
"uuid", "uuid",

View file

@ -7,6 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
async-stream = "0.3.2"
async-trait = "0.1.42" async-trait = "0.1.42"
atty = "0.2" atty = "0.2"
clap = "3.0.0" clap = "3.0.0"
@ -31,6 +32,7 @@ shell-escape = "0.1.5"
sys-info = "0.9.0" sys-info = "0.9.0"
snafu = "0.6.10" snafu = "0.6.10"
tempfile = "3.1.0" tempfile = "3.1.0"
tokio-stream = "0.1.8"
users = "0.11.0" users = "0.11.0"
uuid = { version = "0.8.2", features = ["serde", "v4"] } uuid = { version = "0.8.2", features = ["serde", "v4"] }
validator = { version = "0.14", features = ["derive"] } validator = { version = "0.14", features = ["derive"] }

View file

@ -1,4 +1,4 @@
{ lib, stdenv, rustPlatform, installShellFiles }: { lib, stdenv, rustPlatform, installShellFiles, nix-eval-jobs ? null }:
rustPlatform.buildRustPackage rec { rustPlatform.buildRustPackage rec {
pname = "colmena"; pname = "colmena";
@ -9,10 +9,14 @@ rustPlatform.buildRustPackage rec {
src = lib.cleanSource ./.; src = lib.cleanSource ./.;
}; };
cargoSha256 = "sha256-UUDKAY5zOm/okdRN9Bh+1OgPJFnd1Gw5VMd/+/P46jQ="; cargoSha256 = "sha256-glLQwj9K6efC2cXoOx7oVMOfrFDhFTgm1C+Pghn5hGo=";
nativeBuildInputs = [ installShellFiles ]; nativeBuildInputs = [ installShellFiles ];
propagatedBuildInputs = lib.optional (nix-eval-jobs != null) nix-eval-jobs;
NIX_EVAL_JOBS = lib.optionalString (nix-eval-jobs != null) "${nix-eval-jobs}/bin/nix-eval-jobs";
postInstall = lib.optionalString (stdenv.hostPlatform == stdenv.buildPlatform) '' postInstall = lib.optionalString (stdenv.hostPlatform == stdenv.buildPlatform) ''
installShellCompletion --cmd colmena \ installShellCompletion --cmd colmena \
--bash <($out/bin/colmena gen-completions bash) \ --bash <($out/bin/colmena gen-completions bash) \

View file

@ -9,6 +9,7 @@ use crate::nix::deployment::{
Goal, Goal,
Options, Options,
EvaluationNodeLimit, EvaluationNodeLimit,
Evaluator,
ParallelismLimit, ParallelismLimit,
}; };
use crate::progress::SimpleProgressOutput; use crate::progress::SimpleProgressOutput;
@ -110,6 +111,14 @@ To temporarily disable remote build on all nodes, use `--no-build-on-target`.
.long_help(r#"If `deployment.replaceUnknownProfiles` is set for a target, using this switch .long_help(r#"If `deployment.replaceUnknownProfiles` is set for a target, using this switch
will treat deployment.replaceUnknownProfiles as though it was set true and perform unknown profile replacement."#) will treat deployment.replaceUnknownProfiles as though it was set true and perform unknown profile replacement."#)
.takes_value(false)) .takes_value(false))
.arg(Arg::new("evaluator")
.long("evaluator")
.help("The evaluator to use (experimental)")
.long_help(r#"If set to `chunked` (default), evaluation of nodes will happen in batches. If set to `streaming`, the experimental streaming evaluator (nix-eval-jobs) will be used and nodes will be evaluated in parallel.
This is an experimental feature."#)
.default_value("chunked")
.possible_values(Evaluator::possible_values()))
} }
pub fn subcommand() -> App<'static> { pub fn subcommand() -> App<'static> {
@ -156,6 +165,7 @@ pub async fn run(_global_args: &ArgMatches, local_args: &ArgMatches) -> Result<(
options.set_gzip(!local_args.is_present("no-gzip")); options.set_gzip(!local_args.is_present("no-gzip"));
options.set_upload_keys(!local_args.is_present("no-keys")); options.set_upload_keys(!local_args.is_present("no-keys"));
options.set_force_replace_unknown_profiles(local_args.is_present("force-replace-unknown-profiles")); options.set_force_replace_unknown_profiles(local_args.is_present("force-replace-unknown-profiles"));
options.set_evaluator(local_args.value_of_t("evaluator").unwrap());
if local_args.is_present("keep-result") { if local_args.is_present("keep-result") {
options.set_create_gc_roots(true); options.set_create_gc_roots(true);

View file

@ -8,7 +8,7 @@ pub mod limits;
pub use limits::{EvaluationNodeLimit, ParallelismLimit}; pub use limits::{EvaluationNodeLimit, ParallelismLimit};
pub mod options; pub mod options;
pub use options::Options; pub use options::{Options, Evaluator};
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
@ -16,6 +16,7 @@ use std::sync::Arc;
use futures::future::join_all; use futures::future::join_all;
use itertools::Itertools; use itertools::Itertools;
use tokio_stream::StreamExt;
use crate::progress::Sender as ProgressSender; use crate::progress::Sender as ProgressSender;
use crate::job::{JobMonitor, JobHandle, JobType, JobState}; use crate::job::{JobMonitor, JobHandle, JobType, JobState};
@ -34,6 +35,11 @@ use super::{
CopyDirection, CopyDirection,
CopyOptions, CopyOptions,
key::{Key, UploadAt as UploadKeyAt}, key::{Key, UploadAt as UploadKeyAt},
evaluator::{
DrvSetEvaluator,
NixEvalJobs,
EvalError,
},
}; };
use super::host; use super::host;
@ -163,7 +169,15 @@ impl Deployment {
let targets = mem::replace(&mut self.targets, HashMap::new()); let targets = mem::replace(&mut self.targets, HashMap::new());
let deployment = DeploymentHandle::new(self); let deployment = DeploymentHandle::new(self);
let meta_future = meta.run(|meta| async move { let meta_future = meta.run(|meta| async move {
match deployment.options.evaluator {
Evaluator::Chunked => {
deployment.execute_chunked(meta.clone(), targets).await?; deployment.execute_chunked(meta.clone(), targets).await?;
}
Evaluator::Streaming => {
log::warn!("Streaming evaluation is an experimental feature");
deployment.execute_streaming(meta.clone(), targets).await?;
}
}
Ok(()) Ok(())
}); });
@ -216,6 +230,99 @@ impl Deployment {
Ok(()) Ok(())
} }
/// Executes the deployment on selected nodes using a streaming evaluator.
async fn execute_streaming(self: &DeploymentHandle, parent: JobHandle, mut targets: TargetNodeMap)
-> ColmenaResult<()>
{
if self.goal == Goal::UploadKeys {
unreachable!(); // some logic is screwed up
}
let nodes: Vec<NodeName> = targets.keys().cloned().collect();
let expr = self.hive.eval_selected_expr(&nodes)?;
let job = parent.create_job(JobType::Evaluate, nodes.clone())?;
let futures = job.run(|job| async move {
let mut evaluator = NixEvalJobs::default();
evaluator.set_job(job.clone());
// FIXME: nix-eval-jobs currently does not support IFD with builders
let options = self.hive.nix_options();
let mut stream = evaluator.evaluate(&expr, options).await?;
let mut futures: Vec<tokio::task::JoinHandle<ColmenaResult<()>>> = Vec::new();
while let Some(item) = stream.next().await {
match item {
Ok(attr) => {
let node_name = NodeName::new(attr.attribute().to_owned())?;
let profile_drv: ProfileDerivation = attr.into_derivation()?;
// FIXME: Consolidate
let mut target = targets.remove(&node_name).unwrap();
if let Some(force_build_on_target) = self.options.force_build_on_target {
target.config.set_build_on_target(force_build_on_target);
}
let job_handle = job.clone();
let arc_self = self.clone();
futures.push(tokio::spawn(async move {
let (target, profile) = {
if target.config.build_on_target() {
arc_self.build_on_node(job_handle.clone(), target, profile_drv.clone()).await?
} else {
arc_self.build_and_push_node(job_handle.clone(), target, profile_drv.clone()).await?
}
};
if arc_self.goal.requires_activation() {
arc_self.activate_node(job_handle, target, profile).await
} else {
Ok(())
}
}));
}
Err(e) => {
match e {
EvalError::Global(e) => {
// Global error - Abort immediately
return Err(e);
}
EvalError::Attribute(e) => {
// Attribute-level error
//
// Here the eventual non-zero exit code of the evaluator
// will translate into an `EvalError::Global`, causing
// the entire future to resolve to an Err.
let node_name = NodeName::new(e.attribute().to_string()).unwrap();
let nodes = vec![ node_name ];
let job = parent.create_job(JobType::Evaluate, nodes)?;
job.state(JobState::Running)?;
for line in e.error().lines() {
job.stderr(line.to_string())?;
}
job.state(JobState::Failed)?;
}
}
}
}
}
Ok(futures)
}).await?;
join_all(futures).await
.into_iter()
.map(|r| r.unwrap()) // panic on JoinError (future panicked)
.collect::<ColmenaResult<Vec<()>>>()?;
Ok(())
}
/// Executes the deployment against a portion of nodes. /// Executes the deployment against a portion of nodes.
async fn execute_one_chunk(self: &DeploymentHandle, parent: JobHandle, mut chunk: TargetNodeMap) -> ColmenaResult<()> { async fn execute_one_chunk(self: &DeploymentHandle, parent: JobHandle, mut chunk: TargetNodeMap) -> ColmenaResult<()> {
if self.goal == Goal::UploadKeys { if self.goal == Goal::UploadKeys {

View file

@ -1,5 +1,8 @@
//! Deployment options. //! Deployment options.
use std::str::FromStr;
use crate::error::{ColmenaError, ColmenaResult};
use crate::nix::CopyOptions; use crate::nix::CopyOptions;
/// Options for a deployment. /// Options for a deployment.
@ -25,6 +28,16 @@ pub struct Options {
/// Ignore the node-level `deployment.replaceUnknownProfiles` option. /// Ignore the node-level `deployment.replaceUnknownProfiles` option.
pub(super) force_replace_unknown_profiles: bool, pub(super) force_replace_unknown_profiles: bool,
/// Which evaluator to use (experimental).
pub(super) evaluator: Evaluator,
}
/// Which evaluator to use.
#[derive(Clone, Debug, PartialEq)]
pub enum Evaluator {
Chunked,
Streaming,
} }
impl Options { impl Options {
@ -52,6 +65,10 @@ impl Options {
self.force_replace_unknown_profiles = enable; self.force_replace_unknown_profiles = enable;
} }
pub fn set_evaluator(&mut self, evaluator: Evaluator) {
self.evaluator = evaluator;
}
pub fn to_copy_options(&self) -> CopyOptions { pub fn to_copy_options(&self) -> CopyOptions {
let options = CopyOptions::default(); let options = CopyOptions::default();
@ -70,6 +87,27 @@ impl Default for Options {
create_gc_roots: false, create_gc_roots: false,
force_build_on_target: None, force_build_on_target: None,
force_replace_unknown_profiles: false, force_replace_unknown_profiles: false,
evaluator: Evaluator::Chunked,
} }
} }
} }
impl FromStr for Evaluator {
type Err = ColmenaError;
fn from_str(s: &str) -> ColmenaResult<Self> {
match s {
"chunked" => Ok(Self::Chunked),
"streaming" => Ok(Self::Streaming),
_ => Err(ColmenaError::Unknown {
message: "Valid values are 'chunked' and 'streaming'".to_string(),
}),
}
}
}
impl Evaluator {
pub fn possible_values() -> &'static [&'static str] {
&[ "chunked", "streaming" ]
}
}

92
src/nix/evaluator/mod.rs Normal file
View file

@ -0,0 +1,92 @@
//! Nix evaluator.
//!
//! A `DrvSetEvaluator` evaluates an attribute set of derivations. Such an
//! implementation may be able to parallelize the evaluation
//! (e.g., with [nix-eval-jobs](https://github.com/nix-community/nix-eval-jobs))
//! and emit results as soon as individual attributes finish evaluating.
//!
//! TODO: Port the chunked evaluator to DrvSetEvaluator
pub mod nix_eval_jobs;
pub use nix_eval_jobs::NixEvalJobs;
use std::convert::TryFrom;
use std::pin::Pin;
use std::result::Result as StdResult;
use async_trait::async_trait;
use futures::Stream;
use crate::job::JobHandle;
use crate::error::{ColmenaResult, ColmenaError};
use super::{BuildResult, StorePath, StoreDerivation, NixExpression, NixOptions};
/// The result of an evaluation.
///
/// The `Ok` variant always correspond to one attribute.
/// The `Err` variant may apply to a single attribute or to the entire
/// evaluation.
pub type EvalResult = StdResult<AttributeOutput, EvalError>;
/// An evaluation error.
#[derive(Debug)]
pub enum EvalError {
/// An attribute-level error.
Attribute(AttributeError),
/// A global error.
Global(ColmenaError),
}
/// The evaluation output of an attribute.
#[derive(Debug)]
pub struct AttributeOutput {
attribute: String,
drv_path: StorePath,
}
/// An attribute-level error.
#[derive(Debug)]
pub struct AttributeError {
attribute: String,
error: String,
}
/// A derivation set evaluator.
///
/// Such an evaluator can evaluate an attribute set of derivations.
#[async_trait]
pub trait DrvSetEvaluator {
/// Evaluates an attribute set of derivation, returning results as they come in.
async fn evaluate(&self, expression: &dyn NixExpression, options: NixOptions) -> ColmenaResult<Pin<Box<dyn Stream<Item = EvalResult>>>>;
/// Provides a JobHandle to use during operations.
#[allow(unused_variables)]
fn set_job(&mut self, job: JobHandle) {}
}
impl AttributeOutput {
/// Returns the attribute name.
pub fn attribute(&self) -> &str {
&self.attribute
}
/// Returns the derivation for this attribute.
pub fn into_derivation<T>(self) -> ColmenaResult<StoreDerivation<T>>
where T: TryFrom<BuildResult<T>>,
{
self.drv_path.into_derivation()
}
}
impl AttributeError {
/// Returns the attribute name.
pub fn attribute(&self) -> &str {
&self.attribute
}
/// Returns the error.
pub fn error(&self) -> &str {
&self.error
}
}

View file

@ -0,0 +1,286 @@
//! nix-eval-jobs evaluator.
//!
//! This evaluator can evaluate attributes in parallel.
//!
//! During build time, the nix-eval-jobs binary may be pinned by setting
//! the `NIX_EVAL_JOBS` environment variable.
use std::io::Write;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Stdio;
use async_stream::stream;
use async_trait::async_trait;
use futures::Stream;
use serde::Deserialize;
use tempfile::NamedTempFile;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use crate::error::{ColmenaResult, ColmenaError};
use crate::job::{JobHandle, null_job_handle};
use crate::nix::{StorePath, NixExpression, NixOptions};
use crate::util::capture_stream;
use super::{DrvSetEvaluator, EvalResult, EvalError, AttributeOutput, AttributeError};
/// The pinned nix-eval-jobs binary.
pub const NIX_EVAL_JOBS: Option<&str> = option_env!("NIX_EVAL_JOBS");
pub struct NixEvalJobs {
executable: PathBuf,
job: JobHandle,
}
/// A line in the eval output.
#[derive(Deserialize)]
#[serde(untagged)]
enum EvalLine {
Derivation(EvalLineDerivation),
Error(EvalLineAttributeError),
}
/// An output from nix-eval-jobs.
///
/// This is nix-eval-jobs's version of `AttributeOutput`.
#[derive(Deserialize)]
struct EvalLineDerivation {
#[serde(rename = "attr")]
attribute: String,
#[serde(rename = "drvPath")]
drv_path: StorePath,
}
/// An error from nix-eval-jobs.
///
/// This is nix-eval-jobs's version of `AttributeError`.
#[derive(Deserialize)]
struct EvalLineAttributeError {
#[serde(rename = "attr")]
attribute: String,
#[serde(rename = "error")]
error: String,
}
#[async_trait]
impl DrvSetEvaluator for NixEvalJobs {
async fn evaluate(&self, expression: &dyn NixExpression, options: NixOptions) -> ColmenaResult<Pin<Box<dyn Stream<Item = EvalResult>>>> {
let expr_file = {
let mut f = NamedTempFile::new()?;
f.write_all(expression.expression().as_bytes())?;
f.into_temp_path()
};
let mut command = Command::new(&self.executable);
command
.arg("--impure")
.args(&["--workers", "10"]) // FIXME: Configurable
.arg(&expr_file);
command.args(options.to_args());
if expression.requires_flakes() {
command.args(&["--experimental-features", "flakes"]);
}
let mut child = command
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let mut stdout = BufReader::new(child.stdout.take().unwrap());
let stderr = BufReader::new(child.stderr.take().unwrap());
let job = self.job.clone();
tokio::spawn(async move {
capture_stream(stderr, Some(job), true).await
});
Ok(Box::pin(stream! {
loop {
let mut line = String::new();
let len = {
let r = stdout.read_line(&mut line).await
.map_err(|e| EvalError::Global(e.into()));
match r {
Ok(v) => v,
Err(e) => {
yield Err(e);
break;
}
}
};
if len == 0 {
// Stream ended, wait for exit code
let r = child.wait().await
.map_err(|e| EvalError::Global(e.into()));
let status = match r {
Ok(v) => v,
Err(e) => {
yield Err(e);
break;
}
};
if !status.success() {
yield Err(EvalError::Global(status.into()));
}
break;
}
let trimmed = line.trim();
match serde_json::from_str::<EvalLine>(trimmed) {
Ok(el) => {
yield el.into();
}
Err(e) => {
let bad_output = ColmenaError::BadOutput {
output: e.to_string(),
};
yield Err(EvalError::Global(bad_output));
break;
}
}
}
drop(expr_file);
}))
}
fn set_job(&mut self, job: JobHandle) {
self.job = job;
}
}
impl Default for NixEvalJobs {
fn default() -> Self {
let binary = NIX_EVAL_JOBS.unwrap_or("nix-eval-jobs");
Self {
executable: PathBuf::from(binary),
job: null_job_handle(),
}
}
}
impl From<EvalLineDerivation> for AttributeOutput {
fn from(eld: EvalLineDerivation) -> Self {
Self {
attribute: eld.attribute,
drv_path: eld.drv_path,
}
}
}
impl From<EvalLineAttributeError> for AttributeError {
fn from(ele: EvalLineAttributeError) -> Self {
Self {
attribute: ele.attribute,
error: ele.error,
}
}
}
impl From<EvalLine> for EvalResult {
fn from(el: EvalLine) -> Self {
match el {
EvalLine::Derivation(eld) => Ok(eld.into()),
EvalLine::Error(ele) => Err(EvalError::Attribute(ele.into())),
}
}
}
/// Returns the pinned nix-eval-jobs executable.
///
/// This is used for informational purposes in `colmena nix-info`.
pub fn get_pinned_nix_eval_jobs() -> Option<&'static str> {
NIX_EVAL_JOBS
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_test::block_on;
use tokio_stream::StreamExt;
#[test]
fn test_eval() {
let evaluator = NixEvalJobs::default();
let expr = r#"with import <nixpkgs> {}; { a = pkgs.hello; b = pkgs.bash; }"#.to_string();
block_on(async move {
let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap();
let mut count = 0;
while let Some(value) = stream.next().await {
eprintln!("Got {:?}", value);
assert!(value.is_ok());
count += 1;
}
assert_eq!(2, count);
});
}
#[test]
fn test_global_error() {
let evaluator = NixEvalJobs::default();
let expr = r#"gibberish"#.to_string();
block_on(async move {
let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap();
let mut count = 0;
while let Some(value) = stream.next().await {
eprintln!("Got {:?}", value);
assert!(value.is_err());
count += 1;
}
assert_eq!(1, count);
});
}
#[test]
fn test_attribute_error() {
let evaluator = NixEvalJobs::default();
let expr = r#"with import <nixpkgs> {}; { a = pkgs.hello; b = throw "an error"; }"#.to_string();
block_on(async move {
let mut stream = evaluator.evaluate(&expr, NixOptions::default()).await.unwrap();
let mut count = 0;
while let Some(value) = stream.next().await {
eprintln!("Got {:?}", value);
match value {
Ok(v) => {
assert_eq!("a", v.attribute);
}
Err(e) => {
match e {
EvalError::Attribute(a) => {
assert_eq!("b", a.attribute);
}
_ => {
panic!("Expected an attribute error, got {:?}", e);
}
}
}
}
count += 1;
}
assert_eq!(2, count);
});
}
}

View file

@ -38,6 +38,8 @@ pub use flake::Flake;
pub mod node_filter; pub mod node_filter;
pub use node_filter::NodeFilter; pub use node_filter::NodeFilter;
pub mod evaluator;
/// Path to the main system profile. /// Path to the main system profile.
pub const SYSTEM_PROFILE: &str = "/nix/var/nix/profiles/system"; pub const SYSTEM_PROFILE: &str = "/nix/var/nix/profiles/system";