diff --git a/watchdog/README.md b/watchdog/README.md index 687e342f..2a6a5798 100644 --- a/watchdog/README.md +++ b/watchdog/README.md @@ -111,6 +111,16 @@ A new version of the watchdog is being tested over at [openfaas-incubator/of-wat This re-write is mainly structural for on-going maintenance. It will be a drop-in replacement for the existing watchdog and also has binary releases available. +### Graceful shutdowns + +The watchdog is capable of working with health-checks to provide a graceful shutdown. + +When a `SIGTERM` signal is detected within the watchdog process, a goroutine will remove the `/tmp/.lock` file and mark the watchdog as unhealthy, so that the HTTP health-check returns HTTP 503. The code will then wait for the duration specified in `write_timeout`. During this window the container-orchestrator's health-check must run and complete. + +The orchestrator will then mark this replica as unhealthy and remove it from the pool of valid HTTP endpoints. + +Finally, the watchdog will stop accepting new connections and wait for a further period of `write_timeout` before allowing the process to exit. + ### Working with HTTP headers Headers and other request information are injected into environmental variables in the following format: diff --git a/watchdog/build.sh b/watchdog/build.sh index e0778fd6..5b7c92d1 100755 --- a/watchdog/build.sh +++ b/watchdog/build.sh @@ -1,5 +1,7 @@ -#!/bin/sh +#!/bin/bash + set -e + export arch=$(uname -m) if [ "$arch" = "armv7l" ] ; then diff --git a/watchdog/handler.go b/watchdog/handler.go index 61d6f5db..0cae45df 100644 --- a/watchdog/handler.go +++ b/watchdog/handler.go @@ -14,13 +14,12 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/openfaas/faas/watchdog/types" ) -var acceptingConnections bool - // buildFunctionInput for a GET method this is an empty byte array. 
func buildFunctionInput(config *WatchdogConfig, r *http.Request) ([]byte, error) { var res []byte @@ -270,7 +269,8 @@ func createLockFile() (string, error) { path := filepath.Join(os.TempDir(), ".lock") log.Printf("Writing lock-file to: %s\n", path) writeErr := ioutil.WriteFile(path, []byte{}, 0660) - acceptingConnections = true + + atomic.StoreInt32(&acceptingConnections, 1) return path, writeErr } @@ -279,13 +279,14 @@ func makeHealthHandler() func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: - if acceptingConnections == false || lockFilePresent() == false { - w.WriteHeader(http.StatusInternalServerError) + if atomic.LoadInt32(&acceptingConnections) == 0 || lockFilePresent() == false { + w.WriteHeader(http.StatusServiceUnavailable) return } w.WriteHeader(http.StatusOK) w.Write([]byte("OK")) + break default: w.WriteHeader(http.StatusMethodNotAllowed) diff --git a/watchdog/main.go b/watchdog/main.go index 2baf37ef..efcbcaa5 100644 --- a/watchdog/main.go +++ b/watchdog/main.go @@ -11,25 +11,30 @@ import ( "net/http" "os" "os/signal" + "path/filepath" + "sync/atomic" "syscall" "time" "github.com/openfaas/faas/watchdog/types" ) -var version bool +var ( + versionFlag bool + acceptingConnections int32 +) func main() { - flag.BoolVar(&version, "version", false, "Print the version and exit") + flag.BoolVar(&versionFlag, "version", false, "Print the version and exit") flag.Parse() printVersion() - if version == true { + if versionFlag { return } - acceptingConnections = false + atomic.StoreInt32(&acceptingConnections, 0) osEnv := types.OsEnv{} readConfig := ReadConfig{} @@ -50,24 +55,25 @@ func main() { MaxHeaderBytes: 1 << 20, // Max header of 1MB } + log.Printf("Read/write timeout: %s, %s. 
Port: %d\n", readTimeout, writeTimeout, config.port) http.HandleFunc("/_/health", makeHealthHandler()) http.HandleFunc("/", makeRequestHandler(&config)) - if config.suppressLock == false { - path, writeErr := createLockFile() + shutdownTimeout := config.writeTimeout - if writeErr != nil { - log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error()) - } - } else { - log.Println("Warning: \"suppress_lock\" is enabled. No automated health-checks will be in place for your function.") - acceptingConnections = true - } - - listenUntilShutdown(config.writeTimeout, s) + listenUntilShutdown(shutdownTimeout, s, config.suppressLock) } -func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server) { +func markUnhealthy() error { + atomic.StoreInt32(&acceptingConnections, 0) + + path := filepath.Join(os.TempDir(), ".lock") + log.Printf("Removing lock-file : %s\n", path) + removeErr := os.Remove(path) + return removeErr +} + +func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server, suppressLock bool) { idleConnsClosed := make(chan struct{}) go func() { @@ -76,23 +82,46 @@ func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server) { <-sig - log.Printf("SIGTERM received.. shutting down server") + log.Printf("SIGTERM received.. shutting down server in %s\n", shutdownTimeout.String()) - acceptingConnections = false + healthErr := markUnhealthy() + + if healthErr != nil { + log.Printf("Unable to mark unhealthy during shutdown: %s\n", healthErr.Error()) + } + + <-time.Tick(shutdownTimeout) if err := s.Shutdown(context.Background()); err != nil { // Error from closing listeners, or context timeout: log.Printf("Error in Shutdown: %v", err) } + log.Printf("No new connections allowed. 
Exiting in: %s\n", shutdownTimeout.String()) + <-time.Tick(shutdownTimeout) close(idleConnsClosed) }() - if err := s.ListenAndServe(); err != http.ErrServerClosed { - log.Printf("Error ListenAndServe: %v", err) - close(idleConnsClosed) + // Run the HTTP server in a separate go-routine. + go func() { + if err := s.ListenAndServe(); err != http.ErrServerClosed { + log.Printf("Error ListenAndServe: %v", err) + close(idleConnsClosed) + } + }() + + if suppressLock == false { + path, writeErr := createLockFile() + + if writeErr != nil { + log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error()) + } + } else { + log.Println("Warning: \"suppress_lock\" is enabled. No automated health-checks will be in place for your function.") + + atomic.StoreInt32(&acceptingConnections, 1) } <-idleConnsClosed diff --git a/watchdog/requesthandler_test.go b/watchdog/requesthandler_test.go index 8fefddb9..3f3eceba 100644 --- a/watchdog/requesthandler_test.go +++ b/watchdog/requesthandler_test.go @@ -463,7 +463,7 @@ func TestHealthHandler_StatusInternalServerError_LockFileNotPresent(t *testing.T handler := makeHealthHandler() handler(rr, req) - required := http.StatusInternalServerError + required := http.StatusServiceUnavailable if status := rr.Code; status != required { t.Errorf("handler returned wrong status code - got: %v, want: %v", status, required) }