feat(tvix/store/blobservice): add GRPCBlobServiceWrapper
This takes a BlobService and ChunkService in the constructor, and provides a [proto::blob_service_server::BlobService] trait for it. Implementing proto::blob_service_server::BlobService is a lot of surface to cover, and providing this wrapper will make individual implementations taking care of how to store chunks or chunking information much simpler. Change-Id: Ia7b46484fb3ac9104354d496ff2922dca96ff7b9 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8092 Reviewed-by: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI
This commit is contained in:
parent
bdf0725def
commit
69516f1f68
3 changed files with 244 additions and 0 deletions
|
@ -1,5 +1,6 @@
|
|||
use std::sync::PoisonError;
|
||||
use thiserror::Error;
|
||||
use tonic::Status;
|
||||
|
||||
/// Errors related to communication with the store.
|
||||
#[derive(Debug, Error)]
|
||||
|
@ -16,3 +17,12 @@ impl<T> From<PoisonError<T>> for Error {
|
|||
Error::StorageError(value.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Error> for Status {
|
||||
fn from(value: Error) -> Self {
|
||||
match value {
|
||||
Error::InvalidRequest(msg) => Status::invalid_argument(msg),
|
||||
Error::StorageError(msg) => Status::data_loss(format!("storage error: {}", msg)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
231
tvix/store/src/proto/grpc_blobservice_wrapper.rs
Normal file
231
tvix/store/src/proto/grpc_blobservice_wrapper.rs
Normal file
|
@ -0,0 +1,231 @@
|
|||
use crate::{blobservice::BlobService, chunkservice::ChunkService, Error};
|
||||
use data_encoding::BASE64;
|
||||
use tokio::{sync::mpsc::channel, task};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::{async_trait, Request, Response, Status, Streaming};
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
|
||||
blob_service: BS,
|
||||
chunk_service: CS,
|
||||
}
|
||||
|
||||
impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
|
||||
pub fn new(blob_service: BS, chunk_service: CS) -> Self {
|
||||
Self {
|
||||
blob_service,
|
||||
chunk_service,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<
|
||||
BS: BlobService + Send + Sync + Clone + 'static,
|
||||
CS: ChunkService + Send + Sync + Clone + 'static,
|
||||
> super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS>
|
||||
{
|
||||
type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>;
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn stat(
|
||||
&self,
|
||||
request: Request<super::StatBlobRequest>,
|
||||
) -> Result<Response<super::BlobMeta>, Status> {
|
||||
let rq = request.into_inner();
|
||||
match self.blob_service.stat(&rq) {
|
||||
Ok(None) => Err(Status::not_found(format!(
|
||||
"blob {} not found",
|
||||
BASE64.encode(&rq.digest)
|
||||
))),
|
||||
Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn read(
|
||||
&self,
|
||||
request: Request<super::ReadBlobRequest>,
|
||||
) -> Result<Response<Self::ReadStream>, Status> {
|
||||
let req = request.into_inner();
|
||||
let (tx, rx) = channel(5);
|
||||
|
||||
// query the chunk service for more detailed blob info
|
||||
let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
|
||||
digest: req.digest.to_vec(),
|
||||
include_chunks: true,
|
||||
..Default::default()
|
||||
})?;
|
||||
|
||||
match stat_resp {
|
||||
None => {
|
||||
// If the stat didn't return any blobmeta, the client might
|
||||
// still have asked for a single chunk to be read.
|
||||
// Check the chunkstore.
|
||||
if let Some(data) = self.chunk_service.get(&req.digest)? {
|
||||
// We already know the hash matches, and contrary to
|
||||
// iterating over a blobmeta, we can't know the size,
|
||||
// so send the contents of that chunk over,
|
||||
// as the first (and only) element of the stream.
|
||||
task::spawn(async move {
|
||||
let res = Ok(super::BlobChunk { data });
|
||||
// send the result to the client. If the client already left, that's also fine.
|
||||
if (tx.send(res).await).is_err() {
|
||||
debug!("receiver dropped");
|
||||
}
|
||||
});
|
||||
} else {
|
||||
return Err(Status::not_found(format!(
|
||||
"blob {} not found",
|
||||
BASE64.encode(&req.digest),
|
||||
)));
|
||||
}
|
||||
}
|
||||
Some(blobmeta) => {
|
||||
let chunk_client = self.chunk_service.clone();
|
||||
|
||||
// TODO: use BlobReader?
|
||||
// But then we might not be able to send compressed chunks as-is.
|
||||
// Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it
|
||||
// first, so we can .next().await in here.
|
||||
|
||||
task::spawn(async move {
|
||||
for chunkmeta in blobmeta.chunks {
|
||||
// request chunk.
|
||||
// We don't need to validate the digest again, as
|
||||
// that's required for all implementations of ChunkService.
|
||||
let res = match chunk_client.get(&chunkmeta.digest) {
|
||||
Err(e) => Err(e.into()),
|
||||
// TODO: make this a separate error type
|
||||
Ok(None) => Err(Error::StorageError(format!(
|
||||
"consistency error: chunk {} for blob {} not found",
|
||||
BASE64.encode(&chunkmeta.digest),
|
||||
BASE64.encode(&req.digest),
|
||||
))
|
||||
.into()),
|
||||
Ok(Some(data)) => {
|
||||
// We already know the hash matches, but also
|
||||
// check the size matches what chunkmeta said.
|
||||
if data.len() as u32 != chunkmeta.size {
|
||||
Err(Error::StorageError(format!(
|
||||
"consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
|
||||
BASE64.encode(&chunkmeta.digest),
|
||||
BASE64.encode(&req.digest),
|
||||
chunkmeta.size,
|
||||
data.len(),
|
||||
)).into())
|
||||
} else {
|
||||
// send out the current chunk
|
||||
// TODO: we might want to break this up further if too big?
|
||||
Ok(super::BlobChunk { data })
|
||||
}
|
||||
}
|
||||
};
|
||||
// send the result to the client
|
||||
if (tx.send(res).await).is_err() {
|
||||
debug!("receiver dropped");
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let receiver_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(receiver_stream))
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn put(
|
||||
&self,
|
||||
request: Request<Streaming<super::BlobChunk>>,
|
||||
) -> Result<Response<super::PutBlobResponse>, Status> {
|
||||
let mut req_inner = request.into_inner();
|
||||
|
||||
// TODO: for now, we collect all Chunks into a large Vec<u8>, and then
|
||||
// pass it to a (content-defined) Chunker.
|
||||
// This is because the fastcdc crate currently operates on byte slices,
|
||||
// not on something implementing [std::io::Read].
|
||||
// (see https://github.com/nlfiedler/fastcdc-rs/issues/17)
|
||||
|
||||
let mut blob_contents: Vec<u8> = Vec::new();
|
||||
|
||||
while let Some(mut blob_chunk) = req_inner.message().await? {
|
||||
blob_contents.append(&mut blob_chunk.data);
|
||||
}
|
||||
|
||||
// initialize a new chunker
|
||||
// TODO: play with chunking sizes
|
||||
let chunker = fastcdc::FastCDC::new(
|
||||
&blob_contents,
|
||||
64 * 1024 / 4, // min
|
||||
64 * 1024, // avg
|
||||
64 * 1024 * 4, // max
|
||||
);
|
||||
|
||||
// initialize blake3 hashers. chunk_hasher is used and reset for each
|
||||
// chunk, blob_hasher calculates the hash of the whole blob.
|
||||
let mut chunk_hasher = blake3::Hasher::new();
|
||||
let mut blob_hasher = blake3::Hasher::new();
|
||||
|
||||
// start a BlobMeta, which we'll fill while looping over the chunks
|
||||
let mut blob_meta = super::BlobMeta::default();
|
||||
|
||||
// loop over all the chunks
|
||||
for chunk in chunker {
|
||||
// extract the data itself
|
||||
let chunk_data: Vec<u8> =
|
||||
blob_contents[chunk.offset..chunk.offset + chunk.length].to_vec();
|
||||
|
||||
// calculate the digest of that chunk
|
||||
chunk_hasher.update(&chunk_data);
|
||||
let chunk_digest = chunk_hasher.finalize();
|
||||
chunk_hasher.reset();
|
||||
|
||||
// also update blob_hasher
|
||||
blob_hasher.update(&chunk_data);
|
||||
|
||||
// check if chunk is already in db, and if not, insert.
|
||||
match self.chunk_service.has(chunk_digest.as_bytes()) {
|
||||
Err(e) => {
|
||||
return Err(Error::StorageError(format!(
|
||||
"unable to check if chunk {} exists: {}",
|
||||
BASE64.encode(chunk_digest.as_bytes()),
|
||||
e
|
||||
))
|
||||
.into());
|
||||
}
|
||||
Ok(has_chunk) => {
|
||||
if !has_chunk {
|
||||
if let Err(e) = self.chunk_service.put(chunk_data.to_vec()) {
|
||||
return Err(Error::StorageError(format!(
|
||||
"unable to store chunk {}: {}",
|
||||
BASE64.encode(chunk_digest.as_bytes()),
|
||||
e
|
||||
))
|
||||
.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add chunk to blobmeta
|
||||
blob_meta.chunks.push(super::blob_meta::ChunkMeta {
|
||||
digest: chunk_digest.as_bytes().to_vec(),
|
||||
size: chunk.length as u32,
|
||||
});
|
||||
}
|
||||
|
||||
// done reading data, finalize blob_hasher and insert blobmeta.
|
||||
let blob_digest = blob_hasher.finalize();
|
||||
|
||||
// TODO: don't store if we already have it (potentially with different chunking)
|
||||
match self.blob_service.put(blob_digest.as_bytes(), blob_meta) {
|
||||
Ok(()) => Ok(Response::new(super::PutBlobResponse {
|
||||
digest: blob_digest.as_bytes().to_vec(),
|
||||
})),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,6 +7,9 @@ use prost::Message;
|
|||
|
||||
use nix_compat::store_path::{ParseStorePathError, StorePath};
|
||||
|
||||
mod grpc_blobservice_wrapper;
|
||||
pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
|
||||
|
||||
tonic::include_proto!("tvix.store.v1");
|
||||
|
||||
#[cfg(feature = "reflection")]
|
||||
|
|
Loading…
Reference in a new issue