fix(tvix/nar-bridge): chunk blobs

Instead of creating one big BlobChunk containing all data, and creating
way too large proto messages, chunk blobs up to a reasonable (1MiB)
chunk size, and send them to the server like that.

Change-Id: Ia45a53956a6d7c0599cc59ac516ba37e9fb1b30e
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9357
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-09-17 21:47:11 +03:00 committed by flokli
parent 136b12ddd7
commit 0c031461c3
2 changed files with 48 additions and 25 deletions

View file

@ -41,7 +41,7 @@ func New(r io.Reader) *Reader {
func (r *Reader) Import( func (r *Reader) Import(
ctx context.Context, ctx context.Context,
// callback function called with each regular file content // callback function called with each regular file content
fileCb func(fileReader io.Reader) error, blobCb func(fileReader io.Reader) error,
// callback function called with each finalized directory node // callback function called with each finalized directory node
directoryCb func(directory *storev1pb.Directory) error, directoryCb func(directory *storev1pb.Directory) error,
) (*storev1pb.PathInfo, error) { ) (*storev1pb.PathInfo, error) {
@ -219,9 +219,9 @@ func (r *Reader) Import(
// wrap reader with a reader calculating the blake3 hash // wrap reader with a reader calculating the blake3 hash
fileReader := NewHasher(narReader, blake3.New(32, nil)) fileReader := NewHasher(narReader, blake3.New(32, nil))
err := fileCb(fileReader) err := blobCb(fileReader)
if err != nil { if err != nil {
return nil, fmt.Errorf("failure from fileCb: %w", err) return nil, fmt.Errorf("failure from blobCb: %w", err)
} }
// drive the file reader to the end, in case the CB function doesn't read // drive the file reader to the end, in case the CB function doesn't read

View file

@ -1,47 +1,70 @@
package server package server
import ( import (
storev1pb "code.tvl.fyi/tvix/store/protos" "bufio"
"context" "context"
"encoding/base64" "encoding/base64"
"errors"
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"io" "io"
storev1pb "code.tvl.fyi/tvix/store/protos"
log "github.com/sirupsen/logrus"
) )
// this returns a callback function that can be used as fileCb // the size of individual BlobChunk we send when uploading to BlobService.
// for the reader.Import function call const chunkSize = 1024 * 1024
// this produces a callback function that can be used as blobCb for the
// reader.Import function call
func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error { func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error {
return func(fileReader io.Reader) error { return func(blobReader io.Reader) error {
// Read from fileReader into a buffer. // Ensure the blobReader is buffered to at least the chunk size.
// We currently buffer all contents and send them to blobServiceClient at once, blobReader = bufio.NewReaderSize(blobReader, chunkSize)
// but that's about to change.
contents, err := io.ReadAll(fileReader)
if err != nil {
return fmt.Errorf("unable to read all contents from file reader: %w", err)
}
log := log.WithField("blob_size", len(contents))
log.Infof("about to upload blob")
putter, err := blobServiceClient.Put(ctx) putter, err := blobServiceClient.Put(ctx)
if err != nil { if err != nil {
// return error to the importer // return error to the importer
return fmt.Errorf("error from blob service: %w", err) return fmt.Errorf("error from blob service: %w", err)
} }
err = putter.Send(&storev1pb.BlobChunk{
Data: contents, blobSize := 0
}) chunk := make([]byte, chunkSize)
if err != nil {
return fmt.Errorf("putting blob chunk: %w", err) for {
n, err := blobReader.Read(chunk)
if err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("unable to read from blobreader: %w", err)
}
if n != 0 {
log.WithField("chunk_size", n).Debug("sending chunk")
blobSize += n
// send the blob chunk to the server. The err is only valid in the inner scope
if err := putter.Send(&storev1pb.BlobChunk{
Data: chunk[:n],
}); err != nil {
return fmt.Errorf("sending blob chunk: %w", err)
}
}
// if our read from blobReader returned an EOF, we're done reading
if errors.Is(err, io.EOF) {
break
}
} }
resp, err := putter.CloseAndRecv() resp, err := putter.CloseAndRecv()
if err != nil { if err != nil {
return fmt.Errorf("close blob putter: %w", err) return fmt.Errorf("close blob putter: %w", err)
} }
log.WithField("digest", base64.StdEncoding.EncodeToString(resp.GetDigest())).Info("uploaded blob") log.WithFields(log.Fields{
"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
"blob_size": blobSize,
}).Info("uploaded blob")
return nil return nil
} }