Error handling around Docker socket, refactor scaling to separate method

This commit is contained in:
Alex 2017-01-23 09:19:52 +00:00
parent 2759aaf849
commit 0dfafe99c5
2 changed files with 66 additions and 43 deletions

View File

@ -17,6 +17,7 @@ type AlexaRequest struct {
Intent AlexaIntent `json:"intent"` Intent AlexaIntent `json:"intent"`
} }
// AlexaRequestBody top-level request produced by Alexa SDK
type AlexaRequestBody struct { type AlexaRequestBody struct {
Session AlexaSession `json:"session"` Session AlexaSession `json:"session"`
Request AlexaRequest `json:"request"` Request AlexaRequest `json:"request"`
@ -32,8 +33,17 @@ type PrometheusInnerAlert struct {
Labels PrometheusInnerAlertLabel `json:"labels"` Labels PrometheusInnerAlertLabel `json:"labels"`
} }
// PrometheusAlert as produced by AlertManager
type PrometheusAlert struct { type PrometheusAlert struct {
Status string `json:"status"` Status string `json:"status"`
Receiver string `json:"receiver"` Receiver string `json:"receiver"`
Alerts []PrometheusInnerAlert `json:"alerts"` Alerts []PrometheusInnerAlert `json:"alerts"`
} }
// Function exported for system/functions endpoint
type Function struct {
Name string `json:"name"`
Image string `json:"image"`
InvocationCount float64 `json:"invocationCount"`
Replicas uint64 `json:"replicas"`
}

View File

@ -18,12 +18,49 @@ import (
io_prometheus_client "github.com/prometheus/client_model/go" io_prometheus_client "github.com/prometheus/client_model/go"
) )
func scaleService(req requests.PrometheusAlert, c *client.Client) error {
var err error
//Todo: convert to loop / handler.
serviceName := req.Alerts[0].Labels.FunctionName
service, _, inspectErr := c.ServiceInspectWithRaw(context.Background(), serviceName)
if inspectErr != nil {
var replicas uint64
if req.Status == "firing" {
if *service.Spec.Mode.Replicated.Replicas < 20 {
replicas = *service.Spec.Mode.Replicated.Replicas + uint64(5)
} else {
return err
}
} else {
replicas = *service.Spec.Mode.Replicated.Replicas - uint64(5)
if replicas <= 0 {
replicas = 1
}
}
log.Printf("Scaling %s to %d replicas.\n", serviceName, replicas)
service.Spec.Mode.Replicated.Replicas = &replicas
updateOpts := types.ServiceUpdateOptions{}
updateOpts.RegistryAuthFrom = types.RegistryAuthFromSpec
response, updateErr := c.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, updateOpts)
if updateErr != nil {
err = updateErr
}
log.Println(response)
} else {
err = inspectErr
}
return err
}
func makeAlertHandler(c *client.Client) http.HandlerFunc { func makeAlertHandler(c *client.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
log.Println("Alert received.") log.Println("Alert received.")
body, _ := ioutil.ReadAll(r.Body) body, _ := ioutil.ReadAll(r.Body)
fmt.Println(string(body))
// Todo: parse alert, validate alert and scale up or down function
var req requests.PrometheusAlert var req requests.PrometheusAlert
err := json.Unmarshal(body, &req) err := json.Unmarshal(body, &req)
@ -32,47 +69,18 @@ func makeAlertHandler(c *client.Client) http.HandlerFunc {
} }
if len(req.Alerts) > 0 { if len(req.Alerts) > 0 {
serviceName := req.Alerts[0].Labels.FunctionName err := scaleService(req, c)
service, _, _ := c.ServiceInspectWithRaw(context.Background(), serviceName) if err != nil {
var replicas uint64 log.Println(err)
if req.Status == "firing" {
if *service.Spec.Mode.Replicated.Replicas < 20 {
replicas = *service.Spec.Mode.Replicated.Replicas + uint64(5)
} else {
return
}
} else {
replicas = *service.Spec.Mode.Replicated.Replicas - uint64(5)
if replicas <= 0 {
replicas = 1
}
}
log.Printf("Scaling %s to %d replicas.\n", serviceName, replicas)
service.Spec.Mode.Replicated.Replicas = &replicas
updateOpts := types.ServiceUpdateOptions{}
updateOpts.RegistryAuthFrom = types.RegistryAuthFromSpec
response, updateErr := c.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, updateOpts)
if updateErr != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
log.Println(response) } else {
w.WriteHeader(http.StatusOK)
} }
w.WriteHeader(http.StatusOK)
} }
} }
} }
// Function exported for system/functions endpoint // makeFunctionReader gives a summary of Function structs with Docker service stats overlaid with Prometheus counters.
type Function struct {
Name string `json:"name"`
Image string `json:"image"`
InvocationCount float64 `json:"invocationCount"`
Replicas uint64 `json:"replicas"`
}
func makeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client) http.HandlerFunc { func makeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
@ -88,16 +96,16 @@ func makeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client)
} }
// TODO: Filter only "faas" functions (via metadata?) // TODO: Filter only "faas" functions (via metadata?)
functions := make([]requests.Function, 0)
functions := make([]Function, 0)
for _, service := range services { for _, service := range services {
counter, _ := metricsOptions.GatewayFunctionInvocation.GetMetricWithLabelValues(service.Spec.Name) counter, _ := metricsOptions.GatewayFunctionInvocation.GetMetricWithLabelValues(service.Spec.Name)
var pbmetric io_prometheus_client.Metric // Get the metric's value from ProtoBuf interface (idea via Julius Volz)
counter.Write(&pbmetric) var protoMetric io_prometheus_client.Metric
invocations := pbmetric.GetCounter().GetValue() counter.Write(&protoMetric)
invocations := protoMetric.GetCounter().GetValue()
f := Function{ f := requests.Function{
Name: service.Spec.Name, Name: service.Spec.Name,
Image: service.Spec.TaskTemplate.ContainerSpec.Image, Image: service.Spec.TaskTemplate.ContainerSpec.Image,
InvocationCount: invocations, InvocationCount: invocations,
@ -120,6 +128,11 @@ func main() {
if err != nil { if err != nil {
log.Fatal("Error with Docker client.") log.Fatal("Error with Docker client.")
} }
dockerVersion, err := dockerClient.ServerVersion(context.Background())
if err != nil {
log.Fatal("Error with Docker server.\n", err)
}
log.Println("API version: %s, %s\n", dockerVersion.APIVersion, dockerVersion.Version)
metricsOptions := metrics.BuildMetricsOptions() metricsOptions := metrics.BuildMetricsOptions()
metrics.RegisterMetrics(metricsOptions) metrics.RegisterMetrics(metricsOptions)