diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 203611b85..f1bbf67f8 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -1,10 +1,9 @@ use super::PathInfoService; use crate::proto::PathInfo; -use futures::stream::{iter, BoxStream}; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use async_stream::try_stream; +use futures::stream::BoxStream; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; use tonic::async_trait; use tvix_castore::Error; @@ -16,7 +15,7 @@ pub struct MemoryPathInfoService { #[async_trait] impl PathInfoService for MemoryPathInfoService { async fn get(&self, digest: [u8; 20]) -> Result, Error> { - let db = self.db.read().unwrap(); + let db = self.db.read().await; match db.get(&digest) { None => Ok(None), @@ -35,7 +34,7 @@ impl PathInfoService for MemoryPathInfoService { // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. // This overwrites existing PathInfo objects. Ok(nix_path) => { - let mut db = self.db.write().unwrap(); + let mut db = self.db.write().await; db.insert(*nix_path.digest(), path_info.clone()); Ok(path_info) @@ -44,14 +43,15 @@ impl PathInfoService for MemoryPathInfoService { } fn list(&self) -> BoxStream<'static, Result> { - let db = self.db.read().unwrap(); + let db = self.db.clone(); - // Copy all elements into a list. - // This is a bit ugly, because we can't have db escape the lifetime - // of this function, but elements need to be returned owned anyways, and this in- - // memory impl is only for testing purposes anyways. - let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + Box::pin(try_stream! { + let db = db.read().await; + let it = db.iter(); - Box::pin(iter(items)) + for (_k, v) in it { + yield v.clone() + } + }) } }