faas/gateway/handlers/alerthandler.go
Alex Ellis (OpenFaaS Ltd) df4126d8f5 Scale functions with namespace option
Allows alerts to trigger functions to scale when they
also have an optional namespace set.

Tested e2e with Kubernetes 1.15 and a non-default namespace.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-09-20 18:38:55 +01:00

116 lines
3.1 KiB
Go

// Copyright (c) Alex Ellis 2017. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package handlers
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math"
"net/http"
"github.com/openfaas/faas/gateway/requests"
"github.com/openfaas/faas/gateway/scaling"
)
// MakeAlertHandler returns an HTTP handler for alert notifications sent by
// Prometheus Alertmanager.
//
// The request body is decoded as a requests.PrometheusAlert and handed to
// handleAlerts together with service and defaultNamespace.
//
// Responses:
//   - 400 when the body cannot be read or cannot be parsed as JSON,
//   - 500 with one "[index] error" line per failure when handleAlerts
//     reports errors,
//   - 200 with an empty body otherwise.
func MakeAlertHandler(service scaling.ServiceQuery, defaultNamespace string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		log.Println("Alert received.")

		// A hand-built request (e.g. in a unit test) can carry a nil Body;
		// reading it would panic, so treat it as an empty payload instead.
		var body []byte
		if r.Body != nil {
			defer r.Body.Close()

			var readErr error
			body, readErr = ioutil.ReadAll(r.Body)
			// Check the read error before logging, so a failed read does not
			// log a truncated payload as if it were the alert.
			if readErr != nil {
				w.WriteHeader(http.StatusBadRequest)
				w.Write([]byte("Unable to read alert."))
				log.Println(readErr)
				return
			}
		}
		log.Println(string(body))

		var req requests.PrometheusAlert
		if err := json.Unmarshal(body, &req); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte("Unable to parse alert, bad format."))
			log.Println(err)
			return
		}

		if errs := handleAlerts(&req, service, defaultNamespace); len(errs) > 0 {
			log.Println(errs)

			w.WriteHeader(http.StatusInternalServerError)
			// Write each failure straight to the response instead of growing
			// a string with += on every iteration.
			for i, scaleErr := range errs {
				fmt.Fprintf(w, "[%d] %s\n", i, scaleErr)
			}
			return
		}

		w.WriteHeader(http.StatusOK)
	}
}
// handleAlerts calls scaleService once for every alert in req.Alerts.
//
// Each failure is logged as it happens and also collected; the returned
// slice holds the failures in the order they occurred and is nil when no
// call failed.
func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery, defaultNamespace string) []error {
	var failures []error

	for i := range req.Alerts {
		scaleErr := scaleService(req.Alerts[i], service, defaultNamespace)
		if scaleErr == nil {
			continue
		}

		log.Println(scaleErr)
		failures = append(failures, scaleErr)
	}

	return failures
}
// scaleService applies a single alert to the function it refers to.
//
// The function name and namespace are obtained from getNamespace using
// defaultNamespace and alert.Labels.FunctionName. The current replica
// settings are fetched through service, the target count is computed with
// CalculateReplicas from alert.Status, and the new count is written back
// only when it differs from the current one.
//
// It returns nil when the resolved function name is empty or no change is
// needed, and otherwise the error from GetReplicas or SetReplicas, if any.
func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery, defaultNamespace string) error {
	serviceName, namespace := getNamespace(defaultNamespace, alert.Labels.FunctionName)
	if len(serviceName) == 0 {
		// No function name resolved for this alert, so there is nothing to scale.
		return nil
	}

	queryResponse, err := service.GetReplicas(serviceName, namespace)
	if err != nil {
		// Report a failed lookup to the caller rather than dropping it, so it
		// is handled the same way as a failed SetReplicas call.
		return err
	}

	newReplicas := CalculateReplicas(alert.Status, queryResponse.Replicas, uint64(queryResponse.MaxReplicas), queryResponse.MinReplicas, queryResponse.ScalingFactor)

	log.Printf("[Scale] function=%s %d => %d.\n", serviceName, queryResponse.Replicas, newReplicas)
	if newReplicas == queryResponse.Replicas {
		return nil
	}

	return service.SetReplicas(serviceName, namespace, newReplicas)
}
// CalculateReplicas decides what replica count to set for a function given
// the alert status and the function's current scaling settings.
//
// The scale-up step is scalingFactor percent of maxReplicas, rounded up.
//
//   - When status is "firing" and the step is non-zero, the result is
//     currentReplicas plus the step, capped at maxReplicas.
//   - For any other status, or when the step is zero (scalingFactor or
//     maxReplicas is 0), the result is minReplicas.
func CalculateReplicas(status string, currentReplicas uint64, maxReplicas uint64, minReplicas uint64, scalingFactor uint64) uint64 {
	step := uint64(math.Ceil(float64(maxReplicas) / 100 * float64(scalingFactor)))

	if status != "firing" || step == 0 {
		// Resolved event, or scaling up is disabled by a zero step.
		return minReplicas
	}

	// Compare the step against the remaining headroom instead of evaluating
	// currentReplicas+step, which is unsigned and could wrap around and slip
	// past the maxReplicas cap.
	if currentReplicas >= maxReplicas || step > maxReplicas-currentReplicas {
		return maxReplicas
	}

	return currentReplicas + step
}