mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-21 22:33:27 +00:00
Add unit test for proxy and shutdown channel
* Proxy has initial unit test and more can be added * Shutdown channel and cancellation added for proper shutdown of the proxy Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
123
pkg/proxy.go
123
pkg/proxy.go
@ -1,6 +1,7 @@
|
||||
package pkg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@ -10,83 +11,58 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewProxy(timeout time.Duration) *Proxy {
|
||||
func NewProxy(port int, timeout time.Duration) *Proxy {
|
||||
|
||||
return &Proxy{
|
||||
Port: port,
|
||||
Timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
type Proxy struct {
|
||||
Timeout time.Duration
|
||||
Port int
|
||||
}
|
||||
|
||||
func (p *Proxy) Start(gatewayChan chan string) error {
|
||||
tcp := 8080
|
||||
func (p *Proxy) Start(gatewayChan chan string, done chan bool) error {
|
||||
tcp := p.Port
|
||||
|
||||
http.DefaultClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
|
||||
return http.ErrUseLastResponse
|
||||
}
|
||||
|
||||
data := struct{ host string }{
|
||||
host: "",
|
||||
ps := proxyState{
|
||||
Host: "",
|
||||
}
|
||||
|
||||
data.host = <-gatewayChan
|
||||
ps.Host = <-gatewayChan
|
||||
|
||||
log.Printf("Starting faasd proxy on %d\n", tcp)
|
||||
|
||||
fmt.Printf("Gateway: %s\n", data.host)
|
||||
fmt.Printf("Gateway: %s\n", ps.Host)
|
||||
|
||||
s := &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", tcp),
|
||||
ReadTimeout: p.Timeout,
|
||||
WriteTimeout: p.Timeout,
|
||||
MaxHeaderBytes: 1 << 20, // Max header of 1MB
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query := ""
|
||||
if len(r.URL.RawQuery) > 0 {
|
||||
query = "?" + r.URL.RawQuery
|
||||
}
|
||||
|
||||
upstream := fmt.Sprintf("http://%s:8080%s%s", data.host, r.URL.Path, query)
|
||||
fmt.Printf("[faasd] proxy: %s\n", upstream)
|
||||
|
||||
if r.Body != nil {
|
||||
defer r.Body.Close()
|
||||
}
|
||||
|
||||
wrapper := ioutil.NopCloser(r.Body)
|
||||
upReq, upErr := http.NewRequest(r.Method, upstream, wrapper)
|
||||
|
||||
copyHeaders(upReq.Header, &r.Header)
|
||||
|
||||
if upErr != nil {
|
||||
log.Println(upErr)
|
||||
|
||||
http.Error(w, upErr.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
upRes, upResErr := http.DefaultClient.Do(upReq)
|
||||
|
||||
if upResErr != nil {
|
||||
log.Println(upResErr)
|
||||
|
||||
http.Error(w, upResErr.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
copyHeaders(w.Header(), &upRes.Header)
|
||||
|
||||
w.WriteHeader(upRes.StatusCode)
|
||||
io.Copy(w, upRes.Body)
|
||||
|
||||
}),
|
||||
Handler: http.HandlerFunc(makeProxy(&ps)),
|
||||
}
|
||||
|
||||
return s.ListenAndServe()
|
||||
go func() {
|
||||
log.Printf("[proxy] Begin listen on %d\n", p.Port)
|
||||
if err := s.ListenAndServe(); err != http.ErrServerClosed {
|
||||
log.Printf("Error ListenAndServe: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Println("[proxy] Wait for done")
|
||||
<-done
|
||||
log.Println("[proxy] Done received")
|
||||
if err := s.Shutdown(context.Background()); err != nil {
|
||||
log.Printf("[proxy] Error in Shutdown: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyHeaders clones the header values from the source into the destination.
|
||||
@ -97,3 +73,50 @@ func copyHeaders(destination http.Header, source *http.Header) {
|
||||
destination[k] = vClone
|
||||
}
|
||||
}
|
||||
|
||||
type proxyState struct {
|
||||
Host string
|
||||
}
|
||||
|
||||
func makeProxy(ps *proxyState) func(w http.ResponseWriter, r *http.Request) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query := ""
|
||||
if len(r.URL.RawQuery) > 0 {
|
||||
query = "?" + r.URL.RawQuery
|
||||
}
|
||||
|
||||
upstream := fmt.Sprintf("http://%s%s%s", ps.Host, r.URL.Path, query)
|
||||
fmt.Printf("[faasd] proxy: %s\n", upstream)
|
||||
|
||||
if r.Body != nil {
|
||||
defer r.Body.Close()
|
||||
}
|
||||
|
||||
wrapper := ioutil.NopCloser(r.Body)
|
||||
upReq, upErr := http.NewRequest(r.Method, upstream, wrapper)
|
||||
|
||||
copyHeaders(upReq.Header, &r.Header)
|
||||
|
||||
if upErr != nil {
|
||||
log.Println(upErr)
|
||||
|
||||
http.Error(w, upErr.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
upRes, upResErr := http.DefaultClient.Do(upReq)
|
||||
|
||||
if upResErr != nil {
|
||||
log.Println(upResErr)
|
||||
|
||||
http.Error(w, upResErr.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
copyHeaders(w.Header(), &upRes.Header)
|
||||
|
||||
w.WriteHeader(upRes.StatusCode)
|
||||
io.Copy(w, upRes.Body)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user