diff --git a/watchdog/handler.go b/watchdog/handler.go new file mode 100644 index 00000000..c2ca63f6 --- /dev/null +++ b/watchdog/handler.go @@ -0,0 +1,307 @@ +// Copyright (c) Alex Ellis 2017. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +package main + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/openfaas/faas/watchdog/types" +) + +var acceptingConnections bool + +// buildFunctionInput for a GET method this is an empty byte array. +func buildFunctionInput(config *WatchdogConfig, r *http.Request) ([]byte, error) { + var res []byte + var requestBytes []byte + var err error + + if r.Body == nil { + return res, nil + } + defer r.Body.Close() + + if err != nil { + log.Println(err) + return res, err + } + + requestBytes, err = ioutil.ReadAll(r.Body) + if config.marshalRequest { + marshalRes, marshalErr := types.MarshalRequest(requestBytes, &r.Header) + err = marshalErr + res = marshalRes + } else { + res = requestBytes + } + + return res, err +} + +func debugHeaders(source *http.Header, direction string) { + for k, vv := range *source { + fmt.Printf("[%s] %s=%s\n", direction, k, vv) + } +} + +type requestInfo struct { + headerWritten bool +} + +func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request, method string) { + startTime := time.Now() + + parts := strings.Split(config.faasProcess, " ") + + ri := &requestInfo{} + + if config.debugHeaders { + debugHeaders(&r.Header, "in") + } + + log.Println("Forking fprocess.") + + targetCmd := exec.Command(parts[0], parts[1:]...) + + envs := getAdditionalEnvs(config, r, method) + if len(envs) > 0 { + targetCmd.Env = envs + } + + writer, _ := targetCmd.StdinPipe() + + var out []byte + var err error + var requestBody []byte + + var wg sync.WaitGroup + + wgCount := 2 + + var buildInputErr error + requestBody, buildInputErr = buildFunctionInput(config, r) + if buildInputErr != nil { + ri.headerWritten = true + w.WriteHeader(http.StatusBadRequest) + // I.e. "exit code 1" + w.Write([]byte(buildInputErr.Error())) + + // Verbose message - i.e. stack trace + w.Write([]byte("\n")) + w.Write(out) + + return + } + + wg.Add(wgCount) + + var timer *time.Timer + + if config.execTimeout > 0*time.Second { + timer = time.NewTimer(config.execTimeout) + + go func() { + <-timer.C + log.Printf("Killing process: %s\n", config.faasProcess) + if targetCmd != nil && targetCmd.Process != nil { + ri.headerWritten = true + w.WriteHeader(http.StatusRequestTimeout) + + w.Write([]byte("Killed process.\n")) + + val := targetCmd.Process.Kill() + if val != nil { + log.Printf("Killed process: %s - error %s\n", config.faasProcess, val.Error()) + } + } + }() + } + + // Write to pipe in separate go-routine to prevent blocking + go func() { + defer wg.Done() + writer.Write(requestBody) + writer.Close() + }() + + if config.combineOutput { + // Read the output from stdout/stderr and combine into one variable for output. + go func() { + defer wg.Done() + + out, err = targetCmd.CombinedOutput() + }() + } else { + go func() { + var b bytes.Buffer + targetCmd.Stderr = &b + + defer wg.Done() + + out, err = targetCmd.Output() + if b.Len() > 0 { + log.Printf("stderr: %s", b.Bytes()) + } + b.Reset() + }() + } + + wg.Wait() + if timer != nil { + timer.Stop() + } + + if err != nil { + if config.writeDebug == true { + log.Printf("Success=%t, Error=%s\n", targetCmd.ProcessState.Success(), err.Error()) + log.Printf("Out=%s\n", out) + } + + if ri.headerWritten == false { + w.WriteHeader(http.StatusInternalServerError) + response := bytes.NewBufferString(err.Error()) + w.Write(response.Bytes()) + w.Write([]byte("\n")) + if len(out) > 0 { + w.Write(out) + } + ri.headerWritten = true + } + return + } + + var bytesWritten string + if config.writeDebug == true { + os.Stdout.Write(out) + } else { + bytesWritten = fmt.Sprintf("Wrote %d Bytes", len(out)) + } + + if len(config.contentType) > 0 { + w.Header().Set("Content-Type", config.contentType) + } else { + + // Match content-type of caller if no override specified. + clientContentType := r.Header.Get("Content-Type") + if len(clientContentType) > 0 { + w.Header().Set("Content-Type", clientContentType) + } + } + + execDuration := time.Since(startTime).Seconds() + if ri.headerWritten == false { + w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", execDuration)) + ri.headerWritten = true + w.WriteHeader(200) + w.Write(out) + } + + if config.debugHeaders { + header := w.Header() + debugHeaders(&header, "out") + } + + if len(bytesWritten) > 0 { + log.Printf("%s - Duration: %f seconds", bytesWritten, execDuration) + } else { + log.Printf("Duration: %f seconds", execDuration) + } +} + +func getAdditionalEnvs(config *WatchdogConfig, r *http.Request, method string) []string { + var envs []string + + if config.cgiHeaders { + envs = os.Environ() + + for k, v := range r.Header { + kv := fmt.Sprintf("Http_%s=%s", strings.Replace(k, "-", "_", -1), v[0]) + envs = append(envs, kv) + } + + envs = append(envs, fmt.Sprintf("Http_Method=%s", method)) + envs = append(envs, fmt.Sprintf("Http_ContentLength=%d", r.ContentLength)) + + if config.writeDebug { + log.Println("Query ", r.URL.RawQuery) + } + + if len(r.URL.RawQuery) > 0 { + envs = append(envs, fmt.Sprintf("Http_Query=%s", r.URL.RawQuery)) + } + + if config.writeDebug { + log.Println("Path ", r.URL.Path) + } + + if len(r.URL.Path) > 0 { + envs = append(envs, fmt.Sprintf("Http_Path=%s", r.URL.Path)) + } + + } + + return envs +} + +func lockFilePresent() bool { + path := filepath.Join(os.TempDir(), ".lock") + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } + return true +} + +func createLockFile() (string, error) { + path := filepath.Join(os.TempDir(), ".lock") + log.Printf("Writing lock-file to: %s\n", path) + writeErr := ioutil.WriteFile(path, []byte{}, 0660) + acceptingConnections = true + + return path, writeErr +} + +func makeHealthHandler() func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + if acceptingConnections == false || lockFilePresent() == false { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + break + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } + } +} + +func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case + http.MethodPost, + http.MethodPut, + http.MethodDelete, + http.MethodGet: + pipeRequest(config, w, r, r.Method) + break + default: + w.WriteHeader(http.StatusMethodNotAllowed) + + } + } +} diff --git a/watchdog/main.go b/watchdog/main.go index a1d569a8..2baf37ef 100644 --- a/watchdog/main.go +++ b/watchdog/main.go @@ -4,318 +4,31 @@ package main import ( - "bytes" "context" "flag" "fmt" - "io/ioutil" "log" "net/http" "os" - "os/exec" "os/signal" - "path/filepath" - "strings" - "sync" "syscall" "time" "github.com/openfaas/faas/watchdog/types" ) -var acceptingConnections bool - -// buildFunctionInput for a GET method this is an empty byte array. -func buildFunctionInput(config *WatchdogConfig, r *http.Request) ([]byte, error) { - var res []byte - var requestBytes []byte - var err error - - if r.Body == nil { - return res, nil - } - defer r.Body.Close() - - if err != nil { - log.Println(err) - return res, err - } - - requestBytes, err = ioutil.ReadAll(r.Body) - if config.marshalRequest { - marshalRes, marshalErr := types.MarshalRequest(requestBytes, &r.Header) - err = marshalErr - res = marshalRes - } else { - res = requestBytes - } - - return res, err -} - -func debugHeaders(source *http.Header, direction string) { - for k, vv := range *source { - fmt.Printf("[%s] %s=%s\n", direction, k, vv) - } -} - -type requestInfo struct { - headerWritten bool -} - -func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request, method string) { - startTime := time.Now() - - parts := strings.Split(config.faasProcess, " ") - - ri := &requestInfo{} - - if config.debugHeaders { - debugHeaders(&r.Header, "in") - } - - log.Println("Forking fprocess.") - - targetCmd := exec.Command(parts[0], parts[1:]...) - - envs := getAdditionalEnvs(config, r, method) - if len(envs) > 0 { - targetCmd.Env = envs - } - - writer, _ := targetCmd.StdinPipe() - - var out []byte - var err error - var requestBody []byte - - var wg sync.WaitGroup - - wgCount := 2 - - var buildInputErr error - requestBody, buildInputErr = buildFunctionInput(config, r) - if buildInputErr != nil { - ri.headerWritten = true - w.WriteHeader(http.StatusBadRequest) - // I.e. "exit code 1" - w.Write([]byte(buildInputErr.Error())) - - // Verbose message - i.e. stack trace - w.Write([]byte("\n")) - w.Write(out) - - return - } - - wg.Add(wgCount) - - var timer *time.Timer - - if config.execTimeout > 0*time.Second { - timer = time.NewTimer(config.execTimeout) - - go func() { - <-timer.C - log.Printf("Killing process: %s\n", config.faasProcess) - if targetCmd != nil && targetCmd.Process != nil { - ri.headerWritten = true - w.WriteHeader(http.StatusRequestTimeout) - - w.Write([]byte("Killed process.\n")) - - val := targetCmd.Process.Kill() - if val != nil { - log.Printf("Killed process: %s - error %s\n", config.faasProcess, val.Error()) - } - } - }() - } - - // Write to pipe in separate go-routine to prevent blocking - go func() { - defer wg.Done() - writer.Write(requestBody) - writer.Close() - }() - - if config.combineOutput { - // Read the output from stdout/stderr and combine into one variable for output. - go func() { - defer wg.Done() - - out, err = targetCmd.CombinedOutput() - }() - } else { - go func() { - var b bytes.Buffer - targetCmd.Stderr = &b - - defer wg.Done() - - out, err = targetCmd.Output() - if b.Len() > 0 { - log.Printf("stderr: %s", b.Bytes()) - } - b.Reset() - }() - } - - wg.Wait() - if timer != nil { - timer.Stop() - } - - if err != nil { - if config.writeDebug == true { - log.Printf("Success=%t, Error=%s\n", targetCmd.ProcessState.Success(), err.Error()) - log.Printf("Out=%s\n", out) - } - - if ri.headerWritten == false { - w.WriteHeader(http.StatusInternalServerError) - response := bytes.NewBufferString(err.Error()) - w.Write(response.Bytes()) - w.Write([]byte("\n")) - if len(out) > 0 { - w.Write(out) - } - ri.headerWritten = true - } - return - } - - var bytesWritten string - if config.writeDebug == true { - os.Stdout.Write(out) - } else { - bytesWritten = fmt.Sprintf("Wrote %d Bytes", len(out)) - } - - if len(config.contentType) > 0 { - w.Header().Set("Content-Type", config.contentType) - } else { - - // Match content-type of caller if no override specified. - clientContentType := r.Header.Get("Content-Type") - if len(clientContentType) > 0 { - w.Header().Set("Content-Type", clientContentType) - } - } - - execDuration := time.Since(startTime).Seconds() - if ri.headerWritten == false { - w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", execDuration)) - ri.headerWritten = true - w.WriteHeader(200) - w.Write(out) - } - - if config.debugHeaders { - header := w.Header() - debugHeaders(&header, "out") - } - - if len(bytesWritten) > 0 { - log.Printf("%s - Duration: %f seconds", bytesWritten, execDuration) - } else { - log.Printf("Duration: %f seconds", execDuration) - } -} - -func getAdditionalEnvs(config *WatchdogConfig, r *http.Request, method string) []string { - var envs []string - - if config.cgiHeaders { - envs = os.Environ() - - for k, v := range r.Header { - kv := fmt.Sprintf("Http_%s=%s", strings.Replace(k, "-", "_", -1), v[0]) - envs = append(envs, kv) - } - - envs = append(envs, fmt.Sprintf("Http_Method=%s", method)) - envs = append(envs, fmt.Sprintf("Http_ContentLength=%d", r.ContentLength)) - - if config.writeDebug { - log.Println("Query ", r.URL.RawQuery) - } - - if len(r.URL.RawQuery) > 0 { - envs = append(envs, fmt.Sprintf("Http_Query=%s", r.URL.RawQuery)) - } - - if config.writeDebug { - log.Println("Path ", r.URL.Path) - } - - if len(r.URL.Path) > 0 { - envs = append(envs, fmt.Sprintf("Http_Path=%s", r.URL.Path)) - } - - } - - return envs -} - -func lockFilePresent() bool { - path := filepath.Join(os.TempDir(), ".lock") - if _, err := os.Stat(path); os.IsNotExist(err) { - return false - } - return true -} - -func createLockFile() (string, error) { - path := filepath.Join(os.TempDir(), ".lock") - log.Printf("Writing lock-file to: %s\n", path) - writeErr := ioutil.WriteFile(path, []byte{}, 0660) - acceptingConnections = true - - return path, writeErr -} - -func makeHealthHandler() func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - if acceptingConnections == false || lockFilePresent() == false { - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - w.Write([]byte("OK")) - break - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - } -} - -func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case - http.MethodPost, - http.MethodPut, - http.MethodDelete, - http.MethodGet: - pipeRequest(config, w, r, r.Method) - break - default: - w.WriteHeader(http.StatusMethodNotAllowed) - - } - } -} - -var ( - version = flag.Bool("version", false, "Print the version and Git SHA") -) +var version bool func main() { + flag.BoolVar(&version, "version", false, "Print the version and exit") + flag.Parse() + printVersion() + + if version == true { + return + } + acceptingConnections = false osEnv := types.OsEnv{} @@ -327,12 +40,6 @@ func main() { return } - if *version == true { - fmt.Printf("Commit: %v\n", GitCommit) - fmt.Printf("Version: %v\n", BuildVersion()) - return - } - readTimeout := config.readTimeout writeTimeout := config.writeTimeout @@ -390,3 +97,12 @@ func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server) { <-idleConnsClosed } + +func printVersion() { + sha := "unknown" + if len(GitCommit) > 0 { + sha = GitCommit + } + + log.Printf("Version: %v\tSHA: %v\n", BuildVersion(), sha) +}