Compare commits

...

2 Commits

Author SHA1 Message Date
93825e8354 Add null-checking for labels
Fixes an issue introduced in #45 which was undetected. When
users do not pass in "labels" to the deployment - or a valid
empty object, then a nil dereference causes a panic.

Fixes: #101

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-09-11 12:15:33 +01:00
6752a61a95 Proxy the gateway using TCP
There appeared to be an issue with logs appearing #98 and #68

@LucasRoesler spent a considerable amount of time looking into
this and concluded that the faas-provider and approach we are
taking to stream logs from journalctl as a process was
working as expected.

The issue appears to have been with the proxy code and its
use of a HTTP connection. Somewhere within the code, a buffer
was holding onto the data before flushing it 20-30 seconds later

This appeared to users as if the logs were not working at all.

Before fixing, the gateway container was tested by exposing
it over an SSH tunnel and inlets tunnel, both worked as
expected. The updates have been tested on multipass with
Ubuntu 18.04 and a binary built locally.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-09-08 13:19:51 +01:00
2 changed files with 36 additions and 73 deletions

View File

@ -69,6 +69,7 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
if err != nil { if err != nil {
return err return err
} }
imgRef := reference.TagNameOnly(r).String() imgRef := reference.TagNameOnly(r).String()
snapshotter := "" snapshotter := ""
@ -98,6 +99,11 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
name := req.Service name := req.Service
labels := map[string]string{}
if req.Labels != nil {
labels = *req.Labels
}
container, err := client.NewContainer( container, err := client.NewContainer(
ctx, ctx,
name, name,
@ -108,7 +114,7 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
oci.WithCapabilities([]string{"CAP_NET_RAW"}), oci.WithCapabilities([]string{"CAP_NET_RAW"}),
oci.WithMounts(mounts), oci.WithMounts(mounts),
oci.WithEnv(envs)), oci.WithEnv(envs)),
containerd.WithContainerLabels(*req.Labels), containerd.WithContainerLabels(labels),
) )
if err != nil { if err != nil {

View File

@ -1,11 +1,10 @@
package pkg package pkg
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"net"
"net/http" "net/http"
"time" "time"
@ -27,6 +26,10 @@ type Proxy struct {
Port int Port int
} }
type proxyState struct {
Host string
}
// Start listening and forwarding HTTP to the host // Start listening and forwarding HTTP to the host
func (p *Proxy) Start(gatewayChan chan string, done chan bool) error { func (p *Proxy) Start(gatewayChan chan string, done chan bool) error {
tcp := p.Port tcp := p.Port
@ -44,83 +47,37 @@ func (p *Proxy) Start(gatewayChan chan string, done chan bool) error {
fmt.Printf("Gateway: %s\n", ps.Host) fmt.Printf("Gateway: %s\n", ps.Host)
s := &http.Server{ l, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tcp))
Addr: fmt.Sprintf(":%d", tcp), if err != nil {
ReadTimeout: p.Timeout, log.Printf("Error: %s", err.Error())
WriteTimeout: p.Timeout, return err
MaxHeaderBytes: 1 << 20, // Max header of 1MB
Handler: http.HandlerFunc(makeProxy(&ps)),
} }
go func() { defer l.Close()
log.Printf("[proxy] Begin listen on %d\n", p.Port)
if err := s.ListenAndServe(); err != http.ErrServerClosed {
log.Printf("Error ListenAndServe: %v", err)
}
}()
log.Println("[proxy] Wait for done") for {
<-done // Wait for a connection.
log.Println("[proxy] Done received") conn, err := l.Accept()
if err := s.Shutdown(context.Background()); err != nil { if err != nil {
log.Printf("[proxy] Error in Shutdown: %v", err) acceptErr := fmt.Errorf("Unable to accept on %d, error: %s", tcp, err.Error())
log.Printf("%s", acceptErr.Error())
return acceptErr
} }
return nil upstream, err := net.Dial("tcp", fmt.Sprintf("%s", ps.Host))
}
if err != nil {
log.Printf("unable to dial to %s, error: %s", ps.Host, err.Error())
return err
}
go pipe(conn, upstream)
go pipe(upstream, conn)
// copyHeaders clones the header values from the source into the destination.
func copyHeaders(destination http.Header, source *http.Header) {
for k, v := range *source {
vClone := make([]string, len(v))
copy(vClone, v)
destination[k] = vClone
} }
} }
type proxyState struct { func pipe(from net.Conn, to net.Conn) {
Host string defer from.Close()
} io.Copy(from, to)
func makeProxy(ps *proxyState) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
query := ""
if len(r.URL.RawQuery) > 0 {
query = "?" + r.URL.RawQuery
}
upstream := fmt.Sprintf("http://%s%s%s", ps.Host, r.URL.Path, query)
fmt.Printf("[faasd] proxy: %s\n", upstream)
if r.Body != nil {
defer r.Body.Close()
}
wrapper := ioutil.NopCloser(r.Body)
upReq, upErr := http.NewRequest(r.Method, upstream, wrapper)
copyHeaders(upReq.Header, &r.Header)
if upErr != nil {
log.Println(upErr)
http.Error(w, upErr.Error(), http.StatusInternalServerError)
return
}
upRes, upResErr := http.DefaultClient.Do(upReq)
if upResErr != nil {
log.Println(upResErr)
http.Error(w, upResErr.Error(), http.StatusInternalServerError)
return
}
copyHeaders(w.Header(), &upRes.Header)
w.WriteHeader(upRes.StatusCode)
io.Copy(w, upRes.Body)
}
} }