feat(tvix/store/digests): use bytes::Bytes instead of Vec<u8>

This will save us some copies, because a clone will simply create an
additional pointer to the same data.

Change-Id: I017a5d6b4c85a861b5541ebad2858ad4fbf8e8fa
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8978
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-07-20 13:37:29 +03:00 committed by clbot
parent 72e82ffcb1
commit a6580748aa
14 changed files with 99 additions and 68 deletions

View file

@ -143,7 +143,7 @@ impl BlobService for GRPCBlobService {
Ok(stream) => {
// map the stream of proto::BlobChunk to bytes.
let data_stream = stream.map(|x| {
x.map(|x| VecDeque::from(x.data))
x.map(|x| VecDeque::from(x.data.to_vec()))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
});
@ -169,8 +169,7 @@ impl BlobService for GRPCBlobService {
// bytes arriving on the RX side are wrapped inside a
// [proto::BlobChunk], and a [ReceiverStream] is constructed.
let blobchunk_stream =
ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.to_vec() });
let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.into() });
// That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
let task: tokio::task::JoinHandle<Result<_, Status>> = self
@ -250,7 +249,7 @@ impl BlobWriter for GRPCBlobWriter {
match self.tokio_handle.block_on(task)? {
Ok(resp) => {
// return the digest from the response, and store it in self.digest for subsequent closes.
let digest = B3Digest::from_vec(resp.digest).map_err(|_| {
let digest: B3Digest = resp.digest.try_into().map_err(|_| {
crate::Error::StorageError(
"invalid root digest length in response".to_string(),
)

View file

@ -108,7 +108,7 @@ impl BlobWriter for MemoryBlobWriter {
let (buf, hasher) = self.writers.take().unwrap();
// We know self.hasher is doing blake3 hashing, so this won't fail.
let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
let digest: B3Digest = hasher.finalize().as_bytes().into();
// Only insert if the blob doesn't already exist.
let db = self.db.read()?;

View file

@ -136,8 +136,7 @@ impl BlobWriter for SledBlobWriter {
} else {
let (buf, hasher) = self.writers.take().unwrap();
// We know self.hasher is doing blake3 hashing, so this won't fail.
let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
let digest: B3Digest = hasher.finalize().as_bytes().into();
// Only insert if the blob doesn't already exist.
if !self.db.contains_key(digest.to_vec()).map_err(|e| {

View file

@ -1,10 +1,9 @@
use bytes::Bytes;
use data_encoding::BASE64;
use thiserror::Error;
// FUTUREWORK: make generic
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct B3Digest(Vec<u8>);
pub struct B3Digest(Bytes);
// TODO: allow converting these errors to crate::Error
#[derive(Error, Debug)]
@ -14,25 +13,49 @@ pub enum Error {
}
impl B3Digest {
// constructs a [B3Digest] from a [Vec<u8>].
// Returns an error if the digest has the wrong length.
pub fn from_vec(value: Vec<u8>) -> Result<Self, Error> {
if value.len() != 32 {
Err(Error::InvalidDigestLen(value.len()))
} else {
Ok(Self(value))
}
}
// returns a copy of the inner [Vec<u8>].
pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec()
}
}
impl From<B3Digest> for bytes::Bytes {
fn from(val: B3Digest) -> Self {
val.0
}
}
impl TryFrom<Vec<u8>> for B3Digest {
type Error = Error;
// constructs a [B3Digest] from a [Vec<u8>].
// Returns an error if the digest has the wrong length.
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
if value.len() != 32 {
Err(Error::InvalidDigestLen(value.len()))
} else {
Ok(Self(value.into()))
}
}
}
impl TryFrom<bytes::Bytes> for B3Digest {
    type Error = Error;

    /// Constructs a [B3Digest] from [bytes::Bytes] without copying.
    /// Fails with [Error::InvalidDigestLen] unless the buffer holds
    /// exactly 32 bytes.
    fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
        if value.len() == 32 {
            Ok(Self(value))
        } else {
            Err(Error::InvalidDigestLen(value.len()))
        }
    }
}
impl From<&[u8; 32]> for B3Digest {
    /// Copies a fixed-size 32-byte array into a new digest.
    /// Infallible: the array length is statically guaranteed correct.
    fn from(value: &[u8; 32]) -> Self {
        Self(value.to_vec().into())
    }
}
@ -44,6 +67,6 @@ impl Clone for B3Digest {
impl std::fmt::Display for B3Digest {
    /// Renders the digest as `b3:` followed by the base64 encoding of
    /// the 32 raw bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "b3:{}", BASE64.encode(&self.0))
    }
}

View file

@ -142,12 +142,13 @@ impl DirectoryService for GRPCDirectoryService {
.spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
match self.tokio_handle.block_on(task)? {
Ok(put_directory_resp) => Ok(B3Digest::from_vec(
put_directory_resp.into_inner().root_digest,
)
.map_err(|_| {
Error::StorageError("invalid root digest length in response".to_string())
})?),
Ok(put_directory_resp) => Ok(put_directory_resp
.into_inner()
.root_digest
.try_into()
.map_err(|_| {
Error::StorageError("invalid root digest length in response".to_string())
})?),
Err(e) => Err(crate::Error::StorageError(e.to_string())),
}
}
@ -265,7 +266,7 @@ impl Iterator for StreamIterator {
for child_directory in &directory.directories {
// We ran validate() above, so we know these digests must be correct.
let child_directory_digest =
B3Digest::from_vec(child_directory.digest.clone()).unwrap();
child_directory.digest.clone().try_into().unwrap();
self.expected_directory_digests
.insert(child_directory_digest);
@ -355,7 +356,7 @@ impl DirectoryPutter for GRPCPutter {
.map_err(|e| Error::StorageError(e.to_string()))?
.root_digest;
B3Digest::from_vec(root_digest).map_err(|_| {
root_digest.try_into().map_err(|_| {
Error::StorageError("invalid root digest length in response".to_string())
})
}

View file

@ -1,5 +1,5 @@
use super::DirectoryService;
use crate::{proto::NamedNode, B3Digest, Error};
use crate::{proto::NamedNode, Error};
use std::{os::unix::ffi::OsStrExt, sync::Arc};
use tracing::{instrument, warn};
@ -40,7 +40,9 @@ pub fn traverse_to(
Ok(None)
}
crate::proto::node::Node::Directory(directory_node) => {
let digest = B3Digest::from_vec(directory_node.digest)
let digest = directory_node
.digest
.try_into()
.map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
// fetch the linked node from the directory_service

View file

@ -35,7 +35,7 @@ impl<DS: DirectoryService> DirectoryTraverser<DS> {
fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
for child_directory_node in &directory.directories {
// TODO: propagate error
let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap();
let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
if self.worklist_directory_digests.contains(&child_digest)
|| self.sent_directory_digests.contains(&child_digest)

View file

@ -38,7 +38,7 @@ impl From<&proto::SymlinkNode> for InodeData {
impl From<&proto::FileNode> for InodeData {
fn from(value: &proto::FileNode) -> Self {
InodeData::Regular(
B3Digest::from_vec(value.digest.clone()).unwrap(),
value.digest.clone().try_into().unwrap(),
value.size,
value.executable,
)
@ -49,7 +49,7 @@ impl From<&proto::FileNode> for InodeData {
impl From<&proto::DirectoryNode> for InodeData {
fn from(value: &proto::DirectoryNode) -> Self {
InodeData::Directory(DirectoryInodeData::Sparse(
B3Digest::from_vec(value.digest.clone()).unwrap(),
value.digest.clone().try_into().unwrap(),
value.size,
))
}

View file

@ -3,7 +3,6 @@ use crate::{
blobservice::BlobService,
directoryservice::DirectoryService,
proto::{self, NamedNode},
B3Digest,
};
use count_write::CountWrite;
use nix_compat::nar;
@ -65,7 +64,7 @@ fn walk_node(
.map_err(RenderError::NARWriterError)?;
}
proto::node::Node::File(proto_file_node) => {
let digest = B3Digest::from_vec(proto_file_node.digest.clone()).map_err(|_e| {
let digest = proto_file_node.digest.clone().try_into().map_err(|_e| {
warn!(
file_node = ?proto_file_node,
"invalid digest length in file node",
@ -96,8 +95,11 @@ fn walk_node(
.map_err(RenderError::NARWriterError)?;
}
proto::node::Node::Directory(proto_directory_node) => {
let digest =
B3Digest::from_vec(proto_directory_node.digest.to_vec()).map_err(|_e| {
let digest = proto_directory_node
.digest
.clone()
.try_into()
.map_err(|_e| {
RenderError::StoreError(crate::Error::StorageError(
"invalid digest len in directory node".to_string(),
))

View file

@ -1,6 +1,4 @@
use crate::{
blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest,
};
use crate::{blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead};
use std::{
collections::VecDeque,
io,
@ -96,7 +94,9 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
request: Request<super::StatBlobRequest>,
) -> Result<Response<super::BlobMeta>, Status> {
let rq = request.into_inner();
let req_digest = B3Digest::from_vec(rq.digest)
let req_digest = rq
.digest
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
if rq.include_chunks || rq.include_bao {
@ -117,7 +117,9 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
) -> Result<Response<Self::ReadStream>, Status> {
let rq = request.into_inner();
let req_digest = B3Digest::from_vec(rq.digest)
let req_digest = rq
.digest
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
match self.blob_service.open_read(&req_digest) {

View file

@ -38,8 +38,10 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
// look at the digest in the request and put it in the top of the queue.
match &req_inner.by_what {
None => return Err(Status::invalid_argument("by_what needs to be specified")),
Some(proto::get_directory_request::ByWhat::Digest(digest)) => {
let digest = B3Digest::from_vec(digest.to_vec())
Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => {
let digest: B3Digest = digest
.clone()
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
task::spawn(async move {
@ -91,6 +93,8 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
// This keeps track of the seen directory keys, and their size.
// This is used to validate the size field of a reference to a previously sent directory.
// We don't need to keep the contents around, they're stored in the DB.
// https://github.com/rust-lang/rust-clippy/issues/5812
#[allow(clippy::mutable_key_type)]
let mut seen_directories_sizes: HashMap<B3Digest, u32> = HashMap::new();
let mut last_directory_dgst: Option<B3Digest> = None;
@ -110,7 +114,10 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
// to ensure it has been seen already in this stream, and that the size
// matches what we recorded.
for child_directory in &directory.directories {
let child_directory_digest = B3Digest::from_vec(child_directory.digest.to_vec())
let child_directory_digest: B3Digest = child_directory
.digest
.clone()
.try_into()
.map_err(|_e| Status::internal("invalid child directory digest len"))?;
match seen_directories_sizes.get(&child_directory_digest) {

View file

@ -247,12 +247,11 @@ impl Directory {
pub fn digest(&self) -> B3Digest {
let mut hasher = blake3::Hasher::new();
let vec = hasher
hasher
.update(&self.encode_to_vec())
.finalize()
.as_bytes()
.to_vec();
B3Digest::from_vec(vec).unwrap()
.into()
}
/// validate checks the directory for invalid data, such as:

View file

@ -1,7 +1,4 @@
use crate::{
proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError},
B3Digest,
};
use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError};
use lazy_static::lazy_static;
lazy_static! {
@ -69,11 +66,12 @@ fn digest() {
assert_eq!(
d.digest(),
B3Digest::from_vec(vec![
vec![
0xaf, 0x13, 0x49, 0xb9, 0xf5, 0xf9, 0xa1, 0xa6, 0xa0, 0x40, 0x4d, 0xea, 0x36, 0xdc,
0xc9, 0x49, 0x9b, 0xcb, 0x25, 0xc9, 0xad, 0xc1, 0x12, 0xb7, 0xcc, 0x9a, 0x93, 0xca,
0xe4, 0x1f, 0x32, 0x62
])
]
.try_into()
.unwrap()
)
}

View file

@ -216,8 +216,8 @@ impl EvalIO for TvixStoreIO {
))
}
crate::proto::node::Node::File(file_node) => {
let digest =
B3Digest::from_vec(file_node.digest.clone()).map_err(|_e| {
let digest: B3Digest =
file_node.digest.clone().try_into().map_err(|_e| {
error!(
file_node = ?file_node,
"invalid digest"
@ -272,16 +272,15 @@ impl EvalIO for TvixStoreIO {
match node {
crate::proto::node::Node::Directory(directory_node) => {
// fetch the Directory itself.
let digest =
B3Digest::from_vec(directory_node.digest.clone()).map_err(|_e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!(
"invalid digest length in directory node: {:?}",
directory_node
),
)
})?;
let digest = directory_node.digest.clone().try_into().map_err(|_e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!(
"invalid digest length in directory node: {:?}",
directory_node
),
)
})?;
if let Some(directory) = self.directory_service.get(&digest)? {
let mut children: Vec<(Vec<u8>, FileType)> = Vec::new();