Updates for text streaming functions

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2024-01-11 18:13:34 +00:00
parent f17a25f3e8
commit 4a3fa684e2
59 changed files with 845 additions and 816 deletions

View File

@ -26,11 +26,12 @@ import (
"log"
"net"
"net/http"
"net/http/httputil"
"net/url"
"time"
"github.com/gorilla/mux"
"github.com/openfaas/faas-provider/httputil"
fhttputil "github.com/openfaas/faas-provider/httputil"
"github.com/openfaas/faas-provider/types"
)
@ -68,6 +69,17 @@ func NewHandlerFunc(config types.FaaSConfig, resolver BaseURLResolver, verbose b
proxyClient := NewProxyClientFromConfig(config)
reverseProxy := httputil.ReverseProxy{}
reverseProxy.Director = func(req *http.Request) {
// At least an empty director is required to prevent runtime errors.
req.URL.Scheme = "http"
}
reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
}
// Errors are common during disconnect of client, no need to log them.
reverseProxy.ErrorLog = log.New(io.Discard, "", 0)
return func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
@ -81,7 +93,7 @@ func NewHandlerFunc(config types.FaaSConfig, resolver BaseURLResolver, verbose b
http.MethodGet,
http.MethodOptions,
http.MethodHead:
proxyRequest(w, r, proxyClient, resolver, verbose)
proxyRequest(w, r, proxyClient, resolver, &reverseProxy, verbose)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
@ -134,7 +146,7 @@ func NewProxyClient(timeout time.Duration, maxIdleConns int, maxIdleConnsPerHost
}
// proxyRequest handles the actual resolution of and then request to the function service.
func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient *http.Client, resolver BaseURLResolver, verbose bool) {
func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient *http.Client, resolver BaseURLResolver, reverseProxy *httputil.ReverseProxy, verbose bool) {
ctx := originalReq.Context()
pathVars := mux.Vars(originalReq)
@ -142,7 +154,7 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient
if functionName == "" {
w.Header().Add(openFaaSInternalHeader, "proxy")
httputil.Errorf(w, http.StatusBadRequest, "Provide function name in the request path")
fhttputil.Errorf(w, http.StatusBadRequest, "Provide function name in the request path")
return
}
@ -152,7 +164,7 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient
// TODO: Should record the 404/not found error in Prometheus.
log.Printf("resolver error: no endpoints for %s: %s\n", functionName, err.Error())
httputil.Errorf(w, http.StatusServiceUnavailable, "No endpoints available for: %s.", functionName)
fhttputil.Errorf(w, http.StatusServiceUnavailable, "No endpoints available for: %s.", functionName)
return
}
@ -161,7 +173,7 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient
w.Header().Add(openFaaSInternalHeader, "proxy")
httputil.Errorf(w, http.StatusInternalServerError, "Failed to resolve service: %s.", functionName)
fhttputil.Errorf(w, http.StatusInternalServerError, "Failed to resolve service: %s.", functionName)
return
}
@ -169,16 +181,29 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient
defer proxyReq.Body.Close()
}
start := time.Now()
if verbose {
start := time.Now()
defer func() {
seconds := time.Since(start)
log.Printf("%s took %f seconds\n", functionName, seconds.Seconds())
}()
}
if v := originalReq.Header.Get("Accept"); v == "text/event-stream" {
originalReq.URL = proxyReq.URL
reverseProxy.ServeHTTP(w, originalReq)
return
}
response, err := proxyClient.Do(proxyReq.WithContext(ctx))
seconds := time.Since(start)
if err != nil {
log.Printf("error with proxy request to: %s, %s\n", proxyReq.URL.String(), err.Error())
w.Header().Add(openFaaSInternalHeader, "proxy")
httputil.Errorf(w, http.StatusInternalServerError, "Can't reach service for: %s.", functionName)
fhttputil.Errorf(w, http.StatusInternalServerError, "Can't reach service for: %s.", functionName)
return
}
@ -186,10 +211,6 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient
defer response.Body.Close()
}
if verbose {
log.Printf("%s took %f seconds\n", functionName, seconds.Seconds())
}
clientHeader := w.Header()
copyHeaders(clientHeader, &response.Header)
w.Header().Set("Content-Type", getContentType(originalReq.Header, response.Header))