refactor(tvix/glue): simplify TvixStoreIO further
We don't need to spawn in all these places, we can just block_on directly, this is all IO bound. This also means, we don't need to clone any of the service handles (except preserving clone-ability of the BlobService). Change-Id: I7d90f4d6a263a98491caa071ada538a5197a5472 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10540 Reviewed-by: raitobezarius <tvl@lahfa.xyz> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
This commit is contained in:
parent
4284cd82ef
commit
5f0360c566
2 changed files with 44 additions and 46 deletions
|
@ -94,7 +94,7 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b
|
||||||
eval.io_handle = Box::new(tvix_glue::tvix_io::TvixIO::new(TvixStoreIO::new(
|
eval.io_handle = Box::new(tvix_glue::tvix_io::TvixIO::new(TvixStoreIO::new(
|
||||||
blob_service,
|
blob_service,
|
||||||
directory_service,
|
directory_service,
|
||||||
path_info_service.into(), // we need an Arc<_> here.
|
path_info_service,
|
||||||
tokio_runtime.handle().clone(),
|
tokio_runtime.handle().clone(),
|
||||||
)));
|
)));
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ use std::{
|
||||||
io,
|
io,
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::Arc,
|
|
||||||
};
|
};
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
use tracing::{error, instrument, warn};
|
use tracing::{error, instrument, warn};
|
||||||
|
@ -26,19 +25,23 @@ use tvix_store::pathinfoservice::PathInfoService;
|
||||||
/// This is to both cover cases of syntactically valid store paths, that exist
|
/// This is to both cover cases of syntactically valid store paths, that exist
|
||||||
/// on the filesystem (still managed by Nix), as well as being able to read
|
/// on the filesystem (still managed by Nix), as well as being able to read
|
||||||
/// files outside store paths.
|
/// files outside store paths.
|
||||||
pub struct TvixStoreIO {
|
pub struct TvixStoreIO<BS, DS, PS> {
|
||||||
blob_service: Arc<dyn BlobService>,
|
blob_service: BS,
|
||||||
directory_service: Arc<dyn DirectoryService>,
|
directory_service: DS,
|
||||||
path_info_service: Arc<dyn PathInfoService>,
|
path_info_service: PS,
|
||||||
std_io: StdIO,
|
std_io: StdIO,
|
||||||
tokio_handle: tokio::runtime::Handle,
|
tokio_handle: tokio::runtime::Handle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TvixStoreIO {
|
impl<BS, DS, PS> TvixStoreIO<BS, DS, PS>
|
||||||
|
where
|
||||||
|
DS: Deref<Target = dyn DirectoryService>,
|
||||||
|
PS: Deref<Target = dyn PathInfoService>,
|
||||||
|
{
|
||||||
pub fn new(
|
pub fn new(
|
||||||
blob_service: Arc<dyn BlobService>,
|
blob_service: BS,
|
||||||
directory_service: Arc<dyn DirectoryService>,
|
directory_service: DS,
|
||||||
path_info_service: Arc<dyn PathInfoService>,
|
path_info_service: PS,
|
||||||
tokio_handle: tokio::runtime::Handle,
|
tokio_handle: tokio::runtime::Handle,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -97,7 +100,12 @@ impl TvixStoreIO {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EvalIO for TvixStoreIO {
|
impl<BS, DS, PS> EvalIO for TvixStoreIO<BS, DS, PS>
|
||||||
|
where
|
||||||
|
BS: Deref<Target = dyn BlobService> + Clone,
|
||||||
|
DS: Deref<Target = dyn DirectoryService>,
|
||||||
|
PS: Deref<Target = dyn PathInfoService>,
|
||||||
|
{
|
||||||
#[instrument(skip(self), ret, err)]
|
#[instrument(skip(self), ret, err)]
|
||||||
fn path_exists(&self, path: &Path) -> io::Result<bool> {
|
fn path_exists(&self, path: &Path) -> io::Result<bool> {
|
||||||
if let Ok((store_path, sub_path)) =
|
if let Ok((store_path, sub_path)) =
|
||||||
|
@ -144,11 +152,9 @@ impl EvalIO for TvixStoreIO {
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let blob_service = self.blob_service.clone();
|
self.tokio_handle.block_on(async {
|
||||||
|
|
||||||
let task = self.tokio_handle.spawn(async move {
|
|
||||||
let mut reader = {
|
let mut reader = {
|
||||||
let resp = blob_service.open_read(&digest).await?;
|
let resp = self.blob_service.deref().open_read(&digest).await?;
|
||||||
match resp {
|
match resp {
|
||||||
Some(blob_reader) => blob_reader,
|
Some(blob_reader) => blob_reader,
|
||||||
None => {
|
None => {
|
||||||
|
@ -168,9 +174,7 @@ impl EvalIO for TvixStoreIO {
|
||||||
|
|
||||||
reader.read_to_string(&mut buf).await?;
|
reader.read_to_string(&mut buf).await?;
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
});
|
})
|
||||||
|
|
||||||
self.tokio_handle.block_on(task).unwrap()
|
|
||||||
}
|
}
|
||||||
Node::Symlink(_symlink_node) => Err(io::Error::new(
|
Node::Symlink(_symlink_node) => Err(io::Error::new(
|
||||||
io::ErrorKind::Unsupported,
|
io::ErrorKind::Unsupported,
|
||||||
|
@ -208,12 +212,10 @@ impl EvalIO for TvixStoreIO {
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let directory_service = self.directory_service.clone();
|
if let Some(directory) = self
|
||||||
let digest_clone = digest.clone();
|
|
||||||
let task = self
|
|
||||||
.tokio_handle
|
.tokio_handle
|
||||||
.spawn(async move { directory_service.get(&digest_clone).await });
|
.block_on(async { self.directory_service.deref().get(&digest).await })?
|
||||||
if let Some(directory) = self.tokio_handle.block_on(task).unwrap()? {
|
{
|
||||||
let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
|
let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
|
||||||
for node in directory.nodes() {
|
for node in directory.nodes() {
|
||||||
children.push(match node {
|
children.push(match node {
|
||||||
|
@ -258,23 +260,15 @@ impl EvalIO for TvixStoreIO {
|
||||||
|
|
||||||
#[instrument(skip(self), ret, err)]
|
#[instrument(skip(self), ret, err)]
|
||||||
fn import_path(&self, path: &Path) -> io::Result<PathBuf> {
|
fn import_path(&self, path: &Path) -> io::Result<PathBuf> {
|
||||||
let task = self.tokio_handle.spawn({
|
let output_path = self.tokio_handle.block_on(async {
|
||||||
let blob_service = self.blob_service.clone();
|
tvix_store::utils::import_path(
|
||||||
let directory_service = self.directory_service.clone();
|
path,
|
||||||
let path_info_service = self.path_info_service.clone();
|
self.blob_service.deref(),
|
||||||
let path = path.to_owned();
|
self.directory_service.deref(),
|
||||||
async move {
|
self.path_info_service.deref(),
|
||||||
tvix_store::utils::import_path(
|
)
|
||||||
path,
|
.await
|
||||||
blob_service,
|
})?;
|
||||||
directory_service,
|
|
||||||
path_info_service,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let output_path = self.tokio_handle.block_on(task)??;
|
|
||||||
|
|
||||||
Ok(output_path.to_absolute_path().into())
|
Ok(output_path.to_absolute_path().into())
|
||||||
}
|
}
|
||||||
|
@ -290,9 +284,12 @@ mod tests {
|
||||||
use std::{cell::RefCell, path::Path, rc::Rc, sync::Arc};
|
use std::{cell::RefCell, path::Path, rc::Rc, sync::Arc};
|
||||||
|
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use tvix_castore::{blobservice::MemoryBlobService, directoryservice::MemoryDirectoryService};
|
use tvix_castore::{
|
||||||
|
blobservice::{BlobService, MemoryBlobService},
|
||||||
|
directoryservice::{DirectoryService, MemoryDirectoryService},
|
||||||
|
};
|
||||||
use tvix_eval::EvaluationResult;
|
use tvix_eval::EvaluationResult;
|
||||||
use tvix_store::pathinfoservice::MemoryPathInfoService;
|
use tvix_store::pathinfoservice::{MemoryPathInfoService, PathInfoService};
|
||||||
|
|
||||||
use crate::{builtins::add_derivation_builtins, known_paths::KnownPaths};
|
use crate::{builtins::add_derivation_builtins, known_paths::KnownPaths};
|
||||||
|
|
||||||
|
@ -304,12 +301,13 @@ mod tests {
|
||||||
fn eval(str: &str) -> EvaluationResult {
|
fn eval(str: &str) -> EvaluationResult {
|
||||||
let mut eval = tvix_eval::Evaluation::new_impure();
|
let mut eval = tvix_eval::Evaluation::new_impure();
|
||||||
|
|
||||||
let blob_service = Arc::new(MemoryBlobService::default());
|
let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;
|
||||||
let directory_service = Arc::new(MemoryDirectoryService::default());
|
let directory_service =
|
||||||
let path_info_service = Arc::new(MemoryPathInfoService::new(
|
Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>;
|
||||||
|
let path_info_service = Box::new(MemoryPathInfoService::new(
|
||||||
blob_service.clone(),
|
blob_service.clone(),
|
||||||
directory_service.clone(),
|
directory_service.clone(),
|
||||||
));
|
)) as Box<dyn PathInfoService>;
|
||||||
let runtime = tokio::runtime::Runtime::new().unwrap();
|
let runtime = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
|
||||||
eval.io_handle = Box::new(TvixStoreIO::new(
|
eval.io_handle = Box::new(TvixStoreIO::new(
|
||||||
|
|
Loading…
Reference in a new issue