refactor(tvix/store/fs): make fetch_directory_inode_data async
To make this easier, move it outside of TvixStoreFs, and accept the DirectoryService as a function argument, so we don't need to worry about the lifetime of self. This also aligns with how we spawn async tasks inside the rest of TvixStoreFs. Change-Id: I3b95072209d32039f05aed122240f2d6db7ad172 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9713 Reviewed-by: Connor Brewster <cbrewster@hey.com> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
This commit is contained in:
parent
9757bf6377
commit
173641ed37
1 changed files with 32 additions and 30 deletions
|
@ -30,7 +30,7 @@ use tokio::{
|
||||||
io::{AsyncBufReadExt, AsyncSeekExt},
|
io::{AsyncBufReadExt, AsyncSeekExt},
|
||||||
sync::mpsc,
|
sync::mpsc,
|
||||||
};
|
};
|
||||||
use tracing::{debug, info_span, warn};
|
use tracing::{debug, info_span, instrument, warn};
|
||||||
use tvix_castore::{
|
use tvix_castore::{
|
||||||
blobservice::{BlobReader, BlobService},
|
blobservice::{BlobReader, BlobService},
|
||||||
directoryservice::DirectoryService,
|
directoryservice::DirectoryService,
|
||||||
|
@ -211,33 +211,6 @@ impl TvixStoreFs {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This will lookup a directory by digest, and will turn it into a
|
|
||||||
/// [InodeData::Directory(DirectoryInodeData::Populated(..))].
|
|
||||||
/// This is both used to initially insert the root node of a store path,
|
|
||||||
/// as well as when looking up an intermediate DirectoryNode.
|
|
||||||
fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> {
|
|
||||||
let directory_service = self.directory_service.clone();
|
|
||||||
let directory_digest_clone = directory_digest.clone();
|
|
||||||
let task = self
|
|
||||||
.tokio_handle
|
|
||||||
.spawn(async move { directory_service.get(&directory_digest_clone).await });
|
|
||||||
match self.tokio_handle.block_on(task).unwrap() {
|
|
||||||
Err(e) => {
|
|
||||||
warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory");
|
|
||||||
Err(e)
|
|
||||||
}
|
|
||||||
// If the Directory can't be found, this is a hole, bail out.
|
|
||||||
Ok(None) => {
|
|
||||||
tracing::error!(directory.digest=%directory_digest, "directory not found in directory service");
|
|
||||||
Err(Error::StorageError(format!(
|
|
||||||
"directory {} not found",
|
|
||||||
directory_digest
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
Ok(Some(directory)) => Ok(directory.into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileSystem for TvixStoreFs {
|
impl FileSystem for TvixStoreFs {
|
||||||
|
@ -319,7 +292,12 @@ impl FileSystem for TvixStoreFs {
|
||||||
return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
|
return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
|
||||||
}
|
}
|
||||||
InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
|
InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
|
||||||
match self.fetch_directory_inode_data(parent_digest) {
|
let directory_service = self.directory_service.clone();
|
||||||
|
let parent_digest = parent_digest.to_owned();
|
||||||
|
let task = self.tokio_handle.spawn(async move {
|
||||||
|
fetch_directory_inode_data(directory_service, &parent_digest).await
|
||||||
|
});
|
||||||
|
match self.tokio_handle.block_on(task).unwrap() {
|
||||||
Ok(new_data) => {
|
Ok(new_data) => {
|
||||||
// update data in [self.inode_tracker] with populated variant.
|
// update data in [self.inode_tracker] with populated variant.
|
||||||
// FUTUREWORK: change put to return the data after
|
// FUTUREWORK: change put to return the data after
|
||||||
|
@ -464,7 +442,12 @@ impl FileSystem for TvixStoreFs {
|
||||||
return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
|
return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
|
||||||
}
|
}
|
||||||
InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => {
|
InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => {
|
||||||
match self.fetch_directory_inode_data(directory_digest) {
|
let directory_digest = directory_digest.to_owned();
|
||||||
|
let directory_service = self.directory_service.clone();
|
||||||
|
let task = self.tokio_handle.spawn(async move {
|
||||||
|
fetch_directory_inode_data(directory_service, &directory_digest).await
|
||||||
|
});
|
||||||
|
match self.tokio_handle.block_on(task).unwrap() {
|
||||||
Ok(new_data) => {
|
Ok(new_data) => {
|
||||||
// update data in [self.inode_tracker] with populated variant.
|
// update data in [self.inode_tracker] with populated variant.
|
||||||
// FUTUREWORK: change put to return the data after
|
// FUTUREWORK: change put to return the data after
|
||||||
|
@ -688,3 +671,22 @@ impl FileSystem for TvixStoreFs {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This will lookup a directory by digest, and will turn it into a
|
||||||
|
/// [InodeData::Directory(DirectoryInodeData::Populated(..))].
|
||||||
|
/// This is both used to initially insert the root node of a store path,
|
||||||
|
/// as well as when looking up an intermediate DirectoryNode.
|
||||||
|
#[instrument(skip_all, fields(directory.digest = %directory_digest), err)]
|
||||||
|
async fn fetch_directory_inode_data<DS: DirectoryService + ?Sized>(
|
||||||
|
directory_service: Arc<DS>,
|
||||||
|
directory_digest: &B3Digest,
|
||||||
|
) -> Result<InodeData, Error> {
|
||||||
|
match directory_service.get(directory_digest).await? {
|
||||||
|
// If the Directory can't be found, this is a hole, bail out.
|
||||||
|
None => Err(Error::StorageError(format!(
|
||||||
|
"directory {} not found",
|
||||||
|
directory_digest
|
||||||
|
))),
|
||||||
|
Some(directory) => Ok(directory.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue