Fixes for request body passing into text streaming proxy

In the previous version, whilst responses were streamed
correctly, the request body was not being received by
the function. This has been tested, along with adding
a forced timeout according to upstream_timeout, which
was a miss in the original commit.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alex@openfaas.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd) 2024-01-11 17:40:42 +00:00
parent 4679f27804
commit 5c13f1f01c

View File

@ -5,6 +5,7 @@ package handlers
import (
"context"
"errors"
"fmt"
"io"
"log"
@ -12,6 +13,7 @@ import (
"net/http/httputil"
"os"
"strings"
"sync"
"time"
fhttputil "github.com/openfaas/faas-provider/httputil"
@ -100,9 +102,6 @@ func forwardRequest(w http.ResponseWriter,
}
upstreamReq := buildUpstreamRequest(r, baseURL, requestURL)
if upstreamReq.Body != nil {
defer upstreamReq.Body.Close()
}
if serviceAuthInjector != nil {
serviceAuthInjector.Inject(upstreamReq)
@ -113,9 +112,8 @@ func forwardRequest(w http.ResponseWriter,
}
if strings.HasPrefix(r.Header.Get("Accept"), "text/event-stream") {
ww := fhttputil.NewHttpWriteInterceptor(w)
reverseProxy.ServeHTTP(ww, upstreamReq)
return ww.Status(), nil
return handleEventStream(w, r, reverseProxy, upstreamReq, timeout)
}
ctx, cancel := context.WithTimeout(r.Context(), timeout)
@ -143,6 +141,36 @@ func forwardRequest(w http.ResponseWriter,
return res.StatusCode, nil
}
func handleEventStream(w http.ResponseWriter, r *http.Request, reverseProxy *httputil.ReverseProxy, upstreamReq *http.Request, timeout time.Duration) (int, error) {
ww := fhttputil.NewHttpWriteInterceptor(w)
ctx, cancel := context.WithTimeoutCause(r.Context(), timeout, http.ErrHandlerTimeout)
defer cancel()
r = r.WithContext(ctx)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
wg.Done()
if r := recover(); r != nil {
if errors.Is(r.(error), http.ErrAbortHandler) {
log.Printf("Aborted [%s] for: %s", upstreamReq.Method, upstreamReq.URL.Path)
} else {
log.Printf("Recovered from panic in reverseproxy: %v", r)
}
}
}()
reverseProxy.ServeHTTP(ww, r)
}()
wg.Wait()
return ww.Status(), nil
}
func copyHeaders(destination http.Header, source *http.Header) {
for k, v := range *source {
vClone := make([]string, len(v))
@ -176,12 +204,22 @@ var hopHeaders = []string{
}
func makeRewriteProxy(baseURLResolver middleware.BaseURLResolver, urlPathTransformer middleware.URLPathTransformer) *httputil.ReverseProxy {
return &httputil.ReverseProxy{
ErrorLog: log.New(io.Discard, "proxy:", 0),
Transport: http.DefaultClient.Transport,
ErrorLog: log.New(io.Discard, "proxy:", 0),
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
},
Director: func(r *http.Request) {
baseURL := baseURLResolver.Resolve(r)
baseURLu, _ := r.URL.Parse(baseURL)
requestURL := urlPathTransformer.Transform(r)
r.URL.Scheme = "http"
r.URL.Path = requestURL
r.URL.Host = baseURLu.Host
},
}
}