From 5de04dbc12ca4de4942e9319fb7223bde54d6076 Mon Sep 17 00:00:00 2001 From: Ryan Lahfa Date: Mon, 12 Jun 2023 13:41:24 +0200 Subject: [PATCH] feat(tvix/store): increase blob chunk size From 64 bytes to 100 KBytes. We need to provide a custom wrapper with a different Default instance. Change-Id: Id7c6c437b8183b355a9e388f98cef1622b363f64 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8748 Tested-by: BuildkiteCI Reviewed-by: flokli --- .../src/proto/grpc_blobservice_wrapper.rs | 71 ++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index f1ab3a87e..04097997c 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,7 +1,13 @@ use crate::{ blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest, }; -use std::{collections::VecDeque, io, pin::Pin, sync::Arc}; +use std::{ + collections::VecDeque, + io, + ops::{Deref, DerefMut}, + pin::Pin, + sync::Arc, +}; use tokio::task; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; @@ -20,6 +26,64 @@ impl From> for GRPCBlobServiceWrapper { } } +// This is necessary because bytes::BytesMut comes up with +// a default 64 bytes capacity that cannot be changed +// easily if you assume a bytes::BufMut trait implementation +// Therefore, we override the Default implementation here +// TODO(raitobezarius?): upstream me properly +struct BytesMutWithDefaultCapacity { + inner: bytes::BytesMut, +} + +impl Deref for BytesMutWithDefaultCapacity { + type Target = bytes::BytesMut; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for BytesMutWithDefaultCapacity { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Default for BytesMutWithDefaultCapacity { + fn default() -> Self { + BytesMutWithDefaultCapacity { + inner: bytes::BytesMut::with_capacity(N), + } + } +} + +impl bytes::Buf for BytesMutWithDefaultCapacity { + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt); + } +} + +unsafe impl bytes::BufMut for BytesMutWithDefaultCapacity { + fn remaining_mut(&self) -> usize { + self.inner.remaining_mut() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.inner.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.inner.chunk_mut() + } +} + #[async_trait] impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 @@ -58,7 +122,10 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { match self.blob_service.open_read(&req_digest) { Ok(Some(reader)) => { - let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into(); + let async_reader: SyncReadIntoAsyncRead< + _, + BytesMutWithDefaultCapacity<{ 100 * 1024 }>, + > = reader.into(); fn stream_mapper( x: Result,