Publish to multiple topics

Enables publishing to various topics according to annotations
on the functions. The function cache is moved up one level so
that it can be shared between the scale from zero code and the
queue proxy.

Unit tests added for new internal methods.

Tested e2e with arkade and the newest queue-worker and RC
gateway image with two queues and an annotation on one of the
functions of com.openfaas.queue. It worked as expected including
with multiple namespace support.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd) 2020-04-22 11:46:06 +01:00 committed by Alex Ellis
parent a7c6c39200
commit 2bfca6d848
22 changed files with 264 additions and 106 deletions

View File

@ -28,6 +28,7 @@ COPY queue queue
COPY plugin plugin COPY plugin plugin
COPY version version COPY version version
COPY scaling scaling COPY scaling scaling
COPY pkg pkg
COPY main.go . COPY main.go .
# Run a gofmt and exclude all vendored code. # Run a gofmt and exclude all vendored code.

View File

@ -25,6 +25,7 @@ COPY queue queue
COPY plugin plugin COPY plugin plugin
COPY version version COPY version version
COPY scaling scaling COPY scaling scaling
COPY pkg pkg
COPY main.go . COPY main.go .
# Run a gofmt and exclude all vendored code. # Run a gofmt and exclude all vendored code.

View File

@ -25,6 +25,7 @@ COPY queue queue
COPY plugin plugin COPY plugin plugin
COPY version version COPY version version
COPY scaling scaling COPY scaling scaling
COPY pkg pkg
COPY main.go . COPY main.go .
# Run a gofmt and exclude all vendored code. # Run a gofmt and exclude all vendored code.

12
gateway/Gopkg.lock generated
View File

@ -101,12 +101,12 @@
version = "v0.6.0" version = "v0.6.0"
[[projects]] [[projects]]
digest = "1:340f4e2e095ead4e0a15b4646da3e4533f8b6520e3a382eaf586e8166f3bbcb5" digest = "1:437d6b220b852cd40c465c4eaf01ec26fd1dfdfb9c8c67933a256d0c97fdabb8"
name = "github.com/openfaas/faas" name = "github.com/openfaas/faas"
packages = ["gateway/queue"] packages = ["gateway/queue"]
pruneopts = "UT" pruneopts = "UT"
revision = "bfa869ec8c0c04c26c5b0ed434bc367e712dcaef" revision = "a7c6c39200782ea2e1a1b005d56e3edd51e761a7"
version = "0.10.2" version = "0.18.16"
[[projects]] [[projects]]
digest = "1:4a97aa8ada0b2f865ca69a3a3bc0a2524c24f31c578c995d5c52cecb6913a9dc" digest = "1:4a97aa8ada0b2f865ca69a3a3bc0a2524c24f31c578c995d5c52cecb6913a9dc"
@ -120,15 +120,15 @@
version = "0.12.0" version = "0.12.0"
[[projects]] [[projects]]
digest = "1:1cf86a1a93c110ebcf836468bd917e8c116d6d2fc7612829c15e946b02dbf864" digest = "1:e40ae4e58551013ed5ad4085b337718c9fba314372c0751713ed9fe8082a323d"
name = "github.com/openfaas/nats-queue-worker" name = "github.com/openfaas/nats-queue-worker"
packages = [ packages = [
"handler", "handler",
"nats", "nats",
] ]
pruneopts = "UT" pruneopts = "UT"
revision = "a1835cb71db56e6b814b91df027acf62425a76ad" revision = "1f4e16e1f7afe1fbd464fcaa8a7dabaa3e4ef0bb"
version = "0.10.0" version = "0.10.1"
[[projects]] [[projects]]
digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9" digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9"

View File

@ -12,7 +12,7 @@
[[constraint]] [[constraint]]
name = "github.com/openfaas/nats-queue-worker" name = "github.com/openfaas/nats-queue-worker"
version = "0.10.0" version = "0.10.1"
[[constraint]] [[constraint]]
name = "github.com/prometheus/client_golang" name = "github.com/prometheus/client_golang"

View File

@ -14,6 +14,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/types" "github.com/openfaas/faas/gateway/types"
) )
@ -43,7 +44,7 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy,
notifiers []HTTPNotifier, notifiers []HTTPNotifier,
baseURLResolver BaseURLResolver, baseURLResolver BaseURLResolver,
urlPathTransformer URLPathTransformer, urlPathTransformer URLPathTransformer,
serviceAuthInjector AuthInjector) http.HandlerFunc { serviceAuthInjector middleware.AuthInjector) http.HandlerFunc {
writeRequestURI := false writeRequestURI := false
if _, exists := os.LookupEnv("write_request_uri"); exists { if _, exists := os.LookupEnv("write_request_uri"); exists {
@ -108,7 +109,7 @@ func forwardRequest(w http.ResponseWriter,
requestURL string, requestURL string,
timeout time.Duration, timeout time.Duration,
writeRequestURI bool, writeRequestURI bool,
serviceAuthInjector AuthInjector) (int, error) { serviceAuthInjector middleware.AuthInjector) (int, error) {
upstreamReq := buildUpstreamRequest(r, baseURL, requestURL) upstreamReq := buildUpstreamRequest(r, baseURL, requestURL)
if upstreamReq.Body != nil { if upstreamReq.Body != nil {

View File

@ -6,16 +6,19 @@ package handlers
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"net/http" "net/http"
"net/url" "net/url"
"strings"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/openfaas/faas/gateway/metrics" "github.com/openfaas/faas/gateway/metrics"
"github.com/openfaas/faas/gateway/queue" "github.com/openfaas/faas/gateway/queue"
"github.com/openfaas/faas/gateway/scaling"
) )
// MakeQueuedProxy accepts work onto a queue // MakeQueuedProxy accepts work onto a queue
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc { func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue.RequestQueuer, pathTransformer URLPathTransformer, defaultNS string, functionCacher scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil { if r.Body != nil {
defer r.Body.Close() defer r.Body.Close()
@ -24,29 +27,20 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueReque
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
w.WriteHeader(http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Write([]byte(err.Error())) callbackURL, err := getCallbackURLHeader(r.Header)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
vars := mux.Vars(r) vars := mux.Vars(r)
name := vars["name"] name := vars["name"]
callbackURLHeader := r.Header.Get("X-Callback-Url") queueName, err := getQueueName(name, functionCacher, serviceQuery)
var callbackURL *url.URL
if len(callbackURLHeader) > 0 {
urlVal, urlErr := url.Parse(callbackURLHeader)
if urlErr != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(urlErr.Error()))
return
}
callbackURL = urlVal
}
req := &queue.Request{ req := &queue.Request{
Function: name, Function: name,
@ -57,15 +51,69 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueReque
Header: r.Header, Header: r.Header,
Host: r.Host, Host: r.Host,
CallbackURL: callbackURL, CallbackURL: callbackURL,
QueueName: queueName,
} }
if err = canQueueRequests.Queue(req); err != nil { if len(queueName) > 0 {
w.WriteHeader(http.StatusInternalServerError) log.Printf("Queueing %s to: %s\n", name, queueName)
w.Write([]byte(err.Error())) }
fmt.Println(err)
if err = queuer.Queue(req); err != nil {
fmt.Printf("Queue error: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
w.WriteHeader(http.StatusAccepted) w.WriteHeader(http.StatusAccepted)
} }
} }
func getQueueName(name string, cache scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) (queueName string, err error) {
fn, ns := getNameParts(name)
query, hit := cache.Get(fn, ns)
if !hit {
queryResponse, err := serviceQuery.GetReplicas(fn, ns)
if err != nil {
return "", err
}
cache.Set(fn, ns, queryResponse)
}
query, _ = cache.Get(fn, ns)
queueName = ""
if query.Annotations != nil {
if v := (*query.Annotations)["com.openfaas.queue"]; len(v) > 0 {
queueName = v
}
}
return queueName, err
}
func getCallbackURLHeader(header http.Header) (*url.URL, error) {
value := header.Get("X-Callback-Url")
var callbackURL *url.URL
if len(value) > 0 {
urlVal, err := url.Parse(value)
if err != nil {
return callbackURL, err
}
callbackURL = urlVal
}
return callbackURL, nil
}
func getNameParts(name string) (fn, ns string) {
fn = name
ns = ""
if index := strings.LastIndex(name, "."); index > 0 {
fn = name[:index]
ns = name[index+1:]
}
return fn, ns
}

View File

@ -0,0 +1,74 @@
// Copyright (c) Alex Ellis 2017. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package handlers
import (
"net/http"
"testing"
)
func Test_getNameParts(t *testing.T) {
fn, ns := getNameParts("figlet.openfaas-fn")
wantFn := "figlet"
wantNs := "openfaas-fn"
if fn != wantFn {
t.Fatalf("want %s, got %s", wantFn, fn)
}
if ns != wantNs {
t.Fatalf("want %s, got %s", wantNs, ns)
}
}
func Test_getNamePartsDualDot(t *testing.T) {
fn, ns := getNameParts("dev.figlet.openfaas-fn")
wantFn := "dev.figlet"
wantNs := "openfaas-fn"
if fn != wantFn {
t.Fatalf("want %s, got %s", wantFn, fn)
}
if ns != wantNs {
t.Fatalf("want %s, got %s", wantNs, ns)
}
}
func Test_getNameParts_NoNs(t *testing.T) {
fn, ns := getNameParts("figlet")
wantFn := "figlet"
wantNs := ""
if fn != wantFn {
t.Fatalf("want %s, got %s", wantFn, fn)
}
if ns != wantNs {
t.Fatalf("want %s, got %s", wantNs, ns)
}
}
func Test_getCallbackURLHeader(t *testing.T) {
want := "http://localhost:8080"
header := http.Header{}
header.Add("X-Callback-Url", want)
uri, err := getCallbackURLHeader(header)
if err != nil {
t.Fatal(err)
}
if uri.String() != want {
t.Fatalf("want %s, but got %s", want, uri.String())
}
}
func Test_getCallbackURLHeader_ParseFails(t *testing.T) {
want := "ht tp://foo.com"
header := http.Header{}
header.Add("X-Callback-Url", want)
_, err := getCallbackURLHeader(header)
if err == nil {
t.Fatal("wanted a parsing error.")
}
}

View File

@ -24,9 +24,7 @@ func getNamespace(defaultNamespace, fullName string) (string, string) {
// be called. If the function is not ready after the configured // be called. If the function is not ready after the configured
// amount of attempts / queries then next will not be invoked and a status // amount of attempts / queries then next will not be invoked and a status
// will be returned to the client. // will be returned to the client.
func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc { func MakeScalingHandler(next http.HandlerFunc, scaler scaling.FunctionScaler, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc {
scaler := scaling.NewFunctionScaler(config)
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {

View File

@ -13,6 +13,7 @@ import (
"github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas-provider/auth"
"github.com/openfaas/faas/gateway/handlers" "github.com/openfaas/faas/gateway/handlers"
"github.com/openfaas/faas/gateway/metrics" "github.com/openfaas/faas/gateway/metrics"
"github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/plugin" "github.com/openfaas/faas/gateway/plugin"
"github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/scaling"
"github.com/openfaas/faas/gateway/types" "github.com/openfaas/faas/gateway/types"
@ -60,6 +61,9 @@ func main() {
servicePollInterval := time.Second * 5 servicePollInterval := time.Second * 5
metadataQuery := metrics.NewMetadataQuery(credentials)
fmt.Println(metadataQuery)
metricsOptions := metrics.BuildMetricsOptions() metricsOptions := metrics.BuildMetricsOptions()
exporter := metrics.NewExporter(metricsOptions, credentials) exporter := metrics.NewExporter(metricsOptions, credentials)
exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval) exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval)
@ -100,10 +104,10 @@ func main() {
functionURLTransformer = nilURLTransformer functionURLTransformer = nilURLTransformer
} }
var serviceAuthInjector handlers.AuthInjector var serviceAuthInjector middleware.AuthInjector
if config.UseBasicAuth { if config.UseBasicAuth {
serviceAuthInjector = &handlers.BasicAuthInjector{Credentials: credentials} serviceAuthInjector = &middleware.BasicAuthInjector{Credentials: credentials}
} }
decorateExternalAuth := handlers.MakeExternalAuthHandler decorateExternalAuth := handlers.MakeExternalAuthHandler
@ -129,6 +133,21 @@ func main() {
faasHandlers.LogProxyHandler = handlers.NewLogHandlerFunc(*config.LogsProviderURL, config.WriteTimeout) faasHandlers.LogProxyHandler = handlers.NewLogHandlerFunc(*config.LogsProviderURL, config.WriteTimeout)
scalingConfig := scaling.ScalingConfig{
MaxPollCount: uint(1000),
SetScaleRetries: uint(20),
FunctionPollInterval: time.Millisecond * 50,
CacheExpiry: time.Second * 5, // freshness of replica values before going stale
ServiceQuery: externalServiceQuery,
}
functionProxy := faasHandlers.Proxy
if config.ScaleFromZero {
scalingFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry)
scaler := scaling.NewFunctionScaler(scalingConfig, scalingFunctionCache)
functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scaler, scalingConfig, config.Namespace)
}
if config.UseNATS() { if config.UseNATS() {
log.Println("Async enabled: Using NATS Streaming.") log.Println("Async enabled: Using NATS Streaming.")
maxReconnect := 60 maxReconnect := 60
@ -141,8 +160,9 @@ func main() {
log.Fatalln(queueErr) log.Fatalln(queueErr)
} }
queueFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry)
faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper( faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper(
handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer)), handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer, config.Namespace, queueFunctionCache, externalServiceQuery)),
forwardingNotifiers, forwardingNotifiers,
) )
@ -188,20 +208,6 @@ func main() {
r := mux.NewRouter() r := mux.NewRouter()
// max wait time to start a function = maxPollCount * functionPollInterval // max wait time to start a function = maxPollCount * functionPollInterval
functionProxy := faasHandlers.Proxy
if config.ScaleFromZero {
scalingConfig := scaling.ScalingConfig{
MaxPollCount: uint(1000),
SetScaleRetries: uint(20),
FunctionPollInterval: time.Millisecond * 50,
CacheExpiry: time.Second * 5, // freshness of replica values before going stale
ServiceQuery: externalServiceQuery,
}
functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig, config.Namespace)
}
r.HandleFunc("/function/{name:["+NameExpression+"]+}", functionProxy) r.HandleFunc("/function/{name:["+NameExpression+"]+}", functionProxy)
r.HandleFunc("/function/{name:["+NameExpression+"]+}/", functionProxy) r.HandleFunc("/function/{name:["+NameExpression+"]+}/", functionProxy)
r.HandleFunc("/function/{name:["+NameExpression+"]+}/{params:.*}", functionProxy) r.HandleFunc("/function/{name:["+NameExpression+"]+}/{params:.*}", functionProxy)

View File

@ -0,0 +1,11 @@
package metrics
import "github.com/openfaas/faas-provider/auth"
type MetadataQuery struct {
Credentials *auth.BasicAuthCredentials
}
func NewMetadataQuery(credentials *auth.BasicAuthCredentials) *MetadataQuery {
return &MetadataQuery{Credentials: credentials}
}

View File

@ -1,7 +1,7 @@
// Copyright (c) OpenFaaS Author(s). All rights reserved. // Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information. // Licensed under the MIT license. See LICENSE file in the project root for full license information.
package handlers package middleware
import ( import (
"net/http" "net/http"

View File

@ -1,7 +1,7 @@
// Copyright (c) OpenFaaS Author(s). All rights reserved. // Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information. // Licensed under the MIT license. See LICENSE file in the project root for full license information.
package handlers package middleware
import ( import (
"net/http" "net/http"

View File

@ -1,4 +1,4 @@
package handlers package middleware
import "net/http" import "net/http"

View File

@ -16,12 +16,26 @@ import (
"time" "time"
types "github.com/openfaas/faas-provider/types" types "github.com/openfaas/faas-provider/types"
"github.com/openfaas/faas/gateway/handlers" middleware "github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/scaling"
) )
// ExternalServiceQuery proxies service queries to external plugin via HTTP
type ExternalServiceQuery struct {
URL url.URL
ProxyClient http.Client
AuthInjector middleware.AuthInjector
}
// ScaleServiceRequest request scaling of replica
type ScaleServiceRequest struct {
ServiceName string `json:"serviceName"`
ServiceNamespace string `json:"serviceNamespace"`
Replicas uint64 `json:"replicas"`
}
// NewExternalServiceQuery proxies service queries to external plugin via HTTP // NewExternalServiceQuery proxies service queries to external plugin via HTTP
func NewExternalServiceQuery(externalURL url.URL, authInjector handlers.AuthInjector) scaling.ServiceQuery { func NewExternalServiceQuery(externalURL url.URL, authInjector middleware.AuthInjector) scaling.ServiceQuery {
timeout := 3 * time.Second timeout := 3 * time.Second
proxyClient := http.Client{ proxyClient := http.Client{
@ -45,20 +59,6 @@ func NewExternalServiceQuery(externalURL url.URL, authInjector handlers.AuthInje
} }
} }
// ExternalServiceQuery proxies service queries to external plugin via HTTP
type ExternalServiceQuery struct {
URL url.URL
ProxyClient http.Client
AuthInjector handlers.AuthInjector
}
// ScaleServiceRequest request scaling of replica
type ScaleServiceRequest struct {
ServiceName string `json:"serviceName"`
ServiceNamespace string `json:"serviceNamespace"`
Replicas uint64 `json:"replicas"`
}
// GetReplicas replica count for function // GetReplicas replica count for function
func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) (scaling.ServiceQueryResponse, error) { func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) (scaling.ServiceQueryResponse, error) {
start := time.Now() start := time.Now()
@ -126,6 +126,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string)
MinReplicas: minReplicas, MinReplicas: minReplicas,
ScalingFactor: scalingFactor, ScalingFactor: scalingFactor,
AvailableReplicas: availableReplicas, AvailableReplicas: availableReplicas,
Annotations: function.Annotations,
}, err }, err
} }

View File

@ -7,7 +7,7 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/openfaas/faas/gateway/handlers" middleware "github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/scaling"
) )
@ -47,7 +47,7 @@ func TestGetReplicasNonExistentFn(t *testing.T) {
})) }))
defer testServer.Close() defer testServer.Close()
var injector handlers.AuthInjector var injector middleware.AuthInjector
url, _ := url.Parse(testServer.URL + "/") url, _ := url.Parse(testServer.URL + "/")
esq := NewExternalServiceQuery(*url, injector) esq := NewExternalServiceQuery(*url, injector)
@ -77,7 +77,7 @@ func TestGetReplicasExistentFn(t *testing.T) {
AvailableReplicas: 0, AvailableReplicas: 0,
} }
var injector handlers.AuthInjector var injector middleware.AuthInjector
url, _ := url.Parse(testServer.URL + "/") url, _ := url.Parse(testServer.URL + "/")
esq := NewExternalServiceQuery(*url, injector) esq := NewExternalServiceQuery(*url, injector)
@ -102,7 +102,7 @@ func TestSetReplicasNonExistentFn(t *testing.T) {
})) }))
defer testServer.Close() defer testServer.Close()
var injector handlers.AuthInjector var injector middleware.AuthInjector
url, _ := url.Parse(testServer.URL + "/") url, _ := url.Parse(testServer.URL + "/")
esq := NewExternalServiceQuery(*url, injector) esq := NewExternalServiceQuery(*url, injector)
@ -124,7 +124,7 @@ func TestSetReplicasExistentFn(t *testing.T) {
})) }))
defer testServer.Close() defer testServer.Close()
var injector handlers.AuthInjector var injector middleware.AuthInjector
url, _ := url.Parse(testServer.URL + "/") url, _ := url.Parse(testServer.URL + "/")
esq := NewExternalServiceQuery(*url, injector) esq := NewExternalServiceQuery(*url, injector)

View File

@ -43,8 +43,3 @@ type Request struct {
type RequestQueuer interface { type RequestQueuer interface {
Queue(req *Request) error Queue(req *Request) error
} }
// CanQueueRequests can take on asynchronous requests
type CanQueueRequests interface {
Queue(req *Request) error
}

View File

@ -8,17 +8,10 @@ import (
"time" "time"
) )
// FunctionMeta holds the last refresh and any other // FunctionCacher queries functions and caches the results
// meta-data needed for caching. type FunctionCacher interface {
type FunctionMeta struct { Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse)
LastRefresh time.Time Get(functionName, namespace string) (ServiceQueryResponse, bool)
ServiceQueryResponse ServiceQueryResponse
}
// Expired find out whether the cache item has expired with
// the given expiry duration from when it was stored.
func (fm *FunctionMeta) Expired(expiry time.Duration) bool {
return time.Now().After(fm.LastRefresh.Add(expiry))
} }
// FunctionCache provides a cache of Function replica counts // FunctionCache provides a cache of Function replica counts
@ -28,8 +21,16 @@ type FunctionCache struct {
Sync sync.RWMutex Sync sync.RWMutex
} }
// NewFunctionCache creates a function cache to query function metadata
func NewFunctionCache(cacheExpiry time.Duration) FunctionCacher {
return &FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: cacheExpiry,
}
}
// Set replica count for functionName // Set replica count for functionName
func (fc *FunctionCache) Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse) { func (fc *FunctionCache) Set(functionName, namespace string, queryRes ServiceQueryResponse) {
fc.Sync.Lock() fc.Sync.Lock()
defer fc.Sync.Unlock() defer fc.Sync.Unlock()
@ -38,14 +39,12 @@ func (fc *FunctionCache) Set(functionName, namespace string, serviceQueryRespons
} }
fc.Cache[functionName+"."+namespace].LastRefresh = time.Now() fc.Cache[functionName+"."+namespace].LastRefresh = time.Now()
fc.Cache[functionName+"."+namespace].ServiceQueryResponse = serviceQueryResponse fc.Cache[functionName+"."+namespace].ServiceQueryResponse = queryRes
// entry.LastRefresh = time.Now()
// entry.ServiceQueryResponse = serviceQueryResponse
} }
// Get replica count for functionName // Get replica count for functionName
func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryResponse, bool) { func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryResponse, bool) {
replicas := ServiceQueryResponse{ queryRes := ServiceQueryResponse{
AvailableReplicas: 0, AvailableReplicas: 0,
} }
@ -54,9 +53,9 @@ func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryRespon
defer fc.Sync.RUnlock() defer fc.Sync.RUnlock()
if val, exists := fc.Cache[functionName+"."+namespace]; exists { if val, exists := fc.Cache[functionName+"."+namespace]; exists {
replicas = val.ServiceQueryResponse queryRes = val.ServiceQueryResponse
hit = !val.Expired(fc.Expiry) hit = !val.Expired(fc.Expiry)
} }
return replicas, hit return queryRes, hit
} }

View File

@ -0,0 +1,21 @@
// Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package scaling
import (
"time"
)
// FunctionMeta holds the last refresh and any other
// meta-data needed for caching.
type FunctionMeta struct {
LastRefresh time.Time
ServiceQueryResponse ServiceQueryResponse
}
// Expired find out whether the cache item has expired with
// the given expiry duration from when it was stored.
func (fm *FunctionMeta) Expired(expiry time.Duration) bool {
return time.Now().After(fm.LastRefresh.Add(expiry))
}

View File

@ -8,21 +8,16 @@ import (
// NewFunctionScaler create a new scaler with the specified // NewFunctionScaler create a new scaler with the specified
// ScalingConfig // ScalingConfig
func NewFunctionScaler(config ScalingConfig) FunctionScaler { func NewFunctionScaler(config ScalingConfig, functionCacher FunctionCacher) FunctionScaler {
cache := FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: config.CacheExpiry,
}
return FunctionScaler{ return FunctionScaler{
Cache: &cache, Cache: functionCacher,
Config: config, Config: config,
} }
} }
// FunctionScaler scales from zero // FunctionScaler scales from zero
type FunctionScaler struct { type FunctionScaler struct {
Cache *FunctionCache Cache FunctionCacher
Config ScalingConfig Config ScalingConfig
} }

View File

@ -16,4 +16,5 @@ type ServiceQueryResponse struct {
MinReplicas uint64 MinReplicas uint64
ScalingFactor uint64 ScalingFactor uint64
AvailableReplicas uint64 AvailableReplicas uint64
Annotations *map[string]string
} }

View File

@ -44,7 +44,12 @@ func (q *NATSQueue) Queue(req *queue.Request) error {
nc := q.nc nc := q.nc
q.ncMutex.RUnlock() q.ncMutex.RUnlock()
return nc.Publish(q.Topic, out) queueName := q.Topic
if len(req.QueueName) > 0 {
queueName = req.QueueName
}
return nc.Publish(queueName, out)
} }
func (q *NATSQueue) connect() error { func (q *NATSQueue) connect() error {