feat(tvix/store): use tokio-listener for tvix-store daemon command

This allows binding on unix sockets, as well as systemd socket
activation.

Change-Id: Icf648c4fd0895468c52607deb6397b8b5928102b
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11901
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2024-06-30 22:29:11 +03:00 committed by clbot
parent f6c759de58
commit 830fdda8d4
4 changed files with 31 additions and 19 deletions

1
tvix/Cargo.lock generated
View file

@ -3831,6 +3831,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4134661e12ec11c6276be73544a43144a357b08dfab5c41fd226e15b5bc9a6b2" checksum = "4134661e12ec11c6276be73544a43144a357b08dfab5c41fd226e15b5bc9a6b2"
dependencies = [ dependencies = [
"clap",
"document-features", "document-features",
"futures-core", "futures-core",
"futures-util", "futures-util",

View file

@ -11722,6 +11722,13 @@ rec {
edition = "2021"; edition = "2021";
sha256 = "1cm6r5dmpq96s8gw9dgsinq5g8s466j48dg7dckwc4gc28g6cd21"; sha256 = "1cm6r5dmpq96s8gw9dgsinq5g8s466j48dg7dckwc4gc28g6cd21";
dependencies = [ dependencies = [
{
name = "clap";
packageId = "clap";
optional = true;
usesDefaultFeatures = false;
features = [ "derive" "std" ];
}
{ {
name = "document-features"; name = "document-features";
packageId = "document-features"; packageId = "document-features";
@ -11776,6 +11783,11 @@ rec {
} }
]; ];
devDependencies = [ devDependencies = [
{
name = "clap";
packageId = "clap";
features = [ "help" ];
}
{ {
name = "tokio"; name = "tokio";
packageId = "tokio"; packageId = "tokio";
@ -11801,7 +11813,7 @@ rec {
"unix_path_tools" = [ "nix" ]; "unix_path_tools" = [ "nix" ];
"user_facing_default" = [ "inetd" "unix" "unix_path_tools" "sd_listen" "socket_options" ]; "user_facing_default" = [ "inetd" "unix" "unix_path_tools" "sd_listen" "socket_options" ];
}; };
resolvedDefaultFeatures = [ "default" "inetd" "nix" "sd_listen" "socket2" "socket_options" "tokio-util" "tonic011" "unix" "unix_path_tools" "user_facing_default" ]; resolvedDefaultFeatures = [ "clap" "default" "inetd" "multi-listener" "nix" "sd_listen" "socket2" "socket_options" "tokio-util" "tonic011" "unix" "unix_path_tools" "user_facing_default" ];
}; };
"tokio-macros" = rec { "tokio-macros" = rec {
crateName = "tokio-macros"; crateName = "tokio-macros";
@ -14252,7 +14264,7 @@ rec {
{ {
name = "tokio-listener"; name = "tokio-listener";
packageId = "tokio-listener"; packageId = "tokio-listener";
features = [ "tonic011" ]; features = [ "clap" "multi-listener" "sd_listen" "tonic011" ];
} }
{ {
name = "tokio-stream"; name = "tokio-stream";

View file

@ -26,7 +26,7 @@ sha2 = "0.10.6"
sled = { version = "0.34.7" } sled = { version = "0.34.7" }
thiserror = "1.0.38" thiserror = "1.0.38"
tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
tokio-listener = { version = "0.4.1", features = [ "tonic011" ] } tokio-listener = { version = "0.4.2", features = [ "clap", "multi-listener", "sd_listen", "tonic011" ] }
tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-stream = { version = "0.1.14", features = ["fs"] }
tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] } tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }

View file

@ -9,9 +9,6 @@ use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio_listener::Listener;
use tokio_listener::SystemOptions;
use tokio_listener::UserOptions;
use tonic::transport::Server; use tonic::transport::Server;
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use tower_http::trace::{DefaultMakeSpan, TraceLayer};
@ -66,8 +63,9 @@ struct Cli {
enum Commands { enum Commands {
/// Runs the tvix-store daemon. /// Runs the tvix-store daemon.
Daemon { Daemon {
#[arg(long, short = 'l')] /// The address to listen on.
listen_address: Option<String>, #[clap(flatten)]
listen_args: tokio_listener::ListenerAddressLFlag,
#[arg( #[arg(
long, long,
@ -198,7 +196,7 @@ fn default_threads() -> usize {
async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
match cli.command { match cli.command {
Commands::Daemon { Commands::Daemon {
listen_address, listen_args,
blob_service_addr, blob_service_addr,
directory_service_addr, directory_service_addr,
path_info_service_addr, path_info_service_addr,
@ -212,11 +210,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
) )
.await?; .await?;
let listen_address = listen_address
.unwrap_or_else(|| "[::]:8000".to_string())
.parse()
.unwrap();
let mut server = Server::builder().layer( let mut server = Server::builder().layer(
ServiceBuilder::new() ServiceBuilder::new()
.layer( .layer(
@ -251,15 +244,21 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
router = router.add_service(reflection_svc); router = router.add_service(reflection_svc);
} }
info!(listen_address=%listen_address, "starting daemon"); let listen_address = &listen_args.listen_address.unwrap_or_else(|| {
"[::]:8000"
.parse()
.expect("invalid fallback listen address")
});
let listener = Listener::bind( let listener = tokio_listener::Listener::bind(
&listen_address, listen_address,
&SystemOptions::default(), &Default::default(),
&UserOptions::default(), &listen_args.listener_options,
) )
.await?; .await?;
info!(listen_address=%listen_address, "starting daemon");
router.serve_with_incoming(listener).await?; router.serve_with_incoming(listener).await?;
} }
Commands::Import { Commands::Import {