diff --git a/gateway/Dockerfile b/gateway/Dockerfile index 7fc0765c..f745912f 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -29,7 +29,6 @@ COPY types types COPY plugin plugin COPY version version COPY scaling scaling -COPY probing probing COPY pkg pkg COPY main.go . diff --git a/gateway/handlers/probe_handler.go b/gateway/handlers/probe_handler.go deleted file mode 100644 index 47f83c8e..00000000 --- a/gateway/handlers/probe_handler.go +++ /dev/null @@ -1,45 +0,0 @@ -package handlers - -import ( - "fmt" - "net/http" - - "golang.org/x/sync/singleflight" - - "github.com/openfaas/faas/gateway/pkg/middleware" - "github.com/openfaas/faas/gateway/probing" -) - -func MakeProbeHandler(prober probing.FunctionProber, cache probing.ProbeCacher, resolver middleware.BaseURLResolver, next http.HandlerFunc, defaultNamespace string) http.HandlerFunc { - - group := singleflight.Group{} - - return func(w http.ResponseWriter, r *http.Request) { - functionName, namespace := middleware.GetNamespace(defaultNamespace, middleware.GetServiceName(r.URL.String())) - - key := fmt.Sprintf("Probe-%s.%s", functionName, namespace) - res, _, _ := group.Do(key, func() (interface{}, error) { - - cached, hit := cache.Get(functionName, namespace) - var probeResult probing.FunctionProbeResult - if hit && cached != nil && cached.Available { - probeResult = *cached - } else { - probeResult = prober.Probe(functionName, namespace) - cache.Set(functionName, namespace, &probeResult) - } - - return probeResult, nil - }) - - fnRes := res.(probing.FunctionProbeResult) - - if !fnRes.Available { - http.Error(w, fmt.Sprintf("unable to probe function endpoint %s", fnRes.Error), - http.StatusServiceUnavailable) - return - } - - next(w, r) - } -} diff --git a/gateway/main.go b/gateway/main.go index 9e776df3..7d7ad756 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -15,7 +15,6 @@ import ( "github.com/openfaas/faas/gateway/metrics" "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/plugin" - "github.com/openfaas/faas/gateway/probing" "github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/types" "github.com/openfaas/faas/gateway/version" @@ -97,16 +96,8 @@ func main() { nilURLTransformer := middleware.TransparentURLPathTransformer{} trimURLTransformer := middleware.FunctionPrefixTrimmingURLPathTransformer{} - if config.DirectFunctions { - functionURLResolver = middleware.FunctionAsHostBaseURLResolver{ - FunctionSuffix: config.DirectFunctionsSuffix, - FunctionNamespace: config.Namespace, - } - functionURLTransformer = trimURLTransformer - } else { - functionURLResolver = urlResolver - functionURLTransformer = nilURLTransformer - } + functionURLResolver = urlResolver + functionURLTransformer = nilURLTransformer var serviceAuthInjector middleware.AuthInjector @@ -155,13 +146,6 @@ func main() { functionProxy := faasHandlers.Proxy - if config.ProbeFunctions { - prober := probing.NewFunctionProber(cachedFunctionQuery, functionURLResolver) - // Default of 5 seconds between refreshing probes for function invocations - probeCache := probing.NewProbeCache(time.Second * 5) - functionProxy = handlers.MakeProbeHandler(prober, probeCache, functionURLResolver, functionProxy, config.Namespace) - } - if config.ScaleFromZero { scalingFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry) scaler := scaling.NewFunctionScaler(scalingConfig, scalingFunctionCache) @@ -169,7 +153,9 @@ func main() { } if config.UseNATS() { - log.Println("Async enabled: Using NATS Streaming.") + log.Println("Async enabled: Using NATS Streaming") + log.Println("Deprecation Notice: NATS Streaming is no longer maintained and won't receive updates from June 2023") + maxReconnect := 60 interval := time.Second * 2 diff --git a/gateway/probing/cache.go b/gateway/probing/cache.go deleted file mode 100644 index d7d02ada..00000000 --- a/gateway/probing/cache.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) OpenFaaS Author(s). All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -package probing - -import ( - "fmt" - "sync" - "time" -) - -// ProbeCacher queries functions and caches the results -type ProbeCacher interface { - Set(functionName, namespace string, result *FunctionProbeResult) - Get(functionName, namespace string) (result *FunctionProbeResult, hit bool) -} - -// ProbeCache provides a cache of Probe replica counts -type ProbeCache struct { - Cache map[string]*FunctionProbeResult - Expiry time.Duration - Sync sync.RWMutex -} - -// NewProbeCache creates a function cache to query function metadata -func NewProbeCache(cacheExpiry time.Duration) ProbeCacher { - return &ProbeCache{ - Cache: make(map[string]*FunctionProbeResult), - Expiry: cacheExpiry, - } -} - -// Set replica count for functionName -func (fc *ProbeCache) Set(functionName, namespace string, result *FunctionProbeResult) { - fc.Sync.Lock() - defer fc.Sync.Unlock() - - fc.Cache[functionName+"."+namespace] = result -} - -func (fc *ProbeCache) Get(functionName, namespace string) (*FunctionProbeResult, bool) { - - result := &FunctionProbeResult{ - Available: false, - Error: fmt.Errorf("unavailable in cache"), - } - - hit := false - fc.Sync.RLock() - defer fc.Sync.RUnlock() - - if val, exists := fc.Cache[functionName+"."+namespace]; exists { - hit = val.Expired(fc.Expiry) == false - result = val - } - - return result, hit -} diff --git a/gateway/probing/prober.go b/gateway/probing/prober.go deleted file mode 100644 index b3f1aa3e..00000000 --- a/gateway/probing/prober.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright (c) OpenFaaS Author(s). All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -package probing - -import ( - "fmt" - "log" - "net/http" - "time" - - "github.com/openfaas/faas/gateway/pkg/middleware" - "github.com/openfaas/faas/gateway/scaling" - "github.com/openfaas/faas/gateway/types" -) - -// NewFunctionProber create a new scaler with the specified -// ScalingConfig -func NewFunctionProber(functionQuery scaling.FunctionQuery, resolver middleware.BaseURLResolver) FunctionProber { - // if directFunctions { - return &FunctionHTTPProber{ - Query: functionQuery, - Resolver: resolver, - } -} - -// FunctionHTTPProber probes a function's health endpoint -type FunctionHTTPProber struct { - Query scaling.FunctionQuery - Resolver middleware.BaseURLResolver - DirectFunctions bool -} - -type FunctionNonProber struct { -} - -func (f *FunctionNonProber) Probe(functionName, namespace string) FunctionProbeResult { - return FunctionProbeResult{ - Found: true, - Available: true, - } -} - -type FunctionProber interface { - Probe(functionName, namespace string) FunctionProbeResult -} - -// FunctionProbeResult holds the result of scaling from zero -type FunctionProbeResult struct { - Available bool - Error error - Found bool - Duration time.Duration - Updated time.Time -} - -// Expired find out whether the cache item has expired with -// the given expiry duration from when it was stored. -func (res *FunctionProbeResult) Expired(expiry time.Duration) bool { - return time.Now().After(res.Updated.Add(expiry)) -} - -// Scale scales a function from zero replicas to 1 or the value set in -// the minimum replicas metadata -func (f *FunctionHTTPProber) Probe(functionName, namespace string) FunctionProbeResult { - start := time.Now() - - cachedResponse, _ := f.Query.Get(functionName, namespace) - probePath := "/_/health" - - if cachedResponse.Annotations != nil { - if v, ok := (*cachedResponse.Annotations)["com.openfaas.http.path"]; ok && len(v) > 0 { - probePath = v - } - } - - maxCount := 10 - pollInterval := time.Millisecond * 50 - - err := types.Retry(func(attempt int) error { - u := f.Resolver.BuildURL(functionName, namespace, probePath, true) - - r, _ := http.NewRequest(http.MethodGet, u, nil) - r.Header.Set("User-Agent", "com.openfaas.gateway/probe") - - resp, err := http.DefaultClient.Do(r) - if err != nil { - return err - } - - log.Printf("[Probe] %s => %d", u, resp.StatusCode) - - if resp.StatusCode == http.StatusOK { - return nil - } - return fmt.Errorf("failed with status: %s", resp.Status) - }, "Probe", maxCount, pollInterval) - - if err != nil { - return FunctionProbeResult{ - Error: err, - Available: false, - Found: true, - Duration: time.Since(start), - Updated: time.Now(), - } - } - - return FunctionProbeResult{ - Error: nil, - Available: true, - Found: true, - Duration: time.Since(start), - Updated: time.Now(), - } -} diff --git a/gateway/types/readconfig.go b/gateway/types/readconfig.go index aea31521..4b93dbe3 100644 --- a/gateway/types/readconfig.go +++ b/gateway/types/readconfig.go @@ -8,7 +8,6 @@ import ( "net/url" "os" "strconv" - "strings" "time" ) @@ -129,9 +128,6 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) { cfg.PrometheusHost = prometheusHost } - cfg.DirectFunctions = parseBoolValue(hasEnv.Getenv("direct_functions")) - cfg.DirectFunctionsSuffix = hasEnv.Getenv("direct_functions_suffix") - cfg.UseBasicAuth = parseBoolValue(hasEnv.Getenv("basic_auth")) secretPath := hasEnv.Getenv("secret_mount_path") @@ -169,14 +165,6 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) { cfg.Namespace = hasEnv.Getenv("function_namespace") - if len(cfg.DirectFunctionsSuffix) > 0 && len(cfg.Namespace) > 0 { - if strings.HasPrefix(cfg.DirectFunctionsSuffix, cfg.Namespace) == false { - return nil, fmt.Errorf("function_namespace must be a sub-string of direct_functions_suffix") - } - } - - cfg.ProbeFunctions = parseBoolValue(hasEnv.Getenv("probe_functions")) - return &cfg, nil } @@ -216,12 +204,6 @@ type GatewayConfig struct { // Port to connect to Prometheus. PrometheusPort int - // If set to true we will access upstream functions directly rather than through the upstream provider - DirectFunctions bool - - // If set this will be used to resolve functions directly - DirectFunctionsSuffix string - // If set, reads secrets from file-system for enabling basic auth. UseBasicAuth bool @@ -245,9 +227,6 @@ type GatewayConfig struct { // Namespace for endpoints Namespace string - - // ProbeFunctions requires the gateway to probe the health endpoint of a function before invoking it - ProbeFunctions bool } // UseNATS Use NATSor not diff --git a/gateway/types/readconfig_test.go b/gateway/types/readconfig_test.go index acada92d..69131bcb 100644 --- a/gateway/types/readconfig_test.go +++ b/gateway/types/readconfig_test.go @@ -38,16 +38,6 @@ func TestRead_UseExternalProvider_Defaults(t *testing.T) { t.Fail() } - if config.DirectFunctions != false { - t.Log("Default for DirectFunctions should be false") - t.Fail() - } - - if len(config.DirectFunctionsSuffix) > 0 { - t.Log("Default for DirectFunctionsSuffix should be empty") - t.Fail() - } - if len(config.Namespace) > 0 { t.Log("Default for Namespace should be empty") t.Fail() @@ -89,86 +79,6 @@ func TestRead_NamespaceOverrideAgressWithFunctionSuffix_Valid(t *testing.T) { } } -func TestRead_NamespaceOverrideAgressWithFunctionSuffix_Invalid(t *testing.T) { - - defaults := NewEnvBucket() - readConfig := ReadConfig{} - - defaults.Setenv("direct_functions", "true") - wantSuffix := "openfaas-fn.cluster.local.svc." - - defaults.Setenv("direct_functions_suffix", wantSuffix) - defaults.Setenv("function_namespace", "fn") - - _, err := readConfig.Read(defaults) - - if err == nil { - t.Logf("Expected an error because function_namespace should be a sub-string of direct_functions_suffix") - t.Fail() - return - } - - want := "function_namespace must be a sub-string of direct_functions_suffix" - - if want != err.Error() { - t.Logf("Error want: %s, got: %s", want, err.Error()) - t.Fail() - } - -} - -func TestRead_DirectFunctionsOverride(t *testing.T) { - defaults := NewEnvBucket() - readConfig := ReadConfig{} - defaults.Setenv("direct_functions", "true") - wantSuffix := "openfaas-fn.cluster.local.svc." - defaults.Setenv("direct_functions_suffix", wantSuffix) - - config, _ := readConfig.Read(defaults) - - if config.DirectFunctions != true { - t.Logf("DirectFunctions should be true, got: %v", config.DirectFunctions) - t.Fail() - } - - if config.DirectFunctionsSuffix != wantSuffix { - t.Logf("DirectFunctionsSuffix want: %s, got: %s", wantSuffix, config.DirectFunctionsSuffix) - t.Fail() - } -} - -func TestRead_ProbeFunctions_Default(t *testing.T) { - defaults := NewEnvBucket() - readConfig := ReadConfig{} - defaults.Setenv("probe_functions", "") - - want := false - - config, _ := readConfig.Read(defaults) - - got := config.ProbeFunctions - if want != got { - t.Logf("ProbeFunctions want %v, but got %v", want, got) - t.Fail() - } -} - -func TestRead_ProbeFunctions_Enabled(t *testing.T) { - defaults := NewEnvBucket() - readConfig := ReadConfig{} - defaults.Setenv("probe_functions", "true") - - want := true - - config, _ := readConfig.Read(defaults) - - got := config.ProbeFunctions - if want != got { - t.Logf("ProbeFunctions want %v, but got %v", want, got) - t.Fail() - } -} - func TestRead_ScaleZeroDefaultAndOverride(t *testing.T) { defaults := NewEnvBucket() readConfig := ReadConfig{}