Extract separate files/packages for better visibility.

This commit is contained in:
Alex Ellis 2017-01-21 15:50:19 +00:00
parent 8da30eecaa
commit b9bd7c8101
7 changed files with 209 additions and 188 deletions

View File

@ -5,7 +5,7 @@ services:
- "/var/run/docker.sock:/var/run/docker.sock" - "/var/run/docker.sock:/var/run/docker.sock"
ports: ports:
- 8080:8080 - 8080:8080
image: alexellis2/faas-gateway:latest-dev1 image: alexellis2/faas-gateway:latest-dev
networks: networks:
- functions - functions
deploy: deploy:

View File

@ -10,7 +10,9 @@ RUN go get -d github.com/docker/docker/api/types \
WORKDIR /go/src/github.com/alexellis/faas/gateway WORKDIR /go/src/github.com/alexellis/faas/gateway
COPY metrics metrics COPY metrics metrics
COPY requests requests
COPY server.go . COPY server.go .
COPY proxy.go .
RUN find /go/src/github.com/alexellis/faas/gateway/ RUN find /go/src/github.com/alexellis/faas/gateway/
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app . RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app .

View File

@ -18,3 +18,41 @@ type MetricOptions struct {
func PrometheusHandler() http.Handler { func PrometheusHandler() http.Handler {
return prometheus.Handler() return prometheus.Handler()
} }
func BuildMetricsOptions() MetricOptions {
GatewayRequestsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "gateway_requests_total",
Help: "Total amount of HTTP requests to the gateway",
})
GatewayServerlessServedTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "gateway_serverless_invocation_total",
Help: "Total amount of serverless function invocations",
})
GatewayFunctions := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "gateway_functions",
Help: "Gateway functions",
})
GatewayFunctionInvocation := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gateway_function_invocation_total",
Help: "Individual function metrics",
},
[]string{"function_name"},
)
metricsOptions := MetricOptions{
GatewayRequestsTotal: GatewayRequestsTotal,
GatewayServerlessServedTotal: GatewayServerlessServedTotal,
GatewayFunctions: GatewayFunctions,
GatewayFunctionInvocation: GatewayFunctionInvocation,
}
return metricsOptions
}
func RegisterMetrics(metricsOptions MetricOptions) {
prometheus.Register(metricsOptions.GatewayRequestsTotal)
prometheus.Register(metricsOptions.GatewayServerlessServedTotal)
prometheus.Register(metricsOptions.GatewayFunctions)
prometheus.Register(metricsOptions.GatewayFunctionInvocation)
}

132
gateway/proxy.go Normal file
View File

@ -0,0 +1,132 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
"time"
"github.com/alexellis/faas/gateway/metrics"
"github.com/alexellis/faas/gateway/requests"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/gorilla/mux"
)
// MakeProxy creates a proxy for HTTP web requests which can be routed to a function.
func MakeProxy(metrics metrics.MetricOptions, wildcard bool, c *client.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
metrics.GatewayRequestsTotal.Inc()
if r.Method == "POST" {
log.Println(r.Header)
header := r.Header["X-Function"]
log.Println(header)
fmt.Println(wildcard)
if wildcard == true {
vars := mux.Vars(r)
name := vars["name"]
fmt.Println("invoke by name")
lookupInvoke(w, r, metrics, name, c)
} else if len(header) > 0 {
lookupInvoke(w, r, metrics, header[0], c)
} else {
requestBody, _ := ioutil.ReadAll(r.Body)
alexaService := isAlexa(requestBody)
fmt.Println(alexaService)
if len(alexaService.Session.SessionId) > 0 &&
len(alexaService.Session.Application.ApplicationId) > 0 &&
len(alexaService.Request.Intent.Name) > 0 {
fmt.Println("Alexa SDK request found")
fmt.Printf("SessionId=%s, Intent=%s, AppId=%s\n", alexaService.Session.SessionId, alexaService.Request.Intent.Name, alexaService.Session.Application.ApplicationId)
invokeService(w, r, metrics, alexaService.Request.Intent.Name, requestBody)
} else {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Provide an x-function header or a valid Alexa SDK request."))
}
}
}
}
}
func isAlexa(requestBody []byte) requests.AlexaRequestBody {
body := requests.AlexaRequestBody{}
buf := bytes.NewBuffer(requestBody)
fmt.Println(buf)
str := buf.String()
parts := strings.Split(str, "sessionId")
if len(parts) > 1 {
json.Unmarshal(requestBody, &body)
}
return body
}
func lookupInvoke(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, name string, c *client.Client) {
exists, err := lookupSwarmService(name, c)
if err != nil || exists == false {
if err != nil {
log.Fatalln(err)
}
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Error resolving service."))
}
if exists == true {
requestBody, _ := ioutil.ReadAll(r.Body)
invokeService(w, r, metrics, name, requestBody)
}
}
func lookupSwarmService(serviceName string, c *client.Client) (bool, error) {
fmt.Printf("Resolving: '%s'\n", serviceName)
serviceFilter := filters.NewArgs()
serviceFilter.Add("name", serviceName)
services, err := c.ServiceList(context.Background(), types.ServiceListOptions{Filters: serviceFilter})
return len(services) > 0, err
}
func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, service string, requestBody []byte) {
metrics.GatewayFunctionInvocation.WithLabelValues(service).Add(1)
stamp := strconv.FormatInt(time.Now().Unix(), 10)
start := time.Now()
buf := bytes.NewBuffer(requestBody)
url := "http://" + service + ":" + strconv.Itoa(8080) + "/"
fmt.Printf("[%s] Forwarding request to: %s\n", stamp, url)
response, err := http.Post(url, "text/plain", buf)
if err != nil {
log.Println(err)
w.WriteHeader(500)
buf := bytes.NewBufferString("Can't reach service: " + service)
w.Write(buf.Bytes())
return
}
responseBody, readErr := ioutil.ReadAll(response.Body)
if readErr != nil {
fmt.Println(readErr)
w.WriteHeader(500)
buf := bytes.NewBufferString("Error reading response from service: " + service)
w.Write(buf.Bytes())
return
}
w.WriteHeader(http.StatusOK)
w.Write(responseBody)
seconds := time.Since(start).Seconds()
fmt.Printf("[%s] took %f seconds\n", stamp, seconds)
metrics.GatewayServerlessServedTotal.Inc()
metrics.GatewayFunctions.Observe(seconds)
}

View File

@ -0,0 +1,23 @@
package requests
type AlexaSessionApplication struct {
ApplicationId string `json:"applicationId"`
}
type AlexaSession struct {
SessionId string `json:"sessionId"`
Application AlexaSessionApplication `json:"application"`
}
type AlexaIntent struct {
Name string `json:"name"`
}
type AlexaRequest struct {
Intent AlexaIntent `json:"intent"`
}
type AlexaRequestBody struct {
Session AlexaSession `json:"session"`
Request AlexaRequest `json:"request"`
}

View File

@ -1,159 +1,16 @@
package main package main
import ( import (
"bytes"
"context"
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"strconv"
"strings"
"time" "time"
"io/ioutil"
"encoding/json"
"github.com/alexellis/faas/gateway/metrics" "github.com/alexellis/faas/gateway/metrics"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
) )
type AlexaSessionApplication struct {
ApplicationId string `json:"applicationId"`
}
type AlexaSession struct {
SessionId string `json:"sessionId"`
Application AlexaSessionApplication `json:"application"`
}
type AlexaIntent struct {
Name string `json:"name"`
}
type AlexaRequest struct {
Intent AlexaIntent `json:"intent"`
}
type AlexaRequestBody struct {
Session AlexaSession `json:"session"`
Request AlexaRequest `json:"request"`
}
func lookupSwarmService(serviceName string, c *client.Client) (bool, error) {
fmt.Printf("Resolving: '%s'\n", serviceName)
serviceFilter := filters.NewArgs()
serviceFilter.Add("name", serviceName)
services, err := c.ServiceList(context.Background(), types.ServiceListOptions{Filters: serviceFilter})
return len(services) > 0, err
}
func isAlexa(requestBody []byte) AlexaRequestBody {
body := AlexaRequestBody{}
buf := bytes.NewBuffer(requestBody)
fmt.Println(buf)
str := buf.String()
parts := strings.Split(str, "sessionId")
if len(parts) > 1 {
json.Unmarshal(requestBody, &body)
}
return body
}
func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, service string, requestBody []byte) {
metrics.GatewayFunctionInvocation.WithLabelValues(service).Add(1)
stamp := strconv.FormatInt(time.Now().Unix(), 10)
start := time.Now()
buf := bytes.NewBuffer(requestBody)
url := "http://" + service + ":" + strconv.Itoa(8080) + "/"
fmt.Printf("[%s] Forwarding request to: %s\n", stamp, url)
response, err := http.Post(url, "text/plain", buf)
if err != nil {
log.Println(err)
w.WriteHeader(500)
buf := bytes.NewBufferString("Can't reach service: " + service)
w.Write(buf.Bytes())
return
}
responseBody, readErr := ioutil.ReadAll(response.Body)
if readErr != nil {
fmt.Println(readErr)
w.WriteHeader(500)
buf := bytes.NewBufferString("Error reading response from service: " + service)
w.Write(buf.Bytes())
return
}
w.WriteHeader(http.StatusOK)
w.Write(responseBody)
seconds := time.Since(start).Seconds()
fmt.Printf("[%s] took %f seconds\n", stamp, seconds)
metrics.GatewayServerlessServedTotal.Inc()
metrics.GatewayFunctions.Observe(seconds)
}
func lookupInvoke(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, name string, c *client.Client) {
exists, err := lookupSwarmService(name, c)
if err != nil || exists == false {
if err != nil {
log.Fatalln(err)
}
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Error resolving service."))
}
if exists == true {
requestBody, _ := ioutil.ReadAll(r.Body)
invokeService(w, r, metrics, name, requestBody)
}
}
func makeProxy(metrics metrics.MetricOptions, wildcard bool, c *client.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
metrics.GatewayRequestsTotal.Inc()
if r.Method == "POST" {
log.Println(r.Header)
header := r.Header["X-Function"]
log.Println(header)
fmt.Println(wildcard)
if wildcard == true {
vars := mux.Vars(r)
name := vars["name"]
fmt.Println("invoke by name")
lookupInvoke(w, r, metrics, name, c)
} else if len(header) > 0 {
lookupInvoke(w, r, metrics, header[0], c)
} else {
requestBody, _ := ioutil.ReadAll(r.Body)
alexaService := isAlexa(requestBody)
fmt.Println(alexaService)
if len(alexaService.Session.SessionId) > 0 &&
len(alexaService.Session.Application.ApplicationId) > 0 &&
len(alexaService.Request.Intent.Name) > 0 {
fmt.Println("Alexa SDK request found")
fmt.Printf("SessionId=%s, Intent=%s, AppId=%s\n", alexaService.Session.SessionId, alexaService.Request.Intent.Name, alexaService.Session.Application.ApplicationId)
invokeService(w, r, metrics, alexaService.Request.Intent.Name, requestBody)
} else {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Provide an x-function header or a valid Alexa SDK request."))
}
}
}
}
}
func makeAlertHandler(c *client.Client) http.HandlerFunc { func makeAlertHandler(c *client.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
fmt.Println(c) fmt.Println(c)
@ -164,51 +21,20 @@ func makeAlertHandler(c *client.Client) http.HandlerFunc {
} }
func main() { func main() {
GatewayRequestsTotal := prometheus.NewCounter(prometheus.CounterOpts{ var dockerClient *client.Client
Name: "gateway_requests_total",
Help: "Total amount of HTTP requests to the gateway",
})
GatewayServerlessServedTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "gateway_serverless_invocation_total",
Help: "Total amount of serverless function invocations",
})
GatewayFunctions := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "gateway_functions",
Help: "Gateway functions",
})
GatewayFunctionInvocation := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gateway_function_invocation_total",
Help: "Individual function metrics",
},
[]string{"function_name"},
)
prometheus.Register(GatewayRequestsTotal)
prometheus.Register(GatewayServerlessServedTotal)
prometheus.Register(GatewayFunctions)
prometheus.Register(GatewayFunctionInvocation)
metricsOptions := metrics.MetricOptions{
GatewayRequestsTotal: GatewayRequestsTotal,
GatewayServerlessServedTotal: GatewayServerlessServedTotal,
GatewayFunctions: GatewayFunctions,
GatewayFunctionInvocation: GatewayFunctionInvocation,
}
var c *client.Client
var err error var err error
c, err = client.NewEnvClient() dockerClient, err = client.NewEnvClient()
if err != nil { if err != nil {
log.Fatal("Error with Docker client.") log.Fatal("Error with Docker client.")
} }
metricsOptions := metrics.BuildMetricsOptions()
metrics.RegisterMetrics(metricsOptions)
r := mux.NewRouter() r := mux.NewRouter()
r.HandleFunc("/function/{name:[a-zA-Z_]+}", makeProxy(metricsOptions, true, c)) r.HandleFunc("/function/{name:[a-zA-Z_]+}", MakeProxy(metricsOptions, true, dockerClient))
r.HandleFunc("/system/alert", makeAlertHandler(dockerClient))
r.HandleFunc("/system/alert", makeAlertHandler(c)) r.HandleFunc("/", MakeProxy(metricsOptions, false, dockerClient))
r.HandleFunc("/", makeProxy(metricsOptions, false, c))
metricsHandler := metrics.PrometheusHandler() metricsHandler := metrics.PrometheusHandler()
r.Handle("/metrics", metricsHandler) r.Handle("/metrics", metricsHandler)

View File

@ -4,12 +4,12 @@ ALERT service_down
ALERT APIHighInvocationRate ALERT APIHighInvocationRate
IF rate ( gateway_function_invocation_total [10s] ) > 5 IF rate ( gateway_function_invocation_total [10s] ) > 5
FOR 30s FOR 30s
ANNOTATIONS {
summary = "High invocation total on {{ $labels.instance }}",
description = "High invocation total on {{ $labels.instance }}",
}
LABELS { LABELS {
service = "gateway", service = "gateway",
severity = "major", severity = "major",
value = "{{$value}}", value = "{{$value}}"
} }
ANNOTATIONS {
summary = "High invocation total on {{ $labels.instance }}",
description = "High invocation total on {{ $labels.instance }}"
}