faas/gateway/handlers/forwarding_proxy.go
Alex Ellis 0c7e59fe8a Add direct_functions mode to gateway for tuning
Adds a pair of configuration options for performance tuning. The
gateway can now invoke functions directly and can bypass the
provider. See updated table in README.md for configuration values.

BaseURLResolver is added with unit tests that decouples resolving
upstream URL from the reverse proxy client code.

- SingleHostBaseURLResolver resolves a single upstream host
- FunctionAsHostBaseURLResolver resolves host based upon conventions
within the URL of the request to a function for direct access

Tested with Kubernetes (faas-netes) and faas-swarm through UI, CLI
calling system endpoints and functions directly.

Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
2018-03-23 16:35:37 +00:00

169 lines
4.2 KiB
Go

package handlers
import (
"context"
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
"time"
"github.com/openfaas/faas/gateway/metrics"
"github.com/openfaas/faas/gateway/types"
"github.com/prometheus/client_golang/prometheus"
)
// HTTPNotifier notify about HTTP request/response
type HTTPNotifier interface {
Notify(method string, URL string, statusCode int, duration time.Duration)
}
// BaseURLResolver URL resolver for upstream requests
type BaseURLResolver interface {
Resolve(r *http.Request) string
}
// MakeForwardingProxyHandler create a handler which forwards HTTP requests
func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, notifiers []HTTPNotifier, baseURLResolver BaseURLResolver) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
baseURL := baseURLResolver.Resolve(r)
requestURL := r.URL.Path
start := time.Now()
statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout)
seconds := time.Since(start)
if err != nil {
log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error())
}
for _, notifier := range notifiers {
notifier.Notify(r.Method, requestURL, statusCode, seconds)
}
}
}
func forwardRequest(w http.ResponseWriter, r *http.Request, proxyClient *http.Client, baseURL string, requestURL string, timeout time.Duration) (int, error) {
upstreamReq, _ := http.NewRequest(r.Method, baseURL+requestURL, nil)
copyHeaders(upstreamReq.Header, &r.Header)
upstreamReq.Header["X-Forwarded-For"] = []string{r.RemoteAddr}
if r.Body != nil {
defer r.Body.Close()
upstreamReq.Body = r.Body
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
res, resErr := proxyClient.Do(upstreamReq.WithContext(ctx))
if resErr != nil {
badStatus := http.StatusBadGateway
w.WriteHeader(badStatus)
return badStatus, resErr
}
if res.Body != nil {
defer res.Body.Close()
}
copyHeaders(w.Header(), &res.Header)
// Write status code
w.WriteHeader(res.StatusCode)
if res.Body != nil {
// Copy the body over
io.CopyBuffer(w, res.Body, nil)
}
return res.StatusCode, nil
}
func copyHeaders(destination http.Header, source *http.Header) {
for k, v := range *source {
vClone := make([]string, len(v))
copy(vClone, v)
(destination)[k] = vClone
}
}
// PrometheusFunctionNotifier records metrics to Prometheus
type PrometheusFunctionNotifier struct {
Metrics *metrics.MetricOptions
}
// Notify records metrics in Prometheus
func (p PrometheusFunctionNotifier) Notify(method string, URL string, statusCode int, duration time.Duration) {
seconds := duration.Seconds()
serviceName := getServiceName(URL)
p.Metrics.GatewayFunctionsHistogram.
WithLabelValues(serviceName).
Observe(seconds)
code := strconv.Itoa(statusCode)
p.Metrics.GatewayFunctionInvocation.
With(prometheus.Labels{"function_name": serviceName, "code": code}).
Inc()
}
func getServiceName(urlValue string) string {
var serviceName string
forward := "/function/"
if strings.HasPrefix(urlValue, forward) {
serviceName = urlValue[len(forward):]
}
return serviceName
}
// LoggingNotifier notifies a log about a request
type LoggingNotifier struct {
}
// Notify a log about a request
func (LoggingNotifier) Notify(method string, URL string, statusCode int, duration time.Duration) {
log.Printf("Forwarded [%s] to %s - [%d] - %f seconds", method, URL, statusCode, duration.Seconds())
}
// SingleHostBaseURLResolver resolves URLs against a single BaseURL
type SingleHostBaseURLResolver struct {
BaseURL string
}
// Resolve the base URL for a request
func (s SingleHostBaseURLResolver) Resolve(r *http.Request) string {
baseURL := s.BaseURL
if strings.HasSuffix(baseURL, "/") {
baseURL = baseURL[0 : len(baseURL)-1]
}
return baseURL
}
// FunctionAsHostBaseURLResolver resolves URLs using a function from the URL as a host
type FunctionAsHostBaseURLResolver struct {
FunctionSuffix string
}
// Resolve the base URL for a request
func (f FunctionAsHostBaseURLResolver) Resolve(r *http.Request) string {
svcName := getServiceName(r.URL.Path)
const watchdogPort = 8080
var suffix string
if len(f.FunctionSuffix) > 0 {
suffix = "." + f.FunctionSuffix
}
return fmt.Sprintf("http://%s%s:%d", svcName, suffix, watchdogPort)
}