mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-19 12:36:38 +00:00
Compare commits
2 Commits
0.17.0
...
wait-for-h
Author | SHA1 | Date | |
---|---|---|---|
bbd3b4ff07 | |||
1d07fda0a4 |
@ -98,7 +98,7 @@ func makeProviderCmd() *cobra.Command {
|
|||||||
DeployHandler: handlers.MakeDeployHandler(client, cni, baseUserSecretsPath, alwaysPull),
|
DeployHandler: handlers.MakeDeployHandler(client, cni, baseUserSecretsPath, alwaysPull),
|
||||||
FunctionReader: handlers.MakeReadHandler(client),
|
FunctionReader: handlers.MakeReadHandler(client),
|
||||||
ReplicaReader: handlers.MakeReplicaReaderHandler(client),
|
ReplicaReader: handlers.MakeReplicaReaderHandler(client),
|
||||||
ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni),
|
ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni, invokeResolver),
|
||||||
UpdateHandler: handlers.MakeUpdateHandler(client, cni, baseUserSecretsPath, alwaysPull),
|
UpdateHandler: handlers.MakeUpdateHandler(client, cni, baseUserSecretsPath, alwaysPull),
|
||||||
HealthHandler: func(w http.ResponseWriter, r *http.Request) {},
|
HealthHandler: func(w http.ResponseWriter, r *http.Request) {},
|
||||||
InfoHandler: handlers.MakeInfoHandler(Version, GitCommit),
|
InfoHandler: handlers.MakeInfoHandler(Version, GitCommit),
|
||||||
|
@ -6,16 +6,20 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd"
|
"github.com/containerd/containerd"
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
gocni "github.com/containerd/go-cni"
|
gocni "github.com/containerd/go-cni"
|
||||||
|
|
||||||
|
"github.com/openfaas/faas-provider/proxy"
|
||||||
"github.com/openfaas/faas-provider/types"
|
"github.com/openfaas/faas-provider/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) {
|
func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI, resolver proxy.BaseURLResolver) func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
@ -30,12 +34,9 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
|
|||||||
log.Printf("[Scale] request: %s\n", string(body))
|
log.Printf("[Scale] request: %s\n", string(body))
|
||||||
|
|
||||||
req := types.ScaleServiceRequest{}
|
req := types.ScaleServiceRequest{}
|
||||||
err := json.Unmarshal(body, &req)
|
if err := json.Unmarshal(body, &req); err != nil {
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("[Scale] error parsing input: %s\n", err)
|
log.Printf("[Scale] error parsing input: %s\n", err)
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,18 +56,23 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
|
|||||||
|
|
||||||
name := req.ServiceName
|
name := req.ServiceName
|
||||||
|
|
||||||
if _, err := GetFunction(client, name, namespace); err != nil {
|
fn, err := GetFunction(client, name, namespace)
|
||||||
|
if err != nil {
|
||||||
msg := fmt.Sprintf("service %s not found", name)
|
msg := fmt.Sprintf("service %s not found", name)
|
||||||
log.Printf("[Scale] %s\n", msg)
|
log.Printf("[Scale] %s\n", msg)
|
||||||
http.Error(w, msg, http.StatusNotFound)
|
http.Error(w, msg, http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := namespaces.WithNamespace(context.Background(), namespace)
|
healthPath := "/_/healthz"
|
||||||
|
if v := fn.annotations["com.openfaas.health.http.path"]; len(v) > 0 {
|
||||||
|
healthPath = v
|
||||||
|
}
|
||||||
|
|
||||||
ctr, ctrErr := client.LoadContainer(ctx, name)
|
ctx := namespaces.WithNamespace(context.Background(), namespace)
|
||||||
if ctrErr != nil {
|
ctr, err := client.LoadContainer(ctx, name)
|
||||||
msg := fmt.Sprintf("cannot load service %s, error: %s", name, ctrErr)
|
if err != nil {
|
||||||
|
msg := fmt.Sprintf("cannot load service %s, error: %s", name, err)
|
||||||
log.Printf("[Scale] %s\n", msg)
|
log.Printf("[Scale] %s\n", msg)
|
||||||
http.Error(w, msg, http.StatusNotFound)
|
http.Error(w, msg, http.StatusNotFound)
|
||||||
return
|
return
|
||||||
@ -75,16 +81,16 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
|
|||||||
var taskExists bool
|
var taskExists bool
|
||||||
var taskStatus *containerd.Status
|
var taskStatus *containerd.Status
|
||||||
|
|
||||||
task, taskErr := ctr.Task(ctx, nil)
|
task, err := ctr.Task(ctx, nil)
|
||||||
if taskErr != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr)
|
msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, err)
|
||||||
log.Printf("[Scale] %s\n", msg)
|
log.Printf("[Scale] %s\n", msg)
|
||||||
taskExists = false
|
taskExists = false
|
||||||
} else {
|
} else {
|
||||||
taskExists = true
|
taskExists = true
|
||||||
status, statusErr := task.Status(ctx)
|
status, err := task.Status(ctx)
|
||||||
if statusErr != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, statusErr)
|
msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, err)
|
||||||
log.Printf("[Scale] %s\n", msg)
|
log.Printf("[Scale] %s\n", msg)
|
||||||
http.Error(w, msg, http.StatusInternalServerError)
|
http.Error(w, msg, http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
@ -99,28 +105,31 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
|
|||||||
if req.Replicas == 0 {
|
if req.Replicas == 0 {
|
||||||
// If a task is running, pause it
|
// If a task is running, pause it
|
||||||
if taskExists && taskStatus.Status == containerd.Running {
|
if taskExists && taskStatus.Status == containerd.Running {
|
||||||
if pauseErr := task.Pause(ctx); pauseErr != nil {
|
if err := task.Pause(ctx); err != nil {
|
||||||
wrappedPauseErr := fmt.Errorf("error pausing task %s, error: %s", name, pauseErr)
|
werr := fmt.Errorf("error pausing task %s, error: %s", name, err)
|
||||||
log.Printf("[Scale] %s\n", wrappedPauseErr.Error())
|
log.Printf("[Scale] %s\n", werr.Error())
|
||||||
http.Error(w, wrappedPauseErr.Error(), http.StatusNotFound)
|
http.Error(w, werr.Error(), http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Otherwise, no action is required
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if taskExists {
|
if taskExists {
|
||||||
if taskStatus != nil {
|
if taskStatus != nil {
|
||||||
if taskStatus.Status == containerd.Paused {
|
if taskStatus.Status == containerd.Paused {
|
||||||
if resumeErr := task.Resume(ctx); resumeErr != nil {
|
if err := task.Resume(ctx); err != nil {
|
||||||
log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
|
log.Printf("[Scale] error resuming task %s, error: %s\n", name, err)
|
||||||
http.Error(w, resumeErr.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else if taskStatus.Status == containerd.Stopped {
|
} else if taskStatus.Status == containerd.Stopped {
|
||||||
// Stopped tasks cannot be restarted, must be removed, and created again
|
// Stopped tasks cannot be restarted, must be removed, and created again
|
||||||
if _, delErr := task.Delete(ctx); delErr != nil {
|
if _, err := task.Delete(ctx); err != nil {
|
||||||
log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, delErr)
|
log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, err)
|
||||||
http.Error(w, delErr.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
createNewTask = true
|
createNewTask = true
|
||||||
@ -131,12 +140,70 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
|
|||||||
}
|
}
|
||||||
|
|
||||||
if createNewTask {
|
if createNewTask {
|
||||||
deployErr := createTask(ctx, client, ctr, cni)
|
err := createTask(ctx, client, ctr, cni)
|
||||||
if deployErr != nil {
|
if err != nil {
|
||||||
log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
|
log.Printf("[Scale] error deploying %s, error: %s\n", name, err)
|
||||||
http.Error(w, deployErr.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := waitUntilHealthy(name, resolver, healthPath); err != nil {
|
||||||
|
log.Printf("[Scale] error waiting for function %s to become ready, error: %s\n", name, err)
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitUntilHealthy blocks until the healthPath returns a HTTP 200 for the
|
||||||
|
// IP address resolved for the given function.
|
||||||
|
// Maximum retries: 100
|
||||||
|
// Delay between each attempt: 20ms
|
||||||
|
// A custom path can be set via an annotation in the function's spec:
|
||||||
|
// com.openfaas.health.http.path: /handlers/ready
|
||||||
|
//
|
||||||
|
func waitUntilHealthy(name string, resolver proxy.BaseURLResolver, healthPath string) error {
|
||||||
|
endpoint, err := resolver.Resolve(name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
host, port, _ := net.SplitHostPort(endpoint.Host)
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s:%s%s", host, port, healthPath))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to hit the health endpoint and block until
|
||||||
|
// ready.
|
||||||
|
attempts := 100
|
||||||
|
pause := time.Millisecond * 20
|
||||||
|
for i := 0; i < attempts; i++ {
|
||||||
|
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Body != nil {
|
||||||
|
res.Body.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("unexpected health status: %d", res.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(pause)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user