2023-10-03 13:42:59 +02:00
|
|
|
package importer
|
2022-11-19 21:34:49 +01:00
|
|
|
|
|
|
|
import (
|
2023-10-05 07:55:05 +02:00
|
|
|
"bytes"
|
2022-11-19 21:34:49 +01:00
|
|
|
"context"
|
|
|
|
"encoding/base64"
|
|
|
|
"fmt"
|
|
|
|
|
2023-09-22 15:38:10 +02:00
|
|
|
castorev1pb "code.tvl.fyi/tvix/castore/protos"
|
2022-11-19 21:34:49 +01:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
)
|
|
|
|
|
2023-10-03 13:37:15 +02:00
|
|
|
// DirectoriesUploader opens a Put stream when it receives the first Put() call,
|
|
|
|
// and then uses the opened stream for subsequent Put() calls.
|
|
|
|
// When the uploading is finished, a call to Done() will close the stream and
|
|
|
|
// return the root digest returned from the directoryServiceClient.
|
2022-11-19 21:34:49 +01:00
|
|
|
type DirectoriesUploader struct {
|
|
|
|
ctx context.Context
|
2023-09-22 15:38:10 +02:00
|
|
|
directoryServiceClient castorev1pb.DirectoryServiceClient
|
|
|
|
directoryServicePutStream castorev1pb.DirectoryService_PutClient
|
2023-10-05 07:55:05 +02:00
|
|
|
lastDirectoryDigest []byte
|
2022-11-19 21:34:49 +01:00
|
|
|
}
|
|
|
|
|
2023-09-22 15:38:10 +02:00
|
|
|
func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev1pb.DirectoryServiceClient) *DirectoriesUploader {
|
2022-11-19 21:34:49 +01:00
|
|
|
return &DirectoriesUploader{
|
|
|
|
ctx: ctx,
|
|
|
|
directoryServiceClient: directoryServiceClient,
|
|
|
|
directoryServicePutStream: nil,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-10-03 12:59:13 +02:00
|
|
|
func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, error) {
|
|
|
|
directoryDigest, err := directory.Digest()
|
2022-11-19 21:34:49 +01:00
|
|
|
if err != nil {
|
2023-10-03 12:59:13 +02:00
|
|
|
return nil, fmt.Errorf("failed calculating directory digest: %w", err)
|
2022-11-19 21:34:49 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Send the directory to the directory service
|
|
|
|
// If the stream hasn't been initialized yet, do it first
|
|
|
|
if du.directoryServicePutStream == nil {
|
|
|
|
directoryServicePutStream, err := du.directoryServiceClient.Put(du.ctx)
|
|
|
|
if err != nil {
|
2023-10-03 12:59:13 +02:00
|
|
|
return nil, fmt.Errorf("unable to initialize directory service put stream: %v", err)
|
2022-11-19 21:34:49 +01:00
|
|
|
}
|
|
|
|
du.directoryServicePutStream = directoryServicePutStream
|
|
|
|
}
|
|
|
|
|
|
|
|
// send the directory out
|
|
|
|
err = du.directoryServicePutStream.Send(directory)
|
|
|
|
if err != nil {
|
2023-10-03 12:59:13 +02:00
|
|
|
return nil, fmt.Errorf("error sending directory: %w", err)
|
2022-11-19 21:34:49 +01:00
|
|
|
}
|
2023-10-03 12:59:13 +02:00
|
|
|
log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory")
|
2022-11-19 21:34:49 +01:00
|
|
|
|
2023-10-05 07:55:05 +02:00
|
|
|
// update lastDirectoryDigest
|
|
|
|
du.lastDirectoryDigest = directoryDigest
|
|
|
|
|
2023-10-03 12:59:13 +02:00
|
|
|
return directoryDigest, nil
|
2022-11-19 21:34:49 +01:00
|
|
|
}
|
|
|
|
|
2023-10-03 13:37:15 +02:00
|
|
|
// Done closes the stream and returns the response.
|
2023-10-05 07:55:05 +02:00
|
|
|
// It returns null if closed for a second time.
|
2023-09-22 15:38:10 +02:00
|
|
|
func (du *DirectoriesUploader) Done() (*castorev1pb.PutDirectoryResponse, error) {
|
2022-11-19 21:34:49 +01:00
|
|
|
// only close once, and only if we opened.
|
|
|
|
if du.directoryServicePutStream == nil {
|
|
|
|
return nil, nil
|
|
|
|
}
|
2023-10-05 07:55:05 +02:00
|
|
|
|
2022-11-19 21:34:49 +01:00
|
|
|
putDirectoryResponse, err := du.directoryServicePutStream.CloseAndRecv()
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("unable to close directory service put stream: %v", err)
|
|
|
|
}
|
|
|
|
|
2023-10-05 07:55:05 +02:00
|
|
|
// ensure the response contains the same digest as the one we have in lastDirectoryDigest.
|
|
|
|
// Otherwise, the backend came up with another digest than we, in which we return an error.
|
|
|
|
if !bytes.Equal(du.lastDirectoryDigest, putDirectoryResponse.RootDigest) {
|
|
|
|
return nil, fmt.Errorf(
|
|
|
|
"backend calculated different root digest as we, expected %s, actual %s",
|
|
|
|
base64.StdEncoding.EncodeToString(du.lastDirectoryDigest),
|
|
|
|
base64.StdEncoding.EncodeToString(putDirectoryResponse.RootDigest),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
// clear directoryServicePutStream.
|
2022-11-19 21:34:49 +01:00
|
|
|
du.directoryServicePutStream = nil
|
|
|
|
|
|
|
|
return putDirectoryResponse, nil
|
|
|
|
}
|