refactor(tvix/glue): drop ingest_entries_sync

Make this function async, and do the block_on on the (single) callsite.

Change-Id: Ib8b0b54ab5370fe02ef95f38a45d8866868a9d60
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11285
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-03-28 22:05:46 +01:00 committed by flokli
parent bd32024047
commit 156a5a0fb6
2 changed files with 20 additions and 18 deletions

View file

@ -102,12 +102,15 @@ async fn filtered_ingest(
pin_mut!(entries_stream); pin_mut!(entries_stream);
state state.tokio_handle.block_on(async {
.ingest_entries_sync(entries_stream) state
.map_err(|err| ErrorKind::IO { .ingest_entries(entries_stream)
path: Some(path.to_path_buf()), .await
error: err.into(), .map_err(|err| ErrorKind::IO {
}) path: Some(path.to_path_buf()),
error: err.into(),
})
})
} }
#[builtins(state = "Rc<TvixStoreIO>")] #[builtins(state = "Rc<TvixStoreIO>")]

View file

@ -273,21 +273,20 @@ impl TvixStoreIO {
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e)) .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
} }
/// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`] /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
/// with a [`tokio::runtime::Handle::block_on`] call for synchronicity. /// passing the blob_service and directory_service that's used.
pub(crate) fn ingest_entries_sync<S>(&self, entries_stream: S) -> io::Result<Node> /// The error is mapped to std::io::Error for simplicity.
pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
where where
S: Stream<Item = DirEntry> + Unpin, S: Stream<Item = DirEntry> + Unpin,
{ {
self.tokio_handle.block_on(async move { tvix_castore::import::ingest_entries(
tvix_castore::import::ingest_entries( &self.blob_service,
&self.blob_service, &self.directory_service,
&self.directory_service, entries_stream,
entries_stream, )
) .await
.await .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
.map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
})
} }
pub(crate) async fn node_to_path_info( pub(crate) async fn node_to_path_info(