From e07a61fd0ce83a88ae7598d0ea50ea0505db0f41 Mon Sep 17 00:00:00 2001 From: Lucas Roesler Date: Sat, 13 Apr 2019 19:40:37 +0200 Subject: [PATCH] Reimplemnt the logs proxy without hijacking **What** - Create an alternative proxy implementation using CloseNotifier and Flusher Signed-off-by: Lucas Roesler --- gateway/handlers/logs.go | 96 ++++++++++++++++++++++++++++++++++- gateway/handlers/logs_test.go | 4 +- 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/gateway/handlers/logs.go b/gateway/handlers/logs.go index 47725da6..d0a6e5fa 100644 --- a/gateway/handlers/logs.go +++ b/gateway/handlers/logs.go @@ -17,6 +17,74 @@ const upstreamLogsEndpoint = "/system/logs" // NewLogHandlerFunc creates and http HandlerFunc from the supplied log Requestor. func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc { + writeRequestURI := false + if _, exists := os.LookupEnv("write_request_uri"); exists { + writeRequestURI = exists + } + + upstreamLogProviderBase := strings.TrimSuffix(logProvider.String(), "/") + + return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + if r.Body != nil { + defer r.Body.Close() + } + + logRequest := buildUpstreamRequest(r, upstreamLogProviderBase, upstreamLogsEndpoint) + if logRequest.Body != nil { + defer logRequest.Body.Close() + } + + cn, ok := w.(http.CloseNotifier) + if !ok { + log.Println("LogHandler: response is not a CloseNotifier, required for streaming response") + http.NotFound(w, r) + return + } + + wf, ok := w.(writerFlusher) + if !ok { + log.Println("LogHandler: response is not a Flusher, required for streaming response") + http.NotFound(w, r) + return + } + + if writeRequestURI { + log.Printf("LogProxy: proxying request to %s %s\n", logRequest.Host, logRequest.URL.String()) + } + + ctx, cancel := context.WithCancel(ctx) + logRequest = logRequest.WithContext(ctx) + defer cancel() + + logResp, err := http.DefaultTransport.RoundTrip(logRequest) + if err != nil { + log.Printf("LogProxy: forwarding request failed: %s\n", err.Error()) + http.Error(w, "log request failed", http.StatusInternalServerError) + return + } + defer logResp.Body.Close() + + // watch for connection closures and stream data + // connections and contexts should have cancel methods deferred already + select { + case err := <-copyNotify(&unbufferedWriter{wf}, logResp.Body): + if err != nil { + log.Printf("LogProxy: error while copy: %s", err.Error()) + return + } + case <-cn.CloseNotify(): + log.Printf("LogProxy: client connection closed") + return + } + + return + } +} + +// NewHijackLogHandlerFunc creates and http HandlerFunc from the supplied log Requestor. +// This remains in the package for reference purposes, providers should instead use NewLogHandlerFunc +func NewHijackLogHandlerFunc(logProvider url.URL) http.HandlerFunc { writeRequestURI := false if _, exists := os.LookupEnv("write_request_uri"); exists { @@ -50,8 +118,11 @@ func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc { } defer clientConn.Close() - clientConn.SetWriteDeadline(time.Time{}) // allow arbitrary time between log writes - buf.Flush() // will write the headers and the initial 200 response + // A zero value for t means Write will not time out, allowing us to stream the logs while + // following even if there is a large gap in logs + clientConn.SetWriteDeadline(time.Time{}) + // flush the headers and the initial 200 status code to the client + buf.Flush() if writeRequestURI { log.Printf("LogProxy: proxying request to %s %s\n", logRequest.Host, logRequest.URL.String()) @@ -68,6 +139,7 @@ func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc { buf.Flush() return } + // Body is always closeable if err is nil defer logResp.Body.Close() // write response headers directly to the buffer per RFC 2616 @@ -97,6 +169,26 @@ func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc { } } +type writerFlusher interface { + io.Writer + http.Flusher +} + +// unbufferedWriter is an io Writer that immediately flushes the after every call to Write. +// This can wrap any http.ResponseWriter that also implements Flusher. This ensures that log +// lines are immediately sent to the client +type unbufferedWriter struct { + dst writerFlusher +} + +// Write writes to the dst writer and then immediately flushes the writer +func (u *unbufferedWriter) Write(p []byte) (n int, err error) { + n, err = u.dst.Write(p) + u.dst.Flush() + + return n, err +} + func copyNotify(destination io.Writer, source io.Reader) <-chan error { done := make(chan error, 1) go func() { diff --git a/gateway/handlers/logs_test.go b/gateway/handlers/logs_test.go index 8d98c883..13bfd1a4 100644 --- a/gateway/handlers/logs_test.go +++ b/gateway/handlers/logs_test.go @@ -48,12 +48,12 @@ func Test_logsProxyDoesNotLeakGoroutinesWhenProviderClosesConnection(t *testing. resp, err := http.Get(testSrv.URL + "?name=funcFoo") if err != nil { - t.Fatalf("unexpected error sneding log request: %s", err) + t.Fatalf("unexpected error sending log request: %s", err) } body, err := ioutil.ReadAll(resp.Body) if err != nil { - t.Fatalf("unexpected error reading teh response body: %s", err) + t.Fatalf("unexpected error reading the response body: %s", err) } if string(body) != string(expectedMsg) {