Allow min-scale

Signed-off-by: Alex Ellis <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis 2017-11-29 21:58:11 +00:00
parent 12c5f4926e
commit 2452fdea0b
6 changed files with 105 additions and 25 deletions

View File

@ -242,7 +242,7 @@ The `-d` value passes in the argument for your function. This is read via STDIN
Grab OS, CPU and other info via a Node.js container using the `os` module.
If you invoke this method in a while loop or with a load-generator tool then it will auto-scale to 5, 10, 15 and finally 20 replicas due to the load. You will then be able to see the various Docker containers responding with a different Hostname for each request as the work is distributed evenly.
If you invoke this method in a while loop or with a load-generator tool then it will auto-scale to 5, 10, 15 and finally 25 replicas due to the load. You will then be able to see the various Docker containers responding with a different Hostname for each request as the work is distributed evenly.
Here is a loop that can be used to invoke the function in a loop to trigger auto-scaling.
```
@ -262,6 +262,14 @@ CPU count: 1
Uptime: 776839
```
To control scaling behaviour you can set a min/max scale value with a label when deploying your function via the CLI or the API:
```
labels:
"com.openfaas.scale.min": "5"
"com.openfaas.scale.max": "15"
```
**Sample function: webhook stasher (webhookstash)**
Another cool sample function is the Webhook Stasher which saves the body of any data posted to the service to the container's filesystem. Each file is written with the filename of the UNIX time.

View File

@ -22,7 +22,7 @@ import (
const DefaultMaxReplicas = 20
type ServiceQuery interface {
GetReplicas(service string) (currentReplicas uint64, maxReplicas uint64, err error)
GetReplicas(service string) (currentReplicas uint64, maxReplicas uint64, minReplicas uint64, err error)
SetReplicas(service string, count uint64) error
}
@ -39,30 +39,45 @@ type SwarmServiceQuery struct {
}
// GetReplicas replica count for function
func (s SwarmServiceQuery) GetReplicas(serviceName string) (uint64, uint64, error) {
func (s SwarmServiceQuery) GetReplicas(serviceName string) (uint64, uint64, uint64, error) {
var err error
var currentReplicas uint64
maxReplicas := uint64(DefaultMaxReplicas)
minReplicas := uint64(1)
opts := types.ServiceInspectOptions{
InsertDefaults: true,
}
service, _, err := s.c.ServiceInspectWithRaw(context.Background(), serviceName, opts)
if err == nil {
currentReplicas = *service.Spec.Mode.Replicated.Replicas
log.Println("service.Spec.Annotations.Labels ", service.Spec.Annotations.Labels)
log.Println("service.Spec.TaskTemplate.ContainerSpec.Labels ", service.Spec.TaskTemplate.ContainerSpec.Labels)
log.Println("service.Spec.Labels ", service.Spec.Labels)
replicaLabel := service.Spec.TaskTemplate.ContainerSpec.Labels["com.faas.max_replicas"]
minScale := service.Spec.Annotations.Labels["com.openfaas.scale.min"]
maxScale := service.Spec.Annotations.Labels["com.openfaas.scale.max"]
if len(replicaLabel) > 0 {
maxReplicasLabel, err := strconv.Atoi(replicaLabel)
if len(maxScale) > 0 {
labelValue, err := strconv.Atoi(maxScale)
if err != nil {
log.Printf("Bad replica count: %s, should be uint.\n", replicaLabel)
log.Printf("Bad replica count: %s, should be uint", maxScale)
} else {
maxReplicas = uint64(maxReplicasLabel)
maxReplicas = uint64(labelValue)
}
}
if len(minScale) > 0 {
labelValue, err := strconv.Atoi(maxScale)
if err != nil {
log.Printf("Bad replica count: %s, should be uint", minScale)
} else {
minReplicas = uint64(labelValue)
}
}
}
return currentReplicas, maxReplicas, err
return currentReplicas, maxReplicas, minReplicas, err
}
// SetReplicas update the replica count
@ -145,16 +160,17 @@ func scaleService(alert requests.PrometheusInnerAlert, sq ServiceQuery) error {
serviceName := alert.Labels.FunctionName
if len(serviceName) > 0 {
currentReplicas, maxReplicas, getErr := sq.GetReplicas(serviceName)
currentReplicas, maxReplicas, minReplicas, getErr := sq.GetReplicas(serviceName)
if getErr == nil {
status := alert.Status
newReplicas := CalculateReplicas(status, currentReplicas, uint64(maxReplicas))
newReplicas := CalculateReplicas(status, currentReplicas, uint64(maxReplicas), minReplicas)
log.Printf("[Scale] function=%s %d => %d.\n", serviceName, currentReplicas, newReplicas)
if newReplicas == currentReplicas {
return nil
}
updateErr := sq.SetReplicas(serviceName, newReplicas)
if updateErr != nil {
err = updateErr
@ -165,7 +181,7 @@ func scaleService(alert requests.PrometheusInnerAlert, sq ServiceQuery) error {
}
// CalculateReplicas decides what replica count to set depending on current/desired amount
func CalculateReplicas(status string, currentReplicas uint64, maxReplicas uint64) uint64 {
func CalculateReplicas(status string, currentReplicas uint64, maxReplicas uint64, minReplicas uint64) uint64 {
newReplicas := currentReplicas
const step = 5
@ -181,7 +197,7 @@ func CalculateReplicas(status string, currentReplicas uint64, maxReplicas uint64
}
}
} else { // Resolved event.
newReplicas = 1
newReplicas = minReplicas
}
return newReplicas
}

View File

@ -9,6 +9,7 @@ import (
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
"time"
@ -112,6 +113,11 @@ func makeSpec(request *requests.CreateFunctionRequest, maxRestarts uint64, resta
Constraints: constraints,
},
},
Mode: swarm.ServiceMode{
Replicated: &swarm.ReplicatedService{
Replicas: getMinReplicas(request),
},
},
}
// TODO: request.EnvProcess should only be set if it's not nil, otherwise we override anything in the Docker image already
@ -211,3 +217,18 @@ func buildResources(request *requests.CreateFunctionRequest) *swarm.ResourceRequ
}
return resources
}
func getMinReplicas(request *requests.CreateFunctionRequest) *uint64 {
replicas := uint64(1)
if request.Labels != nil {
if val, exists := (*request.Labels)["com.openfaas.scale.min"]; exists {
value, err := strconv.Atoi(val)
if err != nil {
log.Println(err)
}
replicas = uint64(value)
}
}
return &replicas
}

View File

@ -126,4 +126,8 @@ func updateSpec(request *requests.CreateFunctionRequest, spec *swarm.ServiceSpec
if len(env) > 0 {
spec.TaskTemplate.ContainerSpec.Env = env
}
if spec.Mode.Replicated != nil {
spec.Mode.Replicated.Replicas = getMinReplicas(request)
}
}

View File

@ -10,6 +10,7 @@ import (
"net"
"net/http"
"net/url"
"strconv"
"time"
"fmt"
@ -48,10 +49,8 @@ type ExternalServiceQuery struct {
ProxyClient http.Client
}
const maxReplicas = 40
// GetReplicas replica count for function
func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, error) {
func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, uint64, error) {
var err error
function := requests.Function{}
@ -73,9 +72,34 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, e
}
}
max := uint64(maxReplicas)
maxReplicas := uint64(handlers.DefaultMaxReplicas)
minReplicas := uint64(1)
return function.Replicas, max, err
if function.Labels != nil {
labels := *function.Labels
minScale := labels["com.openfaas.scale.min"]
maxScale := labels["com.openfaas.scale.max"]
if len(minScale) > 0 {
labelValue, err := strconv.Atoi(minScale)
if err != nil {
log.Printf("Bad replica count: %s, should be uint", minScale)
} else {
minReplicas = uint64(labelValue)
}
}
if len(maxScale) > 0 {
labelValue, err := strconv.Atoi(maxScale)
if err != nil {
log.Printf("Bad replica count: %s, should be uint", maxScale)
} else {
maxReplicas = uint64(labelValue)
}
}
}
return function.Replicas, maxReplicas, minReplicas, err
}
// ScaleServiceRequest request scaling of replica
@ -103,10 +127,12 @@ func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) erro
defer req.Body.Close()
res, err := s.ProxyClient.Do(req)
defer res.Body.Close()
if err != nil {
log.Println(urlPath, err)
} else {
if res.Body != nil {
defer res.Body.Close()
}
}
if res.StatusCode != http.StatusOK {

View File

@ -10,7 +10,8 @@ import (
)
func TestScale1to5(t *testing.T) {
newReplicas := handlers.CalculateReplicas("firing", 1, 20)
minReplicas := uint64(1)
newReplicas := handlers.CalculateReplicas("firing", 1, 20, minReplicas)
if newReplicas != 5 {
t.Log("Expected increment in blocks of 5 from 1 to 5")
t.Fail()
@ -18,7 +19,8 @@ func TestScale1to5(t *testing.T) {
}
func TestScale5to10(t *testing.T) {
newReplicas := handlers.CalculateReplicas("firing", 5, 20)
minReplicas := uint64(1)
newReplicas := handlers.CalculateReplicas("firing", 5, 20, minReplicas)
if newReplicas != 10 {
t.Log("Expected increment in blocks of 5 from 5 to 10")
t.Fail()
@ -26,7 +28,8 @@ func TestScale5to10(t *testing.T) {
}
func TestScaleCeilingOf20Replicas_Noaction(t *testing.T) {
newReplicas := handlers.CalculateReplicas("firing", 20, 20)
minReplicas := uint64(1)
newReplicas := handlers.CalculateReplicas("firing", 20, 20, minReplicas)
if newReplicas != 20 {
t.Log("Expected ceiling of 20 replicas")
t.Fail()
@ -34,7 +37,8 @@ func TestScaleCeilingOf20Replicas_Noaction(t *testing.T) {
}
func TestScaleCeilingOf20Replicas(t *testing.T) {
newReplicas := handlers.CalculateReplicas("firing", 19, 20)
minReplicas := uint64(1)
newReplicas := handlers.CalculateReplicas("firing", 19, 20, minReplicas)
if newReplicas != 20 {
t.Log("Expected ceiling of 20 replicas")
t.Fail()
@ -42,7 +46,8 @@ func TestScaleCeilingOf20Replicas(t *testing.T) {
}
func TestBackingOff10to1(t *testing.T) {
newReplicas := handlers.CalculateReplicas("resolved", 10, 20)
minReplicas := uint64(1)
newReplicas := handlers.CalculateReplicas("resolved", 10, 20, minReplicas)
if newReplicas != 1 {
t.Log("Expected backing off to 1 replica")
t.Fail()