mirror of
https://github.com/openfaas/faas.git
synced 2025-06-10 09:16:48 +00:00
Implement log proxy handler
**What** - Implement log handler method that will hijack the connection and clear timeouts to allow long lived streams - Proxies requests to the logs provider and returns the response unmodified Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
This commit is contained in:
parent
3c4077f3df
commit
e7e91ecd15
@ -155,6 +155,10 @@ func copyHeaders(destination http.Header, source *http.Header) {
|
||||
copy(vClone, v)
|
||||
(destination)[k] = vClone
|
||||
}
|
||||
|
||||
for _, h := range hopHeaders {
|
||||
destination.Del(h)
|
||||
}
|
||||
}
|
||||
|
||||
func deleteHeaders(target *http.Header, exclude *[]string) {
|
||||
|
133
gateway/handlers/logs.go
Normal file
133
gateway/handlers/logs.go
Normal file
@ -0,0 +1,133 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const crlf = "\r\n"
|
||||
const upstreamLogsEndpoint = "/system/logs"
|
||||
|
||||
// NewLogHandlerFunc creates and http HandlerFunc from the supplied log Requestor.
|
||||
func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc {
|
||||
|
||||
writeRequestURI := false
|
||||
if _, exists := os.LookupEnv("write_request_uri"); exists {
|
||||
writeRequestURI = exists
|
||||
}
|
||||
|
||||
upstreamLogProviderBase := strings.TrimSuffix(logProvider.String(), "/")
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
if r.Body != nil {
|
||||
defer r.Body.Close()
|
||||
}
|
||||
|
||||
logRequest := buildUpstreamRequest(r, upstreamLogProviderBase, upstreamLogsEndpoint)
|
||||
if logRequest.Body != nil {
|
||||
defer logRequest.Body.Close()
|
||||
}
|
||||
|
||||
hijacker, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
log.Println("LogProxy: response is not a Hijacker, required for streaming response")
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
clientConn, buf, err := hijacker.Hijack()
|
||||
if err != nil {
|
||||
log.Println("LogProxy: failed to hijack connection for streaming response")
|
||||
return
|
||||
}
|
||||
|
||||
defer clientConn.Close()
|
||||
clientConn.SetWriteDeadline(time.Time{}) // allow arbitrary time between log writes
|
||||
buf.Flush() // will write the headers and the initial 200 response
|
||||
|
||||
if writeRequestURI {
|
||||
log.Printf("LogProxy: proxying request to %s %s\n", logRequest.Host, logRequest.URL.String())
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
logRequest = logRequest.WithContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
logResp, err := http.DefaultTransport.RoundTrip(logRequest)
|
||||
if err != nil {
|
||||
log.Printf("LogProxy: forwarding request failed: %s\n", err.Error())
|
||||
buf.WriteString("HTTP/1.1 500 Server Error" + crlf)
|
||||
buf.Flush()
|
||||
return
|
||||
}
|
||||
defer logResp.Body.Close()
|
||||
|
||||
// write response headers directly to the buffer per RFC 2616
|
||||
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html
|
||||
buf.WriteString("HTTP/1.1 " + logResp.Status + crlf)
|
||||
logResp.Header.Write(buf)
|
||||
buf.WriteString(crlf)
|
||||
buf.Flush()
|
||||
|
||||
// watch for connection closures and stream data
|
||||
// connections and contexts should have cancel methods deferred already
|
||||
select {
|
||||
case err := <-copyNotify(buf, logResp.Body):
|
||||
if err != nil {
|
||||
log.Printf("LogProxy: error while copy: %s", err.Error())
|
||||
return
|
||||
}
|
||||
logResp.Trailer.Write(buf)
|
||||
case err := <-closeNotify(ctx, clientConn):
|
||||
if err != nil {
|
||||
log.Printf("LogProxy: client connection closed: %s", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func copyNotify(destination io.Writer, source io.Reader) <-chan error {
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := io.Copy(destination, source)
|
||||
done <- err
|
||||
}()
|
||||
return done
|
||||
}
|
||||
|
||||
// closeNotify will watch the connection and notify when then connection is closed
|
||||
func closeNotify(ctx context.Context, c net.Conn) <-chan error {
|
||||
notify := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
buf := make([]byte, 1)
|
||||
// blocks until non-zero read or error. From the fd.Read docs:
|
||||
// If the caller wanted a zero byte read, return immediately
|
||||
// without trying (but after acquiring the readLock).
|
||||
// Otherwise syscall.Read returns 0, nil which looks like
|
||||
// io.EOF.
|
||||
// It is important that `buf` is allocated a non-zero size
|
||||
n, err := c.Read(buf)
|
||||
if err != nil {
|
||||
log.Printf("LogProxy: test connection: %s\n", err)
|
||||
notify <- err
|
||||
return
|
||||
}
|
||||
if n > 0 {
|
||||
log.Printf("LogProxy: unexpected data: %s\n", buf[:n])
|
||||
return
|
||||
}
|
||||
}()
|
||||
return notify
|
||||
}
|
@ -113,6 +113,8 @@ func main() {
|
||||
forwardingNotifiers,
|
||||
)
|
||||
|
||||
faasHandlers.LogProxyHandler = handlers.NewLogHandlerFunc(*config.LogsProviderURL)
|
||||
|
||||
if config.UseNATS() {
|
||||
log.Println("Async enabled: Using NATS Streaming.")
|
||||
maxReconnect := 60
|
||||
@ -163,6 +165,8 @@ func main() {
|
||||
decorateExternalAuth(faasHandlers.AsyncReport, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
|
||||
faasHandlers.SecretHandler =
|
||||
decorateExternalAuth(faasHandlers.SecretHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
|
||||
faasHandlers.LogProxyHandler =
|
||||
decorateExternalAuth(faasHandlers.LogProxyHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
|
||||
}
|
||||
|
||||
r := mux.NewRouter()
|
||||
@ -197,6 +201,7 @@ func main() {
|
||||
r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", faasHandlers.ScaleFunction).Methods(http.MethodPost)
|
||||
|
||||
r.HandleFunc("/system/secrets", faasHandlers.SecretHandler).Methods(http.MethodGet, http.MethodPut, http.MethodPost, http.MethodDelete)
|
||||
r.HandleFunc("/system/logs", faasHandlers.LogProxyHandler).Methods(http.MethodGet)
|
||||
|
||||
if faasHandlers.QueuedProxy != nil {
|
||||
r.HandleFunc("/async-function/{name:[-a-zA-Z_0-9]+}/", faasHandlers.QueuedProxy).Methods(http.MethodPost)
|
||||
|
@ -31,4 +31,7 @@ type HandlerSet struct {
|
||||
|
||||
// SecretHandler allows secrets to be managed
|
||||
SecretHandler http.HandlerFunc
|
||||
|
||||
// LogProxyHandler allows streaming of logs for functions
|
||||
LogProxyHandler http.HandlerFunc
|
||||
}
|
||||
|
@ -72,6 +72,16 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig {
|
||||
}
|
||||
}
|
||||
|
||||
if len(hasEnv.Getenv("logs_provider_url")) > 0 {
|
||||
var err error
|
||||
cfg.LogsProviderURL, err = url.Parse(hasEnv.Getenv("logs_provider_url"))
|
||||
if err != nil {
|
||||
log.Fatal("If logs_provider_url is provided, then it should be a valid URL.", err)
|
||||
}
|
||||
} else if cfg.FunctionsProviderURL != nil {
|
||||
cfg.LogsProviderURL, _ = url.Parse(cfg.FunctionsProviderURL.String())
|
||||
}
|
||||
|
||||
faasNATSAddress := hasEnv.Getenv("faas_nats_address")
|
||||
if len(faasNATSAddress) > 0 {
|
||||
cfg.NATSAddress = &faasNATSAddress
|
||||
@ -158,6 +168,9 @@ type GatewayConfig struct {
|
||||
// URL for alternate functions provider.
|
||||
FunctionsProviderURL *url.URL
|
||||
|
||||
// URL for alternate function logs provider.
|
||||
LogsProviderURL *url.URL
|
||||
|
||||
// Address of the NATS service. Required for async mode.
|
||||
NATSAddress *string
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user