refactor(tvix/store): use tokio::task::JoinHandle

This makes the inside code a bit less verbose.

I wasn't able to describe the type of the async move closure itself,
which would allow us to remove the JoinHandle<_> type annotation
entirely.

Change-Id: I06193982a0c7010bd72d3ffa4f760bea1b097632
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9268
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-09-05 15:22:57 +03:00 committed by clbot
parent f499d2e031
commit 7923cc19f6
3 changed files with 35 additions and 37 deletions

View file

@ -94,16 +94,15 @@ impl BlobService for GRPCBlobService {
let mut grpc_client = self.grpc_client.clone(); let mut grpc_client = self.grpc_client.clone();
let digest = digest.clone(); let digest = digest.clone();
let task: tokio::task::JoinHandle<Result<_, Status>> = let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
self.tokio_handle.spawn(async move { Ok(grpc_client
Ok(grpc_client .stat(proto::StatBlobRequest {
.stat(proto::StatBlobRequest { digest: digest.into(),
digest: digest.into(), ..Default::default()
..Default::default() })
}) .await?
.await? .into_inner())
.into_inner()) });
});
match self.tokio_handle.block_on(task)? { match self.tokio_handle.block_on(task)? {
Ok(_blob_meta) => Ok(true), Ok(_blob_meta) => Ok(true),
@ -122,7 +121,7 @@ impl BlobService for GRPCBlobService {
// Construct the task that'll send out the request and return the stream // Construct the task that'll send out the request and return the stream
// the gRPC client should use to send [proto::BlobChunk], or an error if // the gRPC client should use to send [proto::BlobChunk], or an error if
// the blob doesn't exist. // the blob doesn't exist.
let task: tokio::task::JoinHandle<Result<Streaming<proto::BlobChunk>, Status>> = let task: JoinHandle<Result<Streaming<proto::BlobChunk>, Status>> =
self.tokio_handle.spawn(async move { self.tokio_handle.spawn(async move {
let stream = grpc_client let stream = grpc_client
.read(proto::ReadBlobRequest { .read(proto::ReadBlobRequest {
@ -172,7 +171,7 @@ impl BlobService for GRPCBlobService {
let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x });
// That receiver stream is used as a stream in the gRPC BlobService.put rpc call. // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
let task: tokio::task::JoinHandle<Result<_, Status>> = self let task: JoinHandle<Result<_, Status>> = self
.tokio_handle .tokio_handle
.spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); .spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) });

View file

@ -5,6 +5,7 @@ use crate::proto::{self, get_directory_request::ByWhat};
use crate::{B3Digest, Error}; use crate::{B3Digest, Error};
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{transport::Channel, Status}; use tonic::{transport::Channel, Status};
use tonic::{Code, Streaming}; use tonic::{Code, Streaming};
@ -162,7 +163,7 @@ impl DirectoryService for GRPCDirectoryService {
// clone so we can move it // clone so we can move it
let root_directory_digest_cpy = root_directory_digest.clone(); let root_directory_digest_cpy = root_directory_digest.clone();
let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> = let task: JoinHandle<Result<Streaming<proto::Directory>, Status>> =
self.tokio_handle.spawn(async move { self.tokio_handle.spawn(async move {
let s = grpc_client let s = grpc_client
.get(proto::GetDirectoryRequest { .get(proto::GetDirectoryRequest {
@ -193,7 +194,7 @@ impl DirectoryService for GRPCDirectoryService {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>> = let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> =
self.tokio_handle.spawn(async move { self.tokio_handle.spawn(async move {
let s = grpc_client let s = grpc_client
.put(UnboundedReceiverStream::new(rx)) .put(UnboundedReceiverStream::new(rx))
@ -303,7 +304,7 @@ pub struct GRPCPutter {
/// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed.
#[allow(clippy::type_complexity)] // lol #[allow(clippy::type_complexity)] // lol
rq: Option<( rq: Option<(
tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>, JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
UnboundedSender<proto::Directory>, UnboundedSender<proto::Directory>,
)>, )>,
} }
@ -312,7 +313,7 @@ impl GRPCPutter {
pub fn new( pub fn new(
tokio_handle: tokio::runtime::Handle, tokio_handle: tokio::runtime::Handle,
directory_sender: UnboundedSender<proto::Directory>, directory_sender: UnboundedSender<proto::Directory>,
task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>, task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
) -> Self { ) -> Self {
Self { Self {
tokio_handle, tokio_handle,

View file

@ -5,7 +5,7 @@ use crate::{
proto::{self, ListPathInfoRequest}, proto::{self, ListPathInfoRequest},
}; };
use std::sync::Arc; use std::sync::Arc;
use tokio::net::UnixStream; use tokio::{net::UnixStream, task::JoinHandle};
use tonic::{transport::Channel, Code, Status, Streaming}; use tonic::{transport::Channel, Code, Status, Streaming};
/// Connects to a (remote) tvix-store PathInfoService over gRPC. /// Connects to a (remote) tvix-store PathInfoService over gRPC.
@ -96,7 +96,7 @@ impl PathInfoService for GRPCPathInfoService {
// Get a new handle to the gRPC client. // Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone(); let mut grpc_client = self.grpc_client.clone();
let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> = let task: JoinHandle<Result<proto::PathInfo, Status>> =
self.tokio_handle.spawn(async move { self.tokio_handle.spawn(async move {
let path_info = grpc_client let path_info = grpc_client
.get(proto::GetPathInfoRequest { .get(proto::GetPathInfoRequest {
@ -121,7 +121,7 @@ impl PathInfoService for GRPCPathInfoService {
// Get a new handle to the gRPC client. // Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone(); let mut grpc_client = self.grpc_client.clone();
let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> = let task: JoinHandle<Result<proto::PathInfo, Status>> =
self.tokio_handle.spawn(async move { self.tokio_handle.spawn(async move {
let path_info = grpc_client.put(path_info).await?.into_inner(); let path_info = grpc_client.put(path_info).await?.into_inner();
Ok(path_info) Ok(path_info)
@ -140,16 +140,15 @@ impl PathInfoService for GRPCPathInfoService {
let mut grpc_client = self.grpc_client.clone(); let mut grpc_client = self.grpc_client.clone();
let root_node = root_node.clone(); let root_node = root_node.clone();
let task: tokio::task::JoinHandle<Result<_, Status>> = let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
self.tokio_handle.spawn(async move { let path_info = grpc_client
let path_info = grpc_client .calculate_nar(proto::Node {
.calculate_nar(proto::Node { node: Some(root_node),
node: Some(root_node), })
}) .await?
.await? .into_inner();
.into_inner(); Ok(path_info)
Ok(path_info) });
});
let resp = self let resp = self
.tokio_handle .tokio_handle
@ -169,15 +168,14 @@ impl PathInfoService for GRPCPathInfoService {
// Get a new handle to the gRPC client. // Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone(); let mut grpc_client = self.grpc_client.clone();
let task: tokio::task::JoinHandle<Result<_, Status>> = let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
self.tokio_handle.spawn(async move { let s = grpc_client
let s = grpc_client .list(ListPathInfoRequest::default())
.list(ListPathInfoRequest::default()) .await?
.await? .into_inner();
.into_inner();
Ok(s) Ok(s)
}); });
let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); let stream = self.tokio_handle.block_on(task).unwrap().unwrap();