feat(tvix/nar-bridge): graceful shutdown

This gives existing clients 30s to finish their requests after receiving
an interrupt.

Change-Id: Ia9b0e662fd1ffbbb6c2d03f3dd6548b13cf3d241
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9365
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2023-09-18 16:51:39 +03:00 committed by clbot
parent 02aed32bf2
commit 6c586bc2a7
2 changed files with 30 additions and 16 deletions

View file

@ -1,8 +1,10 @@
package main
import (
"context"
"os"
"os/signal"
"time"
"github.com/alecthomas/kong"
@ -33,25 +35,17 @@ func main() {
}
logrus.SetLevel(logLevel)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for range c {
log.Info("Received Signal, shutting down…")
os.Exit(1)
}
}()
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
// connect to tvix-store
log.Debugf("Dialing to %v", cli.StoreAddr)
conn, err := grpc.Dial(cli.StoreAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.DialContext(ctx, cli.StoreAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
log.Printf("Starting nar-bridge at %v", cli.ListenAddr)
s := server.New(
storev1pb.NewDirectoryServiceClient(conn),
storev1pb.NewBlobServiceClient(conn),
@ -60,9 +54,21 @@ func main() {
30,
)
err = s.ListenAndServe(cli.ListenAddr)
if err != nil {
log.Error("Server failed: %w", err)
log.Printf("Starting nar-bridge at %v", cli.ListenAddr)
go s.ListenAndServe(cli.ListenAddr)
// listen for the interrupt signal.
<-ctx.Done()
// Restore default behaviour on the interrupt signal
stop()
log.Info("Received Signal, shutting down, press Ctl+C again to force.")
timeoutCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if s.Shutdown(timeoutCtx); err != nil {
log.WithError(err).Warn("failed to shutdown")
os.Exit(1)
}
}

View file

@ -1,6 +1,7 @@
package server
import (
"context"
"fmt"
"net/http"
"sync"
@ -13,6 +14,7 @@ import (
)
type Server struct {
srv *http.Server
handler chi.Router
directoryServiceClient storev1pb.DirectoryServiceClient
@ -73,8 +75,14 @@ func New(
return s
}
func (s *Server) Shutdown(ctx context.Context) error {
return s.srv.Shutdown(ctx)
}
// ListenAndServer starts the webserver, and waits for it being closed or
// shutdown, after which it'll return ErrServerClosed.
func (s *Server) ListenAndServe(addr string) error {
srv := &http.Server{
s.srv = &http.Server{
Addr: addr,
Handler: s.handler,
ReadTimeout: 500 * time.Second,
@ -82,5 +90,5 @@ func (s *Server) ListenAndServe(addr string) error {
IdleTimeout: 500 * time.Second,
}
return srv.ListenAndServe()
return s.srv.ListenAndServe()
}