feat(tvix/store/pathinfosvc/nix_http): check Nar{Size,Hash} matches
Ensure the initially communicated NarHash/NarSize from the NarInfo matches what we read, and don't return a PathInfo message if there's a mismatch. Also move the buffering layer around a bit. Change-Id: I68c60ecfaf0f9cd5edacea648437ecb0c9729251 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10148 Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
This commit is contained in:
parent
fce9c0e99e
commit
7e8719be91
1 changed files with 84 additions and 13 deletions
|
@ -1,5 +1,5 @@
|
|||
use std::{
|
||||
io::{self, BufRead},
|
||||
io::{self, BufRead, Read, Write},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
};
|
||||
|
@ -8,6 +8,7 @@ use data_encoding::BASE64;
|
|||
use futures::{Stream, TryStreamExt};
|
||||
use nix_compat::{narinfo::NarInfo, nixbase32};
|
||||
use reqwest::StatusCode;
|
||||
use sha2::{digest::FixedOutput, Digest, Sha256};
|
||||
use tonic::async_trait;
|
||||
use tracing::{debug, instrument, warn};
|
||||
use tvix_castore::{
|
||||
|
@ -147,12 +148,12 @@ impl PathInfoService for NixHTTPPathInfoService {
|
|||
warn!(e=%e, "failed to get response body");
|
||||
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
|
||||
}));
|
||||
let sync_r = std::io::BufReader::new(tokio_util::io::SyncIoBridge::new(async_r));
|
||||
let sync_r = tokio_util::io::SyncIoBridge::new(async_r);
|
||||
|
||||
// handle decompression, by wrapping the reader.
|
||||
let mut sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
|
||||
let sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
|
||||
Some("none") => Box::new(sync_r),
|
||||
Some("xz") => Box::new(std::io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
|
||||
Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
|
||||
Some(comp) => {
|
||||
return Err(Error::InvalidRequest(
|
||||
format!("unsupported compression: {}", comp).to_string(),
|
||||
|
@ -168,20 +169,55 @@ impl PathInfoService for NixHTTPPathInfoService {
|
|||
let res = tokio::task::spawn_blocking({
|
||||
let blob_service = self.blob_service.clone();
|
||||
let directory_service = self.directory_service.clone();
|
||||
move || crate::nar::read_nar(&mut sync_r, blob_service, directory_service)
|
||||
move || -> io::Result<_> {
|
||||
// Wrap the reader once more, so we can calculate NarSize and NarHash
|
||||
let mut sync_r = io::BufReader::new(NarReader::from(sync_r));
|
||||
let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?;
|
||||
|
||||
let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner();
|
||||
|
||||
Ok((root_node, nar_hash, nar_size))
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
match res {
|
||||
Ok(root_node) => Ok(Some(PathInfo {
|
||||
node: Some(castorepb::Node {
|
||||
// set the name of the root node to the digest-name of the store path.
|
||||
node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
|
||||
}),
|
||||
references: pathinfo.references,
|
||||
narinfo: pathinfo.narinfo,
|
||||
})),
|
||||
Ok((root_node, nar_hash, nar_size)) => {
|
||||
// ensure the ingested narhash and narsize do actually match.
|
||||
if narinfo.nar_size != nar_size {
|
||||
warn!(
|
||||
narinfo.nar_size = narinfo.nar_size,
|
||||
http.nar_size = nar_size,
|
||||
"NARSize mismatch"
|
||||
);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"NarSize mismatch".to_string(),
|
||||
))?;
|
||||
} else if narinfo.nar_hash != nar_hash {
|
||||
warn!(
|
||||
narinfo.nar_hash = BASE64.encode(&narinfo.nar_hash),
|
||||
http.nar_hash = BASE64.encode(&nar_hash),
|
||||
"NarHash mismatch"
|
||||
);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"NarHash mismatch".to_string(),
|
||||
))?;
|
||||
}
|
||||
|
||||
Ok(Some(PathInfo {
|
||||
node: Some(castorepb::Node {
|
||||
// set the name of the root node to the digest-name of the store path.
|
||||
node: Some(
|
||||
root_node.rename(narinfo.store_path.to_string().to_owned().into()),
|
||||
),
|
||||
}),
|
||||
references: pathinfo.references,
|
||||
narinfo: pathinfo.narinfo,
|
||||
}))
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
@ -211,3 +247,38 @@ impl PathInfoService for NixHTTPPathInfoService {
|
|||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Small helper reader implementing [std::io::Read].
|
||||
/// It can be used to wrap another reader, counts the number of bytes read
|
||||
/// and the sha256 digest of the contents.
|
||||
struct NarReader<R: Read> {
|
||||
r: R,
|
||||
|
||||
sha256: sha2::Sha256,
|
||||
bytes_read: u64,
|
||||
}
|
||||
|
||||
impl<R: Read> NarReader<R> {
|
||||
pub fn from(inner: R) -> Self {
|
||||
Self {
|
||||
r: inner,
|
||||
sha256: Sha256::new(),
|
||||
bytes_read: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read.
|
||||
pub fn into_inner(self) -> (R, [u8; 32], u64) {
|
||||
(self.r, self.sha256.finalize_fixed().into(), self.bytes_read)
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> Read for NarReader<R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.r.read(buf).map(|n| {
|
||||
self.bytes_read += n as u64;
|
||||
self.sha256.write_all(&buf[..n]).unwrap();
|
||||
n
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue