faas/gateway/handlers/scaling.go
Alex Ellis (OpenFaaS Ltd) 2bfca6d848 Publish to multiple topics
Enables publishing to various topics according to annotations
on the functions. The function cache is moved up one level so
that it can be shared between the scale from zero code and the
queue proxy.

Unit tests added for new internal methods.

Tested e2e with arkade and the newest queue-worker and RC
gateway image with two queues and an annotation on one of the
functions of com.openfaas.queue. It worked as expected including
with multiple namespace support.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-04-22 15:26:42 +01:00

61 lines
1.8 KiB
Go

// Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package handlers
import (
"fmt"
"log"
"net/http"
"strings"
"github.com/openfaas/faas/gateway/scaling"
)
// getNamespace separates a service name of the form "name.namespace" into
// its function name and namespace. The split happens at the last "." in
// fullName, so any earlier dots stay part of the function name. When
// fullName contains no "." at all, it is returned unchanged as the function
// name together with defaultNamespace.
func getNamespace(defaultNamespace, fullName string) (functionName string, namespace string) {
	dot := strings.LastIndex(fullName, ".")
	if dot < 0 {
		// No namespace suffix present: fall back to the default.
		return fullName, defaultNamespace
	}

	functionName = fullName[:dot]
	namespace = fullName[dot+1:]
	return functionName, namespace
}
// MakeScalingHandler creates a handler which can scale a function from
// zero to N replica(s). After scaling, the next http.HandlerFunc is
// called. If the function is not ready after the configured amount of
// attempts / queries, next is not invoked and an error status is returned
// to the client instead.
//
// The function name and namespace are derived from the request URL via
// getServiceName and getNamespace; defaultNamespace is used when the
// service name carries no ".namespace" suffix. The config parameter is
// accepted for API compatibility and is not read by this handler.
//
// Responses written by this handler when next is not invoked:
//   - 404 Not Found when the scaler reports the function was not found
//   - 500 Internal Server Error when the scaler reports any other error
//   - 503 Service Unavailable when the function did not become available
func MakeScalingHandler(next http.HandlerFunc, scaler scaling.FunctionScaler, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		functionName, namespace := getNamespace(defaultNamespace, getServiceName(r.URL.String()))

		res := scaler.Scale(functionName, namespace)

		if !res.Found {
			// Guard against a not-found result that carries no error so
			// that building the message can never dereference nil.
			reason := "function not found"
			if res.Error != nil {
				reason = res.Error.Error()
			}

			errStr := fmt.Sprintf("error finding function %s.%s: %s", functionName, namespace, reason)
			log.Printf("Scaling: %s\n", errStr)

			w.WriteHeader(http.StatusNotFound)
			w.Write([]byte(errStr))
			return
		}

		if res.Error != nil {
			// The function was found, so the failure happened while scaling.
			errStr := fmt.Sprintf("error scaling function %s.%s: %s", functionName, namespace, res.Error.Error())
			log.Printf("Scaling: %s\n", errStr)

			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(errStr))
			return
		}

		if res.Available {
			next.ServeHTTP(w, r)
			return
		}

		log.Printf("[Scale] function=%s.%s 0=>N timed-out after %fs\n", functionName, namespace, res.Duration.Seconds())

		// Without an explicit status net/http would send an implicit
		// 200 OK with an empty body, hiding the failure from the caller.
		errStr := fmt.Sprintf("function %s.%s not ready: scaling 0=>N timed-out after %fs", functionName, namespace, res.Duration.Seconds())
		w.WriteHeader(http.StatusServiceUnavailable)
		w.Write([]byte(errStr))
	}
}