diff --git a/tools/nixery/builder/builder.go b/tools/nixery/builder/builder.go index 901373b57..7f0bd7fff 100644 --- a/tools/nixery/builder/builder.go +++ b/tools/nixery/builder/builder.go @@ -26,6 +26,7 @@ import ( "github.com/google/nixery/layers" "github.com/google/nixery/manifest" "github.com/google/nixery/storage" + "github.com/im7mortal/kmutex" log "github.com/sirupsen/logrus" ) @@ -37,10 +38,11 @@ const LayerBudget int = 94 // State holds the runtime state that is carried around in Nixery and // passed to builder functions. type State struct { - Storage storage.Backend - Cache *LocalCache - Cfg config.Config - Pop layers.Popularity + Storage storage.Backend + Cache *LocalCache + Cfg config.Config + Pop layers.Popularity + UploadMutex *kmutex.Kmutex } // Architecture represents the possible CPU architectures for which @@ -292,30 +294,19 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes // Missing layers are built and uploaded to the storage // bucket. for _, l := range grouped { - if entry, cached := layerFromCache(ctx, s, l.Hash()); cached { - entries = append(entries, *entry) - } else { - lh := l.Hash() + lh := l.Hash() - // While packing store paths, the SHA sum of - // the uncompressed layer is computed and - // written to `tarhash`. - // - // TODO(tazjin): Refactor this to make the - // flow of data cleaner. - var tarhash string - lw := func(w io.Writer) error { - var err error - tarhash, err = packStorePaths(&l, w) - return err - } - - entry, err := uploadHashLayer(ctx, s, lh, lw) + // While packing store paths, the SHA sum of + // the uncompressed layer is computed and + // written to `tarhash`. + // + // TODO(tazjin): Refactor this to make the + // flow of data cleaner. + lw := func(w io.Writer) (string, error) { + tarhash, err := packStorePaths(&l, w) if err != nil { - return nil, err + return "", err } - entry.MergeRating = l.MergeRating - entry.TarHash = tarhash var pkgs []string for _, p := range l.Contents { @@ -328,15 +319,21 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes "tarhash": tarhash, }).Info("created image layer") - go cacheLayer(ctx, s, l.Hash(), *entry) - entries = append(entries, *entry) + return tarhash, err } + + entry, err := uploadHashLayer(ctx, s, lh, l.MergeRating, lw) + if err != nil { + return nil, err + } + + entries = append(entries, *entry) } // Symlink layer (built in the first Nix build) needs to be // included here manually: slkey := result.SymlinkLayer.TarHash - entry, err := uploadHashLayer(ctx, s, slkey, func(w io.Writer) error { + entry, err := uploadHashLayer(ctx, s, slkey, 0, func(w io.Writer) (string, error) { f, err := os.Open(result.SymlinkLayer.Path) if err != nil { log.WithError(err).WithFields(log.Fields{ @@ -345,7 +342,7 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes "layer": slkey, }).Error("failed to open symlink layer") - return err + return "", err } defer f.Close() @@ -358,18 +355,16 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes "layer": slkey, }).Error("failed to upload symlink layer") - return err + return "", err } - return gz.Close() + return "sha256:" + slkey, gz.Close() }) if err != nil { return nil, err } - entry.TarHash = "sha256:" + result.SymlinkLayer.TarHash - go cacheLayer(ctx, s, slkey, *entry) entries = append(entries, *entry) return entries, nil @@ -380,7 +375,7 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes // // This type exists to avoid duplication between the handling of // symlink layers and store path layers. -type layerWriter func(w io.Writer) error +type layerWriter func(w io.Writer) (string, error) // byteCounter is a special io.Writer that counts all bytes written to // it and does nothing else. @@ -408,8 +403,16 @@ func (b *byteCounter) Write(p []byte) (n int, err error) { // // The return value is the layer's SHA256 hash, which is used in the // image manifest. -func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) (*manifest.Entry, error) { +func uploadHashLayer(ctx context.Context, s *State, key string, mrating uint64, lw layerWriter) (*manifest.Entry, error) { + s.UploadMutex.Lock(key) + defer s.UploadMutex.Unlock(key) + + if entry, cached := layerFromCache(ctx, s, key); cached { + return entry, nil + } + path := "staging/" + key + var tarhash string sha256sum, size, err := s.Storage.Persist(ctx, path, manifest.LayerType, func(sw io.Writer) (string, int64, error) { // Sets up a "multiwriter" that simultaneously runs both hash // algorithms and uploads to the storage backend. @@ -417,7 +420,8 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) counter := &byteCounter{} multi := io.MultiWriter(sw, shasum, counter) - err := lw(multi) + var err error + tarhash, err = lw(multi) sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{})) return sha256sum, counter.count, err @@ -449,10 +453,14 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) }).Info("created and persisted layer") entry := manifest.Entry{ - Digest: "sha256:" + sha256sum, - Size: size, + Digest: "sha256:" + sha256sum, + Size: size, + TarHash: tarhash, + MergeRating: mrating, } + cacheLayer(ctx, s, key, entry) + return &entry, nil } @@ -493,13 +501,13 @@ func BuildImage(ctx context.Context, s *State, image *Image) (*BuildResult, erro } m, c := manifest.Manifest(image.Arch.imageArch, layers, cmd) - lw := func(w io.Writer) error { + lw := func(w io.Writer) (string, error) { r := bytes.NewReader(c.Config) _, err := io.Copy(w, r) - return err + return "", err } - if _, err = uploadHashLayer(ctx, s, c.SHA256, lw); err != nil { + if _, err = uploadHashLayer(ctx, s, c.SHA256, 0, lw); err != nil { log.WithError(err).WithFields(log.Fields{ "image": image.Name, "tag": image.Tag, diff --git a/tools/nixery/cmd/server/main.go b/tools/nixery/cmd/server/main.go index 8fe1679cf..24aec6391 100644 --- a/tools/nixery/cmd/server/main.go +++ b/tools/nixery/cmd/server/main.go @@ -30,6 +30,7 @@ import ( "github.com/google/nixery/logs" mf "github.com/google/nixery/manifest" "github.com/google/nixery/storage" + "github.com/im7mortal/kmutex" log "github.com/sirupsen/logrus" ) @@ -257,10 +258,11 @@ func main() { } state := builder.State{ - Cache: &cache, - Cfg: cfg, - Pop: pop, - Storage: s, + Cache: &cache, + Cfg: cfg, + Pop: pop, + Storage: s, + UploadMutex: kmutex.New(), } log.WithFields(log.Fields{ diff --git a/tools/nixery/default.nix b/tools/nixery/default.nix index 529794e59..74cb588a3 100644 --- a/tools/nixery/default.nix +++ b/tools/nixery/default.nix @@ -69,7 +69,7 @@ depot.nix.readTree.drvTargets rec { doCheck = true; # Needs to be updated after every modification of go.mod/go.sum - vendorSha256 = "115dfdhpklgmp6dsy59bp0i9inqim208lf1sqbnl9jy91bnnbl32"; + vendorSha256 = "sha256-io9NCeZmjCZPLmII3ajXIsBWbT40XiW8ncXOuUDabbo="; buildFlagsArray = [ "-ldflags=-s -w -X main.version=${nixery-commit-hash}" diff --git a/tools/nixery/go.mod b/tools/nixery/go.mod index 005daa337..9e896ffb4 100644 --- a/tools/nixery/go.mod +++ b/tools/nixery/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go/storage v1.22.1 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/go-cmp v0.5.8 + github.com/im7mortal/kmutex v1.0.1 // indirect github.com/pkg/xattr v0.4.7 github.com/sirupsen/logrus v1.8.1 golang.org/x/oauth2 v0.0.0-20220524215830-622c5d57e401 diff --git a/tools/nixery/go.sum b/tools/nixery/go.sum index d6afad227..5b6054fb6 100644 --- a/tools/nixery/go.sum +++ b/tools/nixery/go.sum @@ -201,6 +201,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/im7mortal/kmutex v1.0.1 h1:zAACzjwD+OEknDqnLdvRa/BhzFM872EBwKijviGLc9Q= +github.com/im7mortal/kmutex v1.0.1/go.mod h1:f71c/Ugk/+58OHRAgvgzPP3QEiWGUjK13fd8ozfKWdo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=