diff --git a/gateway/Makefile b/gateway/Makefile index aef66896..10e2934f 100644 --- a/gateway/Makefile +++ b/gateway/Makefile @@ -7,7 +7,7 @@ SERVER?=docker.io OWNER?=alexellis2 NAME=gateway -.PHONY: build-local +.PHONY: local-docker build-local: @echo $(SERVER)/$(OWNER)/$(NAME):$(TAG) \ && docker buildx create --use --name=multiarch --node multiarch \ @@ -17,8 +17,8 @@ build-local: --output "type=docker,push=false" \ --tag $(SERVER)/$(OWNER)/$(NAME):$(TAG) . -.PHONY: build-push -build-push: +.PHONY: push-docker +push-docker: @echo $(SERVER)/$(OWNER)/$(NAME):$(TAG) \ && docker buildx create --use --name=multiarch --node multiarch \ && docker buildx build \ diff --git a/gateway/handlers/notifiers.go b/gateway/handlers/notifiers.go index 4795b90f..261b2e61 100644 --- a/gateway/handlers/notifiers.go +++ b/gateway/handlers/notifiers.go @@ -82,6 +82,6 @@ type LoggingNotifier struct { // Notify the LoggingNotifier about a request func (LoggingNotifier) Notify(method string, URL string, originalURL string, statusCode int, event string, duration time.Duration) { if event == "completed" { - log.Printf("Forwarded [%s] to %s - [%d] - %fs seconds", method, originalURL, statusCode, duration.Seconds()) + log.Printf("Forwarded [%s] to %s - [%d] - %.4fs", method, originalURL, statusCode, duration.Seconds()) } } diff --git a/gateway/handlers/scaling.go b/gateway/handlers/scaling.go index 69f1d3fa..4d0abd25 100644 --- a/gateway/handlers/scaling.go +++ b/gateway/handlers/scaling.go @@ -48,6 +48,7 @@ func MakeScalingHandler(next http.HandlerFunc, scaler scaling.FunctionScaler, co return } - log.Printf("[Scale] function=%s.%s 0=>N timed-out after %fs\n", functionName, namespace, res.Duration.Seconds()) + log.Printf("[Scale] function=%s.%s 0=>N timed-out after %.4fs\n", + functionName, namespace, res.Duration.Seconds()) } } diff --git a/gateway/plugin/external.go b/gateway/plugin/external.go index a23fa6ec..571971d1 100644 --- a/gateway/plugin/external.go +++ 
b/gateway/plugin/external.go @@ -102,7 +102,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string) // log.Printf("GetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds()) } else { - log.Printf("GetReplicas [%s.%s] took: %fs, code: %d\n", serviceName, serviceNamespace, time.Since(start).Seconds(), res.StatusCode) + log.Printf("GetReplicas [%s.%s] took: %.4fs, code: %d\n", serviceName, serviceNamespace, time.Since(start).Seconds(), res.StatusCode) return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s, body: %s", res.StatusCode, serviceName, string(bytesOut)) } @@ -176,7 +176,8 @@ func (s ExternalServiceQuery) SetReplicas(serviceName, serviceNamespace string, err = fmt.Errorf("error scaling HTTP code %d, %s", res.StatusCode, urlPath) } - log.Printf("SetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds()) + log.Printf("SetReplicas [%s.%s] took: %.4fs", + serviceName, serviceNamespace, time.Since(start).Seconds()) return err } diff --git a/gateway/scaling/function_scaler.go b/gateway/scaling/function_scaler.go index 4ca0aa90..cf3ce8d6 100644 --- a/gateway/scaling/function_scaler.go +++ b/gateway/scaling/function_scaler.go @@ -39,6 +39,8 @@ type FunctionScaleResult struct { func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResult { start := time.Now() + // First check the cache, if there are available replicas, then the + // request can be served. 
if cachedResponse, hit := f.Cache.Get(functionName, namespace); hit && cachedResponse.AvailableReplicas > 0 { return FunctionScaleResult{ @@ -48,8 +50,10 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu Duration: time.Since(start), } } - getKey := fmt.Sprintf("GetReplicas-%s.%s", functionName, namespace) + // There wasn't a hit, or there were no available replicas found + // so query the live endpoint + getKey := fmt.Sprintf("GetReplicas-%s.%s", functionName, namespace) res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) { return f.Config.ServiceQuery.GetReplicas(functionName, namespace) }) @@ -71,16 +75,30 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu } } - queryResponse := res.(ServiceQueryResponse) + // Check if there are available replicas in the live data + if res.(ServiceQueryResponse).AvailableReplicas > 0 { + return FunctionScaleResult{ + Error: nil, + Available: true, + Found: true, + Duration: time.Since(start), + } + } + // Store the result of GetReplicas in the cache + queryResponse := res.(ServiceQueryResponse) f.Cache.Set(functionName, namespace, queryResponse) - if queryResponse.AvailableReplicas == 0 { + // If the desired replica count is 0, then a scale up event + // is required. + if queryResponse.Replicas == 0 { minReplicas := uint64(1) if queryResponse.MinReplicas > 0 { minReplicas = queryResponse.MinReplicas } + // In a retry-loop, first query desired replicas, then + // set them if the value is still at 0. scaleResult := types.Retry(func(attempt int) error { res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) { @@ -91,19 +109,23 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu return err } + // Cache the response queryResponse = res.(ServiceQueryResponse) - f.Cache.Set(functionName, namespace, queryResponse) + // The scale up is complete because the desired replica count + // has been set to 1 or more.
if queryResponse.Replicas > 0 { return nil } + // Request a scale up to the minimum amount of replicas setKey := fmt.Sprintf("SetReplicas-%s.%s", functionName, namespace) if _, err, _ := f.SingleFlight.Do(setKey, func() (interface{}, error) { - log.Printf("[Scale %d] function=%s 0 => %d requested", attempt, functionName, minReplicas) + log.Printf("[Scale %d/%d] function=%s 0 => %d requested", + attempt, int(f.Config.SetScaleRetries), functionName, minReplicas) if err := f.Config.ServiceQuery.SetReplicas(functionName, namespace, minReplicas); err != nil { return nil, fmt.Errorf("unable to scale function [%s], err: %s", functionName, err) @@ -126,43 +148,44 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu } } - for i := 0; i < int(f.Config.MaxPollCount); i++ { + } - res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) { - return f.Config.ServiceQuery.GetReplicas(functionName, namespace) - }) - queryResponse := res.(ServiceQueryResponse) + // Holding pattern for at least one function replica to be available + for i := 0; i < int(f.Config.MaxPollCount); i++ { - if err == nil { - f.Cache.Set(functionName, namespace, queryResponse) - } + res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) { + return f.Config.ServiceQuery.GetReplicas(functionName, namespace) + }) + queryResponse := res.(ServiceQueryResponse) - totalTime := time.Since(start) - - if err != nil { - return FunctionScaleResult{ - Error: err, - Available: false, - Found: true, - Duration: totalTime, - } - } - - if queryResponse.AvailableReplicas > 0 { - - log.Printf("[Scale] function=%s 0 => %d successful - %fs", - functionName, queryResponse.AvailableReplicas, totalTime.Seconds()) - - return FunctionScaleResult{ - Error: nil, - Available: true, - Found: true, - Duration: totalTime, - } - } - - time.Sleep(f.Config.FunctionPollInterval) + if err == nil { + f.Cache.Set(functionName, namespace, queryResponse) } + + totalTime := time.Since(start) + 
+ if err != nil { + return FunctionScaleResult{ + Error: err, + Available: false, + Found: true, + Duration: totalTime, + } + } + + if queryResponse.AvailableReplicas > 0 { + + log.Printf("[Ready] function=%s waited for - %.4fs", functionName, totalTime.Seconds()) + + return FunctionScaleResult{ + Error: nil, + Available: true, + Found: true, + Duration: totalTime, + } + } + + time.Sleep(f.Config.FunctionPollInterval) } return FunctionScaleResult{