refactor(tvix/store): remove ChunkService

Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.

Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.

This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.

In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.

This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.

As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.

In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.

Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
Florian Klink 2023-05-11 15:49:01 +03:00 committed by flokli
parent b22b685f0b
commit 616fa4476f
26 changed files with 560 additions and 1297 deletions

9
tvix/Cargo.lock generated
View file

@ -692,12 +692,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "fastcdc"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10010f9b2e601acfda445cb35385cf4241fce85c6e1ea702b157c39f79f8787a"
[[package]]
name = "fastrand"
version = "1.9.0"
@ -2704,10 +2698,11 @@ version = "0.1.0"
dependencies = [
"anyhow",
"blake3",
"bytes",
"clap 4.2.7",
"count-write",
"data-encoding",
"fastcdc",
"futures",
"lazy_static",
"nix-compat",
"prost",

View file

@ -1933,16 +1933,6 @@ rec {
"The Rust-Crypto Project Developers"
];
};
"fastcdc" = rec {
crateName = "fastcdc";
version = "3.0.3";
edition = "2018";
sha256 = "0ykqz1wrzhspn41af7kfbklgqha2ry2m7csw8kdcy6k05sdhy08h";
authors = [
"Nathan Fiedler <nathanfiedler@fastmail.fm>"
];
};
"fastrand" = rec {
crateName = "fastrand";
@ -8005,6 +7995,10 @@ rec {
packageId = "blake3";
features = [ "rayon" "std" ];
}
{
name = "bytes";
packageId = "bytes";
}
{
name = "clap";
packageId = "clap 4.2.7";
@ -8019,8 +8013,8 @@ rec {
packageId = "data-encoding";
}
{
name = "fastcdc";
packageId = "fastcdc";
name = "futures";
packageId = "futures";
}
{
name = "lazy_static";

View file

@ -9,7 +9,6 @@ blake3 = { version = "1.3.1", features = ["rayon", "std"] }
clap = { version = "4.0", features = ["derive", "env"] }
count-write = "0.1.0"
data-encoding = "2.3.3"
fastcdc = "3.0.2"
lazy_static = "1.4.0"
nix-compat = { path = "../nix-compat" }
prost = "0.11.2"
@ -25,6 +24,8 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] }
walkdir = "2.3.2"
tokio-util = { version = "0.7.8", features = ["io", "io-util"] }
tower = "0.4.13"
futures = "0.3.28"
bytes = "1.4.0"
[dependencies.tonic-reflection]
optional = true

View file

@ -21,15 +21,6 @@ service BlobService {
// The server may decide on whatever chunking it may seem fit as a size for
// the individual BlobChunk sent in the response stream.
//
// It specifically is NOT necessarily using chunk sizes communicated in a
// previous Stat request.
//
// It's up to the specific store to decide on whether it allows Read on a
// Blob at all, or only on smaller chunks communicated in a Stat() call
// first.
//
// Clients are enouraged to Stat() first, and then only read the individual
// chunks they don't have yet.
rpc Read(ReadBlobRequest) returns (stream BlobChunk);
// Put uploads a Blob, by reading a stream of bytes.

View file

@ -43,16 +43,6 @@ type BlobServiceClient interface {
//
// The server may decide on whatever chunking it may seem fit as a size for
// the individual BlobChunk sent in the response stream.
//
// It specifically is NOT necessarily using chunk sizes communicated in a
// previous Stat request.
//
// It's up to the specific store to decide on whether it allows Read on a
// Blob at all, or only on smaller chunks communicated in a Stat() call
// first.
//
// Clients are enouraged to Stat() first, and then only read the individual
// chunks they don't have yet.
Read(ctx context.Context, in *ReadBlobRequest, opts ...grpc.CallOption) (BlobService_ReadClient, error)
// Put uploads a Blob, by reading a stream of bytes.
//
@ -160,16 +150,6 @@ type BlobServiceServer interface {
//
// The server may decide on whatever chunking it may seem fit as a size for
// the individual BlobChunk sent in the response stream.
//
// It specifically is NOT necessarily using chunk sizes communicated in a
// previous Stat request.
//
// It's up to the specific store to decide on whether it allows Read on a
// Blob at all, or only on smaller chunks communicated in a Stat() call
// first.
//
// Clients are enouraged to Stat() first, and then only read the individual
// chunks they don't have yet.
Read(*ReadBlobRequest, BlobService_ReadServer) error
// Put uploads a Blob, by reading a stream of bytes.
//

View file

@ -8,7 +8,6 @@ use nix_compat::nixhash::NixHashWithMode;
use std::path::PathBuf;
use tracing_subscriber::prelude::*;
use tvix_store::blobservice::SledBlobService;
use tvix_store::chunkservice::SledChunkService;
use tvix_store::directoryservice::SledDirectoryService;
use tvix_store::import::import_path;
use tvix_store::nar::NARCalculationService;
@ -87,7 +86,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// initialize stores
let mut blob_service = SledBlobService::new("blobs.sled".into())?;
let mut chunk_service = SledChunkService::new("chunks.sled".into())?;
let mut directory_service = SledDirectoryService::new("directories.sled".into())?;
let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?;
@ -102,15 +100,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let nar_calculation_service = NonCachingNARCalculationService::new(
blob_service.clone(),
chunk_service.clone(),
directory_service.clone(),
);
#[allow(unused_mut)]
let mut router = server
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
blob_service,
chunk_service,
)))
.add_service(DirectoryServiceServer::new(
GRPCDirectoryServiceWrapper::from(directory_service),
@ -135,17 +131,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Commands::Import { paths } => {
let nar_calculation_service = NonCachingNARCalculationService::new(
blob_service.clone(),
chunk_service.clone(),
directory_service.clone(),
);
for path in paths {
let root_node = import_path(
&mut blob_service,
&mut chunk_service,
&mut directory_service,
&path,
)?;
let root_node = import_path(&mut blob_service, &mut directory_service, &path)?;
let nar_hash = NixHashWithMode::Recursive(NixHash::new(
HashAlgo::Sha256,

View file

@ -1,376 +0,0 @@
use std::io::{self, Cursor, Read, Write};
use data_encoding::BASE64;
use crate::{chunkservice::ChunkService, proto};
/// BlobReader implements reading of a blob, by querying individual chunks.
///
/// It doesn't talk to BlobService, but assumes something has already fetched
/// blob_meta already.
pub struct BlobReader<'a, CS: ChunkService> {
// used to look up chunks
chunk_service: &'a CS,
// internal iterator over chunk hashes and their sizes
chunks_iter: std::vec::IntoIter<proto::blob_meta::ChunkMeta>,
// If a chunk was partially read (if buf.len() < chunk.size),
// a cursor to its contents are stored here.
current_chunk: Option<Cursor<Vec<u8>>>,
}
impl<'a, CS: ChunkService> BlobReader<'a, CS> {
pub fn open(chunk_service: &'a CS, blob_meta: proto::BlobMeta) -> Self {
Self {
chunk_service,
chunks_iter: blob_meta.chunks.into_iter(),
current_chunk: None,
}
}
/// reads (up to n bytes) from the current chunk into buf (if there is
/// a chunk).
///
/// If it arrives at the end of the chunk, sets it back to None.
/// Returns a [std::io::Result<usize>] of the bytes read from the chunk.
fn read_from_current_chunk<W: std::io::Write>(
&mut self,
m: usize,
buf: &mut W,
) -> std::io::Result<usize> {
// If there's still something in partial_chunk, read from there
// (up to m: usize bytes) and return the number of bytes read.
if let Some(current_chunk) = &mut self.current_chunk {
let result = io::copy(&mut current_chunk.take(m as u64), buf);
match result {
Ok(n) => {
// if we were not able to read all off m bytes,
// this means we arrived at the end of the chunk.
if n < m as u64 {
self.current_chunk = None
}
// n can never be > m, so downcasting this to usize is ok.
Ok(n as usize)
}
Err(e) => Err(e),
}
} else {
Ok(0)
}
}
}
impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let read_max = buf.len();
let mut bytes_read = 0_usize;
let mut buf_w = std::io::BufWriter::new(buf);
// read up to buf.len() bytes into buf, by reading from the current
// chunk and subsequent ones.
loop {
// try to fill buf with bytes from the current chunk
// (if there's still one)
let n = self.read_from_current_chunk(read_max - bytes_read, &mut buf_w)?;
bytes_read += n;
// We want to make sure we don't accidentially read past more than
// we're allowed to.
assert!(bytes_read <= read_max);
// buf is entirerly filled, we're done.
if bytes_read == read_max {
buf_w.flush()?;
break Ok(bytes_read);
}
// Otherwise, bytes_read is < read_max, so we could still write
// more to buf.
// Check if we have more chunks to read from.
match self.chunks_iter.next() {
// No more chunks, we're done.
None => {
buf_w.flush()?;
return Ok(bytes_read);
}
// There's another chunk to visit, fetch its contents
Some(chunk_meta) => {
let chunk_meta_digest: [u8; 32] =
chunk_meta.digest.clone().try_into().map_err(|_e| {
std::io::Error::new(
io::ErrorKind::InvalidData,
format!(
"chunk in chunkmeta has wrong digest size, expected 32, got {}",
chunk_meta.digest.len(),
),
)
})?;
match self.chunk_service.get(&chunk_meta_digest) {
// Fetch successful, put it into `self.current_chunk` and restart the loop.
Ok(Some(chunk_data)) => {
// make sure the size matches what chunk_meta says as well.
if chunk_data.len() as u32 != chunk_meta.size {
break Err(std::io::Error::new(
io::ErrorKind::InvalidData,
format!(
"chunk_service returned chunk with wrong size for {}, expected {}, got {}",
BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len()
)
));
}
self.current_chunk = Some(Cursor::new(chunk_data));
}
// Chunk requested does not exist
Ok(None) => {
break Err(std::io::Error::new(
io::ErrorKind::NotFound,
format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)),
))
}
// Error occured while fetching the next chunk, propagate the error from the chunk service
Err(e) => {
break Err(std::io::Error::new(io::ErrorKind::InvalidData, e));
}
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::BlobReader;
use crate::chunkservice::ChunkService;
use crate::proto;
use crate::tests::fixtures::DUMMY_DATA_1;
use crate::tests::fixtures::DUMMY_DATA_2;
use crate::tests::fixtures::DUMMY_DIGEST;
use crate::tests::utils::gen_chunk_service;
use std::io::Cursor;
use std::io::Read;
use std::io::Write;
#[test]
/// reading from a blobmeta with zero chunks should produce zero bytes.
fn empty_blobmeta() -> anyhow::Result<()> {
let chunk_service = gen_chunk_service();
let blobmeta = proto::BlobMeta {
chunks: vec![],
inline_bao: vec![],
};
let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
let mut buf = Cursor::new(Vec::new());
let res = std::io::copy(&mut blob_reader, &mut buf);
assert_eq!(0, res.unwrap());
Ok(())
}
#[test]
/// trying to read something where the chunk doesn't exist should fail
fn missing_chunk_fail() -> anyhow::Result<()> {
let chunk_service = gen_chunk_service();
let blobmeta = proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: DUMMY_DIGEST.to_vec(),
size: 42,
}],
inline_bao: vec![],
};
let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
let mut buf = Cursor::new(Vec::new());
let res = std::io::copy(&mut blob_reader, &mut buf);
assert!(res.is_err());
Ok(())
}
#[test]
/// read something containing the single (empty) chunk
fn empty_chunk() -> anyhow::Result<()> {
let chunk_service = gen_chunk_service();
// insert a single chunk
let dgst = chunk_service.put(vec![]).expect("must succeed");
// assemble a blobmeta
let blobmeta = proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: dgst.to_vec(),
size: 0,
}],
inline_bao: vec![],
};
let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
let mut buf: Vec<u8> = Vec::new();
let res =
std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
assert_eq!(res, 0, "number of bytes read must match");
assert!(buf.is_empty(), "buf must be empty");
Ok(())
}
/// read something which contains a single chunk
#[test]
fn single_chunk() -> anyhow::Result<()> {
let chunk_service = gen_chunk_service();
// insert a single chunk
let dgst = chunk_service
.put(DUMMY_DATA_1.clone())
.expect("must succeed");
// assemble a blobmeta
let blobmeta = proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: dgst.to_vec(),
size: 3,
}],
inline_bao: vec![],
};
let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
let mut buf: Vec<u8> = Vec::new();
let res =
std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
assert_eq!(res, 3, "number of bytes read must match");
assert_eq!(DUMMY_DATA_1[..], buf[..], "data read must match");
Ok(())
}
/// read something referring to a chunk, but with wrong size
#[test]
fn wrong_size_fail() -> anyhow::Result<()> {
let chunk_service = gen_chunk_service();
// insert chunks
let dgst_1 = chunk_service
.put(DUMMY_DATA_1.clone())
.expect("must succeed");
// assemble a blobmeta
let blobmeta = proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: dgst_1.to_vec(),
size: 42,
}],
inline_bao: vec![],
};
let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
let mut buf: Vec<u8> = Vec::new();
let res = std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf));
assert!(res.is_err(), "reading must fail");
Ok(())
}
/// read something referring to multiple chunks
#[test]
fn multiple_chunks() -> anyhow::Result<()> {
let chunk_service = gen_chunk_service();
// insert chunks
let dgst_1 = chunk_service
.put(DUMMY_DATA_1.clone())
.expect("must succeed");
let dgst_2 = chunk_service
.put(DUMMY_DATA_2.clone())
.expect("must succeed");
// assemble a blobmeta
let blobmeta = proto::BlobMeta {
chunks: vec![
proto::blob_meta::ChunkMeta {
digest: dgst_1.to_vec(),
size: 3,
},
proto::blob_meta::ChunkMeta {
digest: dgst_2.to_vec(),
size: 2,
},
proto::blob_meta::ChunkMeta {
digest: dgst_1.to_vec(),
size: 3,
},
],
inline_bao: vec![],
};
// assemble ecpected data
let mut expected_data: Vec<u8> = Vec::new();
expected_data.extend_from_slice(&DUMMY_DATA_1[..]);
expected_data.extend_from_slice(&DUMMY_DATA_2[..]);
expected_data.extend_from_slice(&DUMMY_DATA_1[..]);
// read via io::copy
{
let mut blob_reader = BlobReader::open(&chunk_service, blobmeta.clone());
let mut buf: Vec<u8> = Vec::new();
let res =
std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
assert_eq!(8, res, "number of bytes read must match");
assert_eq!(expected_data[..], buf[..], "data read must match");
}
// now read the same thing again, but not via io::copy, but individually
{
let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
let mut buf: Vec<u8> = Vec::new();
let mut cursor = Cursor::new(&mut buf);
let mut bytes_read = 0;
loop {
let mut smallbuf = [0xff; 1];
match blob_reader.read(&mut smallbuf) {
Ok(n) => {
if n == 0 {
break;
}
let w_b = cursor.write(&smallbuf).unwrap();
assert_eq!(n, w_b);
bytes_read += w_b;
}
Err(_) => {
panic!("error occured during read");
}
}
}
assert_eq!(8, bytes_read, "number of bytes read must match");
assert_eq!(expected_data[..], buf[..], "data read must match");
}
Ok(())
}
}

View file

@ -1,51 +1,80 @@
use data_encoding::BASE64;
use std::io::Cursor;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use tracing::instrument;
use tracing::{instrument, warn};
use crate::{proto, Error};
use super::{BlobService, BlobWriter};
use crate::Error;
use super::BlobService;
// type B3Digest = [u8; 32];
// struct B3Digest ([u8; 32]);
#[derive(Clone, Default)]
pub struct MemoryBlobService {
db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>,
db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
}
impl BlobService for MemoryBlobService {
#[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
if req.include_bao {
todo!("not implemented yet")
}
type BlobReader = Cursor<Vec<u8>>;
type BlobWriter = MemoryBlobWriter;
#[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
let db = self.db.read().unwrap();
// if include_chunks is also false, the user only wants to know if the
// blob is present at all.
if !req.include_chunks {
Ok(if db.contains_key(&req.digest) {
Some(proto::BlobMeta::default())
} else {
None
})
} else {
match db.get(&req.digest) {
None => Ok(None),
Some(blob_meta) => Ok(Some(blob_meta.clone())),
Ok(db.contains_key(digest))
}
fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
let db = self.db.read().unwrap();
Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
}
#[instrument(skip(self))]
fn open_write(&self) -> Result<Self::BlobWriter, Error> {
Ok(MemoryBlobWriter::new(self.db.clone()))
}
}
#[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
let mut db = self.db.write().unwrap();
pub struct MemoryBlobWriter {
db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
db.insert(blob_digest.to_vec(), blob_meta);
buf: Vec<u8>,
}
Ok(())
// TODO: make sure all callers make sure the chunks exist.
// TODO: where should we calculate the bao?
impl MemoryBlobWriter {
fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self {
Self {
buf: Vec::new(),
db,
}
}
}
impl std::io::Write for MemoryBlobWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.buf.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
self.buf.flush()
}
}
impl BlobWriter for MemoryBlobWriter {
fn close(self) -> Result<[u8; 32], Error> {
// in this memory implementation, we don't actually bother hashing
// incrementally while writing, but do it at the end.
let mut hasher = blake3::Hasher::new();
hasher.update(&self.buf);
let digest: [u8; 32] = hasher.finalize().into();
// open the database for writing.
let mut db = self.db.write()?;
db.insert(digest, self.buf);
Ok(digest)
}
}

View file

@ -1,4 +1,6 @@
use crate::{proto, Error};
use std::io;
use crate::Error;
mod memory;
mod sled;
@ -7,14 +9,31 @@ pub use self::memory::MemoryBlobService;
pub use self::sled::SledBlobService;
/// The base trait all BlobService services need to implement.
/// It provides information about how a blob is chunked,
/// and allows creating new blobs by creating a BlobMeta (referring to chunks
/// in a [crate::chunkservice::ChunkService]).
/// It provides functions to check whether a given blob exists,
/// a way to get a [io::Read] to a blob, and a method to initiate writing a new
/// Blob, which returns a [BlobWriter], that can be used
pub trait BlobService {
/// Retrieve chunking information for a given blob
fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error>;
type BlobReader: io::Read + Send + std::marker::Unpin;
type BlobWriter: BlobWriter + Send;
/// Insert chunking information for a given blob.
/// Implementations SHOULD make sure chunks referred do exist.
fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error>;
/// Check if the service has the blob, by its content hash.
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
/// Request a blob from the store, by its content hash. Returns a Option<BlobReader>.
fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>;
/// Insert a new blob into the store. Returns a [BlobWriter], which
/// implements [io::Write] and a [BlobWriter::close].
/// TODO: is there any reason we want this to be a Result<>, and not just T?
fn open_write(&self) -> Result<Self::BlobWriter, Error>;
}
/// A [io::Write] that you need to close() afterwards, and get back the digest
/// of the written blob.
pub trait BlobWriter: io::Write {
/// Signal there's no more data to be written, and return the digest of the
/// contents written.
///
/// This consumes self, so it's not possible to close twice.
fn close(self) -> Result<[u8; 32], Error>;
}

View file

@ -1,13 +1,13 @@
use std::path::PathBuf;
use std::{
io::{self, Cursor},
path::PathBuf,
};
use super::{BlobService, BlobWriter};
use crate::Error;
use data_encoding::BASE64;
use prost::Message;
use tracing::instrument;
use crate::{proto, Error};
use super::BlobService;
#[derive(Clone)]
pub struct SledBlobService {
db: sled::Db,
@ -30,44 +30,69 @@ impl SledBlobService {
}
impl BlobService for SledBlobService {
#[instrument(name = "SledBlobService::stat", skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
if req.include_bao {
todo!("not implemented yet")
}
type BlobReader = Cursor<Vec<u8>>;
type BlobWriter = SledBlobWriter;
// if include_chunks is also false, the user only wants to know if the
// blob is present at all.
if !req.include_chunks {
match self.db.contains_key(&req.digest) {
Ok(false) => Ok(None),
Ok(true) => Ok(Some(proto::BlobMeta::default())),
#[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))]
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
match self.db.contains_key(digest) {
Ok(has) => Ok(has),
Err(e) => Err(Error::StorageError(e.to_string())),
}
} else {
match self.db.get(&req.digest) {
}
#[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))]
fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
match self.db.get(digest) {
Ok(None) => Ok(None),
Ok(Some(data)) => match proto::BlobMeta::decode(&*data) {
Ok(blob_meta) => Ok(Some(blob_meta)),
Err(e) => Err(Error::StorageError(format!(
"unable to parse blobmeta message for blob {}: {}",
BASE64.encode(&req.digest),
e
))),
},
Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))),
Err(e) => Err(Error::StorageError(e.to_string())),
}
}
#[instrument(name = "SledBlobService::open_write", skip(self))]
fn open_write(&self) -> Result<Self::BlobWriter, Error> {
Ok(SledBlobWriter::new(self.db.clone()))
}
}
pub struct SledBlobWriter {
db: sled::Db,
buf: Vec<u8>,
hasher: blake3::Hasher,
}
impl SledBlobWriter {
pub fn new(db: sled::Db) -> Self {
Self {
buf: Vec::default(),
db,
hasher: blake3::Hasher::new(),
}
}
}
#[instrument(name = "SledBlobService::put", skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
let result = self.db.insert(blob_digest, blob_meta.encode_to_vec());
if let Err(e) = result {
return Err(Error::StorageError(e.to_string()));
impl io::Write for SledBlobWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let bytes_written = self.buf.write(buf)?;
self.hasher.write(&buf[..bytes_written])
}
Ok(())
// TODO: make sure all callers make sure the chunks exist.
// TODO: where should we calculate the bao?
fn flush(&mut self) -> io::Result<()> {
self.buf.flush()
}
}
impl BlobWriter for SledBlobWriter {
fn close(self) -> Result<[u8; 32], Error> {
let digest = self.hasher.finalize();
self.db.insert(digest.as_bytes(), self.buf).map_err(|e| {
Error::StorageError(format!("unable to insert blob: {}", e.to_string()))
})?;
Ok(digest
.to_owned()
.try_into()
.map_err(|_| Error::StorageError("invalid digest length in response".to_string()))?)
}
}

View file

@ -1,53 +0,0 @@
use data_encoding::BASE64;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use tracing::instrument;
use crate::Error;
use super::ChunkService;
#[derive(Clone, Default)]
pub struct MemoryChunkService {
db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
}
impl ChunkService for MemoryChunkService {
#[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
let db = self.db.read().unwrap();
Ok(db.get(digest).is_some())
}
#[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
let db = self.db.read().unwrap();
match db.get(digest) {
None => Ok(None),
Some(data) => {
// calculate the hash to verify this is really what we expect
let actual_digest = blake3::hash(data).as_bytes().to_vec();
if actual_digest != digest {
return Err(Error::StorageError(format!(
"invalid hash encountered when reading chunk, expected {}, got {}",
BASE64.encode(digest),
BASE64.encode(&actual_digest),
)));
}
Ok(Some(data.clone()))
}
}
}
#[instrument(skip(self, data))]
fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
let digest = blake3::hash(&data);
let mut db = self.db.write().unwrap();
db.insert(*digest.as_bytes(), data);
Ok(*digest.as_bytes())
}
}

View file

@ -1,28 +0,0 @@
mod util;
pub mod memory;
pub mod sled;
use crate::Error;
pub use self::memory::MemoryChunkService;
pub use self::sled::SledChunkService;
pub use self::util::read_all_and_chunk;
pub use self::util::update_hasher;
pub use self::util::upload_chunk;
/// The base trait all ChunkService services need to implement.
/// It allows checking for the existence, download and upload of chunks.
/// It's usually used after consulting a [crate::blobservice::BlobService] for
/// chunking information.
pub trait ChunkService {
/// check if the service has a chunk, given by its digest.
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
/// retrieve a chunk by its digest. Implementations MUST validate the digest
/// matches.
fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>;
/// insert a chunk. returns the digest of the chunk, or an error.
fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>;
}

View file

@ -1,70 +0,0 @@
use std::path::PathBuf;
use data_encoding::BASE64;
use tracing::instrument;
use crate::Error;
use super::ChunkService;
#[derive(Clone)]
pub struct SledChunkService {
db: sled::Db,
}
impl SledChunkService {
pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
let config = sled::Config::default().use_compression(true).path(p);
let db = config.open()?;
Ok(Self { db })
}
pub fn new_temporary() -> Result<Self, sled::Error> {
let config = sled::Config::default().temporary(true);
let db = config.open()?;
Ok(Self { db })
}
}
impl ChunkService for SledChunkService {
#[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
match self.db.get(digest) {
Ok(None) => Ok(false),
Ok(Some(_)) => Ok(true),
Err(e) => Err(Error::StorageError(e.to_string())),
}
}
#[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
match self.db.get(digest) {
Ok(None) => Ok(None),
Ok(Some(data)) => {
// calculate the hash to verify this is really what we expect
let actual_digest = blake3::hash(&data).as_bytes().to_vec();
if actual_digest != digest {
return Err(Error::StorageError(format!(
"invalid hash encountered when reading chunk, expected {}, got {}",
BASE64.encode(digest),
BASE64.encode(&actual_digest),
)));
}
Ok(Some(Vec::from(&*data)))
}
Err(e) => Err(Error::StorageError(e.to_string())),
}
}
#[instrument(name = "SledChunkService::put", skip(self, data))]
fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
let digest = blake3::hash(&data);
let result = self.db.insert(digest.as_bytes(), data);
if let Err(e) = result {
return Err(Error::StorageError(e.to_string()));
}
Ok(*digest.as_bytes())
}
}

View file

@ -1,85 +0,0 @@
use crate::{proto, Error};
use std::io::Read;
use tracing::{debug, instrument};
use super::ChunkService;
/// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
#[instrument(skip_all, err)]
pub fn upload_chunk<CS: ChunkService>(
chunk_service: &CS,
chunk_data: Vec<u8>,
) -> Result<[u8; 32], Error> {
let mut hasher = blake3::Hasher::new();
update_hasher(&mut hasher, &chunk_data);
let digest = hasher.finalize();
if chunk_service.has(digest.as_bytes())? {
debug!("already has chunk, skipping");
}
let digest_resp = chunk_service.put(chunk_data)?;
assert_eq!(&digest_resp, digest.as_bytes());
Ok(*digest.as_bytes())
}
/// reads through a reader, writes chunks to a [ChunkService] and returns a
/// [proto::BlobMeta] pointing to all the chunks.
#[instrument(skip_all, err)]
pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
chunk_service: &CS,
r: R,
) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
let mut blob_meta = proto::BlobMeta::default();
// hash the file contents, upload chunks if not there yet
let mut blob_hasher = blake3::Hasher::new();
// TODO: play with chunking sizes
let chunker_avg_size = 64 * 1024;
let chunker_min_size = chunker_avg_size / 4;
let chunker_max_size = chunker_avg_size * 4;
let chunker =
fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
for chunking_result in chunker {
let chunk = chunking_result.unwrap();
// TODO: convert to error::UnableToRead
let chunk_len = chunk.data.len() as u32;
// update calculate blob hash
update_hasher(&mut blob_hasher, &chunk.data);
let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
digest: chunk_digest.to_vec(),
size: chunk_len,
});
}
Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
}
/// updates a given hasher with more data. Uses rayon if the data is
/// sufficiently big.
///
/// From the docs:
///
/// To get any performance benefit from multithreading, the input buffer needs
/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
/// update for inputs under 128 KiB. That threshold varies quite a lot across
/// different processors, and its important to benchmark your specific use
/// case.
///
/// We didn't benchmark yet, so these numbers might need tweaking.
#[instrument(skip_all)]
pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
if data.len() > 128 * 1024 {
hasher.update_rayon(data);
} else {
hasher.update(data);
}
}

View file

@ -1,19 +1,17 @@
use crate::{chunkservice::read_all_and_chunk, directoryservice::DirectoryPutter, proto};
use crate::{blobservice::BlobService, directoryservice::DirectoryService};
use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto};
use std::{
collections::HashMap,
fmt::Debug,
fs,
fs::File,
io,
os::unix::prelude::PermissionsExt,
path::{Path, PathBuf},
};
use tracing::instrument;
use walkdir::WalkDir;
use crate::{
blobservice::BlobService, chunkservice::ChunkService, directoryservice::DirectoryService,
};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed to upload directory at {0}: {1}")]
@ -57,9 +55,8 @@ impl From<super::Error> for Error {
//
// It assumes the caller adds returned nodes to the directories it assembles.
#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: DirectoryPutter>(
fn process_entry<BS: BlobService, DP: DirectoryPutter>(
blob_service: &mut BS,
chunk_service: &mut CS,
directory_putter: &mut DP,
entry: &walkdir::DirEntry,
maybe_directory: Option<proto::Directory>,
@ -112,23 +109,16 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
.metadata()
.map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?;
let file = File::open(entry_path.clone())
let mut file = File::open(entry_path.clone())
.map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
let (blob_digest, blob_meta) = read_all_and_chunk(chunk_service, file)?;
let mut writer = blob_service.open_write()?;
// upload blobmeta if not there yet
if blob_service
.stat(&proto::StatBlobRequest {
digest: blob_digest.to_vec(),
include_chunks: false,
include_bao: false,
})?
.is_none()
{
// upload blobmeta
blob_service.put(&blob_digest, blob_meta)?;
}
if let Err(e) = io::copy(&mut file, &mut writer) {
return Err(Error::UnableToRead(entry_path, e));
};
let digest = writer.close()?;
return Ok(proto::node::Node::File(proto::FileNode {
name: entry
@ -136,7 +126,7 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
.to_str()
.map(|s| Ok(s.to_owned()))
.unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
digest: blob_digest,
digest: digest.to_vec(),
size: metadata.len() as u32,
// If it's executable by the user, it'll become executable.
// This matches nix's dump() function behaviour.
@ -152,15 +142,9 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
/// to the PathInfoService.
//
// returns the root node, or an error.
#[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))]
pub fn import_path<
BS: BlobService,
CS: ChunkService + std::marker::Sync,
DS: DirectoryService,
P: AsRef<Path> + Debug,
>(
#[instrument(skip(blob_service, directory_service), fields(path=?p))]
pub fn import_path<BS: BlobService, DS: DirectoryService, P: AsRef<Path> + Debug>(
blob_service: &mut BS,
chunk_service: &mut CS,
directory_service: &mut DS,
p: P,
) -> Result<proto::node::Node, Error> {
@ -212,13 +196,7 @@ pub fn import_path<
}
};
let node = process_entry(
blob_service,
chunk_service,
&mut directory_putter,
&entry,
maybe_directory,
)?;
let node = process_entry(blob_service, &mut directory_putter, &entry, maybe_directory)?;
if entry.depth() == 0 {
return Ok(node);

View file

@ -1,15 +1,12 @@
mod blobreader;
mod errors;
pub mod blobservice;
pub mod chunkservice;
pub mod directoryservice;
pub mod import;
pub mod nar;
pub mod pathinfoservice;
pub mod proto;
pub use blobreader::BlobReader;
pub use errors::Error;
#[cfg(test)]

View file

@ -2,7 +2,6 @@ use count_write::CountWrite;
use sha2::{Digest, Sha256};
use crate::blobservice::BlobService;
use crate::chunkservice::ChunkService;
use crate::directoryservice::DirectoryService;
use crate::proto;
@ -12,26 +11,20 @@ use super::{NARCalculationService, RenderError};
/// A NAR calculation service which simply renders the whole NAR whenever
/// we ask for the calculation.
#[derive(Clone)]
pub struct NonCachingNARCalculationService<
BS: BlobService,
CS: ChunkService + Clone,
DS: DirectoryService,
> {
nar_renderer: NARRenderer<BS, CS, DS>,
pub struct NonCachingNARCalculationService<BS: BlobService, DS: DirectoryService> {
nar_renderer: NARRenderer<BS, DS>,
}
impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService>
NonCachingNARCalculationService<BS, CS, DS>
{
pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self {
impl<BS: BlobService, DS: DirectoryService> NonCachingNARCalculationService<BS, DS> {
pub fn new(blob_service: BS, directory_service: DS) -> Self {
Self {
nar_renderer: NARRenderer::new(blob_service, chunk_service, directory_service),
nar_renderer: NARRenderer::new(blob_service, directory_service),
}
}
}
impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARCalculationService
for NonCachingNARCalculationService<BS, CS, DS>
impl<BS: BlobService, DS: DirectoryService> NARCalculationService
for NonCachingNARCalculationService<BS, DS>
{
fn calculate_nar(
&self,

View file

@ -1,10 +1,11 @@
use std::io::{self, BufReader};
use crate::{
blobservice::BlobService,
chunkservice::ChunkService,
directoryservice::DirectoryService,
proto::{self, NamedNode},
BlobReader,
};
use data_encoding::BASE64;
use nix_compat::nar;
use super::RenderError;
@ -12,17 +13,15 @@ use super::RenderError;
/// A NAR renderer, using a blob_service, chunk_service and directory_service
/// to render a NAR to a writer.
#[derive(Clone)]
pub struct NARRenderer<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> {
pub struct NARRenderer<BS: BlobService, DS: DirectoryService> {
blob_service: BS,
chunk_service: CS,
directory_service: DS,
}
impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRenderer<BS, CS, DS> {
pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self {
impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> {
pub fn new(blob_service: BS, directory_service: DS) -> Self {
Self {
blob_service,
chunk_service,
directory_service,
}
}
@ -65,40 +64,15 @@ impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRendere
))
})?;
// query blob_service for blob_meta
let resp = self
.blob_service
.stat(&proto::StatBlobRequest {
digest: digest.to_vec(),
include_chunks: true,
..Default::default()
})
.map_err(RenderError::StoreError)?;
// TODO: handle error
let mut blob_reader = match self.blob_service.open_read(&digest).unwrap() {
Some(blob_reader) => Ok(BufReader::new(blob_reader)),
None => Err(RenderError::NARWriterError(io::Error::new(
io::ErrorKind::NotFound,
format!("blob with digest {} not found", BASE64.encode(&digest)),
))),
}?;
match resp {
// if it's None, that's an error!
None => {
return Err(RenderError::BlobNotFound(
digest.to_vec(),
proto_file_node.name.to_owned(),
));
}
Some(blob_meta) => {
// make sure the blob_meta size matches what we expect from proto_file_node
let blob_meta_size = blob_meta.chunks.iter().fold(0, |acc, e| acc + e.size);
if blob_meta_size != proto_file_node.size {
return Err(RenderError::UnexpectedBlobMeta(
digest.to_vec(),
proto_file_node.name.to_owned(),
proto_file_node.size,
blob_meta_size,
));
}
let mut blob_reader = std::io::BufReader::new(BlobReader::open(
&self.chunk_service,
blob_meta,
));
nar_node
.file(
proto_file_node.executable,
@ -107,8 +81,6 @@ impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRendere
)
.map_err(RenderError::NARWriterError)?;
}
}
}
proto::node::Node::Directory(proto_directory_node) => {
let digest: [u8; 32] =
proto_directory_node

View file

@ -1,54 +1,34 @@
use std::collections::VecDeque;
use crate::{
blobservice::BlobService,
chunkservice::{read_all_and_chunk, update_hasher, ChunkService},
Error,
blobservice::{BlobService, BlobWriter},
proto::sync_read_into_async_read::SyncReadIntoAsyncRead,
};
use data_encoding::BASE64;
use tokio::{sync::mpsc::channel, task};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use std::{collections::VecDeque, io, pin::Pin};
use tokio::task;
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;
use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{debug, instrument, warn};
use tracing::{instrument, warn};
pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
pub struct GRPCBlobServiceWrapper<BS: BlobService> {
blob_service: BS,
chunk_service: CS,
}
impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
pub fn new(blob_service: BS, chunk_service: CS) -> Self {
impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> {
fn from(value: BS) -> Self {
Self {
blob_service,
chunk_service,
blob_service: value,
}
}
// upload the chunk to the chunk service, and return its digest (or an error) when done.
#[instrument(skip(chunk_service))]
fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
let mut hasher = blake3::Hasher::new();
update_hasher(&mut hasher, &chunk_data);
let digest = hasher.finalize();
if chunk_service.has(digest.as_bytes())? {
debug!("already has chunk, skipping");
}
let digest_resp = chunk_service.put(chunk_data)?;
assert_eq!(&digest_resp, digest.as_bytes());
Ok(digest.as_bytes().to_vec())
}
}
#[async_trait]
impl<
BS: BlobService + Send + Sync + Clone + 'static,
CS: ChunkService + Send + Sync + Clone + 'static,
> super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS>
impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService
for GRPCBlobServiceWrapper<BS>
{
type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>;
// https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
type ReadStream =
Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
#[instrument(skip(self))]
async fn stat(
@ -56,12 +36,22 @@ impl<
request: Request<super::StatBlobRequest>,
) -> Result<Response<super::BlobMeta>, Status> {
let rq = request.into_inner();
match self.blob_service.stat(&rq) {
Ok(None) => Err(Status::not_found(format!(
let req_digest: [u8; 32] = rq
.digest
.clone()
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
if rq.include_chunks || rq.include_bao {
return Err(Status::internal("not implemented"));
}
match self.blob_service.has(&req_digest) {
Ok(true) => Ok(Response::new(super::BlobMeta::default())),
Ok(false) => Err(Status::not_found(format!(
"blob {} not found",
BASE64.encode(&rq.digest)
BASE64.encode(&req_digest)
))),
Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)),
Err(e) => Err(e.into()),
}
}
@ -71,100 +61,39 @@ impl<
&self,
request: Request<super::ReadBlobRequest>,
) -> Result<Response<Self::ReadStream>, Status> {
let req = request.into_inner();
let (tx, rx) = channel(5);
let rq = request.into_inner();
let req_digest: [u8; 32] = req
let req_digest: [u8; 32] = rq
.digest
.clone()
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
.map_err(|_| Status::invalid_argument("invalid digest length"))?;
// query the blob service for more detailed blob info
let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
digest: req_digest.to_vec(),
include_chunks: true,
..Default::default()
})?;
match self.blob_service.open_read(&req_digest) {
Ok(Some(reader)) => {
let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into();
match stat_resp {
None => {
// If the stat didn't return any blobmeta, the client might
// still have asked for a single chunk to be read.
// Check the chunkstore.
if let Some(data) = self.chunk_service.get(&req_digest)? {
// We already know the hash matches, and contrary to
// iterating over a blobmeta, we can't know the size,
// so send the contents of that chunk over,
// as the first (and only) element of the stream.
task::spawn(async move {
let res = Ok(super::BlobChunk { data });
// send the result to the client. If the client already left, that's also fine.
if (tx.send(res).await).is_err() {
debug!("receiver dropped");
fn stream_mapper(
x: Result<bytes::Bytes, io::Error>,
) -> Result<super::BlobChunk, Status> {
match x {
Ok(bytes) => Ok(super::BlobChunk {
data: bytes.to_vec(),
}),
Err(e) => Err(Status::from(e)),
}
});
} else {
return Err(Status::not_found(format!(
}
let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
Ok(Response::new(Box::pin(chunks_stream)))
}
Ok(None) => Err(Status::not_found(format!(
"blob {} not found",
BASE64.encode(&req_digest),
)));
}
}
Some(blobmeta) => {
let chunk_client = self.chunk_service.clone();
// TODO: use BlobReader?
// But then we might not be able to send compressed chunks as-is.
// Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it
// first, so we can .next().await in here.
task::spawn(async move {
for chunkmeta in blobmeta.chunks {
// request chunk.
// We don't need to validate the digest again, as
// that's required for all implementations of ChunkService.
// TODO: handle error
let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap();
let res = match chunk_client.get(chunkmeta_digest) {
BASE64.encode(&rq.digest)
))),
Err(e) => Err(e.into()),
// TODO: make this a separate error type
Ok(None) => Err(Error::StorageError(format!(
"consistency error: chunk {} for blob {} not found",
BASE64.encode(chunkmeta_digest),
BASE64.encode(&req_digest),
))
.into()),
Ok(Some(data)) => {
// We already know the hash matches, but also
// check the size matches what chunkmeta said.
if data.len() as u32 != chunkmeta.size {
Err(Error::StorageError(format!(
"consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
BASE64.encode(chunkmeta_digest),
BASE64.encode(&req_digest),
chunkmeta.size,
data.len(),
)).into())
} else {
// send out the current chunk
// TODO: we might want to break this up further if too big?
Ok(super::BlobChunk { data })
}
}
};
// send the result to the client
if (tx.send(res).await).is_err() {
debug!("receiver dropped");
break;
}
}
});
}
}
let receiver_stream = ReceiverStream::new(rx);
Ok(Response::new(receiver_stream))
}
#[instrument(skip(self))]
async fn put(
@ -180,37 +109,34 @@ impl<
let data_reader = tokio_util::io::StreamReader::new(data_stream);
// TODO: can we get rid of this clone?
let chunk_service = self.chunk_service.clone();
// prepare a writer, which we'll use in the blocking task below.
let mut writer = self
.blob_service
.open_write()
.map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?;
let (blob_digest, blob_meta) =
task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> {
// feed read_all_and_chunk a (sync) reader to the data retrieved from the stream.
read_all_and_chunk(
&chunk_service,
tokio_util::io::SyncIoBridge::new(data_reader),
)
let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> {
// construct a sync reader to the data
let mut reader = tokio_util::io::SyncIoBridge::new(data_reader);
io::copy(&mut reader, &mut writer).map_err(|e| {
warn!("error copying: {}", e);
Status::internal("error copying")
})?;
let digest = writer
.close()
.map_err(|e| {
warn!("error closing stream: {}", e);
Status::internal("error closing stream")
})?
.to_vec();
Ok(super::PutBlobResponse { digest })
})
.await
.map_err(|e| Status::internal(e.to_string()))??;
.map_err(|_| Status::internal("failed to wait for task"))??;
// upload blobmeta if not there yet
if self
.blob_service
.stat(&super::StatBlobRequest {
digest: blob_digest.to_vec(),
include_chunks: false,
include_bao: false,
})?
.is_none()
{
// upload blobmeta
self.blob_service.put(&blob_digest, blob_meta)?;
}
// return to client.
Ok(Response::new(super::PutBlobResponse {
digest: blob_digest,
}))
Ok(Response::new(result))
}
}

View file

@ -11,6 +11,8 @@ mod grpc_blobservice_wrapper;
mod grpc_directoryservice_wrapper;
mod grpc_pathinfoservice_wrapper;
mod sync_read_into_async_read;
pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper;

View file

@ -0,0 +1,158 @@
use bytes::Buf;
use core::task::Poll::Ready;
use futures::ready;
use futures::Future;
use std::io;
use std::io::Read;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
#[derive(Debug)]
enum State<Buf: bytes::Buf + bytes::BufMut> {
Idle(Option<Buf>),
Busy(JoinHandle<(io::Result<usize>, Buf)>),
}
use State::{Busy, Idle};
/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a
/// synchronous API.
#[derive(Debug)]
pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> {
state: Mutex<State<Buf>>,
reader: Arc<Mutex<R>>,
rt: Handle,
}
impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> {
/// This must be called from within a Tokio runtime context, or else it will panic.
#[track_caller]
pub fn new(rt: Handle, reader: R) -> Self {
Self {
rt,
state: State::Idle(None).into(),
reader: Arc::new(reader.into()),
}
}
/// This must be called from within a Tokio runtime context, or else it will panic.
pub fn new_with_reader(readable: R) -> Self {
Self::new(Handle::current(), readable)
}
}
/// Repeats operations that are interrupted.
macro_rules! uninterruptibly {
($e:expr) => {{
loop {
match $e {
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
res => break res,
}
}
}};
}
impl<
R: Read + Send + 'static + std::marker::Unpin,
Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static,
> AsyncRead for SyncReadIntoAsyncRead<R, Buf>
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
dst: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let me = self.get_mut();
// Do we need this mutex?
let state = me.state.get_mut();
loop {
match state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap_or_default();
if buf.has_remaining() {
// Here, we will split the `buf` into `[..dst.remaining()... ; rest ]`
// The `rest` is stuffed into the `buf_cell` for further poll_read.
// The other is completely consumed into the unfilled destination.
// `rest` can be empty.
let mut adjusted_src =
buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining()));
let copied_size = adjusted_src.remaining();
adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size));
dst.set_filled(copied_size);
*buf_cell = Some(buf);
return Ready(Ok(()));
}
let reader = me.reader.clone();
*state = Busy(me.rt.spawn_blocking(move || {
let result = uninterruptibly!(reader.blocking_lock().read(
// SAFETY: `reader.read` will *ONLY* write initialized bytes
// and never *READ* uninitialized bytes
// inside this buffer.
//
// Furthermore, casting the slice as `*mut [u8]`
// is safe because it has the same layout.
//
// Finally, the pointer obtained is valid and owned
// by `buf` only as we have a valid mutable reference
// to it, it is valid for write.
//
// Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998
unsafe {
&mut *(buf.chunk_mut().as_uninit_slice_mut()
as *mut [std::mem::MaybeUninit<u8>]
as *mut [u8])
}
));
if let Ok(n) = result {
// SAFETY: given we initialize `n` bytes, we can move `n` bytes
// forward.
unsafe {
buf.advance_mut(n);
}
}
(result, buf)
}));
}
Busy(ref mut rx) => {
let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?;
match result {
Ok(n) => {
if n > 0 {
let remaining = std::cmp::min(n, dst.remaining());
let mut adjusted_src = buf.copy_to_bytes(remaining);
adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining));
dst.advance(remaining);
}
*state = Idle(Some(buf));
return Ready(Ok(()));
}
Err(e) => {
*state = Idle(None);
return Ready(Err(e));
}
}
}
}
}
}
}
impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> {
/// This must be called from within a Tokio runtime context, or else it will panic.
fn from(value: R) -> Self {
Self::new_with_reader(value)
}
}

View file

@ -1,18 +1,14 @@
use crate::blobservice::BlobService;
use crate::chunkservice::ChunkService;
use crate::proto::blob_meta::ChunkMeta;
use crate::proto::blob_service_server::BlobService as GRPCBlobService;
use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest};
use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST};
use crate::tests::utils::{gen_blob_service, gen_chunk_service};
use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST};
use crate::tests::utils::gen_blob_service;
use tokio_stream::StreamExt;
fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper<
impl BlobService + Send + Sync + Clone + 'static,
impl ChunkService + Send + Sync + Clone + 'static,
> {
fn gen_grpc_blob_service(
) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> {
let blob_service = gen_blob_service();
let chunk_service = gen_chunk_service();
GRPCBlobServiceWrapper::new(blob_service, chunk_service)
GRPCBlobServiceWrapper::from(blob_service)
}
/// Trying to read a non-existent blob should return a not found error.
@ -26,8 +22,13 @@ async fn not_found_read() {
}))
.await;
let e = resp.expect_err("must_be_err");
// We can't use unwrap_err here, because the Ok value doesn't implement
// debug.
if let Err(e) = resp {
assert_eq!(e.code(), tonic::Code::NotFound);
} else {
panic!("resp is not err")
}
}
/// Trying to stat a non-existent blob should return a not found error.
@ -47,8 +48,7 @@ async fn not_found_stat() {
assert_eq!(resp.code(), tonic::Code::NotFound);
}
/// Put a blob in the store, get it back. We send something small enough so it
/// won't get split into multiple chunks.
/// Put a blob in the store, get it back.
#[tokio::test]
async fn put_read_stat() {
let service = gen_grpc_blob_service();
@ -64,39 +64,30 @@ async fn put_read_stat() {
assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest);
// Stat for the digest of A. It should return one chunk.
// Stat for the digest of A.
// We currently don't ask for more granular chunking data, as we don't
// expose it yet.
let resp = service
.stat(tonic::Request::new(StatBlobRequest {
digest: BLOB_A_DIGEST.to_vec(),
include_chunks: true,
..Default::default()
}))
.await
.expect("must succeed")
.into_inner();
assert_eq!(1, resp.chunks.len());
// the `chunks` field should point to the single chunk.
assert_eq!(
vec![ChunkMeta {
digest: BLOB_A_DIGEST.to_vec(),
size: BLOB_A.len() as u32,
}],
resp.chunks,
);
// Read the chunk. It should return the same data.
// Read the blob. It should return the same data.
let resp = service
.read(tonic::Request::new(ReadBlobRequest {
digest: BLOB_A_DIGEST.to_vec(),
}))
.await;
let mut rx = resp.expect("must succeed").into_inner().into_inner();
let mut rx = resp.ok().unwrap().into_inner();
// the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
let item = rx
.recv()
.next()
.await
.expect("must be some")
.expect("must succeed");
@ -104,127 +95,8 @@ async fn put_read_stat() {
assert_eq!(BLOB_A.to_vec(), item.data);
// … and no more elements
assert!(rx.recv().await.is_none());
}
/// Put a bigger blob in the store, and get it back.
/// Assert the stat request actually returns more than one chunk, and
/// we can read each chunk individually, as well as the whole blob via the
/// `read()` method.
#[tokio::test]
async fn put_read_stat_large() {
let service = gen_grpc_blob_service();
// split up BLOB_B into BlobChunks containing 1K bytes each.
let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B
.chunks(1024)
.map(|x| BlobChunk { data: x.to_vec() })
.collect();
assert!(blob_b_blobchunks.len() > 1);
// Send blob B
let put_resp = service
.put(tonic_mock::streaming_request(blob_b_blobchunks))
.await
.expect("must succeed")
.into_inner();
assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest);
// Stat for the digest of B
let resp = service
.stat(tonic::Request::new(StatBlobRequest {
digest: BLOB_B_DIGEST.to_vec(),
include_chunks: true,
..Default::default()
}))
.await
.expect("must succeed")
.into_inner();
// it should return more than one chunk.
assert_ne!(1, resp.chunks.len());
// The size added up should equal the size of BLOB_B.
let mut size_in_stat: u32 = 0;
for chunk in &resp.chunks {
size_in_stat += chunk.size
}
assert_eq!(BLOB_B.len() as u32, size_in_stat);
// Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values.
// TODO: make the chunker config better accessible, so we don't need to synchronize this.
{
let chunker_avg_size = 64 * 1024;
let chunker_min_size = chunker_avg_size / 4;
let chunker_max_size = chunker_avg_size * 4;
// initialize a chunker with the current buffer
let blob_b = BLOB_B.to_vec();
let chunker = fastcdc::v2020::FastCDC::new(
&blob_b,
chunker_min_size,
chunker_avg_size,
chunker_max_size,
);
let mut num_chunks = 0;
for (i, chunk) in chunker.enumerate() {
assert_eq!(
resp.chunks[i].size, chunk.length as u32,
"expected locally-chunked chunk length to match stat response"
);
num_chunks += 1;
}
assert_eq!(
resp.chunks.len(),
num_chunks,
"expected number of chunks to match"
);
}
// Reading the whole blob by its digest via the read() interface should succeed.
{
let resp = service
.read(tonic::Request::new(ReadBlobRequest {
digest: BLOB_B_DIGEST.to_vec(),
}))
.await;
let mut rx = resp.expect("must succeed").into_inner().into_inner();
let mut buf: Vec<u8> = Vec::new();
while let Some(item) = rx.recv().await {
let mut blob_chunk = item.expect("must not be err");
buf.append(&mut blob_chunk.data);
}
assert_eq!(BLOB_B.to_vec(), buf);
}
// Reading the whole blob by reading individual chunks should also succeed.
{
let mut buf: Vec<u8> = Vec::new();
for chunk in &resp.chunks {
// request this individual chunk via read
let resp = service
.read(tonic::Request::new(ReadBlobRequest {
digest: chunk.digest.clone(),
}))
.await;
let mut rx = resp.expect("must succeed").into_inner().into_inner();
// append all items from the stream to the buffer
while let Some(item) = rx.recv().await {
let mut blob_chunk = item.expect("must not be err");
buf.append(&mut blob_chunk.data);
}
}
// finished looping over all chunks, compare
assert_eq!(BLOB_B.to_vec(), buf);
}
assert!(rx.next().await.is_none());
// TODO: we rely here on the blob being small enough to not get broken up into multiple chunks.
// Test with some bigger blob too
}

View file

@ -6,12 +6,10 @@ use crate::proto::GRPCPathInfoServiceWrapper;
use crate::proto::PathInfo;
use crate::proto::{GetPathInfoRequest, Node, SymlinkNode};
use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
use crate::tests::utils::{
gen_blob_service, gen_chunk_service, gen_directory_service, gen_pathinfo_service,
};
use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
use tonic::Request;
/// generates a GRPCPathInfoService out of blob, chunk, directory and pathinfo services.
/// generates a GRPCPathInfoService out of blob, directory and pathinfo services.
///
/// We only interact with it via the PathInfo GRPC interface.
/// It uses the NonCachingNARCalculationService NARCalculationService to
@ -19,11 +17,7 @@ use tonic::Request;
fn gen_grpc_service() -> impl GRPCPathInfoService {
GRPCPathInfoServiceWrapper::new(
gen_pathinfo_service(),
NonCachingNARCalculationService::new(
gen_blob_service(),
gen_chunk_service(),
gen_directory_service(),
),
NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()),
)
}

View file

@ -1,4 +1,4 @@
use super::utils::{gen_blob_service, gen_chunk_service, gen_directory_service};
use super::utils::{gen_blob_service, gen_directory_service};
use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryService;
use crate::import::import_path;
@ -21,7 +21,6 @@ fn symlink() {
let root_node = import_path(
&mut gen_blob_service(),
&mut gen_chunk_service(),
&mut gen_directory_service(),
tmpdir.path().join("doesntmatter"),
)
@ -46,7 +45,6 @@ fn single_file() {
let root_node = import_path(
&mut blob_service,
&mut gen_chunk_service(),
&mut gen_directory_service(),
tmpdir.path().join("root"),
)
@ -64,13 +62,8 @@ fn single_file() {
// ensure the blob has been uploaded
assert!(blob_service
.stat(&proto::StatBlobRequest {
digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
include_chunks: false,
..Default::default()
})
.unwrap()
.is_some());
.has(&HELLOWORLD_BLOB_DIGEST.to_vec().try_into().unwrap())
.unwrap());
}
#[test]
@ -89,12 +82,7 @@ fn complicated() {
let mut blob_service = gen_blob_service();
let mut directory_service = gen_directory_service();
let root_node = import_path(
&mut blob_service,
&mut gen_chunk_service(),
&mut directory_service,
tmpdir.path(),
)
let root_node = import_path(&mut blob_service, &mut directory_service, tmpdir.path())
.expect("must succeed");
// ensure root_node matched expectations
@ -124,11 +112,6 @@ fn complicated() {
// ensure EMPTY_BLOB_CONTENTS has been uploaded
assert!(blob_service
.stat(&proto::StatBlobRequest {
digest: EMPTY_BLOB_DIGEST.to_vec(),
include_chunks: false,
include_bao: false
})
.unwrap()
.is_some());
.has(&EMPTY_BLOB_DIGEST.to_vec().try_into().unwrap())
.unwrap());
}

View file

@ -1,21 +1,17 @@
use crate::blobservice::BlobService;
use crate::chunkservice::ChunkService;
use crate::blobservice::BlobWriter;
use crate::directoryservice::DirectoryService;
use crate::nar::NARRenderer;
use crate::proto;
use crate::proto::DirectoryNode;
use crate::proto::FileNode;
use crate::proto::SymlinkNode;
use crate::tests::fixtures::*;
use crate::tests::utils::*;
use std::io;
#[test]
fn single_symlink() {
let renderer = NARRenderer::new(
gen_blob_service(),
gen_chunk_service(),
gen_directory_service(),
);
let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service());
// don't put anything in the stores, as we don't actually do any requests.
let mut buf: Vec<u8> = vec![];
@ -37,11 +33,7 @@ fn single_symlink() {
/// match what's in the store.
#[test]
fn single_file_missing_blob() {
let renderer = NARRenderer::new(
gen_blob_service(),
gen_chunk_service(),
gen_directory_service(),
);
let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service());
let mut buf: Vec<u8> = vec![];
let e = renderer
@ -56,10 +48,11 @@ fn single_file_missing_blob() {
)
.expect_err("must fail");
if let crate::nar::RenderError::BlobNotFound(actual_digest, _) = e {
assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), actual_digest);
} else {
panic!("unexpected error")
match e {
crate::nar::RenderError::NARWriterError(e) => {
assert_eq!(io::ErrorKind::NotFound, e.kind());
}
_ => panic!("unexpected error: {:?}", e),
}
}
@ -68,29 +61,21 @@ fn single_file_missing_blob() {
#[test]
fn single_file_wrong_blob_size() {
let blob_service = gen_blob_service();
let chunk_service = gen_chunk_service();
// insert blob and chunk into the stores
chunk_service
.put(HELLOWORLD_BLOB_CONTENTS.to_vec())
.unwrap();
blob_service
.put(
&HELLOWORLD_BLOB_DIGEST,
proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
}],
..Default::default()
},
// insert blob into the store
let mut writer = blob_service.open_write().unwrap();
io::copy(
&mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
&mut writer,
)
.unwrap();
assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap());
let renderer = NARRenderer::new(blob_service, chunk_service, gen_directory_service());
let renderer = NARRenderer::new(blob_service, gen_directory_service());
// Test with a root FileNode of a too big size
{
let mut buf: Vec<u8> = vec![];
let e = renderer
.write_nar(
&mut buf,
@ -103,49 +88,52 @@ fn single_file_wrong_blob_size() {
)
.expect_err("must fail");
if let crate::nar::RenderError::UnexpectedBlobMeta(digest, _, expected_size, actual_size) = e {
assert_eq!(
digest,
HELLOWORLD_BLOB_DIGEST.to_vec(),
"expect digest to match"
);
assert_eq!(
expected_size, 42,
"expected expected size to be what's passed in the request"
);
assert_eq!(
actual_size,
HELLOWORLD_BLOB_CONTENTS.len() as u32,
"expected actual size to be correct"
);
} else {
panic!("unexpected error")
match e {
crate::nar::RenderError::NARWriterError(e) => {
assert_eq!(io::ErrorKind::UnexpectedEof, e.kind());
}
_ => panic!("unexpected error: {:?}", e),
}
}
// Test with a root FileNode of a too small size
{
let mut buf: Vec<u8> = vec![];
let e = renderer
.write_nar(
&mut buf,
&crate::proto::node::Node::File(FileNode {
name: "doesntmatter".to_string(),
digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
size: 2, // <- note the wrong size here!
executable: false,
}),
)
.expect_err("must fail");
match e {
crate::nar::RenderError::NARWriterError(e) => {
assert_eq!(io::ErrorKind::InvalidInput, e.kind());
}
_ => panic!("unexpected error: {:?}", e),
}
}
}
#[test]
fn single_file() {
let blob_service = gen_blob_service();
let chunk_service = gen_chunk_service();
chunk_service
.put(HELLOWORLD_BLOB_CONTENTS.to_vec())
.unwrap();
blob_service
.put(
&HELLOWORLD_BLOB_DIGEST,
proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
}],
..Default::default()
},
// insert blob into the store
let mut writer = blob_service.open_write().unwrap();
io::copy(
&mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
&mut writer,
)
.unwrap();
assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap());
let renderer = NARRenderer::new(blob_service, chunk_service, gen_directory_service());
let renderer = NARRenderer::new(blob_service, gen_directory_service());
let mut buf: Vec<u8> = vec![];
renderer
@ -166,31 +154,24 @@ fn single_file() {
#[test]
fn test_complicated() {
let blob_service = gen_blob_service();
let chunk_service = gen_chunk_service();
let directory_service = gen_directory_service();
// put all data into the stores.
let digest = chunk_service.put(EMPTY_BLOB_CONTENTS.to_vec()).unwrap();
blob_service
.put(
&digest,
proto::BlobMeta {
chunks: vec![proto::blob_meta::ChunkMeta {
digest: digest.to_vec(),
size: EMPTY_BLOB_CONTENTS.len() as u32,
}],
..Default::default()
},
// insert blob into the store
let mut writer = blob_service.open_write().unwrap();
io::copy(
&mut io::Cursor::new(EMPTY_BLOB_CONTENTS.to_vec()),
&mut writer,
)
.unwrap();
assert_eq!(EMPTY_BLOB_DIGEST.to_vec(), writer.close().unwrap());
directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap();
directory_service
.put(DIRECTORY_COMPLICATED.clone())
.unwrap();
let renderer = NARRenderer::new(blob_service, chunk_service, directory_service);
let renderer = NARRenderer::new(blob_service, directory_service);
let mut buf: Vec<u8> = vec![];
renderer

View file

@ -1,6 +1,5 @@
use crate::{
blobservice::{BlobService, MemoryBlobService},
chunkservice::{ChunkService, MemoryChunkService},
directoryservice::{DirectoryService, MemoryDirectoryService},
pathinfoservice::{MemoryPathInfoService, PathInfoService},
};
@ -9,10 +8,6 @@ pub fn gen_blob_service() -> impl BlobService + Send + Sync + Clone + 'static {
MemoryBlobService::default()
}
pub fn gen_chunk_service() -> impl ChunkService + Clone {
MemoryChunkService::default()
}
pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static {
MemoryDirectoryService::default()
}