diff --git a/gateway/Dockerfile b/gateway/Dockerfile index b7af3a5a..68a9f281 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -3,7 +3,7 @@ ARG GIT_COMMIT_SHA ARG GIT_COMMIT_MESSAGE ARG VERSION='dev' -RUN curl -sL https://github.com/alexellis/license-check/releases/download/0.2.2/license-check \ +RUN curl -sSfL https://github.com/alexellis/license-check/releases/download/0.2.2/license-check \ > /usr/bin/license-check \ && chmod +x /usr/bin/license-check diff --git a/gateway/handlers/function_cache.go b/gateway/handlers/function_cache.go index 1128f817..852ec3b6 100644 --- a/gateway/handlers/function_cache.go +++ b/gateway/handlers/function_cache.go @@ -11,8 +11,8 @@ import ( // FunctionMeta holds the last refresh and any other // meta-data needed for caching. type FunctionMeta struct { - LastRefresh time.Time - Replicas uint64 + LastRefresh time.Time + ServiceQueryResponse ServiceQueryResponse } // Expired find out whether the cache item has expired with @@ -29,7 +29,7 @@ type FunctionCache struct { } // Set replica count for functionName -func (fc *FunctionCache) Set(functionName string, replicas uint64) { +func (fc *FunctionCache) Set(functionName string, serviceQueryResponse ServiceQueryResponse) { fc.Sync.Lock() if _, exists := fc.Cache[functionName]; !exists { @@ -38,19 +38,22 @@ func (fc *FunctionCache) Set(functionName string, replicas uint64) { entry := fc.Cache[functionName] entry.LastRefresh = time.Now() - entry.Replicas = replicas + entry.ServiceQueryResponse = serviceQueryResponse fc.Sync.Unlock() } // Get replica count for functionName -func (fc *FunctionCache) Get(functionName string) (uint64, bool) { - replicas := uint64(0) +func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) { + replicas := ServiceQueryResponse{ + AvailableReplicas: 0, + } + hit := false fc.Sync.Lock() if val, exists := fc.Cache[functionName]; exists { - replicas = val.Replicas + replicas = val.ServiceQueryResponse hit = !val.Expired(fc.Expiry) } diff --git a/gateway/handlers/function_cache_test.go b/gateway/handlers/function_cache_test.go index c7100c3f..6bbbc812 100644 --- a/gateway/handlers/function_cache_test.go +++ b/gateway/handlers/function_cache_test.go @@ -20,7 +20,7 @@ func Test_LastRefreshSet(t *testing.T) { t.Fail() } - cache.Set(fnName, 1) + cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1}) if _, exists := cache.Cache[fnName]; !exists { t.Errorf("Expected entry to exist after setting %s", fnName) @@ -41,7 +41,7 @@ func Test_CacheExpiresIn1MS(t *testing.T) { Expiry: time.Millisecond * 1, } - cache.Set(fnName, 1) + cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1}) time.Sleep(time.Millisecond * 2) _, hit := cache.Get(fnName) @@ -61,7 +61,7 @@ func Test_CacheGivesHitWithLongExpiry(t *testing.T) { Expiry: time.Millisecond * 500, } - cache.Set(fnName, 1) + cache.Set(fnName, ServiceQueryResponse{AvailableReplicas: 1}) _, hit := cache.Get(fnName) wantHit := true diff --git a/gateway/handlers/scaling.go b/gateway/handlers/scaling.go index adb29f54..b37da38c 100644 --- a/gateway/handlers/scaling.go +++ b/gateway/handlers/scaling.go @@ -4,16 +4,10 @@ package handlers import ( - "bytes" - "encoding/json" "fmt" - "io/ioutil" "log" "net/http" - "net/http/httptest" "time" - - "github.com/openfaas/faas/gateway/requests" ) // ScalingConfig for scaling behaviours @@ -21,6 +15,7 @@ type ScalingConfig struct { MaxPollCount uint FunctionPollInterval time.Duration CacheExpiry time.Duration + ServiceQuery ServiceQuery } // MakeScalingHandler creates handler which can scale a function from @@ -35,33 +30,31 @@ func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config functionName := getServiceName(r.URL.String()) - if replicas, hit := cache.Get(functionName); hit && replicas > 0 { + if serviceQueryResponse, hit := cache.Get(functionName); hit && serviceQueryResponse.AvailableReplicas > 0 { next.ServeHTTP(w, r) return } - replicas, code, err := getReplicas(functionName, upstream) - cache.Set(functionName, replicas) + queryResponse, err := config.ServiceQuery.GetReplicas(functionName) + cache.Set(functionName, queryResponse) if err != nil { var errStr string - if code == http.StatusNotFound { - errStr = fmt.Sprintf("unable to find function: %s", functionName) - - } else { - errStr = fmt.Sprintf("error finding function %s: %s", functionName, err.Error()) - } + errStr = fmt.Sprintf("error finding function %s: %s", functionName, err.Error()) log.Printf(errStr) - w.WriteHeader(code) + w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(errStr)) return } - if replicas == 0 { + if queryResponse.AvailableReplicas == 0 { minReplicas := uint64(1) + if queryResponse.MinReplicas > 0 { + minReplicas = queryResponse.MinReplicas + } - err := scaleFunction(functionName, minReplicas, upstream) + err := config.ServiceQuery.SetReplicas(functionName, minReplicas) if err != nil { errStr := fmt.Errorf("unable to scale function [%s], err: %s", functionName, err) log.Printf(errStr.Error()) @@ -72,8 +65,8 @@ func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config } for i := 0; i < int(config.MaxPollCount); i++ { - replicas, _, err := getReplicas(functionName, upstream) - cache.Set(functionName, replicas) + queryResponse, err := config.ServiceQuery.GetReplicas(functionName) + cache.Set(functionName, queryResponse) if err != nil { errStr := fmt.Sprintf("error: %s", err.Error()) @@ -84,7 +77,7 @@ func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config return } - if replicas > 0 { + if queryResponse.AvailableReplicas > 0 { break } @@ -95,53 +88,3 @@ func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config next.ServeHTTP(w, r) } } - -func getReplicas(functionName string, upstream http.HandlerFunc) (uint64, int, error) { - - replicasQuery, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/system/function/%s", functionName), nil) - rr := httptest.NewRecorder() - - upstream.ServeHTTP(rr, replicasQuery) - if rr.Code != 200 { - log.Printf("error, query replicas status: %d", rr.Code) - - var errBody string - if rr.Body != nil { - errBody = string(rr.Body.String()) - } - - return 0, rr.Code, fmt.Errorf("unable to query function: %s", string(errBody)) - } - - replicaBytes, _ := ioutil.ReadAll(rr.Body) - replicaResult := requests.Function{} - json.Unmarshal(replicaBytes, &replicaResult) - - return replicaResult.AvailableReplicas, rr.Code, nil -} - -func scaleFunction(functionName string, minReplicas uint64, upstream http.HandlerFunc) error { - scaleReq := ScaleServiceRequest{ - Replicas: minReplicas, - ServiceName: functionName, - } - - scaleBytesOut, _ := json.Marshal(scaleReq) - scaleBytesOutBody := bytes.NewBuffer(scaleBytesOut) - setReplicasReq, _ := http.NewRequest(http.MethodPost, fmt.Sprintf("/system/scale-function/%s", functionName), scaleBytesOutBody) - - rr := httptest.NewRecorder() - upstream.ServeHTTP(rr, setReplicasReq) - - if rr.Code != 200 { - return fmt.Errorf("scale to 1 replica status: %d", rr.Code) - } - - return nil -} - -// ScaleServiceRequest request to scale a function -type ScaleServiceRequest struct { - ServiceName string `json:"serviceName"` - Replicas uint64 `json:"replicas"` -} diff --git a/gateway/server.go b/gateway/server.go index a3282707..8d533ad2 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -122,6 +122,7 @@ func main() { MaxPollCount: uint(1000), FunctionPollInterval: time.Millisecond * 10, CacheExpiry: time.Second * 5, // freshness of replica values before going stale + ServiceQuery: alertHandler, } functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, queryFunction, scalingConfig)