feat(tvix/store): use rayon to upload chunks concurrently
Look at the data that's written to us, and upload all complete chunks in parallel using rayon; only the trailing rest, which may not end on a real chunk boundary yet, is kept back in the buffer. This required moving `upload_chunk` out of the struct and having it accept the ChunkService to upload to (which it previously got from `self.chunk_service`). This doesn't speed things up much for now, because processing is still mostly sequential. Change-Id: Id785b5705c3392214d2da1a5b6a182bcf5048c8d Reviewed-on: https://cl.tvl.fyi/c/depot/+/8195 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
parent
2ef60282b6
commit
510927e43a
5 changed files with 66 additions and 39 deletions
1
tvix/Cargo.lock
generated
1
tvix/Cargo.lock
generated
|
@ -2796,6 +2796,7 @@ dependencies = [
|
||||||
"nix-compat",
|
"nix-compat",
|
||||||
"prost",
|
"prost",
|
||||||
"prost-build",
|
"prost-build",
|
||||||
|
"rayon",
|
||||||
"sha2 0.10.6",
|
"sha2 0.10.6",
|
||||||
"sled",
|
"sled",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
|
|
|
@ -8330,6 +8330,10 @@ rec {
|
||||||
name = "prost";
|
name = "prost";
|
||||||
packageId = "prost";
|
packageId = "prost";
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
name = "rayon";
|
||||||
|
packageId = "rayon";
|
||||||
|
}
|
||||||
{
|
{
|
||||||
name = "sha2";
|
name = "sha2";
|
||||||
packageId = "sha2 0.10.6";
|
packageId = "sha2 0.10.6";
|
||||||
|
|
|
@ -14,10 +14,11 @@ count-write = "0.1.0"
|
||||||
data-encoding = "2.3.3"
|
data-encoding = "2.3.3"
|
||||||
fastcdc = "3.0.0"
|
fastcdc = "3.0.0"
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
|
nix-compat = { path = "../nix-compat" }
|
||||||
prost = "0.11.2"
|
prost = "0.11.2"
|
||||||
|
rayon = "1.6.1"
|
||||||
sha2 = "0.10.6"
|
sha2 = "0.10.6"
|
||||||
sled = { version = "0.34.7", features = ["compression"] }
|
sled = { version = "0.34.7", features = ["compression"] }
|
||||||
nix-compat = { path = "../nix-compat" }
|
|
||||||
thiserror = "1.0.38"
|
thiserror = "1.0.38"
|
||||||
tokio-stream = "0.1.11"
|
tokio-stream = "0.1.11"
|
||||||
tokio = { version = "1.23.0", features = ["rt-multi-thread"] }
|
tokio = { version = "1.23.0", features = ["rt-multi-thread"] }
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use crate::chunkservice::ChunkService;
|
use crate::chunkservice::ChunkService;
|
||||||
use crate::{proto, Error};
|
use crate::{proto, Error};
|
||||||
|
use rayon::prelude::*;
|
||||||
use tracing::{debug, instrument};
|
use tracing::{debug, instrument};
|
||||||
|
|
||||||
pub struct BlobWriter<'a, CS: ChunkService> {
|
pub struct BlobWriter<'a, CS: ChunkService> {
|
||||||
|
@ -13,6 +14,31 @@ pub struct BlobWriter<'a, CS: ChunkService> {
|
||||||
buf: Vec<u8>,
|
buf: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Upload a single chunk to the given chunk service and return its digest
// (or an error) when done.
//
// The digest is the BLAKE3 hash of `chunk_data`, computed locally before the
// upload. Errors from `chunk_service.has` and `chunk_service.put` are
// propagated to the caller via `?`.
//
// Panics (via `assert_eq!`) if the digest returned by `put` differs from the
// locally computed one.
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
fn upload_chunk<CS: ChunkService>(
|
||||||
|
chunk_service: &CS,
|
||||||
|
chunk_data: Vec<u8>,
|
||||||
|
) -> Result<Vec<u8>, Error> {
|
||||||
|
let mut hasher = blake3::Hasher::new();
|
||||||
|
// Inputs of at least 128 KiB are hashed with `update_rayon`, smaller ones
// with the plain `update`.
// TODO: benchmark this number and factor it out
|
||||||
|
if chunk_data.len() >= 128 * 1024 {
|
||||||
|
hasher.update_rayon(&chunk_data);
|
||||||
|
} else {
|
||||||
|
hasher.update(&chunk_data);
|
||||||
|
}
|
||||||
|
let digest = hasher.finalize();
|
||||||
|
|
||||||
|
// NOTE(review): despite the log message, nothing is skipped here. Control
// falls through and `put` below is still called for chunks the service
// reports as present. An early return of the digest is probably what was
// intended; confirm before changing.
if chunk_service.has(digest.as_bytes())? {
|
||||||
|
debug!("already has chunk, skipping");
|
||||||
|
}
|
||||||
|
let digest_resp = chunk_service.put(chunk_data)?;
|
||||||
|
|
||||||
|
// `put` returns a digest; it is expected to equal the one computed above.
assert_eq!(digest_resp, digest.as_bytes());
|
||||||
|
|
||||||
|
Ok(digest.as_bytes().to_vec())
|
||||||
|
}
|
||||||
|
|
||||||
impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
|
impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
|
||||||
pub fn new(chunk_service: &'a CS) -> Self {
|
pub fn new(chunk_service: &'a CS) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -35,7 +61,7 @@ impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
|
||||||
// Also upload the last chunk (what's left in `self.buf`) to the chunk
|
// Also upload the last chunk (what's left in `self.buf`) to the chunk
|
||||||
// service and record it in BlobMeta.
|
// service and record it in BlobMeta.
|
||||||
let buf_len = self.buf.len() as u32;
|
let buf_len = self.buf.len() as u32;
|
||||||
let chunk_digest = self.upload_chunk(self.buf.clone())?;
|
let chunk_digest = upload_chunk(self.chunk_service, self.buf.clone())?;
|
||||||
|
|
||||||
self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
||||||
digest: chunk_digest,
|
digest: chunk_digest,
|
||||||
|
@ -49,32 +75,11 @@ impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
|
||||||
self.blob_meta.clone(),
|
self.blob_meta.clone(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// upload a chunk to the chunk service, and return its digest (or an error) when done.
|
|
||||||
#[instrument(skip(self, chunk_data))]
|
|
||||||
fn upload_chunk(&mut self, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
|
|
||||||
let mut hasher = blake3::Hasher::new();
|
|
||||||
if chunk_data.len() >= 128 * 1024 {
|
|
||||||
hasher.update_rayon(&chunk_data);
|
|
||||||
} else {
|
|
||||||
hasher.update(&chunk_data);
|
|
||||||
}
|
|
||||||
let digest = hasher.finalize();
|
|
||||||
|
|
||||||
if self.chunk_service.has(digest.as_bytes())? {
|
|
||||||
debug!("already has chunk, skipping");
|
|
||||||
}
|
|
||||||
let digest_resp = self.chunk_service.put(chunk_data)?;
|
|
||||||
|
|
||||||
assert_eq!(digest_resp, digest.as_bytes());
|
|
||||||
|
|
||||||
Ok(digest.as_bytes().to_vec())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This chunks up all data written using fastcdc, uploads all chunks to the
|
/// This chunks up all data written using fastcdc, uploads all chunks to the
|
||||||
/// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
|
/// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
|
||||||
impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
|
impl<CS: ChunkService + std::marker::Sync> std::io::Write for BlobWriter<'_, CS> {
|
||||||
fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
|
fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
|
||||||
// calculate input_buf.len(), we need to return that later.
|
// calculate input_buf.len(), we need to return that later.
|
||||||
let input_buf_len = input_buf.len();
|
let input_buf_len = input_buf.len();
|
||||||
|
@ -107,6 +112,9 @@ impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
|
||||||
chunker_max_size,
|
chunker_max_size,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// assemble a list of byte slices to be uploaded
|
||||||
|
let mut chunk_slices: Vec<&[u8]> = Vec::new();
|
||||||
|
|
||||||
// ask the chunker for cutting points in the buffer.
|
// ask the chunker for cutting points in the buffer.
|
||||||
let mut start_pos = 0_usize;
|
let mut start_pos = 0_usize;
|
||||||
let rest = loop {
|
let rest = loop {
|
||||||
|
@ -114,31 +122,44 @@ impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
|
||||||
let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
|
let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
|
||||||
|
|
||||||
// whenever the last cut point is pointing to the end of the buffer,
|
// whenever the last cut point is pointing to the end of the buffer,
|
||||||
// keep that chunk left in there.
|
// return that from the loop.
|
||||||
// We don't know if the chunker decided to cut here simply because it was
|
// We don't know if the chunker decided to cut here simply because it was
|
||||||
// at the end of the buffer, or if it would also cut if there
|
// at the end of the buffer, or if it would also cut if there
|
||||||
// were more data.
|
// were more data.
|
||||||
//
|
//
|
||||||
// Split off all previous chunks and keep this chunk data in the buffer.
|
// Split off all previous chunks and keep this chunk data in the buffer.
|
||||||
if end_pos == buf.len() {
|
if end_pos == buf.len() {
|
||||||
break buf[start_pos..].to_vec();
|
break &buf[start_pos..];
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upload that chunk to the chunk service and record it in BlobMeta.
|
// if it's an intermediate chunk, add it to chunk_slices.
|
||||||
// TODO: make upload_chunk async and upload concurrently?
|
// We'll later upload all of them in batch.
|
||||||
let chunk_data = &buf[start_pos..end_pos];
|
chunk_slices.push(&buf[start_pos..end_pos]);
|
||||||
let chunk_digest = self.upload_chunk(chunk_data.to_vec())?;
|
|
||||||
|
|
||||||
self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
|
// advance start_pos over the processed chunk.
|
||||||
digest: chunk_digest,
|
|
||||||
size: chunk_data.len() as u32,
|
|
||||||
});
|
|
||||||
|
|
||||||
// move start_pos over the processed chunk.
|
|
||||||
start_pos = end_pos;
|
start_pos = end_pos;
|
||||||
};
|
};
|
||||||
|
|
||||||
self.buf = rest;
|
// Upload all chunks to the chunk service and map them to a ChunkMeta
|
||||||
|
let blob_meta_chunks: Vec<Result<proto::blob_meta::ChunkMeta, Error>> = chunk_slices
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|chunk_slice| {
|
||||||
|
let chunk_service = self.chunk_service.clone();
|
||||||
|
let chunk_digest = upload_chunk(chunk_service.clone(), chunk_slice.to_vec())?;
|
||||||
|
|
||||||
|
Ok(proto::blob_meta::ChunkMeta {
|
||||||
|
digest: chunk_digest,
|
||||||
|
size: chunk_slice.len() as u32,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
self.blob_meta.chunks = blob_meta_chunks
|
||||||
|
.into_iter()
|
||||||
|
.collect::<Result<Vec<proto::blob_meta::ChunkMeta>, Error>>()?;
|
||||||
|
|
||||||
|
// update buf to point to the rest we didn't upload.
|
||||||
|
self.buf = rest.to_vec();
|
||||||
|
|
||||||
Ok(input_buf_len)
|
Ok(input_buf_len)
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ impl From<super::Error> for Error {
|
||||||
//
|
//
|
||||||
// It assumes the caller adds returned nodes to the directories it assembles.
|
// It assumes the caller adds returned nodes to the directories it assembles.
|
||||||
#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
|
#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
|
||||||
fn process_entry<BS: BlobService, CS: ChunkService, DS: DirectoryService>(
|
fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: DirectoryService>(
|
||||||
blob_service: &mut BS,
|
blob_service: &mut BS,
|
||||||
chunk_service: &mut CS,
|
chunk_service: &mut CS,
|
||||||
directory_service: &mut DS,
|
directory_service: &mut DS,
|
||||||
|
@ -167,7 +167,7 @@ fn process_entry<BS: BlobService, CS: ChunkService, DS: DirectoryService>(
|
||||||
#[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))]
|
#[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))]
|
||||||
pub fn import_path<
|
pub fn import_path<
|
||||||
BS: BlobService,
|
BS: BlobService,
|
||||||
CS: ChunkService,
|
CS: ChunkService + std::marker::Sync,
|
||||||
DS: DirectoryService,
|
DS: DirectoryService,
|
||||||
P: AsRef<Path> + Debug,
|
P: AsRef<Path> + Debug,
|
||||||
>(
|
>(
|
||||||
|
|
Loading…
Reference in a new issue