Add namespace in function name for metrics

This commit adds namespace in function names while logging metrics to
prometheus, irrespective of the function is invoked with namespace suffix
or not.

This is also required to add multiple namespace support to faas-idler

https://github.com/openfaas-incubator/faas-idler/issues/37 which is part
of https://github.com/openfaas/faas-netes/issues/511

Signed-off-by: Vivek Singh <vivekkmr45@yahoo.in>
This commit is contained in:
Vivek Singh 2020-03-22 13:08:11 +05:30 committed by Alex Ellis
parent be8090468e
commit f7b02b47f8
3 changed files with 125 additions and 49 deletions

View File

@ -43,15 +43,21 @@ func urlToLabel(path string) string {
// PrometheusFunctionNotifier records metrics to Prometheus // PrometheusFunctionNotifier records metrics to Prometheus
type PrometheusFunctionNotifier struct { type PrometheusFunctionNotifier struct {
Metrics *metrics.MetricOptions Metrics *metrics.MetricOptions
//FunctionNamespace default namespace of the function
FunctionNamespace string
} }
// Notify records metrics in Prometheus // Notify records metrics in Prometheus
func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalURL string, statusCode int, event string, duration time.Duration) { func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalURL string, statusCode int, event string, duration time.Duration) {
serviceName := getServiceName(originalURL)
if len(p.FunctionNamespace) > 0 {
if index := strings.Index(serviceName, "."); index == -1 {
serviceName = fmt.Sprintf("%s.%s", serviceName, p.FunctionNamespace)
}
}
if event == "completed" { if event == "completed" {
seconds := duration.Seconds() seconds := duration.Seconds()
serviceName := getServiceName(originalURL)
p.Metrics.GatewayFunctionsHistogram. p.Metrics.GatewayFunctionsHistogram.
WithLabelValues(serviceName). WithLabelValues(serviceName).
Observe(seconds) Observe(seconds)
@ -62,7 +68,6 @@ func (p PrometheusFunctionNotifier) Notify(method string, URL string, originalUR
With(prometheus.Labels{"function_name": serviceName, "code": code}). With(prometheus.Labels{"function_name": serviceName, "code": code}).
Inc() Inc()
} else if event == "started" { } else if event == "started" {
serviceName := getServiceName(originalURL)
p.Metrics.GatewayFunctionInvocationStarted.WithLabelValues(serviceName).Inc() p.Metrics.GatewayFunctionInvocationStarted.WithLabelValues(serviceName).Inc()
} }

View File

@ -77,7 +77,8 @@ func main() {
loggingNotifier := handlers.LoggingNotifier{} loggingNotifier := handlers.LoggingNotifier{}
prometheusNotifier := handlers.PrometheusFunctionNotifier{ prometheusNotifier := handlers.PrometheusFunctionNotifier{
Metrics: &metricsOptions, Metrics: &metricsOptions,
FunctionNamespace: config.Namespace,
} }
prometheusServiceNotifier := handlers.PrometheusServiceNotifier{ prometheusServiceNotifier := handlers.PrometheusServiceNotifier{

View File

@ -6,10 +6,12 @@ package metrics
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"path"
"time" "time"
"log" "log"
@ -56,8 +58,14 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
e.metricOptions.ServiceReplicasGauge.Reset() e.metricOptions.ServiceReplicasGauge.Reset()
for _, service := range e.services { for _, service := range e.services {
var serviceName string
if len(service.Namespace) > 0 {
serviceName = fmt.Sprintf("%s.%s", service.Name, service.Namespace)
} else {
serviceName = service.Name
}
e.metricOptions.ServiceReplicasGauge. e.metricOptions.ServiceReplicasGauge.
WithLabelValues(service.Name). WithLabelValues(serviceName).
Set(float64(service.Replicas)) Set(float64(service.Replicas))
} }
@ -72,9 +80,46 @@ func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions Metri
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
quit := make(chan struct{}) quit := make(chan struct{})
timeout := 3 * time.Second go func() {
for {
select {
case <-ticker.C:
proxyClient := http.Client{ namespaces, err := e.getNamespaces(endpointURL)
if err != nil {
log.Println(err)
}
if len(namespaces) == 0 {
emptyNamespace := ""
services, err := e.getFunctions(endpointURL, emptyNamespace)
if err != nil {
log.Println(err)
continue
}
e.services = services
} else {
for _, namespace := range namespaces {
services, err := e.getFunctions(endpointURL, namespace)
if err != nil {
log.Println(err)
continue
}
e.services = append(e.services, services...)
}
}
break
case <-quit:
return
}
}
}()
}
func (e *Exporter) getHTTPClient(timeout time.Duration) http.Client {
return http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{ DialContext: (&net.Dialer{
@ -87,45 +132,70 @@ func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions Metri
ExpectContinueTimeout: 1500 * time.Millisecond, ExpectContinueTimeout: 1500 * time.Millisecond,
}, },
} }
}
go func() {
for { func (e *Exporter) getFunctions(endpointURL url.URL, namespace string) ([]types.FunctionStatus, error) {
select { timeout := 3 * time.Second
case <-ticker.C: proxyClient := e.getHTTPClient(timeout)
get, err := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil) endpointURL.Path = path.Join(endpointURL.Path, "/system/functions")
if err != nil { if len(namespace) > 0 {
log.Println(err) q := endpointURL.Query()
return q.Set("namespace", namespace)
} endpointURL.RawQuery = q.Encode()
}
if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password) get, _ := http.NewRequest(http.MethodGet, endpointURL.String(), nil)
} if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
services := []types.FunctionStatus{} }
res, err := proxyClient.Do(get)
if err != nil { services := []types.FunctionStatus{}
log.Println(err) res, err := proxyClient.Do(get)
continue if err != nil {
} return services, err
bytesOut, readErr := ioutil.ReadAll(res.Body) }
if readErr != nil {
log.Println(err) bytesOut, readErr := ioutil.ReadAll(res.Body)
continue if readErr != nil {
} return services, readErr
unmarshalErr := json.Unmarshal(bytesOut, &services) }
if unmarshalErr != nil {
log.Println(err) unmarshalErr := json.Unmarshal(bytesOut, &services)
continue if unmarshalErr != nil {
} return services, unmarshalErr
}
e.services = services return services, nil
}
break
case <-quit: func (e *Exporter) getNamespaces(endpointURL url.URL) ([]string, error) {
return namespaces := []string{}
}
} get, _ := http.NewRequest(http.MethodGet, endpointURL.String()+"system/namespaces", nil)
}() if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
}
timeout := 3 * time.Second
proxyClient := e.getHTTPClient(timeout)
res, err := proxyClient.Do(get)
if err != nil {
return namespaces, err
}
if res.StatusCode == http.StatusNotFound {
return namespaces, nil
}
bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
return namespaces, readErr
}
unmarshalErr := json.Unmarshal(bytesOut, &namespaces)
if unmarshalErr != nil {
return namespaces, unmarshalErr
}
return namespaces, nil
} }