Proxy the gateway using TCP

There appeared to be an issue with logs appearing #98 and #68

@LucasRoesler spent a considerable amount of time looking into
this and concluded that the faas-provider and approach we are
taking to stream logs from journalctl as a process was
working as expected.

The issue appears to have been with the proxy code and its
use of a HTTP connection. Somewhere within the code, a buffer
was holding onto the data before flushing it 20-30 seconds later

This appeared to users as if the logs were not working at all.

Before fixing, the gateway container was tested by exposing
it over an SSH tunnel and inlets tunnel, both worked as
expected. The updates have been tested on multipass with
Ubuntu 18.04 and a binary built locally.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd) 2020-09-08 11:22:27 +01:00
parent 16a8d2ac6c
commit 97c11b3f5d

View File

@ -1,11 +1,10 @@
package pkg
import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"time"
@ -27,6 +26,10 @@ type Proxy struct {
Port int
}
type proxyState struct {
Host string
}
// Start listening and forwarding HTTP to the host
func (p *Proxy) Start(gatewayChan chan string, done chan bool) error {
tcp := p.Port
@ -44,83 +47,37 @@ func (p *Proxy) Start(gatewayChan chan string, done chan bool) error {
fmt.Printf("Gateway: %s\n", ps.Host)
s := &http.Server{
Addr: fmt.Sprintf(":%d", tcp),
ReadTimeout: p.Timeout,
WriteTimeout: p.Timeout,
MaxHeaderBytes: 1 << 20, // Max header of 1MB
Handler: http.HandlerFunc(makeProxy(&ps)),
l, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tcp))
if err != nil {
log.Printf("Error: %s", err.Error())
return err
}
go func() {
log.Printf("[proxy] Begin listen on %d\n", p.Port)
if err := s.ListenAndServe(); err != http.ErrServerClosed {
log.Printf("Error ListenAndServe: %v", err)
}
}()
defer l.Close()
log.Println("[proxy] Wait for done")
<-done
log.Println("[proxy] Done received")
if err := s.Shutdown(context.Background()); err != nil {
log.Printf("[proxy] Error in Shutdown: %v", err)
for {
// Wait for a connection.
conn, err := l.Accept()
if err != nil {
acceptErr := fmt.Errorf("Unable to accept on %d, error: %s", tcp, err.Error())
log.Printf("%s", acceptErr.Error())
return acceptErr
}
return nil
upstream, err := net.Dial("tcp", fmt.Sprintf("%s", ps.Host))
if err != nil {
log.Printf("unable to dial to %s, error: %s", ps.Host, err.Error())
return err
}
// copyHeaders clones the header values from the source into the destination.
func copyHeaders(destination http.Header, source *http.Header) {
for k, v := range *source {
vClone := make([]string, len(v))
copy(vClone, v)
destination[k] = vClone
go pipe(conn, upstream)
go pipe(upstream, conn)
}
}
type proxyState struct {
Host string
}
func makeProxy(ps *proxyState) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
query := ""
if len(r.URL.RawQuery) > 0 {
query = "?" + r.URL.RawQuery
}
upstream := fmt.Sprintf("http://%s%s%s", ps.Host, r.URL.Path, query)
fmt.Printf("[faasd] proxy: %s\n", upstream)
if r.Body != nil {
defer r.Body.Close()
}
wrapper := ioutil.NopCloser(r.Body)
upReq, upErr := http.NewRequest(r.Method, upstream, wrapper)
copyHeaders(upReq.Header, &r.Header)
if upErr != nil {
log.Println(upErr)
http.Error(w, upErr.Error(), http.StatusInternalServerError)
return
}
upRes, upResErr := http.DefaultClient.Do(upReq)
if upResErr != nil {
log.Println(upResErr)
http.Error(w, upResErr.Error(), http.StatusInternalServerError)
return
}
copyHeaders(w.Header(), &upRes.Header)
w.WriteHeader(upRes.StatusCode)
io.Copy(w, upRes.Body)
}
func pipe(from net.Conn, to net.Conn) {
defer from.Close()
io.Copy(from, to)
}