diff --git a/gateway/Dockerfile b/gateway/Dockerfile index 68a9f281..ce390515 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -20,6 +20,7 @@ COPY types types COPY queue queue COPY plugin plugin COPY version version +COPY scaling scaling COPY server.go . # Run a gofmt and exclude all vendored code. diff --git a/gateway/Dockerfile.arm64 b/gateway/Dockerfile.arm64 index e2efc1ec..621a2cbf 100644 --- a/gateway/Dockerfile.arm64 +++ b/gateway/Dockerfile.arm64 @@ -13,6 +13,7 @@ COPY tests tests COPY types types COPY queue queue COPY plugin plugin +COPY scaling scaling COPY server.go . # Run a gofmt and exclude all vendored code. diff --git a/gateway/Dockerfile.armhf b/gateway/Dockerfile.armhf index 876748a5..e3e02344 100644 --- a/gateway/Dockerfile.armhf +++ b/gateway/Dockerfile.armhf @@ -13,6 +13,7 @@ COPY types types COPY queue queue COPY plugin plugin COPY version version +COPY scaling scaling COPY server.go . RUN GOARM=7 CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o gateway . diff --git a/gateway/handlers/alerthandler.go b/gateway/handlers/alerthandler.go index 60a5c0fe..d0a76a0f 100644 --- a/gateway/handlers/alerthandler.go +++ b/gateway/handlers/alerthandler.go @@ -11,30 +11,11 @@ import ( "net/http" "github.com/openfaas/faas/gateway/requests" -) - -const ( - // DefaultMinReplicas is the minimal amount of replicas for a service. - DefaultMinReplicas = 1 - - // DefaultMaxReplicas is the amount of replicas a service will auto-scale up to. - DefaultMaxReplicas = 20 - - // DefaultScalingFactor is the defining proportion for the scaling increments. - DefaultScalingFactor = 20 - - // MinScaleLabel label indicating min scale for a function - MinScaleLabel = "com.openfaas.scale.min" - - // MaxScaleLabel label indicating max scale for a function - MaxScaleLabel = "com.openfaas.scale.max" - - // ScalingFactorLabel label indicates the scaling factor for a function - ScalingFactorLabel = "com.openfaas.scale.factor" + "github.com/openfaas/faas/gateway/scaling" ) // MakeAlertHandler handles alerts from Prometheus Alertmanager -func MakeAlertHandler(service ServiceQuery) http.HandlerFunc { +func MakeAlertHandler(service scaling.ServiceQuery) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { log.Println("Alert received.") @@ -76,7 +57,7 @@ func MakeAlertHandler(service ServiceQuery) http.HandlerFunc { } } -func handleAlerts(req *requests.PrometheusAlert, service ServiceQuery) []error { +func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery) []error { var errors []error for _, alert := range req.Alerts { if err := scaleService(alert, service); err != nil { @@ -88,7 +69,7 @@ func handleAlerts(req *requests.PrometheusAlert, service ServiceQuery) []error { return errors } -func scaleService(alert requests.PrometheusInnerAlert, service ServiceQuery) error { +func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery) error { var err error serviceName := alert.Labels.FunctionName diff --git a/gateway/handlers/alerthandler_test.go b/gateway/handlers/alerthandler_test.go index 7ac864f6..8be3a2c6 100644 --- a/gateway/handlers/alerthandler_test.go +++ b/gateway/handlers/alerthandler_test.go @@ -5,12 +5,14 @@ package handlers import ( "testing" + + "github.com/openfaas/faas/gateway/scaling" ) func TestDisabledScale(t *testing.T) { minReplicas := uint64(1) scalingFactor := uint64(0) - newReplicas := CalculateReplicas("firing", DefaultMinReplicas, DefaultMaxReplicas, minReplicas, scalingFactor) + newReplicas := CalculateReplicas("firing", scaling.DefaultMinReplicas, scaling.DefaultMaxReplicas, minReplicas, scalingFactor) if newReplicas != minReplicas { t.Logf("Expected not to scale, but replicas were: %d", newReplicas) t.Fail() @@ -20,7 +22,7 @@ func TestDisabledScale(t *testing.T) { func TestParameterEdge(t *testing.T) { minReplicas := uint64(0) scalingFactor := uint64(0) - newReplicas := CalculateReplicas("firing", DefaultMinReplicas, DefaultMaxReplicas, minReplicas, scalingFactor) + newReplicas := CalculateReplicas("firing", scaling.DefaultMinReplicas, scaling.DefaultMaxReplicas, minReplicas, scalingFactor) if newReplicas != 0 { t.Log("Expected not to scale") t.Fail() @@ -41,7 +43,7 @@ func TestScalingWithSameUpperLowerLimit(t *testing.T) { func TestMaxScale(t *testing.T) { minReplicas := uint64(1) scalingFactor := uint64(100) - newReplicas := CalculateReplicas("firing", DefaultMinReplicas, DefaultMaxReplicas, minReplicas, scalingFactor) + newReplicas := CalculateReplicas("firing", scaling.DefaultMinReplicas, scaling.DefaultMaxReplicas, minReplicas, scalingFactor) if newReplicas != 20 { t.Log("Expected ceiling of 20 replicas") t.Fail() @@ -51,7 +53,7 @@ func TestMaxScale(t *testing.T) { func TestInitialScale(t *testing.T) { minReplicas := uint64(1) scalingFactor := uint64(20) - newReplicas := CalculateReplicas("firing", DefaultMinReplicas, DefaultMaxReplicas, minReplicas, scalingFactor) + newReplicas := CalculateReplicas("firing", scaling.DefaultMinReplicas, scaling.DefaultMaxReplicas, minReplicas, scalingFactor) if newReplicas != 4 { t.Log("Expected the increment to equal 4") t.Fail() @@ -61,7 +63,7 @@ func TestInitialScale(t *testing.T) { func TestScale(t *testing.T) { minReplicas := uint64(1) scalingFactor := uint64(20) - newReplicas := CalculateReplicas("firing", 4, DefaultMaxReplicas, minReplicas, scalingFactor) + newReplicas := CalculateReplicas("firing", 4, scaling.DefaultMaxReplicas, minReplicas, scalingFactor) if newReplicas != 8 { t.Log("Expected newReplicas to equal 8") t.Fail() @@ -71,7 +73,7 @@ func TestScale(t *testing.T) { func TestScaleCeiling(t *testing.T) { minReplicas := uint64(1) scalingFactor := uint64(20) - newReplicas := CalculateReplicas("firing", 20, DefaultMaxReplicas, minReplicas, scalingFactor) + newReplicas := CalculateReplicas("firing", 20, scaling.DefaultMaxReplicas, minReplicas, scalingFactor) if newReplicas != 20 { t.Log("Expected ceiling of 20 replicas") t.Fail() @@ -81,7 +83,7 @@ func TestScaleCeiling(t *testing.T) { func TestScaleCeilingEdge(t *testing.T) { minReplicas := uint64(1) scalingFactor := uint64(20) - newReplicas := CalculateReplicas("firing", 19, DefaultMaxReplicas, minReplicas, scalingFactor) + newReplicas := CalculateReplicas("firing", 19, scaling.DefaultMaxReplicas, minReplicas, scalingFactor) if newReplicas != 20 { t.Log("Expected ceiling of 20 replicas") t.Fail() @@ -91,7 +93,7 @@ func TestScaleCeilingEdge(t *testing.T) { func TestBackingOff(t *testing.T) { minReplicas := uint64(1) scalingFactor := uint64(20) - newReplicas := CalculateReplicas("resolved", 8, DefaultMaxReplicas, minReplicas, scalingFactor) + newReplicas := CalculateReplicas("resolved", 8, scaling.DefaultMaxReplicas, minReplicas, scalingFactor) if newReplicas != 1 { t.Log("Expected backing off to 1 replica") t.Fail() diff --git a/gateway/handlers/scaling.go b/gateway/handlers/scaling.go index a9dcb171..a7d293d8 100644 --- a/gateway/handlers/scaling.go +++ b/gateway/handlers/scaling.go @@ -7,100 +7,47 @@ import ( "fmt" "log" "net/http" - "time" + + "github.com/openfaas/faas/gateway/scaling" ) -// ScalingConfig for scaling behaviours -type ScalingConfig struct { - // MaxPollCount attempts to query a function before giving up - MaxPollCount uint - - // FunctionPollInterval delay or interval between polling a function's readiness status - FunctionPollInterval time.Duration - - // CacheExpiry life-time for a cache entry before considering invalid - CacheExpiry time.Duration - - // ServiceQuery queries available/ready replicas for function - ServiceQuery ServiceQuery -} - // MakeScalingHandler creates handler which can scale a function from // zero to N replica(s). After scaling the next http.HandlerFunc will // be called. If the function is not ready after the configured // amount of attempts / queries then next will not be invoked and a status // will be returned to the client. -func MakeScalingHandler(next http.HandlerFunc, config ScalingConfig) http.HandlerFunc { - cache := FunctionCache{ - Cache: make(map[string]*FunctionMeta), - Expiry: config.CacheExpiry, - } +func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) http.HandlerFunc { + + scaler := scaling.NewFunctionScaler(config) return func(w http.ResponseWriter, r *http.Request) { functionName := getServiceName(r.URL.String()) + res := scaler.Scale(functionName) - if serviceQueryResponse, hit := cache.Get(functionName); hit && serviceQueryResponse.AvailableReplicas > 0 { - next.ServeHTTP(w, r) - return - } + if !res.Found { + errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error()) + log.Printf("Scaling: %s", errStr) - queryResponse, err := config.ServiceQuery.GetReplicas(functionName) - - if err != nil { - var errStr string - errStr = fmt.Sprintf("error finding function %s: %s", functionName, err.Error()) - - log.Printf(errStr) w.WriteHeader(http.StatusNotFound) w.Write([]byte(errStr)) return } - cache.Set(functionName, queryResponse) + if res.Error != nil { + errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error()) + log.Printf("Scaling: %s", errStr) - if queryResponse.AvailableReplicas == 0 { - minReplicas := uint64(1) - if queryResponse.MinReplicas > 0 { - minReplicas = queryResponse.MinReplicas - } - - log.Printf("[Scale] function=%s 0 => %d requested", functionName, minReplicas) - scalingStartTime := time.Now() - - err := config.ServiceQuery.SetReplicas(functionName, minReplicas) - if err != nil { - errStr := fmt.Errorf("unable to scale function [%s], err: %s", functionName, err) - log.Printf(errStr.Error()) - - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(errStr.Error())) - return - } - - for i := 0; i < int(config.MaxPollCount); i++ { - queryResponse, err := config.ServiceQuery.GetReplicas(functionName) - cache.Set(functionName, queryResponse) - - if err != nil { - errStr := fmt.Sprintf("error: %s", err.Error()) - log.Printf(errStr) - - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(errStr)) - return - } - - if queryResponse.AvailableReplicas > 0 { - scalingDuration := time.Since(scalingStartTime) - log.Printf("[Scale] function=%s 0 => %d successful - %f seconds", functionName, queryResponse.AvailableReplicas, scalingDuration.Seconds()) - break - } - - time.Sleep(config.FunctionPollInterval) - } + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(errStr)) + return } - next.ServeHTTP(w, r) + if res.Available { + next.ServeHTTP(w, r) + return + } + + log.Printf("[Scale] function=%s 0=>N timed-out after %f seconds", functionName, res.Duration.Seconds()) } } diff --git a/gateway/plugin/external.go b/gateway/plugin/external.go index 58c17f4a..3de0d5a8 100644 --- a/gateway/plugin/external.go +++ b/gateway/plugin/external.go @@ -18,12 +18,12 @@ import ( "io/ioutil" "github.com/openfaas/faas-provider/auth" - "github.com/openfaas/faas/gateway/handlers" "github.com/openfaas/faas/gateway/requests" + "github.com/openfaas/faas/gateway/scaling" ) // NewExternalServiceQuery proxies service queries to external plugin via HTTP -func NewExternalServiceQuery(externalURL url.URL, credentials *auth.BasicAuthCredentials) handlers.ServiceQuery { +func NewExternalServiceQuery(externalURL url.URL, credentials *auth.BasicAuthCredentials) scaling.ServiceQuery { timeout := 3 * time.Second proxyClient := http.Client{ @@ -61,9 +61,9 @@ type ScaleServiceRequest struct { } // GetReplicas replica count for function -func (s ExternalServiceQuery) GetReplicas(serviceName string) (handlers.ServiceQueryResponse, error) { +func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQueryResponse, error) { var err error - var emptyServiceQueryResponse handlers.ServiceQueryResponse + var emptyServiceQueryResponse scaling.ServiceQueryResponse function := requests.Function{} @@ -96,17 +96,17 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (handlers.ServiceQ } } - minReplicas := uint64(handlers.DefaultMinReplicas) - maxReplicas := uint64(handlers.DefaultMaxReplicas) - scalingFactor := uint64(handlers.DefaultScalingFactor) + minReplicas := uint64(scaling.DefaultMinReplicas) + maxReplicas := uint64(scaling.DefaultMaxReplicas) + scalingFactor := uint64(scaling.DefaultScalingFactor) availableReplicas := function.AvailableReplicas if function.Labels != nil { labels := *function.Labels - minReplicas = extractLabelValue(labels[handlers.MinScaleLabel], minReplicas) - maxReplicas = extractLabelValue(labels[handlers.MaxScaleLabel], maxReplicas) - extractedScalingFactor := extractLabelValue(labels[handlers.ScalingFactorLabel], scalingFactor) + minReplicas = extractLabelValue(labels[scaling.MinScaleLabel], minReplicas) + maxReplicas = extractLabelValue(labels[scaling.MaxScaleLabel], maxReplicas) + extractedScalingFactor := extractLabelValue(labels[scaling.ScalingFactorLabel], scalingFactor) if extractedScalingFactor >= 0 && extractedScalingFactor <= 100 { scalingFactor = extractedScalingFactor @@ -115,7 +115,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (handlers.ServiceQ } } - return handlers.ServiceQueryResponse{ + return scaling.ServiceQueryResponse{ Replicas: function.Replicas, MaxReplicas: maxReplicas, MinReplicas: minReplicas, diff --git a/gateway/plugin/external_test.go b/gateway/plugin/external_test.go index 1d14ab00..851c0585 100644 --- a/gateway/plugin/external_test.go +++ b/gateway/plugin/external_test.go @@ -8,7 +8,7 @@ import ( "testing" "github.com/openfaas/faas-provider/auth" - "github.com/openfaas/faas/gateway/handlers" + "github.com/openfaas/faas/gateway/scaling" ) const fallbackValue = 120 @@ -70,11 +70,11 @@ func TestGetReplicasExistentFn(t *testing.T) { })) defer testServer.Close() - expectedSvcQryResp := handlers.ServiceQueryResponse{ + expectedSvcQryResp := scaling.ServiceQueryResponse{ Replicas: 0, - MaxReplicas: uint64(handlers.DefaultMaxReplicas), - MinReplicas: uint64(handlers.DefaultMinReplicas), - ScalingFactor: uint64(handlers.DefaultScalingFactor), + MaxReplicas: uint64(scaling.DefaultMaxReplicas), + MinReplicas: uint64(scaling.DefaultMinReplicas), + ScalingFactor: uint64(scaling.DefaultScalingFactor), AvailableReplicas: 0, } diff --git a/gateway/handlers/function_cache.go b/gateway/scaling/function_cache.go similarity index 98% rename from gateway/handlers/function_cache.go rename to gateway/scaling/function_cache.go index 1c141f28..61b4062c 100644 --- a/gateway/handlers/function_cache.go +++ b/gateway/scaling/function_cache.go @@ -1,7 +1,7 @@ // Copyright (c) OpenFaaS Author(s). All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -package handlers +package scaling import ( "sync" diff --git a/gateway/handlers/function_cache_test.go b/gateway/scaling/function_cache_test.go similarity index 99% rename from gateway/handlers/function_cache_test.go rename to gateway/scaling/function_cache_test.go index edee342c..f8b39ac0 100644 --- a/gateway/handlers/function_cache_test.go +++ b/gateway/scaling/function_cache_test.go @@ -1,7 +1,7 @@ // Copyright (c) OpenFaaS Author(s). All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -package handlers +package scaling import ( "testing" diff --git a/gateway/scaling/function_scaler.go b/gateway/scaling/function_scaler.go new file mode 100644 index 00000000..9b37184b --- /dev/null +++ b/gateway/scaling/function_scaler.go @@ -0,0 +1,108 @@ +package scaling + +import ( + "fmt" + "log" + "time" +) + +// NewFunctionScaler create a new scaler with the specified +// ScalingConfig +func NewFunctionScaler(config ScalingConfig) FunctionScaler { + cache := FunctionCache{ + Cache: make(map[string]*FunctionMeta), + Expiry: config.CacheExpiry, + } + + return FunctionScaler{ + Cache: &cache, + Config: config, + } +} + +// FunctionScaler scales from zero +type FunctionScaler struct { + Cache *FunctionCache + Config ScalingConfig +} + +// FunctionScaleResult holds the result of scaling from zero +type FunctionScaleResult struct { + Available bool + Error error + Found bool + Duration time.Duration +} + +// Scale scales a function from zero replicas to 1 or the value set in +// the minimum replicas metadata +func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult { + start := time.Now() + queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName) + + if err != nil { + return FunctionScaleResult{ + Error: err, + Available: false, + Found: false, + Duration: time.Since(start), + } + } + + f.Cache.Set(functionName, queryResponse) + + if queryResponse.AvailableReplicas == 0 { + minReplicas := uint64(1) + if queryResponse.MinReplicas > 0 { + minReplicas = queryResponse.MinReplicas + } + + log.Printf("[Scale] function=%s 0 => %d requested", functionName, minReplicas) + + setScaleErr := f.Config.ServiceQuery.SetReplicas(functionName, minReplicas) + if setScaleErr != nil { + return FunctionScaleResult{ + Error: fmt.Errorf("unable to scale function [%s], err: %s", functionName, err), + Available: false, + Found: true, + Duration: time.Since(start), + } + } + + for i := 0; i < int(f.Config.MaxPollCount); i++ { + queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName) + f.Cache.Set(functionName, queryResponse) + totalTime := time.Since(start) + + if err != nil { + return FunctionScaleResult{ + Error: err, + Available: false, + Found: true, + Duration: totalTime, + } + } + + if queryResponse.AvailableReplicas > 0 { + + log.Printf("[Scale] function=%s 0 => %d successful - %f seconds", functionName, queryResponse.AvailableReplicas, totalTime.Seconds()) + + return FunctionScaleResult{ + Error: nil, + Available: true, + Found: true, + Duration: totalTime, + } + } + + time.Sleep(f.Config.FunctionPollInterval) + } + } + + return FunctionScaleResult{ + Error: nil, + Available: true, + Found: true, + Duration: time.Since(start), + } +} diff --git a/gateway/scaling/range.go b/gateway/scaling/range.go new file mode 100644 index 00000000..54586583 --- /dev/null +++ b/gateway/scaling/range.go @@ -0,0 +1,21 @@ +package scaling + +const ( + // DefaultMinReplicas is the minimal amount of replicas for a service. + DefaultMinReplicas = 1 + + // DefaultMaxReplicas is the amount of replicas a service will auto-scale up to. + DefaultMaxReplicas = 20 + + // DefaultScalingFactor is the defining proportion for the scaling increments. + DefaultScalingFactor = 20 + + // MinScaleLabel label indicating min scale for a function + MinScaleLabel = "com.openfaas.scale.min" + + // MaxScaleLabel label indicating max scale for a function + MaxScaleLabel = "com.openfaas.scale.max" + + // ScalingFactorLabel label indicates the scaling factor for a function + ScalingFactorLabel = "com.openfaas.scale.factor" +) diff --git a/gateway/scaling/scaling_config.go b/gateway/scaling/scaling_config.go new file mode 100644 index 00000000..918647a8 --- /dev/null +++ b/gateway/scaling/scaling_config.go @@ -0,0 +1,20 @@ +package scaling + +import ( + "time" +) + +// ScalingConfig for scaling behaviours +type ScalingConfig struct { + // MaxPollCount attempts to query a function before giving up + MaxPollCount uint + + // FunctionPollInterval delay or interval between polling a function's readiness status + FunctionPollInterval time.Duration + + // CacheExpiry life-time for a cache entry before considering invalid + CacheExpiry time.Duration + + // ServiceQuery queries available/ready replicas for function + ServiceQuery ServiceQuery +} diff --git a/gateway/handlers/service_query.go b/gateway/scaling/service_query.go similarity index 97% rename from gateway/handlers/service_query.go rename to gateway/scaling/service_query.go index 8139b898..84880a41 100644 --- a/gateway/handlers/service_query.go +++ b/gateway/scaling/service_query.go @@ -1,7 +1,7 @@ // Copyright (c) OpenFaaS Author(s). All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -package handlers +package scaling // ServiceQuery provides interface for replica querying/setting type ServiceQuery interface { diff --git a/gateway/server.go b/gateway/server.go index 9a3d1545..97fe4925 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -11,6 +11,7 @@ import ( "github.com/gorilla/mux" "github.com/openfaas/faas/gateway/handlers" + "github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas/gateway/metrics" @@ -134,7 +135,7 @@ func main() { functionProxy := faasHandlers.Proxy if config.ScaleFromZero { - scalingConfig := handlers.ScalingConfig{ + scalingConfig := scaling.ScalingConfig{ MaxPollCount: uint(1000), FunctionPollInterval: time.Millisecond * 10, CacheExpiry: time.Second * 5, // freshness of replica values before going stale