mirror of
https://github.com/openfaas/faas.git
synced 2025-06-21 14:23:25 +00:00
Compare commits
1 Commits
0.25.4
...
openfaaslt
Author | SHA1 | Date | |
---|---|---|---|
1f610f2128 |
@ -7,7 +7,7 @@ SERVER?=docker.io
|
||||
OWNER?=alexellis2
|
||||
NAME=gateway
|
||||
|
||||
.PHONY: build-local
|
||||
.PHONY: local-docker
|
||||
build-local:
|
||||
@echo $(SERVER)/$(OWNER)/$(NAME):$(TAG) \
|
||||
&& docker buildx create --use --name=multiarch --node multiarch \
|
||||
@ -17,8 +17,8 @@ build-local:
|
||||
--output "type=docker,push=false" \
|
||||
--tag $(SERVER)/$(OWNER)/$(NAME):$(TAG) .
|
||||
|
||||
.PHONY: build-push
|
||||
build-push:
|
||||
.PHONY: push-docker
|
||||
push-docker:
|
||||
@echo $(SERVER)/$(OWNER)/$(NAME):$(TAG) \
|
||||
&& docker buildx create --use --name=multiarch --node multiarch \
|
||||
&& docker buildx build \
|
||||
|
@ -82,6 +82,6 @@ type LoggingNotifier struct {
|
||||
// Notify the LoggingNotifier about a request
|
||||
func (LoggingNotifier) Notify(method string, URL string, originalURL string, statusCode int, event string, duration time.Duration) {
|
||||
if event == "completed" {
|
||||
log.Printf("Forwarded [%s] to %s - [%d] - %fs seconds", method, originalURL, statusCode, duration.Seconds())
|
||||
log.Printf("Forwarded [%s] to %s - [%d] - %.4fs", method, originalURL, statusCode, duration.Seconds())
|
||||
}
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ func MakeScalingHandler(next http.HandlerFunc, scaler scaling.FunctionScaler, co
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[Scale] function=%s.%s 0=>N timed-out after %fs\n", functionName, namespace, res.Duration.Seconds())
|
||||
log.Printf("[Scale] function=%s.%s 0=>N timed-out after %.4fs\n",
|
||||
functionName, namespace, res.Duration.Seconds())
|
||||
}
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string)
|
||||
// log.Printf("GetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds())
|
||||
|
||||
} else {
|
||||
log.Printf("GetReplicas [%s.%s] took: %fs, code: %d\n", serviceName, serviceNamespace, time.Since(start).Seconds(), res.StatusCode)
|
||||
log.Printf("GetReplicas [%s.%s] took: %.4fs, code: %d\n", serviceName, serviceNamespace, time.Since(start).Seconds(), res.StatusCode)
|
||||
return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s, body: %s", res.StatusCode, serviceName, string(bytesOut))
|
||||
}
|
||||
|
||||
@ -176,7 +176,8 @@ func (s ExternalServiceQuery) SetReplicas(serviceName, serviceNamespace string,
|
||||
err = fmt.Errorf("error scaling HTTP code %d, %s", res.StatusCode, urlPath)
|
||||
}
|
||||
|
||||
log.Printf("SetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds())
|
||||
log.Printf("SetReplicas [%s.%s] took: %.4fs",
|
||||
serviceName, serviceNamespace, time.Since(start).Seconds())
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -39,6 +39,8 @@ type FunctionScaleResult struct {
|
||||
func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResult {
|
||||
start := time.Now()
|
||||
|
||||
// First check the cache, if there are available replicas, then the
|
||||
// request can be served.
|
||||
if cachedResponse, hit := f.Cache.Get(functionName, namespace); hit &&
|
||||
cachedResponse.AvailableReplicas > 0 {
|
||||
return FunctionScaleResult{
|
||||
@ -48,8 +50,10 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu
|
||||
Duration: time.Since(start),
|
||||
}
|
||||
}
|
||||
getKey := fmt.Sprintf("GetReplicas-%s.%s", functionName, namespace)
|
||||
|
||||
// The wasn't a hit, or there were no available replicas found
|
||||
// so query the live endpoint
|
||||
getKey := fmt.Sprintf("GetReplicas-%s.%s", functionName, namespace)
|
||||
res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) {
|
||||
return f.Config.ServiceQuery.GetReplicas(functionName, namespace)
|
||||
})
|
||||
@ -71,16 +75,30 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu
|
||||
}
|
||||
}
|
||||
|
||||
queryResponse := res.(ServiceQueryResponse)
|
||||
// Check if there are available replicas in the live data
|
||||
if res.(ServiceQueryResponse).AvailableReplicas > 0 {
|
||||
return FunctionScaleResult{
|
||||
Error: nil,
|
||||
Available: true,
|
||||
Found: true,
|
||||
Duration: time.Since(start),
|
||||
}
|
||||
}
|
||||
|
||||
// Store the result of GetReplicas in the cache
|
||||
queryResponse := res.(ServiceQueryResponse)
|
||||
f.Cache.Set(functionName, namespace, queryResponse)
|
||||
|
||||
if queryResponse.AvailableReplicas == 0 {
|
||||
// If the desired replica count is 0, then a scale up event
|
||||
// is required.
|
||||
if queryResponse.Replicas == 0 {
|
||||
minReplicas := uint64(1)
|
||||
if queryResponse.MinReplicas > 0 {
|
||||
minReplicas = queryResponse.MinReplicas
|
||||
}
|
||||
|
||||
// In a retry-loop, first query desired replicas, then
|
||||
// set them if the value is still at 0.
|
||||
scaleResult := types.Retry(func(attempt int) error {
|
||||
|
||||
res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) {
|
||||
@ -91,19 +109,23 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu
|
||||
return err
|
||||
}
|
||||
|
||||
// Cache the response
|
||||
queryResponse = res.(ServiceQueryResponse)
|
||||
|
||||
f.Cache.Set(functionName, namespace, queryResponse)
|
||||
|
||||
// The scale up is complete because the desired replica count
|
||||
// has been set to 1 or more.
|
||||
if queryResponse.Replicas > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Request a scale up to the minimum amount of replicas
|
||||
setKey := fmt.Sprintf("SetReplicas-%s.%s", functionName, namespace)
|
||||
|
||||
if _, err, _ := f.SingleFlight.Do(setKey, func() (interface{}, error) {
|
||||
|
||||
log.Printf("[Scale %d] function=%s 0 => %d requested", attempt, functionName, minReplicas)
|
||||
log.Printf("[Scale %d/%d] function=%s 0 => %d requested",
|
||||
attempt, int(f.Config.SetScaleRetries), functionName, minReplicas)
|
||||
|
||||
if err := f.Config.ServiceQuery.SetReplicas(functionName, namespace, minReplicas); err != nil {
|
||||
return nil, fmt.Errorf("unable to scale function [%s], err: %s", functionName, err)
|
||||
@ -126,43 +148,44 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < int(f.Config.MaxPollCount); i++ {
|
||||
}
|
||||
|
||||
res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) {
|
||||
return f.Config.ServiceQuery.GetReplicas(functionName, namespace)
|
||||
})
|
||||
queryResponse := res.(ServiceQueryResponse)
|
||||
// Holding pattern for at least one function replica to be available
|
||||
for i := 0; i < int(f.Config.MaxPollCount); i++ {
|
||||
|
||||
if err == nil {
|
||||
f.Cache.Set(functionName, namespace, queryResponse)
|
||||
}
|
||||
res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) {
|
||||
return f.Config.ServiceQuery.GetReplicas(functionName, namespace)
|
||||
})
|
||||
queryResponse := res.(ServiceQueryResponse)
|
||||
|
||||
totalTime := time.Since(start)
|
||||
|
||||
if err != nil {
|
||||
return FunctionScaleResult{
|
||||
Error: err,
|
||||
Available: false,
|
||||
Found: true,
|
||||
Duration: totalTime,
|
||||
}
|
||||
}
|
||||
|
||||
if queryResponse.AvailableReplicas > 0 {
|
||||
|
||||
log.Printf("[Scale] function=%s 0 => %d successful - %fs",
|
||||
functionName, queryResponse.AvailableReplicas, totalTime.Seconds())
|
||||
|
||||
return FunctionScaleResult{
|
||||
Error: nil,
|
||||
Available: true,
|
||||
Found: true,
|
||||
Duration: totalTime,
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(f.Config.FunctionPollInterval)
|
||||
if err == nil {
|
||||
f.Cache.Set(functionName, namespace, queryResponse)
|
||||
}
|
||||
|
||||
totalTime := time.Since(start)
|
||||
|
||||
if err != nil {
|
||||
return FunctionScaleResult{
|
||||
Error: err,
|
||||
Available: false,
|
||||
Found: true,
|
||||
Duration: totalTime,
|
||||
}
|
||||
}
|
||||
|
||||
if queryResponse.AvailableReplicas > 0 {
|
||||
|
||||
log.Printf("[Ready] function=%s waited for - %.4fs", functionName, totalTime.Seconds())
|
||||
|
||||
return FunctionScaleResult{
|
||||
Error: nil,
|
||||
Available: true,
|
||||
Found: true,
|
||||
Duration: totalTime,
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(f.Config.FunctionPollInterval)
|
||||
}
|
||||
|
||||
return FunctionScaleResult{
|
||||
|
Reference in New Issue
Block a user