mirror of
https://github.com/openfaas/faas.git
synced 2025-06-18 12:06:37 +00:00
Bump to nats-queue-worker 0.7.0
Includes fix for reconnection bug to NATS Streaming Signed-off-by: Alex Ellis <alexellis2@gmail.com>
This commit is contained in:
2
gateway/vendor/github.com/openfaas/nats-queue-worker/.gitignore
generated
vendored
2
gateway/vendor/github.com/openfaas/nats-queue-worker/.gitignore
generated
vendored
@ -16,3 +16,5 @@
|
||||
.DS_Store
|
||||
|
||||
nats-queue-worker
|
||||
queue-worker
|
||||
|
||||
|
1
gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile
generated
vendored
1
gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile
generated
vendored
@ -5,6 +5,7 @@ COPY vendor vendor
|
||||
COPY handler handler
|
||||
COPY nats nats
|
||||
COPY main.go .
|
||||
COPY types.go .
|
||||
COPY readconfig.go .
|
||||
COPY readconfig_test.go .
|
||||
COPY auth.go .
|
||||
|
1
gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64
generated
vendored
1
gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64
generated
vendored
@ -5,6 +5,7 @@ COPY vendor vendor
|
||||
COPY handler handler
|
||||
COPY nats nats
|
||||
COPY auth.go .
|
||||
COPY types.go .
|
||||
COPY readconfig.go .
|
||||
COPY readconfig_test.go .
|
||||
COPY main.go .
|
||||
|
1
gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf
generated
vendored
1
gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf
generated
vendored
@ -5,6 +5,7 @@ COPY vendor vendor
|
||||
COPY handler handler
|
||||
COPY nats nats
|
||||
COPY main.go .
|
||||
COPY types.go .
|
||||
COPY readconfig.go .
|
||||
COPY readconfig_test.go .
|
||||
COPY auth.go .
|
||||
|
4
gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go
generated
vendored
4
gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go
generated
vendored
@ -19,7 +19,7 @@ func AddBasicAuth(req *http.Request) error {
|
||||
|
||||
credentials, err := reader.Read()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to read basic auth: %s", err.Error())
|
||||
return fmt.Errorf("unable to read basic auth: %s", err.Error())
|
||||
}
|
||||
|
||||
req.SetBasicAuth(credentials.User, credentials.Password)
|
||||
@ -37,7 +37,7 @@ func LoadCredentials() (*auth.BasicAuthCredentials, error) {
|
||||
|
||||
credentials, err := reader.Read()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Unable to read basic auth: %s", err.Error())
|
||||
return nil, fmt.Errorf("unable to read basic auth: %s", err.Error())
|
||||
}
|
||||
return credentials, nil
|
||||
}
|
||||
|
8
gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go
generated
vendored
8
gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go
generated
vendored
@ -7,7 +7,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/go-nats-streaming"
|
||||
stan "github.com/nats-io/go-nats-streaming"
|
||||
"github.com/openfaas/faas/gateway/queue"
|
||||
)
|
||||
|
||||
@ -48,6 +48,8 @@ func (q *NATSQueue) Queue(req *queue.Request) error {
|
||||
}
|
||||
|
||||
func (q *NATSQueue) connect() error {
|
||||
log.Printf("Connect: %s\n", q.NATSURL)
|
||||
|
||||
nc, err := stan.Connect(
|
||||
q.ClusterID,
|
||||
q.ClientID,
|
||||
@ -71,8 +73,10 @@ func (q *NATSQueue) connect() error {
|
||||
}
|
||||
|
||||
func (q *NATSQueue) reconnect() {
|
||||
log.Printf("Reconnect\n")
|
||||
|
||||
for i := 0; i < q.maxReconnect; i++ {
|
||||
time.Sleep(time.Second * time.Duration(i) * q.reconnectDelay)
|
||||
time.Sleep(time.Duration(i) * q.reconnectDelay)
|
||||
|
||||
if err := q.connect(); err == nil {
|
||||
log.Printf("Reconnecting (%d/%d) to %s. OK\n", i+1, q.maxReconnect, q.NATSURL)
|
||||
|
109
gateway/vendor/github.com/openfaas/nats-queue-worker/main.go
generated
vendored
109
gateway/vendor/github.com/openfaas/nats-queue-worker/main.go
generated
vendored
@ -8,58 +8,27 @@ import (
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"net/http"
|
||||
|
||||
"github.com/nats-io/go-nats-streaming"
|
||||
stan "github.com/nats-io/go-nats-streaming"
|
||||
"github.com/openfaas/faas-provider/auth"
|
||||
"github.com/openfaas/faas/gateway/queue"
|
||||
"github.com/openfaas/nats-queue-worker/nats"
|
||||
)
|
||||
|
||||
// AsyncReport is the report from a function executed on a queue worker.
|
||||
type AsyncReport struct {
|
||||
FunctionName string `json:"name"`
|
||||
StatusCode int `json:"statusCode"`
|
||||
TimeTaken float64 `json:"timeTaken"`
|
||||
}
|
||||
|
||||
func printMsg(m *stan.Msg, i int) {
|
||||
log.Printf("[#%d] Received on [%s]: '%s'\n", i, m.Subject, m)
|
||||
}
|
||||
|
||||
func makeClient() http.Client {
|
||||
proxyClient := http.Client{
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 0,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 1,
|
||||
DisableKeepAlives: true,
|
||||
IdleConnTimeout: 120 * time.Millisecond,
|
||||
ExpectContinueTimeout: 1500 * time.Millisecond,
|
||||
},
|
||||
}
|
||||
return proxyClient
|
||||
}
|
||||
|
||||
func main() {
|
||||
readConfig := ReadConfig{}
|
||||
config := readConfig.Read()
|
||||
log.SetFlags(0)
|
||||
|
||||
clusterID := "faas-cluster"
|
||||
val, _ := os.Hostname()
|
||||
clientID := "faas-worker-" + nats.GetClientID(val)
|
||||
hostname, _ := os.Hostname()
|
||||
|
||||
var durable string
|
||||
var qgroup string
|
||||
var unsubscribe bool
|
||||
var credentials *auth.BasicAuthCredentials
|
||||
var err error
|
||||
@ -73,18 +42,12 @@ func main() {
|
||||
}
|
||||
|
||||
client := makeClient()
|
||||
sc, err := stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+config.NatsAddress+":4222"))
|
||||
if err != nil {
|
||||
log.Fatalf("Can't connect to %s: %v\n", "nats://"+config.NatsAddress+":4222", err)
|
||||
}
|
||||
|
||||
startOpt := stan.StartWithLastReceived()
|
||||
|
||||
i := 0
|
||||
mcb := func(msg *stan.Msg) {
|
||||
messageHandler := func(msg *stan.Msg) {
|
||||
i++
|
||||
|
||||
printMsg(msg, i)
|
||||
log.Printf("[#%d] Received on [%s]: '%s'\n", i, msg.Subject, msg)
|
||||
|
||||
started := time.Now()
|
||||
|
||||
@ -196,16 +159,30 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
subj := "faas-request"
|
||||
qgroup = "faas"
|
||||
natsURL := "nats://" + config.NatsAddress + ":4222"
|
||||
|
||||
log.Println("Wait for ", config.AckWait)
|
||||
sub, err := sc.QueueSubscribe(subj, qgroup, mcb, startOpt, stan.DurableName(durable), stan.MaxInflight(config.MaxInflight), stan.AckWait(config.AckWait))
|
||||
if err != nil {
|
||||
log.Panicln(err)
|
||||
natsQueue := NATSQueue{
|
||||
clusterID: "faas-cluster",
|
||||
clientID: "faas-worker-" + nats.GetClientID(hostname),
|
||||
natsURL: natsURL,
|
||||
|
||||
connMutex: &sync.RWMutex{},
|
||||
maxReconnect: config.MaxReconnect,
|
||||
reconnectDelay: config.ReconnectDelay,
|
||||
quitCh: make(chan struct{}),
|
||||
|
||||
subject: "faas-request",
|
||||
qgroup: "faas",
|
||||
durable: durable,
|
||||
messageHandler: messageHandler,
|
||||
startOption: stan.StartWithLastReceived(),
|
||||
maxInFlight: stan.MaxInflight(config.MaxInflight),
|
||||
ackWait: config.AckWait,
|
||||
}
|
||||
|
||||
log.Printf("Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n", subj, clientID, qgroup, durable)
|
||||
if initErr := natsQueue.connect(); initErr != nil {
|
||||
log.Panic(initErr)
|
||||
}
|
||||
|
||||
// Wait for a SIGINT (perhaps triggered by user with CTRL-C)
|
||||
// Run cleanup when signal is received
|
||||
@ -217,15 +194,43 @@ func main() {
|
||||
fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
|
||||
// Do not unsubscribe a durable on exit, except if asked to.
|
||||
if durable == "" || unsubscribe {
|
||||
sub.Unsubscribe()
|
||||
if err := natsQueue.unsubscribe(); err != nil {
|
||||
log.Panicf(
|
||||
"Cannot unsubscribe subject: %s from %s because of an error: %v",
|
||||
natsQueue.subject,
|
||||
natsQueue.natsURL,
|
||||
err,
|
||||
)
|
||||
}
|
||||
}
|
||||
if err := natsQueue.closeConnection(); err != nil {
|
||||
log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
|
||||
}
|
||||
sc.Close()
|
||||
cleanupDone <- true
|
||||
}
|
||||
}()
|
||||
<-cleanupDone
|
||||
}
|
||||
|
||||
// makeClient constructs a HTTP client with keep-alive turned
|
||||
// off and a dial-timeout of 30 seconds.
|
||||
func makeClient() http.Client {
|
||||
proxyClient := http.Client{
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 0,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 1,
|
||||
DisableKeepAlives: true,
|
||||
IdleConnTimeout: 120 * time.Millisecond,
|
||||
ExpectContinueTimeout: 1500 * time.Millisecond,
|
||||
},
|
||||
}
|
||||
return proxyClient
|
||||
}
|
||||
|
||||
func postResult(client *http.Client, functionRes *http.Response, result []byte, callbackURL string, xCallID string, statusCode int) (int, error) {
|
||||
var reader io.Reader
|
||||
|
||||
|
3
gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go
generated
vendored
3
gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go
generated
vendored
@ -3,6 +3,7 @@ package nats
|
||||
import "regexp"
|
||||
|
||||
var supportedCharacters = regexp.MustCompile("[^a-zA-Z0-9-_]+")
|
||||
|
||||
func GetClientID(value string) string {
|
||||
return supportedCharacters.ReplaceAllString(value, "_")
|
||||
}
|
||||
}
|
||||
|
31
gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go
generated
vendored
31
gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go
generated
vendored
@ -11,6 +11,10 @@ import (
|
||||
type ReadConfig struct {
|
||||
}
|
||||
|
||||
const DefaultMaxReconnect = 120
|
||||
|
||||
const DefaultReconnectDelay = time.Second * 2
|
||||
|
||||
func (ReadConfig) Read() QueueWorkerConfig {
|
||||
cfg := QueueWorkerConfig{
|
||||
AckWait: time.Second * 30,
|
||||
@ -58,6 +62,31 @@ func (ReadConfig) Read() QueueWorkerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
cfg.MaxReconnect = DefaultMaxReconnect
|
||||
|
||||
if value, exists := os.LookupEnv("faas_max_reconnect"); exists {
|
||||
val, err := strconv.Atoi(value)
|
||||
|
||||
if err != nil {
|
||||
log.Println("converting faas_max_reconnect to int error:", err)
|
||||
} else {
|
||||
cfg.MaxReconnect = val
|
||||
}
|
||||
}
|
||||
|
||||
cfg.ReconnectDelay = DefaultReconnectDelay
|
||||
|
||||
if value, exists := os.LookupEnv("faas_reconnect_delay"); exists {
|
||||
reconnectDelayVal, durationErr := time.ParseDuration(value)
|
||||
|
||||
if durationErr != nil {
|
||||
log.Println("parse env var: faas_reconnect_delay as time.Duration error:", durationErr)
|
||||
|
||||
} else {
|
||||
cfg.ReconnectDelay = reconnectDelayVal
|
||||
}
|
||||
}
|
||||
|
||||
if val, exists := os.LookupEnv("ack_wait"); exists {
|
||||
ackWaitVal, durationErr := time.ParseDuration(val)
|
||||
if durationErr != nil {
|
||||
@ -78,4 +107,6 @@ type QueueWorkerConfig struct {
|
||||
WriteDebug bool
|
||||
MaxInflight int
|
||||
AckWait time.Duration
|
||||
MaxReconnect int
|
||||
ReconnectDelay time.Duration
|
||||
}
|
||||
|
142
gateway/vendor/github.com/openfaas/nats-queue-worker/types.go
generated
vendored
Normal file
142
gateway/vendor/github.com/openfaas/nats-queue-worker/types.go
generated
vendored
Normal file
@ -0,0 +1,142 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
stan "github.com/nats-io/go-nats-streaming"
|
||||
)
|
||||
|
||||
// AsyncReport is the report from a function executed on a queue worker.
|
||||
type AsyncReport struct {
|
||||
FunctionName string `json:"name"`
|
||||
StatusCode int `json:"statusCode"`
|
||||
TimeTaken float64 `json:"timeTaken"`
|
||||
}
|
||||
|
||||
// NATSQueue represents a subscription to NATS Streaming
|
||||
type NATSQueue struct {
|
||||
clusterID string
|
||||
clientID string
|
||||
natsURL string
|
||||
|
||||
maxReconnect int
|
||||
reconnectDelay time.Duration
|
||||
conn stan.Conn
|
||||
connMutex *sync.RWMutex
|
||||
quitCh chan struct{}
|
||||
|
||||
subject string
|
||||
qgroup string
|
||||
durable string
|
||||
ackWait time.Duration
|
||||
messageHandler func(*stan.Msg)
|
||||
startOption stan.SubscriptionOption
|
||||
maxInFlight stan.SubscriptionOption
|
||||
subscription stan.Subscription
|
||||
}
|
||||
|
||||
// connect creates a subscription to NATS Streaming
|
||||
func (q *NATSQueue) connect() error {
|
||||
log.Printf("Connect: %s\n", q.natsURL)
|
||||
|
||||
nc, err := stan.Connect(
|
||||
q.clusterID,
|
||||
q.clientID,
|
||||
stan.NatsURL(q.natsURL),
|
||||
stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
|
||||
log.Printf("Disconnected from %s\n", q.natsURL)
|
||||
|
||||
q.reconnect()
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't connect to %s: %v", q.natsURL, err)
|
||||
}
|
||||
|
||||
q.connMutex.Lock()
|
||||
defer q.connMutex.Unlock()
|
||||
|
||||
q.conn = nc
|
||||
|
||||
log.Printf("Subscribing to: %s at %s\n", q.subject, q.natsURL)
|
||||
log.Println("Wait for ", q.ackWait)
|
||||
|
||||
subscription, err := q.conn.QueueSubscribe(
|
||||
q.subject,
|
||||
q.qgroup,
|
||||
q.messageHandler,
|
||||
stan.DurableName(q.durable),
|
||||
stan.AckWait(q.ackWait),
|
||||
q.startOption,
|
||||
q.maxInFlight,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't subscribe to %s at %s. Error: %v", q.subject, q.natsURL, err)
|
||||
}
|
||||
|
||||
log.Printf(
|
||||
"Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n",
|
||||
q.subject,
|
||||
q.clientID,
|
||||
q.qgroup,
|
||||
q.durable,
|
||||
)
|
||||
|
||||
q.subscription = subscription
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *NATSQueue) reconnect() {
|
||||
log.Printf("Reconnect\n")
|
||||
|
||||
for i := 0; i < q.maxReconnect; i++ {
|
||||
select {
|
||||
case <-time.After(time.Duration(i) * q.reconnectDelay):
|
||||
if err := q.connect(); err == nil {
|
||||
log.Printf("Reconnecting (%d/%d) to %s succeeded\n", i+1, q.maxReconnect, q.natsURL)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
nextTryIn := (time.Duration(i+1) * q.reconnectDelay).String()
|
||||
|
||||
log.Printf("Reconnecting (%d/%d) to %s failed\n", i+1, q.maxReconnect, q.natsURL)
|
||||
log.Printf("Waiting %s before next try", nextTryIn)
|
||||
case <-q.quitCh:
|
||||
log.Println("Received signal to stop reconnecting...")
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Reconnecting limit (%d) reached\n", q.maxReconnect)
|
||||
}
|
||||
|
||||
func (q *NATSQueue) unsubscribe() error {
|
||||
q.connMutex.Lock()
|
||||
defer q.connMutex.Unlock()
|
||||
|
||||
if q.subscription != nil {
|
||||
return fmt.Errorf("q.subscription is nil")
|
||||
}
|
||||
|
||||
return q.subscription.Unsubscribe()
|
||||
}
|
||||
|
||||
func (q *NATSQueue) closeConnection() error {
|
||||
q.connMutex.Lock()
|
||||
defer q.connMutex.Unlock()
|
||||
|
||||
if q.conn == nil {
|
||||
return fmt.Errorf("q.conn is nil")
|
||||
}
|
||||
|
||||
close(q.quitCh)
|
||||
|
||||
return q.conn.Close()
|
||||
}
|
Reference in New Issue
Block a user