diff --git a/gateway/metrics/exporter.go b/gateway/metrics/exporter.go index 0d642dbd..25ff9141 100644 --- a/gateway/metrics/exporter.go +++ b/gateway/metrics/exporter.go @@ -14,9 +14,46 @@ import ( "log" "github.com/openfaas/faas/gateway/requests" + "github.com/prometheus/client_golang/prometheus" ) -func AttachExternalWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) { +// Exporter is a prometheus exporter +type Exporter struct { + metricOptions MetricOptions + services []requests.Function +} + +// NewExporter creates a new exporter for the OpenFaaS gateway metrics +func NewExporter(options MetricOptions) *Exporter { + return &Exporter{ + metricOptions: options, + services: []requests.Function{}, + } +} + +// Describe is to describe the metrics for Prometheus +func (e *Exporter) Describe(ch chan<- *prometheus.Desc) { + e.metricOptions.GatewayFunctionInvocation.Describe(ch) + e.metricOptions.GatewayFunctionsHistogram.Describe(ch) + e.metricOptions.ServiceReplicasCounter.Describe(ch) +} + +// Collect collects data to be consumed by prometheus +func (e *Exporter) Collect(ch chan<- prometheus.Metric) { + e.metricOptions.GatewayFunctionInvocation.Collect(ch) + e.metricOptions.GatewayFunctionsHistogram.Collect(ch) + + e.metricOptions.ServiceReplicasCounter.Reset() + for _, service := range e.services { + e.metricOptions.ServiceReplicasCounter. + WithLabelValues(service.Name). + Set(float64(service.Replicas)) + } + e.metricOptions.ServiceReplicasCounter.Collect(ch) +} + +// StartServiceWatcher starts a ticker and collects service replica counts to expose to prometheus +func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) { ticker := time.NewTicker(interval) quit := make(chan struct{}) proxyClient := http.Client{ @@ -57,11 +94,7 @@ func AttachExternalWatcher(endpointURL url.URL, metricsOptions MetricOptions, la continue } - for _, service := range services { - metricsOptions.ServiceReplicasCounter. - WithLabelValues(service.Name). - Set(float64(service.Replicas)) - } + e.services = services break case <-quit: diff --git a/gateway/metrics/exporter_test.go b/gateway/metrics/exporter_test.go new file mode 100644 index 00000000..02a94cc4 --- /dev/null +++ b/gateway/metrics/exporter_test.go @@ -0,0 +1,88 @@ +package metrics + +import ( + "testing" + + "github.com/openfaas/faas/gateway/requests" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +type metricResult struct { + value float64 + labels map[string]string +} + +func labels2Map(labels []*dto.LabelPair) map[string]string { + res := map[string]string{} + for _, l := range labels { + res[l.GetName()] = l.GetValue() + } + return res +} + +func readGauge(g prometheus.Metric) metricResult { + m := &dto.Metric{} + g.Write(m) + + return metricResult{ + value: m.GetGauge().GetValue(), + labels: labels2Map(m.GetLabel()), + } +} + +func Test_Describe_DescribesThePrometheusMetrics(t *testing.T) { + metricsOptions := BuildMetricsOptions() + exporter := NewExporter(metricsOptions) + + ch := make(chan *prometheus.Desc) + defer close(ch) + + go exporter.Describe(ch) + + d := (<-ch) + expectedGatewayFunctionInvocationDesc := `Desc{fqName: "gateway_function_invocation_total", help: "Individual function metrics", constLabels: {}, variableLabels: [function_name code]}` + actualGatewayFunctionInvocationDesc := d.String() + if expectedGatewayFunctionInvocationDesc != actualGatewayFunctionInvocationDesc { + t.Errorf("Want %s, got: %s", expectedGatewayFunctionInvocationDesc, actualGatewayFunctionInvocationDesc) + } + d = (<-ch) + expectedGatewayFunctionsHistogramDesc := `Desc{fqName: "gateway_functions_seconds", help: "Function time taken", constLabels: {}, variableLabels: [function_name]}` + actualGatewayFunctionsHistogramDesc := d.String() + if expectedGatewayFunctionsHistogramDesc != actualGatewayFunctionsHistogramDesc { + t.Errorf("Want %s, got: %s", expectedGatewayFunctionsHistogramDesc, actualGatewayFunctionsHistogramDesc) + } + d = (<-ch) + expectedServiceReplicasCounterDesc := `Desc{fqName: "gateway_service_count", help: "Docker service replicas", constLabels: {}, variableLabels: [function_name]}` + actualServiceReplicasCounterDesc := d.String() + if expectedServiceReplicasCounterDesc != actualServiceReplicasCounterDesc { + t.Errorf("Want %s, got: %s", expectedServiceReplicasCounterDesc, actualServiceReplicasCounterDesc) + } +} + +func Test_Collect_CollectsTheNumberOfReplicasOfAService(t *testing.T) { + metricsOptions := BuildMetricsOptions() + exporter := NewExporter(metricsOptions) + + expectedService := requests.Function{ + Name: "function_with_two_replica", + Replicas: 2, + } + + exporter.services = []requests.Function{expectedService} + + ch := make(chan prometheus.Metric) + defer close(ch) + + go exporter.Collect(ch) + + g := (<-ch).(prometheus.Gauge) + result := readGauge(g) + if expectedService.Name != result.labels["function_name"] { + t.Errorf("Want %s, got %s", expectedService.Name, result.labels["function_name"]) + } + expectedReplicas := float64(expectedService.Replicas) + if expectedReplicas != result.value { + t.Errorf("Want %f, got %f", expectedReplicas, result.value) + } +} diff --git a/gateway/metrics/metrics.go b/gateway/metrics/metrics.go index 2cb3da9a..d4da4679 100644 --- a/gateway/metrics/metrics.go +++ b/gateway/metrics/metrics.go @@ -54,8 +54,6 @@ func BuildMetricsOptions() MetricOptions { } //RegisterMetrics registers with Prometheus for tracking -func RegisterMetrics(metricsOptions MetricOptions) { - prometheus.Register(metricsOptions.GatewayFunctionInvocation) - prometheus.Register(metricsOptions.GatewayFunctionsHistogram) - prometheus.Register(metricsOptions.ServiceReplicasCounter) +func RegisterExporter(exporter *Exporter) { + prometheus.MustRegister(exporter) } diff --git a/gateway/server.go b/gateway/server.go index e879d771..af5f297b 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -47,13 +47,15 @@ func main() { } } - metricsOptions := metrics.BuildMetricsOptions() - metrics.RegisterMetrics(metricsOptions) - var faasHandlers types.HandlerSet servicePollInterval := time.Second * 5 + metricsOptions := metrics.BuildMetricsOptions() + exporter := metrics.NewExporter(metricsOptions) + exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval) + metrics.RegisterExporter(exporter) + reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL, config.UpstreamTimeout) loggingNotifier := handlers.LoggingNotifier{} @@ -84,8 +86,6 @@ func main() { alertHandler := plugin.NewExternalServiceQuery(*config.FunctionsProviderURL) faasHandlers.Alert = handlers.MakeAlertHandler(alertHandler) - metrics.AttachExternalWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval) - if config.UseNATS() { log.Println("Async enabled: Using NATS Streaming.") natsQueue, queueErr := natsHandler.CreateNatsQueue(*config.NATSAddress, *config.NATSPort, natsHandler.DefaultNatsConfig{})