feat(tvix/nar-bridge): support uploading NAR files
This ingests NAR files into the {Blob,Directory}Service, which are already part of the AppState. As we later need to correlate the root node with the uploaded PathInfo, we keep a (short-lived) lookup table from NARHash to root node around. After a NAR is uploaded, its root node is inserted into an `LruCache`; the later lookup uses `peek()`, which doesn't update the LRU list. Change-Id: I48a4c6246bacf76559c5a4ccad2a0bc25c1b7900 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11986 Tested-by: BuildkiteCI Reviewed-by: Brian Olsen <me@griff.name>
This commit is contained in:
parent
861cc1f341
commit
5d906054da
5 changed files with 158 additions and 13 deletions
13
tvix/Cargo.lock
generated
13
tvix/Cargo.lock
generated
|
@ -2246,9 +2246,12 @@ dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"clap",
|
"clap",
|
||||||
"data-encoding",
|
"data-encoding",
|
||||||
|
"futures",
|
||||||
"hex-literal",
|
"hex-literal",
|
||||||
"itertools 0.12.0",
|
"itertools 0.12.0",
|
||||||
|
"lru",
|
||||||
"nix-compat",
|
"nix-compat",
|
||||||
|
"parking_lot 0.12.3",
|
||||||
"prost",
|
"prost",
|
||||||
"prost-build",
|
"prost-build",
|
||||||
"rstest",
|
"rstest",
|
||||||
|
@ -2436,7 +2439,7 @@ dependencies = [
|
||||||
"hyper 0.14.28",
|
"hyper 0.14.28",
|
||||||
"itertools 0.12.0",
|
"itertools 0.12.0",
|
||||||
"md-5",
|
"md-5",
|
||||||
"parking_lot 0.12.2",
|
"parking_lot 0.12.3",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"quick-xml",
|
"quick-xml",
|
||||||
"rand",
|
"rand",
|
||||||
|
@ -2599,9 +2602,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "parking_lot"
|
name = "parking_lot"
|
||||||
version = "0.12.2"
|
version = "0.12.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb"
|
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"lock_api",
|
"lock_api",
|
||||||
"parking_lot_core 0.9.9",
|
"parking_lot_core 0.9.9",
|
||||||
|
@ -4542,7 +4545,7 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
"object_store",
|
"object_store",
|
||||||
"parking_lot 0.12.2",
|
"parking_lot 0.12.3",
|
||||||
"petgraph",
|
"petgraph",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"prost",
|
"prost",
|
||||||
|
@ -4727,7 +4730,7 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"lru",
|
"lru",
|
||||||
"nix-compat",
|
"nix-compat",
|
||||||
"parking_lot 0.12.2",
|
"parking_lot 0.12.3",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"prost",
|
"prost",
|
||||||
"prost-build",
|
"prost-build",
|
||||||
|
|
|
@ -6888,15 +6888,27 @@ rec {
|
||||||
name = "data-encoding";
|
name = "data-encoding";
|
||||||
packageId = "data-encoding";
|
packageId = "data-encoding";
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
name = "futures";
|
||||||
|
packageId = "futures";
|
||||||
|
}
|
||||||
{
|
{
|
||||||
name = "itertools";
|
name = "itertools";
|
||||||
packageId = "itertools 0.12.0";
|
packageId = "itertools 0.12.0";
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
name = "lru";
|
||||||
|
packageId = "lru";
|
||||||
|
}
|
||||||
{
|
{
|
||||||
name = "nix-compat";
|
name = "nix-compat";
|
||||||
packageId = "nix-compat";
|
packageId = "nix-compat";
|
||||||
features = [ "async" ];
|
features = [ "async" ];
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
name = "parking_lot";
|
||||||
|
packageId = "parking_lot 0.12.3";
|
||||||
|
}
|
||||||
{
|
{
|
||||||
name = "prost";
|
name = "prost";
|
||||||
packageId = "prost";
|
packageId = "prost";
|
||||||
|
@ -7533,7 +7545,7 @@ rec {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
name = "parking_lot";
|
name = "parking_lot";
|
||||||
packageId = "parking_lot 0.12.2";
|
packageId = "parking_lot 0.12.3";
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
name = "percent-encoding";
|
name = "percent-encoding";
|
||||||
|
@ -8128,11 +8140,11 @@ rec {
|
||||||
};
|
};
|
||||||
resolvedDefaultFeatures = [ "default" ];
|
resolvedDefaultFeatures = [ "default" ];
|
||||||
};
|
};
|
||||||
"parking_lot 0.12.2" = rec {
|
"parking_lot 0.12.3" = rec {
|
||||||
crateName = "parking_lot";
|
crateName = "parking_lot";
|
||||||
version = "0.12.2";
|
version = "0.12.3";
|
||||||
edition = "2021";
|
edition = "2021";
|
||||||
sha256 = "1ys2dzz6cysjmwyivwxczl1ljpcf5cj4qmhdj07d5bkc9z5g0jky";
|
sha256 = "09ws9g6245iiq8z975h8ycf818a66q3c6zv4b5h8skpm7hc1igzi";
|
||||||
authors = [
|
authors = [
|
||||||
"Amanieu d'Antras <amanieu@gmail.com>"
|
"Amanieu d'Antras <amanieu@gmail.com>"
|
||||||
];
|
];
|
||||||
|
@ -14484,7 +14496,7 @@ rec {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
name = "parking_lot";
|
name = "parking_lot";
|
||||||
packageId = "parking_lot 0.12.2";
|
packageId = "parking_lot 0.12.3";
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
name = "petgraph";
|
name = "petgraph";
|
||||||
|
@ -15238,7 +15250,7 @@ rec {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
name = "parking_lot";
|
name = "parking_lot";
|
||||||
packageId = "parking_lot 0.12.2";
|
packageId = "parking_lot 0.12.3";
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
name = "pin-project-lite";
|
name = "pin-project-lite";
|
||||||
|
|
|
@ -8,6 +8,7 @@ axum = { version = "0.7.5", features = ["http2"] }
|
||||||
bytes = "1.4.0"
|
bytes = "1.4.0"
|
||||||
clap = { version = "4.0", features = ["derive", "env"] }
|
clap = { version = "4.0", features = ["derive", "env"] }
|
||||||
data-encoding = "2.3.3"
|
data-encoding = "2.3.3"
|
||||||
|
futures = "0.3.30"
|
||||||
itertools = "0.12.0"
|
itertools = "0.12.0"
|
||||||
prost = "0.12.1"
|
prost = "0.12.1"
|
||||||
nix-compat = { path = "../nix-compat", features = ["async"] }
|
nix-compat = { path = "../nix-compat", features = ["async"] }
|
||||||
|
@ -23,6 +24,8 @@ tracing = "0.1.37"
|
||||||
tracing-subscriber = "0.3.16"
|
tracing-subscriber = "0.3.16"
|
||||||
url = "2.4.0"
|
url = "2.4.0"
|
||||||
serde = { version = "1.0.204", features = ["derive"] }
|
serde = { version = "1.0.204", features = ["derive"] }
|
||||||
|
lru = "0.12.3"
|
||||||
|
parking_lot = "0.12.3"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
prost-build = "0.12.1"
|
prost-build = "0.12.1"
|
||||||
|
|
|
@ -1,18 +1,32 @@
|
||||||
use axum::routing::head;
|
use axum::routing::{head, put};
|
||||||
use axum::{routing::get, Router};
|
use axum::{routing::get, Router};
|
||||||
|
use lru::LruCache;
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
use std::num::NonZeroUsize;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tvix_castore::blobservice::BlobService;
|
use tvix_castore::blobservice::BlobService;
|
||||||
use tvix_castore::directoryservice::DirectoryService;
|
use tvix_castore::directoryservice::DirectoryService;
|
||||||
|
use tvix_castore::proto::node::Node;
|
||||||
use tvix_store::pathinfoservice::PathInfoService;
|
use tvix_store::pathinfoservice::PathInfoService;
|
||||||
|
|
||||||
mod nar;
|
mod nar;
|
||||||
mod narinfo;
|
mod narinfo;
|
||||||
|
|
||||||
|
/// The capacity of the lookup table from NarHash to [Node].
/// Should be bigger than the number of concurrent NAR uploads.
///
/// Kept as a plain `usize` rather than a [NonZeroUsize], because declaring it
/// as the latter here upsets rust-analyzer. The value is converted with
/// `NonZeroUsize::new_unchecked` when the cache is constructed, so it MUST NOT
/// be zero; the compile-time assertion below enforces that.
const ROOT_NODES_CACHE_CAPACITY: usize = 1000;

// Fail the build, rather than hit undefined behaviour at runtime, if the
// capacity is ever changed to zero.
const _: () = assert!(ROOT_NODES_CACHE_CAPACITY != 0);
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AppState {
|
pub struct AppState {
|
||||||
blob_service: Arc<dyn BlobService>,
|
blob_service: Arc<dyn BlobService>,
|
||||||
directory_service: Arc<dyn DirectoryService>,
|
directory_service: Arc<dyn DirectoryService>,
|
||||||
path_info_service: Arc<dyn PathInfoService>,
|
path_info_service: Arc<dyn PathInfoService>,
|
||||||
|
|
||||||
|
/// Lookup table from NarHash to [Node], necessary to populate the root_node
|
||||||
|
/// field of the PathInfo when processing the narinfo upload.
|
||||||
|
root_nodes: Arc<RwLock<LruCache<[u8; 32], Node>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
|
@ -25,6 +39,10 @@ impl AppState {
|
||||||
blob_service,
|
blob_service,
|
||||||
directory_service,
|
directory_service,
|
||||||
path_info_service,
|
path_info_service,
|
||||||
|
root_nodes: Arc::new(RwLock::new(LruCache::new({
|
||||||
|
// SAFETY: 1000 != 0
|
||||||
|
unsafe { NonZeroUsize::new_unchecked(ROOT_NODES_CACHE_CAPACITY) }
|
||||||
|
}))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,6 +50,7 @@ impl AppState {
|
||||||
pub fn gen_router(priority: u64) -> Router<AppState> {
|
pub fn gen_router(priority: u64) -> Router<AppState> {
|
||||||
Router::new()
|
Router::new()
|
||||||
.route("/", get(root))
|
.route("/", get(root))
|
||||||
|
.route("/nar/:nar_str", put(nar::put))
|
||||||
.route("/nar/tvix-castore/:root_node_enc", get(nar::get))
|
.route("/nar/tvix-castore/:root_node_enc", get(nar::get))
|
||||||
.route("/:narinfo_str", get(narinfo::get))
|
.route("/:narinfo_str", get(narinfo::get))
|
||||||
.route("/:narinfo_str", head(narinfo::head))
|
.route("/:narinfo_str", head(narinfo::head))
|
||||||
|
|
|
@ -4,9 +4,13 @@ use axum::http::StatusCode;
|
||||||
use axum::response::Response;
|
use axum::response::Response;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use data_encoding::BASE64URL_NOPAD;
|
use data_encoding::BASE64URL_NOPAD;
|
||||||
|
use futures::TryStreamExt;
|
||||||
|
use nix_compat::nixbase32;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
use std::io;
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
use tracing::{instrument, warn};
|
use tracing::{instrument, warn, Span};
|
||||||
|
use tvix_store::nar::ingest_nar_and_hash;
|
||||||
|
|
||||||
use crate::AppState;
|
use crate::AppState;
|
||||||
|
|
||||||
|
@ -75,3 +79,107 @@ pub async fn get(
|
||||||
.body(Body::from_stream(ReaderStream::new(r)))
|
.body(Body::from_stream(ReaderStream::new(r)))
|
||||||
.unwrap())
|
.unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(blob_service, directory_service, request))]
|
||||||
|
pub async fn put(
|
||||||
|
axum::extract::Path(nar_str): axum::extract::Path<String>,
|
||||||
|
axum::extract::State(AppState {
|
||||||
|
blob_service,
|
||||||
|
directory_service,
|
||||||
|
root_nodes,
|
||||||
|
..
|
||||||
|
}): axum::extract::State<AppState>,
|
||||||
|
request: axum::extract::Request,
|
||||||
|
) -> Result<&'static str, StatusCode> {
|
||||||
|
let nar_hash_expected = parse_nar_str(&nar_str)?;
|
||||||
|
|
||||||
|
let s = request.into_body().into_data_stream();
|
||||||
|
|
||||||
|
let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
|
||||||
|
warn!(err=%e, "failed to read request body");
|
||||||
|
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
|
||||||
|
}));
|
||||||
|
|
||||||
|
// ingest the NAR
|
||||||
|
let (root_node, nar_hash_actual, nar_size) =
|
||||||
|
ingest_nar_and_hash(blob_service.clone(), directory_service.clone(), &mut r)
|
||||||
|
.await
|
||||||
|
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!(err=%e, "failed to ingest nar");
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let s = Span::current();
|
||||||
|
s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
|
||||||
|
s.record("nar_size", nar_size);
|
||||||
|
|
||||||
|
if nar_hash_expected != nar_hash_actual {
|
||||||
|
warn!(
|
||||||
|
nar_hash.expected = nixbase32::encode(&nar_hash_expected),
|
||||||
|
nar_hash.actual = nixbase32::encode(&nar_hash_actual),
|
||||||
|
"nar hash mismatch"
|
||||||
|
);
|
||||||
|
return Err(StatusCode::BAD_REQUEST);
|
||||||
|
}
|
||||||
|
|
||||||
|
// store mapping of narhash to root node into root_nodes.
|
||||||
|
// we need it later to populate the root node when accepting the PathInfo.
|
||||||
|
root_nodes.write().put(nar_hash_actual, root_node);
|
||||||
|
|
||||||
|
Ok("")
|
||||||
|
}
|
||||||
|
|
||||||
|
// FUTUREWORK: maybe head by narhash. Though not too critical, as we do
|
||||||
|
// implement HEAD for .narinfo.
|
||||||
|
|
||||||
|
/// Parses a `14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar`
|
||||||
|
/// string and returns the nixbase32-decoded digest.
|
||||||
|
/// No compression is supported.
|
||||||
|
fn parse_nar_str(s: &str) -> Result<[u8; 32], StatusCode> {
|
||||||
|
if !s.is_char_boundary(52) {
|
||||||
|
warn!("invalid string, no char boundary at 32");
|
||||||
|
return Err(StatusCode::NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(match s.split_at(52) {
|
||||||
|
(hash_str, ".nar") => {
|
||||||
|
// we know this is 52 bytes
|
||||||
|
let hash_str_fixed: [u8; 52] = hash_str.as_bytes().try_into().unwrap();
|
||||||
|
nixbase32::decode_fixed(hash_str_fixed).map_err(|e| {
|
||||||
|
warn!(err=%e, "invalid digest");
|
||||||
|
StatusCode::NOT_FOUND
|
||||||
|
})?
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
warn!("invalid string");
|
||||||
|
return Err(StatusCode::BAD_REQUEST);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod test {
    use super::parse_nar_str;
    use hex_literal::hex;

    /// A well-formed `<nixbase32>.nar` name decodes to the expected digest.
    #[test]
    fn success() {
        let digest =
            parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar").unwrap();

        assert_eq!(
            hex!("13a8cf7ca57f68a9f1752acee36a72a55187d3a954443c112818926f26109d91"),
            digest
        )
    }

    /// Names with trailing garbage, a compression suffix, a missing `.nar`
    /// suffix or a multi-byte character at the split offset are rejected.
    #[test]
    fn failure() {
        let rejected = [
            "14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar.x",
            "14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar.xz",
            "14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0",
            "14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0🦊.nar",
        ];

        for name in rejected {
            assert!(parse_nar_str(name).is_err());
        }
    }
}
|
||||||
|
|
Loading…
Reference in a new issue