faas/watchdog/main.go
John McCabe fac3345668 Use http package consts for http methods
This commit replaces occurences of http method strings with the
corresponding consts from the http package.

*Note* UPDATE is not strictly speaking a valid method and as such isn't
part of the http package (should be a PUT or PATCH?)

Signed-off-by: John McCabe <john@johnmccabe.net>
2018-03-23 16:37:33 +00:00

351 lines
7.5 KiB
Go

// Copyright (c) Alex Ellis 2017. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package main
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/openfaas/faas/watchdog/types"
)
// buildFunctionInput for a GET method this is an empty byte array.
func buildFunctionInput(config *WatchdogConfig, r *http.Request) ([]byte, error) {
var res []byte
var requestBytes []byte
var err error
if r.Body == nil {
return res, nil
}
defer r.Body.Close()
if err != nil {
log.Println(err)
return res, err
}
requestBytes, err = ioutil.ReadAll(r.Body)
if config.marshalRequest {
marshalRes, marshalErr := types.MarshalRequest(requestBytes, &r.Header)
err = marshalErr
res = marshalRes
} else {
res = requestBytes
}
return res, err
}
func debugHeaders(source *http.Header, direction string) {
for k, vv := range *source {
fmt.Printf("[%s] %s=%s\n", direction, k, vv)
}
}
type requestInfo struct {
headerWritten bool
}
func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request, method string) {
startTime := time.Now()
parts := strings.Split(config.faasProcess, " ")
ri := &requestInfo{}
if config.debugHeaders {
debugHeaders(&r.Header, "in")
}
log.Println("Forking fprocess.")
targetCmd := exec.Command(parts[0], parts[1:]...)
envs := getAdditionalEnvs(config, r, method)
if len(envs) > 0 {
targetCmd.Env = envs
}
writer, _ := targetCmd.StdinPipe()
var out []byte
var err error
var requestBody []byte
var wg sync.WaitGroup
wgCount := 2
var buildInputErr error
requestBody, buildInputErr = buildFunctionInput(config, r)
if buildInputErr != nil {
ri.headerWritten = true
w.WriteHeader(http.StatusBadRequest)
// I.e. "exit code 1"
w.Write([]byte(buildInputErr.Error()))
// Verbose message - i.e. stack trace
w.Write([]byte("\n"))
w.Write(out)
return
}
wg.Add(wgCount)
var timer *time.Timer
if config.execTimeout > 0*time.Second {
timer = time.NewTimer(config.execTimeout)
go func() {
<-timer.C
log.Printf("Killing process: %s\n", config.faasProcess)
if targetCmd != nil && targetCmd.Process != nil {
ri.headerWritten = true
w.WriteHeader(http.StatusRequestTimeout)
w.Write([]byte("Killed process.\n"))
val := targetCmd.Process.Kill()
if val != nil {
log.Printf("Killed process: %s - error %s\n", config.faasProcess, val.Error())
}
}
}()
}
// Write to pipe in separate go-routine to prevent blocking
go func() {
defer wg.Done()
writer.Write(requestBody)
writer.Close()
}()
if config.combineOutput {
// Read the output from stdout/stderr and combine into one variable for output.
go func() {
defer wg.Done()
out, err = targetCmd.CombinedOutput()
}()
} else {
go func() {
var b bytes.Buffer
targetCmd.Stderr = &b
defer wg.Done()
out, err = targetCmd.Output()
if b.Len() > 0 {
log.Printf("stderr: %s", b.Bytes())
}
b.Reset()
}()
}
wg.Wait()
if timer != nil {
timer.Stop()
}
if err != nil {
if config.writeDebug == true {
log.Printf("Success=%t, Error=%s\n", targetCmd.ProcessState.Success(), err.Error())
log.Printf("Out=%s\n", out)
}
if ri.headerWritten == false {
w.WriteHeader(http.StatusInternalServerError)
response := bytes.NewBufferString(err.Error())
w.Write(response.Bytes())
w.Write([]byte("\n"))
if len(out) > 0 {
w.Write(out)
}
ri.headerWritten = true
}
return
}
var bytesWritten string
if config.writeDebug == true {
os.Stdout.Write(out)
} else {
bytesWritten = fmt.Sprintf("Wrote %d Bytes", len(out))
}
if len(config.contentType) > 0 {
w.Header().Set("Content-Type", config.contentType)
} else {
// Match content-type of caller if no override specified.
clientContentType := r.Header.Get("Content-Type")
if len(clientContentType) > 0 {
w.Header().Set("Content-Type", clientContentType)
}
}
execDuration := time.Since(startTime).Seconds()
if ri.headerWritten == false {
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", execDuration))
ri.headerWritten = true
w.WriteHeader(200)
w.Write(out)
}
if config.debugHeaders {
header := w.Header()
debugHeaders(&header, "out")
}
if len(bytesWritten) > 0 {
log.Printf("%s - Duration: %f seconds", bytesWritten, execDuration)
} else {
log.Printf("Duration: %f seconds", execDuration)
}
}
func getAdditionalEnvs(config *WatchdogConfig, r *http.Request, method string) []string {
var envs []string
if config.cgiHeaders {
envs = os.Environ()
for k, v := range r.Header {
kv := fmt.Sprintf("Http_%s=%s", strings.Replace(k, "-", "_", -1), v[0])
envs = append(envs, kv)
}
envs = append(envs, fmt.Sprintf("Http_Method=%s", method))
envs = append(envs, fmt.Sprintf("Http_ContentLength=%d", r.ContentLength))
if config.writeDebug {
log.Println("Query ", r.URL.RawQuery)
}
if len(r.URL.RawQuery) > 0 {
envs = append(envs, fmt.Sprintf("Http_Query=%s", r.URL.RawQuery))
}
if config.writeDebug {
log.Println("Path ", r.URL.Path)
}
if len(r.URL.Path) > 0 {
envs = append(envs, fmt.Sprintf("Http_Path=%s", r.URL.Path))
}
}
return envs
}
func lockFilePresent() bool {
path := filepath.Join(os.TempDir(), ".lock")
if _, err := os.Stat(path); os.IsNotExist(err) {
return false
}
return true
}
func createLockFile() error {
path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Writing lock-file to: %s\n", path)
writeErr := ioutil.WriteFile(path, []byte{}, 0660)
return writeErr
}
func removeLockFile() error {
path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Removing lock-file : %s\n", path)
removeErr := os.Remove(path)
return removeErr
}
func makeHealthHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
if lockFilePresent() == false {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
break
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
}
func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case
http.MethodPost,
http.MethodPut,
http.MethodDelete,
"UPDATE",
http.MethodGet:
pipeRequest(config, w, r, r.Method)
break
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
}
func main() {
osEnv := types.OsEnv{}
readConfig := ReadConfig{}
config := readConfig.Read(osEnv)
if len(config.faasProcess) == 0 {
log.Panicln("Provide a valid process via fprocess environmental variable.")
return
}
readTimeout := config.readTimeout
writeTimeout := config.writeTimeout
s := &http.Server{
Addr: fmt.Sprintf(":%d", config.port),
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
MaxHeaderBytes: 1 << 20, // Max header of 1MB
}
http.HandleFunc("/_/health", makeHealthHandler())
http.HandleFunc("/", makeRequestHandler(&config))
if config.suppressLock == false {
path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Writing lock-file to: %s\n", path)
writeErr := ioutil.WriteFile(path, []byte{}, 0660)
if writeErr != nil {
log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error())
}
} else {
log.Println("Warning: \"suppress_lock\" is enabled. No automated health-checks will be in place for your function.")
}
log.Fatal(s.ListenAndServe())
}