From 97c11b3f5dea1fe03b86251c8a8394ab139c8aa1 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Tue, 8 Sep 2020 11:22:27 +0100 Subject: [PATCH] Proxy the gateway using TCP There appeared to be an issue with logs appearing #98 and #68 @LucasRoesler spent a considerable amount of time looking into this and concluded that the faas-provider and approach we are taking to stream logs from journalctl as a process was working as expected. The issue appears to have been with the proxy code and its use of a HTTP connection. Somewhere within the code, a buffer was holding onto the data before flushing it 20-30 seconds later This appeared to users as if the logs were not working at all. Before fixing, the gateway container was tested by exposing it over an SSH tunnel and inlets tunnel, both worked as expected. The updates have been tested on multipass with Ubuntu 18.04 and a binary built locally. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- pkg/proxy.go | 101 +++++++++++++++------------------------------------ 1 file changed, 29 insertions(+), 72 deletions(-) diff --git a/pkg/proxy.go b/pkg/proxy.go index f78a4ce..3feaaef 100644 --- a/pkg/proxy.go +++ b/pkg/proxy.go @@ -1,11 +1,10 @@ package pkg import ( - "context" "fmt" "io" - "io/ioutil" "log" + "net" "net/http" "time" @@ -27,6 +26,10 @@ type Proxy struct { Port int } +type proxyState struct { + Host string +} + // Start listening and forwarding HTTP to the host func (p *Proxy) Start(gatewayChan chan string, done chan bool) error { tcp := p.Port @@ -44,83 +47,37 @@ func (p *Proxy) Start(gatewayChan chan string, done chan bool) error { fmt.Printf("Gateway: %s\n", ps.Host) - s := &http.Server{ - Addr: fmt.Sprintf(":%d", tcp), - ReadTimeout: p.Timeout, - WriteTimeout: p.Timeout, - MaxHeaderBytes: 1 << 20, // Max header of 1MB - Handler: http.HandlerFunc(makeProxy(&ps)), + l, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tcp)) + if err != nil { + log.Printf("Error: %s", err.Error()) + return err } - go func() { - log.Printf("[proxy] Begin listen on %d\n", p.Port) - if err := s.ListenAndServe(); err != http.ErrServerClosed { - log.Printf("Error ListenAndServe: %v", err) + defer l.Close() + + for { + // Wait for a connection. + conn, err := l.Accept() + if err != nil { + acceptErr := fmt.Errorf("Unable to accept on %d, error: %s", tcp, err.Error()) + log.Printf("%s", acceptErr.Error()) + return acceptErr } - }() - log.Println("[proxy] Wait for done") - <-done - log.Println("[proxy] Done received") - if err := s.Shutdown(context.Background()); err != nil { - log.Printf("[proxy] Error in Shutdown: %v", err) - } + upstream, err := net.Dial("tcp", fmt.Sprintf("%s", ps.Host)) - return nil -} + if err != nil { + log.Printf("unable to dial to %s, error: %s", ps.Host, err.Error()) + return err + } + + go pipe(conn, upstream) + go pipe(upstream, conn) -// copyHeaders clones the header values from the source into the destination. -func copyHeaders(destination http.Header, source *http.Header) { - for k, v := range *source { - vClone := make([]string, len(v)) - copy(vClone, v) - destination[k] = vClone } } -type proxyState struct { - Host string -} - -func makeProxy(ps *proxyState) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - - query := "" - if len(r.URL.RawQuery) > 0 { - query = "?" + r.URL.RawQuery - } - - upstream := fmt.Sprintf("http://%s%s%s", ps.Host, r.URL.Path, query) - fmt.Printf("[faasd] proxy: %s\n", upstream) - - if r.Body != nil { - defer r.Body.Close() - } - - wrapper := ioutil.NopCloser(r.Body) - upReq, upErr := http.NewRequest(r.Method, upstream, wrapper) - - copyHeaders(upReq.Header, &r.Header) - - if upErr != nil { - log.Println(upErr) - - http.Error(w, upErr.Error(), http.StatusInternalServerError) - return - } - - upRes, upResErr := http.DefaultClient.Do(upReq) - - if upResErr != nil { - log.Println(upResErr) - - http.Error(w, upResErr.Error(), http.StatusInternalServerError) - return - } - - copyHeaders(w.Header(), &upRes.Header) - - w.WriteHeader(upRes.StatusCode) - io.Copy(w, upRes.Body) - } +func pipe(from net.Conn, to net.Conn) { + defer from.Close() + io.Copy(from, to) }