Alter graceful shutdown sequence

- The previous shutdown sequence meant that the kubelet was still routing
traffic to the watchdog after the HTTP socket was closed. This change
gives the kubelet a chance to run its health-check and mark the replica
unhealthy before we finally stop accepting new connections. It requires
some basic coordination between the kubelet's health-check period and the
"write_timeout" value configured for the container.

Tested with Kubernetes on GKE - before the change, some Pods were
returning connection-refused errors because they had not yet been
detected as unhealthy. Now I see a 0% error rate, even at 20 qps.

The issue was reproduced by scaling to 20 replicas, starting a load test
with `hey`, and then scaling down to 1 replica while tailing the logs from
the gateway. Before the change I saw some 502s; now I see only 200s.

Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (VMware) 2018-09-17 11:35:57 +01:00 committed by Alex Ellis
parent d9f33435f0
commit e67811c91c
5 changed files with 70 additions and 28 deletions

View File

@ -111,6 +111,16 @@ A new version of the watchdog is being tested over at [openfaas-incubator/of-wat
This re-write is mainly structural for on-going maintenance. It will be a drop-in replacement for the existing watchdog and also has binary releases available.
### Graceful shutdowns
The watchdog is capable of working with health-checks to provide a graceful shutdown.
When a `SIGTERM` signal is detected within the watchdog process a Go routine will remove the `/tmp/.lock` file and mark the HTTP health-check as unhealthy and return HTTP 503. The code will then wait for the duration specified in `write_timeout`. During this window the container-orchestrator's health-check must run and complete.
Now the orchestrator will mark this replica as unhealthy and remove it from the pool of valid HTTP endpoints.
Now we will stop accepting new connections and wait for the duration defined in `write_timeout` before finally allowing the process to exit.
### Working with HTTP headers
Headers and other request information are injected into environmental variables in the following format:

View File

@ -1,5 +1,7 @@
#!/bin/sh
#!/bin/bash
set -e
export arch=$(uname -m)
if [ "$arch" = "armv7l" ] ; then

View File

@ -14,13 +14,12 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/openfaas/faas/watchdog/types"
)
var acceptingConnections bool
// buildFunctionInput for a GET method this is an empty byte array.
func buildFunctionInput(config *WatchdogConfig, r *http.Request) ([]byte, error) {
var res []byte
@ -270,7 +269,8 @@ func createLockFile() (string, error) {
path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Writing lock-file to: %s\n", path)
writeErr := ioutil.WriteFile(path, []byte{}, 0660)
acceptingConnections = true
atomic.StoreInt32(&acceptingConnections, 1)
return path, writeErr
}
@ -279,13 +279,14 @@ func makeHealthHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
if acceptingConnections == false || lockFilePresent() == false {
w.WriteHeader(http.StatusInternalServerError)
if atomic.LoadInt32(&acceptingConnections) == 0 || lockFilePresent() == false {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
break
default:
w.WriteHeader(http.StatusMethodNotAllowed)

View File

@ -11,25 +11,30 @@ import (
"net/http"
"os"
"os/signal"
"path/filepath"
"sync/atomic"
"syscall"
"time"
"github.com/openfaas/faas/watchdog/types"
)
var version bool
var (
versionFlag bool
acceptingConnections int32
)
func main() {
flag.BoolVar(&version, "version", false, "Print the version and exit")
flag.BoolVar(&versionFlag, "version", false, "Print the version and exit")
flag.Parse()
printVersion()
if version == true {
if versionFlag {
return
}
acceptingConnections = false
atomic.StoreInt32(&acceptingConnections, 0)
osEnv := types.OsEnv{}
readConfig := ReadConfig{}
@ -50,24 +55,25 @@ func main() {
MaxHeaderBytes: 1 << 20, // Max header of 1MB
}
log.Printf("Read/write timeout: %s, %s. Port: %d\n", readTimeout, writeTimeout, config.port)
http.HandleFunc("/_/health", makeHealthHandler())
http.HandleFunc("/", makeRequestHandler(&config))
if config.suppressLock == false {
path, writeErr := createLockFile()
shutdownTimeout := config.writeTimeout
if writeErr != nil {
log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error())
}
} else {
log.Println("Warning: \"suppress_lock\" is enabled. No automated health-checks will be in place for your function.")
acceptingConnections = true
}
listenUntilShutdown(config.writeTimeout, s)
listenUntilShutdown(shutdownTimeout, s, config.suppressLock)
}
func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server) {
// markUnhealthy puts the watchdog into a failing health state ahead of
// shutdown: it clears the acceptingConnections flag (read atomically by the
// health handler) and removes the lock file from the OS temp directory, so
// that the orchestrator's health-check starts failing and removes this
// replica from the pool of valid endpoints.
// Returns the error from os.Remove, if any (e.g. lock file already absent).
func markUnhealthy() error {
atomic.StoreInt32(&acceptingConnections, 0)
// Same path as written by createLockFile: <tmpdir>/.lock
path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Removing lock-file : %s\n", path)
removeErr := os.Remove(path)
return removeErr
}
func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server, suppressLock bool) {
idleConnsClosed := make(chan struct{})
go func() {
@ -76,23 +82,46 @@ func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server) {
<-sig
log.Printf("SIGTERM received.. shutting down server")
log.Printf("SIGTERM received.. shutting down server in %s\n", shutdownTimeout.String())
acceptingConnections = false
healthErr := markUnhealthy()
if healthErr != nil {
log.Printf("Unable to mark unhealthy during shutdown: %s\n", healthErr.Error())
}
<-time.Tick(shutdownTimeout)
if err := s.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
log.Printf("Error in Shutdown: %v", err)
}
log.Printf("No new connections allowed. Exiting in: %s\n", shutdownTimeout.String())
<-time.Tick(shutdownTimeout)
close(idleConnsClosed)
}()
if err := s.ListenAndServe(); err != http.ErrServerClosed {
log.Printf("Error ListenAndServe: %v", err)
close(idleConnsClosed)
// Run the HTTP server in a separate go-routine.
go func() {
if err := s.ListenAndServe(); err != http.ErrServerClosed {
log.Printf("Error ListenAndServe: %v", err)
close(idleConnsClosed)
}
}()
if suppressLock == false {
path, writeErr := createLockFile()
if writeErr != nil {
log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error())
}
} else {
log.Println("Warning: \"suppress_lock\" is enabled. No automated health-checks will be in place for your function.")
atomic.StoreInt32(&acceptingConnections, 1)
}
<-idleConnsClosed

View File

@ -463,7 +463,7 @@ func TestHealthHandler_StatusInternalServerError_LockFileNotPresent(t *testing.T
handler := makeHealthHandler()
handler(rr, req)
required := http.StatusInternalServerError
required := http.StatusServiceUnavailable
if status := rr.Code; status != required {
t.Errorf("handler returned wrong status code - got: %v, want: %v", status, required)
}