refactor(tvix/castore/fs): use std::sync::Mutex

This allows us acquiring the lock in sync code still. Also, simplify
some of the error handling a bit.

Change-Id: I29e83b715f92808e95ecb0ae9de787339d1a371d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11424
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-04-15 11:02:49 +03:00 committed by flokli
parent 1bf6b9f5a0
commit 3ed7eda79b

View file

@ -27,6 +27,7 @@ use fuse_backend_rs::api::filesystem::{
use futures::StreamExt; use futures::StreamExt;
use parking_lot::RwLock; use parking_lot::RwLock;
use std::ffi::CStr; use std::ffi::CStr;
use std::sync::Mutex;
use std::{ use std::{
collections::HashMap, collections::HashMap,
io, io,
@ -98,7 +99,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
/// This holds all open file handles /// This holds all open file handles
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>, file_handles: RwLock<HashMap<u64, Arc<Mutex<Box<dyn BlobReader>>>>>,
next_file_handle: AtomicU64, next_file_handle: AtomicU64,
@ -528,7 +529,7 @@ where
debug!("add file handle {}", fh); debug!("add file handle {}", fh);
self.file_handles self.file_handles
.write() .write()
.insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); .insert(fh, Arc::new(Mutex::new(blob_reader)));
Ok(( Ok((
Some(fh), Some(fh),
@ -581,29 +582,31 @@ where
// We need to take out the blob reader from self.file_handles, so we can // We need to take out the blob reader from self.file_handles, so we can
// interact with it in the separate task. // interact with it in the separate task.
// On success, we pass it back out of the task, so we can put it back in self.file_handles. // On success, we pass it back out of the task, so we can put it back in self.file_handles.
let blob_reader = match self.file_handles.read().get(&handle) { let blob_reader = self
Some(blob_reader) => blob_reader.clone(), .file_handles
None => { .read()
.get(&handle)
.ok_or_else(|| {
warn!("file handle {} unknown", handle); warn!("file handle {} unknown", handle);
return Err(io::Error::from_raw_os_error(libc::EIO)); io::Error::from_raw_os_error(libc::EIO)
} })
}; .cloned()?;
let mut blob_reader = blob_reader
.lock()
.map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
let buf = self.tokio_handle.block_on(async move { let buf = self.tokio_handle.block_on(async move {
let mut blob_reader = blob_reader.lock().await;
// seek to the offset specified, which is relative to the start of the file. // seek to the offset specified, which is relative to the start of the file.
let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await; let pos = blob_reader
.seek(io::SeekFrom::Start(offset))
match resp { .await
Ok(pos) => { .map_err(|e| {
debug_assert_eq!(offset, pos);
}
Err(e) => {
warn!("failed to seek to offset {}: {}", offset, e); warn!("failed to seek to offset {}: {}", offset, e);
return Err(io::Error::from_raw_os_error(libc::EIO)); io::Error::from_raw_os_error(libc::EIO)
} })?;
}
debug_assert_eq!(offset, pos);
// As written in the fuse docs, read should send exactly the number // As written in the fuse docs, read should send exactly the number
// of bytes requested except on EOF or error. // of bytes requested except on EOF or error.
@ -613,7 +616,7 @@ where
// copy things from the internal buffer into buf to fill it till up until size // copy things from the internal buffer into buf to fill it till up until size
tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?; tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;
Ok(buf) Ok::<_, std::io::Error>(buf)
})?; })?;
w.write(&buf) w.write(&buf)