deployment: Move chunked mode into self-contained function

This commit is contained in:
Zhaofeng Li 2022-01-21 00:45:12 -08:00
parent 82361e5ea5
commit d3e556027f

View file

@ -164,17 +164,10 @@ impl Deployment {
Ok(())
} else {
// Do the whole eval-build-deploy flow
let chunks = self.get_chunks();
let targets = mem::replace(&mut self.targets, HashMap::new());
let deployment = DeploymentHandle::new(self);
let meta_future = meta.run(|meta| async move {
let mut futures = Vec::new();
for chunk in chunks.into_iter() {
futures.push(deployment.execute_chunk(meta.clone(), chunk));
}
join_all(futures).await
.into_iter().collect::<ColmenaResult<Vec<()>>>()?;
deployment.execute_chunked(meta.clone(), targets).await?;
Ok(())
});
@ -202,25 +195,33 @@ impl Deployment {
self.evaluation_node_limit = limit;
}
fn get_chunks(&mut self) -> Vec<TargetNodeMap> {
/// Executes the deployment on selected nodes, evaluating a chunk at a time.
async fn execute_chunked(self: &DeploymentHandle, parent: JobHandle, mut targets: TargetNodeMap)
-> ColmenaResult<()>
{
let eval_limit = self.evaluation_node_limit.get_limit()
.unwrap_or_else(|| self.targets.len());
let mut result = Vec::new();
let mut futures = Vec::new();
for chunk in self.targets.drain().chunks(eval_limit).into_iter() {
for chunk in targets.drain().chunks(eval_limit).into_iter() {
let mut map = HashMap::new();
for (name, host) in chunk {
map.insert(name, host);
}
result.push(map);
futures.push(self.execute_one_chunk(parent.clone(), map));
}
result
join_all(futures).await
.into_iter()
.collect::<ColmenaResult<Vec<()>>>()?;
Ok(())
}
/// Executes the deployment against a portion of nodes.
async fn execute_chunk(self: &DeploymentHandle, parent: JobHandle, mut chunk: TargetNodeMap) -> ColmenaResult<()> {
async fn execute_one_chunk(self: &DeploymentHandle, parent: JobHandle, mut chunk: TargetNodeMap) -> ColmenaResult<()> {
if self.goal == Goal::UploadKeys {
unreachable!(); // some logic is screwed up
}