tvl-depot/tools/nixery/cmd/server/main.go
talyz 28417afbb4 fix(nixery): Avoid race when the same image is fetched in parallel
Remove a race condition which appears when uploadHashLayer is called
with the same key from multiple threads simultaneously. This can
easily happen when the same image path is requested by multiple
clients at the same time. When it does, a 500 status is returned and
the following error message is logged:

{
  "context": {
    "filePath": "github.com/google/nixery/builder/builder.go",
    "lineNumber": 440,
    "functionName": "github.com/google/nixery/builder.uploadHashLayer"
  },
  "error": "rename /var/lib/nixery/staging/<hash> /var/lib/nixery/layers/<hash>: no such file or directory",
  "eventTime": "...",
  "layer": "<hash>",
  "message": "failed to move layer from staging",
  ...
}

To solve this issue, introduce a mutex keyed on the uploaded hash and
move all layer caching into uploadHashLayer. This could additionally
provide a small performance benefit when an already built image is
requested and NIXERY_PKGS_PATH is set, since symlink layers and config
layers are now also cached.

Change-Id: I50788a7ec7940cb5e5760f244692e361019a9bb7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/6695
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
2022-09-19 11:28:10 +00:00

283 lines
7.8 KiB
Go

// Copyright 2022 The TVL Contributors
// SPDX-License-Identifier: Apache-2.0
// The nixery server implements a container registry that transparently builds
// container images based on Nix derivations.
//
// The Nix derivation used for image creation is responsible for creating
// objects that are compatible with the registry API. The targeted registry
// protocol is currently Docker's.
//
// When an image is requested, the required contents are parsed out of the
// request and a Nix-build is initiated that eventually responds with the
// manifest as well as information linking each layer digest to a local
// filesystem path.
package main
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"regexp"
"github.com/google/nixery/builder"
"github.com/google/nixery/config"
"github.com/google/nixery/layers"
"github.com/google/nixery/logs"
mf "github.com/google/nixery/manifest"
"github.com/google/nixery/storage"
"github.com/im7mortal/kmutex"
log "github.com/sirupsen/logrus"
)
// ManifestMediaType is the Content-Type used for the manifest itself. This
// corresponds to the "Image Manifest V2, Schema 2" described on this page:
//
// https://docs.docker.com/registry/spec/manifest-v2-2/
const manifestMediaType string = "application/vnd.docker.distribution.manifest.v2+json"
// This variable will be initialised during the build process and set
// to the hash of the entire Nixery source tree.
var version string = "devel"
// Regexes matching the V2 Registry API routes. This only includes the
// routes required for serving images, since pushing and other such
// functionality is not available.
var (
manifestRegex = regexp.MustCompile(`^/v2/([\w|\-|\.|\_|\/]+)/manifests/([\w|\-|\.|\_]+)$`)
blobRegex = regexp.MustCompile(`^/v2/([\w|\-|\.|\_|\/]+)/(blobs|manifests)/sha256:(\w+)$`)
)
// Downloads the popularity information for the package set from the
// URL specified in Nixery's configuration.
func downloadPopularity(url string) (layers.Popularity, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("popularity download from '%s' returned status: %s\n", url, resp.Status)
}
j, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var pop layers.Popularity
err = json.Unmarshal(j, &pop)
if err != nil {
return nil, err
}
return pop, nil
}
// Error format corresponding to the registry protocol V2 specification. This
// allows feeding back errors to clients in a way that can be presented to
// users.
type registryError struct {
Code string `json:"code"`
Message string `json:"message"`
}
type registryErrors struct {
Errors []registryError `json:"errors"`
}
func writeError(w http.ResponseWriter, status int, code, message string) {
err := registryErrors{
Errors: []registryError{
{code, message},
},
}
json, _ := json.Marshal(err)
w.WriteHeader(status)
w.Header().Add("Content-Type", "application/json")
w.Write(json)
}
type registryHandler struct {
state *builder.State
}
// Serve a manifest by tag, building it via Nix and populating caches
// if necessary.
func (h *registryHandler) serveManifestTag(w http.ResponseWriter, r *http.Request, name string, tag string) {
log.WithFields(log.Fields{
"image": name,
"tag": tag,
}).Info("requesting image manifest")
image := builder.ImageFromName(name, tag)
buildResult, err := builder.BuildImage(r.Context(), h.state, &image)
if err != nil {
writeError(w, 500, "UNKNOWN", "image build failure")
log.WithError(err).WithFields(log.Fields{
"image": name,
"tag": tag,
}).Error("failed to build image manifest")
return
}
// Some error types have special handling, which is applied
// here.
if buildResult.Error == "not_found" {
s := fmt.Sprintf("Could not find Nix packages: %v", buildResult.Pkgs)
writeError(w, 404, "MANIFEST_UNKNOWN", s)
log.WithFields(log.Fields{
"image": name,
"tag": tag,
"packages": buildResult.Pkgs,
}).Warn("could not find Nix packages")
return
}
// This marshaling error is ignored because we know that this
// field represents valid JSON data.
manifest, _ := json.Marshal(buildResult.Manifest)
w.Header().Add("Content-Type", manifestMediaType)
// The manifest needs to be persisted to the blob storage (to become
// available for clients that fetch manifests by their hash, e.g.
// containerd) and served to the client.
//
// Since we have no stable key to address this manifest (it may be
// uncacheable, yet still addressable by blob) we need to separate
// out the hashing, uploading and serving phases. The latter is
// especially important as clients may start to fetch it by digest
// as soon as they see a response.
sha256sum := fmt.Sprintf("%x", sha256.Sum256(manifest))
path := "layers/" + sha256sum
ctx := context.TODO()
_, _, err = h.state.Storage.Persist(ctx, path, mf.ManifestType, func(sw io.Writer) (string, int64, error) {
// We already know the hash, so no additional hash needs to be
// constructed here.
written, err := sw.Write(manifest)
return sha256sum, int64(written), err
})
if err != nil {
writeError(w, 500, "MANIFEST_UPLOAD", "could not upload manifest to blob store")
log.WithError(err).WithFields(log.Fields{
"image": name,
"tag": tag,
}).Error("could not upload manifest")
return
}
w.Write(manifest)
}
// serveBlob serves a blob from storage by digest
func (h *registryHandler) serveBlob(w http.ResponseWriter, r *http.Request, blobType, digest string) {
storage := h.state.Storage
err := storage.Serve(digest, r, w)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"type": blobType,
"digest": digest,
"backend": storage.Name(),
}).Error("failed to serve blob from storage backend")
}
}
// ServeHTTP dispatches HTTP requests to the matching handlers.
func (h *registryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Acknowledge that we speak V2 with an empty response
if r.RequestURI == "/v2/" {
return
}
// Build & serve a manifest by tag
manifestMatches := manifestRegex.FindStringSubmatch(r.RequestURI)
if len(manifestMatches) == 3 {
h.serveManifestTag(w, r, manifestMatches[1], manifestMatches[2])
return
}
// Serve a blob by digest
layerMatches := blobRegex.FindStringSubmatch(r.RequestURI)
if len(layerMatches) == 4 {
h.serveBlob(w, r, layerMatches[2], layerMatches[3])
return
}
log.WithField("uri", r.RequestURI).Info("unsupported registry route")
w.WriteHeader(404)
}
func main() {
logs.Init(version)
cfg, err := config.FromEnv()
if err != nil {
log.WithError(err).Fatal("failed to load configuration")
}
var s storage.Backend
switch cfg.Backend {
case config.GCS:
s, err = storage.NewGCSBackend()
case config.FileSystem:
s, err = storage.NewFSBackend()
}
if err != nil {
log.WithError(err).Fatal("failed to initialise storage backend")
}
log.WithField("backend", s.Name()).Info("initialised storage backend")
cache, err := builder.NewCache()
if err != nil {
log.WithError(err).Fatal("failed to instantiate build cache")
}
var pop layers.Popularity
if cfg.PopUrl != "" {
pop, err = downloadPopularity(cfg.PopUrl)
if err != nil {
log.WithError(err).WithField("popURL", cfg.PopUrl).
Fatal("failed to fetch popularity information")
}
}
state := builder.State{
Cache: &cache,
Cfg: cfg,
Pop: pop,
Storage: s,
UploadMutex: kmutex.New(),
}
log.WithFields(log.Fields{
"version": version,
"port": cfg.Port,
}).Info("starting Nixery")
// All /v2/ requests belong to the registry handler.
http.Handle("/v2/", &registryHandler{
state: &state,
})
// All other roots are served by the static file server.
webDir := http.Dir(cfg.WebDir)
http.Handle("/", http.FileServer(webDir))
log.Fatal(http.ListenAndServe(":"+cfg.Port, nil))
}