WIP: Generic activation program #9

Draft
ecoppens wants to merge 6 commits from ecoppens/colmena:main into main
2 changed files with 185 additions and 1 deletions
Showing only changes of commit ab8d8b0321 - Show all commits

182
src/nix/host/generic.rs Normal file
View file

@ -0,0 +1,182 @@
use std::collections::HashMap;
use std::process::Stdio;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::Command;
use super::{CopyDirection, CopyOptions, Host};
use crate::error::{ColmenaError, ColmenaResult};
use crate::job::JobHandle;
use crate::nix::{self, Profile, StorePath};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransportId(String);
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Transport {
id: TransportId,
name: String,
long_name: String,
description: String,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GoalId(String);
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Goal {
id: GoalId,
name: String,
long_name: String,
description: String,
}
/// A request to the activation program
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Request {
/// Asks for activation program's capabilities
Capabilities,
/// Copy closure to/from host
CopyClosure {
transport: TransportId,
path: StorePath,
to_host: bool,
options: HashMap<String, String>,
},
/// Deploys the profile to host
Deploy {
transport: TransportId,
goal: GoalId,
toplevel: StorePath,
options: HashMap<String, String>,
},
/// Realizes the derivation
Realize {
transport: TransportId,
path: StorePath,
options: HashMap<String, String>,
},
/// Uploads keys to host
UploadKeys { transport: TransportId },
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CapabilityResponse {
V1 {
supported_transports: Vec<Transport>,
supported_goals: Vec<Goal>,
},
}
/// A response from the activation program
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Response {
Capabilities(CapabilityResponse),
Progress { phase: String },
NewStorePath { store_path: StorePath },
}
#[derive(Debug)]
pub struct GenericHost {
activation_program: StorePath,
transport: TransportId,
}
#[async_trait]
impl Host for GenericHost {
async fn copy_closure(
&mut self,
closure: &StorePath,
direction: CopyDirection,
options: CopyOptions,
) -> ColmenaResult<()> {
self.call(Request::CopyClosure {
transport: self.transport.clone(),
path: closure.clone(),
to_host: direction == CopyDirection::ToRemote,
options: HashMap::from([(
"include_outputs".to_owned(),
if options.include_outputs {
"true".to_owned()
} else {
"false".to_owned()
},
)]),
})
.await
}
async fn realize_remote(&mut self, derivation: &StorePath) -> ColmenaResult<Vec<StorePath>> {
self.call(Request::Realize {
transport: self.transport.clone(),
path: derivation.clone(),
options: HashMap::new(),
})
.await?;
Ok(Vec::new())
}
fn set_job(&mut self, bar: Option<JobHandle>) {}
async fn deploy(
&mut self,
profile: &Profile,
goal: nix::Goal,
copy_options: CopyOptions,
activation_program: Option<&StorePath>,
) -> ColmenaResult<()> {
self.call(Request::Deploy {
transport: self.transport.clone(),
goal: GoalId(goal.to_string()),
toplevel: profile.as_store_path().clone(),
options: HashMap::new(),
})
.await
}
async fn get_current_system_profile(&mut self) -> ColmenaResult<Profile> {
Err(ColmenaError::Unsupported)
}
async fn get_main_system_profile(&mut self) -> ColmenaResult<Profile> {
Err(ColmenaError::Unsupported)
}
}
impl GenericHost {
async fn call(&mut self, request: Request) -> ColmenaResult<()> {
let mut command = Command::new(self.activation_program.as_path())
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let json = serde_json::to_string(&request).unwrap();
let mut stdin = BufWriter::new(command.stdin.take().unwrap());
let mut stdout = BufReader::new(command.stdout.take().unwrap()).lines();
stdin.write_all(json.as_bytes()).await?;
tokio::spawn(async move {
let _status = command
.wait()
.await
.expect("child process encountered an error");
});
while let Some(line) = stdout.next_line().await? {
let response: Response = serde_json::from_str(line.as_str()).unwrap();
println!("{:?}", response);
}
Ok(())
}
}

View file

@ -14,7 +14,9 @@ pub use local::Local;
mod key_uploader; mod key_uploader;
#[derive(Copy, Clone, Debug)] mod generic;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CopyDirection { pub enum CopyDirection {
ToRemote, ToRemote,
FromRemote, FromRemote,