From 2bfca6d848cfdbb4901790eef1f103e8f09519d9 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Wed, 22 Apr 2020 11:46:06 +0100 Subject: [PATCH] Publish to multiple topics Enables publishing to various topics according to annotations on the functions. The function cache is moved up one level so that it can be shared between the scale from zero code and the queue proxy. Unit tests added for new internal methods. Tested e2e with arkade and the newest queue-worker and RC gateway image with two queues and an annotation on one of the functions of com.openfaas.queue. It worked as expected including with multiple namespace support. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- gateway/Dockerfile | 1 + gateway/Dockerfile.arm64 | 1 + gateway/Dockerfile.armhf | 1 + gateway/Gopkg.lock | 12 +-- gateway/Gopkg.toml | 2 +- gateway/handlers/forwarding_proxy.go | 5 +- gateway/handlers/queue_proxy.go | 90 ++++++++++++++----- gateway/handlers/queue_proxy_test.go | 74 +++++++++++++++ gateway/handlers/scaling.go | 4 +- gateway/main.go | 40 +++++---- gateway/metrics/metadata_query.go | 11 +++ .../middleware}/basic_auth_injector.go | 2 +- .../middleware}/basic_auth_injector_test.go | 2 +- .../middleware}/serviceauthinjector.go | 2 +- gateway/plugin/external.go | 33 +++---- gateway/plugin/external_test.go | 10 +-- gateway/queue/types.go | 5 -- gateway/scaling/function_cache.go | 35 ++++---- gateway/scaling/function_meta.go | 21 +++++ gateway/scaling/function_scaler.go | 11 +-- gateway/scaling/service_query.go | 1 + .../nats-queue-worker/handler/nats_queue.go | 7 +- 22 files changed, 264 insertions(+), 106 deletions(-) create mode 100644 gateway/handlers/queue_proxy_test.go create mode 100644 gateway/metrics/metadata_query.go rename gateway/{handlers => pkg/middleware}/basic_auth_injector.go (96%) rename gateway/{handlers => pkg/middleware}/basic_auth_injector_test.go (96%) rename gateway/{handlers => pkg/middleware}/serviceauthinjector.go (80%) create mode 100644 gateway/scaling/function_meta.go diff --git a/gateway/Dockerfile b/gateway/Dockerfile index f68b3868..c902d8b5 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -28,6 +28,7 @@ COPY queue queue COPY plugin plugin COPY version version COPY scaling scaling +COPY pkg pkg COPY main.go . # Run a gofmt and exclude all vendored code. diff --git a/gateway/Dockerfile.arm64 b/gateway/Dockerfile.arm64 index 2ee06808..d94c3656 100644 --- a/gateway/Dockerfile.arm64 +++ b/gateway/Dockerfile.arm64 @@ -25,6 +25,7 @@ COPY queue queue COPY plugin plugin COPY version version COPY scaling scaling +COPY pkg pkg COPY main.go . # Run a gofmt and exclude all vendored code. diff --git a/gateway/Dockerfile.armhf b/gateway/Dockerfile.armhf index 0a5da0ea..f8b94655 100644 --- a/gateway/Dockerfile.armhf +++ b/gateway/Dockerfile.armhf @@ -25,6 +25,7 @@ COPY queue queue COPY plugin plugin COPY version version COPY scaling scaling +COPY pkg pkg COPY main.go . # Run a gofmt and exclude all vendored code. diff --git a/gateway/Gopkg.lock b/gateway/Gopkg.lock index 85a48d8d..3692d3dc 100644 --- a/gateway/Gopkg.lock +++ b/gateway/Gopkg.lock @@ -101,12 +101,12 @@ version = "v0.6.0" [[projects]] - digest = "1:340f4e2e095ead4e0a15b4646da3e4533f8b6520e3a382eaf586e8166f3bbcb5" + digest = "1:437d6b220b852cd40c465c4eaf01ec26fd1dfdfb9c8c67933a256d0c97fdabb8" name = "github.com/openfaas/faas" packages = ["gateway/queue"] pruneopts = "UT" - revision = "bfa869ec8c0c04c26c5b0ed434bc367e712dcaef" - version = "0.10.2" + revision = "a7c6c39200782ea2e1a1b005d56e3edd51e761a7" + version = "0.18.16" [[projects]] digest = "1:4a97aa8ada0b2f865ca69a3a3bc0a2524c24f31c578c995d5c52cecb6913a9dc" @@ -120,15 +120,15 @@ version = "0.12.0" [[projects]] - digest = "1:1cf86a1a93c110ebcf836468bd917e8c116d6d2fc7612829c15e946b02dbf864" + digest = "1:e40ae4e58551013ed5ad4085b337718c9fba314372c0751713ed9fe8082a323d" name = "github.com/openfaas/nats-queue-worker" packages = [ "handler", "nats", ] pruneopts = "UT" - revision = "a1835cb71db56e6b814b91df027acf62425a76ad" - version = "0.10.0" + revision = "1f4e16e1f7afe1fbd464fcaa8a7dabaa3e4ef0bb" + version = "0.10.1" [[projects]] digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9" diff --git a/gateway/Gopkg.toml b/gateway/Gopkg.toml index 054e9f9d..77273bc4 100644 --- a/gateway/Gopkg.toml +++ b/gateway/Gopkg.toml @@ -12,7 +12,7 @@ [[constraint]] name = "github.com/openfaas/nats-queue-worker" - version = "0.10.0" + version = "0.10.1" [[constraint]] name = "github.com/prometheus/client_golang" diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index 17f4b672..b1dabdc9 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -14,6 +14,7 @@ import ( "strings" "time" + "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/types" ) @@ -43,7 +44,7 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, notifiers []HTTPNotifier, baseURLResolver BaseURLResolver, urlPathTransformer URLPathTransformer, - serviceAuthInjector AuthInjector) http.HandlerFunc { + serviceAuthInjector middleware.AuthInjector) http.HandlerFunc { writeRequestURI := false if _, exists := os.LookupEnv("write_request_uri"); exists { @@ -108,7 +109,7 @@ func forwardRequest(w http.ResponseWriter, requestURL string, timeout time.Duration, writeRequestURI bool, - serviceAuthInjector AuthInjector) (int, error) { + serviceAuthInjector middleware.AuthInjector) (int, error) { upstreamReq := buildUpstreamRequest(r, baseURL, requestURL) if upstreamReq.Body != nil { diff --git a/gateway/handlers/queue_proxy.go b/gateway/handlers/queue_proxy.go index 3818a58c..822c3fa9 100644 --- a/gateway/handlers/queue_proxy.go +++ b/gateway/handlers/queue_proxy.go @@ -6,16 +6,19 @@ package handlers import ( "fmt" "io/ioutil" + "log" "net/http" "net/url" + "strings" "github.com/gorilla/mux" "github.com/openfaas/faas/gateway/metrics" "github.com/openfaas/faas/gateway/queue" + "github.com/openfaas/faas/gateway/scaling" ) // MakeQueuedProxy accepts work onto a queue -func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc { +func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue.RequestQueuer, pathTransformer URLPathTransformer, defaultNS string, functionCacher scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Body != nil { defer r.Body.Close() @@ -24,29 +27,20 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueReque body, err := ioutil.ReadAll(r.Body) if err != nil { - w.WriteHeader(http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - w.Write([]byte(err.Error())) + callbackURL, err := getCallbackURLHeader(r.Header) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } vars := mux.Vars(r) name := vars["name"] - callbackURLHeader := r.Header.Get("X-Callback-Url") - var callbackURL *url.URL - - if len(callbackURLHeader) > 0 { - urlVal, urlErr := url.Parse(callbackURLHeader) - if urlErr != nil { - w.WriteHeader(http.StatusBadRequest) - - w.Write([]byte(urlErr.Error())) - return - } - - callbackURL = urlVal - } + queueName, err := getQueueName(name, functionCacher, serviceQuery) req := &queue.Request{ Function: name, @@ -57,15 +51,69 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueReque Header: r.Header, Host: r.Host, CallbackURL: callbackURL, + QueueName: queueName, } - if err = canQueueRequests.Queue(req); err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - fmt.Println(err) + if len(queueName) > 0 { + log.Printf("Queueing %s to: %s\n", name, queueName) + } + + if err = queuer.Queue(req); err != nil { + fmt.Printf("Queue error: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusAccepted) } } + +func getQueueName(name string, cache scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) (queueName string, err error) { + fn, ns := getNameParts(name) + + query, hit := cache.Get(fn, ns) + if !hit { + queryResponse, err := serviceQuery.GetReplicas(fn, ns) + if err != nil { + return "", err + } + cache.Set(fn, ns, queryResponse) + } + + query, _ = cache.Get(fn, ns) + + queueName = "" + if query.Annotations != nil { + if v := (*query.Annotations)["com.openfaas.queue"]; len(v) > 0 { + queueName = v + } + } + return queueName, err +} + +func getCallbackURLHeader(header http.Header) (*url.URL, error) { + value := header.Get("X-Callback-Url") + var callbackURL *url.URL + + if len(value) > 0 { + urlVal, err := url.Parse(value) + if err != nil { + return callbackURL, err + } + + callbackURL = urlVal + } + + return callbackURL, nil +} + +func getNameParts(name string) (fn, ns string) { + fn = name + ns = "" + + if index := strings.LastIndex(name, "."); index > 0 { + fn = name[:index] + ns = name[index+1:] + } + return fn, ns +} diff --git a/gateway/handlers/queue_proxy_test.go b/gateway/handlers/queue_proxy_test.go new file mode 100644 index 00000000..a5c8ee3c --- /dev/null +++ b/gateway/handlers/queue_proxy_test.go @@ -0,0 +1,74 @@ +// Copyright (c) Alex Ellis 2017. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +package handlers + +import ( + "net/http" + "testing" +) + +func Test_getNameParts(t *testing.T) { + fn, ns := getNameParts("figlet.openfaas-fn") + wantFn := "figlet" + wantNs := "openfaas-fn" + + if fn != wantFn { + t.Fatalf("want %s, got %s", wantFn, fn) + } + if ns != wantNs { + t.Fatalf("want %s, got %s", wantNs, ns) + } +} + +func Test_getNamePartsDualDot(t *testing.T) { + fn, ns := getNameParts("dev.figlet.openfaas-fn") + wantFn := "dev.figlet" + wantNs := "openfaas-fn" + + if fn != wantFn { + t.Fatalf("want %s, got %s", wantFn, fn) + } + if ns != wantNs { + t.Fatalf("want %s, got %s", wantNs, ns) + } +} + +func Test_getNameParts_NoNs(t *testing.T) { + fn, ns := getNameParts("figlet") + wantFn := "figlet" + wantNs := "" + + if fn != wantFn { + t.Fatalf("want %s, got %s", wantFn, fn) + } + if ns != wantNs { + t.Fatalf("want %s, got %s", wantNs, ns) + } +} + +func Test_getCallbackURLHeader(t *testing.T) { + want := "http://localhost:8080" + header := http.Header{} + header.Add("X-Callback-Url", want) + + uri, err := getCallbackURLHeader(header) + if err != nil { + t.Fatal(err) + } + + if uri.String() != want { + t.Fatalf("want %s, but got %s", want, uri.String()) + } +} + +func Test_getCallbackURLHeader_ParseFails(t *testing.T) { + want := "ht tp://foo.com" + header := http.Header{} + header.Add("X-Callback-Url", want) + + _, err := getCallbackURLHeader(header) + if err == nil { + t.Fatal("wanted a parsing error.") + } +} diff --git a/gateway/handlers/scaling.go b/gateway/handlers/scaling.go index 85ac6f3e..5ed3617f 100644 --- a/gateway/handlers/scaling.go +++ b/gateway/handlers/scaling.go @@ -24,9 +24,7 @@ func getNamespace(defaultNamespace, fullName string) (string, string) { // be called. If the function is not ready after the configured // amount of attempts / queries then next will not be invoked and a status // will be returned to the client. -func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc { - - scaler := scaling.NewFunctionScaler(config) +func MakeScalingHandler(next http.HandlerFunc, scaler scaling.FunctionScaler, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { diff --git a/gateway/main.go b/gateway/main.go index 1095fa94..9f63ad9e 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -13,6 +13,7 @@ import ( "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas/gateway/handlers" "github.com/openfaas/faas/gateway/metrics" + "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/plugin" "github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/types" @@ -60,6 +61,9 @@ func main() { servicePollInterval := time.Second * 5 + metadataQuery := metrics.NewMetadataQuery(credentials) + fmt.Println(metadataQuery) + metricsOptions := metrics.BuildMetricsOptions() exporter := metrics.NewExporter(metricsOptions, credentials) exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval) @@ -100,10 +104,10 @@ func main() { functionURLTransformer = nilURLTransformer } - var serviceAuthInjector handlers.AuthInjector + var serviceAuthInjector middleware.AuthInjector if config.UseBasicAuth { - serviceAuthInjector = &handlers.BasicAuthInjector{Credentials: credentials} + serviceAuthInjector = &middleware.BasicAuthInjector{Credentials: credentials} } decorateExternalAuth := handlers.MakeExternalAuthHandler @@ -129,6 +133,21 @@ func main() { faasHandlers.LogProxyHandler = handlers.NewLogHandlerFunc(*config.LogsProviderURL, config.WriteTimeout) + scalingConfig := scaling.ScalingConfig{ + MaxPollCount: uint(1000), + SetScaleRetries: uint(20), + FunctionPollInterval: time.Millisecond * 50, + CacheExpiry: time.Second * 5, // freshness of replica values before going stale + ServiceQuery: externalServiceQuery, + } + + functionProxy := faasHandlers.Proxy + if config.ScaleFromZero { + scalingFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry) + scaler := scaling.NewFunctionScaler(scalingConfig, scalingFunctionCache) + functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scaler, scalingConfig, config.Namespace) + } + if config.UseNATS() { log.Println("Async enabled: Using NATS Streaming.") maxReconnect := 60 @@ -141,8 +160,9 @@ func main() { log.Fatalln(queueErr) } + queueFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry) faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper( - handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer)), + handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer, config.Namespace, queueFunctionCache, externalServiceQuery)), forwardingNotifiers, ) @@ -188,20 +208,6 @@ func main() { r := mux.NewRouter() // max wait time to start a function = maxPollCount * functionPollInterval - functionProxy := faasHandlers.Proxy - - if config.ScaleFromZero { - scalingConfig := scaling.ScalingConfig{ - MaxPollCount: uint(1000), - SetScaleRetries: uint(20), - FunctionPollInterval: time.Millisecond * 50, - CacheExpiry: time.Second * 5, // freshness of replica values before going stale - ServiceQuery: externalServiceQuery, - } - - functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig, config.Namespace) - } - r.HandleFunc("/function/{name:["+NameExpression+"]+}", functionProxy) r.HandleFunc("/function/{name:["+NameExpression+"]+}/", functionProxy) r.HandleFunc("/function/{name:["+NameExpression+"]+}/{params:.*}", functionProxy) diff --git a/gateway/metrics/metadata_query.go b/gateway/metrics/metadata_query.go new file mode 100644 index 00000000..f6b5b43d --- /dev/null +++ b/gateway/metrics/metadata_query.go @@ -0,0 +1,11 @@ +package metrics + +import "github.com/openfaas/faas-provider/auth" + +type MetadataQuery struct { + Credentials *auth.BasicAuthCredentials +} + +func NewMetadataQuery(credentials *auth.BasicAuthCredentials) *MetadataQuery { + return &MetadataQuery{Credentials: credentials} +} diff --git a/gateway/handlers/basic_auth_injector.go b/gateway/pkg/middleware/basic_auth_injector.go similarity index 96% rename from gateway/handlers/basic_auth_injector.go rename to gateway/pkg/middleware/basic_auth_injector.go index 83fc0220..81e62837 100644 --- a/gateway/handlers/basic_auth_injector.go +++ b/gateway/pkg/middleware/basic_auth_injector.go @@ -1,7 +1,7 @@ // Copyright (c) OpenFaaS Author(s). All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -package handlers +package middleware import ( "net/http" diff --git a/gateway/handlers/basic_auth_injector_test.go b/gateway/pkg/middleware/basic_auth_injector_test.go similarity index 96% rename from gateway/handlers/basic_auth_injector_test.go rename to gateway/pkg/middleware/basic_auth_injector_test.go index 850bfab5..f844d11f 100644 --- a/gateway/handlers/basic_auth_injector_test.go +++ b/gateway/pkg/middleware/basic_auth_injector_test.go @@ -1,7 +1,7 @@ // Copyright (c) OpenFaaS Author(s). All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -package handlers +package middleware import ( "net/http" diff --git a/gateway/handlers/serviceauthinjector.go b/gateway/pkg/middleware/serviceauthinjector.go similarity index 80% rename from gateway/handlers/serviceauthinjector.go rename to gateway/pkg/middleware/serviceauthinjector.go index 21a03881..7f9b95b1 100644 --- a/gateway/handlers/serviceauthinjector.go +++ b/gateway/pkg/middleware/serviceauthinjector.go @@ -1,4 +1,4 @@ -package handlers +package middleware import "net/http" diff --git a/gateway/plugin/external.go b/gateway/plugin/external.go index 3e334026..a6454ded 100644 --- a/gateway/plugin/external.go +++ b/gateway/plugin/external.go @@ -16,12 +16,26 @@ import ( "time" types "github.com/openfaas/faas-provider/types" - "github.com/openfaas/faas/gateway/handlers" + middleware "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/scaling" ) +// ExternalServiceQuery proxies service queries to external plugin via HTTP +type ExternalServiceQuery struct { + URL url.URL + ProxyClient http.Client + AuthInjector middleware.AuthInjector +} + +// ScaleServiceRequest request scaling of replica +type ScaleServiceRequest struct { + ServiceName string `json:"serviceName"` + ServiceNamespace string `json:"serviceNamespace"` + Replicas uint64 `json:"replicas"` +} + // NewExternalServiceQuery proxies service queries to external plugin via HTTP -func NewExternalServiceQuery(externalURL url.URL, authInjector handlers.AuthInjector) scaling.ServiceQuery { +func NewExternalServiceQuery(externalURL url.URL, authInjector middleware.AuthInjector) scaling.ServiceQuery { timeout := 3 * time.Second proxyClient := http.Client{ @@ -45,20 +59,6 @@ func NewExternalServiceQuery(externalURL url.URL, authInjector handlers.AuthInje } } -// ExternalServiceQuery proxies service queries to external plugin via HTTP -type ExternalServiceQuery struct { - URL url.URL - ProxyClient http.Client - AuthInjector handlers.AuthInjector -} - -// ScaleServiceRequest request scaling of replica -type ScaleServiceRequest struct { - ServiceName string `json:"serviceName"` - ServiceNamespace string `json:"serviceNamespace"` - Replicas uint64 `json:"replicas"` -} - // GetReplicas replica count for function func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) (scaling.ServiceQueryResponse, error) { start := time.Now() @@ -126,6 +126,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) MinReplicas: minReplicas, ScalingFactor: scalingFactor, AvailableReplicas: availableReplicas, + Annotations: function.Annotations, }, err } diff --git a/gateway/plugin/external_test.go b/gateway/plugin/external_test.go index 13b3410b..58ebdcf8 100644 --- a/gateway/plugin/external_test.go +++ b/gateway/plugin/external_test.go @@ -7,7 +7,7 @@ import ( "strings" "testing" - "github.com/openfaas/faas/gateway/handlers" + middleware "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/scaling" ) @@ -47,7 +47,7 @@ func TestGetReplicasNonExistentFn(t *testing.T) { })) defer testServer.Close() - var injector handlers.AuthInjector + var injector middleware.AuthInjector url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) @@ -77,7 +77,7 @@ func TestGetReplicasExistentFn(t *testing.T) { AvailableReplicas: 0, } - var injector handlers.AuthInjector + var injector middleware.AuthInjector url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) @@ -102,7 +102,7 @@ func TestSetReplicasNonExistentFn(t *testing.T) { })) defer testServer.Close() - var injector handlers.AuthInjector + var injector middleware.AuthInjector url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) @@ -124,7 +124,7 @@ func TestSetReplicasExistentFn(t *testing.T) { })) defer testServer.Close() - var injector handlers.AuthInjector + var injector middleware.AuthInjector url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) diff --git a/gateway/queue/types.go b/gateway/queue/types.go index 4d8e4e43..f650a15f 100644 --- a/gateway/queue/types.go +++ b/gateway/queue/types.go @@ -43,8 +43,3 @@ type Request struct { type RequestQueuer interface { Queue(req *Request) error } - -// CanQueueRequests can take on asynchronous requests -type CanQueueRequests interface { - Queue(req *Request) error -} diff --git a/gateway/scaling/function_cache.go b/gateway/scaling/function_cache.go index 1f6c9d34..4fe2a662 100644 --- a/gateway/scaling/function_cache.go +++ b/gateway/scaling/function_cache.go @@ -8,17 +8,10 @@ import ( "time" ) -// FunctionMeta holds the last refresh and any other -// meta-data needed for caching. -type FunctionMeta struct { - LastRefresh time.Time - ServiceQueryResponse ServiceQueryResponse -} - -// Expired find out whether the cache item has expired with -// the given expiry duration from when it was stored. -func (fm *FunctionMeta) Expired(expiry time.Duration) bool { - return time.Now().After(fm.LastRefresh.Add(expiry)) +// FunctionCacher queries functions and caches the results +type FunctionCacher interface { + Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse) + Get(functionName, namespace string) (ServiceQueryResponse, bool) } // FunctionCache provides a cache of Function replica counts @@ -28,8 +21,16 @@ type FunctionCache struct { Sync sync.RWMutex } +// NewFunctionCache creates a function cache to query function metadata +func NewFunctionCache(cacheExpiry time.Duration) FunctionCacher { + return &FunctionCache{ + Cache: make(map[string]*FunctionMeta), + Expiry: cacheExpiry, + } +} + // Set replica count for functionName -func (fc *FunctionCache) Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse) { +func (fc *FunctionCache) Set(functionName, namespace string, queryRes ServiceQueryResponse) { fc.Sync.Lock() defer fc.Sync.Unlock() @@ -38,14 +39,12 @@ func (fc *FunctionCache) Set(functionName, namespace string, serviceQueryRespons } fc.Cache[functionName+"."+namespace].LastRefresh = time.Now() - fc.Cache[functionName+"."+namespace].ServiceQueryResponse = serviceQueryResponse - // entry.LastRefresh = time.Now() - // entry.ServiceQueryResponse = serviceQueryResponse + fc.Cache[functionName+"."+namespace].ServiceQueryResponse = queryRes } // Get replica count for functionName func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryResponse, bool) { - replicas := ServiceQueryResponse{ + queryRes := ServiceQueryResponse{ AvailableReplicas: 0, } @@ -54,9 +53,9 @@ func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryRespon defer fc.Sync.RUnlock() if val, exists := fc.Cache[functionName+"."+namespace]; exists { - replicas = val.ServiceQueryResponse + queryRes = val.ServiceQueryResponse hit = !val.Expired(fc.Expiry) } - return replicas, hit + return queryRes, hit } diff --git a/gateway/scaling/function_meta.go b/gateway/scaling/function_meta.go new file mode 100644 index 00000000..73d0576d --- /dev/null +++ b/gateway/scaling/function_meta.go @@ -0,0 +1,21 @@ +// Copyright (c) OpenFaaS Author(s). All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +package scaling + +import ( + "time" +) + +// FunctionMeta holds the last refresh and any other +// meta-data needed for caching. +type FunctionMeta struct { + LastRefresh time.Time + ServiceQueryResponse ServiceQueryResponse +} + +// Expired find out whether the cache item has expired with +// the given expiry duration from when it was stored. +func (fm *FunctionMeta) Expired(expiry time.Duration) bool { + return time.Now().After(fm.LastRefresh.Add(expiry)) +} diff --git a/gateway/scaling/function_scaler.go b/gateway/scaling/function_scaler.go index 93c9b08a..bac88fb9 100644 --- a/gateway/scaling/function_scaler.go +++ b/gateway/scaling/function_scaler.go @@ -8,21 +8,16 @@ import ( // NewFunctionScaler create a new scaler with the specified // ScalingConfig -func NewFunctionScaler(config ScalingConfig) FunctionScaler { - cache := FunctionCache{ - Cache: make(map[string]*FunctionMeta), - Expiry: config.CacheExpiry, - } - +func NewFunctionScaler(config ScalingConfig, functionCacher FunctionCacher) FunctionScaler { return FunctionScaler{ - Cache: &cache, + Cache: functionCacher, Config: config, } } // FunctionScaler scales from zero type FunctionScaler struct { - Cache *FunctionCache + Cache FunctionCacher Config ScalingConfig } diff --git a/gateway/scaling/service_query.go b/gateway/scaling/service_query.go index e33c086b..7d60e61b 100644 --- a/gateway/scaling/service_query.go +++ b/gateway/scaling/service_query.go @@ -16,4 +16,5 @@ type ServiceQueryResponse struct { MinReplicas uint64 ScalingFactor uint64 AvailableReplicas uint64 + Annotations *map[string]string } diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go index 1526c992..8d2cf35f 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go @@ -44,7 +44,12 @@ func (q *NATSQueue) Queue(req *queue.Request) error { nc := q.nc q.ncMutex.RUnlock() - return nc.Publish(q.Topic, out) + queueName := q.Topic + if len(req.QueueName) > 0 { + queueName = req.QueueName + } + + return nc.Publish(queueName, out) } func (q *NATSQueue) connect() error {