mirror of
https://github.com/openfaas/faas.git
synced 2025-06-16 12:16:47 +00:00
Extract a caching function_query type
This type abstracts the function_query type and introduces an interface for testing and substitution. Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
parent
2bfca6d848
commit
18f6c720b5
@ -17,8 +17,10 @@ import (
|
|||||||
"github.com/openfaas/faas/gateway/scaling"
|
"github.com/openfaas/faas/gateway/scaling"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const queueAnnotation = "com.openfaas.queue"
|
||||||
|
|
||||||
// MakeQueuedProxy accepts work onto a queue
|
// MakeQueuedProxy accepts work onto a queue
|
||||||
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue.RequestQueuer, pathTransformer URLPathTransformer, defaultNS string, functionCacher scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) http.HandlerFunc {
|
func MakeQueuedProxy(metrics metrics.MetricOptions, queuer queue.RequestQueuer, pathTransformer URLPathTransformer, defaultNS string, functionQuery scaling.FunctionQuery) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if r.Body != nil {
|
if r.Body != nil {
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
@ -40,7 +42,7 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue.
|
|||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
name := vars["name"]
|
name := vars["name"]
|
||||||
|
|
||||||
queueName, err := getQueueName(name, functionCacher, serviceQuery)
|
queueName, err := getQueueName(name, functionQuery)
|
||||||
|
|
||||||
req := &queue.Request{
|
req := &queue.Request{
|
||||||
Function: name,
|
Function: name,
|
||||||
@ -68,29 +70,6 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getQueueName(name string, cache scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) (queueName string, err error) {
|
|
||||||
fn, ns := getNameParts(name)
|
|
||||||
|
|
||||||
query, hit := cache.Get(fn, ns)
|
|
||||||
if !hit {
|
|
||||||
queryResponse, err := serviceQuery.GetReplicas(fn, ns)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
cache.Set(fn, ns, queryResponse)
|
|
||||||
}
|
|
||||||
|
|
||||||
query, _ = cache.Get(fn, ns)
|
|
||||||
|
|
||||||
queueName = ""
|
|
||||||
if query.Annotations != nil {
|
|
||||||
if v := (*query.Annotations)["com.openfaas.queue"]; len(v) > 0 {
|
|
||||||
queueName = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return queueName, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func getCallbackURLHeader(header http.Header) (*url.URL, error) {
|
func getCallbackURLHeader(header http.Header) (*url.URL, error) {
|
||||||
value := header.Get("X-Callback-Url")
|
value := header.Get("X-Callback-Url")
|
||||||
var callbackURL *url.URL
|
var callbackURL *url.URL
|
||||||
@ -107,6 +86,21 @@ func getCallbackURLHeader(header http.Header) (*url.URL, error) {
|
|||||||
return callbackURL, nil
|
return callbackURL, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getQueueName(name string, fnQuery scaling.FunctionQuery) (queueName string, err error) {
|
||||||
|
fn, ns := getNameParts(name)
|
||||||
|
|
||||||
|
annotations, err := fnQuery.GetAnnotations(fn, ns)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
queueName = ""
|
||||||
|
if v := annotations[queueAnnotation]; len(v) > 0 {
|
||||||
|
queueName = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return queueName, err
|
||||||
|
}
|
||||||
|
|
||||||
func getNameParts(name string) (fn, ns string) {
|
func getNameParts(name string) (fn, ns string) {
|
||||||
fn = name
|
fn = name
|
||||||
ns = ""
|
ns = ""
|
||||||
|
@ -161,8 +161,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
queueFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry)
|
queueFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry)
|
||||||
|
functionQuery := scaling.NewCachedFunctionQuery(queueFunctionCache, externalServiceQuery)
|
||||||
|
|
||||||
faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper(
|
faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper(
|
||||||
handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer, config.Namespace, queueFunctionCache, externalServiceQuery)),
|
handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, natsQueue, trimURLTransformer, config.Namespace, functionQuery)),
|
||||||
forwardingNotifiers,
|
forwardingNotifiers,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
60
gateway/scaling/function_query.go
Normal file
60
gateway/scaling/function_query.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
// Copyright (c) OpenFaaS Author(s). All rights reserved.
|
||||||
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
|
package scaling
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
|
type CachedFunctionQuery struct {
|
||||||
|
cache FunctionCacher
|
||||||
|
serviceQuery ServiceQuery
|
||||||
|
emptyAnnotations map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCachedFunctionQuery(cache FunctionCacher, serviceQuery ServiceQuery) FunctionQuery {
|
||||||
|
return &CachedFunctionQuery{
|
||||||
|
cache: cache,
|
||||||
|
serviceQuery: serviceQuery,
|
||||||
|
emptyAnnotations: map[string]string{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CachedFunctionQuery) GetAnnotations(name string, namespace string) (annotations map[string]string, err error) {
|
||||||
|
res, err := c.Get(name, namespace)
|
||||||
|
if err != nil {
|
||||||
|
return c.emptyAnnotations, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Annotations == nil {
|
||||||
|
return c.emptyAnnotations, nil
|
||||||
|
}
|
||||||
|
return *res.Annotations, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CachedFunctionQuery) Get(fn string, ns string) (ServiceQueryResponse, error) {
|
||||||
|
|
||||||
|
query, hit := c.cache.Get(fn, ns)
|
||||||
|
if !hit {
|
||||||
|
|
||||||
|
// If there is a cache miss, then fetch the value from the provider API
|
||||||
|
queryResponse, err := c.serviceQuery.GetReplicas(fn, ns)
|
||||||
|
if err != nil {
|
||||||
|
return ServiceQueryResponse{}, err
|
||||||
|
}
|
||||||
|
c.cache.Set(fn, ns, queryResponse)
|
||||||
|
}
|
||||||
|
|
||||||
|
// At this point the value almost certainly must be present, so if not
|
||||||
|
// return an error.
|
||||||
|
query, hit = c.cache.Get(fn, ns)
|
||||||
|
if !hit {
|
||||||
|
return ServiceQueryResponse{}, fmt.Errorf("error with cache key: %s", fn+"."+ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
return query, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type FunctionQuery interface {
|
||||||
|
Get(name string, namespace string) (ServiceQueryResponse, error)
|
||||||
|
GetAnnotations(name string, namespace string) (annotations map[string]string, err error)
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user