From 299e5a59331f8c2ff3d7fcf71283f739e797e9fe Mon Sep 17 00:00:00 2001 From: "Alex Ellis (VMware)" Date: Tue, 22 Jan 2019 15:59:32 +0000 Subject: [PATCH] Read config values from environment for max_conns tuning - max_idle_conns and max_idle_conns_per_host are now read from env-vars; both default to 1024 - logging / metrics notifiers now run inline in the forwarding proxy handler rather than via defer (this may impact throughput) - function cache moved to use RWMutex to try to improve latency around locking when updating cache - logging message added to show latency in running GetReplicas because this was observed to increase in a linear fashion under high concurrency - changes tested against 3-node bare-metal 1.13 K8s cluster with kubeadm Signed-off-by: Alex Ellis (VMware) --- gateway/handlers/forwarding_proxy.go | 10 +++---- gateway/plugin/external.go | 11 +++++--- gateway/scaling/function_cache.go | 17 ++++++------ gateway/server.go | 2 +- gateway/types/proxy_client.go | 13 +++++++--- gateway/types/readconfig.go | 28 ++++++++++++++++++++ gateway/types/readconfig_test.go | 39 ++++++++++++++++++++++++++++ 7 files changed, 98 insertions(+), 22 deletions(-) diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index 532694f4..04a76e31 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -61,11 +61,11 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, notifiers [ log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error()) } - defer func() { - for _, notifier := range notifiers { - notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds) - } - }() + // defer func() { + for _, notifier := range notifiers { + notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds) + } + // }() } } diff --git a/gateway/plugin/external.go b/gateway/plugin/external.go index 3de0d5a8..2ad3d3b7 100644 --- a/gateway/plugin/external.go +++ 
b/gateway/plugin/external.go @@ -6,6 +6,8 @@ package plugin import ( "bytes" "encoding/json" + "fmt" + "io/ioutil" "log" "net" "net/http" @@ -13,10 +15,6 @@ import ( "strconv" "time" - "fmt" - - "io/ioutil" - "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas/gateway/requests" "github.com/openfaas/faas/gateway/scaling" @@ -62,6 +60,8 @@ type ScaleServiceRequest struct { // GetReplicas replica count for function func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQueryResponse, error) { + start := time.Now() + var err error var emptyServiceQueryResponse scaling.ServiceQueryResponse @@ -92,6 +92,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu log.Println(urlPath, err) } } else { + log.Printf("GetReplicas took: %fs", time.Since(start).Seconds()) return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s", res.StatusCode, serviceName) } } @@ -115,6 +116,8 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu } } + log.Printf("GetReplicas took: %fs", time.Since(start).Seconds()) + return scaling.ServiceQueryResponse{ Replicas: function.Replicas, MaxReplicas: maxReplicas, diff --git a/gateway/scaling/function_cache.go b/gateway/scaling/function_cache.go index 61b4062c..c3afa877 100644 --- a/gateway/scaling/function_cache.go +++ b/gateway/scaling/function_cache.go @@ -25,7 +25,7 @@ func (fm *FunctionMeta) Expired(expiry time.Duration) bool { type FunctionCache struct { Cache map[string]*FunctionMeta Expiry time.Duration - Sync sync.Mutex + Sync sync.RWMutex } // Set replica count for functionName @@ -37,23 +37,22 @@ func (fc *FunctionCache) Set(functionName string, serviceQueryResponse ServiceQu fc.Cache[functionName] = &FunctionMeta{} } - entry := fc.Cache[functionName] - entry.LastRefresh = time.Now() - entry.ServiceQueryResponse = serviceQueryResponse - + fc.Cache[functionName].LastRefresh = time.Now() + 
fc.Cache[functionName].ServiceQueryResponse = serviceQueryResponse + // entry.LastRefresh = time.Now() + // entry.ServiceQueryResponse = serviceQueryResponse } // Get replica count for functionName func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) { - - fc.Sync.Lock() - defer fc.Sync.Unlock() - replicas := ServiceQueryResponse{ AvailableReplicas: 0, } hit := false + fc.Sync.RLock() + defer fc.Sync.RUnlock() + if val, exists := fc.Cache[functionName]; exists { replicas = val.ServiceQueryResponse hit = !val.Expired(fc.Expiry) diff --git a/gateway/server.go b/gateway/server.go index 8201bb40..d0d87acd 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -57,7 +57,7 @@ func main() { exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval) metrics.RegisterExporter(exporter) - reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL, config.UpstreamTimeout) + reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL, config.UpstreamTimeout, config.MaxIdleConns, config.MaxIdleConnsPerHost) loggingNotifier := handlers.LoggingNotifier{} prometheusNotifier := handlers.PrometheusFunctionNotifier{ diff --git a/gateway/types/proxy_client.go b/gateway/types/proxy_client.go index 97bf0125..566880b2 100644 --- a/gateway/types/proxy_client.go +++ b/gateway/types/proxy_client.go @@ -11,7 +11,7 @@ import ( ) // NewHTTPClientReverseProxy proxies to an upstream host through the use of a http.Client -func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPClientReverseProxy { +func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration, maxIdleConns, maxIdleConnsPerHost int) *HTTPClientReverseProxy { h := HTTPClientReverseProxy{ BaseURL: baseURL, Timeout: timeout, @@ -23,6 +23,13 @@ func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPCli return http.ErrUseLastResponse } + // These overrides for the default client 
enable re-use of connections and prevent + // CoreDNS from rate limiting the gateway under high traffic + // + // See also two similar projects where this value was updated: + // https://github.com/prometheus/prometheus/pull/3592 + // https://github.com/minio/minio/pull/5860 + // Taken from http.DefaultTransport in Go 1.11 h.Client.Transport = &http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -31,8 +38,8 @@ func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPCli KeepAlive: timeout, DualStack: true, }).DialContext, - MaxIdleConns: 20000, // Overriden via https://github.com/errordeveloper/prometheus/commit/1f74477646aea93bebb7c098affa8e05132abb0c - MaxIdleConnsPerHost: 1024, // Overriden via https://github.com/minio/minio/pull/5860 + MaxIdleConns: maxIdleConns, + MaxIdleConnsPerHost: maxIdleConnsPerHost, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, diff --git a/gateway/types/readconfig.go b/gateway/types/readconfig.go index cfe5c712..981efb14 100644 --- a/gateway/types/readconfig.go +++ b/gateway/types/readconfig.go @@ -114,6 +114,29 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig { cfg.SecretMountPath = secretPath cfg.ScaleFromZero = parseBoolValue(hasEnv.Getenv("scale_from_zero")) + cfg.MaxIdleConns = 1024 + cfg.MaxIdleConnsPerHost = 1024 + + maxIdleConns := hasEnv.Getenv("max_idle_conns") + if len(maxIdleConns) > 0 { + val, err := strconv.Atoi(maxIdleConns) + if err != nil { + log.Println("Invalid value for max_idle_conns") + } else { + cfg.MaxIdleConns = val + } + } + + maxIdleConnsPerHost := hasEnv.Getenv("max_idle_conns_per_host") + if len(maxIdleConnsPerHost) > 0 { + val, err := strconv.Atoi(maxIdleConnsPerHost) + if err != nil { + log.Println("Invalid value for max_idle_conns_per_host") + } else { + cfg.MaxIdleConnsPerHost = val + } + } + return cfg } @@ -155,8 +178,13 @@ type GatewayConfig struct { // SecretMountPath specifies where to read secrets from 
for embedded basic auth SecretMountPath string + // Enable the gateway to scale any service from 0 replicas to its configured "min replicas" ScaleFromZero bool + + MaxIdleConns int + + MaxIdleConnsPerHost int } // UseNATS Use NATSor not diff --git a/gateway/types/readconfig_test.go b/gateway/types/readconfig_test.go index 9d904e77..d4b5fefc 100644 --- a/gateway/types/readconfig_test.go +++ b/gateway/types/readconfig_test.go @@ -4,6 +4,7 @@ package types import ( + "fmt" "testing" "time" ) @@ -260,3 +261,41 @@ func TestRead_BasicAuth_SetTrue(t *testing.T) { t.Fail() } } + +func TestRead_MaxIdleConnsDefaults(t *testing.T) { + defaults := NewEnvBucket() + + readConfig := ReadConfig{} + + config := readConfig.Read(defaults) + + if config.MaxIdleConns != 1024 { + t.Logf("config.MaxIdleConns, want: %d, got: %d\n", 1024, config.MaxIdleConns) + t.Fail() + } + + if config.MaxIdleConnsPerHost != 1024 { + t.Logf("config.MaxIdleConnsPerHost, want: %d, got: %d\n", 1024, config.MaxIdleConnsPerHost) + t.Fail() + } +} + +func TestRead_MaxIdleConns_Override(t *testing.T) { + defaults := NewEnvBucket() + + readConfig := ReadConfig{} + defaults.Setenv("max_idle_conns", fmt.Sprintf("%d", 100)) + defaults.Setenv("max_idle_conns_per_host", fmt.Sprintf("%d", 2)) + + config := readConfig.Read(defaults) + + if config.MaxIdleConns != 100 { + t.Logf("config.MaxIdleConns, want: %d, got: %d\n", 100, config.MaxIdleConns) + t.Fail() + } + + if config.MaxIdleConnsPerHost != 2 { + t.Logf("config.MaxIdleConnsPerHost, want: %d, got: %d\n", 2, config.MaxIdleConnsPerHost) + t.Fail() + } +}