Files
faasd/vendor/github.com/openfaas/faas-provider/logs/handler.go
Lucas Roesler 22882e2643 Initial journald log provider attempt
**What**
- journald log provider using exec to journalctl
```
journalctl -t <namespace>:<name>  --output=json --since=<timestamp> <--follow> --output-fields=SYSLOG_IDENTIFIER,MESSAGE,_PID,_SOURCE_REALTIME_TIMESTAMP
```
- This can be tested manually using `faas-cli logs` as normal, e.g.
  `faas-cli logs nodeinfo` should tail the last 5 mins of logs.
- Very basic tests ensuring that the `journalctl` comamand is correctly
  construction and that the json log entrys are parsed correctly.
- Add simple e2e test to grep the function logs

Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
2020-03-07 10:11:09 +00:00

145 lines
4.1 KiB
Go
Generated

package logs
import (
"context"
"encoding/json"
"log"
"net/http"
"net/url"
"strconv"
"time"
"github.com/openfaas/faas-provider/httputil"
)
// Requester submits queries the logging system.
// This will be passed to the log handler constructor.
type Requester interface {
// Query submits a log request to the actual logging system.
Query(context.Context, Request) (<-chan Message, error)
}
// NewLogHandlerFunc creates an http HandlerFunc from the supplied log Requestor.
func NewLogHandlerFunc(requestor Requester, timeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}
cn, ok := w.(http.CloseNotifier)
if !ok {
log.Println("LogHandler: response is not a CloseNotifier, required for streaming response")
http.NotFound(w, r)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
log.Println("LogHandler: response is not a Flusher, required for streaming response")
http.NotFound(w, r)
return
}
logRequest, err := parseRequest(r)
if err != nil {
log.Printf("LogHandler: could not parse request %s", err)
httputil.Errorf(w, http.StatusUnprocessableEntity, "could not parse the log request")
return
}
ctx, cancelQuery := context.WithTimeout(r.Context(), timeout)
defer cancelQuery()
messages, err := requestor.Query(ctx, logRequest)
if err != nil {
// add smarter error handling here
httputil.Errorf(w, http.StatusInternalServerError, "function log request failed")
return
}
// Send the initial headers saying we're gonna stream the response.
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set(http.CanonicalHeaderKey("Content-Type"), "application/x-ndjson")
w.WriteHeader(http.StatusOK)
flusher.Flush()
// ensure that we always try to send the closing chunk, not the inverted order due to how
// the defer stack works. We need two flush statements to ensure that the empty slice is
// sent as its own chunk
defer flusher.Flush()
defer w.Write([]byte{})
defer flusher.Flush()
jsonEncoder := json.NewEncoder(w)
for messages != nil {
select {
case <-cn.CloseNotify():
log.Println("LogHandler: client stopped listening")
return
case msg, ok := <-messages:
if !ok {
log.Println("LogHandler: end of log stream")
messages = nil
return
}
// serialize and write the msg to the http ResponseWriter
err := jsonEncoder.Encode(msg)
if err != nil {
// can't actually write the status header here so we should json serialize an error
// and return that because we have already sent the content type and status code
log.Printf("LogHandler: failed to serialize log message: '%s'\n", msg.String())
log.Println(err.Error())
// write json error message here ?
jsonEncoder.Encode(Message{Text: "failed to serialize log message"})
flusher.Flush()
return
}
flusher.Flush()
}
}
return
}
}
// parseRequest extracts the logRequest from the GET variables or from the POST body
func parseRequest(r *http.Request) (logRequest Request, err error) {
query := r.URL.Query()
logRequest.Name = getValue(query, "name")
logRequest.Namespace = getValue(query, "namespace")
logRequest.Instance = getValue(query, "instance")
tailStr := getValue(query, "tail")
if tailStr != "" {
logRequest.Tail, err = strconv.Atoi(tailStr)
if err != nil {
return logRequest, err
}
}
// ignore error because it will default to false if we can't parse it
logRequest.Follow, _ = strconv.ParseBool(getValue(query, "follow"))
sinceStr := getValue(query, "since")
if sinceStr != "" {
since, err := time.Parse(time.RFC3339, sinceStr)
logRequest.Since = &since
if err != nil {
return logRequest, err
}
}
return logRequest, nil
}
// getValue returns the value for the given key. If the key has more than one value, it returns the
// last value. if the value does not exist, it returns the empty string.
func getValue(queryValues url.Values, name string) string {
values := queryValues[name]
if len(values) == 0 {
return ""
}
return values[len(values)-1]
}