refactor(tvix/store): Upgrade tokio-listener to get tonic support
Tonic support was added to tokio-listener upstream which removes the need for use to have tonic compatibility wrapper types around it. See: https://github.com/vi/tokio-listener/pull/2 Fixes b/311 Change-Id: I04a2dbb3bc3c8bfe9339583c0b46070c7ec97811 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9721 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
parent
8e811fe625
commit
e3d72cc4cb
6 changed files with 29 additions and 141 deletions
5
tvix/Cargo.lock
generated
5
tvix/Cargo.lock
generated
|
@ -2570,9 +2570,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-listener"
|
name = "tokio-listener"
|
||||||
version = "0.2.1"
|
version = "0.2.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "05875e290052a679fec29aef1da57eb4da3dafb59c6fe579fb143ccca3dea7fb"
|
checksum = "669ed78565b6ce6482aaf8c1f67e0ae1fa1cf1a97c090e96994d502857675d45"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"document-features",
|
"document-features",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
@ -2581,6 +2581,7 @@ dependencies = [
|
||||||
"pin-project",
|
"pin-project",
|
||||||
"socket2 0.5.4",
|
"socket2 0.5.4",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tonic",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -7481,9 +7481,9 @@ rec {
|
||||||
};
|
};
|
||||||
"tokio-listener" = rec {
|
"tokio-listener" = rec {
|
||||||
crateName = "tokio-listener";
|
crateName = "tokio-listener";
|
||||||
version = "0.2.1";
|
version = "0.2.2";
|
||||||
edition = "2021";
|
edition = "2021";
|
||||||
sha256 = "1yx7vsiwqg0lzdwyavwwnnpkvnmlgsjivvwsqbz7k9jj00lmx1q5";
|
sha256 = "0iaxcxbjhl2dk6b0w2bwm7qiryp119zgdhgqma169kmncn2xg7k6";
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{
|
{
|
||||||
name = "document-features";
|
name = "document-features";
|
||||||
|
@ -7522,6 +7522,11 @@ rec {
|
||||||
packageId = "tokio";
|
packageId = "tokio";
|
||||||
features = [ "net" "io-std" "time" "sync" ];
|
features = [ "net" "io-std" "time" "sync" ];
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
name = "tonic";
|
||||||
|
packageId = "tonic";
|
||||||
|
optional = true;
|
||||||
|
}
|
||||||
{
|
{
|
||||||
name = "tracing";
|
name = "tracing";
|
||||||
packageId = "tracing";
|
packageId = "tracing";
|
||||||
|
@ -7538,6 +7543,10 @@ rec {
|
||||||
packageId = "tokio";
|
packageId = "tokio";
|
||||||
features = [ "macros" "rt" "io-util" ];
|
features = [ "macros" "rt" "io-util" ];
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
name = "tonic";
|
||||||
|
packageId = "tonic";
|
||||||
|
}
|
||||||
];
|
];
|
||||||
features = {
|
features = {
|
||||||
"clap" = [ "dep:clap" ];
|
"clap" = [ "dep:clap" ];
|
||||||
|
@ -7549,10 +7558,12 @@ rec {
|
||||||
"serde_with" = [ "dep:serde_with" ];
|
"serde_with" = [ "dep:serde_with" ];
|
||||||
"socket2" = [ "dep:socket2" ];
|
"socket2" = [ "dep:socket2" ];
|
||||||
"socket_options" = [ "socket2" ];
|
"socket_options" = [ "socket2" ];
|
||||||
|
"tonic" = [ "dep:tonic" ];
|
||||||
|
"tonic010" = [ "tonic" ];
|
||||||
"unix_path_tools" = [ "nix" ];
|
"unix_path_tools" = [ "nix" ];
|
||||||
"user_facing_default" = [ "inetd" "unix" "unix_path_tools" "sd_listen" "socket_options" ];
|
"user_facing_default" = [ "inetd" "unix" "unix_path_tools" "sd_listen" "socket_options" ];
|
||||||
};
|
};
|
||||||
resolvedDefaultFeatures = [ "default" "hyper" "hyper014" "inetd" "nix" "sd_listen" "socket2" "socket_options" "unix" "unix_path_tools" "user_facing_default" ];
|
resolvedDefaultFeatures = [ "default" "hyper" "hyper014" "inetd" "nix" "sd_listen" "socket2" "socket_options" "tonic" "tonic010" "unix" "unix_path_tools" "user_facing_default" ];
|
||||||
};
|
};
|
||||||
"tokio-macros" = rec {
|
"tokio-macros" = rec {
|
||||||
crateName = "tokio-macros";
|
crateName = "tokio-macros";
|
||||||
|
@ -9003,6 +9014,7 @@ rec {
|
||||||
{
|
{
|
||||||
name = "tokio-listener";
|
name = "tokio-listener";
|
||||||
packageId = "tokio-listener";
|
packageId = "tokio-listener";
|
||||||
|
features = [ "tonic010" ];
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
name = "tokio-stream";
|
name = "tokio-stream";
|
||||||
|
|
|
@ -20,9 +20,10 @@ prost = "0.12.1"
|
||||||
sha2 = "0.10.6"
|
sha2 = "0.10.6"
|
||||||
sled = { version = "0.34.7", features = ["compression"] }
|
sled = { version = "0.34.7", features = ["compression"] }
|
||||||
thiserror = "1.0.38"
|
thiserror = "1.0.38"
|
||||||
|
tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
|
||||||
|
tokio-listener = { version = "0.2.2", features = [ "tonic010" ] }
|
||||||
tokio-stream = { version = "0.1.14", features = ["fs"] }
|
tokio-stream = { version = "0.1.14", features = ["fs"] }
|
||||||
tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
|
tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
|
||||||
tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
|
|
||||||
tonic = { version = "0.10.2", features = ["tls", "tls-roots"] }
|
tonic = { version = "0.10.2", features = ["tls", "tls-roots"] }
|
||||||
tower = "0.4.13"
|
tower = "0.4.13"
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
|
@ -30,7 +31,6 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] }
|
||||||
tvix-castore = { path = "../castore" }
|
tvix-castore = { path = "../castore" }
|
||||||
url = "2.4.0"
|
url = "2.4.0"
|
||||||
walkdir = "2.4.0"
|
walkdir = "2.4.0"
|
||||||
tokio-listener = { version = "0.2.1" }
|
|
||||||
async-recursion = "1.0.5"
|
async-recursion = "1.0.5"
|
||||||
|
|
||||||
[dependencies.fuse-backend-rs]
|
[dependencies.fuse-backend-rs]
|
||||||
|
|
|
@ -7,6 +7,9 @@ use std::io;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
use tokio_listener::Listener;
|
||||||
|
use tokio_listener::SystemOptions;
|
||||||
|
use tokio_listener::UserOptions;
|
||||||
use tracing_subscriber::prelude::*;
|
use tracing_subscriber::prelude::*;
|
||||||
use tvix_castore::blobservice;
|
use tvix_castore::blobservice;
|
||||||
use tvix_castore::directoryservice;
|
use tvix_castore::directoryservice;
|
||||||
|
@ -17,7 +20,6 @@ use tvix_castore::proto::node::Node;
|
||||||
use tvix_castore::proto::GRPCBlobServiceWrapper;
|
use tvix_castore::proto::GRPCBlobServiceWrapper;
|
||||||
use tvix_castore::proto::GRPCDirectoryServiceWrapper;
|
use tvix_castore::proto::GRPCDirectoryServiceWrapper;
|
||||||
use tvix_castore::proto::NamedNode;
|
use tvix_castore::proto::NamedNode;
|
||||||
use tvix_store::listener::ListenerStream;
|
|
||||||
use tvix_store::pathinfoservice;
|
use tvix_store::pathinfoservice;
|
||||||
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
|
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
|
||||||
use tvix_store::proto::GRPCPathInfoServiceWrapper;
|
use tvix_store::proto::GRPCPathInfoServiceWrapper;
|
||||||
|
@ -228,7 +230,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
info!("tvix-store listening on {}", listen_address);
|
info!("tvix-store listening on {}", listen_address);
|
||||||
|
|
||||||
let listener = ListenerStream::bind(&listen_address).await?;
|
let listener = Listener::bind(
|
||||||
|
&listen_address,
|
||||||
|
&SystemOptions::default(),
|
||||||
|
&UserOptions::default(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
router.serve_with_incoming(listener).await?;
|
router.serve_with_incoming(listener).await?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
#[cfg(feature = "fs")]
|
#[cfg(feature = "fs")]
|
||||||
pub mod fs;
|
pub mod fs;
|
||||||
|
|
||||||
pub mod listener;
|
|
||||||
pub mod nar;
|
pub mod nar;
|
||||||
pub mod pathinfoservice;
|
pub mod pathinfoservice;
|
||||||
pub mod proto;
|
pub mod proto;
|
||||||
|
|
|
@ -1,131 +0,0 @@
|
||||||
use std::{
|
|
||||||
io,
|
|
||||||
ops::{Deref, DerefMut},
|
|
||||||
pin::Pin,
|
|
||||||
task::{Context, Poll},
|
|
||||||
};
|
|
||||||
|
|
||||||
use futures::Stream;
|
|
||||||
use pin_project_lite::pin_project;
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
|
||||||
use tokio_listener::{Listener, ListenerAddress};
|
|
||||||
use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo};
|
|
||||||
|
|
||||||
/// A wrapper around a [Listener] which implements the [Stream] trait.
|
|
||||||
/// Mainly used to bridge [tokio_listener] with [tonic].
|
|
||||||
pub struct ListenerStream {
|
|
||||||
inner: Listener,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ListenerStream {
|
|
||||||
/// Convert a [Listener] into a [Stream].
|
|
||||||
pub fn new(inner: Listener) -> Self {
|
|
||||||
Self { inner }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Binds to the specified address and returns a [Stream] of connections.
|
|
||||||
pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> {
|
|
||||||
let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?;
|
|
||||||
|
|
||||||
Ok(Self::new(listener))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Stream for ListenerStream {
|
|
||||||
type Item = io::Result<Connection>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
match self.inner.poll_accept(cx) {
|
|
||||||
Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))),
|
|
||||||
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
/// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait
|
|
||||||
/// so it is compatible with [tonic].
|
|
||||||
pub struct Connection {
|
|
||||||
#[pin]
|
|
||||||
inner: tokio_listener::Connection,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connection {
|
|
||||||
fn new(inner: tokio_listener::Connection) -> Self {
|
|
||||||
Self { inner }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Deref for Connection {
|
|
||||||
type Target = tokio_listener::Connection;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.inner
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DerefMut for Connection {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.inner
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub enum ListenerConnectInfo {
|
|
||||||
TCP(TcpConnectInfo),
|
|
||||||
Unix(UdsConnectInfo),
|
|
||||||
Stdio,
|
|
||||||
Other,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connected for Connection {
|
|
||||||
type ConnectInfo = ListenerConnectInfo;
|
|
||||||
|
|
||||||
fn connect_info(&self) -> Self::ConnectInfo {
|
|
||||||
if let Some(tcp_stream) = self.try_borrow_tcp() {
|
|
||||||
ListenerConnectInfo::TCP(tcp_stream.connect_info())
|
|
||||||
} else if let Some(unix_stream) = self.try_borrow_unix() {
|
|
||||||
ListenerConnectInfo::Unix(unix_stream.connect_info())
|
|
||||||
} else if self.try_borrow_stdio().is_some() {
|
|
||||||
ListenerConnectInfo::Stdio
|
|
||||||
} else {
|
|
||||||
ListenerConnectInfo::Other
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for Connection {
|
|
||||||
fn poll_read(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
buf: &mut tokio::io::ReadBuf<'_>,
|
|
||||||
) -> Poll<io::Result<()>> {
|
|
||||||
self.project().inner.poll_read(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncWrite for Connection {
|
|
||||||
fn poll_write(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
buf: &[u8],
|
|
||||||
) -> Poll<std::result::Result<usize, io::Error>> {
|
|
||||||
self.project().inner.poll_write(cx, buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_flush(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> Poll<std::result::Result<(), io::Error>> {
|
|
||||||
self.project().inner.poll_flush(cx)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_shutdown(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> Poll<std::result::Result<(), io::Error>> {
|
|
||||||
self.project().inner.poll_shutdown(cx)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Add table
Reference in a new issue