refactor(tvix/castore/import): put invariant checker into a .inspect()
Separate this a bit stronger from the main application flow. Change-Id: I2e9bd3ec47cc6e37256ba6afc6e0586ddc9a051f Reviewed-on: https://cl.tvl.fyi/c/depot/+/11416 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: Brian Olsen <me@griff.name> Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
parent
b70744fda6
commit
c088123d4e
1 changed files with 18 additions and 18 deletions
|
@ -8,7 +8,7 @@ use crate::proto::SymlinkNode;
|
||||||
use crate::Error as CastoreError;
|
use crate::Error as CastoreError;
|
||||||
use async_stream::stream;
|
use async_stream::stream;
|
||||||
use futures::pin_mut;
|
use futures::pin_mut;
|
||||||
use futures::Stream;
|
use futures::{Stream, StreamExt};
|
||||||
use std::fs::FileType;
|
use std::fs::FileType;
|
||||||
use tracing::Level;
|
use tracing::Level;
|
||||||
|
|
||||||
|
@ -21,7 +21,6 @@ use std::{
|
||||||
os::unix::prelude::PermissionsExt,
|
os::unix::prelude::PermissionsExt,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
};
|
};
|
||||||
use tokio_stream::StreamExt;
|
|
||||||
use tracing::instrument;
|
use tracing::instrument;
|
||||||
use walkdir::DirEntry;
|
use walkdir::DirEntry;
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
@ -201,7 +200,7 @@ impl MerkleInvariantChecker {
|
||||||
pub async fn ingest_entries<'a, BS, DS, S>(
|
pub async fn ingest_entries<'a, BS, DS, S>(
|
||||||
blob_service: BS,
|
blob_service: BS,
|
||||||
directory_service: DS,
|
directory_service: DS,
|
||||||
mut direntry_stream: S,
|
#[allow(unused_mut)] mut direntry_stream: S,
|
||||||
) -> Result<Node, Error>
|
) -> Result<Node, Error>
|
||||||
where
|
where
|
||||||
BS: AsRef<dyn BlobService> + Clone,
|
BS: AsRef<dyn BlobService> + Clone,
|
||||||
|
@ -215,25 +214,26 @@ where
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
let mut invariant_checker: MerkleInvariantChecker = Default::default();
|
let mut invariant_checker: MerkleInvariantChecker = Default::default();
|
||||||
|
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
let mut direntry_stream = direntry_stream.inspect(|e| {
|
||||||
|
// If we find an ancestor before we see this entry, this means that the caller
|
||||||
|
// broke the contract, refer to the documentation of the invariant checker to
|
||||||
|
// understand the reasoning here.
|
||||||
|
if let Some(ancestor) = invariant_checker.find_ancestor(e) {
|
||||||
|
panic!(
|
||||||
|
"Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
|
||||||
|
ancestor.display(),
|
||||||
|
e.path().display()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
invariant_checker.see(e);
|
||||||
|
});
|
||||||
|
|
||||||
// We need to process a directory's children before processing
|
// We need to process a directory's children before processing
|
||||||
// the directory itself in order to have all the data needed
|
// the directory itself in order to have all the data needed
|
||||||
// to compute the hash.
|
// to compute the hash.
|
||||||
while let Some(entry) = direntry_stream.next().await {
|
while let Some(entry) = direntry_stream.next().await {
|
||||||
#[cfg(debug_assertions)]
|
|
||||||
{
|
|
||||||
// If we find an ancestor before we see this entry, this means that the caller
|
|
||||||
// broke the contract, refer to the documentation of the invariant checker to
|
|
||||||
// understand the reasoning here.
|
|
||||||
if let Some(ancestor) = invariant_checker.find_ancestor(&entry) {
|
|
||||||
panic!("Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
|
|
||||||
ancestor.display(),
|
|
||||||
entry.path().display()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
invariant_checker.see(&entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
let file_type = entry.file_type();
|
let file_type = entry.file_type();
|
||||||
|
|
||||||
let node = if file_type.is_dir() {
|
let node = if file_type.is_dir() {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue