mirror of
https://github.com/openfaas/faas.git
synced 2025-06-09 00:36:46 +00:00
Forward client HTTP headers through pipeline
This commit is contained in:
parent
a740783c78
commit
c705cd8e33
@ -79,9 +79,8 @@ func MakeFunctionReader(metricsOptions metrics.MetricOptions, c *client.Client)
|
|||||||
// MakeNewFunctionHandler creates a new function (service) inside the swarm network.
|
// MakeNewFunctionHandler creates a new function (service) inside the swarm network.
|
||||||
func MakeNewFunctionHandler(metricsOptions metrics.MetricOptions, c *client.Client) http.HandlerFunc {
|
func MakeNewFunctionHandler(metricsOptions metrics.MetricOptions, c *client.Client) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
body, _ := ioutil.ReadAll(r.Body)
|
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
|
body, _ := ioutil.ReadAll(r.Body)
|
||||||
|
|
||||||
request := requests.CreateFunctionRequest{}
|
request := requests.CreateFunctionRequest{}
|
||||||
err := json.Unmarshal(body, &request)
|
err := json.Unmarshal(body, &request)
|
||||||
@ -91,7 +90,10 @@ func MakeNewFunctionHandler(metricsOptions metrics.MetricOptions, c *client.Clie
|
|||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println(request)
|
fmt.Println(request)
|
||||||
w.WriteHeader(http.StatusNotImplemented)
|
|
||||||
|
// TODO: review why this was here... debugging?
|
||||||
|
// w.WriteHeader(http.StatusNotImplemented)
|
||||||
|
|
||||||
options := types.ServiceCreateOptions{}
|
options := types.ServiceCreateOptions{}
|
||||||
spec := makeSpec(&request)
|
spec := makeSpec(&request)
|
||||||
|
|
||||||
|
@ -103,6 +103,14 @@ func lookupSwarmService(serviceName string, c *client.Client) (bool, error) {
|
|||||||
return len(services) > 0, err
|
return len(services) > 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func copyHeaders(destination *http.Header, source *http.Header) {
|
||||||
|
for k, vv := range *source {
|
||||||
|
vvClone := make([]string, len(vv))
|
||||||
|
copy(vvClone, vv)
|
||||||
|
(*destination)[k] = vvClone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, service string, requestBody []byte, logger *logrus.Logger, proxyClient *http.Client) {
|
func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, service string, requestBody []byte, logger *logrus.Logger, proxyClient *http.Client) {
|
||||||
stamp := strconv.FormatInt(time.Now().Unix(), 10)
|
stamp := strconv.FormatInt(time.Now().Unix(), 10)
|
||||||
|
|
||||||
@ -123,14 +131,12 @@ func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.Metri
|
|||||||
}
|
}
|
||||||
|
|
||||||
contentType := r.Header.Get("Content-Type")
|
contentType := r.Header.Get("Content-Type")
|
||||||
if len(contentType) == 0 {
|
|
||||||
contentType = "text/plain"
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("[%s] Forwarding request [%s] to: %s\n", stamp, contentType, url)
|
fmt.Printf("[%s] Forwarding request [%s] to: %s\n", stamp, contentType, url)
|
||||||
|
|
||||||
request, err := http.NewRequest("POST", url, bytes.NewReader(requestBody))
|
request, err := http.NewRequest("POST", url, bytes.NewReader(requestBody))
|
||||||
request.Header.Add("Content-Type", contentType)
|
|
||||||
|
copyHeaders(&request.Header, &r.Header)
|
||||||
|
|
||||||
defer request.Body.Close()
|
defer request.Body.Close()
|
||||||
|
|
||||||
response, err := proxyClient.Do(request)
|
response, err := proxyClient.Do(request)
|
||||||
@ -152,6 +158,9 @@ func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.Metri
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clientHeader := w.Header()
|
||||||
|
copyHeaders(&clientHeader, &response.Header)
|
||||||
|
|
||||||
// Match header for strict services
|
// Match header for strict services
|
||||||
w.Header().Set("Content-Type", r.Header.Get("Content-Type"))
|
w.Header().Set("Content-Type", r.Header.Get("Content-Type"))
|
||||||
|
|
||||||
|
4
gateway/tests/integration/README.md
Normal file
4
gateway/tests/integration/README.md
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
# Integration testing
|
||||||
|
|
||||||
|
These tests should be run against the sample stack included in the repository root.
|
||||||
|
|
@ -12,10 +12,11 @@ import (
|
|||||||
// Before running these tests do a Docker stack deploy.
|
// Before running these tests do a Docker stack deploy.
|
||||||
|
|
||||||
func fireRequest(url string, method string, reqBody string) (string, int, error) {
|
func fireRequest(url string, method string, reqBody string) (string, int, error) {
|
||||||
return fireRequestWithHeader(url, method, reqBody, "")
|
headers := make(map[string]string)
|
||||||
|
return fireRequestWithHeaders(url, method, reqBody, headers)
|
||||||
}
|
}
|
||||||
|
|
||||||
func fireRequestWithHeader(url string, method string, reqBody string, xheader string) (string, int, error) {
|
func fireRequestWithHeaders(url string, method string, reqBody string, headers map[string]string) (string, int, error) {
|
||||||
httpClient := http.Client{
|
httpClient := http.Client{
|
||||||
Timeout: time.Second * 2, // Maximum of 2 secs
|
Timeout: time.Second * 2, // Maximum of 2 secs
|
||||||
}
|
}
|
||||||
@ -26,9 +27,10 @@ func fireRequestWithHeader(url string, method string, reqBody string, xheader st
|
|||||||
}
|
}
|
||||||
|
|
||||||
req.Header.Set("User-Agent", "go-integration")
|
req.Header.Set("User-Agent", "go-integration")
|
||||||
if len(xheader) != 0 {
|
for kk, vv := range headers {
|
||||||
req.Header.Set("X-Function", xheader)
|
req.Header.Set(kk, vv)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, getErr := httpClient.Do(req)
|
res, getErr := httpClient.Do(req)
|
||||||
if getErr != nil {
|
if getErr != nil {
|
||||||
log.Fatal(getErr)
|
log.Fatal(getErr)
|
||||||
@ -46,14 +48,33 @@ func TestGet_Rejected(t *testing.T) {
|
|||||||
var reqBody string
|
var reqBody string
|
||||||
_, code, err := fireRequest("http://localhost:8080/function/func_echoit", http.MethodGet, reqBody)
|
_, code, err := fireRequest("http://localhost:8080/function/func_echoit", http.MethodGet, reqBody)
|
||||||
if code != http.StatusInternalServerError {
|
if code != http.StatusInternalServerError {
|
||||||
t.Log("Failed")
|
t.Logf("Failed got: %d", code)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEchoIt_Post_Route_Handler_ForwardsClientHeaders(t *testing.T) {
|
||||||
|
reqBody := "test message"
|
||||||
|
headers := make(map[string]string, 0)
|
||||||
|
headers["X-Api-Key"] = "123"
|
||||||
|
|
||||||
|
body, code, err := fireRequestWithHeaders("http://localhost:8080/function/func_echoit", http.MethodPost, reqBody, headers)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Log(err)
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if code != http.StatusOK {
|
||||||
|
t.Log("Failed")
|
||||||
|
}
|
||||||
|
if body != reqBody {
|
||||||
|
t.Log("Expected body returned")
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEchoIt_Post_Route_Handler(t *testing.T) {
|
func TestEchoIt_Post_Route_Handler(t *testing.T) {
|
||||||
@ -73,9 +94,12 @@ func TestEchoIt_Post_Route_Handler(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEchoIt_Post_Header_Handler(t *testing.T) {
|
func TestEchoIt_Post_X_Header_Routing_Handler(t *testing.T) {
|
||||||
reqBody := "test message"
|
reqBody := "test message"
|
||||||
body, code, err := fireRequestWithHeader("http://localhost:8080/", http.MethodPost, reqBody, "func_echoit")
|
headers := make(map[string]string, 0)
|
||||||
|
headers["X-Function"] = "func_echoit"
|
||||||
|
|
||||||
|
body, code, err := fireRequestWithHeaders("http://localhost:8080/", http.MethodPost, reqBody, headers)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
|
@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -31,9 +32,19 @@ func buildFunctionInput(config *WatchdogConfig, r *http.Request) ([]byte, error)
|
|||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func debugHeaders(source *http.Header, direction string) {
|
||||||
|
for k, vv := range *source {
|
||||||
|
fmt.Printf("[%s] %s=%s\n", direction, k, vv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request) {
|
func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request) {
|
||||||
parts := strings.Split(config.faasProcess, " ")
|
parts := strings.Split(config.faasProcess, " ")
|
||||||
|
|
||||||
|
if config.debugHeaders {
|
||||||
|
debugHeaders(&r.Header, "in")
|
||||||
|
}
|
||||||
|
|
||||||
targetCmd := exec.Command(parts[0], parts[1:]...)
|
targetCmd := exec.Command(parts[0], parts[1:]...)
|
||||||
writer, _ := targetCmd.StdinPipe()
|
writer, _ := targetCmd.StdinPipe()
|
||||||
|
|
||||||
@ -51,6 +62,7 @@ func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write to pipe in separate go-routine to prevent blocking
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
writer.Write(res)
|
writer.Write(res)
|
||||||
@ -77,12 +89,18 @@ func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request)
|
|||||||
os.Stdout.Write(out)
|
os.Stdout.Write(out)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Match header for strict services
|
clientContentType := r.Header.Get("Content-Type")
|
||||||
if r.Header.Get("Content-Type") == "application/json" {
|
if len(clientContentType) > 0 {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
}
|
}
|
||||||
|
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
w.Write(out)
|
w.Write(out)
|
||||||
|
|
||||||
|
if config.debugHeaders {
|
||||||
|
header := w.Header()
|
||||||
|
debugHeaders(&header, "out")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) {
|
func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) {
|
||||||
|
@ -58,13 +58,15 @@ func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig {
|
|||||||
cfg.writeDebug = parseBoolValue(hasEnv.Getenv("write_debug"))
|
cfg.writeDebug = parseBoolValue(hasEnv.Getenv("write_debug"))
|
||||||
|
|
||||||
cfg.marshallRequest = parseBoolValue(hasEnv.Getenv("marshall_request"))
|
cfg.marshallRequest = parseBoolValue(hasEnv.Getenv("marshall_request"))
|
||||||
|
cfg.debugHeaders = parseBoolValue(hasEnv.Getenv("debug_headers"))
|
||||||
|
|
||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchdogConfig for the process.
|
// WatchdogConfig for the process.
|
||||||
type WatchdogConfig struct {
|
type WatchdogConfig struct {
|
||||||
readTimeout time.Duration
|
readTimeout time.Duration
|
||||||
|
|
||||||
writeTimeout time.Duration
|
writeTimeout time.Duration
|
||||||
// faasProcess is the process to exec
|
// faasProcess is the process to exec
|
||||||
faasProcess string
|
faasProcess string
|
||||||
@ -73,4 +75,7 @@ type WatchdogConfig struct {
|
|||||||
writeDebug bool
|
writeDebug bool
|
||||||
|
|
||||||
marshallRequest bool
|
marshallRequest bool
|
||||||
|
|
||||||
|
// prints out all incoming and out-going HTTP headers
|
||||||
|
debugHeaders bool
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user