mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-18 12:06:36 +00:00
Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
d7e0bebe25 | |||
ef689d7b62 | |||
854ec5836d | |||
4ab5f60b9d | |||
a8b61f2086 |
@ -21,7 +21,7 @@ services:
|
||||
# - "127.0.0.1:8222:8222"
|
||||
|
||||
prometheus:
|
||||
image: docker.io/prom/prometheus:v3.0.1
|
||||
image: docker.io/prom/prometheus:v3.1.0
|
||||
# nobody
|
||||
user: "65534"
|
||||
volumes:
|
||||
|
@ -58,10 +58,10 @@ func (r *requester) Query(ctx context.Context, req logs.Request) (<-chan logs.Me
|
||||
|
||||
// buildCmd reeturns the equivalent of
|
||||
//
|
||||
// journalctl -t <namespace>:<name> \
|
||||
// --output=json \
|
||||
// --since=<timestamp> \
|
||||
// <--follow> \
|
||||
// journalctl -t <namespace>:<name> \
|
||||
// --output=json \
|
||||
// --since=<timestamp> \
|
||||
// <--follow> \
|
||||
func buildCmd(ctx context.Context, req logs.Request) *exec.Cmd {
|
||||
// // set the cursor position based on req, default to 5m
|
||||
since := time.Now().Add(-5 * time.Minute)
|
||||
@ -105,12 +105,12 @@ func streamLogs(ctx context.Context, cmd *exec.Cmd, out io.ReadCloser, msgs chan
|
||||
|
||||
// will ensure `out` is closed and all related resources cleaned up
|
||||
go func() {
|
||||
err := cmd.Wait()
|
||||
log.Println("wait result", err)
|
||||
if err := cmd.Wait(); err != nil {
|
||||
log.Printf("journalctl exited with error: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
log.Println("closing journal stream")
|
||||
close(msgs)
|
||||
}()
|
||||
|
||||
@ -176,7 +176,6 @@ func parseEntry(entry map[string]string) (logs.Message, error) {
|
||||
}
|
||||
|
||||
func logErrOut(out io.ReadCloser) {
|
||||
defer log.Println("stderr closed")
|
||||
defer out.Close()
|
||||
|
||||
io.Copy(log.Writer(), out)
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
@ -51,7 +52,10 @@ func ListFunctions(client *containerd.Client, namespace string) (map[string]*Fun
|
||||
name := c.ID()
|
||||
f, err := GetFunction(client, name, namespace)
|
||||
if err != nil {
|
||||
log.Printf("skipping %s, error: %s", name, err)
|
||||
if !strings.Contains(err.Error(), "unable to get IP address for container") {
|
||||
log.Printf("List functions, skipping: %s, error: %s", name, err)
|
||||
}
|
||||
|
||||
} else {
|
||||
functions[name] = &f
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ func (p *Proxy) Start() error {
|
||||
conn.Close()
|
||||
|
||||
log.Printf("Unable to dial: %s, error: %s", upstreamAddr, err.Error())
|
||||
return err
|
||||
continue
|
||||
}
|
||||
|
||||
go pipe(conn, upstream)
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -45,10 +46,24 @@ func Remove(ctx context.Context, client *containerd.Client, name string) error {
|
||||
log.Printf("Status of %s is: %s\n", name, status.Status)
|
||||
}
|
||||
|
||||
log.Printf("Need to kill task: %s\n", name)
|
||||
if err = killTask(ctx, t); err != nil {
|
||||
var gracePeriod = time.Second * 30
|
||||
spec, err := t.Spec(ctx)
|
||||
if err == nil {
|
||||
for _, p := range spec.Process.Env {
|
||||
k, v, ok := strings.Cut(p, "=")
|
||||
if ok && k == "grace_period" {
|
||||
periodVal, err := time.ParseDuration(v)
|
||||
if err == nil {
|
||||
gracePeriod = periodVal
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err = killTask(ctx, t, gracePeriod); err != nil {
|
||||
return fmt.Errorf("error killing task %s, %s, %w", container.ID(), name, err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
|
||||
@ -66,14 +81,13 @@ func Remove(ctx context.Context, client *containerd.Client, name string) error {
|
||||
}
|
||||
|
||||
// Adapted from Stellar - https://github.com/stellar
|
||||
func killTask(ctx context.Context, task containerd.Task) error {
|
||||
|
||||
killTimeout := 30 * time.Second
|
||||
func killTask(ctx context.Context, task containerd.Task, gracePeriod time.Duration) error {
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
var err error
|
||||
|
||||
waited := false
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if task != nil {
|
||||
@ -89,22 +103,39 @@ func killTask(ctx context.Context, task containerd.Task) error {
|
||||
|
||||
select {
|
||||
case <-wait:
|
||||
task.Delete(ctx)
|
||||
waited = true
|
||||
return
|
||||
case <-time.After(killTimeout):
|
||||
case <-time.After(gracePeriod):
|
||||
log.Printf("Sending SIGKILL to: %s after: %s", task.ID(), gracePeriod.Round(time.Second).String())
|
||||
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
|
||||
log.Printf("error force killing container task: %s", err)
|
||||
log.Printf("error sending SIGKILL to task: %s", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
if task != nil {
|
||||
if !waited {
|
||||
wait, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
log.Printf("error waiting on task after kill: %s", err)
|
||||
}
|
||||
|
||||
<-wait
|
||||
}
|
||||
|
||||
if _, err := task.Delete(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func getResolver(ctx context.Context, configFile *configfile.ConfigFile) (remotes.Resolver, error) {
|
||||
func getResolver(configFile *configfile.ConfigFile) (remotes.Resolver, error) {
|
||||
// credsFunc is based on https://github.com/moby/buildkit/blob/0b130cca040246d2ddf55117eeff34f546417e40/session/auth/authprovider/authprovider.go#L35
|
||||
credFunc := func(host string) (string, string, error) {
|
||||
if host == "registry-1.docker.io" {
|
||||
@ -139,7 +170,7 @@ func PrepareImage(ctx context.Context, client *containerd.Client, imageName, sna
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resolver, err = getResolver(ctx, configFile)
|
||||
resolver, err = getResolver(configFile)
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
Reference in New Issue
Block a user