From 40829bbf88eccda8154b82a2ccd65f0690144e16 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Sat, 19 Sep 2020 21:07:43 +0100 Subject: [PATCH] Restart stopped tasks This patch reports stopped tasks as having zero scale, which means the gateway will send a "scale up" request, the same way as it does for paused containers, or those which have no task due to a reboot of the machine. The scale-up logic will now delete the stopped task and recreate the task. Tested with nodeinfo and figlet on a Dell XPS with Ubuntu 16.04. The scaling logic has been re-written, and was re-tested by manually pausing a container and by manually removing its task. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- docker-compose.yaml | 4 +- pkg/provider/handlers/functions.go | 3 +- pkg/provider/handlers/scale.go | 83 +++++++++++++++++++----------- pkg/supervisor.go | 17 +++--- 4 files changed, 67 insertions(+), 40 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 4405755..44f2b3a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,7 +1,7 @@ version: "3.7" services: basic-auth-plugin: - image: "docker.io/openfaas/basic-auth-plugin:0.18.17${ARCH_SUFFIX}" + image: "docker.io/openfaas/basic-auth-plugin:0.18.18${ARCH_SUFFIX}" environment: - port=8080 - secret_mount_path=/run/secrets @@ -41,7 +41,7 @@ services: - "127.0.0.1:9090:9090" gateway: - image: "docker.io/openfaas/gateway:0.18.17${ARCH_SUFFIX}" + image: "docker.io/openfaas/gateway:0.18.18${ARCH_SUFFIX}" environment: - basic_auth=true - functions_provider_url=http://faasd-provider:8081/ diff --git a/pkg/provider/handlers/functions.go b/pkg/provider/handlers/functions.go index 41ad1b8..3f07cac 100644 --- a/pkg/provider/handlers/functions.go +++ b/pkg/provider/handlers/functions.go @@ -68,6 +68,7 @@ func GetFunction(client *containerd.Client, name string) (Function, error) { if err != nil { return Function{}, fmt.Errorf("unable to get task status for container: %s %s", name, err) } + if svc.Status 
== "running" { replicas = 1 f.pid = task.Pid() @@ -85,7 +86,7 @@ func GetFunction(client *containerd.Client, name string) (Function, error) { f.replicas = replicas return f, nil - } + return Function{}, fmt.Errorf("unable to find function: %s, error %s", name, err) } diff --git a/pkg/provider/handlers/scale.go b/pkg/provider/handlers/scale.go index 199d448..cac4023 100644 --- a/pkg/provider/handlers/scale.go +++ b/pkg/provider/handlers/scale.go @@ -11,6 +11,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" gocni "github.com/containerd/go-cni" + "github.com/openfaas/faas-provider/types" faasd "github.com/openfaas/faasd/pkg" ) @@ -58,46 +59,70 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h return } - taskExists := true + var taskExists bool + var taskStatus *containerd.Status + task, taskErr := ctr.Task(ctx, nil) if taskErr != nil { msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr) log.Printf("[Scale] %s\n", msg) taskExists = false + } else { + taskExists = true + status, statusErr := task.Status(ctx) + if statusErr != nil { + msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, statusErr) + log.Printf("[Scale] %s\n", msg) + http.Error(w, msg, http.StatusInternalServerError) + return + } else { + taskStatus = &status + } } - if req.Replicas > 0 { - if taskExists { - if status, statusErr := task.Status(ctx); statusErr == nil { - if status.Status == containerd.Paused { - if resumeErr := task.Resume(ctx); resumeErr != nil { - log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr) - http.Error(w, resumeErr.Error(), http.StatusBadRequest) - } - } - } - } else { - deployErr := createTask(ctx, client, ctr, cni) - if deployErr != nil { - log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr) - http.Error(w, deployErr.Error(), http.StatusBadRequest) + createNewTask := false + + // Scale to zero + if req.Replicas 
== 0 { + // If a task is running, pause it + if taskExists && taskStatus.Status == containerd.Running { + if pauseErr := task.Pause(ctx); pauseErr != nil { + wrappedPauseErr := fmt.Errorf("error pausing task %s, error: %s", name, pauseErr) + log.Printf("[Scale] %s\n", wrappedPauseErr.Error()) + http.Error(w, wrappedPauseErr.Error(), http.StatusNotFound) return } - return - } - } else { - if taskExists { - if status, statusErr := task.Status(ctx); statusErr == nil { - if status.Status == containerd.Running { - if pauseErr := task.Pause(ctx); pauseErr != nil { - log.Printf("[Scale] error pausing task %s, error: %s\n", name, pauseErr) - http.Error(w, pauseErr.Error(), http.StatusBadRequest) - } - } - } } } - } + if taskExists { + if taskStatus != nil { + if taskStatus.Status == containerd.Paused { + if resumeErr := task.Resume(ctx); resumeErr != nil { + log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr) + http.Error(w, resumeErr.Error(), http.StatusBadRequest) + } + } else if taskStatus.Status == containerd.Stopped { + // Stopped tasks cannot be restarted, must be removed, and created again + if _, delErr := task.Delete(ctx); delErr != nil { + log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, delErr) + http.Error(w, delErr.Error(), http.StatusBadRequest) + } + createNewTask = true + } + } + } else { + createNewTask = true + } + if createNewTask { + deployErr := createTask(ctx, client, ctr, cni) + if deployErr != nil { + log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr) + http.Error(w, deployErr.Error(), http.StatusBadRequest) + return + + } + } + } } diff --git a/pkg/supervisor.go b/pkg/supervisor.go index e2695a6..5f61d92 100644 --- a/pkg/supervisor.go +++ b/pkg/supervisor.go @@ -161,7 +161,7 @@ func (s *Supervisor) Start(svcs []Service) error { Options: []string{"rbind", "ro"}, }) - newContainer, containerCreateErr := s.client.NewContainer( + newContainer, err := s.client.NewContainer( ctx, svc.Name, 
containerd.WithImage(image), @@ -173,28 +173,29 @@ func (s *Supervisor) Start(svcs []Service) error { oci.WithEnv(svc.Env)), ) - if containerCreateErr != nil { - log.Printf("Error creating container: %s\n", containerCreateErr) - return containerCreateErr + if err != nil { + log.Printf("Error creating container: %s\n", err) + return err } log.Printf("Created container: %s\n", newContainer.ID()) - task, taskErr := newContainer.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil)) - if taskErr != nil { + task, err := newContainer.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil)) + if err != nil { log.Printf("Error creating task: %s\n", err) - return taskErr + return err } labels := map[string]string{} network, err := cninetwork.CreateCNINetwork(ctx, s.cni, task, labels) - if err != nil { + log.Printf("Error creating CNI for %s: %s", svc.Name, err) return err } ip, err := cninetwork.GetIPAddress(network, task) if err != nil { + log.Printf("Error getting IP for %s: %s", svc.Name, err) return err }