Changed Metrics to be exposed via Exporter

This change exposes the gateway metrics with an exporter which
implements the Collector interface of prometheus.
This change Fixes #697

Signed-off-by: Ken Fukuyama <kenfdev@gmail.com>
This commit is contained in:
Ken Fukuyama 2018-08-07 07:33:25 +09:00 committed by Alex Ellis
parent 6d6a487711
commit 4fabd50799
4 changed files with 134 additions and 15 deletions

View File

@ -14,9 +14,46 @@ import (
"log"
"github.com/openfaas/faas/gateway/requests"
"github.com/prometheus/client_golang/prometheus"
)
func AttachExternalWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) {
// Exporter is a prometheus exporter
type Exporter struct {
metricOptions MetricOptions
services []requests.Function
}
// NewExporter creates a new exporter for the OpenFaaS gateway metrics
func NewExporter(options MetricOptions) *Exporter {
return &Exporter{
metricOptions: options,
services: []requests.Function{},
}
}
// Describe is to describe the metrics for Prometheus
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
e.metricOptions.GatewayFunctionInvocation.Describe(ch)
e.metricOptions.GatewayFunctionsHistogram.Describe(ch)
e.metricOptions.ServiceReplicasCounter.Describe(ch)
}
// Collect collects data to be consumed by prometheus
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
e.metricOptions.GatewayFunctionInvocation.Collect(ch)
e.metricOptions.GatewayFunctionsHistogram.Collect(ch)
e.metricOptions.ServiceReplicasCounter.Reset()
for _, service := range e.services {
e.metricOptions.ServiceReplicasCounter.
WithLabelValues(service.Name).
Set(float64(service.Replicas))
}
e.metricOptions.ServiceReplicasCounter.Collect(ch)
}
// StartServiceWatcher starts a ticker and collects service replica counts to expose to prometheus
func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) {
ticker := time.NewTicker(interval)
quit := make(chan struct{})
proxyClient := http.Client{
@ -57,11 +94,7 @@ func AttachExternalWatcher(endpointURL url.URL, metricsOptions MetricOptions, la
continue
}
for _, service := range services {
metricsOptions.ServiceReplicasCounter.
WithLabelValues(service.Name).
Set(float64(service.Replicas))
}
e.services = services
break
case <-quit:

View File

@ -0,0 +1,88 @@
package metrics
import (
"testing"
"github.com/openfaas/faas/gateway/requests"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)
type metricResult struct {
value float64
labels map[string]string
}
func labels2Map(labels []*dto.LabelPair) map[string]string {
res := map[string]string{}
for _, l := range labels {
res[l.GetName()] = l.GetValue()
}
return res
}
func readGauge(g prometheus.Metric) metricResult {
m := &dto.Metric{}
g.Write(m)
return metricResult{
value: m.GetGauge().GetValue(),
labels: labels2Map(m.GetLabel()),
}
}
func Test_Describe_DescribesThePrometheusMetrics(t *testing.T) {
metricsOptions := BuildMetricsOptions()
exporter := NewExporter(metricsOptions)
ch := make(chan *prometheus.Desc)
defer close(ch)
go exporter.Describe(ch)
d := (<-ch)
expectedGatewayFunctionInvocationDesc := `Desc{fqName: "gateway_function_invocation_total", help: "Individual function metrics", constLabels: {}, variableLabels: [function_name code]}`
actualGatewayFunctionInvocationDesc := d.String()
if expectedGatewayFunctionInvocationDesc != actualGatewayFunctionInvocationDesc {
t.Errorf("Want %s, got: %s", expectedGatewayFunctionInvocationDesc, actualGatewayFunctionInvocationDesc)
}
d = (<-ch)
expectedGatewayFunctionsHistogramDesc := `Desc{fqName: "gateway_functions_seconds", help: "Function time taken", constLabels: {}, variableLabels: [function_name]}`
actualGatewayFunctionsHistogramDesc := d.String()
if expectedGatewayFunctionsHistogramDesc != actualGatewayFunctionsHistogramDesc {
t.Errorf("Want %s, got: %s", expectedGatewayFunctionsHistogramDesc, actualGatewayFunctionsHistogramDesc)
}
d = (<-ch)
expectedServiceReplicasCounterDesc := `Desc{fqName: "gateway_service_count", help: "Docker service replicas", constLabels: {}, variableLabels: [function_name]}`
actualServiceReplicasCounterDesc := d.String()
if expectedServiceReplicasCounterDesc != actualServiceReplicasCounterDesc {
t.Errorf("Want %s, got: %s", expectedServiceReplicasCounterDesc, actualServiceReplicasCounterDesc)
}
}
func Test_Collect_CollectsTheNumberOfReplicasOfAService(t *testing.T) {
metricsOptions := BuildMetricsOptions()
exporter := NewExporter(metricsOptions)
expectedService := requests.Function{
Name: "function_with_two_replica",
Replicas: 2,
}
exporter.services = []requests.Function{expectedService}
ch := make(chan prometheus.Metric)
defer close(ch)
go exporter.Collect(ch)
g := (<-ch).(prometheus.Gauge)
result := readGauge(g)
if expectedService.Name != result.labels["function_name"] {
t.Errorf("Want %s, got %s", expectedService.Name, result.labels["function_name"])
}
expectedReplicas := float64(expectedService.Replicas)
if expectedReplicas != result.value {
t.Errorf("Want %f, got %f", expectedReplicas, result.value)
}
}

View File

@ -54,8 +54,6 @@ func BuildMetricsOptions() MetricOptions {
}
//RegisterMetrics registers with Prometheus for tracking
func RegisterMetrics(metricsOptions MetricOptions) {
prometheus.Register(metricsOptions.GatewayFunctionInvocation)
prometheus.Register(metricsOptions.GatewayFunctionsHistogram)
prometheus.Register(metricsOptions.ServiceReplicasCounter)
func RegisterExporter(exporter *Exporter) {
prometheus.MustRegister(exporter)
}

View File

@ -47,13 +47,15 @@ func main() {
}
}
metricsOptions := metrics.BuildMetricsOptions()
metrics.RegisterMetrics(metricsOptions)
var faasHandlers types.HandlerSet
servicePollInterval := time.Second * 5
metricsOptions := metrics.BuildMetricsOptions()
exporter := metrics.NewExporter(metricsOptions)
exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval)
metrics.RegisterExporter(exporter)
reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL, config.UpstreamTimeout)
loggingNotifier := handlers.LoggingNotifier{}
@ -84,8 +86,6 @@ func main() {
alertHandler := plugin.NewExternalServiceQuery(*config.FunctionsProviderURL)
faasHandlers.Alert = handlers.MakeAlertHandler(alertHandler)
metrics.AttachExternalWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval)
if config.UseNATS() {
log.Println("Async enabled: Using NATS Streaming.")
natsQueue, queueErr := natsHandler.CreateNatsQueue(*config.NATSAddress, *config.NATSPort, natsHandler.DefaultNatsConfig{})