Compare commits

..

3 Commits

Author SHA1 Message Date
d7e0bebe25 Fix for proxy exiting early
When a proxied core service was accessed before it was ready
to accept a connection, due to a start-up, restart, etc of
a core service, then the proxy exited instead of continuing
to accept new connections.

This meant having to restart faasd and hope the race condition
worked itself out, or that no incoming requests were made.

Tested with Grafana, which seemed to manifest the issue
the most.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2025-01-30 10:00:14 +00:00
ef689d7b62 Fix for update workflow
When terminating a function and replacing it during an update,
there was often an error about task precondition not met
which meant having to try and or wait or being left in an
inconsistent state.

The new flow makes sure "Wait" is called in either code path
and allows for a custom gap between the SIGTERM and SIGKILL
through the grace_period env var - set as a Go duration.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2025-01-21 17:20:30 +00:00
854ec5836d Remove unused context
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2025-01-21 16:03:17 +00:00
3 changed files with 47 additions and 12 deletions

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"log" "log"
"strings"
"time" "time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
@ -51,7 +52,10 @@ func ListFunctions(client *containerd.Client, namespace string) (map[string]*Fun
name := c.ID() name := c.ID()
f, err := GetFunction(client, name, namespace) f, err := GetFunction(client, name, namespace)
if err != nil { if err != nil {
log.Printf("skipping %s, error: %s", name, err) if !strings.Contains(err.Error(), "unable to get IP address for container") {
log.Printf("List functions, skipping: %s, error: %s", name, err)
}
} else { } else {
functions[name] = &f functions[name] = &f
} }

View File

@ -86,7 +86,7 @@ func (p *Proxy) Start() error {
conn.Close() conn.Close()
log.Printf("Unable to dial: %s, error: %s", upstreamAddr, err.Error()) log.Printf("Unable to dial: %s, error: %s", upstreamAddr, err.Error())
return err continue
} }
go pipe(conn, upstream) go pipe(conn, upstream)

View File

@ -6,6 +6,7 @@ import (
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"time" "time"
@ -45,10 +46,24 @@ func Remove(ctx context.Context, client *containerd.Client, name string) error {
log.Printf("Status of %s is: %s\n", name, status.Status) log.Printf("Status of %s is: %s\n", name, status.Status)
} }
log.Printf("Need to kill task: %s\n", name) var gracePeriod = time.Second * 30
if err = killTask(ctx, t); err != nil { spec, err := t.Spec(ctx)
if err == nil {
for _, p := range spec.Process.Env {
k, v, ok := strings.Cut(p, "=")
if ok && k == "grace_period" {
periodVal, err := time.ParseDuration(v)
if err == nil {
gracePeriod = periodVal
}
}
}
}
if err = killTask(ctx, t, gracePeriod); err != nil {
return fmt.Errorf("error killing task %s, %s, %w", container.ID(), name, err) return fmt.Errorf("error killing task %s, %s, %w", container.ID(), name, err)
} }
} }
if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil { if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
@ -66,14 +81,13 @@ func Remove(ctx context.Context, client *containerd.Client, name string) error {
} }
// Adapted from Stellar - https://github.com/stellar // Adapted from Stellar - https://github.com/stellar
func killTask(ctx context.Context, task containerd.Task) error { func killTask(ctx context.Context, task containerd.Task, gracePeriod time.Duration) error {
killTimeout := 30 * time.Second
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(1) wg.Add(1)
var err error var err error
waited := false
go func() { go func() {
defer wg.Done() defer wg.Done()
if task != nil { if task != nil {
@ -89,22 +103,39 @@ func killTask(ctx context.Context, task containerd.Task) error {
select { select {
case <-wait: case <-wait:
task.Delete(ctx) waited = true
return return
case <-time.After(killTimeout): case <-time.After(gracePeriod):
log.Printf("Sending SIGKILL to: %s after: %s", task.ID(), gracePeriod.Round(time.Second).String())
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil { if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
log.Printf("error force killing container task: %s", err) log.Printf("error sending SIGKILL to task: %s", err)
} }
return return
} }
} }
}() }()
wg.Wait() wg.Wait()
if task != nil {
if !waited {
wait, err := task.Wait(ctx)
if err != nil {
log.Printf("error waiting on task after kill: %s", err)
}
<-wait
}
if _, err := task.Delete(ctx); err != nil {
return err
}
}
return err return err
} }
func getResolver(ctx context.Context, configFile *configfile.ConfigFile) (remotes.Resolver, error) { func getResolver(configFile *configfile.ConfigFile) (remotes.Resolver, error) {
// credsFunc is based on https://github.com/moby/buildkit/blob/0b130cca040246d2ddf55117eeff34f546417e40/session/auth/authprovider/authprovider.go#L35 // credsFunc is based on https://github.com/moby/buildkit/blob/0b130cca040246d2ddf55117eeff34f546417e40/session/auth/authprovider/authprovider.go#L35
credFunc := func(host string) (string, string, error) { credFunc := func(host string) (string, string, error) {
if host == "registry-1.docker.io" { if host == "registry-1.docker.io" {
@ -139,7 +170,7 @@ func PrepareImage(ctx context.Context, client *containerd.Client, imageName, sna
if err != nil { if err != nil {
return nil, err return nil, err
} }
resolver, err = getResolver(ctx, configFile) resolver, err = getResolver(configFile)
if err != nil { if err != nil {
return empty, err return empty, err
} }