faas/gateway/scaling/function_cache.go
Alex Ellis (OpenFaaS Ltd) 2bfca6d848 Publish to multiple topics
Enables publishing to various topics according to annotations
on the functions. The function cache is moved up one level so
that it can be shared between the scale from zero code and the
queue proxy.

Unit tests added for new internal methods.

Tested e2e with arkade and the newest queue-worker and RC
gateway image with two queues and an annotation on one of the
functions of com.openfaas.queue. It worked as expected including
with multiple namespace support.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-04-22 15:26:42 +01:00

62 lines
1.7 KiB
Go

// Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package scaling
import (
"sync"
"time"
)
// FunctionCacher queries functions and caches the results
type FunctionCacher interface {
Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse)
Get(functionName, namespace string) (ServiceQueryResponse, bool)
}
// FunctionCache provides a cache of Function replica counts
type FunctionCache struct {
Cache map[string]*FunctionMeta
Expiry time.Duration
Sync sync.RWMutex
}
// NewFunctionCache creates a function cache to query function metadata
func NewFunctionCache(cacheExpiry time.Duration) FunctionCacher {
return &FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: cacheExpiry,
}
}
// Set replica count for functionName
func (fc *FunctionCache) Set(functionName, namespace string, queryRes ServiceQueryResponse) {
fc.Sync.Lock()
defer fc.Sync.Unlock()
if _, exists := fc.Cache[functionName+"."+namespace]; !exists {
fc.Cache[functionName+"."+namespace] = &FunctionMeta{}
}
fc.Cache[functionName+"."+namespace].LastRefresh = time.Now()
fc.Cache[functionName+"."+namespace].ServiceQueryResponse = queryRes
}
// Get replica count for functionName
func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryResponse, bool) {
queryRes := ServiceQueryResponse{
AvailableReplicas: 0,
}
hit := false
fc.Sync.RLock()
defer fc.Sync.RUnlock()
if val, exists := fc.Cache[functionName+"."+namespace]; exists {
queryRes = val.ServiceQueryResponse
hit = !val.Expired(fc.Expiry)
}
return queryRes, hit
}