Make OpenFaaS CE use the provider for load-balancing

This change removes the direct functions option which was
used originally for Docker Swarm. The Community Edition will
rely on the faas provider - faas-netes / faasd for load-balancing
of requests.

Direct Functions is required in order to delegate load-balancing
to Istio, Linkerd or some other kind of service mesh.

Tested by deploying a modified gateway image to a KinD cluster,
deploying the env function, and scaling to two replicas. This
balanced the load between the two pods by printing out the names
and then I ran a test with hey which returned 200s for all the
requests.

The prober which was part of the Istio support is no longer
required in the CE gateway so is removed for simplicity.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alex@openfaas.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd) 2023-01-11 12:12:26 +00:00
parent b1ef4b49b7
commit a7d486eee6
7 changed files with 5 additions and 350 deletions

View File

@ -29,7 +29,6 @@ COPY types types
COPY plugin plugin
COPY version version
COPY scaling scaling
COPY probing probing
COPY pkg pkg
COPY main.go .

View File

@ -1,45 +0,0 @@
package handlers
import (
"fmt"
"net/http"
"golang.org/x/sync/singleflight"
"github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/probing"
)
func MakeProbeHandler(prober probing.FunctionProber, cache probing.ProbeCacher, resolver middleware.BaseURLResolver, next http.HandlerFunc, defaultNamespace string) http.HandlerFunc {
group := singleflight.Group{}
return func(w http.ResponseWriter, r *http.Request) {
functionName, namespace := middleware.GetNamespace(defaultNamespace, middleware.GetServiceName(r.URL.String()))
key := fmt.Sprintf("Probe-%s.%s", functionName, namespace)
res, _, _ := group.Do(key, func() (interface{}, error) {
cached, hit := cache.Get(functionName, namespace)
var probeResult probing.FunctionProbeResult
if hit && cached != nil && cached.Available {
probeResult = *cached
} else {
probeResult = prober.Probe(functionName, namespace)
cache.Set(functionName, namespace, &probeResult)
}
return probeResult, nil
})
fnRes := res.(probing.FunctionProbeResult)
if !fnRes.Available {
http.Error(w, fmt.Sprintf("unable to probe function endpoint %s", fnRes.Error),
http.StatusServiceUnavailable)
return
}
next(w, r)
}
}

View File

@ -15,7 +15,6 @@ import (
"github.com/openfaas/faas/gateway/metrics"
"github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/plugin"
"github.com/openfaas/faas/gateway/probing"
"github.com/openfaas/faas/gateway/scaling"
"github.com/openfaas/faas/gateway/types"
"github.com/openfaas/faas/gateway/version"
@ -97,16 +96,8 @@ func main() {
nilURLTransformer := middleware.TransparentURLPathTransformer{}
trimURLTransformer := middleware.FunctionPrefixTrimmingURLPathTransformer{}
if config.DirectFunctions {
functionURLResolver = middleware.FunctionAsHostBaseURLResolver{
FunctionSuffix: config.DirectFunctionsSuffix,
FunctionNamespace: config.Namespace,
}
functionURLTransformer = trimURLTransformer
} else {
functionURLResolver = urlResolver
functionURLTransformer = nilURLTransformer
}
var serviceAuthInjector middleware.AuthInjector
@ -155,13 +146,6 @@ func main() {
functionProxy := faasHandlers.Proxy
if config.ProbeFunctions {
prober := probing.NewFunctionProber(cachedFunctionQuery, functionURLResolver)
// Default of 5 seconds between refreshing probes for function invocations
probeCache := probing.NewProbeCache(time.Second * 5)
functionProxy = handlers.MakeProbeHandler(prober, probeCache, functionURLResolver, functionProxy, config.Namespace)
}
if config.ScaleFromZero {
scalingFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry)
scaler := scaling.NewFunctionScaler(scalingConfig, scalingFunctionCache)
@ -169,7 +153,9 @@ func main() {
}
if config.UseNATS() {
log.Println("Async enabled: Using NATS Streaming.")
log.Println("Async enabled: Using NATS Streaming")
log.Println("Deprecation Notice: NATS Streaming is no longer maintained and won't receive updates from June 2023")
maxReconnect := 60
interval := time.Second * 2

View File

@ -1,58 +0,0 @@
// Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package probing
import (
"fmt"
"sync"
"time"
)
// ProbeCacher queries functions and caches the results
type ProbeCacher interface {
Set(functionName, namespace string, result *FunctionProbeResult)
Get(functionName, namespace string) (result *FunctionProbeResult, hit bool)
}
// ProbeCache provides a cache of Probe replica counts
type ProbeCache struct {
Cache map[string]*FunctionProbeResult
Expiry time.Duration
Sync sync.RWMutex
}
// NewProbeCache creates a function cache to query function metadata
func NewProbeCache(cacheExpiry time.Duration) ProbeCacher {
return &ProbeCache{
Cache: make(map[string]*FunctionProbeResult),
Expiry: cacheExpiry,
}
}
// Set replica count for functionName
func (fc *ProbeCache) Set(functionName, namespace string, result *FunctionProbeResult) {
fc.Sync.Lock()
defer fc.Sync.Unlock()
fc.Cache[functionName+"."+namespace] = result
}
func (fc *ProbeCache) Get(functionName, namespace string) (*FunctionProbeResult, bool) {
result := &FunctionProbeResult{
Available: false,
Error: fmt.Errorf("unavailable in cache"),
}
hit := false
fc.Sync.RLock()
defer fc.Sync.RUnlock()
if val, exists := fc.Cache[functionName+"."+namespace]; exists {
hit = val.Expired(fc.Expiry) == false
result = val
}
return result, hit
}

View File

@ -1,116 +0,0 @@
// Copyright (c) OpenFaaS Author(s). All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package probing
import (
"fmt"
"log"
"net/http"
"time"
"github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/scaling"
"github.com/openfaas/faas/gateway/types"
)
// NewFunctionProber create a new scaler with the specified
// ScalingConfig
func NewFunctionProber(functionQuery scaling.FunctionQuery, resolver middleware.BaseURLResolver) FunctionProber {
// if directFunctions {
return &FunctionHTTPProber{
Query: functionQuery,
Resolver: resolver,
}
}
// FunctionHTTPProber probes a function's health endpoint
type FunctionHTTPProber struct {
Query scaling.FunctionQuery
Resolver middleware.BaseURLResolver
DirectFunctions bool
}
type FunctionNonProber struct {
}
func (f *FunctionNonProber) Probe(functionName, namespace string) FunctionProbeResult {
return FunctionProbeResult{
Found: true,
Available: true,
}
}
type FunctionProber interface {
Probe(functionName, namespace string) FunctionProbeResult
}
// FunctionProbeResult holds the result of scaling from zero
type FunctionProbeResult struct {
Available bool
Error error
Found bool
Duration time.Duration
Updated time.Time
}
// Expired find out whether the cache item has expired with
// the given expiry duration from when it was stored.
func (res *FunctionProbeResult) Expired(expiry time.Duration) bool {
return time.Now().After(res.Updated.Add(expiry))
}
// Scale scales a function from zero replicas to 1 or the value set in
// the minimum replicas metadata
func (f *FunctionHTTPProber) Probe(functionName, namespace string) FunctionProbeResult {
start := time.Now()
cachedResponse, _ := f.Query.Get(functionName, namespace)
probePath := "/_/health"
if cachedResponse.Annotations != nil {
if v, ok := (*cachedResponse.Annotations)["com.openfaas.http.path"]; ok && len(v) > 0 {
probePath = v
}
}
maxCount := 10
pollInterval := time.Millisecond * 50
err := types.Retry(func(attempt int) error {
u := f.Resolver.BuildURL(functionName, namespace, probePath, true)
r, _ := http.NewRequest(http.MethodGet, u, nil)
r.Header.Set("User-Agent", "com.openfaas.gateway/probe")
resp, err := http.DefaultClient.Do(r)
if err != nil {
return err
}
log.Printf("[Probe] %s => %d", u, resp.StatusCode)
if resp.StatusCode == http.StatusOK {
return nil
}
return fmt.Errorf("failed with status: %s", resp.Status)
}, "Probe", maxCount, pollInterval)
if err != nil {
return FunctionProbeResult{
Error: err,
Available: false,
Found: true,
Duration: time.Since(start),
Updated: time.Now(),
}
}
return FunctionProbeResult{
Error: nil,
Available: true,
Found: true,
Duration: time.Since(start),
Updated: time.Now(),
}
}

View File

@ -8,7 +8,6 @@ import (
"net/url"
"os"
"strconv"
"strings"
"time"
)
@ -129,9 +128,6 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) {
cfg.PrometheusHost = prometheusHost
}
cfg.DirectFunctions = parseBoolValue(hasEnv.Getenv("direct_functions"))
cfg.DirectFunctionsSuffix = hasEnv.Getenv("direct_functions_suffix")
cfg.UseBasicAuth = parseBoolValue(hasEnv.Getenv("basic_auth"))
secretPath := hasEnv.Getenv("secret_mount_path")
@ -169,14 +165,6 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) {
cfg.Namespace = hasEnv.Getenv("function_namespace")
if len(cfg.DirectFunctionsSuffix) > 0 && len(cfg.Namespace) > 0 {
if strings.HasPrefix(cfg.DirectFunctionsSuffix, cfg.Namespace) == false {
return nil, fmt.Errorf("function_namespace must be a sub-string of direct_functions_suffix")
}
}
cfg.ProbeFunctions = parseBoolValue(hasEnv.Getenv("probe_functions"))
return &cfg, nil
}
@ -216,12 +204,6 @@ type GatewayConfig struct {
// Port to connect to Prometheus.
PrometheusPort int
// If set to true we will access upstream functions directly rather than through the upstream provider
DirectFunctions bool
// If set this will be used to resolve functions directly
DirectFunctionsSuffix string
// If set, reads secrets from file-system for enabling basic auth.
UseBasicAuth bool
@ -245,9 +227,6 @@ type GatewayConfig struct {
// Namespace for endpoints
Namespace string
// ProbeFunctions requires the gateway to probe the health endpoint of a function before invoking it
ProbeFunctions bool
}
// UseNATS Use NATSor not

View File

@ -38,16 +38,6 @@ func TestRead_UseExternalProvider_Defaults(t *testing.T) {
t.Fail()
}
if config.DirectFunctions != false {
t.Log("Default for DirectFunctions should be false")
t.Fail()
}
if len(config.DirectFunctionsSuffix) > 0 {
t.Log("Default for DirectFunctionsSuffix should be empty")
t.Fail()
}
if len(config.Namespace) > 0 {
t.Log("Default for Namespace should be empty")
t.Fail()
@ -89,86 +79,6 @@ func TestRead_NamespaceOverrideAgressWithFunctionSuffix_Valid(t *testing.T) {
}
}
func TestRead_NamespaceOverrideAgressWithFunctionSuffix_Invalid(t *testing.T) {
defaults := NewEnvBucket()
readConfig := ReadConfig{}
defaults.Setenv("direct_functions", "true")
wantSuffix := "openfaas-fn.cluster.local.svc."
defaults.Setenv("direct_functions_suffix", wantSuffix)
defaults.Setenv("function_namespace", "fn")
_, err := readConfig.Read(defaults)
if err == nil {
t.Logf("Expected an error because function_namespace should be a sub-string of direct_functions_suffix")
t.Fail()
return
}
want := "function_namespace must be a sub-string of direct_functions_suffix"
if want != err.Error() {
t.Logf("Error want: %s, got: %s", want, err.Error())
t.Fail()
}
}
func TestRead_DirectFunctionsOverride(t *testing.T) {
defaults := NewEnvBucket()
readConfig := ReadConfig{}
defaults.Setenv("direct_functions", "true")
wantSuffix := "openfaas-fn.cluster.local.svc."
defaults.Setenv("direct_functions_suffix", wantSuffix)
config, _ := readConfig.Read(defaults)
if config.DirectFunctions != true {
t.Logf("DirectFunctions should be true, got: %v", config.DirectFunctions)
t.Fail()
}
if config.DirectFunctionsSuffix != wantSuffix {
t.Logf("DirectFunctionsSuffix want: %s, got: %s", wantSuffix, config.DirectFunctionsSuffix)
t.Fail()
}
}
func TestRead_ProbeFunctions_Default(t *testing.T) {
defaults := NewEnvBucket()
readConfig := ReadConfig{}
defaults.Setenv("probe_functions", "")
want := false
config, _ := readConfig.Read(defaults)
got := config.ProbeFunctions
if want != got {
t.Logf("ProbeFunctions want %v, but got %v", want, got)
t.Fail()
}
}
func TestRead_ProbeFunctions_Enabled(t *testing.T) {
defaults := NewEnvBucket()
readConfig := ReadConfig{}
defaults.Setenv("probe_functions", "true")
want := true
config, _ := readConfig.Read(defaults)
got := config.ProbeFunctions
if want != got {
t.Logf("ProbeFunctions want %v, but got %v", want, got)
t.Fail()
}
}
func TestRead_ScaleZeroDefaultAndOverride(t *testing.T) {
defaults := NewEnvBucket()
readConfig := ReadConfig{}