diff --git a/docker-compose.yml b/docker-compose.yml index 28aa0e31..db39f059 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -92,7 +92,7 @@ services: - 'node.platform.os == linux' queue-worker: - image: openfaas/queue-worker:0.5.4 + image: openfaas/queue-worker:0.6.0 networks: - functions environment: diff --git a/gateway/Gopkg.lock b/gateway/Gopkg.lock index 2d86b1e3..a68d160b 100644 --- a/gateway/Gopkg.lock +++ b/gateway/Gopkg.lock @@ -3,163 +3,128 @@ [[projects]] branch = "master" - digest = "1:0c5485088ce274fac2e931c1b979f2619345097b39d91af3239977114adf0320" name = "github.com/beorn7/perks" packages = ["quantile"] - pruneopts = "" revision = "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9" [[projects]] - digest = "1:a9e4ff75555e4500e409dc87c1d708b090bb8dd77f889bbf266773f3dc23af70" name = "github.com/docker/distribution" packages = ["uuid"] - pruneopts = "" revision = "48294d928ced5dd9b378f7fd7c6f5da3ff3f2c89" version = "v2.6.2" [[projects]] - digest = "1:70a80170917a15e1ff02faab5f9e716e945e0676e86599ba144d38f96e30c3bf" name = "github.com/gogo/protobuf" packages = [ "gogoproto", "proto", - "protoc-gen-gogo/descriptor", + "protoc-gen-gogo/descriptor" ] - pruneopts = "" revision = "342cbe0a04158f6dcb03ca0079991a51a4248c02" version = "v0.5" [[projects]] branch = "master" - digest = "1:3b760d3b93f994df8eb1d9ebfad17d3e9e37edcb7f7efaa15b427c0d7a64f4e4" name = "github.com/golang/protobuf" packages = ["proto"] - pruneopts = "" revision = "1e59b77b52bf8e4b449a57e6f79f21226d571845" [[projects]] - digest = "1:20ed7daa9b3b38b6d1d39b48ab3fd31122be5419461470d0c28de3e121c93ecf" name = "github.com/gorilla/context" packages = ["."] - pruneopts = "" revision = "1ea25387ff6f684839d82767c1733ff4d4d15d0a" version = "v1.1" [[projects]] - digest = "1:c2c8666b4836c81a1d247bdf21c6a6fc1ab586538ab56f74437c2e0df5c375e1" name = "github.com/gorilla/mux" packages = ["."] - pruneopts = "" revision = "e3702bed27f0d39777b0b37b664b6280e8ef8fbf" version = "v1.6.2" [[projects]] - digest = "1:4c23ced97a470b17d9ffd788310502a077b9c1f60221a85563e49696276b4147" name = "github.com/matttproud/golang_protobuf_extensions" packages = ["pbutil"] - pruneopts = "" revision = "3247c84500bff8d9fb6d579d800f20b3e091582c" version = "v1.0.0" [[projects]] - digest = "1:665af347df4c5d1ae4c3eacd0754f5337a301f6a3f2444c9993b996605c8c02b" name = "github.com/nats-io/go-nats" packages = [ ".", "encoders/builtin", - "util", + "util" ] - pruneopts = "" revision = "062418ea1c2181f52dc0f954f6204370519a868b" version = "v1.5.0" [[projects]] - digest = "1:ca0ffe35a7afa99d614845251f508505188d4117b87d368ee2f4b84c85f3fd29" name = "github.com/nats-io/go-nats-streaming" packages = [ ".", - "pb", + "pb" ] - pruneopts = "" revision = "e15a53f85e4932540600a16b56f6c4f65f58176f" version = "v0.4.0" [[projects]] - digest = "1:be61e8224b84064109eaba8157cbb4bbe6ca12443e182b6624fdfa1c0dcf53d9" name = "github.com/nats-io/nuid" packages = ["."] - pruneopts = "" revision = "289cccf02c178dc782430d534e3c1f5b72af807f" version = "v1.0.0" [[projects]] - digest = "1:3eb611e5a4c84bb93879f9704b2fdc0ea9b29e59f465b3b8b2af382a747ef86f" name = "github.com/openfaas/faas-provider" packages = ["auth"] - pruneopts = "" - revision = "9ce928bc82cbb2642e6d534f93a7904116179e6c" - version = "0.7.0" + revision = "220324e98f5db5aa61f02d1ab13f03e91310796c" + version = "0.8.1" [[projects]] - digest = "1:26136842b63206aa4f8d20730edcdf88bd0e89f81df51b199b8baf9b9ed288bc" name = "github.com/openfaas/nats-queue-worker" - packages = ["handler"] - pruneopts = "" - revision = "15287c9b2af293cee0c545ccc014a68e1f0a4424" - version = "0.4.7" + packages = [ + "handler", + "nats" + ] + revision = "1d731056d1729865034b4598f44031eab7f76adb" + version = "0.6.0" [[projects]] - digest = "1:6f218995d6a74636cfcab45ce03005371e682b4b9bee0e5eb0ccfd83ef85364f" name = "github.com/prometheus/client_golang" packages = [ "prometheus", "prometheus/internal", + "prometheus/promhttp" ] - pruneopts = "" revision = "505eaef017263e299324067d40ca2c48f6a2cf50" version = "v0.9.2" [[projects]] branch = "master" - digest = "1:60aca47f4eeeb972f1b9da7e7db51dee15ff6c59f7b401c1588b8e6771ba15ef" name = "github.com/prometheus/client_model" packages = ["go"] - pruneopts = "" revision = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" [[projects]] branch = "master" - digest = "1:e3aa5178be4fc4ae8cdb37d11c02f7490c00450a9f419e6aa84d02d3b47e90d2" name = "github.com/prometheus/common" packages = [ "expfmt", "internal/bitbucket.org/ww/goautoneg", - "model", + "model" ] - pruneopts = "" revision = "2e54d0b93cba2fd133edc32211dcc32c06ef72ca" [[projects]] branch = "master" - digest = "1:f0857d075687b4ddebb10c8403d5fec9f093f7208b34ed5b6f3101ee2e77cec5" name = "github.com/prometheus/procfs" packages = [ ".", - "xfs", + "xfs" ] - pruneopts = "" revision = "b15cd069a83443be3154b719d0cc9fe8117f09fb" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - input-imports = [ - "github.com/docker/distribution/uuid", - "github.com/gorilla/mux", - "github.com/openfaas/faas-provider/auth", - "github.com/openfaas/nats-queue-worker/handler", - "github.com/prometheus/client_golang/prometheus", - "github.com/prometheus/client_model/go", - ] + inputs-digest = "b2b3291c34b853085074ceb72c897de6bb7baec0b0193da42a5d0850e5280133" solver-name = "gps-cdcl" solver-version = 1 diff --git a/gateway/Gopkg.toml b/gateway/Gopkg.toml index db82de8f..6efdebaf 100644 --- a/gateway/Gopkg.toml +++ b/gateway/Gopkg.toml @@ -14,7 +14,7 @@ ignored = ["github.com/openfaas/faas/gateway/queue"] [[constraint]] name = "github.com/openfaas/nats-queue-worker" - version = "0.4.7" + version = "0.6.0" [[constraint]] name = "github.com/prometheus/client_golang" diff --git a/gateway/server.go b/gateway/server.go index c9a7ab0f..8201bb40 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -10,12 +10,11 @@ import ( "time" "github.com/gorilla/mux" - "github.com/openfaas/faas/gateway/handlers" - "github.com/openfaas/faas/gateway/scaling" - "github.com/openfaas/faas-provider/auth" + "github.com/openfaas/faas/gateway/handlers" "github.com/openfaas/faas/gateway/metrics" "github.com/openfaas/faas/gateway/plugin" + "github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/types" natsHandler "github.com/openfaas/nats-queue-worker/handler" ) @@ -103,7 +102,12 @@ func main() { if config.UseNATS() { log.Println("Async enabled: Using NATS Streaming.") - natsQueue, queueErr := natsHandler.CreateNatsQueue(*config.NATSAddress, *config.NATSPort, natsHandler.DefaultNatsConfig{}) + maxReconnect := 60 + interval := time.Second * 2 + + defaultNATSConfig := natsHandler.NewDefaultNATSConfig(maxReconnect, interval) + + natsQueue, queueErr := natsHandler.CreateNATSQueue(*config.NATSAddress, *config.NATSPort, defaultNATSConfig) if queueErr != nil { log.Fatalln(queueErr) } diff --git a/gateway/vendor/github.com/openfaas/faas-provider/.DEREK.yml b/gateway/vendor/github.com/openfaas/faas-provider/.DEREK.yml new file mode 100644 index 00000000..8f544aa5 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/faas-provider/.DEREK.yml @@ -0,0 +1 @@ +redirect: https://raw.githubusercontent.com/openfaas/faas/master/.DEREK.yml diff --git a/gateway/vendor/github.com/openfaas/faas-provider/Dockerfile b/gateway/vendor/github.com/openfaas/faas-provider/Dockerfile index 0d075f76..70f7c65c 100644 --- a/gateway/vendor/github.com/openfaas/faas-provider/Dockerfile +++ b/gateway/vendor/github.com/openfaas/faas-provider/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.9.7-alpine +FROM golang:1.10.4-alpine3.8 RUN mkdir -p /go/src/github.com/openfaas/faas-provider/ @@ -12,7 +12,7 @@ COPY serve.go . RUN go test ./auth/ -v \ && CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o faas-provider . -FROM alpine:3.7 +FROM alpine:3.8 RUN apk --no-cache add ca-certificates WORKDIR /root/ diff --git a/gateway/vendor/github.com/openfaas/faas-provider/Gopkg.lock b/gateway/vendor/github.com/openfaas/faas-provider/Gopkg.lock index 65cc582b..07862a29 100644 --- a/gateway/vendor/github.com/openfaas/faas-provider/Gopkg.lock +++ b/gateway/vendor/github.com/openfaas/faas-provider/Gopkg.lock @@ -2,20 +2,24 @@ [[projects]] + digest = "1:160eabf7a69910fd74f29c692718bc2437c1c1c7d4c9dea9712357752a70e5df" name = "github.com/gorilla/context" packages = ["."] + pruneopts = "UT" revision = "1ea25387ff6f684839d82767c1733ff4d4d15d0a" version = "v1.1" [[projects]] + digest = "1:e73f5b0152105f18bc131fba127d9949305c8693f8a762588a82a48f61756f5f" name = "github.com/gorilla/mux" packages = ["."] - revision = "7f08801859139f86dfafd1c296e2cba9a80d292e" - version = "v1.6.0" + pruneopts = "UT" + revision = "e3702bed27f0d39777b0b37b664b6280e8ef8fbf" + version = "v1.6.2" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "22efb1c7d9d2403520db6d9a878b2f1e52741e51425cbda743cfd25f00c84a9b" + input-imports = ["github.com/gorilla/mux"] solver-name = "gps-cdcl" solver-version = 1 diff --git a/gateway/vendor/github.com/openfaas/faas-provider/Gopkg.toml b/gateway/vendor/github.com/openfaas/faas-provider/Gopkg.toml index e36ca86f..3810eacc 100644 --- a/gateway/vendor/github.com/openfaas/faas-provider/Gopkg.toml +++ b/gateway/vendor/github.com/openfaas/faas-provider/Gopkg.toml @@ -4,4 +4,4 @@ [[constraint]] name = "github.com/gorilla/mux" - version = "1.6.0" + version = "1.6.2" diff --git a/gateway/vendor/github.com/openfaas/faas-provider/proxy/handler_test.go b/gateway/vendor/github.com/openfaas/faas-provider/proxy/handler_test.go new file mode 100644 index 00000000..996847c9 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/faas-provider/proxy/handler_test.go @@ -0,0 +1,134 @@ +package proxy + +import ( + "bytes" + "errors" + "log" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "github.com/gorilla/mux" +) + +type testBaseURLResolver struct { + testServerBase string + err error +} + +func (tr *testBaseURLResolver) Resolve(name string) (url.URL, error) { + if tr.err != nil { + return url.URL{}, tr.err + } + + return url.URL{ + Scheme: "http", + Host: tr.testServerBase, + }, nil +} +func Test_NewHandlerFunc_Panic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("should panic if resolver is nil") + } + }() + + NewHandlerFunc(time.Second, nil) +} + +func Test_NewHandlerFunc_NoPanic(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Errorf("should not panic if resolver is not nil") + } + }() + + proxyFunc := NewHandlerFunc(time.Second, &testBaseURLResolver{}) + if proxyFunc == nil { + t.Errorf("proxy handler func is nil") + } +} + +func Test_ProxyHandler_NonAllowedMethods(t *testing.T) { + + proxyFunc := NewHandlerFunc(time.Second, &testBaseURLResolver{}) + + nonAllowedMethods := []string{ + http.MethodHead, http.MethodConnect, http.MethodOptions, http.MethodTrace, + } + + for _, method := range nonAllowedMethods { + t.Run(method+" method is not allowed", func(t *testing.T) { + w := httptest.NewRecorder() + req := httptest.NewRequest(method, "http://example.com/foo", nil) + proxyFunc(w, req) + resp := w.Result() + if resp.StatusCode != http.StatusMethodNotAllowed { + t.Errorf("expected status code `%d`, got `%d`", http.StatusMethodNotAllowed, resp.StatusCode) + } + }) + } +} + +func Test_ProxyHandler_MissingFunctionNameError(t *testing.T) { + proxyFunc := NewHandlerFunc(time.Second, &testBaseURLResolver{"", nil}) + + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "http://example.com/foo", nil) + req = mux.SetURLVars(req, map[string]string{"name": ""}) + + proxyFunc(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected status code `%d`, got `%d`", http.StatusBadRequest, w.Code) + } + + respBody := w.Body.String() + if respBody != errMissingFunctionName { + t.Errorf("expected error message `%s`, got `%s`", errMissingFunctionName, respBody) + } +} + +func Test_ProxyHandler_ResolveError(t *testing.T) { + logs := &bytes.Buffer{} + log.SetOutput(logs) + + resolveErr := errors.New("can not find test service `foo`") + proxyFunc := NewHandlerFunc(time.Second, &testBaseURLResolver{"", resolveErr}) + + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "http://example.com/foo", nil) + req = mux.SetURLVars(req, map[string]string{"name": "foo"}) + + proxyFunc(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("expected status code `%d`, got `%d`", http.StatusBadRequest, w.Code) + } + + respBody := w.Body.String() + if respBody != "Cannot find service: foo." { + t.Errorf("expected error message `%s`, got `%s`", "Cannot find service: foo.", respBody) + } + + if !strings.Contains(logs.String(), resolveErr.Error()) { + t.Errorf("expected logs to contain `%s`", resolveErr.Error()) + } +} + +func Test_ProxyHandler_Proxy_Success(t *testing.T) { + t.Skip("Test not implemented yet") + // testFuncService := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // w.WriteHeader(http.StatusOK) + // })) + // proxyFunc := NewHandlerFunc(time.Second, &testBaseURLResolver{testFuncService.URL, nil}) + + // w := httptest.NewRecorder() + // req := httptest.NewRequest("GET", "http://example.com/foo", nil) + // req = mux.SetURLVars(req, map[string]string{"name": "foo"}) + + // proxyFunc(w, req) +} diff --git a/gateway/vendor/github.com/openfaas/faas-provider/proxy/proxy.go b/gateway/vendor/github.com/openfaas/faas-provider/proxy/proxy.go new file mode 100644 index 00000000..86e40179 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/faas-provider/proxy/proxy.go @@ -0,0 +1,221 @@ +// Package proxy provides a default function invocation proxy method for OpenFaaS providers. +// +// The function proxy logic is used by the Gateway when `direct_functions` is set to false. +// This means that the provider will direct call the function and return the results. This +// involves resolving the function by name and then copying the result into the original HTTP +// request. +// +// openfaas-provider has implemented a standard HTTP HandlerFunc that will handle setting +// timeout values, parsing the request path, and copying the request/response correctly. +// bootstrapHandlers := bootTypes.FaaSHandlers{ +// FunctionProxy: proxy.NewHandlerFunc(timeout, resolver), +// DeleteHandler: handlers.MakeDeleteHandler(clientset), +// DeployHandler: handlers.MakeDeployHandler(clientset), +// FunctionReader: handlers.MakeFunctionReader(clientset), +// ReplicaReader: handlers.MakeReplicaReader(clientset), +// ReplicaUpdater: handlers.MakeReplicaUpdater(clientset), +// InfoHandler: handlers.MakeInfoHandler(), +// } +// +// proxy.NewHandlerFunc is optional, but does simplify the logic of your provider. +package proxy + +import ( + "fmt" + "io" + "log" + "net" + "net/http" + "net/url" + "time" + + "github.com/gorilla/mux" +) + +const ( + watchdogPort = "8080" + defaultContentType = "text/plain" + errMissingFunctionName = "Please provide a valid route /function/function_name." +) + +// BaseURLResolver URL resolver for proxy requests +// +// The FaaS provider implementation is responsible for providing the resolver function implementation. +// BaseURLResolver.Resolve will receive the function name and should return the URL of the +// function service. +type BaseURLResolver interface { + Resolve(functionName string) (url.URL, error) +} + +// NewHandlerFunc creates a standard http.HandlerFunc to proxy function requests. +// The returned http.HandlerFunc will ensure: +// +// - proper proxy request timeouts +// - proxy requests for GET, POST, PATCH, PUT, and DELETE +// - path parsing including support for extracing the function name, sub-paths, and query paremeters +// - passing and setting the `X-Forwarded-Host` and `X-Forwarded-For` headers +// - logging errors and proxy request timing to stdout +// +// Note that this will panic if `resolver` is nil. +func NewHandlerFunc(timeout time.Duration, resolver BaseURLResolver) http.HandlerFunc { + if resolver == nil { + panic("NewHandlerFunc: empty proxy handler resolver, cannot be nil") + } + + proxyClient := http.Client{ + // these Transport values ensure that the http Client will eventually timeout and prevents + // infinite retries. The default http.Client configure these timeouts. The specific + // values tuned via performance testing/benchmarking + // + // Additional context can be found at + // - https://medium.com/@nate510/don-t-use-go-s-default-http-client-4804cb19f779 + // - https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: timeout, + KeepAlive: 1 * time.Second, + }).DialContext, + IdleConnTimeout: 120 * time.Millisecond, + ExpectContinueTimeout: 1500 * time.Millisecond, + }, + Timeout: timeout, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + + return func(w http.ResponseWriter, r *http.Request) { + if r.Body != nil { + defer r.Body.Close() + } + + switch r.Method { + case http.MethodPost, + http.MethodPut, + http.MethodPatch, + http.MethodDelete, + http.MethodGet: + + proxyRequest(w, r, proxyClient, resolver) + + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } + } +} + +// proxyRequest handles the actual resolution of and then request to the function service. +func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient http.Client, resolver BaseURLResolver) { + ctx := originalReq.Context() + + pathVars := mux.Vars(originalReq) + functionName := pathVars["name"] + if functionName == "" { + writeError(w, http.StatusBadRequest, errMissingFunctionName) + return + } + + functionAddr, resolveErr := resolver.Resolve(functionName) + if resolveErr != nil { + // TODO: Should record the 404/not found error in Prometheus. + log.Printf("resolver error: cannot find %s: %s\n", functionName, resolveErr.Error()) + writeError(w, http.StatusNotFound, "Cannot find service: %s.", functionName) + return + } + + proxyReq, err := buildProxyRequest(originalReq, functionAddr, pathVars["params"]) + if err != nil { + writeError(w, http.StatusInternalServerError, "Failed to resolve service: %s.", functionName) + return + } + defer proxyReq.Body.Close() + + start := time.Now() + response, err := proxyClient.Do(proxyReq.WithContext(ctx)) + seconds := time.Since(start) + + if err != nil { + log.Printf("error with proxy request to: %s, %s\n", proxyReq.URL.String(), err.Error()) + + writeError(w, http.StatusInternalServerError, "Can't reach service for: %s.", functionName) + return + } + + log.Printf("%s took %f seconds\n", functionName, seconds.Seconds()) + + clientHeader := w.Header() + copyHeaders(clientHeader, &response.Header) + w.Header().Set("Content-Type", getContentType(response.Header, originalReq.Header)) + + w.WriteHeader(http.StatusOK) + io.Copy(w, response.Body) +} + +// buildProxyRequest creates a request object for the proxy request, it will ensure that +// the original request headers are preserved as well as setting openfaas system headers +func buildProxyRequest(originalReq *http.Request, baseURL url.URL, extraPath string) (*http.Request, error) { + + host := baseURL.Host + if baseURL.Port() == "" { + host = baseURL.Host + ":" + watchdogPort + } + + url := url.URL{ + Scheme: baseURL.Scheme, + Host: host, + Path: extraPath, + RawQuery: originalReq.URL.RawQuery, + } + + upstreamReq, err := http.NewRequest(originalReq.Method, url.String(), nil) + if err != nil { + return nil, err + } + copyHeaders(upstreamReq.Header, &originalReq.Header) + + if len(originalReq.Host) > 0 && upstreamReq.Header.Get("X-Forwarded-Host") == "" { + upstreamReq.Header["X-Forwarded-Host"] = []string{originalReq.Host} + } + if upstreamReq.Header.Get("X-Forwarded-For") == "" { + upstreamReq.Header["X-Forwarded-For"] = []string{originalReq.RemoteAddr} + } + + if originalReq.Body != nil { + upstreamReq.Body = originalReq.Body + } + + return upstreamReq, nil +} + +// copyHeaders clones the header values from the source into the destination. +func copyHeaders(destination http.Header, source *http.Header) { + for k, v := range *source { + vClone := make([]string, len(v)) + copy(vClone, v) + destination[k] = vClone + } +} + +// getContentType resolves the correct Content-Type for a proxied function. +func getContentType(request http.Header, proxyResponse http.Header) (headerContentType string) { + responseHeader := proxyResponse.Get("Content-Type") + requestHeader := request.Get("Content-Type") + + if len(responseHeader) > 0 { + headerContentType = responseHeader + } else if len(requestHeader) > 0 { + headerContentType = requestHeader + } else { + headerContentType = defaultContentType + } + + return headerContentType +} + +// writeError sets the response status code and write formats the provided message as the +// response body +func writeError(w http.ResponseWriter, statusCode int, msg string, args ...interface{}) { + w.WriteHeader(statusCode) + w.Write([]byte(fmt.Sprintf(msg, args...))) +} diff --git a/gateway/vendor/github.com/openfaas/faas-provider/proxy/proxy_test.go b/gateway/vendor/github.com/openfaas/faas-provider/proxy/proxy_test.go new file mode 100644 index 00000000..a68946b5 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/faas-provider/proxy/proxy_test.go @@ -0,0 +1,385 @@ +package proxy + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/gorilla/mux" +) + +func varHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + w.WriteHeader(http.StatusOK) + w.Write([]byte(fmt.Sprintf("name: %s params: %s", vars["name"], vars["params"]))) +} + +func testResolver(functionName string) (url.URL, error) { + return url.URL{ + Scheme: "http", + Host: functionName, + }, nil +} + +func Test_pathParsing(t *testing.T) { + tt := []struct { + name string + functionPath string + functionName string + extraPath string + statusCode int + }{ + { + "simple_name_match", + "/function/echo", + "echo", + "", + 200, + }, + { + "simple_name_match_with_trailing_slash", + "/function/echo/", + "echo", + "", + 200, + }, + { + "name_match_with_additional_path_values", + "/function/echo/subPath/extras", + "echo", + "subPath/extras", + 200, + }, + { + "name_match_with_additional_path_values_and_querystring", + "/function/echo/subPath/extras?query=true", + "echo", + "subPath/extras", + 200, + }, + { + "not_found_if_no_name", + "/function/", + "", + "", + 404, + }, + } + + // Need to create a router that we can pass the request through so that the vars will be added to the context + router := mux.NewRouter() + router.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}", varHandler) + router.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}/", varHandler) + router.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}/{params:.*}", varHandler) + + for _, s := range tt { + t.Run(s.name, func(t *testing.T) { + rr := httptest.NewRecorder() + req, err := http.NewRequest("GET", s.functionPath, nil) + if err != nil { + t.Fatal(err) + } + + router.ServeHTTP(rr, req) + if rr.Code != s.statusCode { + t.Fatalf("unexpected status code; got: %d, expected: %d", rr.Code, s.statusCode) + } + + body := rr.Body.String() + expectedBody := fmt.Sprintf("name: %s params: %s", s.functionName, s.extraPath) + if s.statusCode == http.StatusOK && body != expectedBody { + t.Fatalf("incorrect function name and path params; got: %s, expected: %s", body, expectedBody) + } + }) + } +} + +func Test_buildProxyRequest_Body_Method_Query(t *testing.T) { + srcBytes := []byte("hello world") + + reader := bytes.NewReader(srcBytes) + request, _ := http.NewRequest(http.MethodPost, "/?code=1", reader) + request.Header.Set("X-Source", "unit-test") + + if request.URL.RawQuery != "code=1" { + t.Errorf("Query - want: %s, got: %s", "code=1", request.URL.RawQuery) + t.Fail() + } + + funcURL, _ := testResolver("funcName") + upstream, err := buildProxyRequest(request, funcURL, "") + if err != nil { + t.Fatal(err.Error()) + } + + if request.Method != upstream.Method { + t.Errorf("Method - want: %s, got: %s", request.Method, upstream.Method) + t.Fail() + } + + upstreamBytes, _ := ioutil.ReadAll(upstream.Body) + + if string(upstreamBytes) != string(srcBytes) { + t.Errorf("Body - want: %s, got: %s", string(upstreamBytes), string(srcBytes)) + t.Fail() + } + + if request.Header.Get("X-Source") != upstream.Header.Get("X-Source") { + t.Errorf("Header X-Source - want: %s, got: %s", request.Header.Get("X-Source"), upstream.Header.Get("X-Source")) + t.Fail() + } + + if request.URL.RawQuery != upstream.URL.RawQuery { + t.Errorf("URL.RawQuery - want: %s, got: %s", request.URL.RawQuery, upstream.URL.RawQuery) + t.Fail() + } + +} + +func Test_buildProxyRequest_NoBody_GetMethod_NoQuery(t *testing.T) { + request, _ := http.NewRequest(http.MethodGet, "/", nil) + + funcURL, _ := testResolver("funcName") + upstream, err := buildProxyRequest(request, funcURL, "") + if err != nil { + t.Fatal(err.Error()) + } + + if request.Method != upstream.Method { + t.Errorf("Method - want: %s, got: %s", request.Method, upstream.Method) + t.Fail() + } + + if upstream.Body != nil { + t.Errorf("Body - expected nil") + t.Fail() + } + + if request.URL.RawQuery != upstream.URL.RawQuery { + t.Errorf("URL.RawQuery - want: %s, got: %s", request.URL.RawQuery, upstream.URL.RawQuery) + t.Fail() + } + +} + +func Test_buildProxyRequest_HasXForwardedHostHeaderWhenSet(t *testing.T) { + srcBytes := []byte("hello world") + + reader := bytes.NewReader(srcBytes) + request, err := http.NewRequest(http.MethodPost, "http://gateway/function?code=1", reader) + + if err != nil { + t.Fatal(err) + } + + funcURL, _ := testResolver("funcName") + upstream, err := buildProxyRequest(request, funcURL, "/") + if err != nil { + t.Fatal(err.Error()) + } + + if request.Host != upstream.Header.Get("X-Forwarded-Host") { + t.Errorf("Host - want: %s, got: %s", request.Host, upstream.Header.Get("X-Forwarded-Host")) + } +} + +func Test_buildProxyRequest_XForwardedHostHeader_Empty_WhenNotSet(t *testing.T) { + srcBytes := []byte("hello world") + + reader := bytes.NewReader(srcBytes) + request, err := http.NewRequest(http.MethodPost, "/function", reader) + + if err != nil { + t.Fatal(err) + } + + funcURL, _ := testResolver("funcName") + upstream, err := buildProxyRequest(request, funcURL, "/") + if err != nil { + t.Fatal(err.Error()) + } + + if request.Host != upstream.Header.Get("X-Forwarded-Host") { + t.Errorf("Host - want: %s, got: %s", request.Host, upstream.Header.Get("X-Forwarded-Host")) + } +} + +func Test_buildProxyRequest_XForwardedHostHeader_WhenAlreadyPresent(t *testing.T) { + srcBytes := []byte("hello world") + headerValue := "test.openfaas.com" + reader := bytes.NewReader(srcBytes) + request, err := http.NewRequest(http.MethodPost, "/function/test", reader) + + if err != nil { + t.Fatal(err) + } + + request.Header.Set("X-Forwarded-Host", headerValue) + funcURL, _ := testResolver("funcName") + upstream, err := buildProxyRequest(request, funcURL, "/") + if err != nil { + t.Fatal(err.Error()) + } + + if upstream.Header.Get("X-Forwarded-Host") != headerValue { + t.Errorf("X-Forwarded-Host - want: %s, got: %s", headerValue, upstream.Header.Get("X-Forwarded-Host")) + } +} + +func Test_buildProxyRequest_WithPathNoQuery(t *testing.T) { + srcBytes := []byte("hello world") + functionPath := "/employee/info/300" + + requestPath := fmt.Sprintf("/function/xyz%s", functionPath) + + reader := bytes.NewReader(srcBytes) + request, _ := http.NewRequest(http.MethodPost, requestPath, reader) + request.Header.Set("X-Source", "unit-test") + + queryWant := "" + if request.URL.RawQuery != queryWant { + + t.Errorf("Query - want: %s, got: %s", queryWant, request.URL.RawQuery) + t.Fail() + } + + funcURL, _ := testResolver("xyz") + upstream, err := buildProxyRequest(request, funcURL, functionPath) + if err != nil { + t.Fatal(err.Error()) + } + + if request.Method != upstream.Method { + t.Errorf("Method - want: %s, got: %s", request.Method, upstream.Method) + t.Fail() + } + + upstreamBytes, _ := ioutil.ReadAll(upstream.Body) + + if string(upstreamBytes) != string(srcBytes) { + t.Errorf("Body - want: %s, got: %s", string(upstreamBytes), string(srcBytes)) + t.Fail() + } + + if request.Header.Get("X-Source") != upstream.Header.Get("X-Source") { + t.Errorf("Header X-Source - want: %s, got: %s", request.Header.Get("X-Source"), upstream.Header.Get("X-Source")) + t.Fail() + } + + if request.URL.RawQuery != upstream.URL.RawQuery { + t.Errorf("URL.RawQuery - want: %s, got: %s", request.URL.RawQuery, upstream.URL.RawQuery) + t.Fail() + } + + if functionPath != upstream.URL.Path { + t.Errorf("URL.Path - want: %s, got: %s", functionPath, upstream.URL.Path) + t.Fail() + } + +} + +func Test_buildProxyRequest_WithNoPathNoQuery(t *testing.T) { + srcBytes := []byte("hello world") + functionPath := "/" + + requestPath := fmt.Sprintf("/function/xyz%s", functionPath) + + reader := bytes.NewReader(srcBytes) + request, _ := http.NewRequest(http.MethodPost, requestPath, reader) + request.Header.Set("X-Source", "unit-test") + + queryWant := "" + if request.URL.RawQuery != queryWant { + + t.Errorf("Query - want: %s, got: %s", queryWant, request.URL.RawQuery) + t.Fail() + } + + funcURL, _ := testResolver("xyz") + upstream, err := buildProxyRequest(request, funcURL, functionPath) + if err != nil { + t.Fatal(err.Error()) + } + + if request.Method != upstream.Method { + t.Errorf("Method - want: %s, got: %s", request.Method, upstream.Method) + t.Fail() + } + + upstreamBytes, _ := ioutil.ReadAll(upstream.Body) + + if string(upstreamBytes) != string(srcBytes) { + t.Errorf("Body - want: %s, got: %s", string(upstreamBytes), string(srcBytes)) + t.Fail() + } + + if request.Header.Get("X-Source") != upstream.Header.Get("X-Source") { + t.Errorf("Header X-Source - want: %s, got: %s", request.Header.Get("X-Source"), upstream.Header.Get("X-Source")) + t.Fail() + } + + if request.URL.RawQuery != upstream.URL.RawQuery { + t.Errorf("URL.RawQuery - want: %s, got: %s", request.URL.RawQuery, upstream.URL.RawQuery) + t.Fail() + } + + if functionPath != upstream.URL.Path { + t.Errorf("URL.Path - want: %s, got: %s", functionPath, upstream.URL.Path) + t.Fail() + } + +} + +func Test_buildProxyRequest_WithPathAndQuery(t *testing.T) { + srcBytes := []byte("hello world") + functionPath := "/employee/info/300" + + requestPath := fmt.Sprintf("/function/xyz%s?code=1", functionPath) + + reader := bytes.NewReader(srcBytes) + request, _ := http.NewRequest(http.MethodPost, requestPath, reader) + request.Header.Set("X-Source", "unit-test") + + if request.URL.RawQuery != "code=1" { + t.Errorf("Query - want: %s, got: %s", "code=1", request.URL.RawQuery) + t.Fail() + } + + funcURL, _ := testResolver("xyz") + upstream, err := buildProxyRequest(request, funcURL, functionPath) + if err != nil { + t.Fatal(err.Error()) + } + + if request.Method != upstream.Method { + t.Errorf("Method - want: %s, got: %s", request.Method, upstream.Method) + t.Fail() + } + + upstreamBytes, _ := ioutil.ReadAll(upstream.Body) + + if string(upstreamBytes) != string(srcBytes) { + t.Errorf("Body - want: %s, got: %s", string(upstreamBytes), string(srcBytes)) + t.Fail() + } + + if request.Header.Get("X-Source") != upstream.Header.Get("X-Source") { + t.Errorf("Header X-Source - want: %s, got: %s", request.Header.Get("X-Source"), upstream.Header.Get("X-Source")) + t.Fail() + } + + if request.URL.RawQuery != upstream.URL.RawQuery { + t.Errorf("URL.RawQuery - want: %s, got: %s", request.URL.RawQuery, upstream.URL.RawQuery) + t.Fail() + } + + if functionPath != upstream.URL.Path { + t.Errorf("URL.Path - want: %s, got: %s", functionPath, upstream.URL.Path) + t.Fail() + } + +} diff --git a/gateway/vendor/github.com/openfaas/faas-provider/serve.go b/gateway/vendor/github.com/openfaas/faas-provider/serve.go index 83c29ba8..cf50d07e 100644 --- a/gateway/vendor/github.com/openfaas/faas-provider/serve.go +++ b/gateway/vendor/github.com/openfaas/faas-provider/serve.go @@ -9,6 +9,7 @@ import ( "net/http" "github.com/gorilla/mux" + "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas-provider/types" ) @@ -26,6 +27,28 @@ func Router() *mux.Router { // Serve load your handlers into the correct OpenFaaS route spec. This function is blocking. func Serve(handlers *types.FaaSHandlers, config *types.FaaSConfig) { + + if config.EnableBasicAuth { + reader := auth.ReadBasicAuthFromDisk{ + SecretMountPath: config.SecretMountPath, + } + + credentials, err := reader.Read() + if err != nil { + log.Fatal(err) + } + + handlers.FunctionReader = auth.DecorateWithBasicAuth(handlers.FunctionReader, credentials) + handlers.DeployHandler = auth.DecorateWithBasicAuth(handlers.DeployHandler, credentials) + handlers.DeleteHandler = auth.DecorateWithBasicAuth(handlers.DeleteHandler, credentials) + handlers.UpdateHandler = auth.DecorateWithBasicAuth(handlers.UpdateHandler, credentials) + handlers.ReplicaReader = auth.DecorateWithBasicAuth(handlers.ReplicaReader, credentials) + handlers.ReplicaUpdater = auth.DecorateWithBasicAuth(handlers.ReplicaUpdater, credentials) + handlers.InfoHandler = auth.DecorateWithBasicAuth(handlers.InfoHandler, credentials) + handlers.SecretHandler = auth.DecorateWithBasicAuth(handlers.SecretHandler, credentials) + } + + // System (auth) endpoints r.HandleFunc("/system/functions", handlers.FunctionReader).Methods("GET") r.HandleFunc("/system/functions", handlers.DeployHandler).Methods("POST") r.HandleFunc("/system/functions", handlers.DeleteHandler).Methods("DELETE") @@ -33,11 +56,14 @@ func Serve(handlers *types.FaaSHandlers, config *types.FaaSConfig) { r.HandleFunc("/system/function/{name:[-a-zA-Z_0-9]+}", handlers.ReplicaReader).Methods("GET") r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", handlers.ReplicaUpdater).Methods("POST") + r.HandleFunc("/system/info", handlers.InfoHandler).Methods("GET") + r.HandleFunc("/system/secrets", handlers.SecretHandler).Methods(http.MethodGet, http.MethodPut, http.MethodPost, http.MethodDelete) + + // Open endpoints r.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}", handlers.FunctionProxy) r.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}/", handlers.FunctionProxy) - - r.HandleFunc("/system/info", handlers.InfoHandler).Methods("GET") + r.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}/{params:.*}", handlers.FunctionProxy) if config.EnableHealth { r.HandleFunc("/healthz", handlers.Health).Methods("GET") diff --git a/gateway/vendor/github.com/openfaas/faas-provider/types/config.go b/gateway/vendor/github.com/openfaas/faas-provider/types/config.go index 53857ca8..fe446d31 100644 --- a/gateway/vendor/github.com/openfaas/faas-provider/types/config.go +++ b/gateway/vendor/github.com/openfaas/faas-provider/types/config.go @@ -9,10 +9,13 @@ import ( type FaaSHandlers struct { FunctionReader http.HandlerFunc DeployHandler http.HandlerFunc + // FunctionProxy provides the function invocation proxy logic. Use proxy.NewHandlerFunc to + // use the standard OpenFaaS proxy implementation or provide completely custom proxy logic. + FunctionProxy http.HandlerFunc DeleteHandler http.HandlerFunc ReplicaReader http.HandlerFunc - FunctionProxy http.HandlerFunc ReplicaUpdater http.HandlerFunc + SecretHandler http.HandlerFunc // Optional: Update an existing function UpdateHandler http.HandlerFunc @@ -22,8 +25,10 @@ type FaaSHandlers struct { // FaaSConfig set config for HTTP handlers type FaaSConfig struct { - TCPPort *int - ReadTimeout time.Duration - WriteTimeout time.Duration - EnableHealth bool + TCPPort *int + ReadTimeout time.Duration + WriteTimeout time.Duration + EnableHealth bool + EnableBasicAuth bool + SecretMountPath string } diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/.DEREK.yml b/gateway/vendor/github.com/openfaas/nats-queue-worker/.DEREK.yml index c075190d..617820da 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/.DEREK.yml +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/.DEREK.yml @@ -1,16 +1 @@ -maintainers: - - alexellis - - rgee0 - - johnmccabe - - jockdarock - - ericstoekl - - austinfrey - - itscaro - - rorpage - - kenfdev - - BurtonR - -features: - - dco_check - - comments - +redirect: https://raw.githubusercontent.com/openfaas/faas/master/.DEREK.yml \ No newline at end of file diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/.travis.yml b/gateway/vendor/github.com/openfaas/nats-queue-worker/.travis.yml index 199e1e18..4c33f0dc 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/.travis.yml +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/.travis.yml @@ -23,10 +23,16 @@ after_success: - if [ ! -z "$TRAVIS_TAG" ] ; then if [ -z $DOCKER_NS ] ; then - export DOCKER_NS=functions; + export DOCKER_NS=openfaas; fi docker tag $DOCKER_NS/queue-worker:latest-dev $DOCKER_NS/queue-worker:$TRAVIS_TAG; echo $DOCKER_PASSWORD | docker login -u=$DOCKER_USERNAME --password-stdin; docker push $DOCKER_NS/queue-worker:$TRAVIS_TAG; + + + docker tag $DOCKER_NS/queue-worker:latest-dev quay.io/$DOCKER_NS/queue-worker:$TRAVIS_TAG; + echo $QUAY_PASSWORD | docker login -u=$QUAY_USERNAME --password-stdin quay.io; + docker push quay.io/$DOCKER_NS/queue-worker:$TRAVIS_TAG; + fi \ No newline at end of file diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile index 26cdf337..656f0f3a 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile @@ -1,14 +1,18 @@ -FROM golang:1.9.7-alpine as golang +FROM golang:1.10-alpine as golang WORKDIR /go/src/github.com/openfaas/nats-queue-worker COPY vendor vendor COPY handler handler +COPY nats nats COPY main.go . +COPY readconfig.go . +COPY readconfig_test.go . +COPY auth.go . -RUN go test -v ./handler/ +RUN go test -v ./... RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app . -FROM alpine:3.7 +FROM alpine:3.8 RUN addgroup -S app \ && adduser -S -g app app \ diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64 b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64 new file mode 100644 index 00000000..b214dd00 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64 @@ -0,0 +1,32 @@ +FROM golang:1.10-alpine as golang +WORKDIR /go/src/github.com/openfaas/nats-queue-worker + +COPY vendor vendor +COPY handler handler +COPY nats nats +COPY auth.go . +COPY readconfig.go . +COPY readconfig_test.go . +COPY main.go . + +RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app . + +FROM alpine:3.8 + +RUN addgroup -S app \ + && adduser -S -g app app \ + && apk add --no-cache ca-certificates + +WORKDIR /home/app + +EXPOSE 8080 + +ENV http_proxy "" +ENV https_proxy "" + +COPY --from=golang /go/src/github.com/openfaas/nats-queue-worker/app . + +RUN chown -R app:app ./ + +USER app +CMD ["./app"] diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf index df9dac6f..2d9b16c9 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf @@ -1,13 +1,17 @@ -FROM golang:1.9.7-alpine as golang +FROM golang:1.10-alpine as golang WORKDIR /go/src/github.com/openfaas/nats-queue-worker COPY vendor vendor COPY handler handler +COPY nats nats COPY main.go . +COPY readconfig.go . +COPY readconfig_test.go . +COPY auth.go . RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app . -FROM alpine:3.7 +FROM alpine:3.8 RUN addgroup -S app \ && adduser -S -g app app \ @@ -16,6 +20,7 @@ RUN addgroup -S app \ WORKDIR /home/app EXPOSE 8080 + ENV http_proxy "" ENV https_proxy "" diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Gopkg.lock b/gateway/vendor/github.com/openfaas/nats-queue-worker/Gopkg.lock index 70bf744a..eef927e8 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/Gopkg.lock +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Gopkg.lock @@ -39,12 +39,18 @@ [[projects]] name = "github.com/openfaas/faas" packages = ["gateway/queue"] - revision = "81334141832d6c4fc9b78e6b5e17e5330eca7606" - version = "0.8.2" + revision = "bfa869ec8c0c04c26c5b0ed434bc367e712dcaef" + version = "0.10.2" + +[[projects]] + name = "github.com/openfaas/faas-provider" + packages = ["auth"] + revision = "9ce928bc82cbb2642e6d534f93a7904116179e6c" + version = "0.7.0" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "fb873f03ba109e6e479b9133d610f50d7fd6cbcdc0fe7dcf947ae70a1eade465" + inputs-digest = "b568e20d336fb2625ad72caa1b479a46ed1362e320083720feabccd1abaedbee" solver-name = "gps-cdcl" solver-version = 1 diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Gopkg.toml b/gateway/vendor/github.com/openfaas/nats-queue-worker/Gopkg.toml index c690a8c7..bf7c08a4 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/Gopkg.toml +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Gopkg.toml @@ -31,7 +31,7 @@ [[constraint]] name = "github.com/openfaas/faas" - version = "0.8.2" + version = "0.10.2" [[constraint]] name = "github.com/nats-io/go-nats" @@ -40,3 +40,7 @@ [prune] go-tests = true unused-packages = true + +[[constraint]] + name = "github.com/openfaas/faas-provider" + version = "0.7.0" diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Makefile b/gateway/vendor/github.com/openfaas/nats-queue-worker/Makefile index a7a4981b..ad6b19f8 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/Makefile +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Makefile @@ -1,9 +1,29 @@ TAG?=latest +.PHONY: build build: - docker build --build-arg http_proxy="${http_proxy}" --build-arg https_proxy="${https_proxy}" -t functions/queue-worker:$(TAG) . + docker build --build-arg http_proxy="${http_proxy}" --build-arg https_proxy="${https_proxy}" -t openfaas/queue-worker:$(TAG) . +.PHONY: push push: - docker push functions/queue-worker:$(TAG) + docker push openfaas/queue-worker:$(TAG) +.PHONY: all all: build + +.PHONY: ci-armhf-build +ci-armhf-build: + docker build --build-arg http_proxy="${http_proxy}" --build-arg https_proxy="${https_proxy}" -t openfaas/queue-worker:$(TAG)-armhf . -f Dockerfile.armhf + +.PHONY: ci-armhf-push +ci-armhf-push: + docker push openfaas/queue-worker:$(TAG)-armhf + +.PHONY: ci-arm64-build +ci-arm64-build: + docker build --build-arg http_proxy="${http_proxy}" --build-arg https_proxy="${https_proxy}" -t openfaas/queue-worker:$(TAG)-arm64 . -f Dockerfile.arm64 + +.PHONY: ci-arm64-push +ci-arm64-push: + docker push openfaas/queue-worker:$(TAG)-arm64 + diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/README.md b/gateway/vendor/github.com/openfaas/nats-queue-worker/README.md index ab515e39..72078233 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/README.md +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/README.md @@ -1,12 +1,14 @@ ## Queue worker for OpenFaaS - NATS Streaming +[![Build Status](https://travis-ci.org/openfaas/nats-queue-worker.svg?branch=master)](https://travis-ci.org/openfaas/nats-queue-worker) + This is a queue-worker to enable asynchronous processing of function requests. > Note: A Kafka queue-worker is under-way through a PR on the main OpenFaaS repository. * [Read more in the async guide](https://github.com/openfaas/faas/blob/master/guide/asynchronous.md) -Hub image: [functions/queue-worker](https://hub.docker.com/r/functions/queue-worker/) +Hub image: [openfaas/queue-worker](https://hub.docker.com/r/openfaas/queue-worker/) License: MIT diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go new file mode 100644 index 00000000..d52f9637 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go @@ -0,0 +1,43 @@ +package main + +import ( + "fmt" + "net/http" + "os" + + "github.com/openfaas/faas-provider/auth" +) + +//AddBasicAuth to a request by reading secrets +func AddBasicAuth(req *http.Request) error { + if os.Getenv("basic_auth") == "true" { + reader := auth.ReadBasicAuthFromDisk{} + + if len(os.Getenv("secret_mount_path")) > 0 { + reader.SecretMountPath = os.Getenv("secret_mount_path") + } + + credentials, err := reader.Read() + if err != nil { + return fmt.Errorf("Unable to read basic auth: %s", err.Error()) + } + + req.SetBasicAuth(credentials.User, credentials.Password) + } + return nil +} + +//LoadCredentials load credentials from dis +func LoadCredentials() (*auth.BasicAuthCredentials, error) { + reader := auth.ReadBasicAuthFromDisk{} + + if len(os.Getenv("secret_mount_path")) > 0 { + reader.SecretMountPath = os.Getenv("secret_mount_path") + } + + credentials, err := reader.Read() + if err != nil { + return nil, fmt.Errorf("Unable to read basic auth: %s", err.Error()) + } + return credentials, nil +} diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/build.sh b/gateway/vendor/github.com/openfaas/nats-queue-worker/build.sh index e9e4d4af..390f3194 100755 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/build.sh +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/build.sh @@ -6,6 +6,6 @@ if [ $1 ] ; then eTAG=$1 fi -echo Building functions/queue-worker:$eTAG +echo Building openfaas/queue-worker:$eTAG -docker build --build-arg http_proxy=$http_proxy -t functions/queue-worker:$eTAG . +docker build --build-arg http_proxy=$http_proxy -t openfaas/queue-worker:$eTAG . diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go index c5dac851..64777848 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go @@ -1,38 +1,13 @@ package handler import ( - "encoding/json" "fmt" "log" - "os" - - "github.com/nats-io/go-nats-streaming" - "github.com/openfaas/faas/gateway/queue" - "regexp" + "sync" ) -// NatsQueue queue for work -type NatsQueue struct { - nc stan.Conn -} - -type NatsConfig interface { - GetClientID() string -} - -type DefaultNatsConfig struct { -} - -var supportedCharacters, _ = regexp.Compile("[^a-zA-Z0-9-_]+") - -func (DefaultNatsConfig) GetClientID() string { - val, _ := os.Hostname() - return getClientId(val) -} - -// CreateNatsQueue ready for asynchronous processing -func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQueue, error) { - queue1 := NatsQueue{} +// CreateNATSQueue ready for asynchronous processing +func CreateNATSQueue(address string, port int, clientConfig NATSConfig) (*NATSQueue, error) { var err error natsURL := fmt.Sprintf("nats://%s:%d", address, port) log.Printf("Opening connection to %s\n", natsURL) @@ -40,28 +15,17 @@ func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQu clientID := clientConfig.GetClientID() clusterID := "faas-cluster" - nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL)) - queue1.nc = nc + queue1 := NATSQueue{ + ClientID: clientID, + ClusterID: clusterID, + NATSURL: natsURL, + Topic: "faas-request", + maxReconnect: clientConfig.GetMaxReconnect(), + reconnectDelay: clientConfig.GetReconnectDelay(), + ncMutex: &sync.RWMutex{}, + } + + err = queue1.connect() return &queue1, err } - -// Queue request for processing -func (q *NatsQueue) Queue(req *queue.Request) error { - var err error - - fmt.Printf("NatsQueue - submitting request: %s.\n", req.Function) - - out, err := json.Marshal(req) - if err != nil { - log.Println(err) - } - - err = q.nc.Publish("faas-request", out) - - return err -} - -func getClientId(hostname string) string { - return "faas-publisher-" + supportedCharacters.ReplaceAllString(hostname, "_") -} \ No newline at end of file diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler_test.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler_test.go index c42ba370..dbf4f13c 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler_test.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler_test.go @@ -4,36 +4,37 @@ import ( "os" "strings" "testing" + + "github.com/openfaas/nats-queue-worker/nats" ) func Test_GetClientID_ContainsHostname(t *testing.T) { - c := DefaultNatsConfig{} + c := DefaultNATSConfig{} val := c.GetClientID() hostname, _ := os.Hostname() - if !strings.HasSuffix(val, hostname) { + encodedHostname := nats.GetClientID(hostname) + if !strings.HasSuffix(val, encodedHostname) { t.Errorf("GetClientID should contain hostname as suffix, got: %s", val) t.Fail() } } -func TestCreateClientId(t *testing.T) { - clientId := getClientId("computer-a") - expected := "faas-publisher-computer-a" - if clientId != expected { - t.Logf("Expected client id `%s` actual `%s`\n", expected, clientId) +func TestCreategetClientID(t *testing.T) { + clientID := getClientID("computer-a") + want := "faas-publisher-computer-a" + if clientID != want { + t.Logf("Want clientID: `%s`, but got: `%s`\n", want, clientID) t.Fail() } } -func TestCreateClientIdWhenHostHasUnsupportedCharacters(t *testing.T) { - clientId := getClientId("computer-a.acme.com") - expected := "faas-publisher-computer-a_acme_com" - if clientId != expected { - t.Logf("Expected client id `%s` actual `%s`\n", expected, clientId) +func TestCreategetClientIDWhenHostHasUnsupportedCharacters(t *testing.T) { + clientID := getClientID("computer-a.acme.com") + want := "faas-publisher-computer-a_acme_com" + if clientID != want { + t.Logf("Want clientID: `%s`, but got: `%s`\n", want, clientID) t.Fail() } } - - diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_config.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_config.go new file mode 100644 index 00000000..2cb615a0 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_config.go @@ -0,0 +1,41 @@ +package handler + +import ( + "os" + "time" + + "github.com/openfaas/nats-queue-worker/nats" +) + +type NATSConfig interface { + GetClientID() string + GetMaxReconnect() int + GetReconnectDelay() time.Duration +} + +type DefaultNATSConfig struct { + maxReconnect int + reconnectDelay time.Duration +} + +func NewDefaultNATSConfig(maxReconnect int, reconnectDelay time.Duration) DefaultNATSConfig { + return DefaultNATSConfig{maxReconnect, reconnectDelay} +} + +// GetClientID returns the ClientID assigned to this producer/consumer. +func (DefaultNATSConfig) GetClientID() string { + val, _ := os.Hostname() + return getClientID(val) +} + +func (c DefaultNATSConfig) GetMaxReconnect() int { + return c.maxReconnect +} + +func (c DefaultNATSConfig) GetReconnectDelay() time.Duration { + return c.reconnectDelay +} + +func getClientID(hostname string) string { + return "faas-publisher-" + nats.GetClientID(hostname) +} diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go new file mode 100644 index 00000000..2881524e --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go @@ -0,0 +1,88 @@ +package handler + +import ( + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/nats-io/go-nats-streaming" + "github.com/openfaas/faas/gateway/queue" +) + +// NATSQueue queue for work +type NATSQueue struct { + nc stan.Conn + ncMutex *sync.RWMutex + maxReconnect int + reconnectDelay time.Duration + + // ClientID for NATS Streaming + ClientID string + + // ClusterID in NATS Streaming + ClusterID string + + // NATSURL URL to connect to NATS + NATSURL string + + // Topic to respond to + Topic string +} + +// Queue request for processing +func (q *NATSQueue) Queue(req *queue.Request) error { + fmt.Printf("NatsQueue - submitting request: %s.\n", req.Function) + + out, err := json.Marshal(req) + if err != nil { + log.Println(err) + } + + q.ncMutex.RLock() + nc := q.nc + q.ncMutex.RUnlock() + + return nc.Publish(q.Topic, out) +} + +func (q *NATSQueue) connect() error { + nc, err := stan.Connect( + q.ClusterID, + q.ClientID, + stan.NatsURL(q.NATSURL), + stan.SetConnectionLostHandler(func(conn stan.Conn, err error) { + log.Printf("Disconnected from %s\n", q.NATSURL) + + q.reconnect() + }), + ) + + if err != nil { + return err + } + + q.ncMutex.Lock() + q.nc = nc + q.ncMutex.Unlock() + + return nil +} + +func (q *NATSQueue) reconnect() { + for i := 0; i < q.maxReconnect; i++ { + time.Sleep(time.Second * time.Duration(i) * q.reconnectDelay) + + if err := q.connect(); err == nil { + log.Printf("Reconnecting (%d/%d) to %s. OK\n", i+1, q.maxReconnect, q.NATSURL) + + return + } + + log.Printf("Reconnecting (%d/%d) to %s failed\n", i+1, q.maxReconnect, q.NATSURL) + } + + log.Printf("Reached reconnection limit (%d) for %s\n", q.maxReconnect, q.NATSURL) + +} diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/main.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/main.go index 014f2b03..6a1b0244 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/main.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/main.go @@ -10,14 +10,15 @@ import ( "net" "os" "os/signal" - "strconv" "strings" "time" "net/http" "github.com/nats-io/go-nats-streaming" + "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas/gateway/queue" + "github.com/openfaas/nats-queue-worker/nats" ) // AsyncReport is the report from a function executed on a queue worker. @@ -49,41 +50,32 @@ func makeClient() http.Client { } func main() { + readConfig := ReadConfig{} + config := readConfig.Read() log.SetFlags(0) clusterID := "faas-cluster" val, _ := os.Hostname() - clientID := "faas-worker-" + val - - natsAddress := "nats" - gatewayAddress := "gateway" - functionSuffix := "" - var debugPrintBody bool - - if val, exists := os.LookupEnv("faas_nats_address"); exists { - natsAddress = val - } - - if val, exists := os.LookupEnv("faas_gateway_address"); exists { - gatewayAddress = val - } - - if val, exists := os.LookupEnv("faas_function_suffix"); exists { - functionSuffix = val - } - - if val, exists := os.LookupEnv("faas_print_body"); exists { - debugPrintBody = val == "1" || val == "true" - } + clientID := "faas-worker-" + nats.GetClientID(val) var durable string var qgroup string var unsubscribe bool + var credentials *auth.BasicAuthCredentials + var err error + + if os.Getenv("basic_auth") == "true" { + log.Printf("Loading basic authentication credentials") + credentials, err = LoadCredentials() + if err != nil { + log.Printf("Error with LoadCredentials: %s ", err.Error()) + } + } client := makeClient() - sc, err := stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+natsAddress+":4222")) + sc, err := stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+config.NatsAddress+":4222")) if err != nil { - log.Fatalf("Can't connect: %v\n", err) + log.Fatalf("Can't connect to %s: %v\n", "nats://"+config.NatsAddress+":4222", err) } startOpt := stan.StartWithLastReceived() @@ -104,8 +96,11 @@ func main() { return } + xCallID := req.Header.Get("X-Call-Id") + fmt.Printf("Request for %s.\n", req.Function) - if debugPrintBody { + + if config.DebugPrintBody { fmt.Println(string(req.Body)) } @@ -114,7 +109,7 @@ func main() { queryString = fmt.Sprintf("?%s", strings.TrimLeft(req.QueryString, "?")) } - functionURL := fmt.Sprintf("http://%s%s:8080/%s", req.Function, functionSuffix, queryString) + functionURL := fmt.Sprintf("http://%s%s:8080/%s", req.Function, config.FunctionSuffix, queryString) request, err := http.NewRequest(http.MethodPost, functionURL, bytes.NewReader(req.Body)) defer request.Body.Close() @@ -134,7 +129,12 @@ func main() { if req.CallbackURL != nil { log.Printf("Callback to: %s\n", req.CallbackURL.String()) - resultStatusCode, resultErr := postResult(&client, res, functionResult, req.CallbackURL.String()) + resultStatusCode, resultErr := postResult(&client, + res, + functionResult, + req.CallbackURL.String(), + xCallID, + status) if resultErr != nil { log.Println(resultErr) } else { @@ -142,7 +142,7 @@ func main() { } } - statusCode, reportErr := postReport(&client, req.Function, status, timeTaken, gatewayAddress) + statusCode, reportErr := postReport(&client, req.Function, status, timeTaken, config.GatewayAddress, credentials) if reportErr != nil { log.Println(reportErr) } else { @@ -160,7 +160,12 @@ func main() { if err != nil { log.Println(err) } - fmt.Println(string(functionResult)) + + if config.WriteDebug { + fmt.Println(string(functionResult)) + } else { + fmt.Printf("Wrote %d Bytes\n", len(string(functionResult))) + } } timeTaken := time.Since(started).Seconds() @@ -169,7 +174,12 @@ func main() { if req.CallbackURL != nil { log.Printf("Callback to: %s\n", req.CallbackURL.String()) - resultStatusCode, resultErr := postResult(&client, res, functionResult, req.CallbackURL.String()) + resultStatusCode, resultErr := postResult(&client, + res, + functionResult, + req.CallbackURL.String(), + xCallID, + res.StatusCode) if resultErr != nil { log.Println(resultErr) } else { @@ -177,7 +187,7 @@ func main() { } } - statusCode, reportErr := postReport(&client, req.Function, res.StatusCode, timeTaken, gatewayAddress) + statusCode, reportErr := postReport(&client, req.Function, res.StatusCode, timeTaken, config.GatewayAddress, credentials) if reportErr != nil { log.Println(reportErr) @@ -189,29 +199,8 @@ func main() { subj := "faas-request" qgroup = "faas" - ackWait := time.Second * 30 - maxInflight := 1 - - if value, exists := os.LookupEnv("max_inflight"); exists { - val, err := strconv.Atoi(value) - if err != nil { - log.Println("max_inflight error:", err) - } else { - maxInflight = val - } - } - - if val, exists := os.LookupEnv("ack_wait"); exists { - ackWaitVal, durationErr := time.ParseDuration(val) - if durationErr != nil { - log.Println("ack_wait error:", durationErr) - } else { - ackWait = ackWaitVal - } - } - - log.Println("Wait for ", ackWait) - sub, err := sc.QueueSubscribe(subj, qgroup, mcb, startOpt, stan.DurableName(durable), stan.MaxInflight(maxInflight), stan.AckWait(ackWait)) + log.Println("Wait for ", config.AckWait) + sub, err := sc.QueueSubscribe(subj, qgroup, mcb, startOpt, stan.DurableName(durable), stan.MaxInflight(config.MaxInflight), stan.AckWait(config.AckWait)) if err != nil { log.Panicln(err) } @@ -224,7 +213,7 @@ func main() { cleanupDone := make(chan bool) signal.Notify(signalChan, os.Interrupt) go func() { - for _ = range signalChan { + for range signalChan { fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n") // Do not unsubscribe a durable on exit, except if asked to. if durable == "" || unsubscribe { @@ -237,7 +226,7 @@ func main() { <-cleanupDone } -func postResult(client *http.Client, functionRes *http.Response, result []byte, callbackURL string) (int, error) { +func postResult(client *http.Client, functionRes *http.Response, result []byte, callbackURL string, xCallID string, statusCode int) (int, error) { var reader io.Reader if result != nil { @@ -246,7 +235,15 @@ func postResult(client *http.Client, functionRes *http.Response, result []byte, request, err := http.NewRequest(http.MethodPost, callbackURL, reader) - copyHeaders(request.Header, &functionRes.Header) + if functionRes != nil { + copyHeaders(request.Header, &functionRes.Header) + } + + request.Header.Set("X-Function-Status", fmt.Sprintf("%d", statusCode)) + + if len(xCallID) > 0 { + request.Header.Set("X-Call-Id", xCallID) + } res, err := client.Do(request) @@ -272,7 +269,7 @@ func copyHeaders(destination http.Header, source *http.Header) { } } -func postReport(client *http.Client, function string, statusCode int, timeTaken float64, gatewayAddress string) (int, error) { +func postReport(client *http.Client, function string, statusCode int, timeTaken float64, gatewayAddress string, credentials *auth.BasicAuthCredentials) (int, error) { req := AsyncReport{ FunctionName: function, StatusCode: statusCode, @@ -282,6 +279,11 @@ func postReport(client *http.Client, function string, statusCode int, timeTaken targetPostback := "http://" + gatewayAddress + ":8080/system/async-report" reqBytes, _ := json.Marshal(req) request, err := http.NewRequest(http.MethodPost, targetPostback, bytes.NewReader(reqBytes)) + + if os.Getenv("basic_auth") == "true" && credentials != nil { + request.SetBasicAuth(credentials.User, credentials.Password) + } + defer request.Body.Close() res, err := client.Do(request) diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go new file mode 100644 index 00000000..d494d474 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go @@ -0,0 +1,8 @@ +package nats + +import "regexp" + +var supportedCharacters = regexp.MustCompile("[^a-zA-Z0-9-_]+") +func GetClientID(value string) string { + return supportedCharacters.ReplaceAllString(value, "_") +} \ No newline at end of file diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client_test.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client_test.go new file mode 100644 index 00000000..72de3c35 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client_test.go @@ -0,0 +1,23 @@ +package nats + +import ( + "testing" +) + +func TestGetClientID(t *testing.T) { + clientID := GetClientID("computer-a") + want := "computer-a" + if clientID != want { + t.Logf("Want clientID: `%s`, but got: `%s`\n", want, clientID) + t.Fail() + } +} + +func TestGetClientIDWhenHostHasUnsupportedCharacters(t *testing.T) { + clientID := GetClientID("computer-a.acme.com") + want := "computer-a_acme_com" + if clientID != want { + t.Logf("Want clientID: `%s`, but got: `%s`\n", want, clientID) + t.Fail() + } +} diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go new file mode 100644 index 00000000..2473e9a4 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go @@ -0,0 +1,81 @@ +package main + +import ( + "log" + "os" + "strconv" + "time" +) + +// ReadConfig constitutes config from env variables +type ReadConfig struct { +} + +func (ReadConfig) Read() QueueWorkerConfig { + cfg := QueueWorkerConfig{ + AckWait: time.Second * 30, + MaxInflight: 1, + } + + if val, exists := os.LookupEnv("faas_nats_address"); exists { + cfg.NatsAddress = val + } else { + cfg.NatsAddress = "nats" + } + + if val, exists := os.LookupEnv("faas_gateway_address"); exists { + cfg.GatewayAddress = val + } else { + cfg.GatewayAddress = "gateway" + } + + if val, exists := os.LookupEnv("faas_function_suffix"); exists { + cfg.FunctionSuffix = val + } + + if val, exists := os.LookupEnv("faas_print_body"); exists { + if val == "1" || val == "true" { + cfg.DebugPrintBody = true + } else { + cfg.DebugPrintBody = false + } + } + + if val, exists := os.LookupEnv("write_debug"); exists { + if val == "1" || val == "true" { + cfg.WriteDebug = true + } else { + cfg.WriteDebug = false + } + } + + if value, exists := os.LookupEnv("max_inflight"); exists { + val, err := strconv.Atoi(value) + if err != nil { + log.Println("max_inflight error:", err) + } else { + cfg.MaxInflight = val + } + } + + if val, exists := os.LookupEnv("ack_wait"); exists { + ackWaitVal, durationErr := time.ParseDuration(val) + if durationErr != nil { + log.Println("ack_wait error:", durationErr) + } else { + cfg.AckWait = ackWaitVal + } + } + + return cfg +} + +type QueueWorkerConfig struct { + NatsAddress string + GatewayAddress string + FunctionSuffix string + DebugPrintBody bool + WriteDebug bool + MaxInflight int + AckWait time.Duration +} diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig_test.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig_test.go new file mode 100644 index 00000000..6a0adeb6 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig_test.go @@ -0,0 +1,96 @@ +package main + +import ( + "os" + "testing" + "time" +) + +func Test_ReadConfig(t *testing.T) { + + readConfig := ReadConfig{} + + os.Setenv("faas_nats_address", "test_nats") + os.Setenv("faas_gateway_address", "test_gatewayaddr") + os.Setenv("faas_function_suffix", "test_suffix") + os.Setenv("faas_print_body", "true") + os.Setenv("write_debug", "true") + os.Setenv("max_inflight", "10") + os.Setenv("ack_wait", "10ms") + + config := readConfig.Read() + + expected := "test_nats" + if config.NatsAddress != expected { + t.Logf("Expected NatsAddress `%s` actual `%s`\n", expected, config.NatsAddress) + t.Fail() + } + + expected = "test_gatewayaddr" + if config.GatewayAddress != expected { + t.Logf("Expected GatewayAddress `%s` actual `%s`\n", expected, config.GatewayAddress) + t.Fail() + } + + expected = "test_suffix" + if config.FunctionSuffix != expected { + t.Logf("Expected FunctionSuffix `%s` actual `%s`\n", expected, config.FunctionSuffix) + t.Fail() + } + + if config.DebugPrintBody != true { + t.Logf("Expected DebugPrintBody `%v` actual `%v`\n", true, config.DebugPrintBody) + t.Fail() + } + + if config.WriteDebug != true { + t.Logf("Expected WriteDebug `%v` actual `%v`\n", true, config.WriteDebug) + t.Fail() + } + + expectedMaxInflight := 10 + if config.MaxInflight != expectedMaxInflight { + t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedMaxInflight, config.MaxInflight) + t.Fail() + } + + expectedAckWait := time.Millisecond * 10 + if config.AckWait != expectedAckWait { + t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedAckWait, config.AckWait) + t.Fail() + } + + os.Unsetenv("max_inflight") + os.Unsetenv("ack_wait") + + config = readConfig.Read() + + expectedMaxInflight = 1 + if config.MaxInflight != expectedMaxInflight { + t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedMaxInflight, config.MaxInflight) + t.Fail() + } + + expectedAckWait = time.Second * 30 + if config.AckWait != expectedAckWait { + t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedAckWait, config.AckWait) + t.Fail() + } + + os.Setenv("max_inflight", "10.00") + os.Setenv("ack_wait", "10") + + config = readConfig.Read() + + expectedMaxInflight = 1 + if config.MaxInflight != expectedMaxInflight { + t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedMaxInflight, config.MaxInflight) + t.Fail() + } + + expectedAckWait = time.Second * 30 + if config.AckWait != expectedAckWait { + t.Logf("Expected ackWait `%v` actual `%v`\n", expectedAckWait, config.AckWait) + t.Fail() + } +}