From 2452fdea0b39cfd3fcb4a4009918d9fc1956325c Mon Sep 17 00:00:00 2001 From: Alex Ellis Date: Wed, 29 Nov 2017 21:58:11 +0000 Subject: [PATCH] Allow min-scale Signed-off-by: Alex Ellis --- TestDrive.md | 10 +++++++- gateway/handlers/alerthandler.go | 40 +++++++++++++++++++++--------- gateway/handlers/create_handler.go | 21 ++++++++++++++++ gateway/handlers/update_handler.go | 4 +++ gateway/plugin/external.go | 40 ++++++++++++++++++++++++------ gateway/tests/alerthandler_test.go | 15 +++++++---- 6 files changed, 105 insertions(+), 25 deletions(-) diff --git a/TestDrive.md b/TestDrive.md index b5f8bf8c..730a2231 100644 --- a/TestDrive.md +++ b/TestDrive.md @@ -242,7 +242,7 @@ The `-d` value passes in the argument for your function. This is read via STDIN Grab OS, CPU and other info via a Node.js container using the `os` module. -If you invoke this method in a while loop or with a load-generator tool then it will auto-scale to 5, 10, 15 and finally 20 replicas due to the load. You will then be able to see the various Docker containers responding with a different Hostname for each request as the work is distributed evenly. +If you invoke this method in a while loop or with a load-generator tool then it will auto-scale to 5, 10, 15 and finally 25 replicas due to the load. You will then be able to see the various Docker containers responding with a different Hostname for each request as the work is distributed evenly. Here is a loop that can be used to invoke the function in a loop to trigger auto-scaling. ``` @@ -262,6 +262,14 @@ CPU count: 1 Uptime: 776839 ``` +To control scaling behaviour you can set a min/max scale value with a label when deploying your function via the CLI or the API: + +``` + labels: + "com.openfaas.scale.min": "5" + "com.openfaas.scale.max": "15" +``` + **Sample function: webhook stasher (webhookstash)** Another cool sample function is the Webhook Stasher which saves the body of any data posted to the service to the container's filesystem. Each file is written with the filename of the UNIX time. diff --git a/gateway/handlers/alerthandler.go b/gateway/handlers/alerthandler.go index c1c4d983..dd974533 100644 --- a/gateway/handlers/alerthandler.go +++ b/gateway/handlers/alerthandler.go @@ -22,7 +22,7 @@ import ( const DefaultMaxReplicas = 20 type ServiceQuery interface { - GetReplicas(service string) (currentReplicas uint64, maxReplicas uint64, err error) + GetReplicas(service string) (currentReplicas uint64, maxReplicas uint64, minReplicas uint64, err error) SetReplicas(service string, count uint64) error } @@ -39,30 +39,45 @@ type SwarmServiceQuery struct { } // GetReplicas replica count for function -func (s SwarmServiceQuery) GetReplicas(serviceName string) (uint64, uint64, error) { +func (s SwarmServiceQuery) GetReplicas(serviceName string) (uint64, uint64, uint64, error) { var err error var currentReplicas uint64 maxReplicas := uint64(DefaultMaxReplicas) + minReplicas := uint64(1) + opts := types.ServiceInspectOptions{ InsertDefaults: true, } service, _, err := s.c.ServiceInspectWithRaw(context.Background(), serviceName, opts) if err == nil { currentReplicas = *service.Spec.Mode.Replicated.Replicas + log.Println("service.Spec.Annotations.Labels ", service.Spec.Annotations.Labels) + log.Println("service.Spec.TaskTemplate.ContainerSpec.Labels ", service.Spec.TaskTemplate.ContainerSpec.Labels) + log.Println("service.Spec.Labels ", service.Spec.Labels) - replicaLabel := service.Spec.TaskTemplate.ContainerSpec.Labels["com.faas.max_replicas"] + minScale := service.Spec.Annotations.Labels["com.openfaas.scale.min"] + maxScale := service.Spec.Annotations.Labels["com.openfaas.scale.max"] - if len(replicaLabel) > 0 { - maxReplicasLabel, err := strconv.Atoi(replicaLabel) + if len(maxScale) > 0 { + labelValue, err := strconv.Atoi(maxScale) if err != nil { - log.Printf("Bad replica count: %s, should be uint.\n", replicaLabel) + log.Printf("Bad replica count: %s, should be uint", maxScale) } else { - maxReplicas = uint64(maxReplicasLabel) + maxReplicas = uint64(labelValue) + } + } + + if len(minScale) > 0 { + labelValue, err := strconv.Atoi(maxScale) + if err != nil { + log.Printf("Bad replica count: %s, should be uint", minScale) + } else { + minReplicas = uint64(labelValue) } } } - return currentReplicas, maxReplicas, err + return currentReplicas, maxReplicas, minReplicas, err } // SetReplicas update the replica count @@ -145,16 +160,17 @@ func scaleService(alert requests.PrometheusInnerAlert, sq ServiceQuery) error { serviceName := alert.Labels.FunctionName if len(serviceName) > 0 { - currentReplicas, maxReplicas, getErr := sq.GetReplicas(serviceName) + currentReplicas, maxReplicas, minReplicas, getErr := sq.GetReplicas(serviceName) if getErr == nil { status := alert.Status - newReplicas := CalculateReplicas(status, currentReplicas, uint64(maxReplicas)) + newReplicas := CalculateReplicas(status, currentReplicas, uint64(maxReplicas), minReplicas) log.Printf("[Scale] function=%s %d => %d.\n", serviceName, currentReplicas, newReplicas) if newReplicas == currentReplicas { return nil } + updateErr := sq.SetReplicas(serviceName, newReplicas) if updateErr != nil { err = updateErr @@ -165,7 +181,7 @@ func scaleService(alert requests.PrometheusInnerAlert, sq ServiceQuery) error { } // CalculateReplicas decides what replica count to set depending on current/desired amount -func CalculateReplicas(status string, currentReplicas uint64, maxReplicas uint64) uint64 { +func CalculateReplicas(status string, currentReplicas uint64, maxReplicas uint64, minReplicas uint64) uint64 { newReplicas := currentReplicas const step = 5 @@ -181,7 +197,7 @@ func CalculateReplicas(status string, currentReplicas uint64, maxReplicas uint64 } } } else { // Resolved event. - newReplicas = 1 + newReplicas = minReplicas } return newReplicas } diff --git a/gateway/handlers/create_handler.go b/gateway/handlers/create_handler.go index 4ba03f87..68f47e64 100644 --- a/gateway/handlers/create_handler.go +++ b/gateway/handlers/create_handler.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "log" "net/http" + "strconv" "strings" "time" @@ -112,6 +113,11 @@ func makeSpec(request *requests.CreateFunctionRequest, maxRestarts uint64, resta Constraints: constraints, }, }, + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: getMinReplicas(request), + }, + }, } // TODO: request.EnvProcess should only be set if it's not nil, otherwise we override anything in the Docker image already @@ -211,3 +217,18 @@ func buildResources(request *requests.CreateFunctionRequest) *swarm.ResourceRequ } return resources } + +func getMinReplicas(request *requests.CreateFunctionRequest) *uint64 { + replicas := uint64(1) + + if request.Labels != nil { + if val, exists := (*request.Labels)["com.openfaas.scale.min"]; exists { + value, err := strconv.Atoi(val) + if err != nil { + log.Println(err) + } + replicas = uint64(value) + } + } + return &replicas +} diff --git a/gateway/handlers/update_handler.go b/gateway/handlers/update_handler.go index 0ea9fccb..9fe554d5 100644 --- a/gateway/handlers/update_handler.go +++ b/gateway/handlers/update_handler.go @@ -126,4 +126,8 @@ func updateSpec(request *requests.CreateFunctionRequest, spec *swarm.ServiceSpec if len(env) > 0 { spec.TaskTemplate.ContainerSpec.Env = env } + + if spec.Mode.Replicated != nil { + spec.Mode.Replicated.Replicas = getMinReplicas(request) + } } diff --git a/gateway/plugin/external.go b/gateway/plugin/external.go index ddf2d7f0..d8af10be 100644 --- a/gateway/plugin/external.go +++ b/gateway/plugin/external.go @@ -10,6 +10,7 @@ import ( "net" "net/http" "net/url" + "strconv" "time" "fmt" @@ -48,10 +49,8 @@ type ExternalServiceQuery struct { ProxyClient http.Client } -const maxReplicas = 40 - // GetReplicas replica count for function -func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, error) { +func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, uint64, error) { var err error function := requests.Function{} @@ -73,9 +72,34 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, e } } - max := uint64(maxReplicas) + maxReplicas := uint64(handlers.DefaultMaxReplicas) + minReplicas := uint64(1) - return function.Replicas, max, err + if function.Labels != nil { + labels := *function.Labels + minScale := labels["com.openfaas.scale.min"] + maxScale := labels["com.openfaas.scale.max"] + + if len(minScale) > 0 { + labelValue, err := strconv.Atoi(minScale) + if err != nil { + log.Printf("Bad replica count: %s, should be uint", minScale) + } else { + minReplicas = uint64(labelValue) + } + } + + if len(maxScale) > 0 { + labelValue, err := strconv.Atoi(maxScale) + if err != nil { + log.Printf("Bad replica count: %s, should be uint", maxScale) + } else { + maxReplicas = uint64(labelValue) + } + } + } + + return function.Replicas, maxReplicas, minReplicas, err } // ScaleServiceRequest request scaling of replica @@ -103,10 +127,12 @@ func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) erro defer req.Body.Close() res, err := s.ProxyClient.Do(req) - defer res.Body.Close() - if err != nil { log.Println(urlPath, err) + } else { + if res.Body != nil { + defer res.Body.Close() + } } if res.StatusCode != http.StatusOK { diff --git a/gateway/tests/alerthandler_test.go b/gateway/tests/alerthandler_test.go index 80d840e8..48f9fe9e 100644 --- a/gateway/tests/alerthandler_test.go +++ b/gateway/tests/alerthandler_test.go @@ -10,7 +10,8 @@ import ( ) func TestScale1to5(t *testing.T) { - newReplicas := handlers.CalculateReplicas("firing", 1, 20) + minReplicas := uint64(1) + newReplicas := handlers.CalculateReplicas("firing", 1, 20, minReplicas) if newReplicas != 5 { t.Log("Expected increment in blocks of 5 from 1 to 5") t.Fail() @@ -18,7 +19,8 @@ func TestScale1to5(t *testing.T) { } func TestScale5to10(t *testing.T) { - newReplicas := handlers.CalculateReplicas("firing", 5, 20) + minReplicas := uint64(1) + newReplicas := handlers.CalculateReplicas("firing", 5, 20, minReplicas) if newReplicas != 10 { t.Log("Expected increment in blocks of 5 from 5 to 10") t.Fail() @@ -26,7 +28,8 @@ func TestScale5to10(t *testing.T) { } func TestScaleCeilingOf20Replicas_Noaction(t *testing.T) { - newReplicas := handlers.CalculateReplicas("firing", 20, 20) + minReplicas := uint64(1) + newReplicas := handlers.CalculateReplicas("firing", 20, 20, minReplicas) if newReplicas != 20 { t.Log("Expected ceiling of 20 replicas") t.Fail() @@ -34,7 +37,8 @@ func TestScaleCeilingOf20Replicas_Noaction(t *testing.T) { } func TestScaleCeilingOf20Replicas(t *testing.T) { - newReplicas := handlers.CalculateReplicas("firing", 19, 20) + minReplicas := uint64(1) + newReplicas := handlers.CalculateReplicas("firing", 19, 20, minReplicas) if newReplicas != 20 { t.Log("Expected ceiling of 20 replicas") t.Fail() @@ -42,7 +46,8 @@ func TestScaleCeilingOf20Replicas(t *testing.T) { } func TestBackingOff10to1(t *testing.T) { - newReplicas := handlers.CalculateReplicas("resolved", 10, 20) + minReplicas := uint64(1) + newReplicas := handlers.CalculateReplicas("resolved", 10, 20, minReplicas) if newReplicas != 1 { t.Log("Expected backing off to 1 replica") t.Fail()