fix(tvix/castore/blobservice/grpc): don't use NaiveSeeker for now

Userland likes to seek backwards, and until we have store composition
and can serve chunks from a local cache, we need to buffer the
individual chunks in memory.

Change-Id: I66978a0722d5f55ed4a9a49d116cecb64a01995d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11448
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2024-04-16 17:48:47 +03:00 committed by flokli
parent 8107678632
commit 538d5fc8ee

View file

@ -1,10 +1,15 @@
use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter, ChunkedReader};
use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
use crate::{
proto::{self, stat_blob_response::ChunkMeta},
B3Digest,
};
use futures::sink::SinkExt;
use std::{io, pin::pin, sync::Arc, task::Poll};
use std::{
io::{self, Cursor},
pin::pin,
sync::Arc,
task::Poll,
};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinHandle;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
@ -55,11 +60,10 @@ impl BlobService for GRPCBlobService {
#[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
// First try to get a list of chunks. In case there's only one chunk returned,
// or the backend does not support chunking, return a NaiveSeeker.
// Otherwise use a ChunkedReader.
// TODO: we should check if we want to replace NaiveSeeker with a simple
// Cursor on the raw `Vec<u8>`, as seeking backwards is something that
// clients generally do.
// buffer its data into a Vec, otherwise use a ChunkedReader.
// We previously used NaiveSeeker here, but userland likes to seek backwards too often,
// and without store composition this will get very noisy.
// FUTUREWORK: use CombinedBlobService and store composition.
match self.chunks(digest).await {
Ok(None) => Ok(None),
Ok(Some(chunks)) => {
@ -82,9 +86,13 @@ impl BlobService for GRPCBlobService {
});
// Use StreamReader::new to convert to an AsyncRead.
let data_reader = tokio_util::io::StreamReader::new(data_stream);
let mut data_reader = tokio_util::io::StreamReader::new(data_stream);
Ok(Some(Box::new(NaiveSeeker::new(data_reader))))
let mut buf = Vec::new();
// TODO: only do this up to a certain limit.
tokio::io::copy(&mut data_reader, &mut buf).await?;
Ok(Some(Box::new(Cursor::new(buf))))
}
Err(e) if e.code() == Code::NotFound => Ok(None),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),