feat(tvix/glue): Implement builtins.fetchurl

Implement the fetchurl builtin, and lay the groundwork for implementing
the fetchTarball builtin (which works very similarly, and is implemented
using almost the same code in C++ nix).

An overview of how this works:

1. First, we check if the store path that *would* result from the
   download already exists in the store - if it does, we just return
   that
2. If we need to download the URL, TvixStoreIO has an `http_client:
   reqwest::Client` field now which we use to make the request
3. As we're downloading the blob, we hash the data incrementally into a
   SHA256 hasher
4. We compare the hash against the expected hash (if any) and bail out
   if it doesn't match
5. Finally, we put the blob in the store and return the store path

Since the logic is very similar, this commit also implements a *chunk*
of `fetchTarball` (though the actual implementation will likely include
a refactor to some of the code reuse here).

The main thing that's missing here is caching of downloaded blobs when
fetchurl is called without a hash - I've opened b/381 to track the TODO
there.

Adding the `SSL_CERT_FILE` here is necessary to teach reqwest how to
load it during tests - see 1c16dee20 (feat(tvix/store): use reqwests'
rustls-native-roots feature, 2024-03-03) for more info.

Change-Id: I83c4abbc7c0c3bfe92461917e23d6d3430fbf137
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11017
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: aspen <root@gws.fyi>
This commit is contained in:
Aspen Smith 2024-02-23 10:09:20 -05:00 committed by clbot
parent 83ad32c481
commit de727bccf9
11 changed files with 454 additions and 54 deletions

2
tvix/Cargo.lock generated
View file

@ -3424,6 +3424,7 @@ dependencies = [
"nix 0.27.1",
"nix-compat",
"pretty_assertions",
"reqwest",
"rstest",
"serde",
"serde_json",
@ -3432,6 +3433,7 @@ dependencies = [
"test-case",
"thiserror",
"tokio",
"tokio-util",
"tracing",
"tvix-build",
"tvix-castore",

View file

@ -10792,6 +10792,12 @@ rec {
name = "nix-compat";
packageId = "nix-compat";
}
{
name = "reqwest";
packageId = "reqwest";
usesDefaultFeatures = false;
features = [ "rustls-tls-native-roots" ];
}
{
name = "serde";
packageId = "serde";
@ -10812,6 +10818,11 @@ rec {
name = "tokio";
packageId = "tokio";
}
{
name = "tokio-util";
packageId = "tokio-util";
features = [ "io" "io-util" "compat" ];
}
{
name = "tracing";
packageId = "tracing";

View file

@ -10,12 +10,14 @@ bytes = "1.4.0"
data-encoding = "2.3.3"
futures = "0.3.30"
nix-compat = { path = "../nix-compat" }
reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots"], default-features = false }
tvix-build = { path = "../build", default-features = false, features = []}
tvix-eval = { path = "../eval" }
tvix-castore = { path = "../castore" }
tvix-store = { path = "../store", default-features = false, features = []}
tracing = "0.1.37"
tokio = "1.28.0"
tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
thiserror = "1.0.38"
serde = "1.0.195"
serde_json = "1.0"

View file

@ -1,5 +1,8 @@
{ depot, ... }:
{ depot, pkgs, ... }:
(depot.tvix.crates.workspaceMembers.tvix-glue.build.override {
runTests = true;
testPreRun = ''
export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
'';
})

View file

@ -10,8 +10,7 @@ use std::rc::Rc;
use tvix_eval::builtin_macros::builtins;
use tvix_eval::generators::{self, emit_warning_kind, GenCo};
use tvix_eval::{
AddContext, CatchableErrorKind, CoercionKind, ErrorKind, NixAttrs, NixContext,
NixContextElement, Value, WarningKind,
AddContext, ErrorKind, NixAttrs, NixContext, NixContextElement, Value, WarningKind,
};
// Constants used for strangely named fields in derivation inputs.
@ -144,6 +143,8 @@ fn handle_fixed_output(
pub(crate) mod derivation_builtins {
use std::collections::BTreeMap;
use crate::builtins::utils::{select_string, strong_importing_coerce_to_string};
use super::*;
use bstr::ByteSlice;
use nix_compat::store_path::hash_placeholder;
@ -197,27 +198,6 @@ pub(crate) mod derivation_builtins {
drv.outputs.insert("out".to_string(), Default::default());
let mut input_context = NixContext::new();
#[inline]
async fn strong_importing_coerce_to_string(
co: &GenCo,
val: Value,
) -> Result<NixString, CatchableErrorKind> {
let val = generators::request_force(co, val).await;
match generators::request_string_coerce(
co,
val,
CoercionKind {
strong: true,
import_paths: true,
},
)
.await
{
Err(cek) => Err(cek),
Ok(val_str) => Ok(val_str),
}
}
/// Inserts a key and value into the drv.environment BTreeMap, and fails if the
/// key did already exist before.
fn insert_env(
@ -385,21 +365,6 @@ pub(crate) mod derivation_builtins {
// Configure fixed-output derivations if required.
{
async fn select_string(
co: &GenCo,
attrs: &NixAttrs,
key: &str,
) -> Result<Result<Option<String>, CatchableErrorKind>, ErrorKind> {
if let Some(attr) = attrs.select(key) {
match strong_importing_coerce_to_string(co, attr.clone()).await {
Err(cek) => return Ok(Err(cek)),
Ok(str) => return Ok(Ok(Some(str.to_str()?.to_owned()))),
}
}
Ok(Ok(None))
}
let output_hash = match select_string(&co, &input, "outputHash")
.await
.context("evaluating the `outputHash` parameter")?

View file

@ -1,5 +1,8 @@
//! Contains errors that can occur during evaluation of builtins in this crate
use nix_compat::nixhash;
use nix_compat::{
nixhash::{self, NixHash},
store_path::BuildStorePathError,
};
use std::rc::Rc;
use thiserror::Error;
@ -25,3 +28,28 @@ impl From<DerivationError> for tvix_eval::ErrorKind {
tvix_eval::ErrorKind::TvixError(Rc::new(err))
}
}
/// Errors that can occur while evaluating the fetcher builtins.
///
/// Converted into [tvix_eval::ErrorKind] (as a `TvixError`) by the `From`
/// impl that follows this enum.
#[derive(Debug, Error)]
pub enum FetcherError {
/// The hash computed over the downloaded data differs from the hash the
/// caller asked for.
#[error("hash mismatch in file downloaded from {url}:\n wanted: {wanted}\n got: {got}")]
HashMismatch {
/// URL the data was downloaded from.
url: String,
/// Hash supplied by the caller.
wanted: NixHash,
/// Hash computed over the data that was actually received.
got: NixHash,
},
/// A hash of an unsupported type was given to a fetcher.
// NOTE(review): not constructed anywhere in the visible code; the payload is
// presumably the name of the offending hash algorithm - confirm.
#[error("Invalid hash type '{0}' for fetcher")]
InvalidHashType(&'static str),
/// Building the store path for the fetcher output failed.
#[error("Error in store path for fetcher output: {0}")]
StorePath(#[from] BuildStorePathError),
/// An error reported by the HTTP client; displayed transparently.
#[error(transparent)]
Http(#[from] reqwest::Error),
}
/// Allows fetcher failures to be propagated with `?` from code that returns
/// [tvix_eval::ErrorKind], by wrapping them in the `TvixError` variant.
impl From<FetcherError> for tvix_eval::ErrorKind {
    fn from(fetcher_err: FetcherError) -> Self {
        Self::TvixError(Rc::new(fetcher_err))
    }
}

View file

@ -1,9 +1,189 @@
//! Contains builtins that fetch paths from the Internet
use crate::tvix_store_io::TvixStoreIO;
use bstr::ByteSlice;
use nix_compat::nixhash::{self, CAHash};
use nix_compat::store_path::{build_ca_path, StorePathRef};
use std::rc::Rc;
use tvix_eval::builtin_macros::builtins;
use tvix_eval::Value;
use tvix_eval::generators::GenCo;
use tvix_eval::{CatchableErrorKind, ErrorKind, NixContextElement, NixString, Value};
use super::utils::select_string;
use super::{DerivationError, FetcherError};
/// Attempts to mimic `nix::libutil::baseNameOf`
///
/// Returns the part of `s` after its last `/`, ignoring a single trailing
/// slash. An input without any slash is returned unchanged, and both the empty
/// string and a lone `/` yield the empty string.
///
/// All slicing happens directly before or after an ASCII `/` (or at the ends
/// of the string), so this never splits a multi-byte character and cannot
/// panic on non-ASCII input.
fn url_basename(s: &str) -> &str {
    // Drop at most one trailing slash, but leave a lone "/" untouched so that
    // it falls through to the lookup below and produces "".
    let trimmed = match s.strip_suffix('/') {
        Some(rest) if !rest.is_empty() => rest,
        _ => s,
    };

    match trimmed.rfind('/') {
        // Everything after the last separator.
        Some(pos) => &trimmed[pos + 1..],
        // No separator at all: the whole string is the basename.
        None => trimmed,
    }
}
/// How the `sha256` passed to a fetcher is interpreted when it is turned into
/// a content-address hash.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HashMode {
/// The hash is wrapped in `CAHash::Flat`.
Flat,
/// The hash is wrapped in `CAHash::Nar`.
Recursive,
}
/// Struct representing the arguments passed to fetcher functions
#[derive(Debug, PartialEq, Eq)]
struct FetchArgs {
/// URL to download from.
url: String,
/// Name used for the resulting store path.
name: String,
/// Expected content hash, if the caller supplied a `sha256`.
hash: Option<CAHash>,
}
impl FetchArgs {
    /// Assembles fetcher arguments from their already-extracted parts.
    ///
    /// When `name` is absent, the basename of `url` is used instead. When
    /// `sha256` is present it is parsed as a sha256 hash and wrapped according
    /// to `mode` (`Flat` -> `CAHash::Flat`, `Recursive` -> `CAHash::Nar`).
    ///
    /// Fails if `sha256` cannot be parsed.
    pub fn new(
        url: String,
        name: Option<String>,
        sha256: Option<String>,
        mode: HashMode,
    ) -> nixhash::Result<Self> {
        let hash = match sha256 {
            Some(encoded) => {
                let digest = nixhash::from_str(&encoded, Some("sha256"))?;
                Some(match mode {
                    HashMode::Flat => nixhash::CAHash::Flat(digest),
                    HashMode::Recursive => nixhash::CAHash::Nar(digest),
                })
            }
            None => None,
        };

        let name = match name {
            Some(explicit) => explicit,
            None => url_basename(&url).to_owned(),
        };

        Ok(Self { url, name, hash })
    }

    /// Computes the store path these arguments would produce.
    ///
    /// Returns `Ok(None)` when no hash is known, since the path cannot be
    /// determined in that case.
    fn store_path(&self) -> Result<Option<StorePathRef>, ErrorKind> {
        match &self.hash {
            None => Ok(None),
            Some(ca_hash) => build_ca_path(&self.name, ca_hash, Vec::<String>::new(), false)
                .map(Some)
                .map_err(|e| FetcherError::from(e).into()),
        }
    }

    /// Extracts fetcher arguments from the value passed to a fetcher builtin.
    ///
    /// `args` may be a plain string (used as the URL) or an attribute set with
    /// a mandatory `url` and optional `name` and `sha256` attributes.
    /// `default_name` is used when the attribute set has no `name`.
    ///
    /// The inner `Err` carries a catchable error raised while coercing one of
    /// the attributes to a string.
    async fn extract(
        co: &GenCo,
        args: Value,
        default_name: Option<&str>,
        mode: HashMode,
    ) -> Result<Result<Self, CatchableErrorKind>, ErrorKind> {
        // Short form: the argument is just the URL.
        if let Ok(url) = args.to_str() {
            let fetch_args = FetchArgs::new(url.to_str()?.to_owned(), None, None, mode)
                .map_err(DerivationError::InvalidOutputHash)?;
            return Ok(Ok(fetch_args));
        }

        let attrs = args.to_attrs().map_err(|_| ErrorKind::TypeError {
            expected: "attribute set or string",
            actual: args.type_of(),
        })?;

        // The attributes are read in the order url, name, sha256.
        let url = match select_string(co, &attrs, "url").await? {
            Err(cek) => return Ok(Err(cek)),
            Ok(None) => return Err(ErrorKind::AttributeNotFound { name: "url".into() }),
            Ok(Some(url)) => url,
        };

        let name = match select_string(co, &attrs, "name").await? {
            Err(cek) => return Ok(Err(cek)),
            Ok(Some(name)) => Some(name),
            Ok(None) => default_name.map(|s| s.to_owned()),
        };

        let sha256 = match select_string(co, &attrs, "sha256").await? {
            Err(cek) => return Ok(Err(cek)),
            Ok(sha256) => sha256,
        };

        let fetch_args =
            FetchArgs::new(url, name, sha256, mode).map_err(DerivationError::InvalidOutputHash)?;
        Ok(Ok(fetch_args))
    }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FetchMode {
Url,
Tarball,
}
impl From<FetchMode> for HashMode {
fn from(value: FetchMode) -> Self {
match value {
FetchMode::Url => HashMode::Flat,
FetchMode::Tarball => HashMode::Recursive,
}
}
}
impl FetchMode {
fn default_name(self) -> Option<&'static str> {
match self {
FetchMode::Url => None,
FetchMode::Tarball => Some("source"),
}
}
}
/// Renders a store path as a Nix string holding its absolute path, with that
/// same path attached as a plain context element.
fn string_from_store_path(store_path: StorePathRef) -> NixString {
    let context = NixContextElement::Plain(store_path.to_absolute_path()).into();
    let contents = store_path.to_absolute_path();
    NixString::new_context_from(context, contents)
}
/// Shared implementation behind the fetcher builtins.
///
/// Extracts the fetcher arguments from `args`. If their store path can be
/// computed up front and already exists in the store, that path is returned
/// right away; otherwise the URL is downloaded via
/// [TvixStoreIO::fetch_url]. In both cases the result is the store path as a
/// string value.
///
/// A catchable error raised while extracting the arguments is returned as a
/// value rather than as an `Err`.
async fn fetch(
    state: Rc<TvixStoreIO>,
    co: GenCo,
    args: Value,
    mode: FetchMode,
) -> Result<Value, ErrorKind> {
    let extracted = FetchArgs::extract(&co, args, mode.default_name(), mode.into()).await?;
    let args = match extracted {
        Err(cek) => return Ok(cek.into()),
        Ok(fetch_args) => fetch_args,
    };

    // Skip the download entirely if the output is already in the store.
    if let Some(existing) = args.store_path()? {
        let already_present = state.store_path_exists(existing).await?;
        if already_present {
            return Ok(string_from_store_path(existing).into());
        }
    }

    let expected = args.hash.as_ref().map(|ca| ca.hash());
    let download = state.fetch_url(&args.url, &args.name, expected.as_deref());
    let store_path = state.tokio_handle.block_on(download)?;

    Ok(string_from_store_path(store_path.as_ref()).into())
}
#[allow(unused_variables)] // for the `state` arg, for now
#[builtins(state = "Rc<TvixStoreIO>")]
@ -11,15 +191,14 @@ pub(crate) mod fetcher_builtins {
use super::*;
use tvix_eval::generators::Gen;
use tvix_eval::{generators::GenCo, ErrorKind};
#[builtin("fetchurl")]
async fn builtin_fetchurl(
state: Rc<TvixStoreIO>,
co: GenCo,
url: Value,
args: Value,
) -> Result<Value, ErrorKind> {
Err(ErrorKind::NotImplemented("fetchurl"))
fetch(state, co, args, FetchMode::Url).await
}
#[builtin("fetchTarball")]
@ -28,7 +207,7 @@ pub(crate) mod fetcher_builtins {
co: GenCo,
args: Value,
) -> Result<Value, ErrorKind> {
Err(ErrorKind::NotImplemented("fetchTarball"))
fetch(state, co, args, FetchMode::Tarball).await
}
#[builtin("fetchGit")]
@ -40,3 +219,71 @@ pub(crate) mod fetcher_builtins {
Err(ErrorKind::NotImplemented("fetchGit"))
}
}
// Unit tests for store path calculation from fetcher arguments and for
// `url_basename`. None of them perform a download.
#[cfg(test)]
mod tests {
use std::str::FromStr;
use nix_compat::store_path::StorePath;
use super::*;
// A flat sha256 with no explicit name: the name is taken from the last
// component of the URL.
#[test]
fn fetchurl_store_path() {
let url = "https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch";
let sha256 = "0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax";
let args = FetchArgs::new(url.into(), None, Some(sha256.into()), HashMode::Flat).unwrap();
assert_eq!(
args.store_path().unwrap().unwrap().to_owned(),
StorePath::from_str("06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()
)
}
// A recursive sha256 with the explicit name "source".
#[test]
fn fetch_tarball_store_path() {
let url = "https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz";
let sha256 = "1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm";
let args = FetchArgs::new(
url.into(),
Some("source".into()),
Some(sha256.into()),
HashMode::Recursive,
)
.unwrap();
assert_eq!(
args.store_path().unwrap().unwrap().to_owned(),
StorePath::from_str("7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source").unwrap()
)
}
// Expected results of `url_basename` for a handful of path shapes.
mod url_basename {
use super::*;
// The empty string has an empty basename.
#[test]
fn empty_path() {
assert_eq!(url_basename(""), "");
}
// A single component directly under the root.
#[test]
fn path_on_root() {
assert_eq!(url_basename("/dir"), "dir");
}
// A relative path yields its last component.
#[test]
fn relative_path() {
assert_eq!(url_basename("dir/foo"), "foo");
}
// The root itself has an empty basename.
#[test]
fn root_with_trailing_slash() {
assert_eq!(url_basename("/"), "");
}
// A trailing slash is ignored.
#[test]
fn trailing_slash() {
assert_eq!(url_basename("/dir/"), "dir");
}
}
}

View file

@ -8,8 +8,9 @@ mod derivation;
mod errors;
mod fetchers;
mod import;
mod utils;
pub use errors::DerivationError;
pub use errors::{DerivationError, FetcherError};
/// Adds derivation-related builtins to the passed [tvix_eval::Evaluation].
///

View file

@ -0,0 +1,36 @@
use bstr::ByteSlice;
use tvix_eval::{
generators::{self, GenCo},
CatchableErrorKind, CoercionKind, ErrorKind, NixAttrs, NixString, Value,
};
/// Forces `val` and then coerces it to a string, using a coercion that is
/// both strong and path-importing.
///
/// Returns the catchable error if the coercion raises one.
pub(super) async fn strong_importing_coerce_to_string(
    co: &GenCo,
    val: Value,
) -> Result<NixString, CatchableErrorKind> {
    let kind = CoercionKind {
        strong: true,
        import_paths: true,
    };

    let forced = generators::request_force(co, val).await;
    generators::request_string_coerce(co, forced, kind).await
}
/// Looks up `key` in `attrs` and coerces the value to an owned `String`.
///
/// * `Ok(Ok(None))` - the attribute is not present.
/// * `Ok(Ok(Some(s)))` - the attribute was present and coerced to `s`.
/// * `Ok(Err(cek))` - coercing the attribute raised a catchable error.
/// * `Err(_)` - converting the coerced string with `to_str` failed.
pub(super) async fn select_string(
    co: &GenCo,
    attrs: &NixAttrs,
    key: &str,
) -> Result<Result<Option<String>, CatchableErrorKind>, ErrorKind> {
    let Some(attr) = attrs.select(key) else {
        return Ok(Ok(None));
    };

    match strong_importing_coerce_to_string(co, attr.clone()).await {
        Ok(coerced) => Ok(Ok(Some(coerced.to_str()?.to_owned()))),
        Err(cek) => Ok(Err(cek)),
    }
}

View file

@ -4,7 +4,12 @@ use async_recursion::async_recursion;
use bytes::Bytes;
use futures::Stream;
use futures::{StreamExt, TryStreamExt};
use nix_compat::nixhash::NixHash;
use nix_compat::store_path::{build_ca_path, StorePathRef};
use nix_compat::{nixhash::CAHash, store_path::StorePath};
use sha2::{Digest, Sha256};
use std::marker::Unpin;
use std::rc::Rc;
use std::{
cell::RefCell,
collections::BTreeSet,
@ -15,17 +20,18 @@ use std::{
use tokio::io::AsyncReadExt;
use tracing::{error, instrument, warn, Level};
use tvix_build::buildservice::BuildService;
use tvix_eval::{EvalIO, FileType, StdIO};
use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
use walkdir::DirEntry;
use tvix_castore::{
blobservice::BlobService,
directoryservice::{self, DirectoryService},
proto::{node::Node, NamedNode},
proto::{node::Node, FileNode, NamedNode},
B3Digest,
};
use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
use crate::builtins::FetcherError;
use crate::known_paths::KnownPaths;
use crate::tvix_build::derivation_to_build_request;
@ -51,7 +57,8 @@ pub struct TvixStoreIO {
std_io: StdIO,
#[allow(dead_code)]
build_service: Arc<dyn BuildService>,
tokio_handle: tokio::runtime::Handle,
pub(crate) tokio_handle: tokio::runtime::Handle,
http_client: reqwest::Client,
pub(crate) known_paths: RefCell<KnownPaths>,
}
@ -70,6 +77,7 @@ impl TvixStoreIO {
std_io: StdIO {},
build_service,
tokio_handle,
http_client: reqwest::Client::new(),
known_paths: Default::default(),
}
}
@ -278,7 +286,7 @@ impl TvixStoreIO {
/// with a [`tokio::runtime::Handle::block_on`] call for synchronicity.
pub(crate) fn ingest_entries_sync<S>(&self, entries_stream: S) -> io::Result<Node>
where
S: Stream<Item = DirEntry> + std::marker::Unpin,
S: Stream<Item = DirEntry> + Unpin,
{
self.tokio_handle.block_on(async move {
tvix_castore::import::ingest_entries(
@ -346,6 +354,97 @@ impl TvixStoreIO {
.await
})
}
/// Reports whether the path info service has an entry for `store_path`.
///
/// The lookup is keyed on the digest of the store path.
pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> {
    let digest = *store_path.digest();
    let path_info = self.path_info_service.as_ref().get(digest).await?;
    Ok(path_info.is_some())
}
/// Downloads `url`, stores the response body as a blob and registers it in
/// the path info service under a flat content-addressed store path named
/// `name`.
///
/// The body is hashed with SHA-256 while it is streamed into the blob
/// service. If `hash` is given, the computed hash must match it; otherwise
/// the computed hash is used to derive the store path.
///
/// # Errors
///
/// * [FetcherError::Http] if the request fails or the server answers with a
///   4xx/5xx status. An error page must never be ingested as if it were the
///   requested file.
/// * [FetcherError::HashMismatch] if `hash` is given and differs from the
///   hash of the downloaded data.
/// * [FetcherError::StorePath] if no store path can be built from `name`.
/// * I/O errors from reading the body, writing the blob, or storing the path
///   info.
pub async fn fetch_url(
    &self,
    url: &str,
    name: &str,
    hash: Option<&NixHash>,
) -> Result<StorePath, ErrorKind> {
    let resp = self
        .http_client
        .get(url)
        .send()
        .await
        // Turn 4xx/5xx responses into errors instead of downloading their body.
        .and_then(|resp| resp.error_for_status())
        .map_err(FetcherError::from)?;

    // Hash every chunk as it passes through on its way into the blob writer.
    let mut sha = Sha256::new();
    let mut data = tokio_util::io::StreamReader::new(
        resp.bytes_stream()
            .inspect_ok(|data| {
                sha.update(data);
            })
            .map_err(|e| {
                let e = e.without_url();
                warn!(%e, "failed to get response body");
                io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
            }),
    );

    let mut blob = self.blob_service.open_write().await;
    let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
    let blob_digest = blob.close().await?;
    let got = NixHash::Sha256(sha.finalize().into());

    let hash = CAHash::Flat(if let Some(wanted) = hash {
        if *wanted != got {
            return Err(FetcherError::HashMismatch {
                url: url.to_owned(),
                wanted: wanted.clone(),
                got,
            }
            .into());
        }
        wanted.clone()
    } else {
        got
    });

    // Report store path failures through the dedicated fetcher error variant.
    let path =
        build_ca_path(name, &hash, Vec::<String>::new(), false).map_err(FetcherError::from)?;

    let node = Node::File(FileNode {
        name: path.to_string().into(),
        digest: blob_digest.into(),
        size,
        executable: false,
    });

    let (nar_size, nar_sha256) = self
        .path_info_service
        .calculate_nar(&node)
        .await
        .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;

    let path_info = PathInfo {
        node: Some(tvix_castore::proto::Node {
            node: Some(node.clone()),
        }),
        references: vec![],
        narinfo: Some(tvix_store::proto::NarInfo {
            nar_size,
            nar_sha256: nar_sha256.to_vec().into(),
            signatures: vec![],
            reference_names: vec![],
            deriver: None, /* ? */
            ca: Some((&hash).into()),
        }),
    };

    self.path_info_service
        .put(path_info)
        .await
        .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;

    Ok(path.to_owned())
}
}
impl EvalIO for TvixStoreIO {

View file

@ -29,7 +29,7 @@ pub enum Error {
#[error("Dash is missing between hash and name")]
MissingDash,
#[error("Hash encoding is invalid: {0}")]
InvalidHashEncoding(DecodeError),
InvalidHashEncoding(#[from] DecodeError),
#[error("Invalid length")]
InvalidLength,
#[error(
@ -67,6 +67,13 @@ impl StorePath {
pub fn name(&self) -> &str {
self.name.as_ref()
}
/// Returns a [StorePathRef] with the same digest that borrows this path's
/// name.
pub fn as_ref(&self) -> StorePathRef<'_> {
    let digest = self.digest;
    let name = self.name();
    StorePathRef { digest, name }
}
}
impl PartialOrd for StorePath {
@ -176,7 +183,7 @@ impl Serialize for StorePath {
/// Like [StorePath], but without a heap allocation for the name.
/// Used by [StorePath] for parsing.
///
#[derive(Debug, Eq, PartialEq)]
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct StorePathRef<'a> {
digest: [u8; DIGEST_SIZE],
name: &'a str,
@ -237,8 +244,7 @@ impl<'a> StorePathRef<'a> {
Err(Error::InvalidLength)?
}
let digest = nixbase32::decode_fixed(&s[..ENCODED_DIGEST_SIZE])
.map_err(Error::InvalidHashEncoding)?;
let digest = nixbase32::decode_fixed(&s[..ENCODED_DIGEST_SIZE])?;
if s[ENCODED_DIGEST_SIZE] != b'-' {
return Err(Error::MissingDash);