fix(tvix): Avoid buffering file into memory in builtins.hashFile
Right now `builtins.hashFile` always reads the entire file into memory before hashing, which is not ideal for large files. This replaces `read_to_string` with `open_file` which allows calculating the hash of the file without buffering it entirely into memory. Other callers can continue to buffer into memory if they choose, but they still use the `open_file` VM request and then call `read_to_string` or `read_to_end` on the `std::io::Reader`. Fixes b/380 Change-Id: Ifa1c8324bcee8f751604b0b449feab875c632fda Reviewed-on: https://cl.tvl.fyi/c/depot/+/11236 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
This commit is contained in:
parent
17849c5c00
commit
63116d8c21
9 changed files with 80 additions and 74 deletions
|
@ -6,18 +6,22 @@ use sha2::{digest::Output, Digest, Sha256, Sha512};
|
||||||
|
|
||||||
use crate::ErrorKind;
|
use crate::ErrorKind;
|
||||||
|
|
||||||
fn hash<D: Digest>(b: &[u8]) -> Output<D> {
|
/// Reads through all data from the passed reader, and returns the resulting [Digest].
|
||||||
|
/// The exact hash function used is left generic over all [Digest].
|
||||||
|
fn hash<D: Digest + std::io::Write>(mut r: impl std::io::Read) -> Result<Output<D>, ErrorKind> {
|
||||||
let mut hasher = D::new();
|
let mut hasher = D::new();
|
||||||
hasher.update(b);
|
std::io::copy(&mut r, &mut hasher)?;
|
||||||
hasher.finalize()
|
Ok(hasher.finalize())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn hash_nix_string(algo: impl AsRef<[u8]>, s: impl AsRef<[u8]>) -> Result<String, ErrorKind> {
|
/// For a given algo "string" and reader for data, calculate the digest
|
||||||
|
/// and return it as a hexlower encoded [String].
|
||||||
|
pub fn hash_nix_string(algo: impl AsRef<[u8]>, s: impl std::io::Read) -> Result<String, ErrorKind> {
|
||||||
match algo.as_ref() {
|
match algo.as_ref() {
|
||||||
b"md5" => Ok(HEXLOWER.encode(hash::<Md5>(s.as_ref()).as_bstr())),
|
b"md5" => Ok(HEXLOWER.encode(hash::<Md5>(s)?.as_bstr())),
|
||||||
b"sha1" => Ok(HEXLOWER.encode(hash::<Sha1>(s.as_ref()).as_bstr())),
|
b"sha1" => Ok(HEXLOWER.encode(hash::<Sha1>(s)?.as_bstr())),
|
||||||
b"sha256" => Ok(HEXLOWER.encode(hash::<Sha256>(s.as_ref()).as_bstr())),
|
b"sha256" => Ok(HEXLOWER.encode(hash::<Sha256>(s)?.as_bstr())),
|
||||||
b"sha512" => Ok(HEXLOWER.encode(hash::<Sha512>(s.as_ref()).as_bstr())),
|
b"sha512" => Ok(HEXLOWER.encode(hash::<Sha512>(s)?.as_bstr())),
|
||||||
_ => Err(ErrorKind::UnknownHashType(
|
_ => Err(ErrorKind::UnknownHashType(
|
||||||
algo.as_ref().as_bstr().to_string(),
|
algo.as_ref().as_bstr().to_string(),
|
||||||
)),
|
)),
|
||||||
|
|
|
@ -31,14 +31,13 @@ mod impure_builtins {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[builtin("hashFile")]
|
#[builtin("hashFile")]
|
||||||
#[allow(non_snake_case)]
|
async fn builtin_hash_file(co: GenCo, algo: Value, path: Value) -> Result<Value, ErrorKind> {
|
||||||
async fn builtin_hashFile(co: GenCo, algo: Value, path: Value) -> Result<Value, ErrorKind> {
|
|
||||||
let path = match coerce_value_to_path(&co, path).await? {
|
let path = match coerce_value_to_path(&co, path).await? {
|
||||||
Err(cek) => return Ok(Value::from(cek)),
|
Err(cek) => return Ok(Value::from(cek)),
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
};
|
};
|
||||||
let s = generators::request_read_to_string(&co, path).await;
|
let r = generators::request_open_file(&co, path).await;
|
||||||
hash_nix_string(algo.to_str()?, s.to_str()?).map(Value::from)
|
Ok(hash_nix_string(algo.to_str()?, r).map(Value::from)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[builtin("pathExists")]
|
#[builtin("pathExists")]
|
||||||
|
@ -79,7 +78,13 @@ mod impure_builtins {
|
||||||
async fn builtin_read_file(co: GenCo, path: Value) -> Result<Value, ErrorKind> {
|
async fn builtin_read_file(co: GenCo, path: Value) -> Result<Value, ErrorKind> {
|
||||||
match coerce_value_to_path(&co, path).await? {
|
match coerce_value_to_path(&co, path).await? {
|
||||||
Err(cek) => Ok(Value::from(cek)),
|
Err(cek) => Ok(Value::from(cek)),
|
||||||
Ok(path) => Ok(generators::request_read_to_string(&co, path).await),
|
Ok(path) => {
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
generators::request_open_file(&co, path)
|
||||||
|
.await
|
||||||
|
.read_to_end(&mut buf)?;
|
||||||
|
Ok(Value::from(buf))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -773,9 +773,8 @@ mod pure_builtins {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[builtin("hashString")]
|
#[builtin("hashString")]
|
||||||
#[allow(non_snake_case)]
|
async fn builtin_hash_string(co: GenCo, algo: Value, s: Value) -> Result<Value, ErrorKind> {
|
||||||
async fn builtin_hashString(co: GenCo, algo: Value, s: Value) -> Result<Value, ErrorKind> {
|
hash_nix_string(algo.to_str()?, std::io::Cursor::new(s.to_str()?)).map(Value::from)
|
||||||
hash_nix_string(algo.to_str()?, s.to_str()?).map(Value::from)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[builtin("head")]
|
#[builtin("head")]
|
||||||
|
|
|
@ -6,7 +6,6 @@
|
||||||
//! instance, or observers).
|
//! instance, or observers).
|
||||||
|
|
||||||
use super::GlobalsMap;
|
use super::GlobalsMap;
|
||||||
use bstr::ByteSlice;
|
|
||||||
use genawaiter::rc::Gen;
|
use genawaiter::rc::Gen;
|
||||||
use std::rc::Weak;
|
use std::rc::Weak;
|
||||||
|
|
||||||
|
@ -39,9 +38,11 @@ async fn import_impl(
|
||||||
return Ok(cached);
|
return Ok(cached);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(tazjin): make this return a string directly instead
|
let mut reader = generators::request_open_file(&co, path.clone()).await;
|
||||||
let contents: Value = generators::request_read_to_string(&co, path.clone()).await;
|
// We read to a String instead of a Vec<u8> because rnix only supports
|
||||||
let contents = contents.to_str()?.to_str()?.to_owned();
|
// string source files.
|
||||||
|
let mut contents = String::new();
|
||||||
|
reader.read_to_string(&mut contents)?;
|
||||||
|
|
||||||
let parsed = rnix::ast::Root::parse(&contents);
|
let parsed = rnix::ast::Root::parse(&contents);
|
||||||
let errors = parsed.errors();
|
let errors = parsed.errors();
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
//! how store paths are opened and so on.
|
//! how store paths are opened and so on.
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
fs::File,
|
||||||
io,
|
io,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
};
|
};
|
||||||
|
@ -48,13 +49,8 @@ pub trait EvalIO {
|
||||||
/// * `builtins.pathExists :: path -> bool`
|
/// * `builtins.pathExists :: path -> bool`
|
||||||
fn path_exists(&self, path: &Path) -> io::Result<bool>;
|
fn path_exists(&self, path: &Path) -> io::Result<bool>;
|
||||||
|
|
||||||
/// Read the file at the specified path to a `Vec<u8>`.
|
/// Open the file at the specified path to a `io::Read`.
|
||||||
///
|
fn open(&self, path: &Path) -> io::Result<Box<dyn io::Read>>;
|
||||||
/// This is used for the following language evaluation cases:
|
|
||||||
///
|
|
||||||
/// * `builtins.readFile :: path -> string`
|
|
||||||
/// * `builtins.import :: path -> any`
|
|
||||||
fn read_to_end(&self, path: &Path) -> io::Result<Vec<u8>>;
|
|
||||||
|
|
||||||
/// Read the directory at the specified path and return the names
|
/// Read the directory at the specified path and return the names
|
||||||
/// of its entries associated with their [`FileType`].
|
/// of its entries associated with their [`FileType`].
|
||||||
|
@ -99,8 +95,8 @@ impl EvalIO for StdIO {
|
||||||
path.try_exists()
|
path.try_exists()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_to_end(&self, path: &Path) -> io::Result<Vec<u8>> {
|
fn open(&self, path: &Path) -> io::Result<Box<dyn io::Read>> {
|
||||||
std::fs::read(path)
|
Ok(Box::new(File::open(path)?))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> {
|
fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> {
|
||||||
|
@ -145,7 +141,7 @@ impl EvalIO for DummyIO {
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_to_end(&self, _: &Path) -> io::Result<Vec<u8>> {
|
fn open(&self, _: &Path) -> io::Result<Box<dyn io::Read>> {
|
||||||
Err(io::Error::new(
|
Err(io::Error::new(
|
||||||
io::ErrorKind::Unsupported,
|
io::ErrorKind::Unsupported,
|
||||||
"I/O methods are not implemented in DummyIO",
|
"I/O methods are not implemented in DummyIO",
|
||||||
|
|
|
@ -102,8 +102,8 @@ pub enum VMRequest {
|
||||||
/// Request that the VM imports the given path through its I/O interface.
|
/// Request that the VM imports the given path through its I/O interface.
|
||||||
PathImport(PathBuf),
|
PathImport(PathBuf),
|
||||||
|
|
||||||
/// Request that the VM reads the given path to a string.
|
/// Request that the VM opens the specified file and provides a reader.
|
||||||
ReadToString(PathBuf),
|
OpenFile(PathBuf),
|
||||||
|
|
||||||
/// Request that the VM checks whether the given path exists.
|
/// Request that the VM checks whether the given path exists.
|
||||||
PathExists(PathBuf),
|
PathExists(PathBuf),
|
||||||
|
@ -170,8 +170,8 @@ impl Display for VMRequest {
|
||||||
write!(f, "import_cache_put({})", p.to_string_lossy())
|
write!(f, "import_cache_put({})", p.to_string_lossy())
|
||||||
}
|
}
|
||||||
VMRequest::PathImport(p) => write!(f, "path_import({})", p.to_string_lossy()),
|
VMRequest::PathImport(p) => write!(f, "path_import({})", p.to_string_lossy()),
|
||||||
VMRequest::ReadToString(p) => {
|
VMRequest::OpenFile(p) => {
|
||||||
write!(f, "read_to_string({})", p.to_string_lossy())
|
write!(f, "open_file({})", p.to_string_lossy())
|
||||||
}
|
}
|
||||||
VMRequest::PathExists(p) => write!(f, "path_exists({})", p.to_string_lossy()),
|
VMRequest::PathExists(p) => write!(f, "path_exists({})", p.to_string_lossy()),
|
||||||
VMRequest::ReadDir(p) => write!(f, "read_dir({})", p.to_string_lossy()),
|
VMRequest::ReadDir(p) => write!(f, "read_dir({})", p.to_string_lossy()),
|
||||||
|
@ -199,6 +199,9 @@ pub enum VMResponse {
|
||||||
|
|
||||||
/// VM response with a span to use at the current point.
|
/// VM response with a span to use at the current point.
|
||||||
Span(LightSpan),
|
Span(LightSpan),
|
||||||
|
|
||||||
|
/// [std::io::Reader] produced by the VM in response to some IO operation.
|
||||||
|
Reader(Box<dyn std::io::Read>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for VMResponse {
|
impl Display for VMResponse {
|
||||||
|
@ -209,6 +212,7 @@ impl Display for VMResponse {
|
||||||
VMResponse::Path(p) => write!(f, "path({})", p.to_string_lossy()),
|
VMResponse::Path(p) => write!(f, "path({})", p.to_string_lossy()),
|
||||||
VMResponse::Directory(d) => write!(f, "dir(len = {})", d.len()),
|
VMResponse::Directory(d) => write!(f, "dir(len = {})", d.len()),
|
||||||
VMResponse::Span(_) => write!(f, "span"),
|
VMResponse::Span(_) => write!(f, "span"),
|
||||||
|
VMResponse::Reader(_) => write!(f, "reader"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -425,18 +429,18 @@ where
|
||||||
message = VMResponse::Path(imported);
|
message = VMResponse::Path(imported);
|
||||||
}
|
}
|
||||||
|
|
||||||
VMRequest::ReadToString(path) => {
|
VMRequest::OpenFile(path) => {
|
||||||
let content = self
|
let reader = self
|
||||||
.io_handle
|
.io_handle
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.read_to_end(&path)
|
.open(&path)
|
||||||
.map_err(|e| ErrorKind::IO {
|
.map_err(|e| ErrorKind::IO {
|
||||||
path: Some(path),
|
path: Some(path),
|
||||||
error: e.into(),
|
error: e.into(),
|
||||||
})
|
})
|
||||||
.with_span(&span, self)?;
|
.with_span(&span, self)?;
|
||||||
|
|
||||||
message = VMResponse::Value(content.into())
|
message = VMResponse::Reader(reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
VMRequest::PathExists(path) => {
|
VMRequest::PathExists(path) => {
|
||||||
|
@ -730,9 +734,10 @@ pub(crate) async fn request_path_import(co: &GenCo, path: PathBuf) -> PathBuf {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn request_read_to_string(co: &GenCo, path: PathBuf) -> Value {
|
/// Request that the VM open a [std::io::Read] for the specified file.
|
||||||
match co.yield_(VMRequest::ReadToString(path)).await {
|
pub async fn request_open_file(co: &GenCo, path: PathBuf) -> Box<dyn std::io::Read> {
|
||||||
VMResponse::Value(value) => value,
|
match co.yield_(VMRequest::OpenFile(path)).await {
|
||||||
|
VMResponse::Reader(value) => value,
|
||||||
msg => panic!(
|
msg => panic!(
|
||||||
"Tvix bug: VM responded with incorrect generator message: {}",
|
"Tvix bug: VM responded with incorrect generator message: {}",
|
||||||
msg
|
msg
|
||||||
|
|
|
@ -177,9 +177,9 @@ mod import_builtins {
|
||||||
})
|
})
|
||||||
.transpose()?;
|
.transpose()?;
|
||||||
|
|
||||||
// FUTUREWORK(performance): this reads the file instead of using a stat-like
|
// FUTUREWORK(performance): this opens the file instead of using a stat-like
|
||||||
// system call to the file, this degrades very badly on large files.
|
// system call to the file.
|
||||||
if !recursive_ingestion && state.read_to_end(path.as_ref()).is_err() {
|
if !recursive_ingestion && state.open(path.as_ref()).is_err() {
|
||||||
Err(ImportError::FlatImportOfNonFile(
|
Err(ImportError::FlatImportOfNonFile(
|
||||||
path.to_string_lossy().to_string(),
|
path.to_string_lossy().to_string(),
|
||||||
))?;
|
))?;
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
//! otherwise fundamental features like nixpkgs bootstrapping and hash
|
//! otherwise fundamental features like nixpkgs bootstrapping and hash
|
||||||
//! calculation will not work.
|
//! calculation will not work.
|
||||||
|
|
||||||
use std::io;
|
use std::io::{self, Cursor};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use tvix_eval::{EvalIO, FileType};
|
use tvix_eval::{EvalIO, FileType};
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ where
|
||||||
self.actual.as_ref().path_exists(path)
|
self.actual.as_ref().path_exists(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_to_end(&self, path: &Path) -> io::Result<Vec<u8>> {
|
fn open(&self, path: &Path) -> io::Result<Box<dyn io::Read>> {
|
||||||
// Bundled version of corepkgs/fetchurl.nix. The counterpart
|
// Bundled version of corepkgs/fetchurl.nix. The counterpart
|
||||||
// of this happens in [crate::configure_nix_path], where the `nix_path`
|
// of this happens in [crate::configure_nix_path], where the `nix_path`
|
||||||
// of the evaluation has `nix=/__corepkgs__` added to it.
|
// of the evaluation has `nix=/__corepkgs__` added to it.
|
||||||
|
@ -52,13 +52,12 @@ where
|
||||||
// This workaround is similar to what cppnix does for passing
|
// This workaround is similar to what cppnix does for passing
|
||||||
// the path through.
|
// the path through.
|
||||||
//
|
//
|
||||||
// TODO: this comparison is bad and allocates, we should use
|
// TODO: this comparison is bad we should use the sane path library.
|
||||||
// the sane path library.
|
|
||||||
if path.starts_with("/__corepkgs__/fetchurl.nix") {
|
if path.starts_with("/__corepkgs__/fetchurl.nix") {
|
||||||
return Ok(include_bytes!("fetchurl.nix").to_vec());
|
return Ok(Box::new(Cursor::new(include_bytes!("fetchurl.nix"))));
|
||||||
}
|
}
|
||||||
|
|
||||||
self.actual.as_ref().read_to_end(path)
|
self.actual.as_ref().open(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> {
|
fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> {
|
||||||
|
|
|
@ -17,7 +17,7 @@ use std::{
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio_util::io::SyncIoBridge;
|
||||||
use tracing::{error, instrument, warn, Level};
|
use tracing::{error, instrument, warn, Level};
|
||||||
use tvix_build::buildservice::BuildService;
|
use tvix_build::buildservice::BuildService;
|
||||||
use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
|
use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
|
||||||
|
@ -478,7 +478,7 @@ impl EvalIO for TvixStoreIO {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self), err)]
|
#[instrument(skip(self), err)]
|
||||||
fn read_to_end(&self, path: &Path) -> io::Result<Vec<u8>> {
|
fn open(&self, path: &Path) -> io::Result<Box<dyn io::Read>> {
|
||||||
if let Ok((store_path, sub_path)) =
|
if let Ok((store_path, sub_path)) =
|
||||||
StorePath::from_absolute_path_full(&path.to_string_lossy())
|
StorePath::from_absolute_path_full(&path.to_string_lossy())
|
||||||
{
|
{
|
||||||
|
@ -509,10 +509,13 @@ impl EvalIO for TvixStoreIO {
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.tokio_handle.block_on(async {
|
self.tokio_handle.block_on(async {
|
||||||
let mut reader = {
|
|
||||||
let resp = self.blob_service.as_ref().open_read(&digest).await?;
|
let resp = self.blob_service.as_ref().open_read(&digest).await?;
|
||||||
match resp {
|
match resp {
|
||||||
Some(blob_reader) => blob_reader,
|
Some(blob_reader) => {
|
||||||
|
// The VM Response needs a sync [std::io::Reader].
|
||||||
|
Ok(Box::new(SyncIoBridge::new(blob_reader))
|
||||||
|
as Box<dyn io::Read>)
|
||||||
|
}
|
||||||
None => {
|
None => {
|
||||||
error!(
|
error!(
|
||||||
blob.digest = %digest,
|
blob.digest = %digest,
|
||||||
|
@ -521,15 +524,9 @@ impl EvalIO for TvixStoreIO {
|
||||||
Err(io::Error::new(
|
Err(io::Error::new(
|
||||||
io::ErrorKind::NotFound,
|
io::ErrorKind::NotFound,
|
||||||
format!("blob {} not found", &digest),
|
format!("blob {} not found", &digest),
|
||||||
))?
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
let mut buf = Vec::new();
|
|
||||||
|
|
||||||
reader.read_to_end(&mut buf).await?;
|
|
||||||
Ok(buf)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Node::Symlink(_symlink_node) => Err(io::Error::new(
|
Node::Symlink(_symlink_node) => Err(io::Error::new(
|
||||||
|
@ -540,11 +537,11 @@ impl EvalIO for TvixStoreIO {
|
||||||
} else {
|
} else {
|
||||||
// As tvix-store doesn't manage /nix/store on the filesystem,
|
// As tvix-store doesn't manage /nix/store on the filesystem,
|
||||||
// we still need to also ask self.std_io here.
|
// we still need to also ask self.std_io here.
|
||||||
self.std_io.read_to_end(path)
|
self.std_io.open(path)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// The store path is no store path, so do regular StdIO.
|
// The store path is no store path, so do regular StdIO.
|
||||||
self.std_io.read_to_end(path)
|
self.std_io.open(path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue