mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-08 16:06:47 +00:00
Fix for update workflow
When terminating a function and replacing it during an update, there was often an error about task precondition not met which meant having to try and or wait or being left in an inconsistent state. The new flow makes sure "Wait" is called in either code path and allows for a custom gap between the SIGTERM and SIGKILL through the grace_period env var - set as a Go duration. Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
parent
854ec5836d
commit
ef689d7b62
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"log"
|
"log"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd"
|
"github.com/containerd/containerd"
|
||||||
@ -51,7 +52,10 @@ func ListFunctions(client *containerd.Client, namespace string) (map[string]*Fun
|
|||||||
name := c.ID()
|
name := c.ID()
|
||||||
f, err := GetFunction(client, name, namespace)
|
f, err := GetFunction(client, name, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("skipping %s, error: %s", name, err)
|
if !strings.Contains(err.Error(), "unable to get IP address for container") {
|
||||||
|
log.Printf("List functions, skipping: %s, error: %s", name, err)
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
functions[name] = &f
|
functions[name] = &f
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -45,10 +46,24 @@ func Remove(ctx context.Context, client *containerd.Client, name string) error {
|
|||||||
log.Printf("Status of %s is: %s\n", name, status.Status)
|
log.Printf("Status of %s is: %s\n", name, status.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Need to kill task: %s\n", name)
|
var gracePeriod = time.Second * 30
|
||||||
if err = killTask(ctx, t); err != nil {
|
spec, err := t.Spec(ctx)
|
||||||
|
if err == nil {
|
||||||
|
for _, p := range spec.Process.Env {
|
||||||
|
k, v, ok := strings.Cut(p, "=")
|
||||||
|
if ok && k == "grace_period" {
|
||||||
|
periodVal, err := time.ParseDuration(v)
|
||||||
|
if err == nil {
|
||||||
|
gracePeriod = periodVal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = killTask(ctx, t, gracePeriod); err != nil {
|
||||||
return fmt.Errorf("error killing task %s, %s, %w", container.ID(), name, err)
|
return fmt.Errorf("error killing task %s, %s, %w", container.ID(), name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
|
if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
|
||||||
@ -66,14 +81,13 @@ func Remove(ctx context.Context, client *containerd.Client, name string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Adapted from Stellar - https://github.com/stellar
|
// Adapted from Stellar - https://github.com/stellar
|
||||||
func killTask(ctx context.Context, task containerd.Task) error {
|
func killTask(ctx context.Context, task containerd.Task, gracePeriod time.Duration) error {
|
||||||
|
|
||||||
killTimeout := 30 * time.Second
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
waited := false
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if task != nil {
|
if task != nil {
|
||||||
@ -89,18 +103,35 @@ func killTask(ctx context.Context, task containerd.Task) error {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-wait:
|
case <-wait:
|
||||||
task.Delete(ctx)
|
waited = true
|
||||||
return
|
return
|
||||||
case <-time.After(killTimeout):
|
case <-time.After(gracePeriod):
|
||||||
|
log.Printf("Sending SIGKILL to: %s after: %s", task.ID(), gracePeriod.Round(time.Second).String())
|
||||||
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
|
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
|
||||||
log.Printf("error force killing container task: %s", err)
|
log.Printf("error sending SIGKILL to task: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
if task != nil {
|
||||||
|
if !waited {
|
||||||
|
wait, err := task.Wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error waiting on task after kill: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-wait
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := task.Delete(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user