fix(tvix/tools/weave): handle sliced arrays correctly
The start/end offsets are not necessarily coterminous with the underlying values array, so even if the stride is fixed, we still we need to slice the chunks down to match the start/end offsets. This bug shouldn't affect the correctness of any existing code, since we're always working with unsliced arrays read directly from Parquet. Change-Id: I2f7ddc4e66d4d3b2317a44bd436a35bff36bac79 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11081 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
parent
ca97e5f485
commit
6bdaebcb55
1 changed files with 12 additions and 8 deletions
|
@ -1,6 +1,6 @@
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use std::{fs::File, slice};
|
use std::{fs::File, ops::Range, slice};
|
||||||
|
|
||||||
use polars::{
|
use polars::{
|
||||||
datatypes::BinaryChunked,
|
datatypes::BinaryChunked,
|
||||||
|
@ -49,8 +49,8 @@ pub fn as_fixed_binary<const N: usize>(
|
||||||
chunked: &BinaryChunked,
|
chunked: &BinaryChunked,
|
||||||
) -> impl Iterator<Item = &[[u8; N]]> + DoubleEndedIterator {
|
) -> impl Iterator<Item = &[[u8; N]]> + DoubleEndedIterator {
|
||||||
chunked.downcast_iter().map(|array| {
|
chunked.downcast_iter().map(|array| {
|
||||||
assert_fixed_dense::<N>(array);
|
let range = assert_fixed_dense::<N>(array);
|
||||||
exact_chunks(array.values()).unwrap()
|
exact_chunks(&array.values()[range]).unwrap()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,20 +61,22 @@ fn into_fixed_binary_rechunk<const N: usize>(chunked: &BinaryChunked) -> FixedBy
|
||||||
let mut iter = chunked.downcast_iter();
|
let mut iter = chunked.downcast_iter();
|
||||||
let array = iter.next().unwrap();
|
let array = iter.next().unwrap();
|
||||||
|
|
||||||
assert_fixed_dense::<N>(array);
|
let range = assert_fixed_dense::<N>(array);
|
||||||
Bytes(array.values().clone()).map(|buf| exact_chunks(buf).unwrap())
|
Bytes(array.values().clone().sliced(range.start, range.len()))
|
||||||
|
.map(|buf| exact_chunks(buf).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ensures that the supplied Arrow array consists of densely packed bytestrings of length `N`.
|
/// Ensures that the supplied Arrow array consists of densely packed bytestrings of length `N`.
|
||||||
/// In other words, ensure that it is free of nulls, and that the offsets have a fixed stride of `N`.
|
/// In other words, ensure that it is free of nulls, and that the offsets have a fixed stride of `N`.
|
||||||
fn assert_fixed_dense<const N: usize>(array: &BinaryArray<i64>) {
|
#[must_use = "only the range returned is guaranteed to be conformant"]
|
||||||
|
fn assert_fixed_dense<const N: usize>(array: &BinaryArray<i64>) -> Range<usize> {
|
||||||
let null_count = array.validity().map_or(0, |bits| bits.unset_bits());
|
let null_count = array.validity().map_or(0, |bits| bits.unset_bits());
|
||||||
if null_count > 0 {
|
if null_count > 0 {
|
||||||
panic!("null values present");
|
panic!("null values present");
|
||||||
}
|
}
|
||||||
|
|
||||||
let length_check = array
|
let offsets = array.offsets();
|
||||||
.offsets()
|
let length_check = offsets
|
||||||
.as_slice()
|
.as_slice()
|
||||||
.par_windows(2)
|
.par_windows(2)
|
||||||
.all(|w| (w[1] - w[0]) == N as i64);
|
.all(|w| (w[1] - w[0]) == N as i64);
|
||||||
|
@ -82,6 +84,8 @@ fn assert_fixed_dense<const N: usize>(array: &BinaryArray<i64>) {
|
||||||
if !length_check {
|
if !length_check {
|
||||||
panic!("lengths are inconsistent");
|
panic!("lengths are inconsistent");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(*offsets.first() as usize)..(*offsets.last() as usize)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn exact_chunks<const K: usize>(buf: &[u8]) -> Option<&[[u8; K]]> {
|
fn exact_chunks<const K: usize>(buf: &[u8]) -> Option<&[[u8; K]]> {
|
||||||
|
|
Loading…
Reference in a new issue