Read config values from environment for max_conns tuning

- max_idle_conns and max_idle_conns_per_host are now read from
env-vars, with a default of 1024 for both values
- logging / metrics are collected in the client transaction
rather than via defer (this may impact throughput)
- function cache moved to use RWMutex to try to improve latency
around locking when updating cache
- logging message added to show latency in running GetReplicas
because this was observed to increase in a linear fashion under
high concurrency
- changes tested against 3-node bare-metal 1.13 K8s cluster
with kubeadm

Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (VMware) 2019-01-22 15:59:32 +00:00 committed by Alex Ellis
parent 52c27e227a
commit 299e5a5933
7 changed files with 98 additions and 22 deletions

View File

@ -61,11 +61,11 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, notifiers [
log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error())
}
defer func() {
for _, notifier := range notifiers {
notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds)
}
}()
// defer func() {
for _, notifier := range notifiers {
notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds)
}
// }()
}
}

View File

@ -6,6 +6,8 @@ package plugin
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
@ -13,10 +15,6 @@ import (
"strconv"
"time"
"fmt"
"io/ioutil"
"github.com/openfaas/faas-provider/auth"
"github.com/openfaas/faas/gateway/requests"
"github.com/openfaas/faas/gateway/scaling"
@ -62,6 +60,8 @@ type ScaleServiceRequest struct {
// GetReplicas fetches the replica count for a function
func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQueryResponse, error) {
start := time.Now()
var err error
var emptyServiceQueryResponse scaling.ServiceQueryResponse
@ -92,6 +92,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu
log.Println(urlPath, err)
}
} else {
log.Printf("GetReplicas took: %fs", time.Since(start).Seconds())
return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s", res.StatusCode, serviceName)
}
}
@ -115,6 +116,8 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu
}
}
log.Printf("GetReplicas took: %fs", time.Since(start).Seconds())
return scaling.ServiceQueryResponse{
Replicas: function.Replicas,
MaxReplicas: maxReplicas,

View File

@ -25,7 +25,7 @@ func (fm *FunctionMeta) Expired(expiry time.Duration) bool {
type FunctionCache struct {
Cache map[string]*FunctionMeta
Expiry time.Duration
Sync sync.Mutex
Sync sync.RWMutex
}
// Set replica count for functionName
@ -37,23 +37,22 @@ func (fc *FunctionCache) Set(functionName string, serviceQueryResponse ServiceQu
fc.Cache[functionName] = &FunctionMeta{}
}
entry := fc.Cache[functionName]
entry.LastRefresh = time.Now()
entry.ServiceQueryResponse = serviceQueryResponse
fc.Cache[functionName].LastRefresh = time.Now()
fc.Cache[functionName].ServiceQueryResponse = serviceQueryResponse
// entry.LastRefresh = time.Now()
// entry.ServiceQueryResponse = serviceQueryResponse
}
// Get replica count for functionName
func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) {
fc.Sync.Lock()
defer fc.Sync.Unlock()
replicas := ServiceQueryResponse{
AvailableReplicas: 0,
}
hit := false
fc.Sync.RLock()
defer fc.Sync.RUnlock()
if val, exists := fc.Cache[functionName]; exists {
replicas = val.ServiceQueryResponse
hit = !val.Expired(fc.Expiry)

View File

@ -57,7 +57,7 @@ func main() {
exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval)
metrics.RegisterExporter(exporter)
reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL, config.UpstreamTimeout)
reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL, config.UpstreamTimeout, config.MaxIdleConns, config.MaxIdleConnsPerHost)
loggingNotifier := handlers.LoggingNotifier{}
prometheusNotifier := handlers.PrometheusFunctionNotifier{

View File

@ -11,7 +11,7 @@ import (
)
// NewHTTPClientReverseProxy proxies to an upstream host through the use of an http.Client
func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPClientReverseProxy {
func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration, maxIdleConns, maxIdleConnsPerHost int) *HTTPClientReverseProxy {
h := HTTPClientReverseProxy{
BaseURL: baseURL,
Timeout: timeout,
@ -23,6 +23,13 @@ func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPCli
return http.ErrUseLastResponse
}
// These overrides for the default client enable re-use of connections and prevent
// CoreDNS from rate limiting the gateway under high traffic
//
// See also two similar projects where this value was updated:
// https://github.com/prometheus/prometheus/pull/3592
// https://github.com/minio/minio/pull/5860
// Taken from http.DefaultTransport in Go 1.11
h.Client.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
@ -31,8 +38,8 @@ func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPCli
KeepAlive: timeout,
DualStack: true,
}).DialContext,
MaxIdleConns: 20000, // Overriden via https://github.com/errordeveloper/prometheus/commit/1f74477646aea93bebb7c098affa8e05132abb0c
MaxIdleConnsPerHost: 1024, // Overriden via https://github.com/minio/minio/pull/5860
MaxIdleConns: maxIdleConns,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,

View File

@ -114,6 +114,29 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig {
cfg.SecretMountPath = secretPath
cfg.ScaleFromZero = parseBoolValue(hasEnv.Getenv("scale_from_zero"))
cfg.MaxIdleConns = 1024
cfg.MaxIdleConnsPerHost = 1024
maxIdleConns := hasEnv.Getenv("max_idle_conns")
if len(maxIdleConns) > 0 {
val, err := strconv.Atoi(maxIdleConns)
if err != nil {
log.Println("Invalid value for max_idle_conns")
} else {
cfg.MaxIdleConns = val
}
}
maxIdleConnsPerHost := hasEnv.Getenv("max_idle_conns_per_host")
if len(maxIdleConnsPerHost) > 0 {
val, err := strconv.Atoi(maxIdleConnsPerHost)
if err != nil {
log.Println("Invalid value for max_idle_conns_per_host")
} else {
cfg.MaxIdleConnsPerHost = val
}
}
return cfg
}
@ -155,8 +178,13 @@ type GatewayConfig struct {
// SecretMountPath specifies where to read secrets from for embedded basic auth
SecretMountPath string
// Enable the gateway to scale any service from 0 replicas to its configured "min replicas"
ScaleFromZero bool
MaxIdleConns int
MaxIdleConnsPerHost int
}
// UseNATS Use NATS or not

View File

@ -4,6 +4,7 @@
package types
import (
"fmt"
"testing"
"time"
)
@ -260,3 +261,41 @@ func TestRead_BasicAuth_SetTrue(t *testing.T) {
t.Fail()
}
}
// TestRead_MaxIdleConnsDefaults verifies that MaxIdleConns and
// MaxIdleConnsPerHost both default to 1024 when no environment
// variable overrides are set.
func TestRead_MaxIdleConnsDefaults(t *testing.T) {
	defaults := NewEnvBucket()
	readConfig := ReadConfig{}

	config := readConfig.Read(defaults)

	// t.Errorf marks the test failed and logs in one call,
	// replacing the t.Logf + t.Fail pair.
	if config.MaxIdleConns != 1024 {
		t.Errorf("config.MaxIdleConns, want: %d, got: %d", 1024, config.MaxIdleConns)
	}

	if config.MaxIdleConnsPerHost != 1024 {
		t.Errorf("config.MaxIdleConnsPerHost, want: %d, got: %d", 1024, config.MaxIdleConnsPerHost)
	}
}
// TestRead_MaxIdleConns_Override verifies that the max_idle_conns and
// max_idle_conns_per_host environment variables override the defaults
// read by ReadConfig.Read.
func TestRead_MaxIdleConns_Override(t *testing.T) {
	defaults := NewEnvBucket()
	readConfig := ReadConfig{}

	// Name the expected values once so the Setenv calls and the
	// assertions cannot drift apart.
	wantMaxIdleConns := 100
	wantMaxIdleConnsPerHost := 2
	defaults.Setenv("max_idle_conns", fmt.Sprintf("%d", wantMaxIdleConns))
	defaults.Setenv("max_idle_conns_per_host", fmt.Sprintf("%d", wantMaxIdleConnsPerHost))

	config := readConfig.Read(defaults)

	if config.MaxIdleConns != wantMaxIdleConns {
		t.Errorf("config.MaxIdleConns, want: %d, got: %d", wantMaxIdleConns, config.MaxIdleConns)
	}

	if config.MaxIdleConnsPerHost != wantMaxIdleConnsPerHost {
		t.Errorf("config.MaxIdleConnsPerHost, want: %d, got: %d", wantMaxIdleConnsPerHost, config.MaxIdleConnsPerHost)
	}
}