diff --git a/watchdog/Gopkg.lock b/watchdog/Gopkg.lock index e3c5dc73..581231fc 100644 --- a/watchdog/Gopkg.lock +++ b/watchdog/Gopkg.lock @@ -25,6 +25,14 @@ revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" version = "v1.0.1" +[[projects]] + branch = "master" + digest = "1:cc8e0046e1076991a51c3aa9ab4827b8bf1bf4d7715cd45a95888663c2d851e3" + name = "github.com/openfaas/faas-middleware" + packages = ["concurrency-limiter"] + pruneopts = "UT" + revision = "6a78c3a94beb2a99d6aa443bca21331653840a23" + [[projects]] digest = "1:ef03fb1dae4d010196652653f00a8002e94c19bcabdc8ca5100a804ffef63a47" name = "github.com/prometheus/client_golang" @@ -70,6 +78,7 @@ analyzer-name = "dep" analyzer-version = 1 input-imports = [ + "github.com/openfaas/faas-middleware/concurrency-limiter", "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promauto", "github.com/prometheus/client_golang/prometheus/promhttp", diff --git a/watchdog/README.md b/watchdog/README.md index 3792293a..6fed2001 100644 --- a/watchdog/README.md +++ b/watchdog/README.md @@ -102,6 +102,7 @@ The watchdog can be configured through environmental variables. You must always | `exec_timeout` | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0 | | `write_debug` | Write all output, error messages, and additional information to the logs. Default is false | | `combine_output` | True by default - combines stdout/stderr in function response, when set to false `stderr` is written to the container logs and stdout is used for function response | +| `max_inflight` | Limit the maximum number of requests in flight | ## Advanced / tuning diff --git a/watchdog/handler.go b/watchdog/handler.go index ac487b8d..61d6618a 100644 --- a/watchdog/handler.go +++ b/watchdog/handler.go @@ -17,6 +17,8 @@ import ( "sync/atomic" "time" + limiter "github.com/openfaas/faas-middleware/concurrency-limiter" + "github.com/openfaas/faas/watchdog/types" ) @@ -294,8 +296,8 @@ func makeHealthHandler() func(http.ResponseWriter, *http.Request) { } } -func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { +func makeRequestHandler(config *WatchdogConfig) http.HandlerFunc { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost, @@ -309,5 +311,6 @@ func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http. w.WriteHeader(http.StatusMethodNotAllowed) } - } + }) + return limiter.NewConcurrencyLimiter(handler, config.maxInflight).ServeHTTP } diff --git a/watchdog/metrics/metrics.go b/watchdog/metrics/metrics.go index d5283d96..efdda8a6 100644 --- a/watchdog/metrics/metrics.go +++ b/watchdog/metrics/metrics.go @@ -59,7 +59,7 @@ func (m *MetricsServer) Serve(cancel chan bool) { // InstrumentHandler returns a handler which records HTTP requests // as they are made -func InstrumentHandler(next http.HandlerFunc, _http Http) http.HandlerFunc { +func InstrumentHandler(next http.Handler, _http Http) http.HandlerFunc { return promhttp.InstrumentHandlerCounter(_http.RequestsTotal, promhttp.InstrumentHandlerDuration(_http.RequestDurationHistogram, next)) } diff --git a/watchdog/readconfig.go b/watchdog/readconfig.go index 88d950c1..fc9321e8 100644 --- a/watchdog/readconfig.go +++ b/watchdog/readconfig.go @@ -92,6 +92,7 @@ func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig { } cfg.metricsPort = 8081 + cfg.maxInflight = parseIntValue(hasEnv.Getenv("max_inflight"), 0) return cfg } @@ -137,4 +138,10 @@ type WatchdogConfig struct { // metricsPort is the HTTP port to serve metrics on metricsPort int + + // maxInflight limits the number of simultaneous + // requests that the watchdog allows concurrently. + // Any request which exceeds this limit will + // have an immediate response of 429. + maxInflight int } diff --git a/watchdog/vendor/github.com/openfaas/faas-middleware/LICENSE b/watchdog/vendor/github.com/openfaas/faas-middleware/LICENSE new file mode 100644 index 00000000..ab95f4fa --- /dev/null +++ b/watchdog/vendor/github.com/openfaas/faas-middleware/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 OpenFaaS + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/watchdog/vendor/github.com/openfaas/faas-middleware/concurrency-limiter/concurrency_limiter.go b/watchdog/vendor/github.com/openfaas/faas-middleware/concurrency-limiter/concurrency_limiter.go new file mode 100644 index 00000000..30c75d89 --- /dev/null +++ b/watchdog/vendor/github.com/openfaas/faas-middleware/concurrency-limiter/concurrency_limiter.go @@ -0,0 +1,67 @@ +package limiter + +import ( + "fmt" + "net/http" + "sync/atomic" +) + +type ConcurrencyLimiter struct { + backendHTTPHandler http.Handler + /* + We keep two counters here in order to make it so that we can know when a request has gone to completed + in the tests. We could wrap these up in a condvar, so there's no need to spinlock, but that seems overkill + for testing. + + This is effectively a very fancy semaphore built for optimistic concurrency only, and with spinlocks. If + you want to add timeouts here / pessimistic concurrency, signaling needs to be added and/or a condvar esque + sorta thing needs to be done to wake up waiters who are waiting post-spin. + + Otherwise, there's all sorts of futzing in order to make sure that the concurrency limiter handler + has completed + The math works on overflow: + var x, y uint64 + x = (1 << 64 - 1) + y = (1 << 64 - 1) + x++ + fmt.Println(x) + fmt.Println(y) + fmt.Println(x - y) + Prints: + 0 + 18446744073709551615 + 1 + */ + requestsStarted uint64 + requestsCompleted uint64 + + maxInflightRequests uint64 +} + +func (cl *ConcurrencyLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) { + requestsStarted := atomic.AddUint64(&cl.requestsStarted, 1) + completedRequested := atomic.LoadUint64(&cl.requestsCompleted) + if requestsStarted-completedRequested > cl.maxInflightRequests { + // This is a failure pathway, and we do not want to block on the write to finish + atomic.AddUint64(&cl.requestsCompleted, 1) + w.WriteHeader(http.StatusTooManyRequests) + fmt.Fprintf(w, "Concurrent request limit exceeded. Max concurrent requests: %d\n", cl.maxInflightRequests) + return + } + cl.backendHTTPHandler.ServeHTTP(w, r) + atomic.AddUint64(&cl.requestsCompleted, 1) +} + +// NewConcurrencyLimiter creates a handler which limits the active number of active, concurrent +// requests. If the concurrency limit is less than, or equal to 0, then it will just return the handler +// passed to it. +func NewConcurrencyLimiter(handler http.Handler, concurrencyLimit int) http.Handler { + if concurrencyLimit <= 0 { + return handler + } + + return &ConcurrencyLimiter{ + backendHTTPHandler: handler, + maxInflightRequests: uint64(concurrencyLimit), + } +}