refactor(tvix/nar-bridge): do root directory dgst check in uploader

This check makes more sense there, and gives stronger semantics - Done()
only succeeds if the other side successfully received everything, *and*
came up with the same hashes as we did.

Change-Id: I20b706961053fd00d22cc70e1c8cc859705587e0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9542
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2023-10-05 08:55:05 +03:00 committed by flokli
parent c04041a001
commit cb807ad79b
2 changed files with 19 additions and 20 deletions

View file

@ -1,6 +1,7 @@
package importer package importer
import ( import (
"bytes"
"context" "context"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
@ -17,6 +18,7 @@ type DirectoriesUploader struct {
ctx context.Context ctx context.Context
directoryServiceClient castorev1pb.DirectoryServiceClient directoryServiceClient castorev1pb.DirectoryServiceClient
directoryServicePutStream castorev1pb.DirectoryService_PutClient directoryServicePutStream castorev1pb.DirectoryService_PutClient
lastDirectoryDigest []byte
} }
func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev1pb.DirectoryServiceClient) *DirectoriesUploader { func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev1pb.DirectoryServiceClient) *DirectoriesUploader {
@ -50,20 +52,36 @@ func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, er
} }
log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory") log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory")
// update lastDirectoryDigest
du.lastDirectoryDigest = directoryDigest
return directoryDigest, nil return directoryDigest, nil
} }
// Done closes the stream and returns the response. // Done closes the stream and returns the response.
// It returns null if closed for a second time.
func (du *DirectoriesUploader) Done() (*castorev1pb.PutDirectoryResponse, error) { func (du *DirectoriesUploader) Done() (*castorev1pb.PutDirectoryResponse, error) {
// only close once, and only if we opened. // only close once, and only if we opened.
if du.directoryServicePutStream == nil { if du.directoryServicePutStream == nil {
return nil, nil return nil, nil
} }
putDirectoryResponse, err := du.directoryServicePutStream.CloseAndRecv() putDirectoryResponse, err := du.directoryServicePutStream.CloseAndRecv()
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to close directory service put stream: %v", err) return nil, fmt.Errorf("unable to close directory service put stream: %v", err)
} }
// ensure the response contains the same digest as the one we have in lastDirectoryDigest.
// Otherwise, the backend came up with another digest than we, in which we return an error.
if !bytes.Equal(du.lastDirectoryDigest, putDirectoryResponse.RootDigest) {
return nil, fmt.Errorf(
"backend calculated different root digest as we, expected %s, actual %s",
base64.StdEncoding.EncodeToString(du.lastDirectoryDigest),
base64.StdEncoding.EncodeToString(putDirectoryResponse.RootDigest),
)
}
// clear directoryServicePutStream.
du.directoryServicePutStream = nil du.directoryServicePutStream = nil
return putDirectoryResponse, nil return putDirectoryResponse, nil

View file

@ -200,31 +200,12 @@ func (p *PathInfoServiceServer) Get(ctx context.Context, getPathInfoRequest *sto
// Close the directories uploader. This ensures the DirectoryService has // Close the directories uploader. This ensures the DirectoryService has
// properly persisted all Directory messages sent. // properly persisted all Directory messages sent.
directoriesPutResponse, err := directoriesUploader.Done() if _, err := directoriesUploader.Done(); err != nil {
if err != nil {
log.WithError(err).Error("error during directory upload") log.WithError(err).Error("error during directory upload")
return nil, status.Error(codes.Internal, "error during directory upload") return nil, status.Error(codes.Internal, "error during directory upload")
} }
// If we uploaded directories (so directoriesPutResponse doesn't return null),
// the RootDigest field in directoriesPutResponse should match the digest
// returned in importedPathInfo.
// This check ensures the directory service came up with the same root hash as we did.
if directoriesPutResponse != nil {
rootDigestPathInfo := pathInfo.GetNode().GetDirectory().GetDigest()
rootDigestDirectoriesPutResponse := directoriesPutResponse.GetRootDigest()
log := log.WithFields(logrus.Fields{
"root_digest_pathinfo": rootDigestPathInfo,
"root_digest_directories_put_resp": rootDigestDirectoriesPutResponse,
})
if !bytes.Equal(rootDigestPathInfo, rootDigestDirectoriesPutResponse) {
log.Errorf("returned root digest doesn't match what's calculated")
return nil, status.Error(codes.Internal, "error in root digest calculation")
}
}
// Compare NAR hash in the NARInfo with the one we calculated while reading the NAR // Compare NAR hash in the NARInfo with the one we calculated while reading the NAR
// We already checked above that the digest is in sha256. // We already checked above that the digest is in sha256.
importedNarSha256 := pathInfo.GetNarinfo().GetNarSha256() importedNarSha256 := pathInfo.GetNarinfo().GetNarSha256()