feat(users/picnoir/tvix-daemon): parse up to the operation
Using all the primitives recently implemented to nix-compat to reach the point where the Nix client start to send us operation requests. Using a small integration test script (or the VM test, but let's face it, it's too slow to be useful), we manage to reach the point where we're able to read a store operation: 2024-03-21T18:53:27.624876Z INFO tvix_daemon: Incoming connection addr=unix 2024-03-21T18:53:27.625312Z INFO worker:perform_init_handshake: tvix_daemon: Trust sent conn=Connection(unix) conn=Connection(unix) 2024-03-21T18:53:27.625406Z INFO worker: tvix_daemon: Client hanshake succeeded conn=Connection(unix) 2024-03-21T18:53:27.625488Z INFO worker: tvix_daemon: Operation received op=SetOptions conn=Connection(unix) We had to take some shortcuts wrt. stderr/log management. The CPP Nix codebase is a bit confusing in that area. I'll need to spend more time reading this to fully understand what's happening there. For now, sending the STDERR_LAST command to the client does the trick. Change-Id: I9b0e20a52d885e64fe29188496aac5334de61edd Reviewed-on: https://cl.tvl.fyi/c/depot/+/11233 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
parent
08cc27cc20
commit
c35a5ff611
2 changed files with 34 additions and 11 deletions
|
@ -6,6 +6,8 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
use crate::wire::primitive;
|
use crate::wire::primitive;
|
||||||
|
|
||||||
|
pub static STDERR_LAST: u64 = 0x616c7473;
|
||||||
|
|
||||||
/// Worker Operation
|
/// Worker Operation
|
||||||
///
|
///
|
||||||
/// These operations are encoded as unsigned 64 bits before being sent
|
/// These operations are encoded as unsigned 64 bits before being sent
|
||||||
|
|
|
@ -4,7 +4,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio_listener::{self, SystemOptions, UserOptions};
|
use tokio_listener::{self, SystemOptions, UserOptions};
|
||||||
use tracing::{debug, error, info, instrument};
|
use tracing::{debug, error, info, instrument};
|
||||||
|
|
||||||
use nix_compat::wire::primitive;
|
use nix_compat::wire::{bytes, primitive, worker_protocol};
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
struct Cli {
|
struct Cli {
|
||||||
|
@ -26,13 +26,17 @@ async fn main() {
|
||||||
.parse()
|
.parse()
|
||||||
.expect("Invalid listening socket address");
|
.expect("Invalid listening socket address");
|
||||||
let system_options: SystemOptions = Default::default();
|
let system_options: SystemOptions = Default::default();
|
||||||
let user_options: UserOptions = Default::default();
|
let mut user_options: UserOptions = Default::default();
|
||||||
|
user_options.recv_buffer_size = Some(1024);
|
||||||
|
user_options.send_buffer_size = Some(1024);
|
||||||
|
info!(user_options.send_buffer_size);
|
||||||
|
info!(user_options.recv_buffer_size);
|
||||||
let mut listener = tokio_listener::Listener::bind(&addr, &system_options, &user_options)
|
let mut listener = tokio_listener::Listener::bind(&addr, &system_options, &user_options)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
info!("Listening for incoming connections on {:?}", listener);
|
info!(listener_address = ?listener, "Listening for incoming connections");
|
||||||
while let Ok((conn, addr)) = listener.accept().await {
|
while let Ok((conn, addr)) = listener.accept().await {
|
||||||
info!("Incoming connection from {addr}");
|
info!(addr = %addr, "Incoming connection");
|
||||||
tokio::spawn(async move { worker(conn).await });
|
tokio::spawn(async move { worker(conn).await });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -46,9 +50,16 @@ where
|
||||||
{
|
{
|
||||||
match perform_init_handshake(&mut conn).await {
|
match perform_init_handshake(&mut conn).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
// TODO: process request here, dispatch to operation
|
info!("Client hanshake succeeded");
|
||||||
// handler.
|
// TODO: implement logging. For now, we'll just send
|
||||||
info!("Handshake done, bye now");
|
// STDERR_LAST, which is good enough to get Nix respond to
|
||||||
|
// us.
|
||||||
|
primitive::write_u64(&mut conn, worker_protocol::STDERR_LAST)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
//
|
||||||
|
let op = worker_protocol::read_op(&mut conn).await.unwrap();
|
||||||
|
info!(op = ?op, "Operation received");
|
||||||
}
|
}
|
||||||
Err(e) => error!("Client handshake failed: {}", e),
|
Err(e) => error!("Client handshake failed: {}", e),
|
||||||
}
|
}
|
||||||
|
@ -106,7 +117,13 @@ where
|
||||||
// good idea.
|
// good idea.
|
||||||
debug!("write version");
|
debug!("write version");
|
||||||
// Plain str padded to 64 bits.
|
// Plain str padded to 64 bits.
|
||||||
conn.write(&"2.3.17\0\0".as_bytes()).await?;
|
bytes::write_bytes(&mut conn, "2.3.17".as_bytes()).await?;
|
||||||
|
conn.flush().await?;
|
||||||
|
}
|
||||||
|
if protocol_minor >= 35 {
|
||||||
|
worker_protocol::write_worker_trust_level(&mut conn, worker_protocol::Trust::Trusted)
|
||||||
|
.await?;
|
||||||
|
info!("Trust sent");
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -137,9 +154,13 @@ mod integration_tests {
|
||||||
.read(&vec![0; 8])
|
.read(&vec![0; 8])
|
||||||
// reservespace
|
// reservespace
|
||||||
.read(&vec![0; 8])
|
.read(&vec![0; 8])
|
||||||
// version
|
// version (size)
|
||||||
.write(&"2.3.17\0\0".as_bytes())
|
.write(&vec![0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||||
|
// version (data == 2.2.17 + padding)
|
||||||
|
.write(&vec![50, 46, 51, 46, 49, 55, 0, 0])
|
||||||
|
// Trusted (1 == client trusted
|
||||||
|
.write(&vec![1, 0, 0, 0, 0, 0, 0, 0])
|
||||||
.build();
|
.build();
|
||||||
crate::worker(&mut test_conn).await;
|
crate::perform_init_handshake(&mut test_conn).await.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue