feat(tvix/castore): simple filesystem blob service
The simple filesystem `BlobService` enables a user to write a blob store on an existing filesystem using a prefix-style layout in the provided root directory, e.g. the first two bytes of the blake3 hash are used as the directory prefix. Change-Id: I3451a688a6f39027b9c6517d853b95a87adb3a52 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10071 Autosubmit: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
parent
923a5737e6
commit
0ae32d45f6
6 changed files with 316 additions and 1 deletions
20
tvix/Cargo.lock
generated
20
tvix/Cargo.lock
generated
|
@ -138,6 +138,16 @@ dependencies = [
|
|||
"syn 2.0.39",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-tempfile"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b37d4bb113c47e4f263d4b0221912ff5aa840a51bc9b7b47b024e1cf1926fd9b"
|
||||
dependencies = [
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.68"
|
||||
|
@ -3093,6 +3103,7 @@ name = "tvix-castore"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-tempfile",
|
||||
"blake3",
|
||||
"bstr",
|
||||
"bytes",
|
||||
|
@ -3334,6 +3345,15 @@ version = "0.2.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
version = "0.1.0"
|
||||
|
|
|
@ -470,6 +470,40 @@ rec {
|
|||
];
|
||||
|
||||
};
|
||||
"async-tempfile" = rec {
|
||||
crateName = "async-tempfile";
|
||||
version = "0.4.0";
|
||||
edition = "2021";
|
||||
sha256 = "16zx4qcwzq94n13pp6xwa4589apm5y8j20jb7lk4yzn42fqlnzdk";
|
||||
authors = [
|
||||
"Markus Mayer"
|
||||
];
|
||||
dependencies = [
|
||||
{
|
||||
name = "tokio";
|
||||
packageId = "tokio";
|
||||
features = [ "fs" ];
|
||||
}
|
||||
{
|
||||
name = "uuid";
|
||||
packageId = "uuid";
|
||||
optional = true;
|
||||
features = [ "v4" ];
|
||||
}
|
||||
];
|
||||
devDependencies = [
|
||||
{
|
||||
name = "tokio";
|
||||
packageId = "tokio";
|
||||
features = [ "rt-multi-thread" "macros" ];
|
||||
}
|
||||
];
|
||||
features = {
|
||||
"default" = [ "uuid" ];
|
||||
"uuid" = [ "dep:uuid" ];
|
||||
};
|
||||
resolvedDefaultFeatures = [ "default" "uuid" ];
|
||||
};
|
||||
"async-trait" = rec {
|
||||
crateName = "async-trait";
|
||||
version = "0.1.68";
|
||||
|
@ -9553,6 +9587,10 @@ rec {
|
|||
name = "async-stream";
|
||||
packageId = "async-stream";
|
||||
}
|
||||
{
|
||||
name = "async-tempfile";
|
||||
packageId = "async-tempfile";
|
||||
}
|
||||
{
|
||||
name = "blake3";
|
||||
packageId = "blake3";
|
||||
|
@ -10410,6 +10448,55 @@ rec {
|
|||
features = { };
|
||||
resolvedDefaultFeatures = [ "default" ];
|
||||
};
|
||||
"uuid" = rec {
|
||||
crateName = "uuid";
|
||||
version = "1.5.0";
|
||||
edition = "2018";
|
||||
sha256 = "1z6dnvba224p8wvv4vx4xpgc2yxqy12sk4qh346sfh8baskmkbc8";
|
||||
authors = [
|
||||
"Ashley Mannix<ashleymannix@live.com.au>"
|
||||
"Christopher Armstrong"
|
||||
"Dylan DPC<dylan.dpc@gmail.com>"
|
||||
"Hunar Roop Kahlon<hunar.roop@gmail.com>"
|
||||
];
|
||||
dependencies = [
|
||||
{
|
||||
name = "getrandom";
|
||||
packageId = "getrandom";
|
||||
rename = "getrandom";
|
||||
optional = true;
|
||||
}
|
||||
];
|
||||
features = {
|
||||
"arbitrary" = [ "dep:arbitrary" ];
|
||||
"atomic" = [ "dep:atomic" ];
|
||||
"borsh" = [ "dep:borsh" ];
|
||||
"bytemuck" = [ "dep:bytemuck" ];
|
||||
"default" = [ "std" ];
|
||||
"fast-rng" = [ "rng" "rand" ];
|
||||
"getrandom" = [ "dep:getrandom" ];
|
||||
"js" = [ "wasm-bindgen" "getrandom" "getrandom/js" ];
|
||||
"macro-diagnostics" = [ "uuid-macro-internal" ];
|
||||
"md-5" = [ "dep:md-5" ];
|
||||
"md5" = [ "md-5" ];
|
||||
"rand" = [ "dep:rand" ];
|
||||
"rng" = [ "getrandom" ];
|
||||
"serde" = [ "dep:serde" ];
|
||||
"sha1" = [ "sha1_smol" ];
|
||||
"sha1_smol" = [ "dep:sha1_smol" ];
|
||||
"slog" = [ "dep:slog" ];
|
||||
"uuid-macro-internal" = [ "dep:uuid-macro-internal" ];
|
||||
"v1" = [ "atomic" ];
|
||||
"v3" = [ "md5" ];
|
||||
"v4" = [ "rng" ];
|
||||
"v5" = [ "sha1" ];
|
||||
"v6" = [ "atomic" ];
|
||||
"v7" = [ "atomic" "rng" ];
|
||||
"wasm-bindgen" = [ "dep:wasm-bindgen" ];
|
||||
"zerocopy" = [ "dep:zerocopy" ];
|
||||
};
|
||||
resolvedDefaultFeatures = [ "default" "getrandom" "rng" "std" "v4" ];
|
||||
};
|
||||
"valuable" = rec {
|
||||
crateName = "valuable";
|
||||
version = "0.1.0";
|
||||
|
|
|
@ -23,6 +23,7 @@ tracing = "0.1.37"
|
|||
url = "2.4.0"
|
||||
walkdir = "2.4.0"
|
||||
bstr = "1.6.0"
|
||||
async-tempfile = "0.4.0"
|
||||
|
||||
[dependencies.tonic-reflection]
|
||||
optional = true
|
||||
|
|
|
@ -3,7 +3,9 @@ use url::Url;
|
|||
|
||||
use crate::{proto::blob_service_client::BlobServiceClient, Error};
|
||||
|
||||
use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService};
|
||||
use super::{
|
||||
BlobService, GRPCBlobService, MemoryBlobService, SimpleFilesystemBlobService, SledBlobService,
|
||||
};
|
||||
|
||||
/// Constructs a new instance of a [BlobService] from an URI.
|
||||
///
|
||||
|
@ -11,6 +13,7 @@ use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService};
|
|||
/// - `memory://` ([MemoryBlobService])
|
||||
/// - `sled://` ([SledBlobService])
|
||||
/// - `grpc+*://` ([GRPCBlobService])
|
||||
/// - `simplefs://` ([SimpleFilesystemBlobService])
|
||||
///
|
||||
/// See their `from_url` methods for more details about their syntax.
|
||||
pub async fn from_addr(uri: &str) -> Result<Arc<dyn BlobService>, crate::Error> {
|
||||
|
@ -54,6 +57,12 @@ pub async fn from_addr(uri: &str) -> Result<Arc<dyn BlobService>, crate::Error>
|
|||
// Constructing the channel is handled by tvix_castore::channel::from_url.
|
||||
let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?);
|
||||
Arc::new(GRPCBlobService::from_client(client))
|
||||
} else if url.scheme() == "simplefs" {
|
||||
if url.path().is_empty() {
|
||||
return Err(Error::StorageError("Invalid filesystem path".to_string()));
|
||||
}
|
||||
|
||||
Arc::new(SimpleFilesystemBlobService::new(url.path().into()).await?)
|
||||
} else {
|
||||
Err(crate::Error::StorageError(format!(
|
||||
"unknown scheme: {}",
|
||||
|
|
|
@ -7,6 +7,7 @@ mod from_addr;
|
|||
mod grpc;
|
||||
mod memory;
|
||||
mod naive_seeker;
|
||||
mod simplefs;
|
||||
mod sled;
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -15,6 +16,7 @@ mod tests;
|
|||
pub use self::from_addr::from_addr;
|
||||
pub use self::grpc::GRPCBlobService;
|
||||
pub use self::memory::MemoryBlobService;
|
||||
pub use self::simplefs::SimpleFilesystemBlobService;
|
||||
pub use self::sled::SledBlobService;
|
||||
|
||||
/// The base trait all BlobService services need to implement.
|
||||
|
@ -51,3 +53,4 @@ pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin
|
|||
|
||||
/// A [`io::Cursor<Vec<u8>>`] can be used as a BlobReader.
|
||||
impl BlobReader for io::Cursor<Vec<u8>> {}
|
||||
/// A [`tokio::fs::File`] can be used as a BlobReader.
impl BlobReader for tokio::fs::File {}
|
||||
|
|
195
tvix/castore/src/blobservice/simplefs.rs
Normal file
195
tvix/castore/src/blobservice/simplefs.rs
Normal file
|
@ -0,0 +1,195 @@
|
|||
use std::{
|
||||
io,
|
||||
path::{Path, PathBuf},
|
||||
pin::pin,
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
use bytes::Buf;
|
||||
use data_encoding::HEXLOWER;
|
||||
use pin_project_lite::pin_project;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tonic::async_trait;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::B3Digest;
|
||||
|
||||
use super::{BlobReader, BlobService, BlobWriter};
|
||||
|
||||
/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant
|
||||
/// filesystem.
|
||||
///
|
||||
/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All
|
||||
/// blobs received are staged in that `tmp` directory, then they are moved **atomically** into
|
||||
/// `blobs/<first two digest bytes, hex>/<full digest, hex>` in a sharding style, e.g. a digest
/// whose lowercase hex form is `abcdef…` ends up at `blobs/abcd/abcdef…`.
|
||||
///
|
||||
/// **Disclaimer** : This very simple implementation is subject to change and does not give any
|
||||
/// final guarantees on the on-disk format.
|
||||
#[derive(Clone)]
|
||||
pub struct SimpleFilesystemBlobService {
|
||||
/// Root directory of the store; blobs live below `blobs/` and uploads are staged below
/// `tmp/` inside of it.
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl SimpleFilesystemBlobService {
|
||||
pub async fn new(path: PathBuf) -> std::io::Result<Self> {
|
||||
tokio::fs::create_dir_all(&path).await?;
|
||||
tokio::fs::create_dir_all(path.join("tmp")).await?;
|
||||
tokio::fs::create_dir_all(path.join("blobs")).await?;
|
||||
|
||||
Ok(Self { path })
|
||||
}
|
||||
}
|
||||
|
||||
fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf {
|
||||
let prefix = HEXLOWER.encode(&digest.as_slice()[..2]);
|
||||
let pathname = HEXLOWER.encode(digest.as_slice());
|
||||
|
||||
root.join("blobs").join(prefix).join(pathname)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BlobService for SimpleFilesystemBlobService {
|
||||
#[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
|
||||
async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
|
||||
Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, err, fields(blob.digest=%digest))]
|
||||
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
|
||||
let dst_path = derive_path(&self.path, digest);
|
||||
let reader = match tokio::fs::File::open(dst_path).await {
|
||||
Ok(file) => {
|
||||
let reader: Box<dyn BlobReader> = Box::new(file);
|
||||
Ok(Some(reader))
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
|
||||
Ok(reader?)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn open_write(&self) -> Box<dyn BlobWriter> {
|
||||
let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await {
|
||||
Ok(file) => Ok(file),
|
||||
Err(e) => match e {
|
||||
async_tempfile::Error::Io(io_error) => Err(io_error),
|
||||
async_tempfile::Error::InvalidFile => Err(std::io::Error::new(
|
||||
std::io::ErrorKind::NotFound,
|
||||
"invalid or missing file specified",
|
||||
)),
|
||||
async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new(
|
||||
std::io::ErrorKind::NotFound,
|
||||
"invalid or missing directory specified",
|
||||
)),
|
||||
},
|
||||
};
|
||||
|
||||
Box::new(SimpleFilesystemBlobWriter {
|
||||
root: self.path.clone(),
|
||||
file,
|
||||
digester: blake3::Hasher::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Writer handed out by `SimpleFilesystemBlobService::open_write`.
pin_project! {
|
||||
struct SimpleFilesystemBlobWriter {
|
||||
// Root directory of the store; `close` passes it to `derive_path` to compute the final
// location of the blob.
root: PathBuf,
|
||||
// The temporary file receiving the data, or the error that occurred while creating it in
// `open_write`.
file: std::io::Result<async_tempfile::TempFile>,
|
||||
// BLAKE3 hasher updated from `poll_write` and finalized in `close`.
digester: blake3::Hasher
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter {
|
||||
fn poll_write(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> std::task::Poll<Result<usize, std::io::Error>> {
|
||||
if let Err(e) = self.file.as_mut() {
|
||||
return Poll::Ready(Err(std::mem::replace(
|
||||
e,
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::NotConnected,
|
||||
"this file is already closed",
|
||||
),
|
||||
)));
|
||||
}
|
||||
|
||||
let writer = self.file.as_mut().unwrap();
|
||||
match pin!(writer).poll_write(cx, buf) {
|
||||
Poll::Ready(Ok(n)) => {
|
||||
let this = self.project();
|
||||
this.digester.update(buf.take(n).into_inner());
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||
if let Err(e) = self.file.as_mut() {
|
||||
return Poll::Ready(Err(std::mem::replace(
|
||||
e,
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::NotConnected,
|
||||
"this file is already closed",
|
||||
),
|
||||
)));
|
||||
}
|
||||
|
||||
let writer = self.file.as_mut().unwrap();
|
||||
pin!(writer).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||
if let Err(e) = self.file.as_mut() {
|
||||
return Poll::Ready(Err(std::mem::replace(
|
||||
e,
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::NotConnected,
|
||||
"this file is already closed",
|
||||
),
|
||||
)));
|
||||
}
|
||||
|
||||
let writer = self.file.as_mut().unwrap();
|
||||
pin!(writer).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BlobWriter for SimpleFilesystemBlobWriter {
|
||||
async fn close(&mut self) -> io::Result<B3Digest> {
|
||||
if let Err(e) = self.file.as_mut() {
|
||||
return Err(std::mem::replace(
|
||||
e,
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::NotConnected,
|
||||
"this file is already closed",
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let writer = self.file.as_mut().unwrap();
|
||||
writer.sync_all().await?;
|
||||
writer.flush().await?;
|
||||
|
||||
let digest: B3Digest = self.digester.finalize().as_bytes().into();
|
||||
let dst_path = derive_path(&self.root, &digest);
|
||||
tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?;
|
||||
tokio::fs::rename(writer.file_path(), dst_path).await?;
|
||||
|
||||
Ok(digest)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue