fix(tvix/store/pathinfo/sled): use spawn_blocking
This does IO, which might take a longer amount of time than what we want to be blocking the normal executor. Use spawn_blocking instead. I didn't add it for the constructors, as we only call these once. Change-Id: I9a1063099bac9582ca9681043c58c1edc780c5ff Reviewed-on: https://cl.tvl.fyi/c/depot/+/11618 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI
This commit is contained in:
parent
717081ae37
commit
944a781354
1 changed files with 45 additions and 22 deletions
|
@ -1,8 +1,8 @@
|
||||||
use super::PathInfoService;
|
use super::PathInfoService;
|
||||||
use crate::nar::calculate_size_and_sha256;
|
use crate::nar::calculate_size_and_sha256;
|
||||||
use crate::proto::PathInfo;
|
use crate::proto::PathInfo;
|
||||||
|
use async_stream::try_stream;
|
||||||
use data_encoding::BASE64;
|
use data_encoding::BASE64;
|
||||||
use futures::stream::iter;
|
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
@ -61,10 +61,16 @@ where
|
||||||
{
|
{
|
||||||
#[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
|
#[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
|
||||||
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
|
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
|
||||||
match self.db.get(digest).map_err(|e| {
|
let resp = tokio::task::spawn_blocking({
|
||||||
|
let db = self.db.clone();
|
||||||
|
move || db.get(digest.as_slice())
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.map_err(|e| {
|
||||||
warn!("failed to retrieve PathInfo: {}", e);
|
warn!("failed to retrieve PathInfo: {}", e);
|
||||||
Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
|
Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
|
||||||
})? {
|
})?;
|
||||||
|
match resp {
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
Some(data) => {
|
Some(data) => {
|
||||||
let path_info = PathInfo::decode(&*data).map_err(|e| {
|
let path_info = PathInfo::decode(&*data).map_err(|e| {
|
||||||
|
@ -86,14 +92,19 @@ where
|
||||||
// In case the PathInfo is valid, we were able to parse a StorePath.
|
// In case the PathInfo is valid, we were able to parse a StorePath.
|
||||||
// Store it in the database, keyed by its digest.
|
// Store it in the database, keyed by its digest.
|
||||||
// This overwrites existing PathInfo objects.
|
// This overwrites existing PathInfo objects.
|
||||||
self.db
|
tokio::task::spawn_blocking({
|
||||||
.insert(store_path.digest(), path_info.encode_to_vec())
|
let db = self.db.clone();
|
||||||
.map_err(|e| {
|
let k = *store_path.digest();
|
||||||
warn!("failed to insert PathInfo: {}", e);
|
let data = path_info.encode_to_vec();
|
||||||
Error::StorageError(format! {
|
move || db.insert(k, data)
|
||||||
"failed to insert PathInfo: {}", e
|
})
|
||||||
})
|
.await?
|
||||||
})?;
|
.map_err(|e| {
|
||||||
|
warn!("failed to insert PathInfo: {}", e);
|
||||||
|
Error::StorageError(format! {
|
||||||
|
"failed to insert PathInfo: {}", e
|
||||||
|
})
|
||||||
|
})?;
|
||||||
|
|
||||||
Ok(path_info)
|
Ok(path_info)
|
||||||
}
|
}
|
||||||
|
@ -109,17 +120,29 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
|
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
|
||||||
Box::pin(iter(self.db.iter().values().map(|v| {
|
let db = self.db.clone();
|
||||||
let data = v.map_err(|e| {
|
let mut it = db.iter().values();
|
||||||
warn!("failed to retrieve PathInfo: {}", e);
|
|
||||||
Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let path_info = PathInfo::decode(&*data).map_err(|e| {
|
Box::pin(try_stream! {
|
||||||
warn!("failed to decode stored PathInfo: {}", e);
|
// Don't block the executor while waiting for .next(), so wrap that
|
||||||
Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
|
// in a spawn_blocking call.
|
||||||
})?;
|
// We need to pass around it to be able to reuse it.
|
||||||
Ok(path_info)
|
while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || {
|
||||||
})))
|
(it.next(), it)
|
||||||
|
}).await? {
|
||||||
|
it = new_it;
|
||||||
|
let data = elem.map_err(|e| {
|
||||||
|
warn!("failed to retrieve PathInfo: {}", e);
|
||||||
|
Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let path_info = PathInfo::decode(&*data).map_err(|e| {
|
||||||
|
warn!("failed to decode stored PathInfo: {}", e);
|
||||||
|
Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
yield path_info
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue