Restart stopped tasks

This patch reports stopped tasks as having zero scale, which
means the gateway will send a "scale up" request, the same
way as it does for paused containers, or those which have
no task due to a reboot of the machine.

The scale up logic will now delete the stopped task and
recreate the task.

Tested with nodeinfo and figlet on a Dell XPS with
Ubuntu 16.04. The scaling logic has been re-written, but
re-tested by manually pausing and manually removing
the task of a container.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2020-09-19 21:07:43 +01:00
committed by Alex Ellis
parent 87f49b0289
commit 40829bbf88
4 changed files with 67 additions and 40 deletions

View File

@ -1,7 +1,7 @@
version: "3.7"
services:
basic-auth-plugin:
image: "docker.io/openfaas/basic-auth-plugin:0.18.17${ARCH_SUFFIX}"
image: "docker.io/openfaas/basic-auth-plugin:0.18.18${ARCH_SUFFIX}"
environment:
- port=8080
- secret_mount_path=/run/secrets
@ -41,7 +41,7 @@ services:
- "127.0.0.1:9090:9090"
gateway:
image: "docker.io/openfaas/gateway:0.18.17${ARCH_SUFFIX}"
image: "docker.io/openfaas/gateway:0.18.18${ARCH_SUFFIX}"
environment:
- basic_auth=true
- functions_provider_url=http://faasd-provider:8081/

View File

@ -68,6 +68,7 @@ func GetFunction(client *containerd.Client, name string) (Function, error) {
if err != nil {
return Function{}, fmt.Errorf("unable to get task status for container: %s %s", name, err)
}
if svc.Status == "running" {
replicas = 1
f.pid = task.Pid()
@ -85,7 +86,7 @@ func GetFunction(client *containerd.Client, name string) (Function, error) {
f.replicas = replicas
return f, nil
}
return Function{}, fmt.Errorf("unable to find function: %s, error %s", name, err)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas-provider/types"
faasd "github.com/openfaas/faasd/pkg"
)
@ -58,46 +59,70 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
return
}
taskExists := true
var taskExists bool
var taskStatus *containerd.Status
task, taskErr := ctr.Task(ctx, nil)
if taskErr != nil {
msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr)
log.Printf("[Scale] %s\n", msg)
taskExists = false
} else {
taskExists = true
status, statusErr := task.Status(ctx)
if statusErr != nil {
msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, statusErr)
log.Printf("[Scale] %s\n", msg)
http.Error(w, msg, http.StatusInternalServerError)
return
} else {
taskStatus = &status
}
}
if req.Replicas > 0 {
if taskExists {
if status, statusErr := task.Status(ctx); statusErr == nil {
if status.Status == containerd.Paused {
if resumeErr := task.Resume(ctx); resumeErr != nil {
log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
http.Error(w, resumeErr.Error(), http.StatusBadRequest)
}
}
}
} else {
deployErr := createTask(ctx, client, ctr, cni)
if deployErr != nil {
log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
http.Error(w, deployErr.Error(), http.StatusBadRequest)
createNewTask := false
// Scale to zero
if req.Replicas == 0 {
// If a task is running, pause it
if taskExists && taskStatus.Status == containerd.Running {
if pauseErr := task.Pause(ctx); pauseErr != nil {
wrappedPauseErr := fmt.Errorf("error pausing task %s, error: %s", name, pauseErr)
log.Printf("[Scale] %s\n", wrappedPauseErr.Error())
http.Error(w, wrappedPauseErr.Error(), http.StatusNotFound)
return
}
return
}
} else {
if taskExists {
if status, statusErr := task.Status(ctx); statusErr == nil {
if status.Status == containerd.Running {
if pauseErr := task.Pause(ctx); pauseErr != nil {
log.Printf("[Scale] error pausing task %s, error: %s\n", name, pauseErr)
http.Error(w, pauseErr.Error(), http.StatusBadRequest)
}
}
}
}
}
}
if taskExists {
if taskStatus != nil {
if taskStatus.Status == containerd.Paused {
if resumeErr := task.Resume(ctx); resumeErr != nil {
log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
http.Error(w, resumeErr.Error(), http.StatusBadRequest)
}
} else if taskStatus.Status == containerd.Stopped {
// Stopped tasks cannot be restarted, must be removed, and created again
if _, delErr := task.Delete(ctx); delErr != nil {
log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, delErr)
http.Error(w, delErr.Error(), http.StatusBadRequest)
}
createNewTask = true
}
}
} else {
createNewTask = true
}
if createNewTask {
deployErr := createTask(ctx, client, ctr, cni)
if deployErr != nil {
log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
http.Error(w, deployErr.Error(), http.StatusBadRequest)
return
}
}
}
}

View File

@ -161,7 +161,7 @@ func (s *Supervisor) Start(svcs []Service) error {
Options: []string{"rbind", "ro"},
})
newContainer, containerCreateErr := s.client.NewContainer(
newContainer, err := s.client.NewContainer(
ctx,
svc.Name,
containerd.WithImage(image),
@ -173,28 +173,29 @@ func (s *Supervisor) Start(svcs []Service) error {
oci.WithEnv(svc.Env)),
)
if containerCreateErr != nil {
log.Printf("Error creating container: %s\n", containerCreateErr)
return containerCreateErr
if err != nil {
log.Printf("Error creating container: %s\n", err)
return err
}
log.Printf("Created container: %s\n", newContainer.ID())
task, taskErr := newContainer.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil))
if taskErr != nil {
task, err := newContainer.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil))
if err != nil {
log.Printf("Error creating task: %s\n", err)
return taskErr
return err
}
labels := map[string]string{}
network, err := cninetwork.CreateCNINetwork(ctx, s.cni, task, labels)
if err != nil {
log.Printf("Error creating CNI for %s: %s", svc.Name, err)
return err
}
ip, err := cninetwork.GetIPAddress(network, task)
if err != nil {
log.Printf("Error getting IP for %s: %s", svc.Name, err)
return err
}