feat(users/picnoir/tvix-daemon): introduce tvix-daemon

This daemon is a re-implementation of the Nix daemon except it uses
tvix-store as a remote store.

For now, it's very barebones, this is just a quick and dirty setup to
get started with the project. We bind to the unix socket provided by
systemd, wait for cpp Nix to send the magic hello bytes, respond with
the magic hello bytes and call it a day.

Storing this under my username for now, the project is mostly
irrelevant as it is. We'll move it to Tvix if it gets complete and
relevant at some point.

Change-Id: Ifc5dce2df37413504f9de1942c5b7d425eddf759
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11198
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Picnoir 2024-03-19 14:34:59 +01:00 committed by picnoir picnoir
parent 09e5f1782c
commit 29b5ffbda0
7 changed files with 15782 additions and 0 deletions

3372
users/picnoir/tvix-daemon/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,17 @@
[package]
name = "tvix-daemon"
version = "0.1.0"
edition = "2021"
[dependencies]
tvix-store = { path = "../../../tvix/store/" }
nix-compat = { path = "../../../tvix/nix-compat/" }
tokio-listener = "0.3.1"
tokio = { version = "1.36.0", features = ["full"] }
tracing-subscriber = "0.3.18"
tracing = "0.1.40"
anyhow = "1.0.81"
clap = { version = "4.5.3", features = ["derive"] }
[dev-dependencies]
tokio-test = "0.4.4"

View file

@ -0,0 +1,14 @@
# Tvix-daemon
A **super** incomplete implementation of a Nix-compatible daemon. Same as the original except it's backed by Tvix-Store.
For now, this is mostly used as a playground to implement the Nix daemon wire format in nix-compat.
On the long run, I hope this to be useful to get some real-world usage experience of tvix-store.
## Build
```sh
mg shell //tvix:shell
cargo build
```

View file

@ -0,0 +1,26 @@
{ depot, lib, pkgs, ... }:
let
crate2nix = pkgs.callPackage ./Cargo.nix {
defaultCrateOverrides = {
tvix-castore = prev: {
PROTO_ROOT = depot.tvix.castore.protos.protos;
nativeBuildInputs = protobufDep prev;
};
tvix-store = prev: {
PROTO_ROOT = depot.tvix.store.protos.protos;
nativeBuildInputs = protobufDep prev;
};
};
};
protobufDep = prev: (prev.nativeBuildInputs or [ ]) ++ [ pkgs.buildPackages.protobuf ];
in
{
shell = (import ./shell.nix { inherit pkgs; });
tvix-daemon = crate2nix.rootCrate.build;
meta.ci.targets = [
"tvix-daemon"
"shell"
];
}

View file

@ -0,0 +1,21 @@
{ pkgs, ... }:
pkgs.mkShell {
name = "tvix-daemon";
packages = [
pkgs.cargo
pkgs.cargo-machete
pkgs.cargo-expand
pkgs.clippy
pkgs.evans
pkgs.fuse
pkgs.go
pkgs.grpcurl
pkgs.hyperfine
pkgs.nix_2_3 # b/313
pkgs.pkg-config
pkgs.rust-analyzer
pkgs.rustc
pkgs.rustfmt
pkgs.protobuf
];
}

View file

@ -0,0 +1,89 @@
use anyhow::anyhow;
use clap::Parser;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_listener::{self, SystemOptions, UserOptions};
use tracing::{error, info, instrument};
use nix_compat::wire::primitive;
#[derive(Parser, Debug)]
struct Cli {
/// Listening unix socket path
#[arg(short, long)]
socket: Option<String>,
}
#[tokio::main]
#[instrument()]
async fn main() {
tracing_subscriber::fmt().compact().try_init().unwrap();
let args = Cli::parse();
info!("Started Tvix daemon");
let addr = args
.socket
.unwrap_or_else(|| "sd_listen_unix".to_string())
.parse()
.expect("Invalid listening socket address");
let system_options: SystemOptions = Default::default();
let user_options: UserOptions = Default::default();
let mut listener = tokio_listener::Listener::bind(&addr, &system_options, &user_options)
.await
.unwrap();
info!("Listening for incoming connections on {:?}", listener);
while let Ok((conn, addr)) = listener.accept().await {
info!("Incoming connection from {addr}");
tokio::spawn(async move { worker(conn).await });
}
}
/// Worker in charge to respond a Nix client using the Nix wire
/// protocol.
#[instrument()]
async fn worker<R>(mut conn: R)
where
R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
{
match perform_init_handshake(&mut conn).await {
Ok(_) => {
// TODO: process request here, dispatch to operation
// handler.
info!("Handshake done, bye now");
}
Err(e) => error!("Client handshake failed: {}", e),
}
}
/// Performs the initial handshake. During the handshake, the client
/// will first send a magic u64, to which the daemon needs to respond
/// with another magic u64.
async fn perform_init_handshake<'a, R: 'a>(mut conn: &'a mut R) -> anyhow::Result<()>
where
&'a mut R: AsyncReadExt + AsyncWriteExt + Unpin,
{
let mut magic_hello = vec![0; 8];
conn.read(&mut magic_hello).await?;
if magic_hello != primitive::MAGIC_HELLO {
Err(anyhow!(
"Invalid client hello received: {:?}, expected {:?}",
magic_hello,
primitive::MAGIC_HELLO
))
} else {
conn.write(&primitive::MAGIC_HELLO_RESPONSE).await?;
Ok(())
}
}
#[cfg(test)]
mod integration_tests {
use nix_compat::wire::primitive;
#[tokio::test]
async fn test_init_handshake() {
let mut test_conn = tokio_test::io::Builder::new()
.read(&primitive::MAGIC_HELLO)
.write(&primitive::MAGIC_HELLO_RESPONSE)
.build();
crate::worker(&mut test_conn).await;
}
}