Extract scaling from zero

- extracting this package means it can be used in other components
such as the asynchronous nats-queue-worker which may need to
invoke functions which are scaled down to zero replicas.

Ref: https://github.com/openfaas/nats-queue-worker/issues/32

Tested on Docker Swarm for scaling up, already scaled and not
found error.

Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (VMware)
2018-11-01 12:54:17 +00:00
committed by Alex Ellis
parent fb06e299cf
commit 9cea08c728
15 changed files with 208 additions and 125 deletions

View File

@ -0,0 +1,63 @@
// Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package scaling
import (
"sync"
"time"
)
// FunctionMeta holds the last refresh and any other
// meta-data needed for caching.
type FunctionMeta struct {
LastRefresh time.Time
ServiceQueryResponse ServiceQueryResponse
}
// Expired find out whether the cache item has expired with
// the given expiry duration from when it was stored.
func (fm *FunctionMeta) Expired(expiry time.Duration) bool {
return time.Now().After(fm.LastRefresh.Add(expiry))
}
// FunctionCache provides a cache of Function replica counts
type FunctionCache struct {
Cache map[string]*FunctionMeta
Expiry time.Duration
Sync sync.Mutex
}
// Set replica count for functionName
func (fc *FunctionCache) Set(functionName string, serviceQueryResponse ServiceQueryResponse) {
fc.Sync.Lock()
defer fc.Sync.Unlock()
if _, exists := fc.Cache[functionName]; !exists {
fc.Cache[functionName] = &FunctionMeta{}
}
entry := fc.Cache[functionName]
entry.LastRefresh = time.Now()
entry.ServiceQueryResponse = serviceQueryResponse
}
// Get replica count for functionName
func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) {
fc.Sync.Lock()
defer fc.Sync.Unlock()
replicas := ServiceQueryResponse{
AvailableReplicas: 0,
}
hit := false
if val, exists := fc.Cache[functionName]; exists {
replicas = val.ServiceQueryResponse
hit = !val.Expired(fc.Expiry)
}
return replicas, hit
}

View File

@ -0,0 +1,115 @@
// Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package scaling
import (
"testing"
"time"
)
func Test_LastRefreshSet(t *testing.T) {
before := time.Now()
fnName := "echo"
cache := FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: time.Millisecond * 1,
}
if cache.Cache == nil {
t.Errorf("Expected cache map to be initialized")
t.Fail()
}
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
if _, exists := cache.Cache[fnName]; !exists {
t.Errorf("Expected entry to exist after setting %s", fnName)
t.Fail()
}
if cache.Cache[fnName].LastRefresh.Before(before) {
t.Errorf("Expected LastRefresh for function to have been after start of test")
t.Fail()
}
}
func Test_CacheExpiresIn1MS(t *testing.T) {
fnName := "echo"
cache := FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: time.Millisecond * 1,
}
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
time.Sleep(time.Millisecond * 2)
_, hit := cache.Get(fnName)
wantHit := false
if hit != wantHit {
t.Errorf("hit, want: %v, got %v", wantHit, hit)
}
}
func Test_CacheGivesHitWithLongExpiry(t *testing.T) {
fnName := "echo"
cache := FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: time.Millisecond * 500,
}
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
_, hit := cache.Get(fnName)
wantHit := true
if hit != wantHit {
t.Errorf("hit, want: %v, got %v", wantHit, hit)
}
}
func Test_CacheFunctionExists(t *testing.T) {
fnName := "echo"
cache := FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: time.Millisecond * 10,
}
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
time.Sleep(time.Millisecond * 2)
_, hit := cache.Get(fnName)
wantHit := true
if hit != wantHit {
t.Errorf("hit, want: %v, got %v", wantHit, hit)
}
}
func Test_CacheFunctionNotExist(t *testing.T) {
fnName := "echo"
testName := "burt"
cache := FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: time.Millisecond * 10,
}
cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
time.Sleep(time.Millisecond * 2)
_, hit := cache.Get(testName)
wantHit := false
if hit != wantHit {
t.Errorf("hit, want: %v, got %v", wantHit, hit)
}
}

View File

@ -0,0 +1,108 @@
package scaling
import (
"fmt"
"log"
"time"
)
// NewFunctionScaler create a new scaler with the specified
// ScalingConfig
func NewFunctionScaler(config ScalingConfig) FunctionScaler {
cache := FunctionCache{
Cache: make(map[string]*FunctionMeta),
Expiry: config.CacheExpiry,
}
return FunctionScaler{
Cache: &cache,
Config: config,
}
}
// FunctionScaler scales from zero
type FunctionScaler struct {
Cache *FunctionCache
Config ScalingConfig
}
// FunctionScaleResult holds the result of scaling from zero
type FunctionScaleResult struct {
Available bool
Error error
Found bool
Duration time.Duration
}
// Scale scales a function from zero replicas to 1 or the value set in
// the minimum replicas metadata
func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult {
start := time.Now()
queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName)
if err != nil {
return FunctionScaleResult{
Error: err,
Available: false,
Found: false,
Duration: time.Since(start),
}
}
f.Cache.Set(functionName, queryResponse)
if queryResponse.AvailableReplicas == 0 {
minReplicas := uint64(1)
if queryResponse.MinReplicas > 0 {
minReplicas = queryResponse.MinReplicas
}
log.Printf("[Scale] function=%s 0 => %d requested", functionName, minReplicas)
setScaleErr := f.Config.ServiceQuery.SetReplicas(functionName, minReplicas)
if setScaleErr != nil {
return FunctionScaleResult{
Error: fmt.Errorf("unable to scale function [%s], err: %s", functionName, err),
Available: false,
Found: true,
Duration: time.Since(start),
}
}
for i := 0; i < int(f.Config.MaxPollCount); i++ {
queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName)
f.Cache.Set(functionName, queryResponse)
totalTime := time.Since(start)
if err != nil {
return FunctionScaleResult{
Error: err,
Available: false,
Found: true,
Duration: totalTime,
}
}
if queryResponse.AvailableReplicas > 0 {
log.Printf("[Scale] function=%s 0 => %d successful - %f seconds", functionName, queryResponse.AvailableReplicas, totalTime.Seconds())
return FunctionScaleResult{
Error: nil,
Available: true,
Found: true,
Duration: totalTime,
}
}
time.Sleep(f.Config.FunctionPollInterval)
}
}
return FunctionScaleResult{
Error: nil,
Available: true,
Found: true,
Duration: time.Since(start),
}
}

21
gateway/scaling/range.go Normal file
View File

@ -0,0 +1,21 @@
package scaling
const (
// DefaultMinReplicas is the minimal amount of replicas for a service.
DefaultMinReplicas = 1
// DefaultMaxReplicas is the amount of replicas a service will auto-scale up to.
DefaultMaxReplicas = 20
// DefaultScalingFactor is the defining proportion for the scaling increments.
DefaultScalingFactor = 20
// MinScaleLabel label indicating min scale for a function
MinScaleLabel = "com.openfaas.scale.min"
// MaxScaleLabel label indicating max scale for a function
MaxScaleLabel = "com.openfaas.scale.max"
// ScalingFactorLabel label indicates the scaling factor for a function
ScalingFactorLabel = "com.openfaas.scale.factor"
)

View File

@ -0,0 +1,20 @@
package scaling
import (
"time"
)
// ScalingConfig for scaling behaviours
type ScalingConfig struct {
// MaxPollCount attempts to query a function before giving up
MaxPollCount uint
// FunctionPollInterval delay or interval between polling a function's readiness status
FunctionPollInterval time.Duration
// CacheExpiry life-time for a cache entry before considering invalid
CacheExpiry time.Duration
// ServiceQuery queries available/ready replicas for function
ServiceQuery ServiceQuery
}

View File

@ -0,0 +1,19 @@
// Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package scaling
// ServiceQuery provides interface for replica querying/setting
type ServiceQuery interface {
GetReplicas(service string) (response ServiceQueryResponse, err error)
SetReplicas(service string, count uint64) error
}
// ServiceQueryResponse response from querying a function status
type ServiceQueryResponse struct {
Replicas uint64
MaxReplicas uint64
MinReplicas uint64
ScalingFactor uint64
AvailableReplicas uint64
}