From 313899c291f0295506c275418e570b39b4a5f079 Mon Sep 17 00:00:00 2001 From: edef Date: Thu, 17 Oct 2024 13:26:01 +0000 Subject: [PATCH] refactor(users/edef/weave/swizzle): use polars streaming This vastly reduces the memory requirements, so we can run in ~40G RAM. Change-Id: I4952a780df294bd852a8b4682ba2fd59b9bae675 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12667 Reviewed-by: flokli Tested-by: BuildkiteCI --- users/edef/weave/Cargo.lock | 12 ---- users/edef/weave/Cargo.nix | 48 ++------------ users/edef/weave/Cargo.toml | 3 +- users/edef/weave/src/bin/swizzle.rs | 99 +++++++++++++---------------- users/edef/weave/src/lib.rs | 20 +++++- 5 files changed, 70 insertions(+), 112 deletions(-) diff --git a/users/edef/weave/Cargo.lock b/users/edef/weave/Cargo.lock index aaa77014f..191059ffd 100644 --- a/users/edef/weave/Cargo.lock +++ b/users/edef/weave/Cargo.lock @@ -1653,15 +1653,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" -[[package]] -name = "signal-hook-registry" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" -dependencies = [ - "libc", -] - [[package]] name = "signature" version = "2.2.0" @@ -1881,9 +1872,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", "pin-project-lite", - "signal-hook-registry", "socket2", "tokio-macros", "windows-sys", @@ -2039,7 +2028,6 @@ dependencies = [ "polars", "rayon", "safer_owning_ref", - "tokio", ] [[package]] diff --git a/users/edef/weave/Cargo.nix b/users/edef/weave/Cargo.nix index a06bae091..c34a03b68 100644 --- a/users/edef/weave/Cargo.nix +++ b/users/edef/weave/Cargo.nix @@ -3395,7 +3395,7 @@ rec { "unique_counts" = [ "polars-ops/unique_counts" "polars-lazy?/unique_counts" ]; "zip_with" = [ "polars-core/zip_with" ]; }; - resolvedDefaultFeatures = [ "csv" "default" "docs" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-slim" "fmt" "parquet" "polars-io" "polars-ops" "polars-time" "temporal" "zip_with" ]; + resolvedDefaultFeatures = [ "csv" "default" "docs" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-slim" "fmt" "lazy" "parquet" "polars-io" "polars-lazy" "polars-ops" "polars-time" "streaming" "temporal" "zip_with" ]; }; "polars-arrow" = rec { crateName = "polars-arrow"; @@ -4204,7 +4204,7 @@ rec { "true_div" = [ "polars-plan/true_div" ]; "unique_counts" = [ "polars-plan/unique_counts" ]; }; - resolvedDefaultFeatures = [ "abs" "cross_join" "csv" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-i16" "dtype-i8" "dtype-time" "is_in" "log" "meta" "parquet" "polars-time" "regex" "round_series" "strings" "temporal" "trigonometry" ]; + resolvedDefaultFeatures = [ "abs" "chunked_ids" "cross_join" "csv" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-i16" "dtype-i8" "dtype-time" "is_in" "log" "meta" "parquet" "polars-pipe" "polars-time" "regex" "round_series" "streaming" "strings" "temporal" "trigonometry" ]; }; "polars-ops" = rec { crateName = "polars-ops"; @@ -4356,7 +4356,7 @@ rec { "timezones" = [ "chrono-tz" "chrono" ]; "unicode-reverse" = [ "dep:unicode-reverse" ]; }; - resolvedDefaultFeatures = [ "abs" "cross_join" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "is_in" "log" "round_series" "search_sorted" "strings" ]; + resolvedDefaultFeatures = [ "abs" "chunked_ids" "cross_join" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "is_in" "log" "round_series" "search_sorted" "strings" ]; }; "polars-parquet" = rec { crateName = "polars-parquet"; @@ -4780,7 +4780,7 @@ rec { "top_k" = [ "polars-ops/top_k" ]; "unique_counts" = [ "polars-ops/unique_counts" ]; }; - resolvedDefaultFeatures = [ "abs" "cross_join" "csv" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-i16" "dtype-i8" "dtype-time" "is_in" "log" "meta" "parquet" "polars-parquet" "polars-time" "regex" "round_series" "strings" "temporal" "trigonometry" ]; + resolvedDefaultFeatures = [ "abs" "chunked_ids" "cross_join" "csv" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-i16" "dtype-i8" "dtype-time" "is_in" "log" "meta" "parquet" "polars-parquet" "polars-time" "regex" "round_series" "streaming" "strings" "temporal" "trigonometry" ]; }; "polars-row" = rec { crateName = "polars-row"; @@ -5709,24 +5709,6 @@ rec { }; resolvedDefaultFeatures = [ "default" "std" ]; }; - "signal-hook-registry" = rec { - crateName = "signal-hook-registry"; - version = "1.4.2"; - edition = "2015"; - sha256 = "1cb5akgq8ajnd5spyn587srvs4n26ryq0p78nswffwhv46sf1sd9"; - libName = "signal_hook_registry"; - authors = [ - "Michal 'vorner' Vaner " - "Masaki Hara " - ]; - dependencies = [ - { - name = "libc"; - packageId = "libc"; - } - ]; - - }; "signature" = rec { crateName = "signature"; version = "2.2.0"; @@ -6282,21 +6264,10 @@ rec { optional = true; usesDefaultFeatures = false; } - { - name = "parking_lot"; - packageId = "parking_lot"; - optional = true; - } { name = "pin-project-lite"; packageId = "pin-project-lite"; } - { - name = "signal-hook-registry"; - packageId = "signal-hook-registry"; - optional = true; - target = { target, features }: (target."unix" or false); - } { name = "socket2"; packageId = "socket2"; @@ -6353,7 +6324,7 @@ rec { "tracing" = [ "dep:tracing" ]; "windows-sys" = [ "dep:windows-sys" ]; }; - resolvedDefaultFeatures = [ "bytes" "default" "fs" "full" "io-std" "io-util" "libc" "macros" "mio" "net" "parking_lot" "process" "rt" "rt-multi-thread" "signal" "signal-hook-registry" "socket2" "sync" "time" "tokio-macros" "windows-sys" ]; + resolvedDefaultFeatures = [ "bytes" "default" "io-util" "libc" "macros" "mio" "net" "rt" "rt-multi-thread" "socket2" "sync" "time" "tokio-macros" "windows-sys" ]; }; "tokio-macros" = rec { crateName = "tokio-macros"; @@ -6789,7 +6760,7 @@ rec { { name = "polars"; packageId = "polars"; - features = [ "parquet" ]; + features = [ "parquet" "lazy" "streaming" ]; } { name = "rayon"; @@ -6799,11 +6770,6 @@ rec { name = "safer_owning_ref"; packageId = "safer_owning_ref"; } - { - name = "tokio"; - packageId = "tokio"; - features = [ "full" ]; - } ]; }; @@ -7799,7 +7765,7 @@ rec { "Win32_Web" = [ "Win32" ]; "Win32_Web_InternetExplorer" = [ "Win32_Web" ]; }; - resolvedDefaultFeatures = [ "Wdk" "Wdk_Foundation" "Wdk_Storage" "Wdk_Storage_FileSystem" "Wdk_System" "Wdk_System_IO" "Win32" "Win32_Foundation" "Win32_Networking" "Win32_Networking_WinSock" "Win32_Security" "Win32_Storage" "Win32_Storage_FileSystem" "Win32_System" "Win32_System_Com" "Win32_System_Console" "Win32_System_IO" "Win32_System_Pipes" "Win32_System_SystemServices" "Win32_System_Threading" "Win32_System_WindowsProgramming" "Win32_UI" "Win32_UI_Shell" "default" ]; + resolvedDefaultFeatures = [ "Wdk" "Wdk_Foundation" "Wdk_Storage" "Wdk_Storage_FileSystem" "Wdk_System" "Wdk_System_IO" "Win32" "Win32_Foundation" "Win32_Networking" "Win32_Networking_WinSock" "Win32_Security" "Win32_Storage" "Win32_Storage_FileSystem" "Win32_System" "Win32_System_Com" "Win32_System_IO" "Win32_System_Pipes" "Win32_System_SystemServices" "Win32_System_Threading" "Win32_System_WindowsProgramming" "Win32_UI" "Win32_UI_Shell" "default" ]; }; "windows-targets" = rec { crateName = "windows-targets"; diff --git a/users/edef/weave/Cargo.toml b/users/edef/weave/Cargo.toml index 69cf3cf7e..72a205f66 100644 --- a/users/edef/weave/Cargo.toml +++ b/users/edef/weave/Cargo.toml @@ -13,8 +13,7 @@ hashbrown = "0.14.3" nix-compat = { version = "0.1.0", path = "../../../tvix/nix-compat" } safer_owning_ref = "0.5.0" rayon = "1.8.1" -tokio = { version = "1.36.0", features = ["full"] } [dependencies.polars] version = "0.36.2" -features = ["parquet"] +features = ["parquet", "lazy", "streaming"] diff --git a/users/edef/weave/src/bin/swizzle.rs b/users/edef/weave/src/bin/swizzle.rs index 68c185812..bcea3edfa 100644 --- a/users/edef/weave/src/bin/swizzle.rs +++ b/users/edef/weave/src/bin/swizzle.rs @@ -32,21 +32,21 @@ use anyhow::Result; use hashbrown::HashTable; -use polars::prelude::*; -use rayon::prelude::*; -use std::fs::File; -use tokio::runtime::Runtime; +use polars::{ + lazy::dsl::{col, SpecialEq}, + prelude::*, +}; -use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL}; +use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL}; fn main() -> Result<()> { - let ph_array = load_ph_array()?; + let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?); // TODO(edef): re-parallelise this // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works. // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory eprint!("… build index\r"); - let ph_map: HashTable<(u64, u32)> = { + let ph_map: &'static HashTable<(u64, u32)> = { let mut ph_map = HashTable::with_capacity(ph_array.len()); for (offset, item) in ph_array.iter().enumerate() { @@ -55,59 +55,48 @@ fn main() -> Result<()> { ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash); } - ph_map + &*Box::leak(Box::new(ph_map)) }; eprintln!("{DONE}"); - eprint!("… swizzle references\r"); - let mut pq = ParquetReader::new(File::open("narinfo.parquet")?) - .with_columns(Some(vec!["references".into()])) - .batched(1 << 16)?; - - let mut reference_idxs = - Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into())); - - let mut bounce = vec![]; - let runtime = Runtime::new()?; - while let Some(batches) = runtime.block_on(pq.next_batches(48))? { - batches - .into_par_iter() - .map(|df| -> ListChunked { - df.column("references") - .unwrap() - .list() - .unwrap() - .apply_to_inner(&|series: Series| -> PolarsResult { - let series = series.binary()?; - let mut out: Vec = Vec::with_capacity(series.len()); - - out.extend(as_fixed_binary::<20>(series).flat_map(|xs| xs).map(|key| { - let hash = hash64(&key); - ph_map - .find(hash, |&(candidate_hash, candidate_index)| { - candidate_hash == hash - && &ph_array[candidate_index as usize] == key - }) - .map(|&(_, index)| index) - .unwrap_or(INDEX_NULL) - })); - - Ok(Series::from_vec("reference_idxs", out)) - }) - .unwrap() + let ph_to_idx = |key: &[u8; 20]| -> u32 { + let hash = hash64(key); + ph_map + .find(hash, |&(candidate_hash, candidate_index)| { + candidate_hash == hash && &ph_array[candidate_index as usize] == key }) - .collect_into_vec(&mut bounce); + .map(|&(_, index)| index) + .unwrap_or(INDEX_NULL) + }; - for batch in bounce.drain(..) { - reference_idxs.append(&batch.into_series())?; - } - } - eprintln!("{DONE}"); - - eprint!("… writing output\r"); - ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! { - "reference_idxs" => reference_idxs, - }?)?; + eprint!("… swizzle references\r"); + LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())? + .with_column( + col("references") + .map( + move |series: Series| -> PolarsResult> { + Ok(Some( + series + .list()? + .apply_to_inner(&|series: Series| -> PolarsResult { + let series = series.binary()?; + let mut out: Vec = Vec::with_capacity(series.len()); + out.extend(as_fixed_binary(series).flatten().map(ph_to_idx)); + Ok(Series::from_vec("reference_idxs", out)) + })? + .into_series(), + )) + }, + SpecialEq::from_type(DataType::List(DataType::UInt32.into())), + ) + .alias("reference_idxs"), + ) + .select([col("reference_idxs")]) + .with_streaming(true) + .sink_parquet( + "narinfo-references.parquet".into(), + ParquetWriteOptions::default(), + )?; eprintln!("{DONE}"); Ok(()) diff --git a/users/edef/weave/src/lib.rs b/users/edef/weave/src/lib.rs index db3d07e7d..4ccd566ca 100644 --- a/users/edef/weave/src/lib.rs +++ b/users/edef/weave/src/lib.rs @@ -1,7 +1,13 @@ use anyhow::Result; -use owning_ref::ArcRef; +use owning_ref::{ArcRef, OwningRef}; use rayon::prelude::*; -use std::{fs::File, ops::Range, slice}; +use std::{ + fs::File, + mem, + ops::{Deref, Range}, + slice, + sync::Arc, +}; use polars::{ datatypes::BinaryChunked, @@ -24,6 +30,16 @@ pub fn hash64(h: &[u8; 20]) -> u64 { u64::from_ne_bytes(buf) } +pub fn leak(r: OwningRef, T>) -> &T { + // SAFETY: Either `ptr` points into the `Arc`, which lives until `r` is dropped, + // or it points at something else entirely which lives at least as long. + unsafe { + let ptr: *const T = r.deref(); + mem::forget(r); + &*ptr + } +} + /// Read a dense `store_path_hash` array from `narinfo.parquet`, /// returning it as an owned [FixedBytes]. pub fn load_ph_array() -> Result> {