feat(tvix/store): add chunkservice
This adds the simpler ChunkService trait, and an implementation for it using sled, and one using a HashMap. Change-Id: Icb0fdc41b37b44e9e9e4f548d0f4acae1d83b71e Reviewed-on: https://cl.tvl.fyi/c/depot/+/8086 Reviewed-by: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI
This commit is contained in:
parent
3f27fe3484
commit
119aa43171
4 changed files with 148 additions and 0 deletions
61
tvix/store/src/chunkservice/memory.rs
Normal file
61
tvix/store/src/chunkservice/memory.rs
Normal file
|
@ -0,0 +1,61 @@
|
|||
use data_encoding::BASE64;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::Error;
|
||||
|
||||
use super::ChunkService;
|
||||
|
||||
/// An in-memory [ChunkService], backed by a [HashMap].
///
/// Chunks live in process memory only and are lost when the service is
/// dropped — suitable for tests and ephemeral use.
#[derive(Clone)]
pub struct MemoryChunkService {
    // Maps a chunk digest (key) to the chunk contents (value).
    // Arc<RwLock<…>> lets clones of the service share the same store,
    // with concurrent reads and exclusive writes.
    db: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
}
|
||||
|
||||
impl MemoryChunkService {
|
||||
pub fn new() -> Self {
|
||||
let db = Arc::new(RwLock::new(HashMap::default()));
|
||||
|
||||
Self { db }
|
||||
}
|
||||
}
|
||||
|
||||
impl ChunkService for MemoryChunkService {
|
||||
#[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
|
||||
fn has(&self, digest: &[u8]) -> Result<bool, Error> {
|
||||
let db = self.db.read().unwrap();
|
||||
Ok(db.get(digest).is_some())
|
||||
}
|
||||
|
||||
#[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
|
||||
fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||
let db = self.db.read().unwrap();
|
||||
match db.get(digest) {
|
||||
None => Ok(None),
|
||||
Some(data) => {
|
||||
// calculate the hash to verify this is really what we expect
|
||||
let actual_digest = blake3::hash(&data).as_bytes().to_vec();
|
||||
if actual_digest != digest {
|
||||
return Err(Error::StorageError(format!(
|
||||
"invalid hash encountered when reading chunk, expected {}, got {}",
|
||||
BASE64.encode(digest),
|
||||
BASE64.encode(&actual_digest),
|
||||
)));
|
||||
}
|
||||
Ok(Some(data.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self, data))]
|
||||
fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
|
||||
let digest = blake3::hash(&data).as_bytes().to_vec();
|
||||
|
||||
let mut db = self.db.write().unwrap();
|
||||
db.insert(digest.clone(), data);
|
||||
|
||||
Ok(digest)
|
||||
}
|
||||
}
|
23
tvix/store/src/chunkservice/mod.rs
Normal file
23
tvix/store/src/chunkservice/mod.rs
Normal file
|
@ -0,0 +1,23 @@
|
|||
pub mod memory;
|
||||
pub mod sled;
|
||||
|
||||
use crate::Error;
|
||||
|
||||
pub use self::memory::MemoryChunkService;
|
||||
pub use self::sled::SledChunkService;
|
||||
|
||||
/// The base trait all ChunkService services need to implement.
/// It allows checking for the existence, download and upload of chunks.
/// It's usually used after consulting a [crate::blobservice::BlobService] for
/// chunking information.
///
/// Chunks are content-addressed: `put` returns the digest the data is
/// stored under, and `get`/`has` look chunks up by that digest
/// (BLAKE3 in the provided implementations).
pub trait ChunkService {
    /// check if the service has a chunk, given by its digest.
    fn has(&self, digest: &[u8]) -> Result<bool, Error>;

    /// retrieve a chunk by its digest. Implementations MUST validate the digest
    /// matches. Returns `Ok(None)` when the chunk is not present.
    fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error>;

    /// insert a chunk. returns the digest of the chunk, or an error.
    fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error>;
}
|
63
tvix/store/src/chunkservice/sled.rs
Normal file
63
tvix/store/src/chunkservice/sled.rs
Normal file
|
@ -0,0 +1,63 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use data_encoding::BASE64;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::Error;
|
||||
|
||||
use super::ChunkService;
|
||||
|
||||
/// A [ChunkService] persisting chunks on disk in a [sled] database.
#[derive(Clone)]
pub struct SledChunkService {
    // Handle to the sled database; cloning the service clones this handle
    // (NOTE(review): clones presumably refer to the same underlying
    // database, per sled's Db semantics — confirm against sled docs).
    db: sled::Db,
}
|
||||
|
||||
impl SledChunkService {
|
||||
pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
|
||||
let config = sled::Config::default().use_compression(true).path(p);
|
||||
let db = config.open()?;
|
||||
|
||||
Ok(Self { db })
|
||||
}
|
||||
}
|
||||
|
||||
impl ChunkService for SledChunkService {
|
||||
#[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
|
||||
fn has(&self, digest: &[u8]) -> Result<bool, Error> {
|
||||
match self.db.get(digest) {
|
||||
Ok(None) => Ok(false),
|
||||
Ok(Some(_)) => Ok(true),
|
||||
Err(e) => Err(Error::StorageError(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
|
||||
fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||
match self.db.get(digest) {
|
||||
Ok(None) => Ok(None),
|
||||
Ok(Some(data)) => {
|
||||
// calculate the hash to verify this is really what we expect
|
||||
let actual_digest = blake3::hash(&data).as_bytes().to_vec();
|
||||
if actual_digest != digest {
|
||||
return Err(Error::StorageError(format!(
|
||||
"invalid hash encountered when reading chunk, expected {}, got {}",
|
||||
BASE64.encode(digest),
|
||||
BASE64.encode(&actual_digest),
|
||||
)));
|
||||
}
|
||||
Ok(Some(Vec::from(&*data)))
|
||||
}
|
||||
Err(e) => Err(Error::StorageError(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(name = "SledChunkService::put", skip(self, data))]
|
||||
fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
|
||||
let digest = blake3::hash(&data).as_bytes().to_vec();
|
||||
let result = self.db.insert(&digest, data);
|
||||
if let Err(e) = result {
|
||||
return Err(Error::StorageError(e.to_string()));
|
||||
}
|
||||
Ok(digest)
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@ pub mod client;
|
|||
|
||||
mod errors;
|
||||
|
||||
pub mod chunkservice;
|
||||
pub mod proto;
|
||||
|
||||
pub mod dummy_blob_service;
|
||||
|
|
Loading…
Reference in a new issue