Refactor scaling to use existing code

Existing code has been used for scaling up and querying replicas.
This meant the new code was deleted and there is less duplication
now.

The cache store a whole query response rather than just the
available replica count and the tests were updated. This has been
tested with Docker swarm and the image:
 openfaas/gateway:scale-17-07-2018

This feature now needs the env-var of scale_from_zero to be enabled
in order to turn on the scaling behaviour.

Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (VMware) 2018-07-17 10:29:22 +01:00 committed by Alex Ellis
parent c58af8da56
commit 9512f09d2b
5 changed files with 29 additions and 82 deletions

View File

@ -3,7 +3,7 @@ ARG GIT_COMMIT_SHA
ARG GIT_COMMIT_MESSAGE ARG GIT_COMMIT_MESSAGE
ARG VERSION='dev' ARG VERSION='dev'
RUN curl -sL https://github.com/alexellis/license-check/releases/download/0.2.2/license-check \ RUN curl -sSfL https://github.com/alexellis/license-check/releases/download/0.2.2/license-check \
> /usr/bin/license-check \ > /usr/bin/license-check \
&& chmod +x /usr/bin/license-check && chmod +x /usr/bin/license-check

View File

@ -12,7 +12,7 @@ import (
// meta-data needed for caching. // meta-data needed for caching.
type FunctionMeta struct { type FunctionMeta struct {
LastRefresh time.Time LastRefresh time.Time
Replicas uint64 ServiceQueryResponse ServiceQueryResponse
} }
// Expired find out whether the cache item has expired with // Expired find out whether the cache item has expired with
@ -29,7 +29,7 @@ type FunctionCache struct {
} }
// Set replica count for functionName // Set replica count for functionName
func (fc *FunctionCache) Set(functionName string, replicas uint64) { func (fc *FunctionCache) Set(functionName string, serviceQueryResponse ServiceQueryResponse) {
fc.Sync.Lock() fc.Sync.Lock()
if _, exists := fc.Cache[functionName]; !exists { if _, exists := fc.Cache[functionName]; !exists {
@ -38,19 +38,22 @@ func (fc *FunctionCache) Set(functionName string, replicas uint64) {
entry := fc.Cache[functionName] entry := fc.Cache[functionName]
entry.LastRefresh = time.Now() entry.LastRefresh = time.Now()
entry.Replicas = replicas entry.ServiceQueryResponse = serviceQueryResponse
fc.Sync.Unlock() fc.Sync.Unlock()
} }
// Get replica count for functionName // Get replica count for functionName
func (fc *FunctionCache) Get(functionName string) (uint64, bool) { func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) {
replicas := uint64(0) replicas := ServiceQueryResponse{
AvailableReplicas: 0,
}
hit := false hit := false
fc.Sync.Lock() fc.Sync.Lock()
if val, exists := fc.Cache[functionName]; exists { if val, exists := fc.Cache[functionName]; exists {
replicas = val.Replicas replicas = val.ServiceQueryResponse
hit = !val.Expired(fc.Expiry) hit = !val.Expired(fc.Expiry)
} }

View File

@ -20,7 +20,7 @@ func Test_LastRefreshSet(t *testing.T) {
t.Fail() t.Fail()
} }
cache.Set(fnName, 1) cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
if _, exists := cache.Cache[fnName]; !exists { if _, exists := cache.Cache[fnName]; !exists {
t.Errorf("Expected entry to exist after setting %s", fnName) t.Errorf("Expected entry to exist after setting %s", fnName)
@ -41,7 +41,7 @@ func Test_CacheExpiresIn1MS(t *testing.T) {
Expiry: time.Millisecond * 1, Expiry: time.Millisecond * 1,
} }
cache.Set(fnName, 1) cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
time.Sleep(time.Millisecond * 2) time.Sleep(time.Millisecond * 2)
_, hit := cache.Get(fnName) _, hit := cache.Get(fnName)
@ -61,7 +61,7 @@ func Test_CacheGivesHitWithLongExpiry(t *testing.T) {
Expiry: time.Millisecond * 500, Expiry: time.Millisecond * 500,
} }
cache.Set(fnName, 1) cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1})
_, hit := cache.Get(fnName) _, hit := cache.Get(fnName)
wantHit := true wantHit := true

View File

@ -4,16 +4,10 @@
package handlers package handlers
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"net/http" "net/http"
"net/http/httptest"
"time" "time"
"github.com/openfaas/faas/gateway/requests"
) )
// ScalingConfig for scaling behaviours // ScalingConfig for scaling behaviours
@ -21,6 +15,7 @@ type ScalingConfig struct {
MaxPollCount uint MaxPollCount uint
FunctionPollInterval time.Duration FunctionPollInterval time.Duration
CacheExpiry time.Duration CacheExpiry time.Duration
ServiceQuery ServiceQuery
} }
// MakeScalingHandler creates handler which can scale a function from // MakeScalingHandler creates handler which can scale a function from
@ -35,33 +30,31 @@ func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config
functionName := getServiceName(r.URL.String()) functionName := getServiceName(r.URL.String())
if replicas, hit := cache.Get(functionName); hit && replicas > 0 { if serviceQueryResponse, hit := cache.Get(functionName); hit && serviceQueryResponse.AvailableReplicas > 0 {
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
return return
} }
replicas, code, err := getReplicas(functionName, upstream) queryResponse, err := config.ServiceQuery.GetReplicas(functionName)
cache.Set(functionName, replicas) cache.Set(functionName, queryResponse)
if err != nil { if err != nil {
var errStr string var errStr string
if code == http.StatusNotFound {
errStr = fmt.Sprintf("unable to find function: %s", functionName)
} else {
errStr = fmt.Sprintf("error finding function %s: %s", functionName, err.Error()) errStr = fmt.Sprintf("error finding function %s: %s", functionName, err.Error())
}
log.Printf(errStr) log.Printf(errStr)
w.WriteHeader(code) w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(errStr)) w.Write([]byte(errStr))
return return
} }
if replicas == 0 { if queryResponse.AvailableReplicas == 0 {
minReplicas := uint64(1) minReplicas := uint64(1)
if queryResponse.MinReplicas > 0 {
minReplicas = queryResponse.MinReplicas
}
err := scaleFunction(functionName, minReplicas, upstream) err := config.ServiceQuery.SetReplicas(functionName, minReplicas)
if err != nil { if err != nil {
errStr := fmt.Errorf("unable to scale function [%s], err: %s", functionName, err) errStr := fmt.Errorf("unable to scale function [%s], err: %s", functionName, err)
log.Printf(errStr.Error()) log.Printf(errStr.Error())
@ -72,8 +65,8 @@ func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config
} }
for i := 0; i < int(config.MaxPollCount); i++ { for i := 0; i < int(config.MaxPollCount); i++ {
replicas, _, err := getReplicas(functionName, upstream) queryResponse, err := config.ServiceQuery.GetReplicas(functionName)
cache.Set(functionName, replicas) cache.Set(functionName, queryResponse)
if err != nil { if err != nil {
errStr := fmt.Sprintf("error: %s", err.Error()) errStr := fmt.Sprintf("error: %s", err.Error())
@ -84,7 +77,7 @@ func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config
return return
} }
if replicas > 0 { if queryResponse.AvailableReplicas > 0 {
break break
} }
@ -95,53 +88,3 @@ func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
} }
} }
func getReplicas(functionName string, upstream http.HandlerFunc) (uint64, int, error) {
replicasQuery, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/system/function/%s", functionName), nil)
rr := httptest.NewRecorder()
upstream.ServeHTTP(rr, replicasQuery)
if rr.Code != 200 {
log.Printf("error, query replicas status: %d", rr.Code)
var errBody string
if rr.Body != nil {
errBody = string(rr.Body.String())
}
return 0, rr.Code, fmt.Errorf("unable to query function: %s", string(errBody))
}
replicaBytes, _ := ioutil.ReadAll(rr.Body)
replicaResult := requests.Function{}
json.Unmarshal(replicaBytes, &replicaResult)
return replicaResult.AvailableReplicas, rr.Code, nil
}
func scaleFunction(functionName string, minReplicas uint64, upstream http.HandlerFunc) error {
scaleReq := ScaleServiceRequest{
Replicas: minReplicas,
ServiceName: functionName,
}
scaleBytesOut, _ := json.Marshal(scaleReq)
scaleBytesOutBody := bytes.NewBuffer(scaleBytesOut)
setReplicasReq, _ := http.NewRequest(http.MethodPost, fmt.Sprintf("/system/scale-function/%s", functionName), scaleBytesOutBody)
rr := httptest.NewRecorder()
upstream.ServeHTTP(rr, setReplicasReq)
if rr.Code != 200 {
return fmt.Errorf("scale to 1 replica status: %d", rr.Code)
}
return nil
}
// ScaleServiceRequest request to scale a function
type ScaleServiceRequest struct {
ServiceName string `json:"serviceName"`
Replicas uint64 `json:"replicas"`
}

View File

@ -122,6 +122,7 @@ func main() {
MaxPollCount: uint(1000), MaxPollCount: uint(1000),
FunctionPollInterval: time.Millisecond * 10, FunctionPollInterval: time.Millisecond * 10,
CacheExpiry: time.Second * 5, // freshness of replica values before going stale CacheExpiry: time.Second * 5, // freshness of replica values before going stale
ServiceQuery: alertHandler,
} }
functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, queryFunction, scalingConfig) functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, queryFunction, scalingConfig)