From e7e91ecd155188141e9759360b85d138bbc5e4c3 Mon Sep 17 00:00:00 2001 From: Lucas Roesler Date: Sun, 10 Mar 2019 14:34:45 +0100 Subject: [PATCH] Implement log proxy handler **What** - Implement log handler method that will hijack the connection and clear timeouts to allow long lived streams - Proxies requests to the logs provider and returns the response unmodified Signed-off-by: Lucas Roesler --- gateway/handlers/forwarding_proxy.go | 4 + gateway/handlers/logs.go | 133 +++++++++++++++++++++++++++ gateway/server.go | 5 + gateway/types/handler_set.go | 3 + gateway/types/readconfig.go | 13 +++ 5 files changed, 158 insertions(+) create mode 100644 gateway/handlers/logs.go diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index e4e729a7..3f3cc564 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -155,6 +155,10 @@ func copyHeaders(destination http.Header, source *http.Header) { copy(vClone, v) (destination)[k] = vClone } + + for _, h := range hopHeaders { + destination.Del(h) + } } func deleteHeaders(target *http.Header, exclude *[]string) { diff --git a/gateway/handlers/logs.go b/gateway/handlers/logs.go new file mode 100644 index 00000000..47725da6 --- /dev/null +++ b/gateway/handlers/logs.go @@ -0,0 +1,133 @@ +package handlers + +import ( + "context" + "io" + "log" + "net" + "net/http" + "net/url" + "os" + "strings" + "time" +) + +const crlf = "\r\n" +const upstreamLogsEndpoint = "/system/logs" + +// NewLogHandlerFunc creates and http HandlerFunc from the supplied log Requestor. +func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc { + + writeRequestURI := false + if _, exists := os.LookupEnv("write_request_uri"); exists { + writeRequestURI = exists + } + + upstreamLogProviderBase := strings.TrimSuffix(logProvider.String(), "/") + + return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + if r.Body != nil { + defer r.Body.Close() + } + + logRequest := buildUpstreamRequest(r, upstreamLogProviderBase, upstreamLogsEndpoint) + if logRequest.Body != nil { + defer logRequest.Body.Close() + } + + hijacker, ok := w.(http.Hijacker) + if !ok { + log.Println("LogProxy: response is not a Hijacker, required for streaming response") + http.NotFound(w, r) + return + } + + clientConn, buf, err := hijacker.Hijack() + if err != nil { + log.Println("LogProxy: failed to hijack connection for streaming response") + return + } + + defer clientConn.Close() + clientConn.SetWriteDeadline(time.Time{}) // allow arbitrary time between log writes + buf.Flush() // will write the headers and the initial 200 response + + if writeRequestURI { + log.Printf("LogProxy: proxying request to %s %s\n", logRequest.Host, logRequest.URL.String()) + } + + ctx, cancel := context.WithCancel(ctx) + logRequest = logRequest.WithContext(ctx) + defer cancel() + + logResp, err := http.DefaultTransport.RoundTrip(logRequest) + if err != nil { + log.Printf("LogProxy: forwarding request failed: %s\n", err.Error()) + buf.WriteString("HTTP/1.1 500 Server Error" + crlf) + buf.Flush() + return + } + defer logResp.Body.Close() + + // write response headers directly to the buffer per RFC 2616 + // https://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html + buf.WriteString("HTTP/1.1 " + logResp.Status + crlf) + logResp.Header.Write(buf) + buf.WriteString(crlf) + buf.Flush() + + // watch for connection closures and stream data + // connections and contexts should have cancel methods deferred already + select { + case err := <-copyNotify(buf, logResp.Body): + if err != nil { + log.Printf("LogProxy: error while copy: %s", err.Error()) + return + } + logResp.Trailer.Write(buf) + case err := <-closeNotify(ctx, clientConn): + if err != nil { + log.Printf("LogProxy: client connection closed: %s", err.Error()) + } + return + } + + return + } +} + +func copyNotify(destination io.Writer, source io.Reader) <-chan error { + done := make(chan error, 1) + go func() { + _, err := io.Copy(destination, source) + done <- err + }() + return done +} + +// closeNotify will watch the connection and notify when then connection is closed +func closeNotify(ctx context.Context, c net.Conn) <-chan error { + notify := make(chan error, 1) + + go func() { + buf := make([]byte, 1) + // blocks until non-zero read or error. From the fd.Read docs: + // If the caller wanted a zero byte read, return immediately + // without trying (but after acquiring the readLock). + // Otherwise syscall.Read returns 0, nil which looks like + // io.EOF. + // It is important that `buf` is allocated a non-zero size + n, err := c.Read(buf) + if err != nil { + log.Printf("LogProxy: test connection: %s\n", err) + notify <- err + return + } + if n > 0 { + log.Printf("LogProxy: unexpected data: %s\n", buf[:n]) + return + } + }() + return notify +} diff --git a/gateway/server.go b/gateway/server.go index 3945def7..d4f5ae1a 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -113,6 +113,8 @@ func main() { forwardingNotifiers, ) + faasHandlers.LogProxyHandler = handlers.NewLogHandlerFunc(*config.LogsProviderURL) + if config.UseNATS() { log.Println("Async enabled: Using NATS Streaming.") maxReconnect := 60 @@ -163,6 +165,8 @@ func main() { decorateExternalAuth(faasHandlers.AsyncReport, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody) faasHandlers.SecretHandler = decorateExternalAuth(faasHandlers.SecretHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody) + faasHandlers.LogProxyHandler = + decorateExternalAuth(faasHandlers.LogProxyHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody) } r := mux.NewRouter() @@ -197,6 +201,7 @@ func main() { r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", faasHandlers.ScaleFunction).Methods(http.MethodPost) r.HandleFunc("/system/secrets", faasHandlers.SecretHandler).Methods(http.MethodGet, http.MethodPut, http.MethodPost, http.MethodDelete) + r.HandleFunc("/system/logs", faasHandlers.LogProxyHandler).Methods(http.MethodGet) if faasHandlers.QueuedProxy != nil { r.HandleFunc("/async-function/{name:[-a-zA-Z_0-9]+}/", faasHandlers.QueuedProxy).Methods(http.MethodPost) diff --git a/gateway/types/handler_set.go b/gateway/types/handler_set.go index 2f01eb59..6b0459bb 100644 --- a/gateway/types/handler_set.go +++ b/gateway/types/handler_set.go @@ -31,4 +31,7 @@ type HandlerSet struct { // SecretHandler allows secrets to be managed SecretHandler http.HandlerFunc + + // LogProxyHandler allows streaming of logs for functions + LogProxyHandler http.HandlerFunc } diff --git a/gateway/types/readconfig.go b/gateway/types/readconfig.go index 68873447..89ffa7ab 100644 --- a/gateway/types/readconfig.go +++ b/gateway/types/readconfig.go @@ -72,6 +72,16 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { } } + if len(hasEnv.Getenv("logs_provider_url")) > 0 { + var err error + cfg.LogsProviderURL, err = url.Parse(hasEnv.Getenv("logs_provider_url")) + if err != nil { + log.Fatal("If logs_provider_url is provided, then it should be a valid URL.", err) + } + } else if cfg.FunctionsProviderURL != nil { + cfg.LogsProviderURL, _ = url.Parse(cfg.FunctionsProviderURL.String()) + } + faasNATSAddress := hasEnv.Getenv("faas_nats_address") if len(faasNATSAddress) > 0 { cfg.NATSAddress = &faasNATSAddress @@ -158,6 +168,9 @@ type GatewayConfig struct { // URL for alternate functions provider. FunctionsProviderURL *url.URL + // URL for alternate function logs provider. + LogsProviderURL *url.URL + // Address of the NATS service. Required for async mode. NATSAddress *string