refactor(tvix/nar-bridge): combine writers/readers
We can drop most of Hasher if we use a MultiWriter writing to the hash
function and a minimal CountingWriter. This should make things a bit
more understandable.

Change-Id: I37ee72d9a5c73f253aecc1ad761cb723389b89fc
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9529
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
This commit is contained in:
parent
f92b0ef933
commit
49b427d773
3 changed files with 37 additions and 80 deletions
|
@ -1,66 +0,0 @@
|
||||||
package hashers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"hash"
|
|
||||||
"io"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ io.Reader = &Hasher{}
|
|
||||||
|
|
||||||
// Hasher wraps io.Reader.
|
|
||||||
// You can ask it for the digest of the hash function used internally, and the
|
|
||||||
// number of bytes written.
|
|
||||||
type Hasher struct {
|
|
||||||
r io.Reader
|
|
||||||
h hash.Hash
|
|
||||||
bytesRead uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHasher(r io.Reader, h hash.Hash) *Hasher {
|
|
||||||
return &Hasher{
|
|
||||||
r: r,
|
|
||||||
h: h,
|
|
||||||
bytesRead: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hasher) Read(p []byte) (int, error) {
|
|
||||||
nRead, rdErr := h.r.Read(p)
|
|
||||||
|
|
||||||
// write the number of bytes read from the reader to the hash.
|
|
||||||
// We need to do this independently on whether there's been error.
|
|
||||||
// n always describes the number of successfully written bytes.
|
|
||||||
nHash, hashErr := h.h.Write(p[0:nRead])
|
|
||||||
if hashErr != nil {
|
|
||||||
return nRead, fmt.Errorf("unable to write to hash: %w", hashErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// We assume here the hash function accepts the whole p in one Go,
|
|
||||||
// and doesn't early-return on the Write.
|
|
||||||
// We compare it with nRead and bail out if that was not the case.
|
|
||||||
if nHash != nRead {
|
|
||||||
return nRead, fmt.Errorf("hash didn't accept the full write")
|
|
||||||
}
|
|
||||||
|
|
||||||
// update bytesWritten
|
|
||||||
h.bytesRead += uint32(nRead)
|
|
||||||
|
|
||||||
if rdErr != nil {
|
|
||||||
if errors.Is(rdErr, io.EOF) {
|
|
||||||
return nRead, rdErr
|
|
||||||
}
|
|
||||||
return nRead, fmt.Errorf("error from underlying reader: %w", rdErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nRead, hashErr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hasher) BytesWritten() uint32 {
|
|
||||||
return h.bytesRead
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hasher) Sum(b []byte) []byte {
|
|
||||||
return h.h.Sum(b)
|
|
||||||
}
|
|
21
tvix/nar-bridge/pkg/importer/counting_writer.go
Normal file
21
tvix/nar-bridge/pkg/importer/counting_writer.go
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
package importer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CountingWriter implements io.Writer.
|
||||||
|
var _ io.Writer = &CountingWriter{}
|
||||||
|
|
||||||
|
type CountingWriter struct {
|
||||||
|
bytesWritten uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cw *CountingWriter) Write(p []byte) (n int, err error) {
|
||||||
|
cw.bytesWritten += uint64(len(p))
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cw *CountingWriter) BytesWritten() uint64 {
|
||||||
|
return cw.bytesWritten
|
||||||
|
}
|
|
@ -10,10 +10,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
||||||
"code.tvl.fyi/tvix/nar-bridge/pkg/hashers"
|
|
||||||
storev1pb "code.tvl.fyi/tvix/store/protos"
|
storev1pb "code.tvl.fyi/tvix/store/protos"
|
||||||
"github.com/nix-community/go-nix/pkg/nar"
|
"github.com/nix-community/go-nix/pkg/nar"
|
||||||
"lukechampine.com/blake3"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// An item on the directories stack
|
// An item on the directories stack
|
||||||
|
@ -34,12 +32,15 @@ func Import(
|
||||||
// callback function called with each finalized directory node
|
// callback function called with each finalized directory node
|
||||||
directoryCb func(directory *castorev1pb.Directory) ([]byte, error),
|
directoryCb func(directory *castorev1pb.Directory) ([]byte, error),
|
||||||
) (*storev1pb.PathInfo, error) {
|
) (*storev1pb.PathInfo, error) {
|
||||||
// wrap the passed reader in a reader that records the number of bytes read and
|
// We need to wrap the underlying reader a bit.
|
||||||
// their sha256 sum.
|
// - we want to keep track of the number of bytes read in total
|
||||||
hr := hashers.NewHasher(r, sha256.New())
|
// - we calculate the sha256 digest over all data read
|
||||||
|
// Express these two things in a MultiWriter, and give the NAR reader a
|
||||||
// construct a NAR reader from the underlying data.
|
// TeeReader that writes to it.
|
||||||
narReader, err := nar.NewReader(hr)
|
narCountW := &CountingWriter{}
|
||||||
|
sha256W := sha256.New()
|
||||||
|
multiW := io.MultiWriter(narCountW, sha256W)
|
||||||
|
narReader, err := nar.NewReader(io.TeeReader(r, multiW))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to instantiate nar reader: %w", err)
|
return nil, fmt.Errorf("failed to instantiate nar reader: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -132,8 +133,8 @@ func Import(
|
||||||
Node: nil,
|
Node: nil,
|
||||||
References: [][]byte{},
|
References: [][]byte{},
|
||||||
Narinfo: &storev1pb.NARInfo{
|
Narinfo: &storev1pb.NARInfo{
|
||||||
NarSize: uint64(hr.BytesWritten()),
|
NarSize: narCountW.BytesWritten(),
|
||||||
NarSha256: hr.Sum(nil),
|
NarSha256: sha256W.Sum(nil),
|
||||||
Signatures: []*storev1pb.NARInfo_Signature{},
|
Signatures: []*storev1pb.NARInfo_Signature{},
|
||||||
ReferenceNames: []string{},
|
ReferenceNames: []string{},
|
||||||
},
|
},
|
||||||
|
@ -202,8 +203,9 @@ func Import(
|
||||||
|
|
||||||
}
|
}
|
||||||
if hdr.Type == nar.TypeRegular {
|
if hdr.Type == nar.TypeRegular {
|
||||||
// wrap reader with a reader calculating the blake3 hash
|
// wrap reader with a reader counting the number of bytes read
|
||||||
blobReader := hashers.NewHasher(narReader, blake3.New(32, nil))
|
blobCountW := &CountingWriter{}
|
||||||
|
blobReader := io.TeeReader(narReader, blobCountW)
|
||||||
|
|
||||||
blobDigest, err := blobCb(blobReader)
|
blobDigest, err := blobCb(blobReader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -212,8 +214,8 @@ func Import(
|
||||||
|
|
||||||
// ensure blobCb did read all the way to the end.
|
// ensure blobCb did read all the way to the end.
|
||||||
// If it didn't, the blobCb function is wrong and we should bail out.
|
// If it didn't, the blobCb function is wrong and we should bail out.
|
||||||
if blobReader.BytesWritten() != uint32(hdr.Size) {
|
if blobCountW.BytesWritten() != uint64(hdr.Size) {
|
||||||
panic("not read to end")
|
panic("blobCB did not read to end")
|
||||||
}
|
}
|
||||||
|
|
||||||
fileNode := &castorev1pb.FileNode{
|
fileNode := &castorev1pb.FileNode{
|
||||||
|
|
Loading…
Reference in a new issue