refactor(server): Introduce pluggable interface for storage backends
This abstracts over the functionality of Google Cloud Storage and other potential underlying storage backends to make it possible to replace these in Nixery. The GCS backend is not yet reimplemented.
This commit is contained in:
parent
ffe58d6cb5
commit
f7d16c5d45
5 changed files with 110 additions and 228 deletions
|
@ -28,18 +28,16 @@ import (
|
|||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"cloud.google.com/go/storage"
|
||||
"github.com/google/nixery/server/config"
|
||||
"github.com/google/nixery/server/layers"
|
||||
"github.com/google/nixery/server/manifest"
|
||||
"github.com/google/nixery/server/storage"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/oauth2/google"
|
||||
)
|
||||
|
||||
// The maximum number of layers in an image is 125. To allow for
|
||||
|
@ -47,19 +45,16 @@ import (
|
|||
// use up is set at a lower point.
|
||||
const LayerBudget int = 94
|
||||
|
||||
// API scope needed for renaming objects in GCS
|
||||
const gcsScope = "https://www.googleapis.com/auth/devstorage.read_write"
|
||||
|
||||
// HTTP client to use for direct calls to APIs that are not part of the SDK
|
||||
var client = &http.Client{}
|
||||
|
||||
// State holds the runtime state that is carried around in Nixery and
|
||||
// passed to builder functions.
|
||||
type State struct {
|
||||
Bucket *storage.BucketHandle
|
||||
Cache *LocalCache
|
||||
Cfg config.Config
|
||||
Pop layers.Popularity
|
||||
Storage storage.Backend
|
||||
Cache *LocalCache
|
||||
Cfg config.Config
|
||||
Pop layers.Popularity
|
||||
}
|
||||
|
||||
// Image represents the information necessary for building a container image.
|
||||
|
@ -349,53 +344,6 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
|
|||
return entries, nil
|
||||
}
|
||||
|
||||
// renameObject renames an object in the specified Cloud Storage
|
||||
// bucket.
|
||||
//
|
||||
// The Go API for Cloud Storage does not support renaming objects, but
|
||||
// the HTTP API does. The code below makes the relevant call manually.
|
||||
func renameObject(ctx context.Context, s *State, old, new string) error {
|
||||
bucket := s.Cfg.Bucket
|
||||
|
||||
creds, err := google.FindDefaultCredentials(ctx, gcsScope)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
token, err := creds.TokenSource.Token()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// as per https://cloud.google.com/storage/docs/renaming-copying-moving-objects#rename
|
||||
url := fmt.Sprintf(
|
||||
"https://www.googleapis.com/storage/v1/b/%s/o/%s/rewriteTo/b/%s/o/%s",
|
||||
url.PathEscape(bucket), url.PathEscape(old),
|
||||
url.PathEscape(bucket), url.PathEscape(new),
|
||||
)
|
||||
|
||||
req, err := http.NewRequest("POST", url, nil)
|
||||
req.Header.Add("Authorization", "Bearer "+token.AccessToken)
|
||||
_, err = client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// It seems that 'rewriteTo' copies objects instead of
|
||||
// renaming/moving them, hence a deletion call afterwards is
|
||||
// required.
|
||||
if err = s.Bucket.Object(old).Delete(ctx); err != nil {
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"new": new,
|
||||
"old": old,
|
||||
}).Warn("failed to delete renamed object")
|
||||
|
||||
// this error should not break renaming and is not returned
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// layerWriter is the type for functions that can write a layer to the
|
||||
// multiwriter used for uploading & hashing.
|
||||
//
|
||||
|
@ -430,33 +378,32 @@ func (b *byteCounter) Write(p []byte) (n int, err error) {
|
|||
// The return value is the layer's SHA256 hash, which is used in the
|
||||
// image manifest.
|
||||
func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) (*manifest.Entry, error) {
|
||||
staging := s.Bucket.Object("staging/" + key)
|
||||
path := "staging/" + key
|
||||
sha256sum, size, err := s.Storage.Persist(path, func(sw io.Writer) (string, int64, error) {
|
||||
// Sets up a "multiwriter" that simultaneously runs both hash
|
||||
// algorithms and uploads to the storage backend.
|
||||
shasum := sha256.New()
|
||||
counter := &byteCounter{}
|
||||
multi := io.MultiWriter(sw, shasum, counter)
|
||||
|
||||
// Sets up a "multiwriter" that simultaneously runs both hash
|
||||
// algorithms and uploads to the bucket
|
||||
sw := staging.NewWriter(ctx)
|
||||
shasum := sha256.New()
|
||||
counter := &byteCounter{}
|
||||
multi := io.MultiWriter(sw, shasum, counter)
|
||||
err := lw(multi)
|
||||
sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{}))
|
||||
|
||||
return sha256sum, counter.count, err
|
||||
})
|
||||
|
||||
err := lw(multi)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("layer", key).
|
||||
Error("failed to create and upload layer")
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"layer": key,
|
||||
"backend": s.Storage.Name(),
|
||||
}).Error("failed to create and store layer")
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = sw.Close(); err != nil {
|
||||
log.WithError(err).WithField("layer", key).
|
||||
Error("failed to upload layer to staging")
|
||||
}
|
||||
|
||||
sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{}))
|
||||
|
||||
// Hashes are now known and the object is in the bucket, what
|
||||
// remains is to move it to the correct location and cache it.
|
||||
err = renameObject(ctx, s, "staging/"+key, "layers/"+sha256sum)
|
||||
err = s.Storage.Move("staging/"+key, "layers/"+sha256sum)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("layer", key).
|
||||
Error("failed to move layer from staging")
|
||||
|
@ -464,8 +411,6 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
size := counter.count
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"layer": key,
|
||||
"sha256": sha256sum,
|
||||
|
|
|
@ -114,24 +114,18 @@ func (c *LocalCache) localCacheLayer(key string, e manifest.Entry) {
|
|||
}
|
||||
|
||||
// Retrieve a manifest from the cache(s). First the local cache is
|
||||
// checked, then the GCS-bucket cache.
|
||||
// checked, then the storage backend.
|
||||
func manifestFromCache(ctx context.Context, s *State, key string) (json.RawMessage, bool) {
|
||||
if m, cached := s.Cache.manifestFromLocalCache(key); cached {
|
||||
return m, true
|
||||
}
|
||||
|
||||
obj := s.Bucket.Object("manifests/" + key)
|
||||
|
||||
// Probe whether the file exists before trying to fetch it.
|
||||
_, err := obj.Attrs(ctx)
|
||||
r, err := s.Storage.Fetch("manifests/" + key)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
r, err := obj.NewReader(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("manifest", key).
|
||||
Error("failed to retrieve manifest from bucket cache")
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"manifest": key,
|
||||
"backend": s.Storage.Name(),
|
||||
})
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
@ -139,8 +133,10 @@ func manifestFromCache(ctx context.Context, s *State, key string) (json.RawMessa
|
|||
|
||||
m, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("manifest", key).
|
||||
Error("failed to read cached manifest from bucket")
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"manifest": key,
|
||||
"backend": s.Storage.Name(),
|
||||
}).Error("failed to read cached manifest from storage backend")
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
@ -155,21 +151,17 @@ func manifestFromCache(ctx context.Context, s *State, key string) (json.RawMessa
|
|||
func cacheManifest(ctx context.Context, s *State, key string, m json.RawMessage) {
|
||||
go s.Cache.localCacheManifest(key, m)
|
||||
|
||||
obj := s.Bucket.Object("manifests/" + key)
|
||||
w := obj.NewWriter(ctx)
|
||||
r := bytes.NewReader([]byte(m))
|
||||
path := "manifests/" + key
|
||||
_, size, err := s.Storage.Persist(path, func(w io.Writer) (string, int64, error) {
|
||||
size, err := io.Copy(w, bytes.NewReader([]byte(m)))
|
||||
return "", size, err
|
||||
})
|
||||
|
||||
size, err := io.Copy(w, r)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("manifest", key).
|
||||
Error("failed to cache manifest to GCS")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if err = w.Close(); err != nil {
|
||||
log.WithError(err).WithField("manifest", key).
|
||||
Error("failed to cache manifest to GCS")
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"manifest": key,
|
||||
"backend": s.Storage.Name(),
|
||||
}).Error("failed to cache manifest to storage backend")
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -177,7 +169,8 @@ func cacheManifest(ctx context.Context, s *State, key string, m json.RawMessage)
|
|||
log.WithFields(log.Fields{
|
||||
"manifest": key,
|
||||
"size": size,
|
||||
}).Info("cached manifest to GCS")
|
||||
"backend": s.Storage.Name(),
|
||||
}).Info("cached manifest to storage backend")
|
||||
}
|
||||
|
||||
// Retrieve a layer build from the cache, first checking the local
|
||||
|
@ -187,16 +180,12 @@ func layerFromCache(ctx context.Context, s *State, key string) (*manifest.Entry,
|
|||
return entry, true
|
||||
}
|
||||
|
||||
obj := s.Bucket.Object("builds/" + key)
|
||||
_, err := obj.Attrs(ctx)
|
||||
r, err := s.Storage.Fetch("builds/" + key)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
r, err := obj.NewReader(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("layer", key).
|
||||
Error("failed to retrieve cached layer from GCS")
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"layer": key,
|
||||
"backend": s.Storage.Name(),
|
||||
}).Warn("failed to retrieve cached layer from storage backend")
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
@ -205,8 +194,10 @@ func layerFromCache(ctx context.Context, s *State, key string) (*manifest.Entry,
|
|||
jb := bytes.NewBuffer([]byte{})
|
||||
_, err = io.Copy(jb, r)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("layer", key).
|
||||
Error("failed to read cached layer from GCS")
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"layer": key,
|
||||
"backend": s.Storage.Name(),
|
||||
}).Error("failed to read cached layer from storage backend")
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
@ -227,24 +218,19 @@ func layerFromCache(ctx context.Context, s *State, key string) (*manifest.Entry,
|
|||
func cacheLayer(ctx context.Context, s *State, key string, entry manifest.Entry) {
|
||||
s.Cache.localCacheLayer(key, entry)
|
||||
|
||||
obj := s.Bucket.Object("builds/" + key)
|
||||
|
||||
j, _ := json.Marshal(&entry)
|
||||
path := "builds/" + key
|
||||
_, _, err := s.Storage.Persist(path, func(w io.Writer) (string, int64, error) {
|
||||
size, err := io.Copy(w, bytes.NewReader(j))
|
||||
return "", size, err
|
||||
})
|
||||
|
||||
w := obj.NewWriter(ctx)
|
||||
|
||||
_, err := io.Copy(w, bytes.NewReader(j))
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("layer", key).
|
||||
Error("failed to cache layer")
|
||||
|
||||
return
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"layer": key,
|
||||
"backend": s.Storage.Name(),
|
||||
}).Error("failed to cache layer")
|
||||
}
|
||||
|
||||
if err = w.Close(); err != nil {
|
||||
log.WithError(err).WithField("layer", key).
|
||||
Error("failed to cache layer")
|
||||
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -18,42 +18,11 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"cloud.google.com/go/storage"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/oauth2/google"
|
||||
)
|
||||
|
||||
// Configure GCS URL signing in the presence of a service account key
|
||||
// (toggled if the user has set GOOGLE_APPLICATION_CREDENTIALS).
|
||||
func signingOptsFromEnv() *storage.SignedURLOptions {
|
||||
path := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
|
||||
if path == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
key, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("file", path).Fatal("failed to read service account key")
|
||||
}
|
||||
|
||||
conf, err := google.JWTConfigFromJSON(key)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("file", path).Fatal("failed to parse service account key")
|
||||
}
|
||||
|
||||
log.WithField("account", conf.Email).Info("GCS URL signing enabled")
|
||||
|
||||
return &storage.SignedURLOptions{
|
||||
Scheme: storage.SigningSchemeV4,
|
||||
GoogleAccessID: conf.Email,
|
||||
PrivateKey: conf.PrivateKey,
|
||||
Method: "GET",
|
||||
}
|
||||
}
|
||||
|
||||
func getConfig(key, desc, def string) string {
|
||||
value := os.Getenv(key)
|
||||
if value == "" && def == "" {
|
||||
|
@ -70,13 +39,11 @@ func getConfig(key, desc, def string) string {
|
|||
|
||||
// Config holds the Nixery configuration options.
|
||||
type Config struct {
|
||||
Bucket string // GCS bucket to cache & serve layers
|
||||
Signing *storage.SignedURLOptions // Signing options to use for GCS URLs
|
||||
Port string // Port on which to launch HTTP server
|
||||
Pkgs PkgSource // Source for Nix package set
|
||||
Timeout string // Timeout for a single Nix builder (seconds)
|
||||
WebDir string // Directory with static web assets
|
||||
PopUrl string // URL to the Nix package popularity count
|
||||
Port string // Port on which to launch HTTP server
|
||||
Pkgs PkgSource // Source for Nix package set
|
||||
Timeout string // Timeout for a single Nix builder (seconds)
|
||||
WebDir string // Directory with static web assets
|
||||
PopUrl string // URL to the Nix package popularity count
|
||||
}
|
||||
|
||||
func FromEnv() (Config, error) {
|
||||
|
@ -86,10 +53,8 @@ func FromEnv() (Config, error) {
|
|||
}
|
||||
|
||||
return Config{
|
||||
Bucket: getConfig("BUCKET", "GCS bucket for layer storage", ""),
|
||||
Port: getConfig("PORT", "HTTP port", ""),
|
||||
Pkgs: pkgs,
|
||||
Signing: signingOptsFromEnv(),
|
||||
Timeout: getConfig("NIX_TIMEOUT", "Nix builder timeout", "60"),
|
||||
WebDir: getConfig("WEB_DIR", "Static web file dir", ""),
|
||||
PopUrl: os.Getenv("NIX_POPULARITY_URL"),
|
||||
|
|
|
@ -32,9 +32,7 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"cloud.google.com/go/storage"
|
||||
"github.com/google/nixery/server/builder"
|
||||
"github.com/google/nixery/server/config"
|
||||
"github.com/google/nixery/server/layers"
|
||||
|
@ -59,49 +57,6 @@ var (
|
|||
layerRegex = regexp.MustCompile(`^/v2/([\w|\-|\.|\_|\/]+)/blobs/sha256:(\w+)$`)
|
||||
)
|
||||
|
||||
// layerRedirect constructs the public URL of the layer object in the Cloud
|
||||
// Storage bucket, signs it and redirects the user there.
|
||||
//
|
||||
// Signing the URL allows unauthenticated clients to retrieve objects from the
|
||||
// bucket.
|
||||
//
|
||||
// The Docker client is known to follow redirects, but this might not be true
|
||||
// for all other registry clients.
|
||||
func constructLayerUrl(cfg *config.Config, digest string) (string, error) {
|
||||
log.WithField("layer", digest).Info("redirecting layer request to bucket")
|
||||
object := "layers/" + digest
|
||||
|
||||
if cfg.Signing != nil {
|
||||
opts := *cfg.Signing
|
||||
opts.Expires = time.Now().Add(5 * time.Minute)
|
||||
return storage.SignedURL(cfg.Bucket, object, &opts)
|
||||
} else {
|
||||
return ("https://storage.googleapis.com/" + cfg.Bucket + "/" + object), nil
|
||||
}
|
||||
}
|
||||
|
||||
// prepareBucket configures the handle to a Cloud Storage bucket in which
|
||||
// individual layers will be stored after Nix builds. Nixery does not directly
|
||||
// serve layers to registry clients, instead it redirects them to the public
|
||||
// URLs of the Cloud Storage bucket.
|
||||
//
|
||||
// The bucket is required for Nixery to function correctly, hence fatal errors
|
||||
// are generated in case it fails to be set up correctly.
|
||||
func prepareBucket(ctx context.Context, cfg *config.Config) *storage.BucketHandle {
|
||||
client, err := storage.NewClient(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("failed to set up Cloud Storage client")
|
||||
}
|
||||
|
||||
bkt := client.Bucket(cfg.Bucket)
|
||||
|
||||
if _, err := bkt.Attrs(ctx); err != nil {
|
||||
log.WithError(err).WithField("bucket", cfg.Bucket).Fatal("could not access configured bucket")
|
||||
}
|
||||
|
||||
return bkt
|
||||
}
|
||||
|
||||
// Downloads the popularity information for the package set from the
|
||||
// URL specified in Nixery's configuration.
|
||||
func downloadPopularity(url string) (layers.Popularity, error) {
|
||||
|
@ -218,16 +173,15 @@ func (h *registryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
layerMatches := layerRegex.FindStringSubmatch(r.RequestURI)
|
||||
if len(layerMatches) == 3 {
|
||||
digest := layerMatches[2]
|
||||
url, err := constructLayerUrl(&h.state.Cfg, digest)
|
||||
|
||||
storage := h.state.Storage
|
||||
err := storage.ServeLayer(digest, w)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("layer", digest).Error("failed to sign GCS URL")
|
||||
writeError(w, 500, "UNKNOWN", "could not serve layer")
|
||||
return
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"layer": digest,
|
||||
"backend": storage.Name(),
|
||||
}).Error("failed to serve layer from storage backend")
|
||||
}
|
||||
|
||||
w.Header().Set("Location", url)
|
||||
w.WriteHeader(303)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -243,7 +197,6 @@ func main() {
|
|||
}
|
||||
|
||||
ctx := context.Background()
|
||||
bucket := prepareBucket(ctx, &cfg)
|
||||
cache, err := builder.NewCache()
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("failed to instantiate build cache")
|
||||
|
@ -259,10 +212,9 @@ func main() {
|
|||
}
|
||||
|
||||
state := builder.State{
|
||||
Bucket: bucket,
|
||||
Cache: &cache,
|
||||
Cfg: cfg,
|
||||
Pop: pop,
|
||||
Cache: &cache,
|
||||
Cfg: cfg,
|
||||
Pop: pop,
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
|
|
34
tools/nixery/server/storage/storage.go
Normal file
34
tools/nixery/server/storage/storage.go
Normal file
|
@ -0,0 +1,34 @@
|
|||
// Package storage implements an interface that can be implemented by
|
||||
// storage backends, such as Google Cloud Storage or the local
|
||||
// filesystem.
|
||||
package storage
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type Backend interface {
|
||||
// Name returns the name of the storage backend, for use in
|
||||
// log messages and such.
|
||||
Name() string
|
||||
|
||||
// Persist provides a user-supplied function with a writer
|
||||
// that stores data in the storage backend.
|
||||
//
|
||||
// It needs to return the SHA256 hash of the data written as
|
||||
// well as the total number of bytes, as those are required
|
||||
// for the image manifest.
|
||||
Persist(string, func(io.Writer) (string, int64, error)) (string, int64, error)
|
||||
|
||||
// Fetch retrieves data from the storage backend.
|
||||
Fetch(path string) (io.ReadCloser, error)
|
||||
|
||||
// Move renames a path inside the storage backend. This is
|
||||
// used for staging uploads while calculating their hashes.
|
||||
Move(old, new string) error
|
||||
|
||||
// Serve provides a handler function to serve HTTP requests
|
||||
// for layers in the storage backend.
|
||||
ServeLayer(digest string, w http.ResponseWriter) error
|
||||
}
|
Loading…
Reference in a new issue