feat(tvix/tools/weave): init

Scalable tracing GC for the cache.nixos.org dataset.

Change-Id: I6c7852796f28e1a1c7607384ffb55f44407e1185
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10765
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
edef 2024-02-15 22:20:47 +00:00
parent 692f2bfb1c
commit e3860689ba
9 changed files with 11471 additions and 0 deletions

2179
tvix/tools/weave/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

8809
tvix/tools/weave/Cargo.nix Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,20 @@
[package]
name = "weave"
version = "0.1.0"
edition = "2021"
# Standalone workspace rooted here, so this tool builds independently.
[workspace]
members = ["."]
# TODO(edef): cut down on required features, this is kind of a grab bag right now
[dependencies]
anyhow = { version = "1.0.79", features = ["backtrace"] }
hashbrown = "0.14.3"
nix-compat = { version = "0.1.0", path = "../../nix-compat" }
owning_ref = "0.4.1"
rayon = "1.8.1"
tokio = { version = "1.36.0", features = ["full"] }
# polars with only the parquet feature; used to read/write the narinfo dataset.
[dependencies.polars]
version = "0.36.2"
features = ["parquet"]

1
tvix/tools/weave/OWNERS Normal file
View file

@ -0,0 +1 @@
edef

View file

@ -0,0 +1,3 @@
# Build the `weave` crate via the crate2nix-generated Cargo.nix.
{ pkgs, ... }:
(pkgs.callPackage ./Cargo.nix { }).rootCrate.build

View file

@ -0,0 +1,114 @@
//! Swizzle reads a `narinfo.parquet` file, usually produced by `narinfo2parquet`.
//!
//! It swizzles the reference list, ie it converts the references from absolute,
//! global identifiers (store path hashes) to indices into the `store_path_hash`
//! column (ie, row numbers), so that we can later walk the reference graph
//! efficiently.
//!
//! Path hashes are represented as non-null, 20-byte `Binary` values.
//! The indices are represented as 32-bit unsigned integers, with in-band nulls
//! represented by [INDEX_NULL] (the all-1 bit pattern), to permit swizzling
//! partial datasets.
//!
//! In essence, it converts from names to pointers, so that `weave` can simply
//! chase pointers to trace the live set. This replaces an `O(log(n))` lookup
//! with `O(1)` indexing, and produces a much denser representation that actually
//! fits in memory.
//!
//! The in-memory representation is at least 80% smaller, and the indices compress
//! well in Parquet due to both temporal locality of reference and the power law
//! distribution of reference "popularity".
//!
//! Only two columns are read from `narinfo.parquet`:
//!
//! * `store_path_hash :: PathHash`
//! * `references :: List[PathHash]`
//!
//! Output is written to `narinfo-references.parquet` in the form of a single
//! `List[u32]` column, `reference_idxs`.
//!
//! This file is inherently bound to the corresponding `narinfo.parquet`,
//! since it essentially contains pointers into this file.
use anyhow::Result;
use hashbrown::HashTable;
use polars::prelude::*;
use rayon::prelude::*;
use std::fs::File;
use tokio::runtime::Runtime;
use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL};
/// Entry point: swizzle `narinfo.parquet` into `narinfo-references.parquet`.
///
/// 1. Load the dense `store_path_hash` array and index it by a 64-bit prefix hash.
/// 2. Stream the `references` column in batches, translating each 20-byte path
///    hash into its row number (or [INDEX_NULL] when absent from the dataset).
/// 3. Write the resulting `reference_idxs` list column out as Parquet.
fn main() -> Result<()> {
    let ph_array = load_ph_array()?;

    // TODO(edef): re-parallelise this
    // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works.
    // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory
    eprint!("… build index\r");
    // Prefix-hash table mapping hash64(path_hash) -> row index.
    // Prefix collisions are resolved at lookup time by comparing the full
    // 20-byte hash against ph_array.
    let ph_map: HashTable<(u64, u32)> = {
        let mut ph_map = HashTable::with_capacity(ph_array.len());

        for (offset, item) in ph_array.iter().enumerate() {
            let offset = offset as u32;
            let hash = hash64(item);
            ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash);
        }

        ph_map
    };
    eprintln!("{DONE}");

    eprint!("… swizzle references\r");
    // Only the `references` column is read, in batches of 2^16 rows.
    let mut pq = ParquetReader::new(File::open("narinfo.parquet")?)
        .with_columns(Some(vec!["references".into()]))
        .batched(1 << 16)?;

    let mut reference_idxs =
        Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into()));

    // Scratch vec reused across iterations so rayon can collect into it
    // without reallocating every batch.
    let mut bounce = vec![];
    let runtime = Runtime::new()?;
    while let Some(batches) = runtime.block_on(pq.next_batches(48))? {
        batches
            .into_par_iter()
            .map(|df| -> ListChunked {
                df.column("references")
                    .unwrap()
                    .list()
                    .unwrap()
                    .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
                        let series = series.binary()?;
                        let mut out: Vec<u32> = Vec::with_capacity(series.len());

                        // `.flatten()` replaces the former identity
                        // `.flat_map(|xs| xs)`; each `key` is a `&[u8; 20]`.
                        out.extend(as_fixed_binary::<20>(series).flatten().map(|key| {
                            let hash = hash64(key);
                            ph_map
                                .find(hash, |&(candidate_hash, candidate_index)| {
                                    candidate_hash == hash
                                        && &ph_array[candidate_index as usize] == key
                                })
                                .map(|&(_, index)| index)
                                // Unknown reference: record the in-band null.
                                .unwrap_or(INDEX_NULL)
                        }));

                        Ok(Series::from_vec("reference_idxs", out))
                    })
                    .unwrap()
            })
            .collect_into_vec(&mut bounce);

        for batch in bounce.drain(..) {
            reference_idxs.append(&batch.into_series())?;
        }
    }
    eprintln!("{DONE}");

    eprint!("… writing output\r");
    ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! {
        "reference_idxs" => reference_idxs,
    }?)?;
    eprintln!("{DONE}");

    Ok(())
}

View file

@ -0,0 +1,27 @@
use owning_ref::{OwningRef, StableAddress};
use polars::export::arrow::buffer::Buffer;
use std::ops::Deref;
/// A shared `[[u8; N]]` backed by a Polars [Buffer].
pub type FixedBytes<const N: usize> = OwningRef<Bytes, [[u8; N]]>;
/// Wrapper struct to make [Buffer] implement [StableAddress],
/// so it can serve as the owner inside an [OwningRef].
/// TODO(edef): upstream the `impl`
pub struct Bytes(pub Buffer<u8>);
/// SAFETY: [Buffer] is always an Arc+Vec indirection, so the byte data it
/// points to stays at a stable address even when the `Bytes` value moves.
unsafe impl StableAddress for Bytes {}
impl Bytes {
    /// Borrow a projection of the buffer (e.g. a `[[u8; N]]` view) while the
    /// returned [OwningRef] keeps the underlying buffer alive.
    pub fn map<U: ?Sized>(self, f: impl FnOnce(&[u8]) -> &U) -> OwningRef<Self, U> {
        OwningRef::new(self).map(f)
    }
}
impl Deref for Bytes {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&*self.0
}
}

102
tvix/tools/weave/src/lib.rs Normal file
View file

@ -0,0 +1,102 @@
use anyhow::Result;
use rayon::prelude::*;
use std::{fs::File, slice};
use polars::{
datatypes::BinaryChunked,
export::arrow::array::BinaryArray,
prelude::{ParquetReader, SerReader},
};
pub use crate::bytes::*;
mod bytes;
/// In-band null: the all-ones bit pattern marks a missing/unknown index.
pub const INDEX_NULL: u32 = !0;
/// Check mark (✔) printed when a progress step completes.
pub const DONE: &str = "\u{2714}";
/// A terrific hash function, turning 20 bytes of cryptographic hash
/// into 8 bytes of cryptographic hash.
///
/// Simply reinterprets the leading 8 bytes as a native-endian `u64`;
/// the input is already uniformly distributed, so truncation suffices.
pub fn hash64(h: &[u8; 20]) -> u64 {
    let prefix: [u8; 8] = h[..8].try_into().expect("slice is exactly 8 bytes");
    u64::from_ne_bytes(prefix)
}
/// Read a dense `store_path_hash` array from `narinfo.parquet`,
/// returning it as an owned [FixedBytes].
///
/// # Errors
///
/// Returns an error if `narinfo.parquet` cannot be opened or parsed, or if
/// the `store_path_hash` column is missing or not a binary column.
///
/// # Panics
///
/// Panics if the dataset has 2^32 or more rows (indices are `u32` downstream).
pub fn load_ph_array() -> Result<FixedBytes<20>> {
    eprint!("… load store_path_hash\r");
    // TODO(edef): this could use a further pushdown, since polars is more hindrance than help here
    // We know this has to fit in memory (we can't mmap it without further encoding constraints),
    // and we want a single `Vec<[u8; 20]>` of the data.
    let ph_array = into_fixed_binary_rechunk::<20>(
        // Propagate open failures via `?` instead of the former `.unwrap()`:
        // this function already returns `Result`, so a missing or unreadable
        // file should be an error, not a panic.
        ParquetReader::new(File::open("narinfo.parquet")?)
            .with_columns(Some(vec!["store_path_hash".into()]))
            .set_rechunk(true)
            .finish()?
            .column("store_path_hash")?
            .binary()?,
    );

    // Row numbers must fit in u32, since INDEX_NULL & co. are 32-bit.
    u32::try_from(ph_array.len()).expect("dataset exceeds 2^32");

    eprintln!("{DONE}");
    Ok(ph_array)
}
/// Iterator over `&[[u8; N]]` from a dense [BinaryChunked].
///
/// Yields one fixed-size slice per underlying Arrow chunk. Panics (via
/// [assert_fixed_dense]) if any chunk contains nulls or entries whose length
/// differs from `N`.
pub fn as_fixed_binary<const N: usize>(
    chunked: &BinaryChunked,
) -> impl Iterator<Item = &[[u8; N]]> + DoubleEndedIterator {
    chunked.downcast_iter().map(|array| {
        assert_fixed_dense::<N>(array);
        // Density and fixed stride were just asserted, so this can't be None.
        exact_chunks(array.values()).unwrap()
    })
}
/// Convert a dense [BinaryChunked] into a single chunk as [FixedBytes],
/// without taking a reference to the offsets array and validity bitmap.
fn into_fixed_binary_rechunk<const N: usize>(chunked: &BinaryChunked) -> FixedBytes<N> {
    // Collapse to a single chunk so there is exactly one backing buffer.
    let chunked = chunked.rechunk();
    let mut iter = chunked.downcast_iter();
    // rechunk() leaves exactly one chunk, so `next()` must succeed.
    let array = iter.next().unwrap();
    assert_fixed_dense::<N>(array);
    // Clone only the values buffer (a cheap refcount bump), then reinterpret
    // it as densely packed `[u8; N]` entries.
    Bytes(array.values().clone()).map(|buf| exact_chunks(buf).unwrap())
}
/// Ensures that the supplied Arrow array consists of densely packed bytestrings of length `N`.
/// In other words, ensure that it is free of nulls, and that the offsets have a fixed stride of `N`.
fn assert_fixed_dense<const N: usize>(array: &BinaryArray<i64>) {
    // Unset validity bits mark null entries, which we can't represent.
    if array.validity().map_or(0, |bits| bits.unset_bits()) > 0 {
        panic!("null values present");
    }

    // Every offset must advance by exactly N; checked in parallel.
    let offsets = array.offsets().as_slice();
    if !offsets.par_windows(2).all(|w| w[1] - w[0] == N as i64) {
        panic!("lengths are inconsistent");
    }
}
/// Reinterpret a byte slice as a slice of `[u8; K]` chunks.
///
/// Returns `None` if `buf.len()` is not a multiple of `K`.
///
/// # Panics
///
/// Panics if `K == 0` (remainder by zero), as the original did.
fn exact_chunks<const K: usize>(buf: &[u8]) -> Option<&[[u8; K]]> {
    // Validate in safe code first, keeping the `unsafe` scope minimal.
    if buf.len() % K != 0 {
        return None;
    }
    let len = buf.len() / K;

    // SAFETY: `buf.len()` is a multiple of K (checked above), `[u8; K]` has
    // alignment 1 like `u8`, and the resulting slice covers exactly the
    // `len * K` initialized bytes of `buf`.
    Some(unsafe { slice::from_raw_parts(buf.as_ptr().cast::<[u8; K]>(), len) })
}

View file

@ -0,0 +1,216 @@
//! Weave resolves a list of roots from `nixpkgs.roots` against `narinfo.parquet`,
//! and then uses the reference graph from the accompanying `narinfo-references.parquet`
//! produced by `swizzle` to collect the closure of the roots.
//!
//! They are written to `live_idxs.parquet`, which only has one column, representing
//! the row numbers in `narinfo.parquet` corresponding to live paths.
use anyhow::Result;
use hashbrown::{hash_table, HashTable};
use nix_compat::nixbase32;
use rayon::prelude::*;
use std::{
collections::{BTreeMap, HashSet},
fs::{self, File},
ops::Index,
sync::atomic::{AtomicU32, Ordering},
};
use polars::{
datatypes::StaticArray,
export::arrow::{array::UInt32Array, offset::OffsetsBuffer},
prelude::*,
};
use weave::{hash64, DONE, INDEX_NULL};
/// Entry point: trace the live set from the roots.
///
/// Reads `nixpkgs.roots`, `narinfo.parquet`, and `narinfo-references.parquet`
/// from the working directory, computes the closure of the roots over the
/// reference graph, and writes the sorted live row numbers to
/// `live_idxs.parquet`.
fn main() -> Result<()> {
    eprint!("… parse roots\r");
    // Each record is 33 bytes: 32 nixbase32 characters followed by one more
    // byte (presumably '\n' — TODO confirm); only the decoded 20-byte path
    // hash is kept.
    let roots: PathSet32 = {
        let mut roots = Vec::new();
        fs::read("nixpkgs.roots")?
            .par_chunks_exact(32 + 1)
            .map(|e| nixbase32::decode_fixed::<20>(&e[0..32]).unwrap())
            .collect_into_vec(&mut roots);
        roots.iter().collect()
    };
    eprintln!("{DONE}");
    // Resolve each root to its row number in narinfo.parquet by scanning the
    // dense path-hash array in parallel.
    {
        let ph_array = weave::load_ph_array()?;
        eprint!("… resolve roots\r");
        ph_array.par_iter().enumerate().for_each(|(idx, h)| {
            if let Some(idx_slot) = roots.find(h) {
                // Slot must still hold INDEX_NULL; a second match for the
                // same root would mean duplicate store path hashes.
                idx_slot
                    .compare_exchange(INDEX_NULL, idx as u32, Ordering::SeqCst, Ordering::SeqCst)
                    .expect("duplicate entry");
            }
        });
        eprintln!("{DONE}");
    }
    // Seed the BFS frontier with all resolved roots; roots that stayed at
    // INDEX_NULL were not found in the dataset and are skipped.
    let mut todo = HashSet::with_capacity(roots.len());
    {
        let mut unknown_roots = 0usize;
        for (_, idx) in roots.table {
            let idx = idx.into_inner();
            if idx == INDEX_NULL {
                unknown_roots += 1;
                continue;
            }
            todo.insert(idx);
        }
        println!("skipping {unknown_roots} unknown roots");
    }
    eprint!("… load reference_idxs\r");
    let ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
        .finish()?
        .column("reference_idxs")?
        .list()?
        .clone();
    // Borrow each chunk's offsets and raw u32 values, so index lookups below
    // are zero-copy slices into the Arrow buffers.
    let ri_array = {
        ChunkedList::new(ri_array.downcast_iter().map(|chunk| {
            (
                chunk.offsets(),
                chunk
                    .values()
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .unwrap()
                    .as_slice()
                    .unwrap(),
            )
        }))
    };
    eprintln!("{DONE}");
    // Breadth-first traversal: `todo` is the current frontier, `seen` the
    // closure accumulated so far.
    let mut seen = todo.clone();
    while !todo.is_empty() {
        println!("todo: {} seen: {}", todo.len(), seen.len());
        todo = todo
            .par_iter()
            .flat_map(|&parent| {
                // INDEX_NULL has no reference list; nothing to expand.
                if parent == INDEX_NULL {
                    return vec![];
                }
                ri_array[parent as usize]
                    .iter()
                    .cloned()
                    .filter(|child| !seen.contains(child))
                    .collect::<Vec<u32>>()
            })
            .collect();
        for &index in &todo {
            seen.insert(index);
        }
    }
    println!("done: {} paths", seen.len());
    // INDEX_NULL in the closure means some live path referenced a path
    // missing from the dataset.
    if seen.remove(&INDEX_NULL) {
        println!("WARNING: missing edges");
    }
    eprint!("… gathering live set\r");
    let mut seen: Vec<u32> = seen.into_iter().collect();
    seen.par_sort();
    eprintln!("{DONE}");
    eprint!("… writing output\r");
    ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
        "live_idx" => seen,
    }?)?;
    eprintln!("{DONE}");
    Ok(())
}
/// A set of 20-byte path hashes keyed by their [hash64] prefix, where each
/// entry carries an atomically updatable `u32` slot (initialised to
/// [INDEX_NULL]) so lookups can record a resolved index concurrently.
struct PathSet32 {
    table: HashTable<([u8; 20], AtomicU32)>,
}
impl PathSet32 {
    /// Create an empty set with room for `capacity` entries.
    fn with_capacity(capacity: usize) -> Self {
        Self {
            table: HashTable::with_capacity(capacity),
        }
    }
    /// Insert `value`, returning `false` if it was already present.
    fn insert(&mut self, value: &[u8; 20]) -> bool {
        let hash = hash64(value);
        match self
            .table
            .entry(hash, |(x, _)| x == value, |(x, _)| hash64(x))
        {
            hash_table::Entry::Occupied(_) => false,
            hash_table::Entry::Vacant(entry) => {
                entry.insert((*value, AtomicU32::new(INDEX_NULL)));
                true
            }
        }
    }
    /// Look up `value`, returning a reference to its index slot if present.
    fn find(&self, value: &[u8; 20]) -> Option<&AtomicU32> {
        let hash = hash64(value);
        self.table
            .find(hash, |(x, _)| x == value)
            .as_ref()
            .map(|(_, x)| x)
    }
    /// Number of entries in the set.
    fn len(&self) -> usize {
        self.table.len()
    }
}
impl<'a> FromIterator<&'a [u8; 20]> for PathSet32 {
    /// Collect path hashes into a set, pre-sizing from the iterator's
    /// lower size hint.
    fn from_iter<T: IntoIterator<Item = &'a [u8; 20]>>(iter: T) -> Self {
        let it = iter.into_iter();
        let mut set = Self::with_capacity(it.size_hint().0);
        it.for_each(|hash| {
            set.insert(hash);
        });
        set
    }
}
/// A logical list-of-lists assembled from multiple Arrow list chunks,
/// indexable by a single global element index across chunk boundaries.
struct ChunkedList<'a, T> {
    // Maps each chunk's global starting element index to its
    // (offsets, values) pair, so a lookup can binary-search by index.
    by_offset: BTreeMap<usize, (&'a OffsetsBuffer<i64>, &'a [T])>,
}
impl<'a, T> ChunkedList<'a, T> {
    /// Build the chunk index, running-summing each chunk's element count
    /// to compute its global starting offset.
    fn new(chunks: impl IntoIterator<Item = (&'a OffsetsBuffer<i64>, &'a [T])>) -> Self {
        let mut next_offset = 0usize;
        ChunkedList {
            by_offset: chunks
                .into_iter()
                .map(|(offsets, values)| {
                    let offset = next_offset;
                    // checked_add guards against overflow of the running total.
                    next_offset = next_offset.checked_add(offsets.len_proxy()).unwrap();
                    (offset, (offsets, values))
                })
                .collect(),
        }
    }
}
impl<'a, T> Index<usize> for ChunkedList<'a, T> {
    type Output = [T];
    /// Return the inner list at global `index`: find the chunk whose start
    /// offset is the greatest one <= `index`, then slice its values using the
    /// chunk-local offsets. Panics if `index` is out of range.
    fn index(&self, index: usize) -> &Self::Output {
        let (&base, &(offsets, values)) = self.by_offset.range(..=index).next_back().unwrap();
        let (start, end) = offsets.start_end(index - base);
        &values[start..end]
    }
}