refactor(tvix/store/nar/import): use AsRef

We need to be a bit careful and pass the BlobService around (similar to
how we already do with the directory_putter), but that allows getting
rid of a bunch of annoying trait bounds.

We also stop spawning additional tasks where we can just use block_on.

Change-Id: If36de0ee947d2c779d20a384308241d2262d4764
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10580
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2024-01-09 11:25:12 +02:00 committed by clbot
parent 0009383c07
commit 99f675ecef

View file

@ -1,10 +1,6 @@
use std::{
io::{self, BufRead},
ops::Deref,
};
use bytes::Bytes; use bytes::Bytes;
use nix_compat::nar; use nix_compat::nar;
use std::io::{self, BufRead};
use tokio_util::io::SyncIoBridge; use tokio_util::io::SyncIoBridge;
use tracing::warn; use tracing::warn;
use tvix_castore::{ use tvix_castore::{
@ -28,19 +24,19 @@ pub fn read_nar<R, BS, DS>(
) -> io::Result<castorepb::node::Node> ) -> io::Result<castorepb::node::Node>
where where
R: BufRead + Send, R: BufRead + Send,
BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, BS: AsRef<dyn BlobService>,
DS: Deref<Target = dyn DirectoryService>, DS: AsRef<dyn DirectoryService>,
{ {
let handle = tokio::runtime::Handle::current(); let handle = tokio::runtime::Handle::current();
let directory_putter = directory_service.put_multiple_start(); let directory_putter = directory_service.as_ref().put_multiple_start();
let node = nix_compat::nar::reader::open(r)?; let node = nix_compat::nar::reader::open(r)?;
let (root_node, mut directory_putter) = process_node( let (root_node, mut directory_putter, _) = process_node(
handle.clone(), handle.clone(),
"".into(), // this is the root node, it has an empty name "".into(), // this is the root node, it has an empty name
node, node,
blob_service, &blob_service,
directory_putter, directory_putter,
)?; )?;
@ -84,9 +80,9 @@ fn process_node<BS>(
node: nar::reader::Node, node: nar::reader::Node,
blob_service: BS, blob_service: BS,
directory_putter: Box<dyn DirectoryPutter>, directory_putter: Box<dyn DirectoryPutter>,
) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> ) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)>
where where
BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, BS: AsRef<dyn BlobService>,
{ {
Ok(match node { Ok(match node {
nar::reader::Node::Symlink { target } => ( nar::reader::Node::Symlink { target } => (
@ -95,6 +91,7 @@ where
target: target.into(), target: target.into(),
}), }),
directory_putter, directory_putter,
blob_service,
), ),
nar::reader::Node::File { executable, reader } => ( nar::reader::Node::File { executable, reader } => (
castorepb::node::Node::File(process_file_reader( castorepb::node::Node::File(process_file_reader(
@ -102,17 +99,19 @@ where
name, name,
reader, reader,
executable, executable,
blob_service, &blob_service,
)?), )?),
directory_putter, directory_putter,
blob_service,
), ),
nar::reader::Node::Directory(dir_reader) => { nar::reader::Node::Directory(dir_reader) => {
let (directory_node, directory_putter) = let (directory_node, directory_putter, blob_service_back) =
process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
( (
castorepb::node::Node::Directory(directory_node), castorepb::node::Node::Directory(directory_node),
directory_putter, directory_putter,
blob_service_back,
) )
} }
}) })
@ -128,14 +127,13 @@ fn process_file_reader<BS>(
blob_service: BS, blob_service: BS,
) -> io::Result<castorepb::FileNode> ) -> io::Result<castorepb::FileNode>
where where
BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, BS: AsRef<dyn BlobService>,
{ {
// store the length. If we read any other length, reading will fail. // store the length. If we read any other length, reading will fail.
let expected_len = file_reader.len(); let expected_len = file_reader.len();
// prepare writing a new blob. // prepare writing a new blob.
let blob_writer = let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await });
handle.block_on(handle.spawn(async move { blob_service.open_write().await }))?;
// write the blob. // write the blob.
let mut blob_writer = { let mut blob_writer = {
@ -149,7 +147,7 @@ where
}; };
// close the blob_writer, retrieve the digest. // close the blob_writer, retrieve the digest.
let blob_digest = handle.block_on(handle.spawn(async move { blob_writer.close().await }))??; let blob_digest = handle.block_on(async { blob_writer.close().await })?;
Ok(castorepb::FileNode { Ok(castorepb::FileNode {
name, name,
@ -170,22 +168,24 @@ fn process_dir_reader<BS>(
mut dir_reader: nar::reader::DirReader, mut dir_reader: nar::reader::DirReader,
blob_service: BS, blob_service: BS,
directory_putter: Box<dyn DirectoryPutter>, directory_putter: Box<dyn DirectoryPutter>,
) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> ) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)>
where where
BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, BS: AsRef<dyn BlobService>,
{ {
let mut directory = castorepb::Directory::default(); let mut directory = castorepb::Directory::default();
let mut directory_putter = directory_putter; let mut directory_putter = directory_putter;
let mut blob_service = blob_service;
while let Some(entry) = dir_reader.next()? { while let Some(entry) = dir_reader.next()? {
let (node, directory_putter_back) = process_node( let (node, directory_putter_back, blob_service_back) = process_node(
handle.clone(), handle.clone(),
entry.name.into(), entry.name.into(),
entry.node, entry.node,
blob_service.clone(), blob_service,
directory_putter, directory_putter,
)?; )?;
blob_service = blob_service_back;
directory_putter = directory_putter_back; directory_putter = directory_putter_back;
match node { match node {
@ -213,6 +213,7 @@ where
size: directory_size, size: directory_size,
}, },
directory_putter, directory_putter,
blob_service,
)) ))
} }
@ -238,8 +239,8 @@ mod test {
#[tokio::test] #[tokio::test]
async fn single_symlink() { async fn single_symlink() {
let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); let blob_service = gen_blob_service();
let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into(); let directory_service = gen_directory_service();
let handle = tokio::runtime::Handle::current(); let handle = tokio::runtime::Handle::current();
@ -267,7 +268,7 @@ mod test {
#[tokio::test] #[tokio::test]
async fn single_file() { async fn single_file() {
let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into(); let directory_service = gen_directory_service();
let handle = tokio::runtime::Handle::current(); let handle = tokio::runtime::Handle::current();