feat(tvix/tools/crunch-v2): init

This is a tool for ingesting subsets of cache.nixos.org into its own flattened castore format.
Currently, produced chunks are not preserved, and this purely serves as a way of measuring
compression/deduplication ratios for various chunking and compression parameters.

Change-Id: I3983af02a66f7837d76874ee0fc8b2fab62ac17e
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10486
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
edef 2024-01-17 16:04:03 +00:00
parent e0a1c03b24
commit 4f22203a3a
12 changed files with 15022 additions and 0 deletions

1
tvix/tools/crunch-v2/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
*.parquet

3041
tvix/tools/crunch-v2/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

11252
tvix/tools/crunch-v2/Cargo.nix Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,38 @@
[package]
name = "crunch-v2"
version = "0.1.0"
edition = "2021"
[workspace]
members = ["."]
[dependencies]
anyhow = { version = "1.0.75", features = ["backtrace"] }
lazy_static = "1.4.0"
bstr = "1.8.0"
bytes = "1.5.0"
futures = "0.3.29"
tokio = { version = "1.34.0", features = ["full"] }
rusoto_core = { version = "0.48.0", default-features = false, features = ["hyper-rustls"] }
rusoto_s3 = { version = "0.48.0", default-features = false, features = ["rustls"] }
nix-compat = { version = "0.1.0", path = "../../nix-compat" }
sled = "0.34.7"
fastcdc = "3.1.0"
blake3 = "1.5.0"
sha2 = { version = "0.10.8", features = ["asm"] }
digest = "0.10.7"
bzip2 = "0.4.4"
xz2 = "0.1.7"
zstd = "0.13.0"
prost = "0.12.2"
polars = { version = "0.35.4", default-features = false, features = ["parquet", "lazy", "sql", "dtype-struct"] }
indicatif = "0.17.7"
[build-dependencies]
prost-build = "0.12.2"

View file

@ -0,0 +1 @@
edef

View file

@ -0,0 +1,6 @@
use std::io::Result;
/// Build script entry point: compiles `protos/flatstore.proto` with prost-build,
/// resolving imports relative to `protos/`.
fn main() -> Result<()> {
    let protos = ["protos/flatstore.proto"];
    let include_dirs = ["protos/"];
    prost_build::compile_protos(&protos, &include_dirs)
}

View file

@ -0,0 +1,15 @@
# Builds the crunch-v2 crate from the crate set defined in ./Cargo.nix.
{ pkgs, ... }:
let
crates = import ./Cargo.nix {
inherit pkgs;
nixpkgs = pkgs.path;
# Extend the stock per-crate overrides with one for this crate.
defaultCrateOverrides = pkgs.defaultCrateOverrides // {
crunch-v2 = prev: {
# Adds protobuf to the build-time inputs, keeping any inputs already present.
# Presumably needed so the build script's prost-build call can find `protoc`.
nativeBuildInputs = (prev.nativeBuildInputs or [ ]) ++ [ pkgs.buildPackages.protobuf ];
};
};
};
in
# The derivation for the workspace's root crate.
crates.rootCrate.build

View file

@ -0,0 +1,38 @@
syntax = "proto3";
package tvix.flatstore.v1;
// One ingested NAR: its hash plus the root node of the tree it contains.
message Path {
// Hash of the NAR. The ingester in this crate writes the SHA-256 digest of the
// decompressed NAR stream here.
bytes nar_hash = 1;
// Root node of the NAR.
oneof node {
DirectoryNode directory = 2;
FileNode file = 3;
SymlinkNode symlink = 4;
}
}
// A directory, with its children inlined and grouped by node kind.
message DirectoryNode {
// Name of this entry within its parent (the ingester uses an empty name for the root).
bytes name = 1;
// Child directories, nested recursively.
repeated DirectoryNode directories = 2;
// Child regular files.
repeated FileNode files = 3;
// Child symlinks.
repeated SymlinkNode symlinks = 4;
}
// A regular file, described by its content hash and the list of chunks it was split into.
message FileNode {
// Name of this entry within its parent (the ingester uses an empty name for the root).
bytes name = 1;
// Hash of the whole file contents. The ingester in this crate writes a BLAKE3 digest.
bytes hash = 2;
// Chunks of the file, in the order the chunker produced them.
repeated Chunk chunks = 3;
// Whether the file is marked executable in the NAR.
bool executable = 4;
}
// Metadata for one content-defined chunk of a file. The chunk data itself is not stored.
message Chunk {
// Hash of the chunk data. The ingester in this crate writes a BLAKE3 digest.
bytes hash = 1;
// Length of the uncompressed chunk, in bytes.
uint32 size = 2;
// Length of the chunk after compression, in bytes (the ingester measures this with zstd).
uint32 size_compressed = 3;
}
// A symbolic link.
message SymlinkNode {
// Name of this entry within its parent (the ingester uses an empty name for the root).
bytes name = 1;
// The link target, stored as raw bytes.
bytes target = 2;
}

View file

@ -0,0 +1,139 @@
//! This tool lossily converts a Sled database produced by crunch-v2 into a Parquet file for analysis.
//! The resulting `crunch.parquet` has columns `file_hash`, `nar_hash`, and `chunk`.
//! The first two are SHA-256 hashes of the compressed file and the NAR it decompresses to.
//! `chunk` is a struct array corresponding to [crunch_v2::proto::Chunk] messages.
//! They are concatenated without any additional structure, so nothing but the chunk list is preserved.
use anyhow::Result;
use indicatif::{ProgressBar, ProgressStyle};
use std::fs::File;
use crunch_v2::{
proto::{self, path::Node},
FILES,
};
use prost::Message;
use polars::{
chunked_array::builder::AnonymousOwnedListBuilder,
prelude::{
df, BinaryChunkedBuilder, ChunkedBuilder, DataFrame, DataType, Field, ListBuilderTrait,
NamedFrom, ParquetWriter, PrimitiveChunkedBuilder, Series, UInt32Type,
},
series::IntoSeries,
};
/// Reads every entry of the `files` tree, decodes it as a flatstore `Path`, and writes
/// the flattened chunk lists to `crunch.parquet` in the current directory.
fn main() -> Result<()> {
    let writer = ParquetWriter::new(File::create("crunch.parquet")?);

    let total = FILES.len() as u64;
    let style =
        ProgressStyle::with_template("{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}")?;
    let progress = ProgressBar::new(total).with_style(style);

    let mut frame = FrameBuilder::new();
    for entry in FILES.iter() {
        let (key, value) = entry?;
        // Keys are expected to be 32-byte file hashes; anything else is a corrupt database.
        let file_hash: [u8; 32] = key[..].try_into().unwrap();
        let path = proto::Path::decode(&value[..])?;
        frame.push(file_hash, path);
        progress.inc(1);
    }

    let mut df = frame.finish();
    writer.finish(&mut df)?;
    Ok(())
}
/// Column builders for the output frame: one row per stored path.
struct FrameBuilder {
    file_hash: BinaryChunkedBuilder,
    nar_hash: BinaryChunkedBuilder,
    chunk: AnonymousOwnedListBuilder,
}

impl FrameBuilder {
    /// Creates empty builders for the `file_hash`, `nar_hash` and `chunk` columns.
    fn new() -> Self {
        // Element type of the `chunk` list column; mirrors the fields of `proto::Chunk`.
        let chunk_dtype = DataType::Struct(vec![
            Field::new("hash", DataType::Binary),
            Field::new("size", DataType::UInt32),
            Field::new("size_compressed", DataType::UInt32),
        ]);

        let file_hash = BinaryChunkedBuilder::new("file_hash", 0, 0);
        let nar_hash = BinaryChunkedBuilder::new("nar_hash", 0, 0);
        let chunk = AnonymousOwnedListBuilder::new("chunk", 0, Some(chunk_dtype));

        Self {
            file_hash,
            nar_hash,
            chunk,
        }
    }

    /// Appends one row: the file hash, the NAR hash, and every chunk found under the
    /// path's root node.
    ///
    /// Panics if `pb` has no root node.
    fn push(&mut self, file_hash: [u8; 32], pb: proto::Path) {
        self.file_hash.append_value(&file_hash[..]);
        self.nar_hash.append_value(pb.nar_hash);

        let chunks = ChunkFrameBuilder::new(pb.node.unwrap());
        self.chunk.append_series(&chunks).unwrap();
    }

    /// Finishes all three columns and assembles them into a `DataFrame`.
    fn finish(mut self) -> DataFrame {
        let file_hash = self.file_hash.finish().into_series();
        let nar_hash = self.nar_hash.finish().into_series();
        let chunk = self.chunk.finish().into_series();

        df! {
            "file_hash" => file_hash,
            "nar_hash" => nar_hash,
            "chunk" => chunk
        }
        .unwrap()
    }
}
/// Column builders for the chunks of a single path, later packed into one struct series.
struct ChunkFrameBuilder {
    hash: BinaryChunkedBuilder,
    size: PrimitiveChunkedBuilder<UInt32Type>,
    size_compressed: PrimitiveChunkedBuilder<UInt32Type>,
}

impl ChunkFrameBuilder {
    /// Collects every chunk reachable from `node` into a struct series named `chunk`.
    fn new(node: proto::path::Node) -> Series {
        let hash = BinaryChunkedBuilder::new("hash", 0, 0);
        let size = PrimitiveChunkedBuilder::new("size", 0);
        let size_compressed = PrimitiveChunkedBuilder::new("size_compressed", 0);

        let mut builder = Self {
            hash,
            size,
            size_compressed,
        };
        builder.push(node);
        builder.finish()
    }

    /// Appends the chunks of `node`. Directories are walked recursively, visiting a
    /// directory's files before its subdirectories; symlinks contribute nothing.
    fn push(&mut self, node: Node) {
        match node {
            Node::Symlink(_) => {}
            Node::File(file) => {
                for chunk in &file.chunks {
                    self.hash.append_value(&chunk.hash);
                    self.size.append_value(chunk.size);
                    self.size_compressed.append_value(chunk.size_compressed);
                }
            }
            Node::Directory(dir) => {
                dir.files
                    .into_iter()
                    .map(Node::File)
                    .for_each(|child| self.push(child));
                dir.directories
                    .into_iter()
                    .map(Node::Directory)
                    .for_each(|child| self.push(child));
            }
        }
    }

    /// Finishes the three columns and packs them into a single struct series.
    fn finish(self) -> Series {
        let hash = self.hash.finish().into_series();
        let size = self.size.finish().into_series();
        let size_compressed = self.size_compressed.finish().into_series();

        df! {
            "hash" => hash,
            "size" => size,
            "size_compressed" => size_compressed
        }
        .unwrap()
        .into_struct("chunk")
        .into_series()
    }
}

View file

@ -0,0 +1,10 @@
use lazy_static::lazy_static;
/// Rust types for the `tvix.flatstore.v1` protobuf package, included from the file the
/// build script generates into `OUT_DIR`.
pub mod proto {
include!(concat!(env!("OUT_DIR"), "/tvix.flatstore.v1.rs"));
}
lazy_static! {
/// The sled database at `crunch.db`, relative to the current working directory.
/// Opened on first use; panics if it cannot be opened.
static ref DB: sled::Db = sled::open("crunch.db").unwrap();
/// The `files` tree of the database. Opened on first use; panics if that fails.
pub static ref FILES: sled::Tree = DB.open_tree("files").unwrap();
}

View file

@ -0,0 +1,270 @@
//! This is a tool for ingesting subsets of cache.nixos.org into its own flattened castore format.
//! Currently, produced chunks are not preserved, and this purely serves as a way of measuring
//! compression/deduplication ratios for various chunking and compression parameters.
//!
//! NARs to be ingested are read from `ingest.parquet`, and filtered by an SQL expression provided as a program argument.
//! The `file_hash` column should contain SHA-256 hashes of the compressed data, corresponding to the `FileHash` narinfo field.
//! The `compression` column should contain either `"bzip2"` or `"xz"`, corresponding to the `Compression` narinfo field.
//! Additional columns are ignored, but can be used by the SQL filter expression.
//!
//! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash.
use crunch_v2::{proto, FILES};
mod remote;
use anyhow::Result;
use futures::{stream, StreamExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use std::{
env,
io::{self, BufRead, Read, Write},
ptr,
};
use polars::{
prelude::{col, LazyFrame, ScanArgsParquet},
sql::sql_expr,
};
use fastcdc::v2020::{ChunkData, StreamCDC};
use nix_compat::nar::reader as nar;
use digest::Digest;
use prost::Message;
use sha2::Sha256;
/// Entry point of the ingester.
///
/// Expects exactly one program argument: an SQL expression used to filter the rows of
/// `ingest.parquet`. For every selected row whose file hash is not yet in the `files`
/// tree, the NAR is fetched, decompressed, chunked and hashed, and the resulting
/// flatstore `Path` is stored under its file hash. Up to 16 NARs are processed
/// concurrently.
///
/// # Errors
///
/// Returns an error if the filter argument is missing or invalid, if the input Parquet
/// file cannot be read, or if fetching, parsing or storing any NAR fails.
///
/// # Panics
///
/// Panics if a selected `file_hash` value is not exactly 32 bytes long.
#[tokio::main]
async fn main() -> Result<()> {
    // `nth(1)` skips argv[0]. A missing filter is a usage error, not a bug, so report it
    // as an error instead of panicking.
    let filter = env::args()
        .nth(1)
        .ok_or_else(|| anyhow::anyhow!("usage: crunch-v2 <SQL filter expression>"))?;
    let filter = sql_expr(filter)?;

    let df = LazyFrame::scan_parquet("ingest.parquet", ScanArgsParquet::default())?
        .filter(filter)
        .select([col("file_hash"), col("compression")])
        .drop_nulls(None)
        .collect()?;

    let progress = ProgressBar::new(df.height() as u64).with_style(ProgressStyle::with_template(
        "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
    )?);

    // Nulls were dropped above, so the inner `unwrap()`s on the column values cannot fail.
    let file_hash = df
        .column("file_hash")?
        .binary()?
        .into_iter()
        .map(|h| -> [u8; 32] { h.unwrap().try_into().unwrap() });

    let compression = df
        .column("compression")?
        .utf8()?
        .into_iter()
        .map(|c| c.unwrap());

    let res = stream::iter(file_hash.zip(compression))
        .map(Ok)
        .try_for_each_concurrent(Some(16), |(file_hash, compression)| {
            let progress = progress.clone();
            async move {
                // Already ingested on an earlier run: count it and move on.
                if FILES.contains_key(&file_hash)? {
                    progress.inc(1);
                    return Ok(());
                }

                let reader = remote::nar(file_hash, compression).await?;

                // Decompression, NAR parsing, chunking and hashing are all blocking work,
                // so they run on the blocking thread pool.
                tokio::task::spawn_blocking(move || {
                    let mut reader = Sha256Reader::from(reader);

                    let path = ingest(nar::open(&mut reader)?, vec![]).map(|node| proto::Path {
                        nar_hash: reader.finalize().as_slice().into(),
                        node: Some(node),
                    })?;

                    FILES.insert(file_hash, path.encode_to_vec())?;
                    progress.inc(1);

                    Ok::<_, anyhow::Error>(())
                })
                .await?
            }
        })
        .await;

    // Flush before reporting an ingestion error, so completed work is persisted either way.
    let flush = crunch_v2::FILES.flush_async().await;

    res?;
    flush?;

    Ok(())
}
/// Converts one NAR node, recursively, into its flatstore protobuf representation.
///
/// `name` is the entry name to record on the resulting node. File contents are split
/// with FastCDC; for each chunk only its BLAKE3 hash, its length and its zstd-compressed
/// length are recorded.
///
/// Returns an error if reading from the NAR or chunking the file contents fails.
fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
    use proto::path::Node as PbNode;

    match node {
        nar::Node::Symlink { target } => {
            let symlink = proto::SymlinkNode { name, target };
            Ok(PbNode::Symlink(symlink))
        }

        nar::Node::Directory(mut reader) => {
            let mut dir = proto::DirectoryNode {
                name,
                directories: vec![],
                files: vec![],
                symlinks: vec![],
            };

            // Children are sorted into the three lists by kind as they are read.
            while let Some(entry) = reader.next()? {
                match ingest(entry.node, entry.name)? {
                    PbNode::Directory(child) => dir.directories.push(child),
                    PbNode::File(child) => dir.files.push(child),
                    PbNode::Symlink(child) => dir.symlinks.push(child),
                }
            }

            Ok(PbNode::Directory(dir))
        }

        nar::Node::File { executable, reader } => {
            // Hash the whole file while the chunker pulls data through this reader.
            let mut hashing = B3Reader::from(reader);

            // Chunk size bounds: 128 KiB minimum, 256 KiB average, 512 KiB maximum.
            let chunker = StreamCDC::new(&mut hashing, 1 << 17, 1 << 18, 1 << 19);
            let chunks = chunker
                .map(|chunk| -> Result<proto::Chunk> {
                    let ChunkData { length, data, .. } = chunk?;
                    let digest = blake3::hash(&data);
                    let compressed_len = zstd_size(&data, 9);

                    Ok(proto::Chunk {
                        hash: digest.as_bytes().to_vec(),
                        size: length.try_into().unwrap(),
                        size_compressed: compressed_len.try_into().unwrap(),
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            let file = proto::FileNode {
                name,
                hash: hashing.finalize().as_bytes().to_vec(),
                chunks,
                executable,
            };
            Ok(PbNode::File(file))
        }
    }
}
/// Reader adapter that feeds the bytes consumed from `inner` into a SHA-256 hasher,
/// both through `Read::read` and through the `BufRead` fill/consume protocol.
struct Sha256Reader<R> {
inner: R,
hasher: Sha256,
// The not-yet-consumed part of the buffer most recently returned by `inner.fill_buf()`.
// Stored as a raw pointer so that `consume` can hash the bytes being consumed.
buf: *const [u8],
}
// Placeholder for `buf` when no buffer is held: a zero-length slice at a non-null
// address, so no bytes are ever read through it.
const ZERO_BUF: *const [u8] = ptr::slice_from_raw_parts(1 as *const u8, 0);
// The raw pointer field prevents `Send` from being implemented automatically; this
// asserts it manually whenever the wrapped reader is `Send`.
unsafe impl<R: Send> Send for Sha256Reader<R> {}
impl<R> From<R> for Sha256Reader<R> {
// Wraps `value` with a fresh hasher and no buffer held.
fn from(value: R) -> Self {
Self {
inner: value,
hasher: Sha256::new(),
buf: ZERO_BUF,
}
}
}
impl<R: Read> Read for Sha256Reader<R> {
// Reads from the inner reader and hashes exactly the bytes that were read.
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// Forget any buffer pointer remembered from an earlier `fill_buf`.
self.buf = ZERO_BUF;
let n = self.inner.read(buf)?;
self.hasher.update(&buf[..n]);
Ok(n)
}
}
impl<R: BufRead> BufRead for Sha256Reader<R> {
// Returns the inner reader's buffer and remembers where it is, without hashing yet;
// bytes are only hashed once they are consumed.
fn fill_buf(&mut self) -> io::Result<&[u8]> {
// Reset first so that an error below leaves no stale pointer behind.
self.buf = ZERO_BUF;
let buf = self.inner.fill_buf()?;
self.buf = buf as *const [u8];
Ok(buf)
}
// Hashes the first `amt` bytes of the remembered buffer, then consumes them from the
// inner reader. Panics if `amt` exceeds the length of the remembered buffer.
fn consume(&mut self, amt: usize) {
// UNSAFETY: This assumes that `R::consume` doesn't invalidate the buffer.
// That's not a sound assumption in general, though it is likely to hold.
// TODO(edef): refactor this codebase to write a fresh NAR for verification purposes
// we already buffer full chunks, so there's no pressing need to reuse the input buffers
unsafe {
let (head, buf) = (*self.buf).split_at(amt);
// Keep only the unconsumed tail, so repeated `consume` calls hash each byte once.
self.buf = buf as *const [u8];
self.hasher.update(head);
self.inner.consume(amt);
}
}
}
impl<R> Sha256Reader<R> {
// Consumes the reader and returns the SHA-256 digest of everything hashed so far.
fn finalize(self) -> [u8; 32] {
self.hasher.finalize().into()
}
}
/// Reader adapter that feeds every byte read from `inner` into a BLAKE3 hasher.
struct B3Reader<R> {
    inner: R,
    hasher: blake3::Hasher,
}

impl<R> From<R> for B3Reader<R> {
    /// Wraps `inner` with a fresh hasher.
    fn from(inner: R) -> Self {
        let hasher = blake3::Hasher::new();
        Self { inner, hasher }
    }
}

impl<R: Read> Read for B3Reader<R> {
    /// Reads from the inner reader and hashes exactly the bytes that were read.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let read = self.inner.read(buf)?;
        let filled = &buf[..read];
        self.hasher.update(filled);
        Ok(read)
    }
}

impl<R> B3Reader<R> {
    /// Consumes the reader and returns the BLAKE3 hash of everything read through it.
    fn finalize(self) -> blake3::Hash {
        let Self { hasher, .. } = self;
        hasher.finalize()
    }
}
fn zstd_size(data: &[u8], level: i32) -> u64 {
let mut w = zstd::Encoder::new(CountingWriter::default(), level).unwrap();
w.write_all(&data).unwrap();
let CountingWriter(size) = w.finish().unwrap();
size
}
/// A writer that discards its input and only keeps a running total of the bytes written.
#[derive(Default)]
struct CountingWriter(u64);

impl Write for CountingWriter {
    /// Accepts the whole buffer and adds its length to the running total.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written = buf.len();
        self.0 += written as u64;
        Ok(written)
    }

    /// Nothing is buffered, so flushing always succeeds immediately.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

View file

@ -0,0 +1,211 @@
use std::{
cmp,
io::{self, BufRead, BufReader, Read},
pin::Pin,
task::{self, Poll},
};
use anyhow::{bail, Result};
use bytes::{Buf, Bytes};
use futures::{future::BoxFuture, Future, FutureExt, Stream, StreamExt};
use lazy_static::lazy_static;
use tokio::runtime::Handle;
use nix_compat::nixbase32;
use rusoto_core::{ByteStream, Region};
use rusoto_s3::{GetObjectOutput, GetObjectRequest, S3Client, S3};
use bzip2::read::BzDecoder;
use xz2::read::XzDecoder;
lazy_static! {
/// Shared S3 client for the `us-east-1` region, created on first use.
static ref S3_CLIENT: S3Client = S3Client::new(Region::UsEast1);
}
// Bucket that NAR files are fetched from.
// NOTE(review): presumably the bucket backing cache.nixos.org — confirm.
const BUCKET: &str = "nix-cache";
/// Opens the compressed NAR file with the given file hash and returns a blocking,
/// buffered reader over its decompressed contents.
///
/// `compression` must be `"bzip2"` or `"xz"`; any other value is rejected before a
/// request is made. Also returns an error if the initial request fails.
pub async fn nar(
    file_hash: [u8; 32],
    compression: &str,
) -> Result<Box<BufReader<dyn Read + Send>>> {
    // Choose the object key's extension and the matching decoder together.
    let (extension, decompress): (&'static str, fn(_) -> Box<_>) = match compression {
        "xz" => ("xz", decompress_xz),
        "bzip2" => ("bz2", decompress_bz2),
        _ => bail!("unknown compression: {compression}"),
    };

    let key = FileKey {
        file_hash,
        extension,
    };
    let stream = FileStream::new(key).await?;

    Ok(decompress(stream.into()))
}
/// Wraps `reader` in an xz decoder and buffers the decompressed output.
fn decompress_xz(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> {
    let decoder = XzDecoder::new(reader);
    Box::new(BufReader::new(decoder))
}
/// Wraps `reader` in a bzip2 decoder and buffers the decompressed output.
fn decompress_bz2(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> {
    let decoder = BzDecoder::new(reader);
    Box::new(BufReader::new(decoder))
}
/// Blocking `Read`/`BufRead` adapter over the asynchronous [`FileStream`].
///
/// Holds the most recently received chunk and hands it out piecewise.
struct FileStreamReader {
    inner: FileStream,
    buffer: Bytes,
}

impl From<FileStream> for FileStreamReader {
    /// Wraps `stream`, starting with an empty buffer.
    fn from(stream: FileStream) -> Self {
        let buffer = Bytes::new();
        FileStreamReader {
            inner: stream,
            buffer,
        }
    }
}

impl Read for FileStreamReader {
    /// Copies as much buffered data into `dst` as fits, fetching the next chunk first
    /// if the buffer is empty. Returns 0 once the stream is exhausted.
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        let copied = {
            let available = self.fill_buf()?;
            let count = cmp::min(available.len(), dst.len());
            dst[..count].copy_from_slice(&available[..count]);
            count
        };
        self.consume(copied);
        Ok(copied)
    }
}

impl BufRead for FileStreamReader {
    /// Returns the unread part of the current chunk. When it is empty, blocks the
    /// calling thread on the current Tokio runtime until the stream yields its next
    /// chunk; the end of the stream leaves the buffer empty.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        if self.buffer.is_empty() {
            let next = Handle::current().block_on(self.inner.next());
            self.buffer = next.transpose()?.unwrap_or_default();
        }
        Ok(&self.buffer)
    }

    /// Marks the first `cnt` buffered bytes as read.
    fn consume(&mut self, cnt: usize) {
        self.buffer.advance(cnt);
    }
}
/// Identifies one compressed NAR object in the bucket.
struct FileKey {
    /// Hash of the compressed file; its nixbase32 encoding forms the object name.
    file_hash: [u8; 32],
    /// Compression-specific file extension, without the leading dot.
    extension: &'static str,
}

impl FileKey {
    /// Starts a GetObject request for this key, beginning at byte `offset`.
    ///
    /// `offset` is the number of bytes already received, which is also the zero-based
    /// index of the next byte wanted. When `e_tag` is given it is sent as `If-Match`.
    ///
    /// The returned future owns the whole request, so it does not borrow `self`.
    /// Failures are reported as `io::Error`s of kind `Other`.
    fn get(
        &self,
        offset: u64,
        e_tag: Option<&str>,
    ) -> impl Future<Output = io::Result<GetObjectOutput>> + Send + 'static {
        // The HTTP `Range` request header is written `bytes=<first>-`, with a zero-based
        // first byte position. The space-separated form is `Content-Range` syntax and is
        // not a valid request range, so a resumed download would not continue where the
        // previous response stopped. At offset 0 the whole object is wanted, so no range
        // is sent at all.
        let range = (offset > 0).then(|| format!("bytes={offset}-"));

        let input = GetObjectRequest {
            bucket: BUCKET.to_string(),
            key: format!(
                "nar/{}.nar.{}",
                nixbase32::encode(&self.file_hash),
                self.extension
            ),
            if_match: e_tag.map(str::to_owned),
            range,
            ..Default::default()
        };

        async {
            S3_CLIENT
                .get_object(input)
                .await
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
        }
    }
}
/// Asynchronous stream over the bytes of one object, with enough state to re-request
/// the remainder if the response body ends early or fails.
struct FileStream {
    key: FileKey,
    /// ETag from the first response, sent as `If-Match` on later requests.
    e_tag: String,
    /// Number of bytes yielded so far.
    offset: u64,
    /// Content length reported by the first response.
    length: u64,
    inner: FileStreamState,
}

/// What the stream is currently waiting on.
enum FileStreamState {
    /// A follow-up request is in flight.
    Response(BoxFuture<'static, io::Result<GetObjectOutput>>),
    /// A response body is being read.
    Body(ByteStream),
    /// Nothing more will be yielded.
    Eof,
}

/// Builds the error reported when a GetObject response lacks a field this code relies on.
fn missing_field(field: &str) -> io::Error {
    io::Error::new(
        io::ErrorKind::InvalidData,
        format!("S3 GetObject response is missing {field}"),
    )
}

impl FileStream {
    /// Issues the initial request for `key` and sets up the stream over its body.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, or if the response has no ETag, no body,
    /// or no non-negative content length. These come from the network, so they are
    /// reported as errors rather than panics.
    pub async fn new(key: FileKey) -> io::Result<Self> {
        let resp = key.get(0, None).await?;

        let e_tag = resp.e_tag.ok_or_else(|| missing_field("ETag"))?;
        let length = resp
            .content_length
            .ok_or_else(|| missing_field("Content-Length"))?;
        let length =
            u64::try_from(length).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        let body = resp.body.ok_or_else(|| missing_field("body"))?;

        Ok(FileStream {
            key,
            e_tag,
            offset: 0,
            length,
            inner: FileStreamState::Body(body),
        })
    }
}
/// Unwraps a `Poll`: evaluates to the inner value when it is `Ready`, and returns
/// `Poll::Pending` from the enclosing function otherwise.
macro_rules! poll {
    ($expr:expr) => {
        match $expr {
            Poll::Ready(ready) => ready,
            Poll::Pending => return Poll::Pending,
        }
    };
}
// Yields the object's bytes chunk by chunk, re-requesting the rest of the object
// whenever the current response body fails or ends before `length` bytes were yielded.
impl Stream for FileStream {
type Item = io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// Drive the state machine until a chunk is available; `poll!` returns `Pending`
// from this function whenever the underlying future or body is not ready.
let chunk = loop {
match &mut this.inner {
// A follow-up request is in flight: wait for its response.
FileStreamState::Response(resp) => match poll!(resp.poll_unpin(cx)) {
Err(err) => {
// A failed request ends the stream: yield the error, then EOF.
this.inner = FileStreamState::Eof;
return Poll::Ready(Some(Err(err)));
}
Ok(resp) => {
// NOTE(review): panics if the response carries no body.
this.inner = FileStreamState::Body(resp.body.unwrap());
}
},
FileStreamState::Body(body) => match poll!(body.poll_next_unpin(cx)) {
// The body ended or failed. A complete body would already have moved the
// state to `Eof` below, so request the remainder from the current offset,
// pinned to the original ETag.
// NOTE(review): body errors are discarded and retried with no limit or backoff.
None | Some(Err(_)) => {
this.inner = FileStreamState::Response(
this.key.get(this.offset, Some(&this.e_tag)).boxed(),
);
}
Some(Ok(chunk)) => {
break chunk;
}
},
FileStreamState::Eof => {
return Poll::Ready(None);
}
}
};
// Account for the chunk, and stop once everything the first response announced
// has been yielded.
this.offset += chunk.len() as u64;
if this.offset >= this.length {
this.inner = FileStreamState::Eof;
}
Poll::Ready(Some(Ok(chunk)))
}
}