Query Prometheus API for stats.

Signed-off-by: Alex Ellis <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis 2017-09-07 09:15:27 +01:00
parent 820bea13d2
commit 5339fdcdbe
6 changed files with 292 additions and 19 deletions

View File

@ -16,8 +16,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
)
// MakeFunctionReader gives a summary of Function structs with Docker service stats overlaid with Prometheus counters.
@ -41,8 +39,10 @@ func MakeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client)
for _, service := range services {
if len(service.Spec.TaskTemplate.ContainerSpec.Labels["function"]) > 0 {
invocations := getCounterValue(service.Spec.Name, "200", &metricsOptions) +
getCounterValue(service.Spec.Name, "500", &metricsOptions)
// Ping counters
// getCounterValue(service.Spec.Name, "200", &metricsOptions)
// getCounterValue(service.Spec.Name, "500", &metricsOptions)
var envProcess string
@ -55,7 +55,7 @@ func MakeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client)
f := requests.Function{
Name: service.Spec.Name,
Image: service.Spec.TaskTemplate.ContainerSpec.Image,
InvocationCount: invocations,
InvocationCount: 0,
Replicas: *service.Spec.Mode.Replicated.Replicas,
EnvProcess: envProcess,
}
@ -71,18 +71,18 @@ func MakeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client)
}
}
func getCounterValue(service string, code string, metricsOptions *metrics.MetricOptions) float64 {
// func getCounterValue(service string, code string, metricsOptions *metrics.MetricOptions) float64 {
metric, err := metricsOptions.GatewayFunctionInvocation.
GetMetricWith(prometheus.Labels{"function_name": service, "code": code})
// metric, err := metricsOptions.GatewayFunctionInvocation.
// GetMetricWith(prometheus.Labels{"function_name": service, "code": code})
if err != nil {
return 0
}
// if err != nil {
// return 0
// }
// Get the metric's value from ProtoBuf interface (idea via Julius Volz)
var protoMetric io_prometheus_client.Metric
metric.Write(&protoMetric)
invocations := protoMetric.GetCounter().GetValue()
return invocations
}
// // Get the metric's value from ProtoBuf interface (idea via Julius Volz)
// var protoMetric io_prometheus_client.Metric
// metric.Write(&protoMetric)
// invocations := protoMetric.GetCounter().GetValue()
// return invocations
// }

View File

@ -0,0 +1,110 @@
package metrics
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"strconv"
"github.com/alexellis/faas/gateway/requests"
)
func makeClient() http.Client {
// Fine-tune the client to fail fast.
return http.Client{}
}
// AddMetricsHandler wraps a http.HandlerFunc with Prometheus metrics
func AddMetricsHandler(handler http.HandlerFunc, host string, port int) http.HandlerFunc {
client := makeClient()
prometheusQuery := NewPrometheusQuery(host, port, &client)
return func(w http.ResponseWriter, r *http.Request) {
// log.Printf("Calling upstream for function info\n")
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, r)
upstreamCall := recorder.Result()
if upstreamCall.Body == nil {
return
}
defer upstreamCall.Body.Close()
if recorder.Code != http.StatusOK {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("Error pulling metrics from provider/backend. Status code: %d", recorder.Code)))
return
}
upstreamBody, _ := ioutil.ReadAll(upstreamCall.Body)
var functions []requests.Function
err := json.Unmarshal(upstreamBody, &functions)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Error parsing metrics from upstream provider/backend."))
return
}
// log.Printf("Querying Prometheus API\n")
// `sum(gateway_function_invocation_total{function_name=~".*", code=~".*"}) by (function_name, code)`)
expr := "sum(gateway_function_invocation_total%7Bfunction_name%3D~%22.*%22%2C+code%3D~%22.*%22%7D)+by+(function_name%2C+code)"
results, fetchErr := prometheusQuery.Fetch(expr)
if fetchErr != nil {
log.Printf("Error querying Prometheus API: %s\n", fetchErr.Error())
w.WriteHeader(http.StatusOK)
w.Write(upstreamBody)
return
}
mixIn(&functions, results)
bytesOut, marshalErr := json.Marshal(functions)
if marshalErr != nil {
log.Println(marshalErr)
return
}
// log.Printf("Writing bytesOut: %s\n", bytesOut)
w.WriteHeader(http.StatusOK)
w.Write(bytesOut)
}
}
func mixIn(functions *[]requests.Function, metrics *VectorQueryResponse) {
if functions == nil {
return
}
// Ensure values are empty first.
for i := range *functions {
(*functions)[i].InvocationCount = 0
}
for i, function := range *functions {
for _, v := range metrics.Data.Result {
if v.Metric.FunctionName == function.Name {
metricValue := v.Value[1]
switch metricValue.(type) {
case string:
// log.Println("String")
f, strconvErr := strconv.ParseFloat(metricValue.(string), 64)
if strconvErr != nil {
log.Printf("Unable to convert value for metric: %s\n", strconvErr)
continue
}
(*functions)[i].InvocationCount += f
break
}
}
}
}
}

View File

@ -0,0 +1,63 @@
package metrics
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
// PrometheusQuery a PrometheusQuery
type PrometheusQuery struct {
Port int
Host string
Client *http.Client
}
// NewPrometheusQuery create a NewPrometheusQuery
func NewPrometheusQuery(host string, port int, client *http.Client) PrometheusQuery {
return PrometheusQuery{
Client: client,
Host: host,
Port: port,
}
}
// Fetch queries aggregated stats
func (q *PrometheusQuery) Fetch(query string) (*VectorQueryResponse, error) {
req, reqErr := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/api/v1/query/?query=%s", q.Host, q.Port, query), nil)
if reqErr != nil {
return nil, reqErr
}
res, getErr := q.Client.Do(req)
if getErr != nil {
return nil, getErr
}
defer res.Body.Close()
bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
return nil, readErr
}
var values VectorQueryResponse
unmarshalErr := json.Unmarshal(bytesOut, &values)
if unmarshalErr != nil {
return nil, unmarshalErr
}
return &values, nil
}
type VectorQueryResponse struct {
Data struct {
Result []struct {
Metric struct {
Code string `json:"code"`
FunctionName string `json:"function_name"`
}
Value []interface{} `json:"value"`
}
}
}

View File

@ -114,6 +114,8 @@ func main() {
faasHandlers.AsyncReport = internalHandlers.MakeAsyncReport(metricsOptions)
}
listFunctions := metrics.AddMetricsHandler(faasHandlers.ListFunctions, config.PrometheusHost, config.PrometheusPort)
r := mux.NewRouter()
// r.StrictSlash(false) // This didn't work, so register routes twice.
@ -121,7 +123,7 @@ func main() {
r.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}/", faasHandlers.Proxy)
r.HandleFunc("/system/alert", faasHandlers.Alert)
r.HandleFunc("/system/functions", faasHandlers.ListFunctions).Methods("GET")
r.HandleFunc("/system/functions", listFunctions).Methods("GET")
r.HandleFunc("/system/functions", faasHandlers.DeployFunction).Methods("POST")
r.HandleFunc("/system/functions", faasHandlers.DeleteFunction).Methods("DELETE")

View File

@ -73,3 +73,81 @@ func TestRead_ReadAndWriteTimeoutConfig(t *testing.T) {
t.Fail()
}
}
func TestRead_UseNATSDefaultsToOff(t *testing.T) {
defaults := NewEnvBucket()
readConfig := types.ReadConfig{}
config := readConfig.Read(defaults)
if config.UseNATS() == true {
t.Log("NATS is supposed to be off by default")
t.Fail()
}
}
func TestRead_UseNATS(t *testing.T) {
defaults := NewEnvBucket()
defaults.Setenv("faas_nats_address", "nats")
defaults.Setenv("faas_nats_port", "6222")
readConfig := types.ReadConfig{}
config := readConfig.Read(defaults)
if config.UseNATS() == false {
t.Log("NATS was requested in config, but not enabled.")
t.Fail()
}
}
func TestRead_UseNATSBadPort(t *testing.T) {
defaults := NewEnvBucket()
defaults.Setenv("faas_nats_address", "nats")
defaults.Setenv("faas_nats_port", "6fff")
readConfig := types.ReadConfig{}
config := readConfig.Read(defaults)
if config.UseNATS() == true {
t.Log("NATS had bad config, should not be enabled.")
t.Fail()
}
}
func TestRead_PrometheusNonDefaults(t *testing.T) {
defaults := NewEnvBucket()
defaults.Setenv("faas_prometheus_host", "prom1")
defaults.Setenv("faas_prometheus_port", "9999")
readConfig := types.ReadConfig{}
config := readConfig.Read(defaults)
if config.PrometheusHost != "prom1" {
t.Logf("config.PrometheusHost, want: %s, got: %s\n", "prom1", config.PrometheusHost)
t.Fail()
}
if config.PrometheusPort != 9999 {
t.Logf("config.PrometheusHost, want: %d, got: %d\n", 9999, config.PrometheusPort)
t.Fail()
}
}
func TestRead_PrometheusDefaults(t *testing.T) {
defaults := NewEnvBucket()
readConfig := types.ReadConfig{}
config := readConfig.Read(defaults)
if config.PrometheusHost != "prometheus" {
t.Logf("config.PrometheusHost, want: %s, got: %s\n", "prometheus", config.PrometheusHost)
t.Fail()
}
if config.PrometheusPort != 9090 {
t.Logf("config.PrometheusHost, want: %d, got: %d\n", 9090, config.PrometheusPort)
t.Fail()
}
}

View File

@ -48,7 +48,10 @@ func parseIntValue(val string, fallback int) int {
// Read fetches config from environmental variables.
func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig {
cfg := GatewayConfig{}
cfg := GatewayConfig{
PrometheusHost: "prometheus",
PrometheusPort: 9090,
}
readTimeout := parseIntValue(hasEnv.Getenv("read_timeout"), 8)
writeTimeout := parseIntValue(hasEnv.Getenv("write_timeout"), 8)
@ -79,6 +82,21 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig {
}
}
prometheusPort := hasEnv.Getenv("faas_prometheus_port")
if len(prometheusPort) > 0 {
prometheusPortVal, err := strconv.Atoi(prometheusPort)
if err != nil {
log.Println("Invalid port for faas_prometheus_port")
} else {
cfg.PrometheusPort = prometheusPortVal
}
}
prometheusHost := hasEnv.Getenv("faas_prometheus_host")
if len(prometheusHost) > 0 {
cfg.PrometheusHost = prometheusHost
}
return cfg
}
@ -89,6 +107,8 @@ type GatewayConfig struct {
FunctionsProviderURL *url.URL
NATSAddress *string
NATSPort *int
PrometheusHost string
PrometheusPort int
}
// UseNATS Use NATSor not