mirror of
https://github.com/openfaas/faas.git
synced 2025-06-17 12:46:59 +00:00
Enable handling of multiple concurrent alerts
This commit is contained in:
parent
f2172462bb
commit
09cc91108a
@ -9,6 +9,8 @@ import (
|
|||||||
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/alexellis/faas/gateway/requests"
|
"github.com/alexellis/faas/gateway/requests"
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
"github.com/docker/docker/client"
|
"github.com/docker/docker/client"
|
||||||
@ -21,11 +23,15 @@ const DefaultMaxReplicas = 20
|
|||||||
func MakeAlertHandler(c *client.Client) http.HandlerFunc {
|
func MakeAlertHandler(c *client.Client) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
log.Println("Alert received.")
|
log.Println("Alert received.")
|
||||||
|
|
||||||
body, readErr := ioutil.ReadAll(r.Body)
|
body, readErr := ioutil.ReadAll(r.Body)
|
||||||
|
|
||||||
log.Println(string(body))
|
log.Println(string(body))
|
||||||
|
|
||||||
if readErr != nil {
|
if readErr != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
w.Write([]byte("Unable to read alert."))
|
||||||
|
|
||||||
log.Println(readErr)
|
log.Println(readErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -33,24 +39,43 @@ func MakeAlertHandler(c *client.Client) http.HandlerFunc {
|
|||||||
var req requests.PrometheusAlert
|
var req requests.PrometheusAlert
|
||||||
err := json.Unmarshal(body, &req)
|
err := json.Unmarshal(body, &req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
w.Write([]byte("Unable to parse alert, bad format."))
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(req.Alerts) > 0 {
|
errors := handleAlerts(&req, c)
|
||||||
if err := scaleService(req, c); err != nil {
|
if len(errors) > 0 {
|
||||||
log.Println(err)
|
log.Println(errors)
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
|
||||||
|
var errorOutput string
|
||||||
|
for d, err := range errors {
|
||||||
|
errorOutput += fmt.Sprintf("[%d] %s\n", d, err)
|
||||||
|
}
|
||||||
|
w.Write([]byte(errorOutput))
|
||||||
} else {
|
} else {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleAlerts(req *requests.PrometheusAlert, c *client.Client) []error {
|
||||||
|
var errors []error
|
||||||
|
for _, alert := range req.Alerts {
|
||||||
|
if err := scaleService(alert, c); err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
errors = append(errors, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func scaleService(req requests.PrometheusAlert, c *client.Client) error {
|
return errors
|
||||||
|
}
|
||||||
|
|
||||||
|
func scaleService(alert requests.PrometheusInnerAlert, c *client.Client) error {
|
||||||
var err error
|
var err error
|
||||||
serviceName := req.Alerts[0].Labels.FunctionName
|
serviceName := alert.Labels.FunctionName
|
||||||
|
|
||||||
if len(serviceName) > 0 {
|
if len(serviceName) > 0 {
|
||||||
opts := types.ServiceInspectOptions{
|
opts := types.ServiceInspectOptions{
|
||||||
@ -61,7 +86,7 @@ func scaleService(req requests.PrometheusAlert, c *client.Client) error {
|
|||||||
if inspectErr == nil {
|
if inspectErr == nil {
|
||||||
|
|
||||||
currentReplicas := *service.Spec.Mode.Replicated.Replicas
|
currentReplicas := *service.Spec.Mode.Replicated.Replicas
|
||||||
status := req.Status
|
status := alert.Status
|
||||||
|
|
||||||
replicaLabel := service.Spec.TaskTemplate.ContainerSpec.Labels["com.faas.max_replicas"]
|
replicaLabel := service.Spec.TaskTemplate.ContainerSpec.Labels["com.faas.max_replicas"]
|
||||||
maxReplicas := DefaultMaxReplicas
|
maxReplicas := DefaultMaxReplicas
|
||||||
@ -73,20 +98,19 @@ func scaleService(req requests.PrometheusAlert, c *client.Client) error {
|
|||||||
}
|
}
|
||||||
newReplicas := CalculateReplicas(status, currentReplicas, uint64(maxReplicas))
|
newReplicas := CalculateReplicas(status, currentReplicas, uint64(maxReplicas))
|
||||||
|
|
||||||
|
log.Printf("[Scale] function=%s %d => %d.\n", serviceName, currentReplicas, newReplicas)
|
||||||
if newReplicas == currentReplicas {
|
if newReplicas == currentReplicas {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Scaling %s to %d replicas.\n", serviceName, newReplicas)
|
|
||||||
service.Spec.Mode.Replicated.Replicas = &newReplicas
|
service.Spec.Mode.Replicated.Replicas = &newReplicas
|
||||||
updateOpts := types.ServiceUpdateOptions{}
|
updateOpts := types.ServiceUpdateOptions{}
|
||||||
updateOpts.RegistryAuthFrom = types.RegistryAuthFromSpec
|
updateOpts.RegistryAuthFrom = types.RegistryAuthFromSpec
|
||||||
|
|
||||||
response, updateErr := c.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, updateOpts)
|
_, updateErr := c.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, updateOpts)
|
||||||
if updateErr != nil {
|
if updateErr != nil {
|
||||||
err = updateErr
|
err = updateErr
|
||||||
}
|
}
|
||||||
log.Println(response)
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
err = inspectErr
|
err = inspectErr
|
||||||
|
Loading…
x
Reference in New Issue
Block a user