mirror of
https://github.com/openfaas/faas.git
synced 2025-06-18 03:56:37 +00:00
Instrument async handlers
- instruments async handler for report and for queueing async requests - make MustRegister only ever run once to prevent sync issues Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
This commit is contained in:
committed by
Alex Ellis
parent
5a1bdcdb91
commit
fca32a0e79
69
gateway/handlers/queue_proxy.go
Normal file
69
gateway/handlers/queue_proxy.go
Normal file
@ -0,0 +1,69 @@
|
||||
// Copyright (c) Alex Ellis 2017. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/openfaas/faas/gateway/metrics"
|
||||
"github.com/openfaas/faas/gateway/queue"
|
||||
)
|
||||
|
||||
// MakeQueuedProxy accepts work onto a queue
|
||||
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
name := vars["name"]
|
||||
|
||||
callbackURLHeader := r.Header.Get("X-Callback-Url")
|
||||
var callbackURL *url.URL
|
||||
|
||||
if len(callbackURLHeader) > 0 {
|
||||
urlVal, urlErr := url.Parse(callbackURLHeader)
|
||||
if urlErr != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Write([]byte(urlErr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
callbackURL = urlVal
|
||||
}
|
||||
|
||||
req := &queue.Request{
|
||||
Function: name,
|
||||
Body: body,
|
||||
Method: r.Method,
|
||||
QueryString: r.URL.RawQuery,
|
||||
Path: pathTransformer.Transform(r),
|
||||
Header: r.Header,
|
||||
Host: r.Host,
|
||||
CallbackURL: callbackURL,
|
||||
}
|
||||
|
||||
err = canQueueRequests.Queue(req)
|
||||
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user