mirror of
https://github.com/openfaas/faas.git
synced 2025-06-08 16:26:47 +00:00
If the watchdog is sent SIGTERM from an external process then it should stop accepting new connections and attempt to finish the work in progress. This change makes use of the new ability in Go 1.9 and onwards to cancel a HTTP server gracefully. The write_timeout duration is used as a grace period to allow all in-flight requests to complete. The pattern is taken directly from the offical example in the Golang documentation. [1] Further tuning and testing may be needed for Windows containers which have a different set of signals for closing work. This change aims to cover the majority use-case for Linux containers. The HTTP health-check is also invalidated by creating an and expression with the existing lock file. Tested with Kubernetes by deploying a custom watchdog and the fprocess of `env`. Log message was observed when scaling down and connections stopped being accepted on terminating replica. Also corrects some typos from previous PR. [1] https://golang.org/pkg/net/http/#Server.Shutdown Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
381 lines
8.1 KiB
Go
381 lines
8.1 KiB
Go
// Copyright (c) Alex Ellis 2017. All rights reserved.
|
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/openfaas/faas/watchdog/types"
|
|
)
|
|
|
|
var acceptingConnections bool
|
|
|
|
// buildFunctionInput for a GET method this is an empty byte array.
|
|
func buildFunctionInput(config *WatchdogConfig, r *http.Request) ([]byte, error) {
|
|
var res []byte
|
|
var requestBytes []byte
|
|
var err error
|
|
|
|
if r.Body == nil {
|
|
return res, nil
|
|
}
|
|
defer r.Body.Close()
|
|
|
|
if err != nil {
|
|
log.Println(err)
|
|
return res, err
|
|
}
|
|
|
|
requestBytes, err = ioutil.ReadAll(r.Body)
|
|
if config.marshalRequest {
|
|
marshalRes, marshalErr := types.MarshalRequest(requestBytes, &r.Header)
|
|
err = marshalErr
|
|
res = marshalRes
|
|
} else {
|
|
res = requestBytes
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
|
|
func debugHeaders(source *http.Header, direction string) {
|
|
for k, vv := range *source {
|
|
fmt.Printf("[%s] %s=%s\n", direction, k, vv)
|
|
}
|
|
}
|
|
|
|
type requestInfo struct {
|
|
headerWritten bool
|
|
}
|
|
|
|
func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request, method string) {
|
|
startTime := time.Now()
|
|
|
|
parts := strings.Split(config.faasProcess, " ")
|
|
|
|
ri := &requestInfo{}
|
|
|
|
if config.debugHeaders {
|
|
debugHeaders(&r.Header, "in")
|
|
}
|
|
|
|
log.Println("Forking fprocess.")
|
|
|
|
targetCmd := exec.Command(parts[0], parts[1:]...)
|
|
|
|
envs := getAdditionalEnvs(config, r, method)
|
|
if len(envs) > 0 {
|
|
targetCmd.Env = envs
|
|
}
|
|
|
|
writer, _ := targetCmd.StdinPipe()
|
|
|
|
var out []byte
|
|
var err error
|
|
var requestBody []byte
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wgCount := 2
|
|
|
|
var buildInputErr error
|
|
requestBody, buildInputErr = buildFunctionInput(config, r)
|
|
if buildInputErr != nil {
|
|
ri.headerWritten = true
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
// I.e. "exit code 1"
|
|
w.Write([]byte(buildInputErr.Error()))
|
|
|
|
// Verbose message - i.e. stack trace
|
|
w.Write([]byte("\n"))
|
|
w.Write(out)
|
|
|
|
return
|
|
}
|
|
|
|
wg.Add(wgCount)
|
|
|
|
var timer *time.Timer
|
|
|
|
if config.execTimeout > 0*time.Second {
|
|
timer = time.NewTimer(config.execTimeout)
|
|
|
|
go func() {
|
|
<-timer.C
|
|
log.Printf("Killing process: %s\n", config.faasProcess)
|
|
if targetCmd != nil && targetCmd.Process != nil {
|
|
ri.headerWritten = true
|
|
w.WriteHeader(http.StatusRequestTimeout)
|
|
|
|
w.Write([]byte("Killed process.\n"))
|
|
|
|
val := targetCmd.Process.Kill()
|
|
if val != nil {
|
|
log.Printf("Killed process: %s - error %s\n", config.faasProcess, val.Error())
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Write to pipe in separate go-routine to prevent blocking
|
|
go func() {
|
|
defer wg.Done()
|
|
writer.Write(requestBody)
|
|
writer.Close()
|
|
}()
|
|
|
|
if config.combineOutput {
|
|
// Read the output from stdout/stderr and combine into one variable for output.
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
out, err = targetCmd.CombinedOutput()
|
|
}()
|
|
} else {
|
|
go func() {
|
|
var b bytes.Buffer
|
|
targetCmd.Stderr = &b
|
|
|
|
defer wg.Done()
|
|
|
|
out, err = targetCmd.Output()
|
|
if b.Len() > 0 {
|
|
log.Printf("stderr: %s", b.Bytes())
|
|
}
|
|
b.Reset()
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
if timer != nil {
|
|
timer.Stop()
|
|
}
|
|
|
|
if err != nil {
|
|
if config.writeDebug == true {
|
|
log.Printf("Success=%t, Error=%s\n", targetCmd.ProcessState.Success(), err.Error())
|
|
log.Printf("Out=%s\n", out)
|
|
}
|
|
|
|
if ri.headerWritten == false {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
response := bytes.NewBufferString(err.Error())
|
|
w.Write(response.Bytes())
|
|
w.Write([]byte("\n"))
|
|
if len(out) > 0 {
|
|
w.Write(out)
|
|
}
|
|
ri.headerWritten = true
|
|
}
|
|
return
|
|
}
|
|
|
|
var bytesWritten string
|
|
if config.writeDebug == true {
|
|
os.Stdout.Write(out)
|
|
} else {
|
|
bytesWritten = fmt.Sprintf("Wrote %d Bytes", len(out))
|
|
}
|
|
|
|
if len(config.contentType) > 0 {
|
|
w.Header().Set("Content-Type", config.contentType)
|
|
} else {
|
|
|
|
// Match content-type of caller if no override specified.
|
|
clientContentType := r.Header.Get("Content-Type")
|
|
if len(clientContentType) > 0 {
|
|
w.Header().Set("Content-Type", clientContentType)
|
|
}
|
|
}
|
|
|
|
execDuration := time.Since(startTime).Seconds()
|
|
if ri.headerWritten == false {
|
|
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", execDuration))
|
|
ri.headerWritten = true
|
|
w.WriteHeader(200)
|
|
w.Write(out)
|
|
}
|
|
|
|
if config.debugHeaders {
|
|
header := w.Header()
|
|
debugHeaders(&header, "out")
|
|
}
|
|
|
|
if len(bytesWritten) > 0 {
|
|
log.Printf("%s - Duration: %f seconds", bytesWritten, execDuration)
|
|
} else {
|
|
log.Printf("Duration: %f seconds", execDuration)
|
|
}
|
|
}
|
|
|
|
func getAdditionalEnvs(config *WatchdogConfig, r *http.Request, method string) []string {
|
|
var envs []string
|
|
|
|
if config.cgiHeaders {
|
|
envs = os.Environ()
|
|
|
|
for k, v := range r.Header {
|
|
kv := fmt.Sprintf("Http_%s=%s", strings.Replace(k, "-", "_", -1), v[0])
|
|
envs = append(envs, kv)
|
|
}
|
|
|
|
envs = append(envs, fmt.Sprintf("Http_Method=%s", method))
|
|
envs = append(envs, fmt.Sprintf("Http_ContentLength=%d", r.ContentLength))
|
|
|
|
if config.writeDebug {
|
|
log.Println("Query ", r.URL.RawQuery)
|
|
}
|
|
|
|
if len(r.URL.RawQuery) > 0 {
|
|
envs = append(envs, fmt.Sprintf("Http_Query=%s", r.URL.RawQuery))
|
|
}
|
|
|
|
if config.writeDebug {
|
|
log.Println("Path ", r.URL.Path)
|
|
}
|
|
|
|
if len(r.URL.Path) > 0 {
|
|
envs = append(envs, fmt.Sprintf("Http_Path=%s", r.URL.Path))
|
|
}
|
|
|
|
}
|
|
|
|
return envs
|
|
}
|
|
|
|
func lockFilePresent() bool {
|
|
path := filepath.Join(os.TempDir(), ".lock")
|
|
if _, err := os.Stat(path); os.IsNotExist(err) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func createLockFile() (string, error) {
|
|
path := filepath.Join(os.TempDir(), ".lock")
|
|
log.Printf("Writing lock-file to: %s\n", path)
|
|
writeErr := ioutil.WriteFile(path, []byte{}, 0660)
|
|
acceptingConnections = true
|
|
|
|
return path, writeErr
|
|
}
|
|
|
|
func makeHealthHandler() func(http.ResponseWriter, *http.Request) {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
switch r.Method {
|
|
case http.MethodGet:
|
|
if acceptingConnections == false || lockFilePresent() == false {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("OK"))
|
|
break
|
|
default:
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
}
|
|
}
|
|
}
|
|
|
|
func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
switch r.Method {
|
|
case
|
|
http.MethodPost,
|
|
http.MethodPut,
|
|
http.MethodDelete,
|
|
http.MethodGet:
|
|
pipeRequest(config, w, r, r.Method)
|
|
break
|
|
default:
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
acceptingConnections = false
|
|
|
|
osEnv := types.OsEnv{}
|
|
readConfig := ReadConfig{}
|
|
config := readConfig.Read(osEnv)
|
|
|
|
if len(config.faasProcess) == 0 {
|
|
log.Panicln("Provide a valid process via fprocess environmental variable.")
|
|
return
|
|
}
|
|
|
|
readTimeout := config.readTimeout
|
|
writeTimeout := config.writeTimeout
|
|
|
|
s := &http.Server{
|
|
Addr: fmt.Sprintf(":%d", config.port),
|
|
ReadTimeout: readTimeout,
|
|
WriteTimeout: writeTimeout,
|
|
MaxHeaderBytes: 1 << 20, // Max header of 1MB
|
|
}
|
|
|
|
http.HandleFunc("/_/health", makeHealthHandler())
|
|
http.HandleFunc("/", makeRequestHandler(&config))
|
|
|
|
if config.suppressLock == false {
|
|
path, writeErr := createLockFile()
|
|
|
|
if writeErr != nil {
|
|
log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error())
|
|
}
|
|
} else {
|
|
log.Println("Warning: \"suppress_lock\" is enabled. No automated health-checks will be in place for your function.")
|
|
acceptingConnections = true
|
|
}
|
|
|
|
listenUntilShutdown(config.writeTimeout, s)
|
|
}
|
|
|
|
func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server) {
|
|
|
|
idleConnsClosed := make(chan struct{})
|
|
go func() {
|
|
sig := make(chan os.Signal, 1)
|
|
signal.Notify(sig, syscall.SIGTERM)
|
|
|
|
<-sig
|
|
|
|
log.Printf("SIGTERM received.. shutting down server")
|
|
|
|
acceptingConnections = false
|
|
|
|
if err := s.Shutdown(context.Background()); err != nil {
|
|
// Error from closing listeners, or context timeout:
|
|
log.Printf("Error in Shutdown: %v", err)
|
|
}
|
|
|
|
<-time.Tick(shutdownTimeout)
|
|
|
|
close(idleConnsClosed)
|
|
}()
|
|
|
|
if err := s.ListenAndServe(); err != http.ErrServerClosed {
|
|
log.Printf("Error ListenAndServe: %v", err)
|
|
close(idleConnsClosed)
|
|
}
|
|
|
|
<-idleConnsClosed
|
|
}
|