refactor(tvix/store/pathinfo/memory): tokio RwLock, improve list()

We don't want to use the std::sync::RwLock here, as it blocks.

This also means we don't need to deal with the error cases anymore.

The list() implementation is updated to use try_stream, which means we
can now avoid collecting everything into a Vec before returning from it.

Change-Id: I9057dcc410dc553e6b1be3f20d5ee830569e8218
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11611
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-05-10 08:24:24 +03:00 committed by clbot
parent 11850249c8
commit 96b8b1a205

View file

@ -1,10 +1,9 @@
use super::PathInfoService; use super::PathInfoService;
use crate::proto::PathInfo; use crate::proto::PathInfo;
use futures::stream::{iter, BoxStream}; use async_stream::try_stream;
use std::{ use futures::stream::BoxStream;
collections::HashMap, use std::{collections::HashMap, sync::Arc};
sync::{Arc, RwLock}, use tokio::sync::RwLock;
};
use tonic::async_trait; use tonic::async_trait;
use tvix_castore::Error; use tvix_castore::Error;
@ -16,7 +15,7 @@ pub struct MemoryPathInfoService {
#[async_trait] #[async_trait]
impl PathInfoService for MemoryPathInfoService { impl PathInfoService for MemoryPathInfoService {
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
let db = self.db.read().unwrap(); let db = self.db.read().await;
match db.get(&digest) { match db.get(&digest) {
None => Ok(None), None => Ok(None),
@ -35,7 +34,7 @@ impl PathInfoService for MemoryPathInfoService {
// In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
// This overwrites existing PathInfo objects. // This overwrites existing PathInfo objects.
Ok(nix_path) => { Ok(nix_path) => {
let mut db = self.db.write().unwrap(); let mut db = self.db.write().await;
db.insert(*nix_path.digest(), path_info.clone()); db.insert(*nix_path.digest(), path_info.clone());
Ok(path_info) Ok(path_info)
@ -44,14 +43,15 @@ impl PathInfoService for MemoryPathInfoService {
} }
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
let db = self.db.read().unwrap(); let db = self.db.clone();
// Copy all elements into a list. Box::pin(try_stream! {
// This is a bit ugly, because we can't have db escape the lifetime let db = db.read().await;
// of this function, but elements need to be returned owned anyways, and this in- let it = db.iter();
// memory impl is only for testing purposes anyways.
let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
Box::pin(iter(items)) for (_k, v) in it {
yield v.clone()
}
})
} }
} }