refactor(tvix/store/nar/import): be a bit more generic

Change-Id: If9a536949f36f428abea1a893f937fe7063e2f41
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10517
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>

commit 09a92b78d2 (parent 1b62f82b10)
1 changed file with 40 additions and 29 deletions
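
To see why the looser signature below still accepts what callers were already passing, here is a minimal bound-check sketch. It is illustrative only: the check_bs/check_ds helpers are made up for this example, and the module paths for BlobService and DirectoryService are assumptions about tvix_castore's layout. The point is that Arc<dyn T> dereferences to dyn T and is Clone, Send and 'static, while the weaker DS bound is even satisfied by a plain reference.

use std::{ops::Deref, sync::Arc};

use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;

// Hypothetical helpers restating the bounds from the new read_nar signature.
fn check_bs<BS>(_: &BS)
where
    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
{
}

fn check_ds<DS>(_: &DS)
where
    DS: Deref<Target = dyn DirectoryService>,
{
}

fn demo(
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
    borrowed_directory_service: &dyn DirectoryService,
) {
    // Arc<dyn BlobService> derefs to dyn BlobService and is Clone + Send + 'static,
    // so callers already holding Arcs keep passing them unchanged.
    check_bs(&blob_service);
    check_ds(&directory_service);

    // The weaker DS bound (no Clone/Send/'static) is also met by a plain
    // reference, so a caller may simply lend its directory service.
    check_ds(&borrowed_directory_service);
}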

@@ -1,6 +1,6 @@
 use std::{
     io::{self, BufRead},
-    sync::Arc,
+    ops::Deref,
 };
 
 use bytes::Bytes;
@@ -21,11 +21,16 @@ use tvix_castore::{
 /// This function is not async (because the NAR reader is not)
 /// and calls [tokio::task::block_in_place] when interacting with backing
 /// services, so make sure to only call this with spawn_blocking.
-pub fn read_nar<R: BufRead + Send>(
+pub fn read_nar<R, BS, DS>(
     r: &mut R,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> io::Result<castorepb::node::Node> {
+    blob_service: BS,
+    directory_service: DS,
+) -> io::Result<castorepb::node::Node>
+where
+    R: BufRead + Send,
+    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+    DS: Deref<Target = dyn DirectoryService>,
+{
     let handle = tokio::runtime::Handle::current();
 
     let directory_putter = directory_service.put_multiple_start();
@@ -73,13 +78,16 @@ pub fn read_nar<R: BufRead + Send>(
 ///
 /// [DirectoryPutter] is passed around, so a single instance of it can be used,
 /// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_node(
+fn process_node<BS>(
     handle: tokio::runtime::Handle,
     name: bytes::Bytes,
     node: nar::reader::Node,
-    blob_service: Arc<dyn BlobService>,
+    blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> {
+) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)>
+where
+    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+{
     Ok(match node {
         nar::reader::Node::Symlink { target } => (
             castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -99,13 +107,8 @@ fn process_node(
             directory_putter,
         ),
         nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter) = process_dir_reader(
-                handle,
-                name,
-                dir_reader,
-                blob_service.clone(),
-                directory_putter,
-            )?;
+            let (directory_node, directory_putter) =
+                process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
 
             (
                 castorepb::node::Node::Directory(directory_node),
@@ -117,21 +120,22 @@ fn process_node(
 
 /// Given a name and [nar::reader::FileReader], this ingests the file into the
 /// passed [BlobService] and returns a [castorepb::FileNode].
-fn process_file_reader(
+fn process_file_reader<BS>(
     handle: tokio::runtime::Handle,
     name: Bytes,
     mut file_reader: nar::reader::FileReader,
     executable: bool,
-    blob_service: Arc<dyn BlobService>,
-) -> io::Result<castorepb::FileNode> {
+    blob_service: BS,
+) -> io::Result<castorepb::FileNode>
+where
+    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+{
     // store the length. If we read any other length, reading will fail.
     let expected_len = file_reader.len();
 
     // prepare writing a new blob.
-    let blob_writer = handle.block_on(handle.spawn({
-        let blob_service = blob_service.clone();
-        async move { blob_service.open_write().await }
-    }))?;
+    let blob_writer =
+        handle.block_on(handle.spawn(async move { blob_service.open_write().await }))?;
 
     // write the blob.
     let mut blob_writer = {
@@ -160,13 +164,16 @@ fn process_file_reader(
 ///
 /// [DirectoryPutter] is passed around, so a single instance of it can be used,
 /// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_dir_reader(
+fn process_dir_reader<BS>(
     handle: tokio::runtime::Handle,
     name: Bytes,
     mut dir_reader: nar::reader::DirReader,
-    blob_service: Arc<dyn BlobService>,
+    blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> {
+) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)>
+where
+    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+{
     let mut directory = castorepb::Directory::default();
 
     let mut directory_putter = directory_putter;
@@ -228,14 +235,17 @@ mod test {
 
     #[tokio::test]
     async fn single_symlink() {
+        let blob_service = gen_blob_service();
+        let directory_service = gen_directory_service();
+
         let handle = tokio::runtime::Handle::current();
 
         let root_node = handle
             .spawn_blocking(|| {
                 read_nar(
                     &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
-                    gen_blob_service(),
-                    gen_directory_service(),
+                    blob_service,
+                    directory_service,
                 )
             })
             .await
@@ -254,17 +264,18 @@ mod test {
     #[tokio::test]
     async fn single_file() {
         let blob_service = gen_blob_service();
+        let directory_service = gen_directory_service();
 
         let handle = tokio::runtime::Handle::current();
 
         let root_node = handle
             .spawn_blocking({
                 let blob_service = blob_service.clone();
-                || {
+                move || {
                     read_nar(
                         &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
                         blob_service,
-                        gen_directory_service(),
+                        directory_service,
                     )
                 }
             })
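
For a rough end-to-end picture of a call site against the new signature, a hedged sketch follows. The in-memory service types, their Default constructors and the assumption that read_nar is in scope are guesses about the surrounding crates rather than code from this change, and the NAR bytes are a placeholder. As the doc comment above says, read_nar stays synchronous and uses block_in_place, so it is driven through spawn_blocking.

use std::io::Cursor;
use std::sync::Arc;

use tvix_castore::blobservice::{BlobService, MemoryBlobService};
use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryService};

// read_nar is the function refactored above; it is assumed to be in scope
// (it lives in the nar import module this diff touches).
async fn ingest_nar(nar_bytes: Vec<u8>) -> std::io::Result<()> {
    // Placeholder in-memory services; any BS/DS satisfying the bounds works.
    let blob_service: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
    let directory_service: Arc<dyn DirectoryService> = Arc::new(MemoryDirectoryService::default());

    // read_nar stays synchronous and calls block_in_place internally, so it
    // runs on the blocking pool, with the services moved in by value.
    let root_node = tokio::runtime::Handle::current()
        .spawn_blocking(move || {
            read_nar(&mut Cursor::new(nar_bytes), blob_service, directory_service)
        })
        .await
        .expect("spawn_blocking task panicked")?;

    println!("imported NAR root node: {:?}", root_node);
    Ok(())
}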