refactor(tvix/castore/grpc/blobsvc): inline stream_mapper

This can be written without the additional function.

Change-Id: Ib11c5d5254d3e44c8fa9661414835b0622eb1ac4
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10735
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-02-02 17:45:33 +02:00 committed by clbot
parent 1157eea710
commit 5ad5a0da00

View file

@ -3,7 +3,6 @@ use core::pin::pin;
use futures::{stream::BoxStream, TryFutureExt}; use futures::{stream::BoxStream, TryFutureExt};
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
io,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
}; };
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
@ -118,17 +117,9 @@ where
.map_err(|_e| Status::invalid_argument("invalid digest length"))?; .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
match self.blob_service.open_read(&req_digest).await { match self.blob_service.open_read(&req_digest).await {
Ok(Some(reader)) => { Ok(Some(r)) => {
fn stream_mapper( let chunks_stream =
x: Result<bytes::Bytes, io::Error>, ReaderStream::new(r).map(|chunk| Ok(super::BlobChunk { data: chunk? }));
) -> Result<super::BlobChunk, Status> {
match x {
Ok(bytes) => Ok(super::BlobChunk { data: bytes }),
Err(e) => Err(Status::from(e)),
}
}
let chunks_stream = ReaderStream::new(reader).map(stream_mapper);
Ok(Response::new(Box::pin(chunks_stream))) Ok(Response::new(Box::pin(chunks_stream)))
} }
Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),