From 5b36a3a92331bef4270b37b7ec00d095f2aab8c2 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Tue, 27 Jun 2023 12:23:14 +0100 Subject: [PATCH] Enable quicker shutdown Reduces default grace period from 30s to 5s for removing functions. The healthcheck_interval env-var can be used to override the value and set it higher. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- cmd/up.go | 1 + pkg/provider/handlers/delete.go | 30 +++++++---- pkg/provider/handlers/update.go | 10 ++-- pkg/service/service.go | 88 ++++++++++++++++++--------------- pkg/supervisor.go | 13 +++-- 5 files changed, 83 insertions(+), 59 deletions(-) diff --git a/cmd/up.go b/cmd/up.go index 87a6268..48e2709 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -92,6 +92,7 @@ func runUp(cmd *cobra.Command, _ []string) error { <-sig log.Printf("Signal received.. shutting down server in %s\n", shutdownTimeout.String()) + err := supervisor.Remove(services) if err != nil { fmt.Println(err) diff --git a/pkg/provider/handlers/delete.go b/pkg/provider/handlers/delete.go index d45dfba..4deb5ad 100644 --- a/pkg/provider/handlers/delete.go +++ b/pkg/provider/handlers/delete.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "log" "net/http" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" @@ -32,8 +33,7 @@ func MakeDeleteHandler(client *containerd.Client, cni gocni.CNI) func(w http.Res log.Printf("[Delete] request: %s\n", string(body)) req := requests.DeleteFunctionRequest{} - err := json.Unmarshal(body, &req) - if err != nil { + if err := json.Unmarshal(body, &req); err != nil { log.Printf("[Delete] error parsing input: %s\n", err) http.Error(w, err.Error(), http.StatusBadRequest) @@ -58,7 +58,7 @@ func MakeDeleteHandler(client *containerd.Client, cni gocni.CNI) func(w http.Res function, err := GetFunction(client, name, lookupNamespace) if err != nil { - msg := fmt.Sprintf("service %s not found", name) + msg := fmt.Sprintf("function: %s not found", name) log.Printf("[Delete] %s\n", 
msg) http.Error(w, msg, http.StatusNotFound) return @@ -70,17 +70,29 @@ func MakeDeleteHandler(client *containerd.Client, cni gocni.CNI) func(w http.Res if function.replicas != 0 { err = cninetwork.DeleteCNINetwork(ctx, cni, client, name) if err != nil { - log.Printf("[Delete] error removing CNI network for %s, %s\n", name, err) + log.Printf("[Delete] error removing CNI network for: %s, %s\n", name, err) } } - containerErr := service.Remove(ctx, client, name) - if containerErr != nil { - log.Printf("[Delete] error removing %s, %s\n", name, containerErr) - http.Error(w, containerErr.Error(), http.StatusInternalServerError) + killTimeout := getKillTimeout(function.envVars) + + if err := service.Remove(ctx, client, name, killTimeout); err != nil { + log.Printf("[Delete] error removing %s, %s\n", name, err) + http.Error(w, err.Error(), http.StatusInternalServerError) return } - log.Printf("[Delete] deleted %s\n", name) + log.Printf("[Delete] deleted: %s\n", name) } } + +func getKillTimeout(envs map[string]string) time.Duration { + killTimeout := time.Second * 5 + if v, ok := envs["healthcheck_interval"]; ok { + dur, err := time.ParseDuration(v) + if err == nil && dur > 0 { + killTimeout = dur + } + } + return killTimeout +} diff --git a/pkg/provider/handlers/update.go b/pkg/provider/handlers/update.go index 3461507..f8cf423 100644 --- a/pkg/provider/handlers/update.go +++ b/pkg/provider/handlers/update.go @@ -39,6 +39,7 @@ func MakeUpdateHandler(client *containerd.Client, cni gocni.CNI, secretMountPath return } + name := req.Service namespace := getRequestNamespace(req.Namespace) @@ -64,8 +65,7 @@ func MakeUpdateHandler(client *containerd.Client, cni gocni.CNI, secretMountPath return } - err = validateSecrets(namespaceSecretMountPath, req.Secrets) - if err != nil { + if err = validateSecrets(namespaceSecretMountPath, req.Secrets); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) } @@ -83,13 +83,15 @@ func MakeUpdateHandler(client *containerd.Client, cni gocni.CNI,
secretMountPath } } - if err := service.Remove(ctx, client, name); err != nil { + killTimeout := getKillTimeout(function.envVars) + + if err := service.Remove(ctx, client, name, killTimeout); err != nil { log.Printf("[Update] error removing %s, %s\n", name, err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - // The pull has already been done in prepull, so we can force this pull to "false" + // The pull has already been done in pre-pull, so we can force this pull to "false" pull := false if err := deploy(ctx, req, client, cni, namespaceSecretMountPath, pull); err != nil { diff --git a/pkg/service/service.go b/pkg/service/service.go index c856258..0b165c1 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -22,78 +22,84 @@ import ( const dockerConfigDir = "/var/lib/faasd/.docker/" // Remove removes a container -func Remove(ctx context.Context, client *containerd.Client, name string) error { +func Remove(ctx context.Context, client *containerd.Client, name string, killTimeout time.Duration) error { - container, containerErr := client.LoadContainer(ctx, name) - - if containerErr == nil { - taskFound := true - t, err := container.Task(ctx, nil) - if err != nil { - if errdefs.IsNotFound(err) { - taskFound = false - } else { - return fmt.Errorf("unable to get task %w: ", err) - } - } - - if taskFound { - status, err := t.Status(ctx) - if err != nil { - log.Printf("Unable to get status for: %s, error: %s", name, err.Error()) - } else { - log.Printf("Status of %s is: %s\n", name, status.Status) - } - - log.Printf("Need to kill task: %s\n", name) - if err = killTask(ctx, t); err != nil { - return fmt.Errorf("error killing task %s, %s, %w", container.ID(), name, err) - } - } - - if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil { - return fmt.Errorf("error deleting container %s, %s, %w", container.ID(), name, err) - } - - } else { + container, err := client.LoadContainer(ctx, name) + if err != nil { + //
Perhaps the container was already removed, but the snapshot is still there service := client.SnapshotService("") key := name + "snapshot" + + // Don't return an error if the snapshot doesn't exist if _, err := client.SnapshotService("").Stat(ctx, key); err == nil { service.Remove(ctx, key) } + return nil } + + taskFound := true + t, err := container.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + taskFound = false + } else { + return fmt.Errorf("unable to get task: %w", err) + } + } + + if taskFound { + status, err := t.Status(ctx) + if err != nil { + log.Printf("Unable to get status for: %s, error: %s", name, err.Error()) + } else { + log.Printf("Status of %s is: %s\n", name, status.Status) + } + + if err = killTask(ctx, t, killTimeout); err != nil { + return fmt.Errorf("error killing task %s, %s, %w", container.ID(), name, err) + } + } + + if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil { + return fmt.Errorf("error deleting container %s, %s, %w", container.ID(), name, err) + } + return nil } -// Adapted from Stellar - https://github.com/stellar -func killTask(ctx context.Context, task containerd.Task) error { - - killTimeout := 30 * time.Second +// Adapted from Stellar - https://github.com/stellarproject +func killTask(ctx context.Context, task containerd.Task, killTimeout time.Duration) error { wg := &sync.WaitGroup{} wg.Add(1) var err error go func() { + id := task.ID() + defer wg.Done() if task != nil { wait, err := task.Wait(ctx) if err != nil { - log.Printf("error waiting on task: %s", err) + log.Printf("error waiting on task: %s: %s", id, err) return } if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil { - log.Printf("error killing container task: %s", err) + log.Printf("error killing task: %s with SIGTERM: %s", id, err) } select { case <-wait: - task.Delete(ctx) + _, err := task.Delete(ctx) + if err != nil { + log.Printf("error deleting task: %s: %s", id, err) + } + return case
<-time.After(killTimeout): if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil { - log.Printf("error force killing container task: %s", err) + log.Printf("error killing task: %s with SIGKILL: %s", id, err) } return } diff --git a/pkg/supervisor.go b/pkg/supervisor.go index aceaad4..ac2ec46 100644 --- a/pkg/supervisor.go +++ b/pkg/supervisor.go @@ -10,6 +10,7 @@ import ( "sort" "strconv" "strings" + "time" "github.com/alexellis/arkade/pkg/env" "github.com/compose-spec/compose-go/loader" @@ -32,6 +33,7 @@ import ( const ( // workingDirectoryPermission user read/write/execute, group and others: read-only workingDirectoryPermission = 0744 + removalGracePeriod = time.Second * 5 ) type Service struct { @@ -126,9 +128,11 @@ func (s *Supervisor) Start(svcs []Service) error { for _, svc := range svcs { fmt.Printf("Removing old container for: %s\n", svc.Name) - containerErr := service.Remove(ctx, s.client, svc.Name) - if containerErr != nil { - return containerErr + if err := service.Remove(ctx, + s.client, + svc.Name, + removalGracePeriod); err != nil { + return err } } @@ -286,8 +290,7 @@ func (s *Supervisor) Remove(svcs []Service) error { return err } - err = service.Remove(ctx, s.client, svc.Name) - if err != nil { + if err := service.Remove(ctx, s.client, svc.Name, removalGracePeriod); err != nil { return err } }