From 5339fdcdbe10e103ebc1a6529edd1c0a01155547 Mon Sep 17 00:00:00 2001 From: Alex Ellis Date: Thu, 7 Sep 2017 09:15:27 +0100 Subject: [PATCH] Query Prometheus API for stats. Signed-off-by: Alex Ellis --- gateway/handlers/reader.go | 34 ++++----- gateway/metrics/add_metrics.go | 110 ++++++++++++++++++++++++++++ gateway/metrics/prometheus_query.go | 63 ++++++++++++++++ gateway/server.go | 4 +- gateway/tests/config_test.go | 78 ++++++++++++++++++++ gateway/types/readconfig.go | 22 +++++- 6 files changed, 292 insertions(+), 19 deletions(-) create mode 100644 gateway/metrics/add_metrics.go create mode 100644 gateway/metrics/prometheus_query.go diff --git a/gateway/handlers/reader.go b/gateway/handlers/reader.go index e28bd84c..cf18a804 100644 --- a/gateway/handlers/reader.go +++ b/gateway/handlers/reader.go @@ -16,8 +16,6 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" - "github.com/prometheus/client_golang/prometheus" - io_prometheus_client "github.com/prometheus/client_model/go" ) // MakeFunctionReader gives a summary of Function structs with Docker service stats overlaid with Prometheus counters. 
@@ -41,8 +39,10 @@ func MakeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client) for _, service := range services { if len(service.Spec.TaskTemplate.ContainerSpec.Labels["function"]) > 0 { - invocations := getCounterValue(service.Spec.Name, "200", &metricsOptions) + - getCounterValue(service.Spec.Name, "500", &metricsOptions) + + // Ping counters + // getCounterValue(service.Spec.Name, "200", &metricsOptions) + // getCounterValue(service.Spec.Name, "500", &metricsOptions) var envProcess string @@ -55,7 +55,7 @@ func MakeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client) f := requests.Function{ Name: service.Spec.Name, Image: service.Spec.TaskTemplate.ContainerSpec.Image, - InvocationCount: invocations, + InvocationCount: 0, Replicas: *service.Spec.Mode.Replicated.Replicas, EnvProcess: envProcess, } @@ -71,18 +71,18 @@ func MakeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client) } } -func getCounterValue(service string, code string, metricsOptions *metrics.MetricOptions) float64 { +// func getCounterValue(service string, code string, metricsOptions *metrics.MetricOptions) float64 { - metric, err := metricsOptions.GatewayFunctionInvocation. - GetMetricWith(prometheus.Labels{"function_name": service, "code": code}) +// metric, err := metricsOptions.GatewayFunctionInvocation. 
+// GetMetricWith(prometheus.Labels{"function_name": service, "code": code}) - if err != nil { - return 0 - } +// if err != nil { +// return 0 +// } - // Get the metric's value from ProtoBuf interface (idea via Julius Volz) - var protoMetric io_prometheus_client.Metric - metric.Write(&protoMetric) - invocations := protoMetric.GetCounter().GetValue() - return invocations -} +// // Get the metric's value from ProtoBuf interface (idea via Julius Volz) +// var protoMetric io_prometheus_client.Metric +// metric.Write(&protoMetric) +// invocations := protoMetric.GetCounter().GetValue() +// return invocations +// } diff --git a/gateway/metrics/add_metrics.go b/gateway/metrics/add_metrics.go new file mode 100644 index 00000000..20368c4c --- /dev/null +++ b/gateway/metrics/add_metrics.go @@ -0,0 +1,110 @@ +package metrics + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/http/httptest" + "strconv" + "time" + + "github.com/alexellis/faas/gateway/requests" +) + +func makeClient() http.Client { + // Fail fast: bound every Prometheus call so a stalled server cannot hang the function list. + return http.Client{Timeout: 5 * time.Second} +} + +// AddMetricsHandler wraps a function-listing handler and overlays invocation counts queried from Prometheus onto its JSON response. +func AddMetricsHandler(handler http.HandlerFunc, host string, port int) http.HandlerFunc { + client := makeClient() + + prometheusQuery := NewPrometheusQuery(host, port, &client) + return func(w http.ResponseWriter, r *http.Request) { + // log.Printf("Calling upstream for function info\n") + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, r) + upstreamCall := recorder.Result() + + if upstreamCall.Body == nil { + return + } + defer upstreamCall.Body.Close() + + if recorder.Code != http.StatusOK { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(fmt.Sprintf("Error pulling metrics from provider/backend. 
Status code: %d", recorder.Code))) + return + } + + upstreamBody, _ := ioutil.ReadAll(upstreamCall.Body) + var functions []requests.Function + + err := json.Unmarshal(upstreamBody, &functions) + if err != nil { + log.Println(err) + + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Error parsing metrics from upstream provider/backend.")) + return + } + + // URL-encoded form of: sum(gateway_function_invocation_total{function_name=~".*", code=~".*"}) by (function_name, code) + expr := "sum(gateway_function_invocation_total%7Bfunction_name%3D~%22.*%22%2C+code%3D~%22.*%22%7D)+by+(function_name%2C+code)" + results, fetchErr := prometheusQuery.Fetch(expr) + if fetchErr != nil { + log.Printf("Error querying Prometheus API: %s\n", fetchErr.Error()) + + w.WriteHeader(http.StatusOK) + w.Write(upstreamBody) + return + } + + mixIn(&functions, results) + + bytesOut, marshalErr := json.Marshal(functions) + if marshalErr != nil { + log.Println(marshalErr) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + w.Write(bytesOut) + } +} + +func mixIn(functions *[]requests.Function, metrics *VectorQueryResponse) { + if functions == nil || metrics == nil { + return + } + + // Ensure values are empty first. 
+ for i := range *functions { + (*functions)[i].InvocationCount = 0 + } + + for i, function := range *functions { + for _, v := range metrics.Data.Result { + + if v.Metric.FunctionName == function.Name && len(v.Value) > 1 { + metricValue := v.Value[1] + switch metricValue.(type) { + case string: + // Prometheus encodes the sample value as a string in [timestamp, "value"]. + f, strconvErr := strconv.ParseFloat(metricValue.(string), 64) + if strconvErr != nil { + log.Printf("Unable to convert value for metric: %s\n", strconvErr) + continue + } + (*functions)[i].InvocationCount += f + break + } + } + } + } +} diff --git a/gateway/metrics/prometheus_query.go b/gateway/metrics/prometheus_query.go new file mode 100644 index 00000000..d6fa0144 --- /dev/null +++ b/gateway/metrics/prometheus_query.go @@ -0,0 +1,63 @@ +package metrics + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" +) + +// PrometheusQuery holds the host, port and HTTP client used to query the Prometheus API +type PrometheusQuery struct { + Port int + Host string + Client *http.Client +} + +// NewPrometheusQuery builds a PrometheusQuery for the given host, port and HTTP client +func NewPrometheusQuery(host string, port int, client *http.Client) PrometheusQuery { + return PrometheusQuery{ + Client: client, + Host: host, + Port: port, + } +} + +// Fetch runs an instant query against the Prometheus HTTP API; query must already be URL-encoded +func (q *PrometheusQuery) Fetch(query string) (*VectorQueryResponse, error) { + req, reqErr := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/api/v1/query?query=%s", q.Host, q.Port, query), nil) + if reqErr != nil { + return nil, reqErr + } + res, getErr := q.Client.Do(req) + if getErr != nil { + return nil, getErr + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code from Prometheus: %d", res.StatusCode) + } + bytesOut, readErr := ioutil.ReadAll(res.Body) + if readErr != nil { + return nil, readErr + } + + var values VectorQueryResponse + if unmarshalErr := json.Unmarshal(bytesOut, &values); unmarshalErr != nil { + return nil, unmarshalErr + } + return &values, nil +} + +// VectorQueryResponse is the subset of a Prometheus instant-query response that the gateway reads +type VectorQueryResponse struct { + Data struct { + Result []struct { + Metric struct { + Code string `json:"code"` + FunctionName string `json:"function_name"` + 
} + Value []interface{} `json:"value"` + } + } +} diff --git a/gateway/server.go b/gateway/server.go index 174353e3..0373d63a 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -114,6 +114,8 @@ func main() { faasHandlers.AsyncReport = internalHandlers.MakeAsyncReport(metricsOptions) } + listFunctions := metrics.AddMetricsHandler(faasHandlers.ListFunctions, config.PrometheusHost, config.PrometheusPort) + r := mux.NewRouter() // r.StrictSlash(false) // This didn't work, so register routes twice. @@ -121,7 +123,7 @@ func main() { r.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}/", faasHandlers.Proxy) r.HandleFunc("/system/alert", faasHandlers.Alert) - r.HandleFunc("/system/functions", faasHandlers.ListFunctions).Methods("GET") + r.HandleFunc("/system/functions", listFunctions).Methods("GET") r.HandleFunc("/system/functions", faasHandlers.DeployFunction).Methods("POST") r.HandleFunc("/system/functions", faasHandlers.DeleteFunction).Methods("DELETE") diff --git a/gateway/tests/config_test.go b/gateway/tests/config_test.go index bd7c918a..86e7e5a7 100644 --- a/gateway/tests/config_test.go +++ b/gateway/tests/config_test.go @@ -73,3 +73,81 @@ func TestRead_ReadAndWriteTimeoutConfig(t *testing.T) { t.Fail() } } + +func TestRead_UseNATSDefaultsToOff(t *testing.T) { + defaults := NewEnvBucket() + readConfig := types.ReadConfig{} + + config := readConfig.Read(defaults) + + if config.UseNATS() { + t.Log("NATS is supposed to be off by default") + t.Fail() + } +} + +func TestRead_UseNATS(t *testing.T) { + defaults := NewEnvBucket() + defaults.Setenv("faas_nats_address", "nats") + defaults.Setenv("faas_nats_port", "6222") + readConfig := types.ReadConfig{} + + config := readConfig.Read(defaults) + + if !config.UseNATS() { + t.Log("NATS was requested in config, but not enabled.") + t.Fail() + } +} + +func TestRead_UseNATSBadPort(t *testing.T) { + + defaults := NewEnvBucket() + defaults.Setenv("faas_nats_address", "nats") + defaults.Setenv("faas_nats_port", 
"6fff") + readConfig := types.ReadConfig{} + + config := readConfig.Read(defaults) + + if config.UseNATS() { + t.Log("NATS had bad config, should not be enabled.") + t.Fail() + } +} + +func TestRead_PrometheusNonDefaults(t *testing.T) { + defaults := NewEnvBucket() + defaults.Setenv("faas_prometheus_host", "prom1") + defaults.Setenv("faas_prometheus_port", "9999") + readConfig := types.ReadConfig{} + + config := readConfig.Read(defaults) + + if config.PrometheusHost != "prom1" { + t.Logf("config.PrometheusHost, want: %s, got: %s\n", "prom1", config.PrometheusHost) + t.Fail() + } + + if config.PrometheusPort != 9999 { + t.Logf("config.PrometheusPort, want: %d, got: %d\n", 9999, config.PrometheusPort) + t.Fail() + } +} + +func TestRead_PrometheusDefaults(t *testing.T) { + defaults := NewEnvBucket() + + readConfig := types.ReadConfig{} + + config := readConfig.Read(defaults) + + if config.PrometheusHost != "prometheus" { + t.Logf("config.PrometheusHost, want: %s, got: %s\n", "prometheus", config.PrometheusHost) + t.Fail() + } + + if config.PrometheusPort != 9090 { + t.Logf("config.PrometheusPort, want: %d, got: %d\n", 9090, config.PrometheusPort) + t.Fail() + } +} diff --git a/gateway/types/readconfig.go b/gateway/types/readconfig.go index 3ace1df6..3a6ceb4a 100644 --- a/gateway/types/readconfig.go +++ b/gateway/types/readconfig.go @@ -48,7 +48,10 @@ func parseIntValue(val string, fallback int) int { // Read fetches config from environmental variables. 
func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { - cfg := GatewayConfig{} + cfg := GatewayConfig{ + PrometheusHost: "prometheus", + PrometheusPort: 9090, + } readTimeout := parseIntValue(hasEnv.Getenv("read_timeout"), 8) writeTimeout := parseIntValue(hasEnv.Getenv("write_timeout"), 8) @@ -79,6 +82,21 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { } } + prometheusPort := hasEnv.Getenv("faas_prometheus_port") + if len(prometheusPort) > 0 { + prometheusPortVal, err := strconv.Atoi(prometheusPort) + if err != nil { + log.Println("Invalid port for faas_prometheus_port") + } else { + cfg.PrometheusPort = prometheusPortVal + } + } + + prometheusHost := hasEnv.Getenv("faas_prometheus_host") + if len(prometheusHost) > 0 { + cfg.PrometheusHost = prometheusHost + } + return cfg } @@ -89,6 +107,8 @@ type GatewayConfig struct { FunctionsProviderURL *url.URL NATSAddress *string NATSPort *int + PrometheusHost string + PrometheusPort int } // UseNATS Use NATSor not