From 30928739ee9cb98af4464036556b5947271e61b1 Mon Sep 17 00:00:00 2001 From: Alex Ellis Date: Sun, 4 Mar 2018 09:07:49 +0000 Subject: [PATCH] Use context for upstream timeouts Signed-off-by: Alex Ellis --- gateway/handlers/alerthandler.go | 8 +-- gateway/handlers/forwarding_proxy.go | 98 +++++++++++++++++----------- gateway/handlers/service_query.go | 10 ++- gateway/plugin/external.go | 23 ++++--- gateway/types/proxy_client.go | 3 + 5 files changed, 91 insertions(+), 51 deletions(-) diff --git a/gateway/handlers/alerthandler.go b/gateway/handlers/alerthandler.go index 114a98db..58fd3be9 100644 --- a/gateway/handlers/alerthandler.go +++ b/gateway/handlers/alerthandler.go @@ -83,14 +83,14 @@ func scaleService(alert requests.PrometheusInnerAlert, service ServiceQuery) err serviceName := alert.Labels.FunctionName if len(serviceName) > 0 { - currentReplicas, maxReplicas, minReplicas, getErr := service.GetReplicas(serviceName) + queryResponse, getErr := service.GetReplicas(serviceName) if getErr == nil { status := alert.Status - newReplicas := CalculateReplicas(status, currentReplicas, uint64(maxReplicas), minReplicas) + newReplicas := CalculateReplicas(status, queryResponse.Replicas, uint64(queryResponse.MaxReplicas), queryResponse.MinReplicas) - log.Printf("[Scale] function=%s %d => %d.\n", serviceName, currentReplicas, newReplicas) - if newReplicas == currentReplicas { + log.Printf("[Scale] function=%s %d => %d.\n", serviceName, queryResponse.Replicas, newReplicas) + if newReplicas == queryResponse.Replicas { return nil } diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index 693eba54..f1003fd9 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -1,6 +1,7 @@ package handlers import ( + "context" "io" "log" "net/http" @@ -23,65 +24,86 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, metrics *me return func(w http.ResponseWriter, r *http.Request) { requestURL := r.URL.String() + serviceName := getServiceName(requestURL) log.Printf("> Forwarding [%s] to %s", r.Method, requestURL) + start := time.Now() - upstreamReq, _ := http.NewRequest(r.Method, baseURL+requestURL, nil) - - upstreamReq.Header["X-Forwarded-For"] = []string{r.RequestURI} - - if r.Body != nil { - defer r.Body.Close() - upstreamReq.Body = r.Body + statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout) + if err != nil { + log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error()) } - res, resErr := proxy.Client.Do(upstreamReq) - if resErr != nil { - log.Printf("upstream client error: %s\n", resErr) - return - } - - if res.Body != nil { - defer res.Body.Close() - } - - // Populate any headers received - for k, v := range res.Header { - w.Header()[k] = v - } - - // Write status code - w.WriteHeader(res.StatusCode) - - // Copy the body over - io.CopyBuffer(w, res.Body, nil) - seconds := time.Since(start).Seconds() log.Printf("< [%s] - %d took %f seconds\n", r.URL.String(), - res.StatusCode, seconds) - - forward := "/function/" - if startsWith(requestURL, forward) { - // log.Printf("function=%s", uri[len(forward):]) - - service := requestURL[len(forward):] + statusCode, seconds) + if len(serviceName) > 0 { metrics.GatewayFunctionsHistogram. - WithLabelValues(service). + WithLabelValues(serviceName). Observe(seconds) - code := strconv.Itoa(res.StatusCode) + code := strconv.Itoa(statusCode) metrics.GatewayFunctionInvocation. - With(prometheus.Labels{"function_name": service, "code": code}). + With(prometheus.Labels{"function_name": serviceName, "code": code}). Inc() } } } +func forwardRequest(w http.ResponseWriter, r *http.Request, proxyClient *http.Client, baseURL string, requestURL string, timeout time.Duration) (int, error) { + + upstreamReq, _ := http.NewRequest(r.Method, baseURL+requestURL, nil) + + upstreamReq.Header["X-Forwarded-For"] = []string{r.RequestURI} + + if r.Body != nil { + defer r.Body.Close() + upstreamReq.Body = r.Body + } + ctx, cancel := context.WithTimeout(context.Background(), timeout-time.Second*1) + defer cancel() + + res, resErr := proxyClient.Do(upstreamReq.WithContext(ctx)) + if resErr != nil { + badStatus := http.StatusBadGateway + w.WriteHeader(badStatus) + return badStatus, resErr + } + + if res.Body != nil { + defer res.Body.Close() + } + + // Populate any headers received + for k, v := range res.Header { + w.Header()[k] = v + } + + // Write status code + w.WriteHeader(res.StatusCode) + + if res.Body != nil { + // Copy the body over + io.CopyBuffer(w, res.Body, nil) + } + + return res.StatusCode, nil +} + +func getServiceName(urlValue string) string { + var serviceName string + forward := "/function/" + if startsWith(urlValue, forward) { + serviceName = urlValue[len(forward):] + } + return serviceName +} + func startsWith(value, token string) bool { return len(value) > len(token) && strings.Index(value, token) == 0 } diff --git a/gateway/handlers/service_query.go b/gateway/handlers/service_query.go index 6655e9c8..c8e34dca 100644 --- a/gateway/handlers/service_query.go +++ b/gateway/handlers/service_query.go @@ -2,6 +2,14 @@ package handlers // ServiceQuery provides interface for replica querying/setting type ServiceQuery interface { - GetReplicas(service string) (currentReplicas uint64, maxReplicas uint64, minReplicas uint64, err error) + GetReplicas(service string) (response ServiceQueryResponse, err error) SetReplicas(service string, count uint64) error } + +// ServiceQueryResponse response from querying a function status +type ServiceQueryResponse struct { + Replicas uint64 + MaxReplicas uint64 + MinReplicas uint64 + AvailableReplicas uint64 +} diff --git a/gateway/plugin/external.go b/gateway/plugin/external.go index dde3bf74..18398b27 100644 --- a/gateway/plugin/external.go +++ b/gateway/plugin/external.go @@ -49,9 +49,16 @@ type ExternalServiceQuery struct { ProxyClient http.Client } +// ScaleServiceRequest request scaling of replica +type ScaleServiceRequest struct { + ServiceName string `json:"serviceName"` + Replicas uint64 `json:"replicas"` +} + // GetReplicas replica count for function -func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, uint64, error) { +func (s ExternalServiceQuery) GetReplicas(serviceName string) (handlers.ServiceQueryResponse, error) { var err error + function := requests.Function{} urlPath := fmt.Sprintf("%ssystem/function/%s", s.URL.String(), serviceName) @@ -80,6 +87,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, u maxReplicas := uint64(handlers.DefaultMaxReplicas) minReplicas := uint64(1) + availableReplicas := function.AvailableReplicas if function.Labels != nil { labels := *function.Labels @@ -105,13 +113,12 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, u } } - return function.Replicas, maxReplicas, minReplicas, err -} - -// ScaleServiceRequest request scaling of replica -type ScaleServiceRequest struct { - ServiceName string `json:"serviceName"` - Replicas uint64 `json:"replicas"` + return handlers.ServiceQueryResponse{ + Replicas: function.Replicas, + MaxReplicas: maxReplicas, + MinReplicas: minReplicas, + AvailableReplicas: availableReplicas, + }, err } // SetReplicas update the replica count diff --git a/gateway/types/proxy_client.go b/gateway/types/proxy_client.go index 495c164d..2d4c5776 100644 --- a/gateway/types/proxy_client.go +++ b/gateway/types/proxy_client.go @@ -14,6 +14,7 @@ import ( func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPClientReverseProxy { h := HTTPClientReverseProxy{ BaseURL: baseURL, + Timeout: timeout, } h.Client = &http.Client{ @@ -26,6 +27,7 @@ func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPCli IdleConnTimeout: 120 * time.Millisecond, ExpectContinueTimeout: 1500 * time.Millisecond, }, + Timeout: timeout, } return &h } @@ -34,4 +36,5 @@ func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPCli type HTTPClientReverseProxy struct { BaseURL *url.URL Client *http.Client + Timeout time.Duration }