feat(users/picnoir/tvix-daemon): implement set_options operation
The protocol is more stateful than I initially thought. We need to keep track to a bunch of things, including but not limited to: the client settings, the client version. I moved things around a bit to keep this state along with the client socket. Change-Id: Ibd34fbe7821c20a460934ea1af0719f5de46e491 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11359 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
This commit is contained in:
parent
199f9b0a79
commit
efd8ab5e0b
1 changed files with 52 additions and 9 deletions
|
@ -4,7 +4,10 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio_listener::{self, SystemOptions, UserOptions};
|
use tokio_listener::{self, SystemOptions, UserOptions};
|
||||||
use tracing::{debug, error, info, instrument, Level};
|
use tracing::{debug, error, info, instrument, Level};
|
||||||
|
|
||||||
use nix_compat::wire::{bytes, primitive, worker_protocol};
|
use nix_compat::wire::{
|
||||||
|
bytes, primitive,
|
||||||
|
worker_protocol::{self, ClientSettings},
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
struct Cli {
|
struct Cli {
|
||||||
|
@ -16,6 +19,15 @@ struct Cli {
|
||||||
verbosity: Option<Level>,
|
verbosity: Option<Level>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Structure used to hold the client socket connection and some
|
||||||
|
/// metadata about the connection.
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct ClientConnection<R: AsyncReadExt + AsyncWriteExt + Unpin> {
|
||||||
|
conn: R,
|
||||||
|
version_minor: u64,
|
||||||
|
client_settings: Option<ClientSettings>,
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
#[instrument()]
|
#[instrument()]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
@ -58,22 +70,47 @@ where
|
||||||
R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
|
R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
|
||||||
{
|
{
|
||||||
match perform_init_handshake(&mut conn).await {
|
match perform_init_handshake(&mut conn).await {
|
||||||
Ok(_) => {
|
Ok(mut client_connection) => {
|
||||||
info!("Client hanshake succeeded");
|
debug!("Client hanshake succeeded");
|
||||||
// TODO: implement logging. For now, we'll just send
|
// TODO: implement logging. For now, we'll just send
|
||||||
// STDERR_LAST, which is good enough to get Nix respond to
|
// STDERR_LAST, which is good enough to get Nix respond to
|
||||||
// us.
|
// us.
|
||||||
primitive::write_u64(&mut conn, worker_protocol::STDERR_LAST)
|
primitive::write_u64(&mut client_connection.conn, worker_protocol::STDERR_LAST)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
//
|
loop {
|
||||||
let op = worker_protocol::read_op(&mut conn).await.unwrap();
|
let op = worker_protocol::read_op(&mut client_connection.conn)
|
||||||
info!(op = ?op, "Operation received");
|
.await
|
||||||
|
.unwrap();
|
||||||
|
match op {
|
||||||
|
worker_protocol::Operation::SetOptions => {
|
||||||
|
let settings = op_set_options(&mut client_connection).await.unwrap();
|
||||||
|
client_connection.client_settings = Some(settings);
|
||||||
|
debug!(settings = ?client_connection.client_settings, "Received client settings");
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
error!(op = ?op, "Unimplemented operation");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => error!("Client handshake failed: {}", e),
|
Err(e) => error!("Client handshake failed: {}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn op_set_options<R>(conn: &mut ClientConnection<R>) -> std::io::Result<ClientSettings>
|
||||||
|
where
|
||||||
|
R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
|
||||||
|
{
|
||||||
|
let settings =
|
||||||
|
worker_protocol::read_client_settings(&mut conn.conn, conn.version_minor).await?;
|
||||||
|
// The client expects us to send some logs when we're processing
|
||||||
|
// the settings. Sending STDERR_LAST signal we're done processing.
|
||||||
|
primitive::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?;
|
||||||
|
Ok(settings)
|
||||||
|
}
|
||||||
|
|
||||||
/// Performs the initial handshake. During the handshake, the client
|
/// Performs the initial handshake. During the handshake, the client
|
||||||
/// will first send a magic u64, to which the daemon needs to respond
|
/// will first send a magic u64, to which the daemon needs to respond
|
||||||
/// with another magic u64.
|
/// with another magic u64.
|
||||||
|
@ -81,7 +118,9 @@ where
|
||||||
/// We then retrieve the client version, and discard a bunch of now
|
/// We then retrieve the client version, and discard a bunch of now
|
||||||
/// obsolete data.
|
/// obsolete data.
|
||||||
#[instrument()]
|
#[instrument()]
|
||||||
async fn perform_init_handshake<'a, R: 'a>(mut conn: &'a mut R) -> anyhow::Result<()>
|
async fn perform_init_handshake<'a, R: 'a>(
|
||||||
|
mut conn: &'a mut R,
|
||||||
|
) -> anyhow::Result<ClientConnection<&'a mut R>>
|
||||||
where
|
where
|
||||||
&'a mut R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
|
&'a mut R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
|
||||||
{
|
{
|
||||||
|
@ -134,7 +173,11 @@ where
|
||||||
.await?;
|
.await?;
|
||||||
info!("Trust sent");
|
info!("Trust sent");
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(ClientConnection {
|
||||||
|
conn,
|
||||||
|
version_minor: protocol_minor,
|
||||||
|
client_settings: None,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue