Export new metrics for OpenFaaS Pro scaling

* Add service target metric
* Add service min replicas metric
* Add scale type metric

These combined allow new auto-scaling modes and parameters
for OpenFaaS Pro customers.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2022-01-24 12:11:54 +00:00
committed by Alex Ellis
parent 34735d61d0
commit d85d5e7239
23 changed files with 235 additions and 472 deletions

View File

@ -35,9 +35,7 @@ func AddMetricsHandler(handler http.HandlerFunc, prometheusQuery PrometheusQuery
recorder.Code,
string(upstreamBody))
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("List functions responded with code %d", recorder.Code)))
http.Error(w, "Unexpected status code retriving functions from backend", http.StatusInternalServerError)
return
}
@ -48,28 +46,33 @@ func AddMetricsHandler(handler http.HandlerFunc, prometheusQuery PrometheusQuery
if err != nil {
log.Printf("Metrics upstream error: %s", err)
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Error parsing metrics from upstream provider/backend."))
http.Error(w, "Error parsing metrics from upstream provider/backend", http.StatusInternalServerError)
return
}
expr := url.QueryEscape(`sum(gateway_function_invocation_total{function_name=~".*", code=~".*"}) by (function_name, code)`)
// expr := "sum(gateway_function_invocation_total%7Bfunction_name%3D~%22.*%22%2C+code%3D~%22.*%22%7D)+by+(function_name%2C+code)"
results, fetchErr := prometheusQuery.Fetch(expr)
if fetchErr != nil {
log.Printf("Error querying Prometheus API: %s\n", fetchErr.Error())
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(upstreamBody)
return
// Ensure values are empty first.
for i := range functions {
functions[i].InvocationCount = 0
}
mixIn(&functions, results)
if len(functions) > 0 {
bytesOut, marshalErr := json.Marshal(functions)
if marshalErr != nil {
log.Println(marshalErr)
ns := functions[0].Namespace
q := fmt.Sprintf(`sum(gateway_function_invocation_total{function_name=~".*.%s"}) by (function_name)`, ns)
// Restrict query results to only function names matching namespace suffix.
results, err := prometheusQuery.Fetch(url.QueryEscape(q))
if err != nil {
log.Printf("Error querying Prometheus: %s\n", err.Error())
return
}
mixIn(&functions, results)
}
bytesOut, err := json.Marshal(functions)
if err != nil {
log.Printf("Error serializing functions: %s", err)
http.Error(w, "error writing response after adding metrics", http.StatusInternalServerError)
return
}
@ -85,25 +88,19 @@ func mixIn(functions *[]types.FunctionStatus, metrics *VectorQueryResponse) {
return
}
// Ensure values are empty first.
for i := range *functions {
(*functions)[i].InvocationCount = 0
}
for i, function := range *functions {
for _, v := range metrics.Data.Result {
if v.Metric.FunctionName == fmt.Sprintf("%s.%s", function.Name, function.Namespace) {
metricValue := v.Value[1]
switch metricValue.(type) {
switch value := metricValue.(type) {
case string:
f, strconvErr := strconv.ParseFloat(metricValue.(string), 64)
if strconvErr != nil {
log.Printf("Unable to convert value for metric: %s\n", strconvErr)
f, err := strconv.ParseFloat(value, 64)
if err != nil {
log.Printf("add_metrics: unable to convert value %q for metric: %s", value, err)
continue
}
(*functions)[i].InvocationCount += f
break
}
}
}