diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs index 5548ea0bd..8629394d2 100644 --- a/tvix/store/src/blobservice/dumb_seeker.rs +++ b/tvix/store/src/blobservice/dumb_seeker.rs @@ -80,7 +80,17 @@ impl io::Seek for DumbSeeker { Err(e) => return Err(e), } } - debug_assert_eq!(bytes_to_skip, bytes_skipped); + + // This will fail when seeking past the end of self.r + if bytes_to_skip != bytes_skipped { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + format!( + "tried to skip {} bytes, but only was able to skip {} until reaching EOF", + bytes_to_skip, bytes_skipped + ), + )); + } self.pos = absolute_offset; diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs index 8aa49c499..dd2c479b3 100644 --- a/tvix/store/src/fuse/mod.rs +++ b/tvix/store/src/fuse/mod.rs @@ -6,7 +6,7 @@ mod inodes; mod tests; use crate::{ - blobservice::BlobService, + blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, fuse::{ file_attr::gen_file_attr, @@ -18,7 +18,7 @@ use crate::{ }; use fuser::{FileAttr, ReplyAttr, Request}; use nix_compat::store_path::StorePath; -use std::io::Read; +use std::io::{self, Read, Seek}; use std::str::FromStr; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; @@ -70,6 +70,11 @@ pub struct FUSE { /// This keeps track of inodes and data alongside them. inode_tracker: InodeTracker, + + /// This holds all open file handles + file_handles: HashMap>, + + next_file_handle: u64, } impl FUSE { @@ -85,6 +90,9 @@ impl FUSE { store_paths: HashMap::default(), inode_tracker: Default::default(), + + file_handles: Default::default(), + next_file_handle: 1, } } @@ -357,21 +365,10 @@ impl fuser::Filesystem for FUSE { } } - /// TODO: implement open + close? - - #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))] - fn read( - &mut self, - _req: &Request<'_>, - ino: u64, - _fh: u64, - offset: i64, - size: u32, - _flags: i32, - _lock_owner: Option, - reply: fuser::ReplyData, - ) { - debug!("read"); + #[tracing::instrument(skip_all, fields(rq.inode = ino))] + fn open(&mut self, _req: &Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) { + // get a new file handle + let fh = self.next_file_handle; if ino == fuser::FUSE_ROOT_ID { reply.error(libc::ENOSYS); @@ -398,26 +395,79 @@ impl fuser::Filesystem for FUSE { reply.error(libc::EIO); } Ok(Some(blob_reader)) => { - let data: std::io::Result> = blob_reader - .bytes() - // TODO: this is obviously terrible. blobreader should implement seek. - .skip(offset.try_into().unwrap()) - .take(size.try_into().unwrap()) - .collect(); + self.file_handles.insert(fh, blob_reader); + reply.opened(fh, 0); - match data { - Ok(data) => { - // respond with the requested data - reply.data(&data); - } - Err(e) => reply.error(e.raw_os_error().unwrap()), - } + // TODO: this will overflow after 2**64 operations, + // which is fine for now. + // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 + // for the discussion on alternatives. + self.next_file_handle += 1; } } } } } + #[tracing::instrument(skip_all, fields(rq.inode = ino, fh = fh))] + fn release( + &mut self, + _req: &Request<'_>, + ino: u64, + fh: u64, + _flags: i32, + _lock_owner: Option, + _flush: bool, + reply: fuser::ReplyEmpty, + ) { + // remove and get ownership on the blob reader + let blob_reader = self.file_handles.remove(&fh).unwrap(); + // drop it, which will close it. + drop(blob_reader); + + reply.ok(); + } + + #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))] + fn read( + &mut self, + _req: &Request<'_>, + ino: u64, + fh: u64, + offset: i64, + size: u32, + _flags: i32, + _lock_owner: Option, + reply: fuser::ReplyData, + ) { + debug!("read"); + + let blob_reader = self.file_handles.get_mut(&fh).unwrap(); + + // seek to the offset specified, which is relative to the start of the file. + let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)); + match resp { + Ok(pos) => { + debug_assert_eq!(offset as u64, pos); + } + Err(e) => { + warn!("failed to seek to offset {}: {}", offset, e); + reply.error(libc::EIO); + return; + } + } + + // now with the blobreader seeked to this location, read size of data + let data: std::io::Result> = + blob_reader.bytes().take(size.try_into().unwrap()).collect(); + + match data { + // respond with the requested data + Ok(data) => reply.data(&data), + Err(e) => reply.error(e.raw_os_error().unwrap()), + } + } + #[tracing::instrument(skip_all, fields(rq.inode = ino))] fn readlink(&mut self, _req: &Request<'_>, ino: u64, reply: fuser::ReplyData) { if ino == fuser::FUSE_ROOT_ID {