feat(tvix/castore/blobsvc/grpc): rm VecDec, fix docstring

The docstrings were not updated once we made the BlobService trait async.
There's no more need to turn things into a sync reader.

Also, rearrange the stream manipulation a bit, and remove the need to
create a new VecDeque for each element in the stream. bytes::Bytes
implements the Buf trait.

Fixes b/289.

Change-Id: Id2bbedca5876b462e630c144b74cc289c3916c4d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10582
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2024-01-09 13:55:38 +02:00 committed by clbot
parent 17eaacb139
commit 8fbdf72825

View file

@ -1,12 +1,7 @@
use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
use crate::{proto, B3Digest}; use crate::{proto, B3Digest};
use futures::sink::SinkExt; use futures::sink::SinkExt;
use std::{ use std::{io, pin::pin, task::Poll};
collections::VecDeque,
io::{self},
pin::pin,
task::Poll,
};
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_stream::{wrappers::ReceiverStream, StreamExt};
@ -54,30 +49,23 @@ impl BlobService for GRPCBlobService {
} }
} }
// On success, this returns a Ok(Some(io::Read)), which can be used to read
// the contents of the Blob, identified by the digest.
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
// Get a stream of [proto::BlobChunk], or return an error if the blob // Get a stream of [proto::BlobChunk], or return an error if the blob
// doesn't exist. // doesn't exist.
let resp = self match self
.grpc_client .grpc_client
.clone() .clone()
.read(proto::ReadBlobRequest { .read(proto::ReadBlobRequest {
digest: digest.clone().into(), digest: digest.clone().into(),
}) })
.await; .await
{
// This runs the task to completion, which on success will return a stream.
// On reading from it, we receive individual [proto::BlobChunk], so we
// massage this to a stream of bytes,
// then create an [AsyncRead], which we'll turn into a [io::Read],
// that's returned from the function.
match resp {
Ok(stream) => { Ok(stream) => {
// map the stream of proto::BlobChunk to bytes. // on success, this is a stream of tonic::Result<proto::BlobChunk>,
let data_stream = stream.into_inner().map(|x| { // so access .data and map errors into std::io::Error.
x.map(|x| VecDeque::from(x.data.to_vec())) let data_stream = stream.into_inner().map(|e| {
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) e.map(|c| c.data)
.map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s))
}); });
// Use StreamReader::new to convert to an AsyncRead. // Use StreamReader::new to convert to an AsyncRead.