refactor(tvix/nar-bridge): drop reader package
Make the import function usable on any reader. Change-Id: I84d2004cb73cdd7a11fe8efb0f2efb6335d5e6b0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9527 Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
parent
6e9a5dcd59
commit
b1ff1267be
6 changed files with 41 additions and 56 deletions
|
@ -1,4 +1,4 @@
|
||||||
package reader
|
package hashers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
|
@ -1,4 +1,4 @@
|
||||||
package reader
|
package importer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -10,45 +10,36 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
||||||
|
"code.tvl.fyi/tvix/nar-bridge/pkg/hashers"
|
||||||
storev1pb "code.tvl.fyi/tvix/store/protos"
|
storev1pb "code.tvl.fyi/tvix/store/protos"
|
||||||
"github.com/nix-community/go-nix/pkg/nar"
|
"github.com/nix-community/go-nix/pkg/nar"
|
||||||
"lukechampine.com/blake3"
|
"lukechampine.com/blake3"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Reader struct {
|
|
||||||
hrSha256 *Hasher
|
|
||||||
}
|
|
||||||
|
|
||||||
// An item on the directories stack
|
// An item on the directories stack
|
||||||
type stackItem struct {
|
type stackItem struct {
|
||||||
path string
|
path string
|
||||||
directory *castorev1pb.Directory
|
directory *castorev1pb.Directory
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(r io.Reader) *Reader {
|
// Import reads NAR from a reader, and returns a (sparsely populated) PathInfo
|
||||||
// Instead of using the underlying reader itself, wrap the reader
|
// object.
|
||||||
// with a hasher calculating sha256 and one calculating sha512,
|
func Import(
|
||||||
// and feed that one into the NAR reader.
|
// a context, to support cancellation
|
||||||
hrSha256 := NewHasher(r, sha256.New())
|
|
||||||
|
|
||||||
return &Reader{
|
|
||||||
hrSha256: hrSha256,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Import reads from the internally-wrapped reader,
|
|
||||||
// and calls the callback functions whenever regular file contents are
|
|
||||||
// encountered, or a Directory node is about to be finished.
|
|
||||||
func (r *Reader) Import(
|
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
// The reader the data is read from
|
||||||
|
r io.Reader,
|
||||||
// callback function called with each regular file content
|
// callback function called with each regular file content
|
||||||
blobCb func(fileReader io.Reader) error,
|
blobCb func(fileReader io.Reader) error,
|
||||||
// callback function called with each finalized directory node
|
// callback function called with each finalized directory node
|
||||||
directoryCb func(directory *castorev1pb.Directory) error,
|
directoryCb func(directory *castorev1pb.Directory) error,
|
||||||
) (*storev1pb.PathInfo, error) {
|
) (*storev1pb.PathInfo, error) {
|
||||||
|
// wrap the passed reader in a reader that records the number of bytes read and
|
||||||
|
// their sha256 sum.
|
||||||
|
hr := hashers.NewHasher(r, sha256.New())
|
||||||
|
|
||||||
// construct a NAR reader, by reading through hrSha256
|
// construct a NAR reader from the underlying data.
|
||||||
narReader, err := nar.NewReader(r.hrSha256)
|
narReader, err := nar.NewReader(hr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to instantiate nar reader: %w", err)
|
return nil, fmt.Errorf("failed to instantiate nar reader: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -144,8 +135,8 @@ func (r *Reader) Import(
|
||||||
Node: nil,
|
Node: nil,
|
||||||
References: [][]byte{},
|
References: [][]byte{},
|
||||||
Narinfo: &storev1pb.NARInfo{
|
Narinfo: &storev1pb.NARInfo{
|
||||||
NarSize: uint64(r.hrSha256.BytesWritten()),
|
NarSize: uint64(hr.BytesWritten()),
|
||||||
NarSha256: r.hrSha256.Sum(nil),
|
NarSha256: hr.Sum(nil),
|
||||||
Signatures: []*storev1pb.NARInfo_Signature{},
|
Signatures: []*storev1pb.NARInfo_Signature{},
|
||||||
ReferenceNames: []string{},
|
ReferenceNames: []string{},
|
||||||
},
|
},
|
||||||
|
@ -215,7 +206,7 @@ func (r *Reader) Import(
|
||||||
}
|
}
|
||||||
if hdr.Type == nar.TypeRegular {
|
if hdr.Type == nar.TypeRegular {
|
||||||
// wrap reader with a reader calculating the blake3 hash
|
// wrap reader with a reader calculating the blake3 hash
|
||||||
fileReader := NewHasher(narReader, blake3.New(32, nil))
|
fileReader := hashers.NewHasher(narReader, blake3.New(32, nil))
|
||||||
|
|
||||||
err := blobCb(fileReader)
|
err := blobCb(fileReader)
|
||||||
if err != nil {
|
if err != nil {
|
|
@ -1,4 +1,4 @@
|
||||||
package reader_test
|
package importer_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -8,7 +8,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
||||||
"code.tvl.fyi/tvix/nar-bridge/pkg/reader"
|
"code.tvl.fyi/tvix/nar-bridge/pkg/importer"
|
||||||
storev1pb "code.tvl.fyi/tvix/store/protos"
|
storev1pb "code.tvl.fyi/tvix/store/protos"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -33,10 +33,9 @@ func TestSymlink(t *testing.T) {
|
||||||
f, err := os.Open("../../testdata/symlink.nar")
|
f, err := os.Open("../../testdata/symlink.nar")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r := reader.New(f)
|
actualPathInfo, err := importer.Import(
|
||||||
|
|
||||||
actualPathInfo, err := r.Import(
|
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
f,
|
||||||
func(fileReader io.Reader) error {
|
func(fileReader io.Reader) error {
|
||||||
panic("no file contents expected!")
|
panic("no file contents expected!")
|
||||||
}, func(directory *castorev1pb.Directory) error {
|
}, func(directory *castorev1pb.Directory) error {
|
||||||
|
@ -72,10 +71,9 @@ func TestRegular(t *testing.T) {
|
||||||
f, err := os.Open("../../testdata/onebyteregular.nar")
|
f, err := os.Open("../../testdata/onebyteregular.nar")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r := reader.New(f)
|
actualPathInfo, err := importer.Import(
|
||||||
|
|
||||||
actualPathInfo, err := r.Import(
|
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
f,
|
||||||
func(fileReader io.Reader) error {
|
func(fileReader io.Reader) error {
|
||||||
contents, err := io.ReadAll(fileReader)
|
contents, err := io.ReadAll(fileReader)
|
||||||
require.NoError(t, err, "reading fileReader should not error")
|
require.NoError(t, err, "reading fileReader should not error")
|
||||||
|
@ -123,15 +121,14 @@ func TestEmptyDirectory(t *testing.T) {
|
||||||
f, err := os.Open("../../testdata/emptydirectory.nar")
|
f, err := os.Open("../../testdata/emptydirectory.nar")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r := reader.New(f)
|
|
||||||
|
|
||||||
expectedDirectory := &castorev1pb.Directory{
|
expectedDirectory := &castorev1pb.Directory{
|
||||||
Directories: []*castorev1pb.DirectoryNode{},
|
Directories: []*castorev1pb.DirectoryNode{},
|
||||||
Files: []*castorev1pb.FileNode{},
|
Files: []*castorev1pb.FileNode{},
|
||||||
Symlinks: []*castorev1pb.SymlinkNode{},
|
Symlinks: []*castorev1pb.SymlinkNode{},
|
||||||
}
|
}
|
||||||
actualPathInfo, err := r.Import(
|
actualPathInfo, err := importer.Import(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
f,
|
||||||
func(fileReader io.Reader) error {
|
func(fileReader io.Reader) error {
|
||||||
panic("no file contents expected!")
|
panic("no file contents expected!")
|
||||||
}, func(directory *castorev1pb.Directory) error {
|
}, func(directory *castorev1pb.Directory) error {
|
||||||
|
@ -168,8 +165,6 @@ func TestFull(t *testing.T) {
|
||||||
f, err := os.Open("../../testdata/nar_1094wph9z4nwlgvsd53abfz8i117ykiv5dwnq9nnhz846s7xqd7d.nar")
|
f, err := os.Open("../../testdata/nar_1094wph9z4nwlgvsd53abfz8i117ykiv5dwnq9nnhz846s7xqd7d.nar")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r := reader.New(f)
|
|
||||||
|
|
||||||
expectedDirectoryPaths := []string{
|
expectedDirectoryPaths := []string{
|
||||||
"/bin",
|
"/bin",
|
||||||
"/share/man/man1",
|
"/share/man/man1",
|
||||||
|
@ -478,8 +473,9 @@ func TestFull(t *testing.T) {
|
||||||
|
|
||||||
numDirectoriesReceived := 0
|
numDirectoriesReceived := 0
|
||||||
|
|
||||||
actualPathInfo, err := r.Import(
|
actualPathInfo, err := importer.Import(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
f,
|
||||||
func(fileReader io.Reader) error {
|
func(fileReader io.Reader) error {
|
||||||
// Don't really bother reading and comparing the contents here,
|
// Don't really bother reading and comparing the contents here,
|
||||||
// We already verify the right digests are produced by comparing the
|
// We already verify the right digests are produced by comparing the
|
||||||
|
@ -533,12 +529,11 @@ func TestCallbackErrors(t *testing.T) {
|
||||||
f, err := os.Open("../../testdata/onebyteregular.nar")
|
f, err := os.Open("../../testdata/onebyteregular.nar")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r := reader.New(f)
|
|
||||||
|
|
||||||
targetErr := errors.New("expected error")
|
targetErr := errors.New("expected error")
|
||||||
|
|
||||||
_, err = r.Import(
|
_, err = importer.Import(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
f,
|
||||||
func(fileReader io.Reader) error {
|
func(fileReader io.Reader) error {
|
||||||
return targetErr
|
return targetErr
|
||||||
}, func(directory *castorev1pb.Directory) error {
|
}, func(directory *castorev1pb.Directory) error {
|
||||||
|
@ -552,12 +547,11 @@ func TestCallbackErrors(t *testing.T) {
|
||||||
f, err := os.Open("../../testdata/emptydirectory.nar")
|
f, err := os.Open("../../testdata/emptydirectory.nar")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r := reader.New(f)
|
|
||||||
|
|
||||||
targetErr := errors.New("expected error")
|
targetErr := errors.New("expected error")
|
||||||
|
|
||||||
_, err = r.Import(
|
_, err = importer.Import(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
f,
|
||||||
func(fileReader io.Reader) error {
|
func(fileReader io.Reader) error {
|
||||||
panic("no file contents expected!")
|
panic("no file contents expected!")
|
||||||
}, func(directory *castorev1pb.Directory) error {
|
}, func(directory *castorev1pb.Directory) error {
|
||||||
|
@ -585,9 +579,9 @@ func TestPopDirectories(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
r := reader.New(f)
|
_, err = importer.Import(
|
||||||
_, err = r.Import(
|
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
f,
|
||||||
func(fileReader io.Reader) error { return nil },
|
func(fileReader io.Reader) error { return nil },
|
||||||
func(directory *castorev1pb.Directory) error {
|
func(directory *castorev1pb.Directory) error {
|
||||||
return directory.Validate()
|
return directory.Validate()
|
|
@ -16,7 +16,7 @@ import (
|
||||||
const chunkSize = 1024 * 1024
|
const chunkSize = 1024 * 1024
|
||||||
|
|
||||||
// this produces a callback function that can be used as blobCb for the
|
// this produces a callback function that can be used as blobCb for the
|
||||||
// reader.Import function call
|
// importer.Import function call.
|
||||||
func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) error {
|
func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) error {
|
||||||
return func(blobReader io.Reader) error {
|
return func(blobReader io.Reader) error {
|
||||||
// Ensure the blobReader is buffered to at least the chunk size.
|
// Ensure the blobReader is buffered to at least the chunk size.
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
||||||
"code.tvl.fyi/tvix/nar-bridge/pkg/reader"
|
"code.tvl.fyi/tvix/nar-bridge/pkg/importer"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
nixhash "github.com/nix-community/go-nix/pkg/hash"
|
nixhash "github.com/nix-community/go-nix/pkg/hash"
|
||||||
"github.com/nix-community/go-nix/pkg/nixbase32"
|
"github.com/nix-community/go-nix/pkg/nixbase32"
|
||||||
|
@ -39,10 +39,10 @@ func registerNarPut(s *Server) {
|
||||||
directoriesUploader := NewDirectoriesUploader(ctx, s.directoryServiceClient)
|
directoriesUploader := NewDirectoriesUploader(ctx, s.directoryServiceClient)
|
||||||
defer directoriesUploader.Done() //nolint:errcheck
|
defer directoriesUploader.Done() //nolint:errcheck
|
||||||
|
|
||||||
// buffer the body by 10MiB
|
pathInfo, err := importer.Import(
|
||||||
rd := reader.New(bufio.NewReaderSize(r.Body, 10*1024*1024))
|
|
||||||
pathInfo, err := rd.Import(
|
|
||||||
ctx,
|
ctx,
|
||||||
|
// buffer the body by 10MiB
|
||||||
|
bufio.NewReaderSize(r.Body, 10*1024*1024),
|
||||||
genBlobServiceWriteCb(ctx, s.blobServiceClient),
|
genBlobServiceWriteCb(ctx, s.blobServiceClient),
|
||||||
func(directory *castorev1pb.Directory) error {
|
func(directory *castorev1pb.Directory) error {
|
||||||
return directoriesUploader.Put(directory)
|
return directoriesUploader.Put(directory)
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
||||||
"code.tvl.fyi/tvix/nar-bridge/pkg/reader"
|
"code.tvl.fyi/tvix/nar-bridge/pkg/importer"
|
||||||
"code.tvl.fyi/tvix/nar-bridge/pkg/writer"
|
"code.tvl.fyi/tvix/nar-bridge/pkg/writer"
|
||||||
storev1pb "code.tvl.fyi/tvix/store/protos"
|
storev1pb "code.tvl.fyi/tvix/store/protos"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -159,9 +159,9 @@ func TestFull(t *testing.T) {
|
||||||
filesMap := make(map[string][]byte, 0)
|
filesMap := make(map[string][]byte, 0)
|
||||||
directoriesMap := make(map[string]*castorev1pb.Directory)
|
directoriesMap := make(map[string]*castorev1pb.Directory)
|
||||||
|
|
||||||
r := reader.New(bytes.NewBuffer(narContents))
|
pathInfo, err := importer.Import(
|
||||||
pathInfo, err := r.Import(
|
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
bytes.NewBuffer(narContents),
|
||||||
func(fileReader io.Reader) error {
|
func(fileReader io.Reader) error {
|
||||||
fileContents, err := io.ReadAll(fileReader)
|
fileContents, err := io.ReadAll(fileReader)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
Loading…
Reference in a new issue