diff --git a/gateway/Dockerfile b/gateway/Dockerfile index f68b3868..c902d8b5 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -28,6 +28,7 @@ COPY queue queue COPY plugin plugin COPY version version COPY scaling scaling +COPY pkg pkg COPY main.go . # Run a gofmt and exclude all vendored code. diff --git a/gateway/Dockerfile.arm64 b/gateway/Dockerfile.arm64 index 2ee06808..d94c3656 100644 --- a/gateway/Dockerfile.arm64 +++ b/gateway/Dockerfile.arm64 @@ -25,6 +25,7 @@ COPY queue queue COPY plugin plugin COPY version version COPY scaling scaling +COPY pkg pkg COPY main.go . # Run a gofmt and exclude all vendored code. diff --git a/gateway/Dockerfile.armhf b/gateway/Dockerfile.armhf index 0a5da0ea..f8b94655 100644 --- a/gateway/Dockerfile.armhf +++ b/gateway/Dockerfile.armhf @@ -25,6 +25,7 @@ COPY queue queue COPY plugin plugin COPY version version COPY scaling scaling +COPY pkg pkg COPY main.go . # Run a gofmt and exclude all vendored code. diff --git a/gateway/Gopkg.lock b/gateway/Gopkg.lock index 85a48d8d..3692d3dc 100644 --- a/gateway/Gopkg.lock +++ b/gateway/Gopkg.lock @@ -101,12 +101,12 @@ version = "v0.6.0" [[projects]] - digest = "1:340f4e2e095ead4e0a15b4646da3e4533f8b6520e3a382eaf586e8166f3bbcb5" + digest = "1:437d6b220b852cd40c465c4eaf01ec26fd1dfdfb9c8c67933a256d0c97fdabb8" name = "github.com/openfaas/faas" packages = ["gateway/queue"] pruneopts = "UT" - revision = "bfa869ec8c0c04c26c5b0ed434bc367e712dcaef" - version = "0.10.2" + revision = "a7c6c39200782ea2e1a1b005d56e3edd51e761a7" + version = "0.18.16" [[projects]] digest = "1:4a97aa8ada0b2f865ca69a3a3bc0a2524c24f31c578c995d5c52cecb6913a9dc" @@ -120,15 +120,15 @@ version = "0.12.0" [[projects]] - digest = "1:1cf86a1a93c110ebcf836468bd917e8c116d6d2fc7612829c15e946b02dbf864" + digest = "1:e40ae4e58551013ed5ad4085b337718c9fba314372c0751713ed9fe8082a323d" name = "github.com/openfaas/nats-queue-worker" packages = [ "handler", "nats", ] pruneopts = "UT" - revision = "a1835cb71db56e6b814b91df027acf62425a76ad" - version = "0.10.0" + revision = "1f4e16e1f7afe1fbd464fcaa8a7dabaa3e4ef0bb" + version = "0.10.1" [[projects]] digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9" diff --git a/gateway/Gopkg.toml b/gateway/Gopkg.toml index 054e9f9d..77273bc4 100644 --- a/gateway/Gopkg.toml +++ b/gateway/Gopkg.toml @@ -12,7 +12,7 @@ [[constraint]] name = "github.com/openfaas/nats-queue-worker" - version = "0.10.0" + version = "0.10.1" [[constraint]] name = "github.com/prometheus/client_golang" diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index 17f4b672..b1dabdc9 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -14,6 +14,7 @@ import ( "strings" "time" + "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/types" ) @@ -43,7 +44,7 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, notifiers []HTTPNotifier, baseURLResolver BaseURLResolver, urlPathTransformer URLPathTransformer, - serviceAuthInjector AuthInjector) http.HandlerFunc { + serviceAuthInjector middleware.AuthInjector) http.HandlerFunc { writeRequestURI := false if _, exists := os.LookupEnv("write_request_uri"); exists { @@ -108,7 +109,7 @@ func forwardRequest(w http.ResponseWriter, requestURL string, timeout time.Duration, writeRequestURI bool, - serviceAuthInjector AuthInjector) (int, error) { + serviceAuthInjector middleware.AuthInjector) (int, error) { upstreamReq := buildUpstreamRequest(r, baseURL, requestURL) if upstreamReq.Body != nil { diff --git a/gateway/handlers/queue_proxy.go b/gateway/handlers/queue_proxy.go index 3818a58c..822c3fa9 100644 --- a/gateway/handlers/queue_proxy.go +++ b/gateway/handlers/queue_proxy.go @@ -6,16 +6,19 @@ package handlers import ( "fmt" "io/ioutil" + "log" "net/http" "net/url" + "strings" "github.com/gorilla/mux" "github.com/openfaas/faas/gateway/metrics" "github.com/openfaas/faas/gateway/queue" + "github.com/openfaas/faas/gateway/scaling" ) // MakeQueuedProxy accepts work onto a queue -func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc { +func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue.RequestQueuer, pathTransformer URLPathTransformer, defaultNS string, functionCacher scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Body != nil { defer r.Body.Close() @@ -24,29 +27,20 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueReque body, err := ioutil.ReadAll(r.Body) if err != nil { - w.WriteHeader(http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - w.Write([]byte(err.Error())) + callbackURL, err := getCallbackURLHeader(r.Header) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } vars := mux.Vars(r) name := vars["name"] - callbackURLHeader := r.Header.Get("X-Callback-Url") - var callbackURL *url.URL - - if len(callbackURLHeader) > 0 { - urlVal, urlErr := url.Parse(callbackURLHeader) - if urlErr != nil { - w.WriteHeader(http.StatusBadRequest) - - w.Write([]byte(urlErr.Error())) - return - } - - callbackURL = urlVal - } + queueName, err := getQueueName(name, functionCacher, serviceQuery) req := &queue.Request{ Function: name, @@ -57,15 +51,69 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueReque Header: r.Header, Host: r.Host, CallbackURL: callbackURL, + QueueName: queueName, } - if err = canQueueRequests.Queue(req); err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - fmt.Println(err) + if len(queueName) > 0 { + log.Printf("Queueing %s to: %s\n", name, queueName) + } + + if err = queuer.Queue(req); err != nil { + fmt.Printf("Queue error: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusAccepted) } } + +func getQueueName(name string, cache scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) (queueName string, err error) { + fn, ns := getNameParts(name) + + query, hit := cache.Get(fn, ns) + if !hit { + queryResponse, err := serviceQuery.GetReplicas(fn, ns) + if err != nil { + return "", err + } + cache.Set(fn, ns, queryResponse) + } + + query, _ = cache.Get(fn, ns) + + queueName = "" + if query.Annotations != nil { + if v := (*query.Annotations)["com.openfaas.queue"]; len(v) > 0 { + queueName = v + } + } + return queueName, err +} + +func getCallbackURLHeader(header http.Header) (*url.URL, error) { + value := header.Get("X-Callback-Url") + var callbackURL *url.URL + + if len(value) > 0 { + urlVal, err := url.Parse(value) + if err != nil { + return callbackURL, err + } + + callbackURL = urlVal + } + + return callbackURL, nil +} + +func getNameParts(name string) (fn, ns string) { + fn = name + ns = "" + + if index := strings.LastIndex(name, "."); index > 0 { + fn = name[:index] + ns = name[index+1:] + } + return fn, ns +} diff --git a/gateway/handlers/queue_proxy_test.go b/gateway/handlers/queue_proxy_test.go new file mode 100644 index 00000000..a5c8ee3c --- /dev/null +++ b/gateway/handlers/queue_proxy_test.go @@ -0,0 +1,74 @@ +// Copyright (c) Alex Ellis 2017. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +package handlers + +import ( + "net/http" + "testing" +) + +func Test_getNameParts(t *testing.T) { + fn, ns := getNameParts("figlet.openfaas-fn") + wantFn := "figlet" + wantNs := "openfaas-fn" + + if fn != wantFn { + t.Fatalf("want %s, got %s", wantFn, fn) + } + if ns != wantNs { + t.Fatalf("want %s, got %s", wantNs, ns) + } +} + +func Test_getNamePartsDualDot(t *testing.T) { + fn, ns := getNameParts("dev.figlet.openfaas-fn") + wantFn := "dev.figlet" + wantNs := "openfaas-fn" + + if fn != wantFn { + t.Fatalf("want %s, got %s", wantFn, fn) + } + if ns != wantNs { + t.Fatalf("want %s, got %s", wantNs, ns) + } +} + +func Test_getNameParts_NoNs(t *testing.T) { + fn, ns := getNameParts("figlet") + wantFn := "figlet" + wantNs := "" + + if fn != wantFn { + t.Fatalf("want %s, got %s", wantFn, fn) + } + if ns != wantNs { + t.Fatalf("want %s, got %s", wantNs, ns) + } +} + +func Test_getCallbackURLHeader(t *testing.T) { + want := "http://localhost:8080" + header := http.Header{} + header.Add("X-Callback-Url", want) + + uri, err := getCallbackURLHeader(header) + if err != nil { + t.Fatal(err) + } + + if uri.String() != want { + t.Fatalf("want %s, but got %s", want, uri.String()) + } +} + +func Test_getCallbackURLHeader_ParseFails(t *testing.T) { + want := "ht tp://foo.com" + header := http.Header{} + header.Add("X-Callback-Url", want) + + _, err := getCallbackURLHeader(header) + if err == nil { + t.Fatal("wanted a parsing error.") + } +} diff --git a/gateway/handlers/scaling.go b/gateway/handlers/scaling.go index 85ac6f3e..5ed3617f 100644 --- a/gateway/handlers/scaling.go +++ b/gateway/handlers/scaling.go @@ -24,9 +24,7 @@ func getNamespace(defaultNamespace, fullName string) (string, string) { // be called. If the function is not ready after the configured // amount of attempts / queries then next will not be invoked and a status // will be returned to the client. -func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc { - - scaler := scaling.NewFunctionScaler(config) +func MakeScalingHandler(next http.HandlerFunc, scaler scaling.FunctionScaler, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { diff --git a/gateway/main.go b/gateway/main.go index 1095fa94..9f63ad9e 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -13,6 +13,7 @@ import ( "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas/gateway/handlers" "github.com/openfaas/faas/gateway/metrics" + "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/plugin" "github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/types" @@ -60,6 +61,9 @@ func main() { servicePollInterval := time.Second * 5 + metadataQuery := metrics.NewMetadataQuery(credentials) + fmt.Println(metadataQuery) + metricsOptions := metrics.BuildMetricsOptions() exporter := metrics.NewExporter(metricsOptions, credentials) exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval) @@ -100,10 +104,10 @@ func main() { functionURLTransformer = nilURLTransformer } - var serviceAuthInjector handlers.AuthInjector + var serviceAuthInjector middleware.AuthInjector if config.UseBasicAuth { - serviceAuthInjector = &handlers.BasicAuthInjector{Credentials: credentials} + serviceAuthInjector = &middleware.BasicAuthInjector{Credentials: credentials} } decorateExternalAuth := handlers.MakeExternalAuthHandler @@ -129,6 +133,21 @@ func main() { faasHandlers.LogProxyHandler = handlers.NewLogHandlerFunc(*config.LogsProviderURL, config.WriteTimeout) + scalingConfig := scaling.ScalingConfig{ + MaxPollCount: uint(1000), + SetScaleRetries: uint(20), + FunctionPollInterval: time.Millisecond * 50, + CacheExpiry: time.Second * 5, // freshness of replica values before going stale + ServiceQuery: externalServiceQuery, + } + + functionProxy := faasHandlers.Proxy + if config.ScaleFromZero { + scalingFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry) + scaler := scaling.NewFunctionScaler(scalingConfig, scalingFunctionCache) + functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scaler, scalingConfig, config.Namespace) + } + if config.UseNATS() { log.Println("Async enabled: Using NATS Streaming.") maxReconnect := 60 @@ -141,8 +160,9 @@ func main() { log.Fatalln(queueErr) } + queueFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry) faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper( - handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer)), + handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer, config.Namespace, queueFunctionCache, externalServiceQuery)), forwardingNotifiers, ) @@ -188,20 +208,6 @@ func main() { r := mux.NewRouter() // max wait time to start a function = maxPollCount * functionPollInterval - functionProxy := faasHandlers.Proxy - - if config.ScaleFromZero { - scalingConfig := scaling.ScalingConfig{ - MaxPollCount: uint(1000), - SetScaleRetries: uint(20), - FunctionPollInterval: time.Millisecond * 50, - CacheExpiry: time.Second * 5, // freshness of replica values before going stale - ServiceQuery: externalServiceQuery, - } - - functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig, config.Namespace) - } - r.HandleFunc("/function/{name:["+NameExpression+"]+}", functionProxy) r.HandleFunc("/function/{name:["+NameExpression+"]+}/", functionProxy) r.HandleFunc("/function/{name:["+NameExpression+"]+}/{params:.*}", functionProxy) diff --git a/gateway/metrics/metadata_query.go b/gateway/metrics/metadata_query.go new file mode 100644 index 00000000..f6b5b43d --- /dev/null +++ b/gateway/metrics/metadata_query.go @@ -0,0 +1,11 @@ +package metrics + +import "github.com/openfaas/faas-provider/auth" + +type MetadataQuery struct { + Credentials *auth.BasicAuthCredentials +} + +func NewMetadataQuery(credentials *auth.BasicAuthCredentials) *MetadataQuery { + return &MetadataQuery{Credentials: credentials} +} diff --git a/gateway/handlers/basic_auth_injector.go b/gateway/pkg/middleware/basic_auth_injector.go similarity index 96% rename from gateway/handlers/basic_auth_injector.go rename to gateway/pkg/middleware/basic_auth_injector.go index 83fc0220..81e62837 100644 --- a/gateway/handlers/basic_auth_injector.go +++ b/gateway/pkg/middleware/basic_auth_injector.go @@ -1,7 +1,7 @@ // Copyright (c) OpenFaaS Author(s). All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -package handlers +package middleware import ( "net/http" diff --git a/gateway/handlers/basic_auth_injector_test.go b/gateway/pkg/middleware/basic_auth_injector_test.go similarity index 96% rename from gateway/handlers/basic_auth_injector_test.go rename to gateway/pkg/middleware/basic_auth_injector_test.go index 850bfab5..f844d11f 100644 --- a/gateway/handlers/basic_auth_injector_test.go +++ b/gateway/pkg/middleware/basic_auth_injector_test.go @@ -1,7 +1,7 @@ // Copyright (c) OpenFaaS Author(s). All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -package handlers +package middleware import ( "net/http" diff --git a/gateway/handlers/serviceauthinjector.go b/gateway/pkg/middleware/serviceauthinjector.go similarity index 80% rename from gateway/handlers/serviceauthinjector.go rename to gateway/pkg/middleware/serviceauthinjector.go index 21a03881..7f9b95b1 100644 --- a/gateway/handlers/serviceauthinjector.go +++ b/gateway/pkg/middleware/serviceauthinjector.go @@ -1,4 +1,4 @@ -package handlers +package middleware import "net/http" diff --git a/gateway/plugin/external.go b/gateway/plugin/external.go index 3e334026..a6454ded 100644 --- a/gateway/plugin/external.go +++ b/gateway/plugin/external.go @@ -16,12 +16,26 @@ import ( "time" types "github.com/openfaas/faas-provider/types" - "github.com/openfaas/faas/gateway/handlers" + middleware "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/scaling" ) +// ExternalServiceQuery proxies service queries to external plugin via HTTP +type ExternalServiceQuery struct { + URL url.URL + ProxyClient http.Client + AuthInjector middleware.AuthInjector +} + +// ScaleServiceRequest request scaling of replica +type ScaleServiceRequest struct { + ServiceName string `json:"serviceName"` + ServiceNamespace string `json:"serviceNamespace"` + Replicas uint64 `json:"replicas"` +} + // NewExternalServiceQuery proxies service queries to external plugin via HTTP -func NewExternalServiceQuery(externalURL url.URL, authInjector handlers.AuthInjector) scaling.ServiceQuery { +func NewExternalServiceQuery(externalURL url.URL, authInjector middleware.AuthInjector) scaling.ServiceQuery { timeout := 3 * time.Second proxyClient := http.Client{ @@ -45,20 +59,6 @@ func NewExternalServiceQuery(externalURL url.URL, authInjector handlers.AuthInje } } -// ExternalServiceQuery proxies service queries to external plugin via HTTP -type ExternalServiceQuery struct { - URL url.URL - ProxyClient http.Client - AuthInjector handlers.AuthInjector -} - -// ScaleServiceRequest request scaling of replica -type ScaleServiceRequest struct { - ServiceName string `json:"serviceName"` - ServiceNamespace string `json:"serviceNamespace"` - Replicas uint64 `json:"replicas"` -} - // GetReplicas replica count for function func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) (scaling.ServiceQueryResponse, error) { start := time.Now() @@ -126,6 +126,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) MinReplicas: minReplicas, ScalingFactor: scalingFactor, AvailableReplicas: availableReplicas, + Annotations: function.Annotations, }, err } diff --git a/gateway/plugin/external_test.go b/gateway/plugin/external_test.go index 13b3410b..58ebdcf8 100644 --- a/gateway/plugin/external_test.go +++ b/gateway/plugin/external_test.go @@ -7,7 +7,7 @@ import ( "strings" "testing" - "github.com/openfaas/faas/gateway/handlers" + middleware "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/scaling" ) @@ -47,7 +47,7 @@ func TestGetReplicasNonExistentFn(t *testing.T) { })) defer testServer.Close() - var injector handlers.AuthInjector + var injector middleware.AuthInjector url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) @@ -77,7 +77,7 @@ func TestGetReplicasExistentFn(t *testing.T) { AvailableReplicas: 0, } - var injector handlers.AuthInjector + var injector middleware.AuthInjector url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) @@ -102,7 +102,7 @@ func TestSetReplicasNonExistentFn(t *testing.T) { })) defer testServer.Close() - var injector handlers.AuthInjector + var injector middleware.AuthInjector url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) @@ -124,7 +124,7 @@ func TestSetReplicasExistentFn(t *testing.T) { })) defer testServer.Close() - var injector handlers.AuthInjector + var injector middleware.AuthInjector url, _ := url.Parse(testServer.URL + "/") esq := NewExternalServiceQuery(*url, injector) diff --git a/gateway/queue/types.go b/gateway/queue/types.go index 4d8e4e43..f650a15f 100644 --- a/gateway/queue/types.go +++ b/gateway/queue/types.go @@ -43,8 +43,3 @@ type Request struct { type RequestQueuer interface { Queue(req *Request) error } - -// CanQueueRequests can take on asynchronous requests -type CanQueueRequests interface { - Queue(req *Request) error -} diff --git a/gateway/scaling/function_cache.go b/gateway/scaling/function_cache.go index 1f6c9d34..4fe2a662 100644 --- a/gateway/scaling/function_cache.go +++ b/gateway/scaling/function_cache.go @@ -8,17 +8,10 @@ import ( "time" ) -// FunctionMeta holds the last refresh and any other -// meta-data needed for caching. -type FunctionMeta struct { - LastRefresh time.Time - ServiceQueryResponse ServiceQueryResponse -} - -// Expired find out whether the cache item has expired with -// the given expiry duration from when it was stored. -func (fm *FunctionMeta) Expired(expiry time.Duration) bool { - return time.Now().After(fm.LastRefresh.Add(expiry)) +// FunctionCacher queries functions and caches the results +type FunctionCacher interface { + Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse) + Get(functionName, namespace string) (ServiceQueryResponse, bool) } // FunctionCache provides a cache of Function replica counts @@ -28,8 +21,16 @@ type FunctionCache struct { Sync sync.RWMutex } +// NewFunctionCache creates a function cache to query function metadata +func NewFunctionCache(cacheExpiry time.Duration) FunctionCacher { + return &FunctionCache{ + Cache: make(map[string]*FunctionMeta), + Expiry: cacheExpiry, + } +} + // Set replica count for functionName -func (fc *FunctionCache) Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse) { +func (fc *FunctionCache) Set(functionName, namespace string, queryRes ServiceQueryResponse) { fc.Sync.Lock() defer fc.Sync.Unlock() @@ -38,14 +39,12 @@ func (fc *FunctionCache) Set(functionName, namespace string, serviceQueryRespons } fc.Cache[functionName+"."+namespace].LastRefresh = time.Now() - fc.Cache[functionName+"."+namespace].ServiceQueryResponse = serviceQueryResponse - // entry.LastRefresh = time.Now() - // entry.ServiceQueryResponse = serviceQueryResponse + fc.Cache[functionName+"."+namespace].ServiceQueryResponse = queryRes } // Get replica count for functionName func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryResponse, bool) { - replicas := ServiceQueryResponse{ + queryRes := ServiceQueryResponse{ AvailableReplicas: 0, } @@ -54,9 +53,9 @@ func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryRespon defer fc.Sync.RUnlock() if val, exists := fc.Cache[functionName+"."+namespace]; exists { - replicas = val.ServiceQueryResponse + queryRes = val.ServiceQueryResponse hit = !val.Expired(fc.Expiry) } - return replicas, hit + return queryRes, hit } diff --git a/gateway/scaling/function_meta.go b/gateway/scaling/function_meta.go new file mode 100644 index 00000000..73d0576d --- /dev/null +++ b/gateway/scaling/function_meta.go @@ -0,0 +1,21 @@ +// Copyright (c) OpenFaaS Author(s). All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +package scaling + +import ( + "time" +) + +// FunctionMeta holds the last refresh and any other +// meta-data needed for caching. +type FunctionMeta struct { + LastRefresh time.Time + ServiceQueryResponse ServiceQueryResponse +} + +// Expired find out whether the cache item has expired with +// the given expiry duration from when it was stored. +func (fm *FunctionMeta) Expired(expiry time.Duration) bool { + return time.Now().After(fm.LastRefresh.Add(expiry)) +} diff --git a/gateway/scaling/function_scaler.go b/gateway/scaling/function_scaler.go index 93c9b08a..bac88fb9 100644 --- a/gateway/scaling/function_scaler.go +++ b/gateway/scaling/function_scaler.go @@ -8,21 +8,16 @@ import ( // NewFunctionScaler create a new scaler with the specified // ScalingConfig -func NewFunctionScaler(config ScalingConfig) FunctionScaler { - cache := FunctionCache{ - Cache: make(map[string]*FunctionMeta), - Expiry: config.CacheExpiry, - } - +func NewFunctionScaler(config ScalingConfig, functionCacher FunctionCacher) FunctionScaler { return FunctionScaler{ - Cache: &cache, + Cache: functionCacher, Config: config, } } // FunctionScaler scales from zero type FunctionScaler struct { - Cache *FunctionCache + Cache FunctionCacher Config ScalingConfig } diff --git a/gateway/scaling/service_query.go b/gateway/scaling/service_query.go index e33c086b..7d60e61b 100644 --- a/gateway/scaling/service_query.go +++ b/gateway/scaling/service_query.go @@ -16,4 +16,5 @@ type ServiceQueryResponse struct { MinReplicas uint64 ScalingFactor uint64 AvailableReplicas uint64 + Annotations *map[string]string } diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go index 1526c992..8d2cf35f 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go @@ -44,7 +44,12 @@ func (q *NATSQueue) Queue(req *queue.Request) error { nc := q.nc q.ncMutex.RUnlock() - return nc.Publish(q.Topic, out) + queueName := q.Topic + if len(req.QueueName) > 0 { + queueName = req.QueueName + } + + return nc.Publish(queueName, out) } func (q *NATSQueue) connect() error {