2022-11-19 21:34:49 +01:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
2023-09-17 20:47:11 +02:00
|
|
|
"bufio"
|
2022-11-19 21:34:49 +01:00
|
|
|
"context"
|
|
|
|
"encoding/base64"
|
2023-09-17 20:47:11 +02:00
|
|
|
"errors"
|
2022-11-19 21:34:49 +01:00
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
|
2023-09-17 20:47:11 +02:00
|
|
|
storev1pb "code.tvl.fyi/tvix/store/protos"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
)
|
2022-11-19 21:34:49 +01:00
|
|
|
|
2023-09-17 20:47:11 +02:00
|
|
|
// chunkSize is the maximum number of bytes placed into a single BlobChunk
// when uploading to the BlobService (1 MiB).
const chunkSize = 1 << 20
|
2022-11-19 21:34:49 +01:00
|
|
|
|
2023-09-17 20:47:11 +02:00
|
|
|
// this produces a callback function that can be used as blobCb for the
|
|
|
|
// reader.Import function call
|
|
|
|
func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error {
|
|
|
|
return func(blobReader io.Reader) error {
|
|
|
|
// Ensure the blobReader is buffered to at least the chunk size.
|
|
|
|
blobReader = bufio.NewReaderSize(blobReader, chunkSize)
|
2022-11-19 21:34:49 +01:00
|
|
|
|
|
|
|
putter, err := blobServiceClient.Put(ctx)
|
|
|
|
if err != nil {
|
|
|
|
// return error to the importer
|
|
|
|
return fmt.Errorf("error from blob service: %w", err)
|
|
|
|
}
|
2023-09-17 20:47:11 +02:00
|
|
|
|
|
|
|
blobSize := 0
|
|
|
|
chunk := make([]byte, chunkSize)
|
|
|
|
|
|
|
|
for {
|
|
|
|
n, err := blobReader.Read(chunk)
|
|
|
|
if err != nil && !errors.Is(err, io.EOF) {
|
|
|
|
return fmt.Errorf("unable to read from blobreader: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if n != 0 {
|
|
|
|
log.WithField("chunk_size", n).Debug("sending chunk")
|
|
|
|
blobSize += n
|
|
|
|
|
|
|
|
// send the blob chunk to the server. The err is only valid in the inner scope
|
|
|
|
if err := putter.Send(&storev1pb.BlobChunk{
|
|
|
|
Data: chunk[:n],
|
|
|
|
}); err != nil {
|
|
|
|
return fmt.Errorf("sending blob chunk: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// if our read from blobReader returned an EOF, we're done reading
|
|
|
|
if errors.Is(err, io.EOF) {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2022-11-19 21:34:49 +01:00
|
|
|
}
|
2023-09-17 20:47:11 +02:00
|
|
|
|
2022-11-19 21:34:49 +01:00
|
|
|
resp, err := putter.CloseAndRecv()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("close blob putter: %w", err)
|
|
|
|
}
|
|
|
|
|
2023-09-17 20:47:11 +02:00
|
|
|
log.WithFields(log.Fields{
|
|
|
|
"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
|
|
|
|
"blob_size": blobSize,
|
2023-09-18 11:04:59 +02:00
|
|
|
}).Debug("uploaded blob")
|
2022-11-19 21:34:49 +01:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|