mirror of
https://github.com/openfaas/faas.git
synced 2025-06-19 04:26:35 +00:00
Scale functions with namespace option
Allows alerts to trigger functions to scale when they also have an optional namespace set. Tested e2e with Kubernetes 1.15 and a non-default namespace. Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
committed by
Alex Ellis
parent
238ce1be23
commit
df4126d8f5
@ -16,7 +16,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// MakeAlertHandler handles alerts from Prometheus Alertmanager
|
// MakeAlertHandler handles alerts from Prometheus Alertmanager
|
||||||
func MakeAlertHandler(service scaling.ServiceQuery) http.HandlerFunc {
|
func MakeAlertHandler(service scaling.ServiceQuery, defaultNamespace string) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
log.Println("Alert received.")
|
log.Println("Alert received.")
|
||||||
@ -42,7 +42,7 @@ func MakeAlertHandler(service scaling.ServiceQuery) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
errors := handleAlerts(&req, service)
|
errors := handleAlerts(&req, service, defaultNamespace)
|
||||||
if len(errors) > 0 {
|
if len(errors) > 0 {
|
||||||
log.Println(errors)
|
log.Println(errors)
|
||||||
var errorOutput string
|
var errorOutput string
|
||||||
@ -58,10 +58,10 @@ func MakeAlertHandler(service scaling.ServiceQuery) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery) []error {
|
func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery, defaultNamespace string) []error {
|
||||||
var errors []error
|
var errors []error
|
||||||
for _, alert := range req.Alerts {
|
for _, alert := range req.Alerts {
|
||||||
if err := scaleService(alert, service); err != nil {
|
if err := scaleService(alert, service, defaultNamespace); err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
errors = append(errors, err)
|
errors = append(errors, err)
|
||||||
}
|
}
|
||||||
@ -70,12 +70,13 @@ func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery) [
|
|||||||
return errors
|
return errors
|
||||||
}
|
}
|
||||||
|
|
||||||
func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery) error {
|
func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery, defaultNamespace string) error {
|
||||||
var err error
|
var err error
|
||||||
serviceName := alert.Labels.FunctionName
|
|
||||||
|
serviceName, namespace := getNamespace(defaultNamespace, alert.Labels.FunctionName)
|
||||||
|
|
||||||
if len(serviceName) > 0 {
|
if len(serviceName) > 0 {
|
||||||
queryResponse, getErr := service.GetReplicas(serviceName)
|
queryResponse, getErr := service.GetReplicas(serviceName, namespace)
|
||||||
if getErr == nil {
|
if getErr == nil {
|
||||||
status := alert.Status
|
status := alert.Status
|
||||||
|
|
||||||
@ -86,7 +87,7 @@ func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQu
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
updateErr := service.SetReplicas(serviceName, newReplicas)
|
updateErr := service.SetReplicas(serviceName, namespace, newReplicas)
|
||||||
if updateErr != nil {
|
if updateErr != nil {
|
||||||
err = updateErr
|
err = updateErr
|
||||||
}
|
}
|
||||||
|
45
gateway/handlers/namespaces_test.go
Normal file
45
gateway/handlers/namespaces_test.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
// Copyright (c) Alex Ellis 2017. All rights reserved.
|
||||||
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
|
package handlers
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func Test_getNamespace_Default(t *testing.T) {
|
||||||
|
root, ns := getNamespace("openfaas-fn", "figlet.openfaas-fn")
|
||||||
|
wantRoot := "figlet"
|
||||||
|
wantNs := "openfaas-fn"
|
||||||
|
|
||||||
|
if root != wantRoot {
|
||||||
|
t.Errorf("function root: want %s, got %s", wantRoot, root)
|
||||||
|
}
|
||||||
|
if ns != wantNs {
|
||||||
|
t.Errorf("function ns: want %s, got %s", wantNs, ns)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_getNamespace_Override(t *testing.T) {
|
||||||
|
root, ns := getNamespace("fn", "figlet.fn")
|
||||||
|
wantRoot := "figlet"
|
||||||
|
wantNs := "fn"
|
||||||
|
|
||||||
|
if root != wantRoot {
|
||||||
|
t.Errorf("function root: want %s, got %s", wantRoot, root)
|
||||||
|
}
|
||||||
|
if ns != wantNs {
|
||||||
|
t.Errorf("function ns: want %s, got %s", wantNs, ns)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_getNamespace_Empty(t *testing.T) {
|
||||||
|
root, ns := getNamespace("", "figlet")
|
||||||
|
wantRoot := "figlet"
|
||||||
|
wantNs := ""
|
||||||
|
|
||||||
|
if root != wantRoot {
|
||||||
|
t.Errorf("function root: want %s, got %s", wantRoot, root)
|
||||||
|
}
|
||||||
|
if ns != wantNs {
|
||||||
|
t.Errorf("function ns: want %s, got %s", wantNs, ns)
|
||||||
|
}
|
||||||
|
}
|
@ -7,23 +7,33 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/openfaas/faas/gateway/scaling"
|
"github.com/openfaas/faas/gateway/scaling"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func getNamespace(defaultNamespace, fullName string) (string, string) {
|
||||||
|
if index := strings.LastIndex(fullName, "."); index > -1 {
|
||||||
|
return fullName[:index], fullName[index+1:]
|
||||||
|
}
|
||||||
|
return fullName, defaultNamespace
|
||||||
|
}
|
||||||
|
|
||||||
// MakeScalingHandler creates handler which can scale a function from
|
// MakeScalingHandler creates handler which can scale a function from
|
||||||
// zero to N replica(s). After scaling the next http.HandlerFunc will
|
// zero to N replica(s). After scaling the next http.HandlerFunc will
|
||||||
// be called. If the function is not ready after the configured
|
// be called. If the function is not ready after the configured
|
||||||
// amount of attempts / queries then next will not be invoked and a status
|
// amount of attempts / queries then next will not be invoked and a status
|
||||||
// will be returned to the client.
|
// will be returned to the client.
|
||||||
func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) http.HandlerFunc {
|
func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc {
|
||||||
|
|
||||||
scaler := scaling.NewFunctionScaler(config)
|
scaler := scaling.NewFunctionScaler(config)
|
||||||
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
functionName := getServiceName(r.URL.String())
|
functionName := getServiceName(r.URL.String())
|
||||||
res := scaler.Scale(functionName)
|
_, namespace := getNamespace(defaultNamespace, functionName)
|
||||||
|
|
||||||
|
res := scaler.Scale(functionName, namespace)
|
||||||
|
|
||||||
if !res.Found {
|
if !res.Found {
|
||||||
errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error())
|
errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error())
|
||||||
|
@ -55,11 +55,12 @@ type ExternalServiceQuery struct {
|
|||||||
// ScaleServiceRequest request scaling of replica
|
// ScaleServiceRequest request scaling of replica
|
||||||
type ScaleServiceRequest struct {
|
type ScaleServiceRequest struct {
|
||||||
ServiceName string `json:"serviceName"`
|
ServiceName string `json:"serviceName"`
|
||||||
|
ServiceNamespace string `json:"serviceNamespace"`
|
||||||
Replicas uint64 `json:"replicas"`
|
Replicas uint64 `json:"replicas"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetReplicas replica count for function
|
// GetReplicas replica count for function
|
||||||
func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQueryResponse, error) {
|
func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) (scaling.ServiceQueryResponse, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
@ -67,7 +68,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu
|
|||||||
|
|
||||||
function := types.FunctionStatus{}
|
function := types.FunctionStatus{}
|
||||||
|
|
||||||
urlPath := fmt.Sprintf("%ssystem/function/%s", s.URL.String(), serviceName)
|
urlPath := fmt.Sprintf("%ssystem/function/%s?namespace=%s", s.URL.String(), serviceName, serviceNamespace)
|
||||||
|
|
||||||
req, _ := http.NewRequest(http.MethodGet, urlPath, nil)
|
req, _ := http.NewRequest(http.MethodGet, urlPath, nil)
|
||||||
|
|
||||||
@ -91,8 +92,10 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(urlPath, err)
|
log.Println(urlPath, err)
|
||||||
}
|
}
|
||||||
|
log.Printf("GetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds())
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
log.Printf("GetReplicas took: %fs", time.Since(start).Seconds())
|
log.Printf("GetReplicas [%s.%s] took: %fs, code: %d\n", serviceName, serviceNamespace, time.Since(start).Seconds(), res.StatusCode)
|
||||||
return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s", res.StatusCode, serviceName)
|
return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s", res.StatusCode, serviceName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,8 +118,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu
|
|||||||
log.Printf("Bad Scaling Factor: %d, is not in range of [0 - 100]. Will fallback to %d", extractedScalingFactor, scalingFactor)
|
log.Printf("Bad Scaling Factor: %d, is not in range of [0 - 100]. Will fallback to %d", extractedScalingFactor, scalingFactor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.Printf("GetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds())
|
||||||
log.Printf("GetReplicas took: %fs", time.Since(start).Seconds())
|
|
||||||
|
|
||||||
return scaling.ServiceQueryResponse{
|
return scaling.ServiceQueryResponse{
|
||||||
Replicas: function.Replicas,
|
Replicas: function.Replicas,
|
||||||
@ -128,12 +130,13 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetReplicas update the replica count
|
// SetReplicas update the replica count
|
||||||
func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) error {
|
func (s ExternalServiceQuery) SetReplicas(serviceName, serviceNamespace string, count uint64) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
scaleReq := ScaleServiceRequest{
|
scaleReq := ScaleServiceRequest{
|
||||||
ServiceName: serviceName,
|
ServiceName: serviceName,
|
||||||
Replicas: count,
|
Replicas: count,
|
||||||
|
ServiceNamespace: serviceNamespace,
|
||||||
}
|
}
|
||||||
|
|
||||||
requestBody, err := json.Marshal(scaleReq)
|
requestBody, err := json.Marshal(scaleReq)
|
||||||
@ -141,7 +144,8 @@ func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) erro
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
urlPath := fmt.Sprintf("%ssystem/scale-function/%s", s.URL.String(), serviceName)
|
start := time.Now()
|
||||||
|
urlPath := fmt.Sprintf("%ssystem/scale-function/%s?namespace=%s", s.URL.String(), serviceName, serviceNamespace)
|
||||||
req, _ := http.NewRequest(http.MethodPost, urlPath, bytes.NewReader(requestBody))
|
req, _ := http.NewRequest(http.MethodPost, urlPath, bytes.NewReader(requestBody))
|
||||||
|
|
||||||
if s.AuthInjector != nil {
|
if s.AuthInjector != nil {
|
||||||
@ -163,6 +167,8 @@ func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) erro
|
|||||||
err = fmt.Errorf("error scaling HTTP code %d, %s", res.StatusCode, urlPath)
|
err = fmt.Errorf("error scaling HTTP code %d, %s", res.StatusCode, urlPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Printf("SetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds())
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ func TestGetReplicasNonExistentFn(t *testing.T) {
|
|||||||
|
|
||||||
esq := NewExternalServiceQuery(*url, injector)
|
esq := NewExternalServiceQuery(*url, injector)
|
||||||
|
|
||||||
svcQryResp, err := esq.GetReplicas("burt")
|
svcQryResp, err := esq.GetReplicas("figlet", "")
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Logf("Error was nil, expected non-nil - the service query response value was %+v ", svcQryResp)
|
t.Logf("Error was nil, expected non-nil - the service query response value was %+v ", svcQryResp)
|
||||||
@ -82,7 +82,7 @@ func TestGetReplicasExistentFn(t *testing.T) {
|
|||||||
|
|
||||||
esq := NewExternalServiceQuery(*url, injector)
|
esq := NewExternalServiceQuery(*url, injector)
|
||||||
|
|
||||||
svcQryResp, err := esq.GetReplicas("burt")
|
svcQryResp, err := esq.GetReplicas("figlet", "")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Expected err to be nil got: %s ", err.Error())
|
t.Logf("Expected err to be nil got: %s ", err.Error())
|
||||||
@ -106,7 +106,7 @@ func TestSetReplicasNonExistentFn(t *testing.T) {
|
|||||||
url, _ := url.Parse(testServer.URL + "/")
|
url, _ := url.Parse(testServer.URL + "/")
|
||||||
esq := NewExternalServiceQuery(*url, injector)
|
esq := NewExternalServiceQuery(*url, injector)
|
||||||
|
|
||||||
err := esq.SetReplicas("burt", 1)
|
err := esq.SetReplicas("figlet", "", 1)
|
||||||
|
|
||||||
expectedErrStr := "error scaling HTTP code 500"
|
expectedErrStr := "error scaling HTTP code 500"
|
||||||
|
|
||||||
@ -129,7 +129,7 @@ func TestSetReplicasExistentFn(t *testing.T) {
|
|||||||
url, _ := url.Parse(testServer.URL + "/")
|
url, _ := url.Parse(testServer.URL + "/")
|
||||||
esq := NewExternalServiceQuery(*url, injector)
|
esq := NewExternalServiceQuery(*url, injector)
|
||||||
|
|
||||||
err := esq.SetReplicas("burt", 1)
|
err := esq.SetReplicas("figlet", "", 1)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Expected err to be nil got: %s ", err.Error())
|
t.Logf("Expected err to be nil got: %s ", err.Error())
|
||||||
|
@ -29,22 +29,22 @@ type FunctionCache struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Set replica count for functionName
|
// Set replica count for functionName
|
||||||
func (fc *FunctionCache) Set(functionName string, serviceQueryResponse ServiceQueryResponse) {
|
func (fc *FunctionCache) Set(functionName, namespace string, serviceQueryResponse ServiceQueryResponse) {
|
||||||
fc.Sync.Lock()
|
fc.Sync.Lock()
|
||||||
defer fc.Sync.Unlock()
|
defer fc.Sync.Unlock()
|
||||||
|
|
||||||
if _, exists := fc.Cache[functionName]; !exists {
|
if _, exists := fc.Cache[functionName+"."+namespace]; !exists {
|
||||||
fc.Cache[functionName] = &FunctionMeta{}
|
fc.Cache[functionName+"."+namespace] = &FunctionMeta{}
|
||||||
}
|
}
|
||||||
|
|
||||||
fc.Cache[functionName].LastRefresh = time.Now()
|
fc.Cache[functionName+"."+namespace].LastRefresh = time.Now()
|
||||||
fc.Cache[functionName].ServiceQueryResponse = serviceQueryResponse
|
fc.Cache[functionName+"."+namespace].ServiceQueryResponse = serviceQueryResponse
|
||||||
// entry.LastRefresh = time.Now()
|
// entry.LastRefresh = time.Now()
|
||||||
// entry.ServiceQueryResponse = serviceQueryResponse
|
// entry.ServiceQueryResponse = serviceQueryResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get replica count for functionName
|
// Get replica count for functionName
|
||||||
func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) {
|
func (fc *FunctionCache) Get(functionName, namespace string) (ServiceQueryResponse, bool) {
|
||||||
replicas := ServiceQueryResponse{
|
replicas := ServiceQueryResponse{
|
||||||
AvailableReplicas: 0,
|
AvailableReplicas: 0,
|
||||||
}
|
}
|
||||||
@ -53,7 +53,7 @@ func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) {
|
|||||||
fc.Sync.RLock()
|
fc.Sync.RLock()
|
||||||
defer fc.Sync.RUnlock()
|
defer fc.Sync.RUnlock()
|
||||||
|
|
||||||
if val, exists := fc.Cache[functionName]; exists {
|
if val, exists := fc.Cache[functionName+"."+namespace]; exists {
|
||||||
replicas = val.ServiceQueryResponse
|
replicas = val.ServiceQueryResponse
|
||||||
hit = !val.Expired(fc.Expiry)
|
hit = !val.Expired(fc.Expiry)
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@ func Test_LastRefreshSet(t *testing.T) {
|
|||||||
before := time.Now()
|
before := time.Now()
|
||||||
|
|
||||||
fnName := "echo"
|
fnName := "echo"
|
||||||
|
namespace := ""
|
||||||
cache := FunctionCache{
|
cache := FunctionCache{
|
||||||
Cache: make(map[string]*FunctionMeta),
|
Cache: make(map[string]*FunctionMeta),
|
||||||
Expiry: time.Millisecond * 1,
|
Expiry: time.Millisecond * 1,
|
||||||
@ -23,14 +23,14 @@ func Test_LastRefreshSet(t *testing.T) {
|
|||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
|
cache.Set(fnName, "", ServiceQueryResponse{AvailableReplicas: 1})
|
||||||
|
|
||||||
if _, exists := cache.Cache[fnName]; !exists {
|
if _, exists := cache.Cache[fnName+"."+namespace]; !exists {
|
||||||
t.Errorf("Expected entry to exist after setting %s", fnName)
|
t.Errorf("Expected entry to exist after setting %s", fnName)
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
|
|
||||||
if cache.Cache[fnName].LastRefresh.Before(before) {
|
if cache.Cache[fnName+"."+namespace].LastRefresh.Before(before) {
|
||||||
t.Errorf("Expected LastRefresh for function to have been after start of test")
|
t.Errorf("Expected LastRefresh for function to have been after start of test")
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
@ -38,16 +38,16 @@ func Test_LastRefreshSet(t *testing.T) {
|
|||||||
|
|
||||||
func Test_CacheExpiresIn1MS(t *testing.T) {
|
func Test_CacheExpiresIn1MS(t *testing.T) {
|
||||||
fnName := "echo"
|
fnName := "echo"
|
||||||
|
namespace := ""
|
||||||
cache := FunctionCache{
|
cache := FunctionCache{
|
||||||
Cache: make(map[string]*FunctionMeta),
|
Cache: make(map[string]*FunctionMeta),
|
||||||
Expiry: time.Millisecond * 1,
|
Expiry: time.Millisecond * 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
|
cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1})
|
||||||
time.Sleep(time.Millisecond * 2)
|
time.Sleep(time.Millisecond * 2)
|
||||||
|
|
||||||
_, hit := cache.Get(fnName)
|
_, hit := cache.Get(fnName, namespace)
|
||||||
|
|
||||||
wantHit := false
|
wantHit := false
|
||||||
|
|
||||||
@ -58,15 +58,16 @@ func Test_CacheExpiresIn1MS(t *testing.T) {
|
|||||||
|
|
||||||
func Test_CacheGivesHitWithLongExpiry(t *testing.T) {
|
func Test_CacheGivesHitWithLongExpiry(t *testing.T) {
|
||||||
fnName := "echo"
|
fnName := "echo"
|
||||||
|
namespace := ""
|
||||||
|
|
||||||
cache := FunctionCache{
|
cache := FunctionCache{
|
||||||
Cache: make(map[string]*FunctionMeta),
|
Cache: make(map[string]*FunctionMeta),
|
||||||
Expiry: time.Millisecond * 500,
|
Expiry: time.Millisecond * 500,
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
|
cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1})
|
||||||
|
_, hit := cache.Get(fnName, namespace)
|
||||||
|
|
||||||
_, hit := cache.Get(fnName)
|
|
||||||
wantHit := true
|
wantHit := true
|
||||||
|
|
||||||
if hit != wantHit {
|
if hit != wantHit {
|
||||||
@ -76,16 +77,17 @@ func Test_CacheGivesHitWithLongExpiry(t *testing.T) {
|
|||||||
|
|
||||||
func Test_CacheFunctionExists(t *testing.T) {
|
func Test_CacheFunctionExists(t *testing.T) {
|
||||||
fnName := "echo"
|
fnName := "echo"
|
||||||
|
namespace := ""
|
||||||
|
|
||||||
cache := FunctionCache{
|
cache := FunctionCache{
|
||||||
Cache: make(map[string]*FunctionMeta),
|
Cache: make(map[string]*FunctionMeta),
|
||||||
Expiry: time.Millisecond * 10,
|
Expiry: time.Millisecond * 10,
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
|
cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1})
|
||||||
time.Sleep(time.Millisecond * 2)
|
time.Sleep(time.Millisecond * 2)
|
||||||
|
|
||||||
_, hit := cache.Get(fnName)
|
_, hit := cache.Get(fnName, namespace)
|
||||||
|
|
||||||
wantHit := true
|
wantHit := true
|
||||||
|
|
||||||
@ -93,19 +95,41 @@ func Test_CacheFunctionExists(t *testing.T) {
|
|||||||
t.Errorf("hit, want: %v, got %v", wantHit, hit)
|
t.Errorf("hit, want: %v, got %v", wantHit, hit)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func Test_CacheFunctionNotExist(t *testing.T) {
|
|
||||||
|
func Test_CacheFunctionExistsWithNamespace(t *testing.T) {
|
||||||
fnName := "echo"
|
fnName := "echo"
|
||||||
testName := "burt"
|
namespace := "openfaas-fn"
|
||||||
|
|
||||||
cache := FunctionCache{
|
cache := FunctionCache{
|
||||||
Cache: make(map[string]*FunctionMeta),
|
Cache: make(map[string]*FunctionMeta),
|
||||||
Expiry: time.Millisecond * 10,
|
Expiry: time.Millisecond * 10,
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
|
cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1})
|
||||||
|
|
||||||
|
_, hit := cache.Get(fnName, namespace)
|
||||||
|
|
||||||
|
wantHit := true
|
||||||
|
|
||||||
|
if hit != wantHit {
|
||||||
|
t.Errorf("hit, want: %v, got %v", wantHit, hit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_CacheFunctionNotExist(t *testing.T) {
|
||||||
|
fnName := "echo"
|
||||||
|
testName := "burt"
|
||||||
|
namespace := ""
|
||||||
|
|
||||||
|
cache := FunctionCache{
|
||||||
|
Cache: make(map[string]*FunctionMeta),
|
||||||
|
Expiry: time.Millisecond * 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.Set(fnName, namespace, ServiceQueryResponse{AvailableReplicas: 1})
|
||||||
time.Sleep(time.Millisecond * 2)
|
time.Sleep(time.Millisecond * 2)
|
||||||
|
|
||||||
_, hit := cache.Get(testName)
|
_, hit := cache.Get(testName, namespace)
|
||||||
|
|
||||||
wantHit := false
|
wantHit := false
|
||||||
|
|
||||||
|
@ -36,10 +36,10 @@ type FunctionScaleResult struct {
|
|||||||
|
|
||||||
// Scale scales a function from zero replicas to 1 or the value set in
|
// Scale scales a function from zero replicas to 1 or the value set in
|
||||||
// the minimum replicas metadata
|
// the minimum replicas metadata
|
||||||
func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult {
|
func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResult {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
if cachedResponse, hit := f.Cache.Get(functionName); hit &&
|
if cachedResponse, hit := f.Cache.Get(functionName, namespace); hit &&
|
||||||
cachedResponse.AvailableReplicas > 0 {
|
cachedResponse.AvailableReplicas > 0 {
|
||||||
return FunctionScaleResult{
|
return FunctionScaleResult{
|
||||||
Error: nil,
|
Error: nil,
|
||||||
@ -49,7 +49,7 @@ func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName)
|
queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName, namespace)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return FunctionScaleResult{
|
return FunctionScaleResult{
|
||||||
@ -60,7 +60,7 @@ func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
f.Cache.Set(functionName, queryResponse)
|
f.Cache.Set(functionName, namespace, queryResponse)
|
||||||
|
|
||||||
if queryResponse.AvailableReplicas == 0 {
|
if queryResponse.AvailableReplicas == 0 {
|
||||||
minReplicas := uint64(1)
|
minReplicas := uint64(1)
|
||||||
@ -69,19 +69,19 @@ func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
scaleResult := backoff(func(attempt int) error {
|
scaleResult := backoff(func(attempt int) error {
|
||||||
queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName)
|
queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
f.Cache.Set(functionName, queryResponse)
|
f.Cache.Set(functionName, namespace, queryResponse)
|
||||||
|
|
||||||
if queryResponse.Replicas > 0 {
|
if queryResponse.Replicas > 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("[Scale %d] function=%s 0 => %d requested", attempt, functionName, minReplicas)
|
log.Printf("[Scale %d] function=%s 0 => %d requested", attempt, functionName, minReplicas)
|
||||||
setScaleErr := f.Config.ServiceQuery.SetReplicas(functionName, minReplicas)
|
setScaleErr := f.Config.ServiceQuery.SetReplicas(functionName, namespace, minReplicas)
|
||||||
if setScaleErr != nil {
|
if setScaleErr != nil {
|
||||||
return fmt.Errorf("unable to scale function [%s], err: %s", functionName, setScaleErr)
|
return fmt.Errorf("unable to scale function [%s], err: %s", functionName, setScaleErr)
|
||||||
}
|
}
|
||||||
@ -100,9 +100,9 @@ func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < int(f.Config.MaxPollCount); i++ {
|
for i := 0; i < int(f.Config.MaxPollCount); i++ {
|
||||||
queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName)
|
queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName, namespace)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
f.Cache.Set(functionName, queryResponse)
|
f.Cache.Set(functionName, namespace, queryResponse)
|
||||||
}
|
}
|
||||||
totalTime := time.Since(start)
|
totalTime := time.Since(start)
|
||||||
|
|
||||||
|
@ -5,8 +5,8 @@ package scaling
|
|||||||
|
|
||||||
// ServiceQuery provides interface for replica querying/setting
|
// ServiceQuery provides interface for replica querying/setting
|
||||||
type ServiceQuery interface {
|
type ServiceQuery interface {
|
||||||
GetReplicas(service string) (response ServiceQueryResponse, err error)
|
GetReplicas(service, namespace string) (response ServiceQueryResponse, err error)
|
||||||
SetReplicas(service string, count uint64) error
|
SetReplicas(service, namespace string, count uint64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceQueryResponse response from querying a function status
|
// ServiceQueryResponse response from querying a function status
|
||||||
|
@ -117,9 +117,9 @@ func main() {
|
|||||||
faasHandlers.InfoHandler = handlers.MakeInfoHandler(handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector))
|
faasHandlers.InfoHandler = handlers.MakeInfoHandler(handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector))
|
||||||
faasHandlers.SecretHandler = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
|
faasHandlers.SecretHandler = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
|
||||||
|
|
||||||
alertHandler := plugin.NewExternalServiceQuery(*config.FunctionsProviderURL, serviceAuthInjector)
|
externalServiceQuery := plugin.NewExternalServiceQuery(*config.FunctionsProviderURL, serviceAuthInjector)
|
||||||
faasHandlers.Alert = handlers.MakeNotifierWrapper(
|
faasHandlers.Alert = handlers.MakeNotifierWrapper(
|
||||||
handlers.MakeAlertHandler(alertHandler),
|
handlers.MakeAlertHandler(externalServiceQuery, config.Namespace),
|
||||||
forwardingNotifiers,
|
forwardingNotifiers,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -190,10 +190,10 @@ func main() {
|
|||||||
SetScaleRetries: uint(20),
|
SetScaleRetries: uint(20),
|
||||||
FunctionPollInterval: time.Millisecond * 50,
|
FunctionPollInterval: time.Millisecond * 50,
|
||||||
CacheExpiry: time.Second * 5, // freshness of replica values before going stale
|
CacheExpiry: time.Second * 5, // freshness of replica values before going stale
|
||||||
ServiceQuery: alertHandler,
|
ServiceQuery: externalServiceQuery,
|
||||||
}
|
}
|
||||||
|
|
||||||
functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig)
|
functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig, config.Namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.HandleFunc("/function/{name:["+NameExpression+"]+}", functionProxy)
|
r.HandleFunc("/function/{name:["+NameExpression+"]+}", functionProxy)
|
||||||
|
Reference in New Issue
Block a user