feat(tvix/tools/turbofetch): init
Change-Id: I2efa6f94f57e812c52371256a4e62d1d54ff5057
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9925
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI

parent 9cd2e92065
commit bdda10a2f5

9 changed files with 8624 additions and 0 deletions
1715  tvix/tools/turbofetch/Cargo.lock  (generated, new file)
      File diff suppressed because it is too large.
6453  tvix/tools/turbofetch/Cargo.nix  (new file)
      File diff suppressed because it is too large.
28  tvix/tools/turbofetch/Cargo.toml  (new file)

@@ -0,0 +1,28 @@
[package]
name = "turbofetch"
version = "0.1.0"
edition = "2021"

# We don't join the //tvix workspace, as this is fairly cache.nixos.org-specific.
[workspace]
members = ["."]

[dependencies]
aws_lambda_events = { version = "0.11.1", default-features = false, features = ["lambda_function_urls"] }
bytes = "1.5.0"
data-encoding = "2.4.0"
futures = { version = "0.3.29", default-features = false, features = ["std"] }
httparse = "1.8.0"
hyper = { version = "0.14.27", default-features = false }
lambda_runtime = "0.8.2"
magic-buffer = "0.1.0"
rusoto_core = { version = "0.48.0", features = ["rustls"], default-features = false }
rusoto_s3 = { version = "0.48.0", features = ["rustls"], default-features = false }
serde_json = "1.0.108"
serde = { version = "1.0.190", features = ["derive"] }
tokio = { version = "1.33.0", features = ["full"] }
tower = "0.4.13"
# TODO(edef): zstd = "0.13.0"
zstd = "0.9.0"
tracing-subscriber = { version = "0.3.17", features = ["json"] }
tracing = "0.1.40"
1  tvix/tools/turbofetch/OWNERS  (new file)

@@ -0,0 +1 @@
edef
12  tvix/tools/turbofetch/default.nix  (new file)

@@ -0,0 +1,12 @@
{ lib, pkgs, ... }:

(pkgs.callPackage ./Cargo.nix {
  defaultCrateOverrides = pkgs.defaultCrateOverrides // {
    ring = prev: {
      # TODO(edef): implement CARGO_MANIFEST_LINKS in crate2nix
      CARGO_MANIFEST_LINKS = ''ring_core_${lib.replaceStrings ["."] ["_"] prev.version}'';
    };
  };
}).rootCrate.build.override {
  runTests = true;
}
5  tvix/tools/turbofetch/deploy.sh  (new executable file)

@@ -0,0 +1,5 @@
#! /usr/bin/env nix-shell
#! nix-shell -i "bash -e"
#! nix-shell -p cargo-lambda
cargo lambda build --release
cargo lambda deploy
87  tvix/tools/turbofetch/src/buffer.rs  (new file)

@@ -0,0 +1,87 @@
use magic_buffer::MagicBuffer;
use std::cell::Cell;

/// Buffer is a FIFO queue for bytes, built on a ring buffer.
/// It always provides contiguous slices for both the readable and writable parts,
/// using an underlying buffer that is "mirrored" in virtual memory.
pub struct Buffer {
    buffer: MagicBuffer,
    /// first readable byte
    head: Cell<usize>,
    /// first writable byte
    tail: usize,
}

// SAFETY: MagicBuffer isn't bound to a thread, and neither are any of the other fields.
// MagicBuffer ought to be Send+Sync itself, upstream PR at https://github.com/sklose/magic-buffer/pull/4
unsafe impl Send for Buffer {}

impl Buffer {
    /// Allocate a fresh buffer, with the specified capacity.
    /// The buffer can contain at most `capacity - 1` bytes.
    /// The capacity must be a power of two, and at least [Buffer::min_capacity].
    pub fn new(capacity: usize) -> Buffer {
        Buffer {
            // MagicBuffer::new verifies that `capacity` is a power of two,
            // and at least MagicBuffer::min_len().
            buffer: MagicBuffer::new(capacity).unwrap(),
            // `head == tail` means the buffer is empty.
            // In order to ensure that this remains unambiguous,
            // the buffer can only be filled with capacity-1 bytes.
            head: Cell::new(0),
            tail: 0,
        }
    }

    /// Returns the minimum buffer capacity.
    /// This depends on the operating system and architecture.
    pub fn min_capacity() -> usize {
        MagicBuffer::min_len()
    }

    /// Return the capacity of the buffer.
    /// This is equal to `self.data().len() + self.space().len() + 1`.
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Return the valid, readable data in the buffer.
    pub fn data(&self) -> &[u8] {
        let len = self.buffer.len();
        let head = self.head.get();

        if head <= self.tail {
            &self.buffer[head..self.tail]
        } else {
            &self.buffer[head..self.tail + len]
        }
    }

    /// Mark `read_len` bytes of the readable data as consumed, freeing the space.
    pub fn consume(&self, read_len: usize) {
        debug_assert!(read_len <= self.data().len());
        let mut head = self.head.get();
        head += read_len;
        head &= self.buffer.len() - 1;
        self.head.set(head);
    }

    /// Return the empty, writable space in the buffer.
    pub fn space(&mut self) -> &mut [u8] {
        let len = self.buffer.len();
        let head = self.head.get();

        if head <= self.tail {
            &mut self.buffer[self.tail..head + len - 1]
        } else {
            &mut self.buffer[self.tail..head - 1]
        }
    }

    /// Mark `written_len` bytes of the writable space as valid, readable data.
    pub fn commit(&mut self, written_len: usize) {
        debug_assert!(written_len <= self.space().len());
        self.tail += written_len;
        self.tail &= self.buffer.len() - 1;
    }
}
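A minimal usage sketch of the Buffer API above (not part of the commit), following the write path via space()/commit() and the read path via data()/consume() described in the doc comments:

use turbofetch::Buffer;

fn buffer_demo() {
    // 512 KiB, as used in src/main.rs; the capacity must be a power of two
    // and at least Buffer::min_capacity().
    let mut buf = Buffer::new(512 * 1024);

    // Write: copy into the contiguous writable slice, then commit it.
    let msg = b"hello";
    buf.space()[..msg.len()].copy_from_slice(msg);
    buf.commit(msg.len());

    // Read: the readable bytes are likewise one contiguous slice, thanks to
    // the mirrored mapping; consume() frees them for reuse.
    assert_eq!(buf.data(), &msg[..]);
    buf.consume(msg.len());
    assert!(buf.data().is_empty());
}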
103  tvix/tools/turbofetch/src/lib.rs  (new file)

@@ -0,0 +1,103 @@
use std::{mem::MaybeUninit, str};
use tokio::io::{self, AsyncRead, AsyncReadExt};

pub use buffer::Buffer;
mod buffer;

/// Read as much data into `buffer` as possible.
/// Returns [io::ErrorKind::OutOfMemory] if the buffer is already full.
async fn slurp(buffer: &mut Buffer, sock: &mut (impl AsyncRead + Unpin)) -> io::Result<()> {
    match buffer.space() {
        [] => Err(io::Error::new(io::ErrorKind::OutOfMemory, "buffer filled")),
        buf => {
            let n = sock.read(buf).await?;
            if n == 0 {
                return Err(io::ErrorKind::UnexpectedEof.into());
            }
            buffer.commit(n);

            Ok(())
        }
    }
}

fn get_content_length(headers: &[httparse::Header]) -> io::Result<u64> {
    for header in headers {
        if header.name == "Transfer-Encoding" {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Transfer-Encoding is unsupported",
            ));
        }

        if header.name == "Content-Length" {
            return str::from_utf8(header.value)
                .ok()
                .and_then(|v| v.parse().ok())
                .ok_or_else(|| {
                    io::Error::new(io::ErrorKind::InvalidData, "invalid Content-Length")
                });
        }
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "Content-Length missing",
    ))
}

/// Read an HTTP response from `sock` using `buffer`, returning the response body.
/// Returns an error if anything but 200 OK is received.
///
/// The buffer must have enough space to contain the entire response body.
/// If there is not enough space, [io::ErrorKind::OutOfMemory] is returned.
///
/// The HTTP response must use `Content-Length`, without `Transfer-Encoding`.
pub async fn parse_response<'a>(
    sock: &mut (impl AsyncRead + Unpin),
    buffer: &'a mut Buffer,
) -> io::Result<&'a [u8]> {
    let body_len = loop {
        let mut headers = [MaybeUninit::uninit(); 16];
        let mut response = httparse::Response::new(&mut []);
        let status = httparse::ParserConfig::default()
            .parse_response_with_uninit_headers(&mut response, buffer.data(), &mut headers)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

        if let httparse::Status::Complete(n) = status {
            buffer.consume(n);

            let code = response.code.unwrap();
            if code != 200 {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!("HTTP response {code}"),
                ));
            }

            break get_content_length(response.headers)?;
        }

        slurp(buffer, sock).await?;
    };

    let buf_len = buffer.space().len() + buffer.data().len();

    if body_len > buf_len as u64 {
        return Err(io::Error::new(
            io::ErrorKind::OutOfMemory,
            "HTTP response body does not fit in buffer",
        ));
    }

    let body_len = body_len as usize;

    while buffer.data().len() < body_len {
        slurp(buffer, sock).await?;
    }

    let data = buffer.data();
    buffer.consume(body_len);

    Ok(&data[..body_len])
}
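A minimal sketch of driving parse_response over a plain TCP connection (not part of the commit; the host and request are placeholders, and the server must answer with a Content-Length body since Transfer-Encoding is rejected):

use tokio::{io::AsyncWriteExt, net::TcpStream};

async fn fetch_root() -> std::io::Result<Vec<u8>> {
    // example.com:80 is a placeholder endpoint for illustration only.
    let mut sock = TcpStream::connect("example.com:80").await?;
    sock.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n")
        .await?;

    // The whole response body has to fit in the buffer; 512 KiB matches src/main.rs.
    let mut buffer = turbofetch::Buffer::new(512 * 1024);
    let body = turbofetch::parse_response(&mut sock, &mut buffer).await?;
    Ok(body.to_vec())
}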
220  tvix/tools/turbofetch/src/main.rs  (new file)

@@ -0,0 +1,220 @@
//! turbofetch is a high-performance bulk S3 object aggregator.
//!
//! It operates on two S3 buckets: a source bucket (nix-cache), and a
//! work bucket defined at runtime. The work bucket contains a job file
//! consisting of concatenated 32-character keys, representing narinfo
//! files in the source bucket, without the `.narinfo` suffix or any
//! other separators.
//!
//! Each run of turbofetch processes a half-open range of indices from the
//! job file, and outputs a zstd stream of concatenated objects, without
//! additional separators and in no particular order. These segment files
//! are written into the work bucket, named for the range of indices they
//! cover. `/narinfo.zst/000000000c380d40-000000000c385b60` covers the 20k
//! objects `[0xc380d40, 0xc385b60) = [205000000, 205020000)`. Empirically,
//! segment files of 20k objects achieve a compression ratio of 4.7x.
//!
//! Reassembly is left to narinfo2parquet, which interprets StorePath lines.
//!
//! TODO(edef): any retries/error handling whatsoever
//! Currently, it fails an entire range if anything goes wrong, and doesn't
//! write any output.

use bytes::Bytes;
use futures::{stream::FuturesUnordered, Stream, TryStreamExt};
use rusoto_core::ByteStream;
use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
use serde::Deserialize;
use std::{io::Write, mem, ops::Range, ptr};
use tokio::{
    io::{self, AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
};

/// Fetch a group of keys, streaming concatenated chunks as they arrive from S3.
/// `keys` must be a slice from the job file. Any network error at all fails the
/// entire batch, and there is no rate limiting.
fn fetch(keys: &[[u8; 32]]) -> impl Stream<Item = io::Result<Bytes>> {
    // S3 supports only HTTP/1.1, but we can ease the pain somewhat by using
    // HTTP pipelining. It terminates the TCP connection after receiving 100
    // requests, so we chunk the keys up accordingly, and make one connection
    // for each chunk.
    keys.chunks(100)
        .map(|chunk| {
            const PREFIX: &[u8] = b"GET /nix-cache/";
            const SUFFIX: &[u8] = b".narinfo HTTP/1.1\nHost: s3.amazonaws.com\n\n";
            const LENGTH: usize = PREFIX.len() + 32 + SUFFIX.len();

            let mut request = Vec::with_capacity(LENGTH * 100);
            for key in chunk {
                request.extend_from_slice(PREFIX);
                request.extend_from_slice(key);
                request.extend_from_slice(SUFFIX);
            }

            (request, chunk.len())
        })
        .map(|(request, n)| async move {
            let (mut read, mut write) = TcpStream::connect("s3.amazonaws.com:80")
                .await?
                .into_split();

            let _handle = tokio::spawn(async move {
                let request = request;
                write.write_all(&request).await
            });

            let mut buffer = turbofetch::Buffer::new(512 * 1024);
            let mut bodies = vec![];

            for _ in 0..n {
                let body = turbofetch::parse_response(&mut read, &mut buffer).await?;
                bodies.extend_from_slice(body);
            }

            Ok::<_, io::Error>(Bytes::from(bodies))
        })
        .collect::<FuturesUnordered<_>>()
}

/// Retrieve a range of keys from the job file.
async fn get_range(
    s3: &'static S3Client,
    bucket: String,
    key: String,
    range: Range<u64>,
) -> io::Result<Box<[[u8; 32]]>> {
    let resp = s3
        .get_object(GetObjectRequest {
            bucket,
            key,
            range: Some(format!("bytes={}-{}", range.start * 32, range.end * 32 - 1)),
            ..GetObjectRequest::default()
        })
        .await
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

    let mut body = vec![];
    resp.body
        .ok_or(io::ErrorKind::InvalidData)?
        .into_async_read()
        .read_to_end(&mut body)
        .await?;

    let body = exact_chunks(body.into_boxed_slice()).ok_or(io::ErrorKind::InvalidData)?;

    Ok(body)
}

fn exact_chunks(mut buf: Box<[u8]>) -> Option<Box<[[u8; 32]]>> {
    // SAFETY: We ensure that `buf.len()` is a multiple of 32, and there are no alignment requirements.
    unsafe {
        let ptr = buf.as_mut_ptr();
        let len = buf.len();

        if len % 32 != 0 {
            return None;
        }

        let ptr = ptr as *mut [u8; 32];
        let len = len / 32;
        mem::forget(buf);

        Some(Box::from_raw(ptr::slice_from_raw_parts_mut(ptr, len)))
    }
}
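For comparison, an equivalent safe version of exact_chunks above could copy the bytes into a new allocation instead of reinterpreting the existing one; the unsafe pointer cast in the commit avoids that copy. A sketch (hypothetical name, not part of the commit):

fn exact_chunks_copying(buf: Box<[u8]>) -> Option<Box<[[u8; 32]]>> {
    if buf.len() % 32 != 0 {
        return None;
    }
    // chunks_exact(32) yields &[u8] slices of length 32, which TryFrom turns
    // into [u8; 32] arrays; collect() gathers them into a boxed slice.
    Some(
        buf.chunks_exact(32)
            .map(|c| <[u8; 32]>::try_from(c).unwrap())
            .collect(),
    )
}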
// TODO(edef): factor this out into a separate entry point
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), lambda_runtime::Error> {
    let s3 = S3Client::new(rusoto_core::Region::UsEast1);
    let s3 = &*Box::leak(Box::new(s3));

    tracing_subscriber::fmt()
        .json()
        .with_max_level(tracing::Level::INFO)
        // this needs to be set to remove duplicated information in the log.
        .with_current_span(false)
        // this needs to be set to false, otherwise ANSI color codes will
        // show up in a confusing manner in CloudWatch logs.
        .with_ansi(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        // remove the name of the function from every log entry
        .with_target(false)
        .init();

    lambda_runtime::run(lambda_runtime::service_fn(|event| func(s3, event))).await
}

/// Lambda request body
#[derive(Debug, Deserialize)]
struct Params {
    work_bucket: String,
    job_file: String,
    start: u64,
    end: u64,
}

#[tracing::instrument(skip(s3, event), fields(req_id = %event.context.request_id))]
async fn func(
    s3: &'static S3Client,
    event: lambda_runtime::LambdaEvent<
        aws_lambda_events::lambda_function_urls::LambdaFunctionUrlRequest,
    >,
) -> Result<&'static str, lambda_runtime::Error> {
    let mut params = event.payload.body.ok_or("no body")?;

    if event.payload.is_base64_encoded {
        params = String::from_utf8(data_encoding::BASE64.decode(params.as_bytes())?)?;
    }

    let params: Params = serde_json::from_str(&params)?;

    if params.start >= params.end {
        return Err("nope".into());
    }

    let keys = get_range(
        s3,
        params.work_bucket.clone(),
        params.job_file.to_owned(),
        params.start..params.end,
    )
    .await?;

    let zchunks = fetch(&keys)
        .try_fold(
            Box::new(zstd::Encoder::new(vec![], zstd::DEFAULT_COMPRESSION_LEVEL).unwrap()),
            |mut w, buf| {
                w.write_all(&buf).unwrap();
                async { Ok(w) }
            },
        )
        .await?;

    let zchunks = to_byte_stream(zchunks.finish().unwrap());

    tracing::info!("we got to put_object");

    s3.put_object(PutObjectRequest {
        bucket: params.work_bucket,
        key: format!("narinfo.zst/{:016x}-{:016x}", params.start, params.end),
        body: Some(zchunks),
        ..Default::default()
    })
    .await
    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

    tracing::info!("… and it worked!");

    Ok("OK")
}

fn to_byte_stream(buffer: Vec<u8>) -> ByteStream {
    let size_hint = buffer.len();
    ByteStream::new_with_size(
        futures::stream::once(async { Ok(buffer.into()) }),
        size_hint,
    )
}
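As a worked check of the segment naming described in the module doc and produced by `func` above: the index range `[205000000, 205020000)` formats as 16-digit zero-padded hex, and each index maps to a 32-byte offset in the job file, as in `get_range`. A standalone sketch:

fn main() {
    let (start, end): (u64, u64) = (205_000_000, 205_020_000);

    // 0xc380d40 == 205000000 and 0xc385b60 == 205020000, so this prints
    // "narinfo.zst/000000000c380d40-000000000c385b60".
    println!("narinfo.zst/{:016x}-{:016x}", start, end);

    // get_range turns the index range into a byte range of the job file,
    // 32 bytes per key: "bytes=6560000000-6560639999".
    println!("bytes={}-{}", start * 32, end * 32 - 1);
}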