Update queue code for legacy NATS Streaming

NATS Streaming is deprecated and will have no support from
early 2023 by Synadia. Upgrade to OpenFaaS Pro as soon as
possible.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alex@openfaas.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd) 2022-12-14 11:24:06 +00:00
parent 88bedf78bd
commit 004bbddadb

View File

@ -6,7 +6,6 @@ package handlers
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
@ -19,8 +18,6 @@ import (
"github.com/openfaas/faas/gateway/scaling" "github.com/openfaas/faas/gateway/scaling"
) )
const queueAnnotation = "com.openfaas.queue"
// MakeQueuedProxy accepts work onto a queue // MakeQueuedProxy accepts work onto a queue
func MakeQueuedProxy(metrics metrics.MetricOptions, queuer ftypes.RequestQueuer, pathTransformer middleware.URLPathTransformer, defaultNS string, functionQuery scaling.FunctionQuery) http.HandlerFunc { func MakeQueuedProxy(metrics metrics.MetricOptions, queuer ftypes.RequestQueuer, pathTransformer middleware.URLPathTransformer, defaultNS string, functionQuery scaling.FunctionQuery) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
@ -44,12 +41,6 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, queuer ftypes.RequestQueuer,
vars := mux.Vars(r) vars := mux.Vars(r)
name := vars["name"] name := vars["name"]
queueName, err := getQueueName(name, functionQuery)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req := &ftypes.QueueRequest{ req := &ftypes.QueueRequest{
Function: name, Function: name,
Body: body, Body: body,
@ -59,11 +50,6 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, queuer ftypes.RequestQueuer,
Header: r.Header, Header: r.Header,
Host: r.Host, Host: r.Host,
CallbackURL: callbackURL, CallbackURL: callbackURL,
QueueName: queueName,
}
if len(queueName) > 0 {
log.Printf("Queueing %s to: %s\n", name, queueName)
} }
if err = queuer.Queue(req); err != nil { if err = queuer.Queue(req); err != nil {
@ -92,21 +78,6 @@ func getCallbackURLHeader(header http.Header) (*url.URL, error) {
return callbackURL, nil return callbackURL, nil
} }
func getQueueName(name string, fnQuery scaling.FunctionQuery) (queueName string, err error) {
fn, ns := getNameParts(name)
annotations, err := fnQuery.GetAnnotations(fn, ns)
if err != nil {
return "", err
}
queueName = ""
if v := annotations[queueAnnotation]; len(v) > 0 {
queueName = v
}
return queueName, err
}
func getNameParts(name string) (fn, ns string) { func getNameParts(name string) (fn, ns string) {
fn = name fn = name
ns = "" ns = ""