feat(tvix/castore/blob/naive_seeker): add some more tracing

Change-Id: Iecf4a82a7d84008a8620825570b34e9094e6d590
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11445
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-04-16 16:59:25 +03:00 committed by flokli
parent 9d9c731147
commit bfd342873c

View file

@ -4,7 +4,7 @@ use pin_project_lite::pin_project;
use std::io; use std::io;
use std::task::Poll; use std::task::Poll;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use tracing::{debug, instrument}; use tracing::{debug, instrument, trace, warn};
pin_project! { pin_project! {
/// This implements [tokio::io::AsyncSeek] for and [tokio::io::AsyncRead] by /// This implements [tokio::io::AsyncSeek] for and [tokio::io::AsyncRead] by
@ -79,6 +79,7 @@ impl<R: tokio::io::AsyncRead> NaiveSeeker<R> {
} }
impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> { impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> {
#[instrument(level = "trace", skip_all)]
fn poll_read( fn poll_read(
self: std::pin::Pin<&mut Self>, self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
@ -90,9 +91,12 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> {
let this = self.project(); let this = self.project();
ready!(this.r.poll_read(cx, buf))?; ready!(this.r.poll_read(cx, buf))?;
let bytes_read = buf.filled().len() - filled_before; let bytes_read = buf.filled().len() - filled_before;
*this.pos += bytes_read as u64; *this.pos += bytes_read as u64;
trace!(bytes_read = bytes_read, new_pos = this.pos, "poll_read");
Ok(()).into() Ok(()).into()
} }
} }
@ -105,16 +109,18 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> {
self.project().r.poll_fill_buf(cx) self.project().r.poll_fill_buf(cx)
} }
#[instrument(level = "trace", skip(self))]
fn consume(self: std::pin::Pin<&mut Self>, amt: usize) { fn consume(self: std::pin::Pin<&mut Self>, amt: usize) {
let this = self.project(); let this = self.project();
this.r.consume(amt); this.r.consume(amt);
let pos: &mut u64 = this.pos; *this.pos += amt as u64;
*pos += amt as u64;
trace!(new_pos = this.pos, "consume");
} }
} }
impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> { impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> {
#[instrument(skip(self), err(Debug))] #[instrument(level="trace", skip(self), fields(inner_pos=%self.pos), err(Debug))]
fn start_seek( fn start_seek(
self: std::pin::Pin<&mut Self>, self: std::pin::Pin<&mut Self>,
position: std::io::SeekFrom, position: std::io::SeekFrom,
@ -149,23 +155,24 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> {
} }
}; };
debug!(absolute_offset=?absolute_offset, "seek"); // we already know absolute_offset is >= self.pos
// we already know absolute_offset is larger than self.pos
debug_assert!( debug_assert!(
absolute_offset >= self.pos, absolute_offset >= self.pos,
"absolute_offset {} is larger than self.pos {}", "absolute_offset {} must be >= self.pos {}",
absolute_offset, absolute_offset,
self.pos self.pos
); );
// calculate bytes to skip // calculate bytes to skip
*self.project().bytes_to_skip = absolute_offset - self.pos; let this = self.project();
*this.bytes_to_skip = absolute_offset - *this.pos;
debug!(bytes_to_skip = *this.bytes_to_skip, "seek");
Ok(()) Ok(())
} }
#[instrument(skip(self))] #[instrument(skip_all)]
fn poll_complete( fn poll_complete(
mut self: std::pin::Pin<&mut Self>, mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,