feat(tvix/store): concurrently upload small blobs during nar ingestion
Currently all blobs are uploaded serially when ingesting NARs. If a NAR contains many small blobs, ingestion can become slow when there is a lot of round-trip latency to the blob service. This change makes the NAR ingester use the ConcurrentBlobUploader, which buffers small blobs in memory so they can be uploaded to the blob service concurrently, without blocking further deserialization.

Change-Id: I093a73770232df12d9a11e5d901b99c08505c3cb
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11694
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
parent b0aaff25fa
commit e7be342256

1 changed file with 22 additions and 12 deletions
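Before the diff, a minimal sketch of the buffering idea, assuming a tokio runtime: small blobs are read fully into memory and uploaded on spawned background tasks while the caller keeps parsing, and a final join() awaits all in-flight uploads. The type name ConcurrentUploader, the 1 MiB threshold, and upload_blob() are illustrative stand-ins, not the ConcurrentBlobUploader API from tvix-castore.

use std::io::Cursor;

use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::task::JoinSet;

/// Blobs up to this size are buffered in memory and uploaded in the background
/// (illustrative threshold, not the value used by tvix).
const MAX_BUFFER_SIZE: u64 = 1024 * 1024;

/// Hypothetical stand-in for one round-trip to a remote blob service.
async fn upload_blob(data: Vec<u8>) -> std::io::Result<u64> {
    // A real implementation would stream `data` to the blob service and
    // return its digest; here we only report the size.
    Ok(data.len() as u64)
}

struct ConcurrentUploader {
    uploads: JoinSet<std::io::Result<u64>>,
}

impl ConcurrentUploader {
    fn new() -> Self {
        Self {
            uploads: JoinSet::new(),
        }
    }

    /// Small blobs are read fully into memory and uploaded on a spawned task,
    /// so the caller can continue deserializing the NAR without waiting for
    /// the round-trip. Large blobs would be streamed inline instead (omitted).
    async fn upload<R: AsyncRead + Unpin>(
        &mut self,
        size: u64,
        reader: &mut R,
    ) -> std::io::Result<()> {
        if size <= MAX_BUFFER_SIZE {
            let mut buf = Vec::with_capacity(size as usize);
            reader.read_to_end(&mut buf).await?;
            self.uploads.spawn(upload_blob(buf));
        }
        Ok(())
    }

    /// Wait for all in-flight uploads to finish before ingestion completes,
    /// surfacing the first error encountered.
    async fn join(mut self) -> std::io::Result<()> {
        while let Some(res) = self.uploads.join_next().await {
            res.expect("upload task panicked")?;
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut uploader = ConcurrentUploader::new();
    for i in 0u8..4 {
        let data = vec![i; 16];
        let mut reader = Cursor::new(data);
        uploader.upload(16, &mut reader).await?;
    }
    uploader.join().await
}

The diff below wires the real uploader's upload() and join() calls into the NAR producer.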
@@ -3,7 +3,10 @@ use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
-    import::{ingest_entries, IngestionEntry, IngestionError},
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
     proto::{node::Node, NamedNode},
     PathBuf,
 };
@@ -18,7 +21,7 @@ pub async fn ingest_nar<R, BS, DS>(
 ) -> Result<Node, IngestionError<Error>>
 where
     R: AsyncBufRead + Unpin + Send,
-    BS: BlobService + Clone,
+    BS: BlobService + Clone + 'static,
     DS: DirectoryService,
 {
     // open the NAR for reading.
@@ -29,14 +32,22 @@ where
     let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
 
     let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+
         let res = produce_nar_inner(
-            blob_service,
+            &mut blob_uploader,
             root_node,
             "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
             tx.clone(),
         )
         .await;
 
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
+
         tx.send(res)
             .await
             .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
@@ -54,13 +65,13 @@ where
 }
 
 async fn produce_nar_inner<BS>(
-    blob_service: BS,
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
     node: nar_reader::Node<'_, '_>,
     path: PathBuf,
     tx: mpsc::Sender<Result<IngestionEntry, Error>>,
 ) -> Result<IngestionEntry, Error>
 where
-    BS: BlobService + Clone,
+    BS: BlobService + Clone + 'static,
 {
     Ok(match node {
         nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
@@ -68,12 +79,8 @@ where
             executable,
             mut reader,
         } => {
-            let (digest, size) = {
-                let mut blob_writer = blob_service.open_write().await;
-                let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?;
-
-                (blob_writer.close().await?, size)
-            };
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
 
             IngestionEntry::Regular {
                 path,
@@ -91,7 +98,7 @@ where
                     .expect("Tvix bug: failed to join name");
 
                 let entry = Box::pin(produce_nar_inner(
-                    blob_service.clone(),
+                    blob_uploader,
                     entry.node,
                     path,
                     tx.clone(),
@@ -112,6 +119,9 @@ where
 pub enum Error {
     #[error(transparent)]
     IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
 }
 
 #[cfg(test)]