From a7d486eee60702c015e91eac0faee33e2a606777 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Wed, 11 Jan 2023 12:12:26 +0000 Subject: [PATCH] Make OpenFaaS CE use the provider for load-balancing This change removes the direct functions option which was used originally for Docker Swarm. The Community Edition will rely on the faas provider - faas-netes / faasd for load-balancing of requests. Direct Functions is required in order to delegate load-balancing to Istio, Linkerd or some other kind of service mesh. Tested by deploying a modified gateway image to a KinD cluster, deploying the env function, and scaling to two replicas. This balanced the load between the two pods by printing out the names and then I ran a test with hey which returned 200s for all the requests. The prober which was part of the Istio support is no longer required in the CE gateway so is removed for simplicity. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- gateway/Dockerfile | 1 - gateway/handlers/probe_handler.go | 45 ------------ gateway/main.go | 24 ++----- gateway/probing/cache.go | 58 --------------- gateway/probing/prober.go | 116 ------------------------------ gateway/types/readconfig.go | 21 ------ gateway/types/readconfig_test.go | 90 ----------------------- 7 files changed, 5 insertions(+), 350 deletions(-) delete mode 100644 gateway/handlers/probe_handler.go delete mode 100644 gateway/probing/cache.go delete mode 100644 gateway/probing/prober.go diff --git a/gateway/Dockerfile b/gateway/Dockerfile index 7fc0765c..f745912f 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -29,7 +29,6 @@ COPY types types COPY plugin plugin COPY version version COPY scaling scaling -COPY probing probing COPY pkg pkg COPY main.go . diff --git a/gateway/handlers/probe_handler.go b/gateway/handlers/probe_handler.go deleted file mode 100644 index 47f83c8e..00000000 --- a/gateway/handlers/probe_handler.go +++ /dev/null @@ -1,45 +0,0 @@ -package handlers - -import ( - "fmt" - "net/http" - - "golang.org/x/sync/singleflight" - - "github.com/openfaas/faas/gateway/pkg/middleware" - "github.com/openfaas/faas/gateway/probing" -) - -func MakeProbeHandler(prober probing.FunctionProber, cache probing.ProbeCacher, resolver middleware.BaseURLResolver, next http.HandlerFunc, defaultNamespace string) http.HandlerFunc { - - group := singleflight.Group{} - - return func(w http.ResponseWriter, r *http.Request) { - functionName, namespace := middleware.GetNamespace(defaultNamespace, middleware.GetServiceName(r.URL.String())) - - key := fmt.Sprintf("Probe-%s.%s", functionName, namespace) - res, _, _ := group.Do(key, func() (interface{}, error) { - - cached, hit := cache.Get(functionName, namespace) - var probeResult probing.FunctionProbeResult - if hit && cached != nil && cached.Available { - probeResult = *cached - } else { - probeResult = prober.Probe(functionName, namespace) - cache.Set(functionName, namespace, &probeResult) - } - - return probeResult, nil - }) - - fnRes := res.(probing.FunctionProbeResult) - - if !fnRes.Available { - http.Error(w, fmt.Sprintf("unable to probe function endpoint %s", fnRes.Error), - http.StatusServiceUnavailable) - return - } - - next(w, r) - } -} diff --git a/gateway/main.go b/gateway/main.go index 9e776df3..7d7ad756 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -15,7 +15,6 @@ import ( "github.com/openfaas/faas/gateway/metrics" "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/plugin" - "github.com/openfaas/faas/gateway/probing" "github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/types" "github.com/openfaas/faas/gateway/version" @@ -97,16 +96,8 @@ func main() { nilURLTransformer := middleware.TransparentURLPathTransformer{} trimURLTransformer := middleware.FunctionPrefixTrimmingURLPathTransformer{} - if config.DirectFunctions { - functionURLResolver = middleware.FunctionAsHostBaseURLResolver{ - FunctionSuffix: config.DirectFunctionsSuffix, - FunctionNamespace: config.Namespace, - } - functionURLTransformer = trimURLTransformer - } else { - functionURLResolver = urlResolver - functionURLTransformer = nilURLTransformer - } + functionURLResolver = urlResolver + functionURLTransformer = nilURLTransformer var serviceAuthInjector middleware.AuthInjector @@ -155,13 +146,6 @@ func main() { functionProxy := faasHandlers.Proxy - if config.ProbeFunctions { - prober := probing.NewFunctionProber(cachedFunctionQuery, functionURLResolver) - // Default of 5 seconds between refreshing probes for function invocations - probeCache := probing.NewProbeCache(time.Second * 5) - functionProxy = handlers.MakeProbeHandler(prober, probeCache, functionURLResolver, functionProxy, config.Namespace) - } - if config.ScaleFromZero { scalingFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry) scaler := scaling.NewFunctionScaler(scalingConfig, scalingFunctionCache) @@ -169,7 +153,9 @@ func main() { } if config.UseNATS() { - log.Println("Async enabled: Using NATS Streaming.") + log.Println("Async enabled: Using NATS Streaming") + log.Println("Deprecation Notice: NATS Streaming is no longer maintained and won't receive updates from June 2023") + maxReconnect := 60 interval := time.Second * 2 diff --git a/gateway/probing/cache.go b/gateway/probing/cache.go deleted file mode 100644 index d7d02ada..00000000 --- a/gateway/probing/cache.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) OpenFaaS Author(s). All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -package probing - -import ( - "fmt" - "sync" - "time" -) - -// ProbeCacher queries functions and caches the results -type ProbeCacher interface { - Set(functionName, namespace string, result *FunctionProbeResult) - Get(functionName, namespace string) (result *FunctionProbeResult, hit bool) -} - -// ProbeCache provides a cache of Probe replica counts -type ProbeCache struct { - Cache map[string]*FunctionProbeResult - Expiry time.Duration - Sync sync.RWMutex -} - -// NewProbeCache creates a function cache to query function metadata -func NewProbeCache(cacheExpiry time.Duration) ProbeCacher { - return &ProbeCache{ - Cache: make(map[string]*FunctionProbeResult), - Expiry: cacheExpiry, - } -} - -// Set replica count for functionName -func (fc *ProbeCache) Set(functionName, namespace string, result *FunctionProbeResult) { - fc.Sync.Lock() - defer fc.Sync.Unlock() - - fc.Cache[functionName+"."+namespace] = result -} - -func (fc *ProbeCache) Get(functionName, namespace string) (*FunctionProbeResult, bool) { - - result := &FunctionProbeResult{ - Available: false, - Error: fmt.Errorf("unavailable in cache"), - } - - hit := false - fc.Sync.RLock() - defer fc.Sync.RUnlock() - - if val, exists := fc.Cache[functionName+"."+namespace]; exists { - hit = val.Expired(fc.Expiry) == false - result = val - } - - return result, hit -} diff --git a/gateway/probing/prober.go b/gateway/probing/prober.go deleted file mode 100644 index b3f1aa3e..00000000 --- a/gateway/probing/prober.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright (c) OpenFaaS Author(s). All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -package probing - -import ( - "fmt" - "log" - "net/http" - "time" - - "github.com/openfaas/faas/gateway/pkg/middleware" - "github.com/openfaas/faas/gateway/scaling" - "github.com/openfaas/faas/gateway/types" -) - -// NewFunctionProber create a new scaler with the specified -// ScalingConfig -func NewFunctionProber(functionQuery scaling.FunctionQuery, resolver middleware.BaseURLResolver) FunctionProber { - // if directFunctions { - return &FunctionHTTPProber{ - Query: functionQuery, - Resolver: resolver, - } -} - -// FunctionHTTPProber probes a function's health endpoint -type FunctionHTTPProber struct { - Query scaling.FunctionQuery - Resolver middleware.BaseURLResolver - DirectFunctions bool -} - -type FunctionNonProber struct { -} - -func (f *FunctionNonProber) Probe(functionName, namespace string) FunctionProbeResult { - return FunctionProbeResult{ - Found: true, - Available: true, - } -} - -type FunctionProber interface { - Probe(functionName, namespace string) FunctionProbeResult -} - -// FunctionProbeResult holds the result of scaling from zero -type FunctionProbeResult struct { - Available bool - Error error - Found bool - Duration time.Duration - Updated time.Time -} - -// Expired find out whether the cache item has expired with -// the given expiry duration from when it was stored. -func (res *FunctionProbeResult) Expired(expiry time.Duration) bool { - return time.Now().After(res.Updated.Add(expiry)) -} - -// Scale scales a function from zero replicas to 1 or the value set in -// the minimum replicas metadata -func (f *FunctionHTTPProber) Probe(functionName, namespace string) FunctionProbeResult { - start := time.Now() - - cachedResponse, _ := f.Query.Get(functionName, namespace) - probePath := "/_/health" - - if cachedResponse.Annotations != nil { - if v, ok := (*cachedResponse.Annotations)["com.openfaas.http.path"]; ok && len(v) > 0 { - probePath = v - } - } - - maxCount := 10 - pollInterval := time.Millisecond * 50 - - err := types.Retry(func(attempt int) error { - u := f.Resolver.BuildURL(functionName, namespace, probePath, true) - - r, _ := http.NewRequest(http.MethodGet, u, nil) - r.Header.Set("User-Agent", "com.openfaas.gateway/probe") - - resp, err := http.DefaultClient.Do(r) - if err != nil { - return err - } - - log.Printf("[Probe] %s => %d", u, resp.StatusCode) - - if resp.StatusCode == http.StatusOK { - return nil - } - return fmt.Errorf("failed with status: %s", resp.Status) - }, "Probe", maxCount, pollInterval) - - if err != nil { - return FunctionProbeResult{ - Error: err, - Available: false, - Found: true, - Duration: time.Since(start), - Updated: time.Now(), - } - } - - return FunctionProbeResult{ - Error: nil, - Available: true, - Found: true, - Duration: time.Since(start), - Updated: time.Now(), - } -} diff --git a/gateway/types/readconfig.go b/gateway/types/readconfig.go index aea31521..4b93dbe3 100644 --- a/gateway/types/readconfig.go +++ b/gateway/types/readconfig.go @@ -8,7 +8,6 @@ import ( "net/url" "os" "strconv" - "strings" "time" ) @@ -129,9 +128,6 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) { cfg.PrometheusHost = prometheusHost } - cfg.DirectFunctions = parseBoolValue(hasEnv.Getenv("direct_functions")) - cfg.DirectFunctionsSuffix = hasEnv.Getenv("direct_functions_suffix") - cfg.UseBasicAuth = parseBoolValue(hasEnv.Getenv("basic_auth")) secretPath := hasEnv.Getenv("secret_mount_path") @@ -169,14 +165,6 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) { cfg.Namespace = hasEnv.Getenv("function_namespace") - if len(cfg.DirectFunctionsSuffix) > 0 && len(cfg.Namespace) > 0 { - if strings.HasPrefix(cfg.DirectFunctionsSuffix, cfg.Namespace) == false { - return nil, fmt.Errorf("function_namespace must be a sub-string of direct_functions_suffix") - } - } - - cfg.ProbeFunctions = parseBoolValue(hasEnv.Getenv("probe_functions")) - return &cfg, nil } @@ -216,12 +204,6 @@ type GatewayConfig struct { // Port to connect to Prometheus. PrometheusPort int - // If set to true we will access upstream functions directly rather than through the upstream provider - DirectFunctions bool - - // If set this will be used to resolve functions directly - DirectFunctionsSuffix string - // If set, reads secrets from file-system for enabling basic auth. UseBasicAuth bool @@ -245,9 +227,6 @@ type GatewayConfig struct { // Namespace for endpoints Namespace string - - // ProbeFunctions requires the gateway to probe the health endpoint of a function before invoking it - ProbeFunctions bool } // UseNATS Use NATSor not diff --git a/gateway/types/readconfig_test.go b/gateway/types/readconfig_test.go index acada92d..69131bcb 100644 --- a/gateway/types/readconfig_test.go +++ b/gateway/types/readconfig_test.go @@ -38,16 +38,6 @@ func TestRead_UseExternalProvider_Defaults(t *testing.T) { t.Fail() } - if config.DirectFunctions != false { - t.Log("Default for DirectFunctions should be false") - t.Fail() - } - - if len(config.DirectFunctionsSuffix) > 0 { - t.Log("Default for DirectFunctionsSuffix should be empty") - t.Fail() - } - if len(config.Namespace) > 0 { t.Log("Default for Namespace should be empty") t.Fail() @@ -89,86 +79,6 @@ func TestRead_NamespaceOverrideAgressWithFunctionSuffix_Valid(t *testing.T) { } } -func TestRead_NamespaceOverrideAgressWithFunctionSuffix_Invalid(t *testing.T) { - - defaults := NewEnvBucket() - readConfig := ReadConfig{} - - defaults.Setenv("direct_functions", "true") - wantSuffix := "openfaas-fn.cluster.local.svc." - - defaults.Setenv("direct_functions_suffix", wantSuffix) - defaults.Setenv("function_namespace", "fn") - - _, err := readConfig.Read(defaults) - - if err == nil { - t.Logf("Expected an error because function_namespace should be a sub-string of direct_functions_suffix") - t.Fail() - return - } - - want := "function_namespace must be a sub-string of direct_functions_suffix" - - if want != err.Error() { - t.Logf("Error want: %s, got: %s", want, err.Error()) - t.Fail() - } - -} - -func TestRead_DirectFunctionsOverride(t *testing.T) { - defaults := NewEnvBucket() - readConfig := ReadConfig{} - defaults.Setenv("direct_functions", "true") - wantSuffix := "openfaas-fn.cluster.local.svc." - defaults.Setenv("direct_functions_suffix", wantSuffix) - - config, _ := readConfig.Read(defaults) - - if config.DirectFunctions != true { - t.Logf("DirectFunctions should be true, got: %v", config.DirectFunctions) - t.Fail() - } - - if config.DirectFunctionsSuffix != wantSuffix { - t.Logf("DirectFunctionsSuffix want: %s, got: %s", wantSuffix, config.DirectFunctionsSuffix) - t.Fail() - } -} - -func TestRead_ProbeFunctions_Default(t *testing.T) { - defaults := NewEnvBucket() - readConfig := ReadConfig{} - defaults.Setenv("probe_functions", "") - - want := false - - config, _ := readConfig.Read(defaults) - - got := config.ProbeFunctions - if want != got { - t.Logf("ProbeFunctions want %v, but got %v", want, got) - t.Fail() - } -} - -func TestRead_ProbeFunctions_Enabled(t *testing.T) { - defaults := NewEnvBucket() - readConfig := ReadConfig{} - defaults.Setenv("probe_functions", "true") - - want := true - - config, _ := readConfig.Read(defaults) - - got := config.ProbeFunctions - if want != got { - t.Logf("ProbeFunctions want %v, but got %v", want, got) - t.Fail() - } -} - func TestRead_ScaleZeroDefaultAndOverride(t *testing.T) { defaults := NewEnvBucket() readConfig := ReadConfig{}