// Copyright (c) Alex Ellis 2017. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. // Package main provides the OpenFaaS Classic Watchdog. The Classic Watchdog is a HTTP // shim for serverless functions providing health-checking, graceful shutdowns, // timeouts and a consistent logging experience. package main import ( "context" "flag" "fmt" "log" "net/http" "os" "os/signal" "path/filepath" "sync/atomic" "syscall" "time" "github.com/openfaas/faas/watchdog/metrics" "github.com/openfaas/faas/watchdog/types" ) var ( acceptingConnections int32 ) func main() { var runHealthcheck bool var versionFlag bool flag.BoolVar(&versionFlag, "version", false, "Print the version and exit") flag.BoolVar(&runHealthcheck, "run-healthcheck", false, "Check for the a lock-file, when using an exec healthcheck. Exit 0 for present, non-zero when not found.") flag.Parse() if runHealthcheck { if lockFilePresent() { os.Exit(0) } fmt.Fprintf(os.Stderr, "unable to find lock file.\n") os.Exit(1) } printVersion() if versionFlag { return } atomic.StoreInt32(&acceptingConnections, 0) osEnv := types.OsEnv{} readConfig := ReadConfig{} config := readConfig.Read(osEnv) if len(config.faasProcess) == 0 { log.Panicln("Provide a valid process via fprocess environmental variable.") return } readTimeout := config.readTimeout writeTimeout := config.writeTimeout s := &http.Server{ Addr: fmt.Sprintf(":%d", config.port), ReadTimeout: readTimeout, WriteTimeout: writeTimeout, MaxHeaderBytes: 1 << 20, // Max header of 1MB } httpMetrics := metrics.NewHttp() log.Printf("Timeouts: read: %s, write: %s hard: %s.\n", readTimeout, writeTimeout, config.execTimeout) log.Printf("Listening on port: %d\n", config.port) http.HandleFunc("/_/health", makeHealthHandler()) http.HandleFunc("/", metrics.InstrumentHandler(makeRequestHandler(&config), httpMetrics)) metricsServer := metrics.MetricsServer{} metricsServer.Register(config.metricsPort) cancel := make(chan bool) go metricsServer.Serve(cancel) shutdownTimeout := config.writeTimeout listenUntilShutdown(shutdownTimeout, s, config.suppressLock) } func markUnhealthy() error { atomic.StoreInt32(&acceptingConnections, 0) path := filepath.Join(os.TempDir(), ".lock") log.Printf("Removing lock-file : %s\n", path) removeErr := os.Remove(path) return removeErr } // listenUntilShutdown will listen for HTTP requests until SIGTERM // is sent at which point the code will wait `shutdownTimeout` before // closing off connections and a futher `shutdownTimeout` before // exiting func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server, suppressLock bool) { idleConnsClosed := make(chan struct{}) go func() { sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGTERM) <-sig log.Printf("SIGTERM received.. shutting down server in %s\n", shutdownTimeout.String()) healthErr := markUnhealthy() if healthErr != nil { log.Printf("Unable to mark unhealthy during shutdown: %s\n", healthErr.Error()) } <-time.Tick(shutdownTimeout) if err := s.Shutdown(context.Background()); err != nil { // Error from closing listeners, or context timeout: log.Printf("Error in Shutdown: %v", err) } log.Printf("No new connections allowed. Exiting in: %s\n", shutdownTimeout.String()) <-time.Tick(shutdownTimeout) close(idleConnsClosed) }() // Run the HTTP server in a separate go-routine. go func() { if err := s.ListenAndServe(); err != http.ErrServerClosed { log.Printf("Error ListenAndServe: %v", err) close(idleConnsClosed) } }() if suppressLock == false { path, writeErr := createLockFile() if writeErr != nil { log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error()) } } else { log.Println("Warning: \"suppress_lock\" is enabled. No automated health-checks will be in place for your function.") atomic.StoreInt32(&acceptingConnections, 1) } <-idleConnsClosed } func printVersion() { sha := "unknown" if len(GitCommit) > 0 { sha = GitCommit } log.Printf("Version: %v\tSHA: %v\n", BuildVersion(), sha) }