diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index 64c8f516..d97f99bd 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -5,6 +5,7 @@ package handlers import ( "context" + "errors" "fmt" "io" "log" @@ -12,6 +13,7 @@ import ( "net/http/httputil" "os" "strings" + "sync" "time" fhttputil "github.com/openfaas/faas-provider/httputil" @@ -100,9 +102,6 @@ func forwardRequest(w http.ResponseWriter, } upstreamReq := buildUpstreamRequest(r, baseURL, requestURL) - if upstreamReq.Body != nil { - defer upstreamReq.Body.Close() - } if serviceAuthInjector != nil { serviceAuthInjector.Inject(upstreamReq) @@ -113,9 +112,8 @@ func forwardRequest(w http.ResponseWriter, } if strings.HasPrefix(r.Header.Get("Accept"), "text/event-stream") { - ww := fhttputil.NewHttpWriteInterceptor(w) - reverseProxy.ServeHTTP(ww, upstreamReq) - return ww.Status(), nil + + return handleEventStream(w, r, reverseProxy, upstreamReq, timeout) } ctx, cancel := context.WithTimeout(r.Context(), timeout) @@ -143,6 +141,36 @@ func forwardRequest(w http.ResponseWriter, return res.StatusCode, nil } +func handleEventStream(w http.ResponseWriter, r *http.Request, reverseProxy *httputil.ReverseProxy, upstreamReq *http.Request, timeout time.Duration) (int, error) { + ww := fhttputil.NewHttpWriteInterceptor(w) + + ctx, cancel := context.WithTimeoutCause(r.Context(), timeout, http.ErrHandlerTimeout) + defer cancel() + + r = r.WithContext(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer func() { + wg.Done() + if r := recover(); r != nil { + if errors.Is(r.(error), http.ErrAbortHandler) { + log.Printf("Aborted [%s] for: %s", upstreamReq.Method, upstreamReq.URL.Path) + + } else { + log.Printf("Recovered from panic in reverseproxy: %v", r) + } + } + }() + + reverseProxy.ServeHTTP(ww, r) + }() + + wg.Wait() + + return ww.Status(), nil +} + func copyHeaders(destination http.Header, source *http.Header) { for k, v := range *source { vClone := make([]string, len(v)) @@ -176,12 +204,22 @@ var hopHeaders = []string{ } func makeRewriteProxy(baseURLResolver middleware.BaseURLResolver, urlPathTransformer middleware.URLPathTransformer) *httputil.ReverseProxy { + return &httputil.ReverseProxy{ - ErrorLog: log.New(io.Discard, "proxy:", 0), - Transport: http.DefaultClient.Transport, + + ErrorLog: log.New(io.Discard, "proxy:", 0), ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { }, Director: func(r *http.Request) { + + baseURL := baseURLResolver.Resolve(r) + baseURLu, _ := r.URL.Parse(baseURL) + + requestURL := urlPathTransformer.Transform(r) + + r.URL.Scheme = "http" + r.URL.Path = requestURL + r.URL.Host = baseURLu.Host }, } }