mirror of
https://github.com/openfaas/faas.git
synced 2025-06-21 22:33:23 +00:00
Reimplemnt the logs proxy without hijacking
**What** - Create an alternative proxy implementation using CloseNotifier and Flusher Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
This commit is contained in:
committed by
Alex Ellis
parent
00c734a136
commit
e07a61fd0c
@ -17,6 +17,74 @@ const upstreamLogsEndpoint = "/system/logs"
|
||||
|
||||
// NewLogHandlerFunc creates and http HandlerFunc from the supplied log Requestor.
|
||||
func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc {
|
||||
writeRequestURI := false
|
||||
if _, exists := os.LookupEnv("write_request_uri"); exists {
|
||||
writeRequestURI = exists
|
||||
}
|
||||
|
||||
upstreamLogProviderBase := strings.TrimSuffix(logProvider.String(), "/")
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
if r.Body != nil {
|
||||
defer r.Body.Close()
|
||||
}
|
||||
|
||||
logRequest := buildUpstreamRequest(r, upstreamLogProviderBase, upstreamLogsEndpoint)
|
||||
if logRequest.Body != nil {
|
||||
defer logRequest.Body.Close()
|
||||
}
|
||||
|
||||
cn, ok := w.(http.CloseNotifier)
|
||||
if !ok {
|
||||
log.Println("LogHandler: response is not a CloseNotifier, required for streaming response")
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
wf, ok := w.(writerFlusher)
|
||||
if !ok {
|
||||
log.Println("LogHandler: response is not a Flusher, required for streaming response")
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
if writeRequestURI {
|
||||
log.Printf("LogProxy: proxying request to %s %s\n", logRequest.Host, logRequest.URL.String())
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
logRequest = logRequest.WithContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
logResp, err := http.DefaultTransport.RoundTrip(logRequest)
|
||||
if err != nil {
|
||||
log.Printf("LogProxy: forwarding request failed: %s\n", err.Error())
|
||||
http.Error(w, "log request failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer logResp.Body.Close()
|
||||
|
||||
// watch for connection closures and stream data
|
||||
// connections and contexts should have cancel methods deferred already
|
||||
select {
|
||||
case err := <-copyNotify(&unbufferedWriter{wf}, logResp.Body):
|
||||
if err != nil {
|
||||
log.Printf("LogProxy: error while copy: %s", err.Error())
|
||||
return
|
||||
}
|
||||
case <-cn.CloseNotify():
|
||||
log.Printf("LogProxy: client connection closed")
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// NewHijackLogHandlerFunc creates and http HandlerFunc from the supplied log Requestor.
|
||||
// This remains in the package for reference purposes, providers should instead use NewLogHandlerFunc
|
||||
func NewHijackLogHandlerFunc(logProvider url.URL) http.HandlerFunc {
|
||||
|
||||
writeRequestURI := false
|
||||
if _, exists := os.LookupEnv("write_request_uri"); exists {
|
||||
@ -50,8 +118,11 @@ func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc {
|
||||
}
|
||||
|
||||
defer clientConn.Close()
|
||||
clientConn.SetWriteDeadline(time.Time{}) // allow arbitrary time between log writes
|
||||
buf.Flush() // will write the headers and the initial 200 response
|
||||
// A zero value for t means Write will not time out, allowing us to stream the logs while
|
||||
// following even if there is a large gap in logs
|
||||
clientConn.SetWriteDeadline(time.Time{})
|
||||
// flush the headers and the initial 200 status code to the client
|
||||
buf.Flush()
|
||||
|
||||
if writeRequestURI {
|
||||
log.Printf("LogProxy: proxying request to %s %s\n", logRequest.Host, logRequest.URL.String())
|
||||
@ -68,6 +139,7 @@ func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc {
|
||||
buf.Flush()
|
||||
return
|
||||
}
|
||||
// Body is always closeable if err is nil
|
||||
defer logResp.Body.Close()
|
||||
|
||||
// write response headers directly to the buffer per RFC 2616
|
||||
@ -97,6 +169,26 @@ func NewLogHandlerFunc(logProvider url.URL) http.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
type writerFlusher interface {
|
||||
io.Writer
|
||||
http.Flusher
|
||||
}
|
||||
|
||||
// unbufferedWriter is an io Writer that immediately flushes the after every call to Write.
|
||||
// This can wrap any http.ResponseWriter that also implements Flusher. This ensures that log
|
||||
// lines are immediately sent to the client
|
||||
type unbufferedWriter struct {
|
||||
dst writerFlusher
|
||||
}
|
||||
|
||||
// Write writes to the dst writer and then immediately flushes the writer
|
||||
func (u *unbufferedWriter) Write(p []byte) (n int, err error) {
|
||||
n, err = u.dst.Write(p)
|
||||
u.dst.Flush()
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func copyNotify(destination io.Writer, source io.Reader) <-chan error {
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
|
@ -48,12 +48,12 @@ func Test_logsProxyDoesNotLeakGoroutinesWhenProviderClosesConnection(t *testing.
|
||||
|
||||
resp, err := http.Get(testSrv.URL + "?name=funcFoo")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error sneding log request: %s", err)
|
||||
t.Fatalf("unexpected error sending log request: %s", err)
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading teh response body: %s", err)
|
||||
t.Fatalf("unexpected error reading the response body: %s", err)
|
||||
}
|
||||
|
||||
if string(body) != string(expectedMsg) {
|
||||
|
Reference in New Issue
Block a user