Compare commits

..

5 Commits

Author SHA1 Message Date
d7e0bebe25 Fix for proxy exiting early
When a proxied core service was accessed before it was ready
to accept a connection, due to a start-up, restart, etc of
a core service, then the proxy exited instead of continuing
to accept new connections.

This meant having to restart faasd and hope the race condition
worked itself out, or that no incoming requests were made.

Tested with Grafana, which seemed to manifest the issue
the most.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2025-01-30 10:00:14 +00:00
ef689d7b62 Fix for update workflow
When terminating a function and replacing it during an update,
there was often an error about task precondition not met
which meant having to try and or wait or being left in an
inconsistent state.

The new flow makes sure "Wait" is called in either code path
and allows for a custom gap between the SIGTERM and SIGKILL
through the grace_period env var - set as a Go duration.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2025-01-21 17:20:30 +00:00
854ec5836d Remove unused context
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2025-01-21 16:03:17 +00:00
4ab5f60b9d Reduce logging from log streaming command
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2025-01-21 12:58:00 +00:00
a8b61f2086 Upgrade various core components
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2025-01-21 12:54:54 +00:00
5 changed files with 55 additions and 21 deletions

View File

@ -21,7 +21,7 @@ services:
# - "127.0.0.1:8222:8222"
prometheus:
image: docker.io/prom/prometheus:v3.0.1
image: docker.io/prom/prometheus:v3.1.0
# nobody
user: "65534"
volumes:

View File

@ -58,10 +58,10 @@ func (r *requester) Query(ctx context.Context, req logs.Request) (<-chan logs.Me
// buildCmd reeturns the equivalent of
//
// journalctl -t <namespace>:<name> \
// --output=json \
// --since=<timestamp> \
// <--follow> \
// journalctl -t <namespace>:<name> \
// --output=json \
// --since=<timestamp> \
// <--follow> \
func buildCmd(ctx context.Context, req logs.Request) *exec.Cmd {
// // set the cursor position based on req, default to 5m
since := time.Now().Add(-5 * time.Minute)
@ -105,12 +105,12 @@ func streamLogs(ctx context.Context, cmd *exec.Cmd, out io.ReadCloser, msgs chan
// will ensure `out` is closed and all related resources cleaned up
go func() {
err := cmd.Wait()
log.Println("wait result", err)
if err := cmd.Wait(); err != nil {
log.Printf("journalctl exited with error: %s", err)
}
}()
defer func() {
log.Println("closing journal stream")
close(msgs)
}()
@ -176,7 +176,6 @@ func parseEntry(entry map[string]string) (logs.Message, error) {
}
func logErrOut(out io.ReadCloser) {
defer log.Println("stderr closed")
defer out.Close()
io.Copy(log.Writer(), out)

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"log"
"strings"
"time"
"github.com/containerd/containerd"
@ -51,7 +52,10 @@ func ListFunctions(client *containerd.Client, namespace string) (map[string]*Fun
name := c.ID()
f, err := GetFunction(client, name, namespace)
if err != nil {
log.Printf("skipping %s, error: %s", name, err)
if !strings.Contains(err.Error(), "unable to get IP address for container") {
log.Printf("List functions, skipping: %s, error: %s", name, err)
}
} else {
functions[name] = &f
}

View File

@ -86,7 +86,7 @@ func (p *Proxy) Start() error {
conn.Close()
log.Printf("Unable to dial: %s, error: %s", upstreamAddr, err.Error())
return err
continue
}
go pipe(conn, upstream)

View File

@ -6,6 +6,7 @@ import (
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
@ -45,10 +46,24 @@ func Remove(ctx context.Context, client *containerd.Client, name string) error {
log.Printf("Status of %s is: %s\n", name, status.Status)
}
log.Printf("Need to kill task: %s\n", name)
if err = killTask(ctx, t); err != nil {
var gracePeriod = time.Second * 30
spec, err := t.Spec(ctx)
if err == nil {
for _, p := range spec.Process.Env {
k, v, ok := strings.Cut(p, "=")
if ok && k == "grace_period" {
periodVal, err := time.ParseDuration(v)
if err == nil {
gracePeriod = periodVal
}
}
}
}
if err = killTask(ctx, t, gracePeriod); err != nil {
return fmt.Errorf("error killing task %s, %s, %w", container.ID(), name, err)
}
}
if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
@ -66,14 +81,13 @@ func Remove(ctx context.Context, client *containerd.Client, name string) error {
}
// Adapted from Stellar - https://github.com/stellar
func killTask(ctx context.Context, task containerd.Task) error {
killTimeout := 30 * time.Second
func killTask(ctx context.Context, task containerd.Task, gracePeriod time.Duration) error {
wg := &sync.WaitGroup{}
wg.Add(1)
var err error
waited := false
go func() {
defer wg.Done()
if task != nil {
@ -89,22 +103,39 @@ func killTask(ctx context.Context, task containerd.Task) error {
select {
case <-wait:
task.Delete(ctx)
waited = true
return
case <-time.After(killTimeout):
case <-time.After(gracePeriod):
log.Printf("Sending SIGKILL to: %s after: %s", task.ID(), gracePeriod.Round(time.Second).String())
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
log.Printf("error force killing container task: %s", err)
log.Printf("error sending SIGKILL to task: %s", err)
}
return
}
}
}()
wg.Wait()
if task != nil {
if !waited {
wait, err := task.Wait(ctx)
if err != nil {
log.Printf("error waiting on task after kill: %s", err)
}
<-wait
}
if _, err := task.Delete(ctx); err != nil {
return err
}
}
return err
}
func getResolver(ctx context.Context, configFile *configfile.ConfigFile) (remotes.Resolver, error) {
func getResolver(configFile *configfile.ConfigFile) (remotes.Resolver, error) {
// credsFunc is based on https://github.com/moby/buildkit/blob/0b130cca040246d2ddf55117eeff34f546417e40/session/auth/authprovider/authprovider.go#L35
credFunc := func(host string) (string, string, error) {
if host == "registry-1.docker.io" {
@ -139,7 +170,7 @@ func PrepareImage(ctx context.Context, client *containerd.Client, imageName, sna
if err != nil {
return nil, err
}
resolver, err = getResolver(ctx, configFile)
resolver, err = getResolver(configFile)
if err != nil {
return empty, err
}