From 18f6c720b50db7da5f9c410f9fd3369ed7aff379 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Wed, 22 Apr 2020 14:14:25 +0100 Subject: [PATCH] Extract a caching function_query type This type abstracts the function_query type and introduces an interface for testing and substitution. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- gateway/handlers/queue_proxy.go | 44 ++++++++++------------- gateway/main.go | 4 ++- gateway/scaling/function_query.go | 60 +++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 26 deletions(-) create mode 100644 gateway/scaling/function_query.go diff --git a/gateway/handlers/queue_proxy.go b/gateway/handlers/queue_proxy.go index 822c3fa9..15c7b007 100644 --- a/gateway/handlers/queue_proxy.go +++ b/gateway/handlers/queue_proxy.go @@ -17,8 +17,10 @@ import ( "github.com/openfaas/faas/gateway/scaling" ) +const queueAnnotation = "com.openfaas.queue" + // MakeQueuedProxy accepts work onto a queue -func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue.RequestQueuer, pathTransformer URLPathTransformer, defaultNS string, functionCacher scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) http.HandlerFunc { +func MakeQueuedProxy(metrics metrics.MetricOptions, queuer queue.RequestQueuer, pathTransformer URLPathTransformer, defaultNS string, functionQuery scaling.FunctionQuery) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Body != nil { defer r.Body.Close() @@ -40,7 +42,7 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue. vars := mux.Vars(r) name := vars["name"] - queueName, err := getQueueName(name, functionCacher, serviceQuery) + queueName, err := getQueueName(name, functionQuery) req := &queue.Request{ Function: name, @@ -68,29 +70,6 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue. } } -func getQueueName(name string, cache scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) (queueName string, err error) { - fn, ns := getNameParts(name) - - query, hit := cache.Get(fn, ns) - if !hit { - queryResponse, err := serviceQuery.GetReplicas(fn, ns) - if err != nil { - return "", err - } - cache.Set(fn, ns, queryResponse) - } - - query, _ = cache.Get(fn, ns) - - queueName = "" - if query.Annotations != nil { - if v := (*query.Annotations)["com.openfaas.queue"]; len(v) > 0 { - queueName = v - } - } - return queueName, err -} - func getCallbackURLHeader(header http.Header) (*url.URL, error) { value := header.Get("X-Callback-Url") var callbackURL *url.URL @@ -107,6 +86,21 @@ func getCallbackURLHeader(header http.Header) (*url.URL, error) { return callbackURL, nil } +func getQueueName(name string, fnQuery scaling.FunctionQuery) (queueName string, err error) { + fn, ns := getNameParts(name) + + annotations, err := fnQuery.GetAnnotations(fn, ns) + if err != nil { + return "", err + } + queueName = "" + if v := annotations[queueAnnotation]; len(v) > 0 { + queueName = v + } + + return queueName, err +} + func getNameParts(name string) (fn, ns string) { fn = name ns = "" diff --git a/gateway/main.go b/gateway/main.go index 9f63ad9e..2be7a11d 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -161,8 +161,10 @@ func main() { } queueFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry) + functionQuery := scaling.NewCachedFunctionQuery(queueFunctionCache, externalServiceQuery) + faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper( - handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer, config.Namespace, queueFunctionCache, externalServiceQuery)), + handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, natsQueue, trimURLTransformer, config.Namespace, functionQuery)), forwardingNotifiers, ) diff --git a/gateway/scaling/function_query.go b/gateway/scaling/function_query.go new file mode 100644 index 00000000..4cf472e9 --- /dev/null +++ b/gateway/scaling/function_query.go @@ -0,0 +1,60 @@ +// Copyright (c) OpenFaaS Author(s). All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +package scaling + +import "fmt" + +type CachedFunctionQuery struct { + cache FunctionCacher + serviceQuery ServiceQuery + emptyAnnotations map[string]string +} + +func NewCachedFunctionQuery(cache FunctionCacher, serviceQuery ServiceQuery) FunctionQuery { + return &CachedFunctionQuery{ + cache: cache, + serviceQuery: serviceQuery, + emptyAnnotations: map[string]string{}, + } +} + +func (c *CachedFunctionQuery) GetAnnotations(name string, namespace string) (annotations map[string]string, err error) { + res, err := c.Get(name, namespace) + if err != nil { + return c.emptyAnnotations, err + } + + if res.Annotations == nil { + return c.emptyAnnotations, nil + } + return *res.Annotations, nil +} + +func (c *CachedFunctionQuery) Get(fn string, ns string) (ServiceQueryResponse, error) { + + query, hit := c.cache.Get(fn, ns) + if !hit { + + // If there is a cache miss, then fetch the value from the provider API + queryResponse, err := c.serviceQuery.GetReplicas(fn, ns) + if err != nil { + return ServiceQueryResponse{}, err + } + c.cache.Set(fn, ns, queryResponse) + } + + // At this point the value almost certainly must be present, so if not + // return an error. + query, hit = c.cache.Get(fn, ns) + if !hit { + return ServiceQueryResponse{}, fmt.Errorf("error with cache key: %s", fn+"."+ns) + } + + return query, nil +} + +type FunctionQuery interface { + Get(name string, namespace string) (ServiceQueryResponse, error) + GetAnnotations(name string, namespace string) (annotations map[string]string, err error) +}