fix(tvix/nar-bridge): drop pathinfoservice
This now exists in tvix-store directly, as NixHTTPPathInfoService, and contrary to this version, also validates signatures. Change-Id: Ib6ca161e40d627b7d9741839fc849f2392f422da Reviewed-on: https://cl.tvl.fyi/c/depot/+/10155 Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de> Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
parent
563886c3de
commit
dfb48dcade
3 changed files with 0 additions and 356 deletions
|
@ -51,7 +51,6 @@ This folder contains the following components:
|
||||||
* `//tvix/eval` - an implementation of the Nix programming language
|
* `//tvix/eval` - an implementation of the Nix programming language
|
||||||
* `//tvix/nar-bridge`
|
* `//tvix/nar-bridge`
|
||||||
* `nar-bridge-http`: A HTTP webserver providing a Nix HTTP Binary Cache interface in front of a tvix-store
|
* `nar-bridge-http`: A HTTP webserver providing a Nix HTTP Binary Cache interface in front of a tvix-store
|
||||||
* `nar-bridge-pathinfo`: A gRPC server, exposing a Nix HTTP Binary Cache as a tvix-store PathInfoService
|
|
||||||
* `//tvix/nix-compat` - a Rust library for compatibility with C++ Nix, features like encodings and hashing schemes and formats
|
* `//tvix/nix-compat` - a Rust library for compatibility with C++ Nix, features like encodings and hashing schemes and formats
|
||||||
* `//tvix/serde` - a Rust library for using the Nix language for app configuration
|
* `//tvix/serde` - a Rust library for using the Nix language for app configuration
|
||||||
* `//tvix/store` - a "filesystem" linking Nix store paths and metadata with the content-addressed layer
|
* `//tvix/store` - a "filesystem" linking Nix store paths and metadata with the content-addressed layer
|
||||||
|
|
|
@ -1,117 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/alecthomas/kong"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
|
||||||
"google.golang.org/grpc/reflection"
|
|
||||||
|
|
||||||
castorev1pb "code.tvl.fyi/tvix/castore-go"
|
|
||||||
"code.tvl.fyi/tvix/nar-bridge/pkg/pathinfosvc"
|
|
||||||
storev1pb "code.tvl.fyi/tvix/store-go"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
// `help:"Provide a tvix-store gRPC PathInfoService for a HTTP Nix Binary Cache"`
|
|
||||||
var cli struct {
|
|
||||||
LogLevel string `enum:"trace,debug,info,warn,error,fatal,panic" help:"The log level to log with" default:"info"`
|
|
||||||
ListenAddr string `name:"listen-addr" help:"The address this service listens on" type:"string" default:"[::]:8001"` //nolint:lll
|
|
||||||
BlobServiceAddr string `name:"blob-service-addr" env:"BLOB_SERVICE_ADDR" default:"grpc+http://[::1]:8000"`
|
|
||||||
DirectoryServiceAddr string `name:"directory-service-addr" env:"DIRECTORY_SERVICE_ADDR" default:"grpc+http://[::1]:8000"`
|
|
||||||
HTTPBinaryCacheURL *url.URL `name:"http-binary-cache-url" env:"HTTP_BINARY_CACHE_URL" help:"The URL containing the Nix HTTP Binary cache" default:"https://cache.nixos.org"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func connectService(ctx context.Context, serviceAddr string) (*grpc.ClientConn, error) {
|
|
||||||
if !strings.HasPrefix(serviceAddr, "grpc+http://") {
|
|
||||||
return nil, fmt.Errorf("invalid serviceAddr: %s", serviceAddr)
|
|
||||||
}
|
|
||||||
addr := strings.TrimPrefix(serviceAddr, "grpc+http://")
|
|
||||||
|
|
||||||
conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("did not connect: %v", err)
|
|
||||||
}
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
_ = kong.Parse(&cli)
|
|
||||||
|
|
||||||
logLevel, err := logrus.ParseLevel(cli.LogLevel)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal("invalid log level")
|
|
||||||
}
|
|
||||||
logrus.SetLevel(logLevel)
|
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
|
||||||
defer stop()
|
|
||||||
|
|
||||||
// connect to the two stores
|
|
||||||
connBlobSvc, err := connectService(ctx, cli.BlobServiceAddr)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("unable to connect to blob service: %v", err)
|
|
||||||
}
|
|
||||||
defer connBlobSvc.Close()
|
|
||||||
|
|
||||||
connDirectorySvc, err := connectService(ctx, cli.DirectoryServiceAddr)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("unable to connect to directory service: %v", err)
|
|
||||||
}
|
|
||||||
defer connDirectorySvc.Close()
|
|
||||||
|
|
||||||
// set up pathinfoservice
|
|
||||||
var opts []grpc.ServerOption
|
|
||||||
s := grpc.NewServer(opts...)
|
|
||||||
reflection.Register(s)
|
|
||||||
|
|
||||||
storev1pb.RegisterPathInfoServiceServer(s,
|
|
||||||
pathinfosvc.New(
|
|
||||||
cli.HTTPBinaryCacheURL,
|
|
||||||
&http.Client{},
|
|
||||||
castorev1pb.NewDirectoryServiceClient(connDirectorySvc),
|
|
||||||
castorev1pb.NewBlobServiceClient(connBlobSvc),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
log.Printf("Starting nar-bridge-pathinfosvc at %v", cli.ListenAddr)
|
|
||||||
lis, err := net.Listen("tcp", cli.ListenAddr)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to listen: %v", err)
|
|
||||||
}
|
|
||||||
go s.Serve(lis)
|
|
||||||
|
|
||||||
// listen for the interrupt signal.
|
|
||||||
<-ctx.Done()
|
|
||||||
|
|
||||||
// Restore default behaviour on the interrupt signal
|
|
||||||
stop()
|
|
||||||
log.Info("Received Signal, shutting down, press Ctl+C again to force.")
|
|
||||||
|
|
||||||
stopped := make(chan interface{})
|
|
||||||
go func() {
|
|
||||||
s.GracefulStop()
|
|
||||||
close(stopped)
|
|
||||||
}()
|
|
||||||
|
|
||||||
t := time.NewTimer(30 * time.Second)
|
|
||||||
select {
|
|
||||||
case <-t.C:
|
|
||||||
log.Info("timeout, kicking remaining clients")
|
|
||||||
s.Stop()
|
|
||||||
case <-stopped:
|
|
||||||
log.Info("all clients left during grace period")
|
|
||||||
t.Stop()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,238 +0,0 @@
|
||||||
package pathinfosvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"encoding/base64"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
|
|
||||||
castorev1pb "code.tvl.fyi/tvix/castore-go"
|
|
||||||
"code.tvl.fyi/tvix/nar-bridge/pkg/importer"
|
|
||||||
storev1pb "code.tvl.fyi/tvix/store-go"
|
|
||||||
mh "github.com/multiformats/go-multihash/core"
|
|
||||||
"github.com/nix-community/go-nix/pkg/narinfo"
|
|
||||||
"github.com/nix-community/go-nix/pkg/nixbase32"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"github.com/ulikunitz/xz"
|
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
"google.golang.org/grpc/status"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ storev1pb.PathInfoServiceServer = &PathInfoServiceServer{}
|
|
||||||
|
|
||||||
// PathInfoServiceServer exposes a Nix HTTP Binary Cache as a storev1pb.PathInfoServiceServer.
|
|
||||||
type PathInfoServiceServer struct {
|
|
||||||
storev1pb.UnimplementedPathInfoServiceServer
|
|
||||||
httpEndpoint *url.URL
|
|
||||||
httpClient *http.Client
|
|
||||||
// TODO: signatures
|
|
||||||
|
|
||||||
directoryServiceClient castorev1pb.DirectoryServiceClient
|
|
||||||
blobServiceClient castorev1pb.BlobServiceClient
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(httpEndpoint *url.URL, httpClient *http.Client, directoryServiceClient castorev1pb.DirectoryServiceClient, blobServiceClient castorev1pb.BlobServiceClient) *PathInfoServiceServer {
|
|
||||||
return &PathInfoServiceServer{
|
|
||||||
httpEndpoint: httpEndpoint,
|
|
||||||
httpClient: httpClient,
|
|
||||||
directoryServiceClient: directoryServiceClient,
|
|
||||||
blobServiceClient: blobServiceClient,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CalculateNAR implements storev1.PathInfoServiceServer.
|
|
||||||
// It returns PermissionDenied, as clients are supposed to calculate NAR hashes themselves.
|
|
||||||
func (*PathInfoServiceServer) CalculateNAR(context.Context, *castorev1pb.Node) (*storev1pb.CalculateNARResponse, error) {
|
|
||||||
return nil, status.Error(codes.PermissionDenied, "do it yourself please")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get implements storev1.PathInfoServiceServer.
|
|
||||||
// It only supports lookup my outhash, translates them to a corresponding GET $outhash.narinfo request,
|
|
||||||
// ingests the NAR file, while populating blob and directory service, then returns the PathInfo node.
|
|
||||||
// Subsequent requests will traverse the NAR file again, so make sure to compose this with another
|
|
||||||
// PathInfoService as caching layer.
|
|
||||||
func (p *PathInfoServiceServer) Get(ctx context.Context, getPathInfoRequest *storev1pb.GetPathInfoRequest) (*storev1pb.PathInfo, error) {
|
|
||||||
outputHash := getPathInfoRequest.GetByOutputHash()
|
|
||||||
if outputHash == nil {
|
|
||||||
return nil, status.Error(codes.Unimplemented, "only by output hash supported")
|
|
||||||
}
|
|
||||||
|
|
||||||
// construct NARInfo URL
|
|
||||||
narinfoURL := p.httpEndpoint.JoinPath(fmt.Sprintf("%v.narinfo", nixbase32.EncodeToString(outputHash)))
|
|
||||||
|
|
||||||
log := logrus.WithField("output_hash", base64.StdEncoding.EncodeToString(outputHash))
|
|
||||||
|
|
||||||
// We start right with a GET request, rather than doing a HEAD request.
|
|
||||||
// If a request to the PathInfoService reaches us, an upper layer *wants* it
|
|
||||||
// from us.
|
|
||||||
// Doing a HEAD first wouldn't give us anything, we can still react on the Not
|
|
||||||
// Found situation when doing the GET request.
|
|
||||||
niRq, err := http.NewRequestWithContext(ctx, "GET", narinfoURL.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("unable to construct NARInfo request")
|
|
||||||
return nil, status.Errorf(codes.Internal, "unable to construct NARInfo request")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do the actual request; this follows redirects.
|
|
||||||
niResp, err := p.httpClient.Do(niRq)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("unable to do NARInfo request")
|
|
||||||
return nil, status.Errorf(codes.Internal, "unable to do NARInfo request")
|
|
||||||
}
|
|
||||||
defer niResp.Body.Close()
|
|
||||||
|
|
||||||
// In the case of a 404, return a NotFound.
|
|
||||||
// We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
|
|
||||||
// when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
|
|
||||||
if niResp.StatusCode == http.StatusNotFound || niResp.StatusCode == http.StatusForbidden {
|
|
||||||
log.Warn("no NARInfo found")
|
|
||||||
return nil, status.Error(codes.NotFound, "no NARInfo found")
|
|
||||||
}
|
|
||||||
|
|
||||||
if niResp.StatusCode < 200 || niResp.StatusCode >= 300 {
|
|
||||||
log.WithField("status_code", niResp.StatusCode).Warn("Got non-success when trying to request NARInfo")
|
|
||||||
return nil, status.Errorf(codes.Internal, "got status code %v trying to request NARInfo", niResp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
// parse the NARInfo file.
|
|
||||||
narInfo, err := narinfo.Parse(niResp.Body)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Warn("Unable to parse NARInfo")
|
|
||||||
return nil, status.Errorf(codes.Internal, "unable to parse NARInfo")
|
|
||||||
}
|
|
||||||
|
|
||||||
// close niResp.Body, we're not gonna read from there anymore.
|
|
||||||
_ = niResp.Body.Close()
|
|
||||||
|
|
||||||
// validate the NARInfo file. This ensures strings we need to parse actually parse,
|
|
||||||
// so we can just plain panic further down.
|
|
||||||
if err := narInfo.Check(); err != nil {
|
|
||||||
log.WithError(err).Warn("unable to validate NARInfo")
|
|
||||||
return nil, status.Errorf(codes.Internal, "unable to validate NARInfo: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// only allow sha256 here. Is anything else even supported by Nix?
|
|
||||||
if narInfo.NarHash.HashType != mh.SHA2_256 {
|
|
||||||
log.Error("unsupported hash type")
|
|
||||||
return nil, status.Errorf(codes.Internal, "unsuported hash type in NarHash: %s", narInfo.NarHash.SRIString())
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: calculate fingerprint, check with trusted pubkeys, decide what to do on mismatch
|
|
||||||
|
|
||||||
log = log.WithField("narinfo_narhash", narInfo.NarHash.SRIString())
|
|
||||||
log = log.WithField("nar_url", narInfo.URL)
|
|
||||||
|
|
||||||
// prepare the GET request for the NAR file.
|
|
||||||
narRq, err := http.NewRequestWithContext(ctx, "GET", p.httpEndpoint.JoinPath(narInfo.URL).String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("unable to construct NAR request")
|
|
||||||
return nil, status.Errorf(codes.Internal, "unable to construct NAR request")
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("requesting NAR")
|
|
||||||
narResp, err := p.httpClient.Do(narRq)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("error during NAR request")
|
|
||||||
return nil, status.Errorf(codes.Internal, "error during NAR request")
|
|
||||||
}
|
|
||||||
defer narResp.Body.Close()
|
|
||||||
|
|
||||||
// If we can't access the NAR file that the NARInfo is referring to, this is a store inconsistency.
|
|
||||||
// Propagate a more serious Internal error, rather than just a NotFound.
|
|
||||||
if narResp.StatusCode == http.StatusNotFound || narResp.StatusCode == http.StatusForbidden {
|
|
||||||
log.Error("Unable to find NAR")
|
|
||||||
return nil, status.Errorf(codes.Internal, "NAR at URL %s does not exist", narInfo.URL)
|
|
||||||
}
|
|
||||||
|
|
||||||
// wrap narResp.Body with some buffer.
|
|
||||||
// We already defer closing the http body, so it's ok to loose io.Close here.
|
|
||||||
var narBody io.Reader
|
|
||||||
narBody = bufio.NewReaderSize(narResp.Body, 10*1024*1024)
|
|
||||||
|
|
||||||
if narInfo.Compression == "none" {
|
|
||||||
// Nothing to do
|
|
||||||
} else if narInfo.Compression == "xz" {
|
|
||||||
narBody, err = xz.NewReader(narBody)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("failed to open xz")
|
|
||||||
return nil, status.Errorf(codes.Internal, "failed to open xz")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.WithField("nar_compression", narInfo.Compression).Error("unsupported compression")
|
|
||||||
return nil, fmt.Errorf("unsupported NAR compression: %s", narInfo.Compression)
|
|
||||||
}
|
|
||||||
|
|
||||||
directoriesUploader := importer.NewDirectoriesUploader(ctx, p.directoryServiceClient)
|
|
||||||
defer directoriesUploader.Done() //nolint:errcheck
|
|
||||||
|
|
||||||
blobUploaderCb := importer.GenBlobUploaderCb(ctx, p.blobServiceClient)
|
|
||||||
|
|
||||||
rootNode, _, importedNarSha256, err := importer.Import(
|
|
||||||
ctx,
|
|
||||||
narBody,
|
|
||||||
func(blobReader io.Reader) ([]byte, error) {
|
|
||||||
blobDigest, err := blobUploaderCb(blobReader)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
log.WithField("blob_digest", base64.StdEncoding.EncodeToString(blobDigest)).Debug("upload blob")
|
|
||||||
return blobDigest, nil
|
|
||||||
},
|
|
||||||
func(directory *castorev1pb.Directory) ([]byte, error) {
|
|
||||||
directoryDigest, err := directoriesUploader.Put(directory)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
log.WithField("directory_digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("upload directory")
|
|
||||||
return directoryDigest, nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("error during NAR import")
|
|
||||||
return nil, status.Error(codes.Internal, "error during NAR import")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the directories uploader. This ensures the DirectoryService has
|
|
||||||
// properly persisted all Directory messages sent.
|
|
||||||
if _, err := directoriesUploader.Done(); err != nil {
|
|
||||||
log.WithError(err).Error("error during directory upload")
|
|
||||||
|
|
||||||
return nil, status.Error(codes.Internal, "error during directory upload")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compare NAR hash in the NARInfo with the one we calculated while reading the NAR
|
|
||||||
// We don't need to additionally compare the narSize.
|
|
||||||
if !bytes.Equal(narInfo.NarHash.Digest(), importedNarSha256) {
|
|
||||||
log := log.WithField("imported_nar_sha256", base64.StdEncoding.EncodeToString(importedNarSha256))
|
|
||||||
log.Error("imported digest doesn't match NARInfo digest")
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("imported digest doesn't match NARInfo digest")
|
|
||||||
}
|
|
||||||
|
|
||||||
// generate PathInfo
|
|
||||||
pathInfo, err := importer.GenPathInfo(rootNode, narInfo)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("uable to generate PathInfo")
|
|
||||||
return nil, status.Errorf(codes.Internal, "unable to generate PathInfo")
|
|
||||||
}
|
|
||||||
|
|
||||||
return pathInfo, nil
|
|
||||||
|
|
||||||
// TODO: Deriver, System, CA
|
|
||||||
}
|
|
||||||
|
|
||||||
// List implements storev1.PathInfoServiceServer.
|
|
||||||
// It returns a permission denied, because normally you can't get a listing
|
|
||||||
func (*PathInfoServiceServer) List(*storev1pb.ListPathInfoRequest, storev1pb.PathInfoService_ListServer) error {
|
|
||||||
return status.Error(codes.Unimplemented, "unimplemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put implements storev1.PathInfoServiceServer.
|
|
||||||
func (*PathInfoServiceServer) Put(context.Context, *storev1pb.PathInfo) (*storev1pb.PathInfo, error) {
|
|
||||||
return nil, status.Error(codes.Unimplemented, "unimplemented")
|
|
||||||
}
|
|
Loading…
Reference in a new issue