fix(tvix/castore/blobservice): update bytes_read only on successful read

We previously updated this.pos also in case the underlying read returned
an error.

Also, use the ready! macro to remove the match block, and instrument
errors returned during start_seek.

Change-Id: Ic32e26579d964a76b45687134acc48d72d67c36f
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11421
Reviewed-by: Brian Olsen <me@griff.name>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2024-04-14 15:55:50 +03:00 committed by clbot
parent bccb4c92c3
commit c19bc95b8a
2 changed files with 16 additions and 22 deletions

View file

@ -1,12 +1,12 @@
use futures::TryStreamExt; use futures::{ready, TryStreamExt};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncSeekExt}; use tokio::io::{AsyncRead, AsyncSeekExt};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_util::io::{ReaderStream, StreamReader}; use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn; use tracing::{instrument, warn};
use crate::B3Digest; use crate::B3Digest;
use std::{cmp::Ordering, pin::Pin, task::Poll}; use std::{cmp::Ordering, pin::Pin};
use super::{BlobReader, BlobService}; use super::{BlobReader, BlobService};
@ -60,15 +60,12 @@ where
let filled_before = buf.filled().len(); let filled_before = buf.filled().len();
let this = self.project(); let this = self.project();
match this.r.poll_read(cx, buf) {
Poll::Ready(a) => { ready!(this.r.poll_read(cx, buf))?;
let bytes_read = buf.filled().len() - filled_before; let bytes_read = buf.filled().len() - filled_before;
*this.pos += bytes_read as u64; *this.pos += bytes_read as u64;
Poll::Ready(a) Ok(()).into()
}
Poll::Pending => Poll::Pending,
}
} }
} }
@ -76,6 +73,7 @@ impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
where where
BS: AsRef<dyn BlobService> + Clone + Send + 'static, BS: AsRef<dyn BlobService> + Clone + Send + 'static,
{ {
#[instrument(skip(self), err(Debug))]
fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
let total_len = self.chunked_blob.blob_length(); let total_len = self.chunked_blob.blob_length();
let current_pos = self.pos; let current_pos = self.pos;

View file

@ -1,4 +1,5 @@
use super::BlobReader; use super::BlobReader;
use futures::ready;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use std::io; use std::io;
use std::task::Poll; use std::task::Poll;
@ -83,18 +84,13 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> {
// The amount of data read can be determined by the increase // The amount of data read can be determined by the increase
// in the length of the slice returned by `ReadBuf::filled`. // in the length of the slice returned by `ReadBuf::filled`.
let filled_before = buf.filled().len(); let filled_before = buf.filled().len();
let this = self.project(); let this = self.project();
let pos: &mut u64 = this.pos; ready!(this.r.poll_read(cx, buf))?;
match this.r.poll_read(cx, buf) {
Poll::Ready(a) => {
let bytes_read = buf.filled().len() - filled_before; let bytes_read = buf.filled().len() - filled_before;
*pos += bytes_read as u64; *this.pos += bytes_read as u64;
Poll::Ready(a) Ok(()).into()
}
Poll::Pending => Poll::Pending,
}
} }
} }
@ -115,7 +111,7 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> {
} }
impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> { impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> {
#[instrument(skip(self))] #[instrument(skip(self), err(Debug))]
fn start_seek( fn start_seek(
self: std::pin::Pin<&mut Self>, self: std::pin::Pin<&mut Self>,
position: std::io::SeekFrom, position: std::io::SeekFrom,