From 501e813d41d0e6acd7b8c27bfc8f45c44e94c5eb Mon Sep 17 00:00:00 2001 From: Alex Ellis Date: Wed, 25 Jan 2017 22:29:17 +0000 Subject: [PATCH] Move handlers into ./handlers --- gateway/Dockerfile | 1 + gateway/Dockerfile.build | 7 +- gateway/handlers/alerthandler.go | 89 +++++++++++++++ gateway/handlers/functionshandler.go | 56 ++++++++++ gateway/{ => handlers}/proxy.go | 36 +++--- gateway/server.go | 141 ++---------------------- gateway/tests/alexhostname_request.json | 24 ++++ gateway/tests/isalexa_test.go | 27 +++++ gateway/tests/unmarshall_test.go | 5 +- 9 files changed, 233 insertions(+), 153 deletions(-) create mode 100644 gateway/handlers/alerthandler.go create mode 100644 gateway/handlers/functionshandler.go rename gateway/{ => handlers}/proxy.go (81%) create mode 100644 gateway/tests/alexhostname_request.json create mode 100644 gateway/tests/isalexa_test.go diff --git a/gateway/Dockerfile b/gateway/Dockerfile index d397da87..a0f506c5 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -7,6 +7,7 @@ ENV http_proxy "" ENV https_proxy "" COPY gateway . + COPY assets assets CMD ["./gateway"] diff --git a/gateway/Dockerfile.build b/gateway/Dockerfile.build index f0b78f6e..7a85b529 100644 --- a/gateway/Dockerfile.build +++ b/gateway/Dockerfile.build @@ -6,14 +6,15 @@ RUN go get -d github.com/docker/docker/api/types \ && go get -d github.com/docker/docker/client \ && go get github.com/gorilla/mux \ && go get github.com/prometheus/client_golang/prometheus -# RUN go get -d github.com/prometheus/client_model/go +RUN go get -d github.com/Sirupsen/logrus WORKDIR /go/src/github.com/alexellis/faas/gateway COPY metrics metrics COPY requests requests -COPY tests tests +COPY tests tests +COPY handlers handlers + COPY server.go . -COPY proxy.go . RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app . diff --git a/gateway/handlers/alerthandler.go b/gateway/handlers/alerthandler.go new file mode 100644 index 00000000..0017954b --- /dev/null +++ b/gateway/handlers/alerthandler.go @@ -0,0 +1,89 @@ +package handlers + +import ( + "context" + "encoding/json" + "io/ioutil" + "log" + "net/http" + + "github.com/alexellis/faas/gateway/requests" + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" +) + +func scaleService(req requests.PrometheusAlert, c *client.Client) error { + var err error + //Todo: convert to loop / handler. + serviceName := req.Alerts[0].Labels.FunctionName + service, _, inspectErr := c.ServiceInspectWithRaw(context.Background(), serviceName) + if inspectErr == nil { + var replicas uint64 + + if req.Status == "firing" { + if *service.Spec.Mode.Replicated.Replicas < 20 { + replicas = *service.Spec.Mode.Replicated.Replicas + uint64(5) + } else { + return err + } + } else { // Resolved event. + // Previously decremented by 5, but event only fires once, so set to 1/1. + if *service.Spec.Mode.Replicated.Replicas > 1 { + // replicas = *service.Spec.Mode.Replicated.Replicas - uint64(5) + // if replicas < 1 { + // replicas = 1 + // } + // return nil + + replicas = 1 + } else { + return nil + } + } + + log.Printf("Scaling %s to %d replicas.\n", serviceName, replicas) + + service.Spec.Mode.Replicated.Replicas = &replicas + updateOpts := types.ServiceUpdateOptions{} + updateOpts.RegistryAuthFrom = types.RegistryAuthFromSpec + + response, updateErr := c.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, updateOpts) + if updateErr != nil { + err = updateErr + } + log.Println(response) + + } else { + err = inspectErr + } + + return err +} + +func MakeAlertHandler(c *client.Client) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + log.Println("Alert received.") + body, readErr := ioutil.ReadAll(r.Body) + if readErr != nil { + log.Println(readErr) + return + } + + var req requests.PrometheusAlert + err := json.Unmarshal(body, &req) + if err != nil { + log.Println(err) + return + } + + if len(req.Alerts) > 0 { + err := scaleService(req, c) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + } + } +} diff --git a/gateway/handlers/functionshandler.go b/gateway/handlers/functionshandler.go new file mode 100644 index 00000000..e2c377db --- /dev/null +++ b/gateway/handlers/functionshandler.go @@ -0,0 +1,56 @@ +package handlers + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/alexellis/faas/gateway/metrics" + "github.com/alexellis/faas/gateway/requests" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/client" + io_prometheus_client "github.com/prometheus/client_model/go" +) + +// MakeFunctionReader gives a summary of Function structs with Docker service stats overlaid with Prometheus counters. +func MakeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + + serviceFilter := filters.NewArgs() + + options := types.ServiceListOptions{ + Filters: serviceFilter, + } + + services, err := c.ServiceList(context.Background(), options) + if err != nil { + fmt.Println(err) + } + + // TODO: Filter only "faas" functions (via metadata?) + functions := make([]requests.Function, 0) + for _, service := range services { + counter, _ := metricsOptions.GatewayFunctionInvocation.GetMetricWithLabelValues(service.Spec.Name) + + // Get the metric's value from ProtoBuf interface (idea via Julius Volz) + var protoMetric io_prometheus_client.Metric + counter.Write(&protoMetric) + invocations := protoMetric.GetCounter().GetValue() + + f := requests.Function{ + Name: service.Spec.Name, + Image: service.Spec.TaskTemplate.ContainerSpec.Image, + InvocationCount: invocations, + Replicas: *service.Spec.Mode.Replicated.Replicas, + } + functions = append(functions, f) + } + + functionBytes, _ := json.Marshal(functions) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + w.Write(functionBytes) + } +} diff --git a/gateway/proxy.go b/gateway/handlers/proxy.go similarity index 81% rename from gateway/proxy.go rename to gateway/handlers/proxy.go index a426f843..fecb7c2c 100644 --- a/gateway/proxy.go +++ b/gateway/handlers/proxy.go @@ -1,4 +1,4 @@ -package main +package handlers import ( "bytes" @@ -6,12 +6,12 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" "net/http" "strconv" "strings" "time" + "github.com/Sirupsen/logrus" "github.com/alexellis/faas/gateway/metrics" "github.com/alexellis/faas/gateway/requests" "github.com/docker/docker/api/types" @@ -20,27 +20,26 @@ import ( "github.com/gorilla/mux" ) -// MakeProxy creates a proxy for HTTP web requests which can be routed to a function. -func MakeProxy(metrics metrics.MetricOptions, wildcard bool, c *client.Client) http.HandlerFunc { +// makeProxy creates a proxy for HTTP web requests which can be routed to a function. +func MakeProxy(metrics metrics.MetricOptions, wildcard bool, c *client.Client, logger *logrus.Logger) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { metrics.GatewayRequestsTotal.Inc() if r.Method == "POST" { - log.Println(r.Header) + logger.Infoln(r.Header) header := r.Header["X-Function"] - log.Println(header) - // fmt.Println(wildcard) + logger.Infoln(header) if wildcard == true { vars := mux.Vars(r) name := vars["name"] fmt.Println("invoke by name") - lookupInvoke(w, r, metrics, name, c) + lookupInvoke(w, r, metrics, name, c, logger) } else if len(header) > 0 { - lookupInvoke(w, r, metrics, header[0], c) + lookupInvoke(w, r, metrics, header[0], c, logger) } else { requestBody, _ := ioutil.ReadAll(r.Body) - alexaService := isAlexa(requestBody) + alexaService := IsAlexa(requestBody) fmt.Println(alexaService) if len(alexaService.Session.SessionId) > 0 && @@ -50,7 +49,7 @@ func MakeProxy(metrics metrics.MetricOptions, wildcard bool, c *client.Client) h fmt.Println("Alexa SDK request found") fmt.Printf("SessionId=%s, Intent=%s, AppId=%s\n", alexaService.Session.SessionId, alexaService.Request.Intent.Name, alexaService.Session.Application.ApplicationId) - invokeService(w, r, metrics, alexaService.Request.Intent.Name, requestBody) + invokeService(w, r, metrics, alexaService.Request.Intent.Name, requestBody, logger) } else { w.WriteHeader(http.StatusBadRequest) w.Write([]byte("Provide an x-function header or a valid Alexa SDK request.")) @@ -60,10 +59,10 @@ func MakeProxy(metrics metrics.MetricOptions, wildcard bool, c *client.Client) h } } -func isAlexa(requestBody []byte) requests.AlexaRequestBody { +func IsAlexa(requestBody []byte) requests.AlexaRequestBody { body := requests.AlexaRequestBody{} buf := bytes.NewBuffer(requestBody) - fmt.Println(buf) + // fmt.Println(buf) str := buf.String() parts := strings.Split(str, "sessionId") if len(parts) > 1 { @@ -72,18 +71,18 @@ func isAlexa(requestBody []byte) requests.AlexaRequestBody { return body } -func lookupInvoke(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, name string, c *client.Client) { +func lookupInvoke(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, name string, c *client.Client, logger *logrus.Logger) { exists, err := lookupSwarmService(name, c) if err != nil || exists == false { if err != nil { - log.Fatalln(err) + logger.Fatalln(err) } w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("Error resolving service.")) } if exists == true { requestBody, _ := ioutil.ReadAll(r.Body) - invokeService(w, r, metrics, name, requestBody) + invokeService(w, r, metrics, name, requestBody, logger) } } @@ -96,7 +95,7 @@ func lookupSwarmService(serviceName string, c *client.Client) (bool, error) { return len(services) > 0, err } -func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, service string, requestBody []byte) { +func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, service string, requestBody []byte, logger *logrus.Logger) { metrics.GatewayFunctionInvocation.WithLabelValues(service).Add(1) stamp := strconv.FormatInt(time.Now().Unix(), 10) @@ -105,9 +104,10 @@ func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.Metri buf := bytes.NewBuffer(requestBody) url := "http://" + service + ":" + strconv.Itoa(8080) + "/" fmt.Printf("[%s] Forwarding request to: %s\n", stamp, url) + response, err := http.Post(url, "text/plain", buf) if err != nil { - log.Println(err) + logger.Infoln(err) w.WriteHeader(500) buf := bytes.NewBufferString("Can't reach service: " + service) w.Write(buf.Bytes()) diff --git a/gateway/server.go b/gateway/server.go index ef39e65d..2616410c 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -2,140 +2,21 @@ package main import ( "context" - "encoding/json" - "fmt" - "io/ioutil" "log" "net/http" "time" + "github.com/Sirupsen/logrus" + faashandlers "github.com/alexellis/faas/gateway/handlers" "github.com/alexellis/faas/gateway/metrics" - "github.com/alexellis/faas/gateway/requests" - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/gorilla/mux" - io_prometheus_client "github.com/prometheus/client_model/go" ) -func scaleService(req requests.PrometheusAlert, c *client.Client) error { - var err error - //Todo: convert to loop / handler. - serviceName := req.Alerts[0].Labels.FunctionName - service, _, inspectErr := c.ServiceInspectWithRaw(context.Background(), serviceName) - if inspectErr == nil { - var replicas uint64 - - if req.Status == "firing" { - if *service.Spec.Mode.Replicated.Replicas < 20 { - replicas = *service.Spec.Mode.Replicated.Replicas + uint64(5) - } else { - return err - } - } else { // Resolved event. - // Previously decremented by 5, but event only fires once, so set to 1/1. - if *service.Spec.Mode.Replicated.Replicas > 1 { - // replicas = *service.Spec.Mode.Replicated.Replicas - uint64(5) - // if replicas < 1 { - // replicas = 1 - // } - // return nil - - replicas = 1 - } else { - return nil - } - } - - log.Printf("Scaling %s to %d replicas.\n", serviceName, replicas) - - service.Spec.Mode.Replicated.Replicas = &replicas - updateOpts := types.ServiceUpdateOptions{} - updateOpts.RegistryAuthFrom = types.RegistryAuthFromSpec - - response, updateErr := c.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, updateOpts) - if updateErr != nil { - err = updateErr - } - log.Println(response) - - } else { - err = inspectErr - } - - return err -} - -func makeAlertHandler(c *client.Client) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - log.Println("Alert received.") - body, readErr := ioutil.ReadAll(r.Body) - if readErr != nil { - log.Println(readErr) - return - } - - var req requests.PrometheusAlert - err := json.Unmarshal(body, &req) - if err != nil { - log.Println(err) - return - } - - if len(req.Alerts) > 0 { - err := scaleService(req, c) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } - } - } -} - -// makeFunctionReader gives a summary of Function structs with Docker service stats overlaid with Prometheus counters. -func makeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - - serviceFilter := filters.NewArgs() - - options := types.ServiceListOptions{ - Filters: serviceFilter, - } - - services, err := c.ServiceList(context.Background(), options) - if err != nil { - fmt.Println(err) - } - - // TODO: Filter only "faas" functions (via metadata?) - functions := make([]requests.Function, 0) - for _, service := range services { - counter, _ := metricsOptions.GatewayFunctionInvocation.GetMetricWithLabelValues(service.Spec.Name) - - // Get the metric's value from ProtoBuf interface (idea via Julius Volz) - var protoMetric io_prometheus_client.Metric - counter.Write(&protoMetric) - invocations := protoMetric.GetCounter().GetValue() - - f := requests.Function{ - Name: service.Spec.Name, - Image: service.Spec.TaskTemplate.ContainerSpec.Image, - InvocationCount: invocations, - Replicas: *service.Spec.Mode.Replicated.Replicas, - } - functions = append(functions, f) - } - - functionBytes, _ := json.Marshal(functions) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(200) - w.Write(functionBytes) - } -} - func main() { + logger := logrus.Logger{} + logrus.SetFormatter(&logrus.TextFormatter{}) + var dockerClient *client.Client var err error dockerClient, err = client.NewEnvClient() @@ -146,16 +27,18 @@ func main() { if err != nil { log.Fatal("Error with Docker server.\n", err) } - log.Println("API version: %s, %s\n", dockerVersion.APIVersion, dockerVersion.Version) + log.Printf("API version: %s, %s\n", dockerVersion.APIVersion, dockerVersion.Version) metricsOptions := metrics.BuildMetricsOptions() metrics.RegisterMetrics(metricsOptions) r := mux.NewRouter() - r.HandleFunc("/function/{name:[a-zA-Z_]+}", MakeProxy(metricsOptions, true, dockerClient)) - r.HandleFunc("/system/alert", makeAlertHandler(dockerClient)) - r.HandleFunc("/system/functions", makeFunctionReader(metricsOptions, dockerClient)).Methods("GET") - r.HandleFunc("/", MakeProxy(metricsOptions, false, dockerClient)).Methods("POST") + r.HandleFunc("/function/{name:[a-zA-Z_]+}", faashandlers.MakeProxy(metricsOptions, true, dockerClient, &logger)) + + r.HandleFunc("/system/alert", faashandlers.MakeAlertHandler(dockerClient)) + r.HandleFunc("/system/functions", faashandlers.MakeFunctionReader(metricsOptions, dockerClient)).Methods("GET") + + r.HandleFunc("/", faashandlers.MakeProxy(metricsOptions, false, dockerClient, &logger)).Methods("POST") metricsHandler := metrics.PrometheusHandler() r.Handle("/metrics", metricsHandler) diff --git a/gateway/tests/alexhostname_request.json b/gateway/tests/alexhostname_request.json new file mode 100644 index 00000000..e7953654 --- /dev/null +++ b/gateway/tests/alexhostname_request.json @@ -0,0 +1,24 @@ +{ + "session": { + "sessionId": "SessionId.ea96e58d-dc16-43e1-b238-daac4541110c", + "application": { + "applicationId": "amzn1.ask.skill.72fb1025-aacc-4d05-a582-21344940c023" + }, + "attributes": {}, + "user": { + "userId": "amzn1.ask.account.AEN7KA5DBXAAWQPDUXTXFWBARZ5YZ6TNOQR5CUMV5LCCJTMBZVFP45SZVLGDD5GQBOM7QMELRS7LHG3F2FN2QQQMTBURDL5I4PQ33EHMNNGO4TXWG732Y6SDM2YZKHSPWIIWBH3GSE3Q3TTFAYN2Y66RHBKRANYCNMX2WORMASUGVRHUNBB4HZMJEC7HQDWUSXAOMP77WGJU4AY" + }, + "new": true + }, + "request": { + "type": "IntentRequest", + "requestId": "EdwRequestId.a934104e-3282-4620-b056-4aa4c5995503", + "locale": "en-GB", + "timestamp": "2016-12-07T15:50:01Z", + "intent": { + "name": "HostnameIntent", + "slots": {} + } + }, + "version": "1.0" +} \ No newline at end of file diff --git a/gateway/tests/isalexa_test.go b/gateway/tests/isalexa_test.go new file mode 100644 index 00000000..89c86370 --- /dev/null +++ b/gateway/tests/isalexa_test.go @@ -0,0 +1,27 @@ +package tests + +import ( + "testing" + + "io/ioutil" + + "github.com/alexellis/faas/gateway/handlers" + "github.com/alexellis/faas/gateway/requests" +) + +func TestIsAlexa(t *testing.T) { + requestBody, _ := ioutil.ReadFile("./alexhostname_request.json") + var result requests.AlexaRequestBody + + result = handlers.IsAlexa(requestBody) + + if len(result.Session.Application.ApplicationId) == 0 { + t.Fail() + } + if len(result.Session.SessionId) == 0 { + t.Fail() + } + if len(result.Request.Intent.Name) == 0 { + t.Fail() + } +} diff --git a/gateway/tests/unmarshall_test.go b/gateway/tests/unmarshall_test.go index 4fa52288..5a1c675d 100644 --- a/gateway/tests/unmarshall_test.go +++ b/gateway/tests/unmarshall_test.go @@ -1,7 +1,6 @@ -package requests +package tests import ( - "fmt" "testing" "io/ioutil" @@ -11,6 +10,7 @@ import ( "github.com/alexellis/faas/gateway/requests" ) +// TestUnmarshallAlert is an exploratory test from TDD'ing the struct to parse a Prometheus alert func TestUnmarshallAlert(t *testing.T) { file, _ := ioutil.ReadFile("./test_alert.json") @@ -19,7 +19,6 @@ func TestUnmarshallAlert(t *testing.T) { if err != nil { t.Fatal(err) } - fmt.Println("OK", string(file), alert) if (len(alert.Status)) == 0 { t.Fatal("No status read") }