mirror of
https://github.com/openfaas/faas.git
synced 2025-06-22 23:03:24 +00:00
Support streaming responses from functions
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alex@openfaas.com>
This commit is contained in:
committed by
Alex Ellis
parent
02205b8b19
commit
4679f27804
@ -9,9 +9,12 @@ import (
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
fhttputil "github.com/openfaas/faas-provider/httputil"
|
||||
"github.com/openfaas/faas/gateway/pkg/middleware"
|
||||
"github.com/openfaas/faas/gateway/types"
|
||||
)
|
||||
@ -28,7 +31,10 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy,
|
||||
writeRequestURI = exists
|
||||
}
|
||||
|
||||
reverseProxy := makeRewriteProxy(baseURLResolver, urlPathTransformer)
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
baseURL := baseURLResolver.Resolve(r)
|
||||
originalURL := r.URL.String()
|
||||
requestURL := urlPathTransformer.Transform(r)
|
||||
@ -39,13 +45,13 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy,
|
||||
|
||||
start := time.Now()
|
||||
|
||||
statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout, writeRequestURI, serviceAuthInjector)
|
||||
|
||||
seconds := time.Since(start)
|
||||
statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout, writeRequestURI, serviceAuthInjector, reverseProxy)
|
||||
if err != nil {
|
||||
log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error())
|
||||
}
|
||||
|
||||
seconds := time.Since(start)
|
||||
|
||||
for _, notifier := range notifiers {
|
||||
notifier.Notify(r.Method, requestURL, originalURL, statusCode, "completed", seconds)
|
||||
}
|
||||
@ -86,7 +92,12 @@ func forwardRequest(w http.ResponseWriter,
|
||||
requestURL string,
|
||||
timeout time.Duration,
|
||||
writeRequestURI bool,
|
||||
serviceAuthInjector middleware.AuthInjector) (int, error) {
|
||||
serviceAuthInjector middleware.AuthInjector,
|
||||
reverseProxy *httputil.ReverseProxy) (int, error) {
|
||||
|
||||
if r.Body != nil {
|
||||
defer r.Body.Close()
|
||||
}
|
||||
|
||||
upstreamReq := buildUpstreamRequest(r, baseURL, requestURL)
|
||||
if upstreamReq.Body != nil {
|
||||
@ -101,14 +112,20 @@ func forwardRequest(w http.ResponseWriter,
|
||||
log.Printf("forwardRequest: %s %s\n", upstreamReq.Host, upstreamReq.URL.String())
|
||||
}
|
||||
|
||||
if strings.HasPrefix(r.Header.Get("Accept"), "text/event-stream") {
|
||||
ww := fhttputil.NewHttpWriteInterceptor(w)
|
||||
reverseProxy.ServeHTTP(ww, upstreamReq)
|
||||
return ww.Status(), nil
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), timeout)
|
||||
defer cancel()
|
||||
|
||||
res, resErr := proxyClient.Do(upstreamReq.WithContext(ctx))
|
||||
if resErr != nil {
|
||||
res, err := proxyClient.Do(upstreamReq.WithContext(ctx))
|
||||
if err != nil {
|
||||
badStatus := http.StatusBadGateway
|
||||
w.WriteHeader(badStatus)
|
||||
return badStatus, resErr
|
||||
return badStatus, err
|
||||
}
|
||||
|
||||
if res.Body != nil {
|
||||
@ -117,12 +134,10 @@ func forwardRequest(w http.ResponseWriter,
|
||||
|
||||
copyHeaders(w.Header(), &res.Header)
|
||||
|
||||
// Write status code
|
||||
w.WriteHeader(res.StatusCode)
|
||||
|
||||
if res.Body != nil {
|
||||
// Copy the body over
|
||||
io.CopyBuffer(w, res.Body, nil)
|
||||
io.Copy(w, res.Body)
|
||||
}
|
||||
|
||||
return res.StatusCode, nil
|
||||
@ -159,3 +174,14 @@ var hopHeaders = []string{
|
||||
"Transfer-Encoding",
|
||||
"Upgrade",
|
||||
}
|
||||
|
||||
func makeRewriteProxy(baseURLResolver middleware.BaseURLResolver, urlPathTransformer middleware.URLPathTransformer) *httputil.ReverseProxy {
|
||||
return &httputil.ReverseProxy{
|
||||
ErrorLog: log.New(io.Discard, "proxy:", 0),
|
||||
Transport: http.DefaultClient.Transport,
|
||||
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
||||
},
|
||||
Director: func(r *http.Request) {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user