diff --git a/gateway/Gopkg.lock b/gateway/Gopkg.lock index a68d160b..4b03d63b 100644 --- a/gateway/Gopkg.lock +++ b/gateway/Gopkg.lock @@ -3,128 +3,168 @@ [[projects]] branch = "master" + digest = "1:0c5485088ce274fac2e931c1b979f2619345097b39d91af3239977114adf0320" name = "github.com/beorn7/perks" packages = ["quantile"] + pruneopts = "" revision = "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9" [[projects]] + digest = "1:a9e4ff75555e4500e409dc87c1d708b090bb8dd77f889bbf266773f3dc23af70" name = "github.com/docker/distribution" packages = ["uuid"] + pruneopts = "" revision = "48294d928ced5dd9b378f7fd7c6f5da3ff3f2c89" version = "v2.6.2" [[projects]] + digest = "1:70a80170917a15e1ff02faab5f9e716e945e0676e86599ba144d38f96e30c3bf" name = "github.com/gogo/protobuf" packages = [ "gogoproto", "proto", - "protoc-gen-gogo/descriptor" + "protoc-gen-gogo/descriptor", ] + pruneopts = "" revision = "342cbe0a04158f6dcb03ca0079991a51a4248c02" version = "v0.5" [[projects]] branch = "master" + digest = "1:3b760d3b93f994df8eb1d9ebfad17d3e9e37edcb7f7efaa15b427c0d7a64f4e4" name = "github.com/golang/protobuf" packages = ["proto"] + pruneopts = "" revision = "1e59b77b52bf8e4b449a57e6f79f21226d571845" [[projects]] + digest = "1:20ed7daa9b3b38b6d1d39b48ab3fd31122be5419461470d0c28de3e121c93ecf" name = "github.com/gorilla/context" packages = ["."] + pruneopts = "" revision = "1ea25387ff6f684839d82767c1733ff4d4d15d0a" version = "v1.1" [[projects]] + digest = "1:c2c8666b4836c81a1d247bdf21c6a6fc1ab586538ab56f74437c2e0df5c375e1" name = "github.com/gorilla/mux" packages = ["."] + pruneopts = "" revision = "e3702bed27f0d39777b0b37b664b6280e8ef8fbf" version = "v1.6.2" [[projects]] + digest = "1:4c23ced97a470b17d9ffd788310502a077b9c1f60221a85563e49696276b4147" name = "github.com/matttproud/golang_protobuf_extensions" packages = ["pbutil"] + pruneopts = "" revision = "3247c84500bff8d9fb6d579d800f20b3e091582c" version = "v1.0.0" [[projects]] + digest = 
"1:665af347df4c5d1ae4c3eacd0754f5337a301f6a3f2444c9993b996605c8c02b" name = "github.com/nats-io/go-nats" packages = [ ".", "encoders/builtin", - "util" + "util", ] + pruneopts = "" revision = "062418ea1c2181f52dc0f954f6204370519a868b" version = "v1.5.0" [[projects]] + digest = "1:ca0ffe35a7afa99d614845251f508505188d4117b87d368ee2f4b84c85f3fd29" name = "github.com/nats-io/go-nats-streaming" packages = [ ".", - "pb" + "pb", ] + pruneopts = "" revision = "e15a53f85e4932540600a16b56f6c4f65f58176f" version = "v0.4.0" [[projects]] + digest = "1:be61e8224b84064109eaba8157cbb4bbe6ca12443e182b6624fdfa1c0dcf53d9" name = "github.com/nats-io/nuid" packages = ["."] + pruneopts = "" revision = "289cccf02c178dc782430d534e3c1f5b72af807f" version = "v1.0.0" [[projects]] + digest = "1:6349b4be853e8be701de22e7d081c433447d185c0eeb8651d67bfc03c344f3fb" name = "github.com/openfaas/faas-provider" packages = ["auth"] + pruneopts = "" revision = "220324e98f5db5aa61f02d1ab13f03e91310796c" version = "0.8.1" [[projects]] + digest = "1:c91d031a0f53699e18f204e8a8d360d400a34686a3bb34d82c63a53ff1d73cea" name = "github.com/openfaas/nats-queue-worker" packages = [ "handler", - "nats" + "nats", ] - revision = "1d731056d1729865034b4598f44031eab7f76adb" - version = "0.6.0" + pruneopts = "" + revision = "d6f08acb3541ea3b1c0c29a44f13a18330e2fb29" + version = "0.7.0" [[projects]] + digest = "1:6f218995d6a74636cfcab45ce03005371e682b4b9bee0e5eb0ccfd83ef85364f" name = "github.com/prometheus/client_golang" packages = [ "prometheus", "prometheus/internal", - "prometheus/promhttp" + "prometheus/promhttp", ] + pruneopts = "" revision = "505eaef017263e299324067d40ca2c48f6a2cf50" version = "v0.9.2" [[projects]] branch = "master" + digest = "1:60aca47f4eeeb972f1b9da7e7db51dee15ff6c59f7b401c1588b8e6771ba15ef" name = "github.com/prometheus/client_model" packages = ["go"] + pruneopts = "" revision = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" [[projects]] branch = "master" + digest = 
"1:e3aa5178be4fc4ae8cdb37d11c02f7490c00450a9f419e6aa84d02d3b47e90d2" name = "github.com/prometheus/common" packages = [ "expfmt", "internal/bitbucket.org/ww/goautoneg", - "model" + "model", ] + pruneopts = "" revision = "2e54d0b93cba2fd133edc32211dcc32c06ef72ca" [[projects]] branch = "master" + digest = "1:f0857d075687b4ddebb10c8403d5fec9f093f7208b34ed5b6f3101ee2e77cec5" name = "github.com/prometheus/procfs" packages = [ ".", - "xfs" + "xfs", ] + pruneopts = "" revision = "b15cd069a83443be3154b719d0cc9fe8117f09fb" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "b2b3291c34b853085074ceb72c897de6bb7baec0b0193da42a5d0850e5280133" + input-imports = [ + "github.com/docker/distribution/uuid", + "github.com/gorilla/mux", + "github.com/openfaas/faas-provider/auth", + "github.com/openfaas/nats-queue-worker/handler", + "github.com/prometheus/client_golang/prometheus", + "github.com/prometheus/client_golang/prometheus/promhttp", + "github.com/prometheus/client_model/go", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/gateway/Gopkg.toml b/gateway/Gopkg.toml index 6efdebaf..a7f9c8a6 100644 --- a/gateway/Gopkg.toml +++ b/gateway/Gopkg.toml @@ -14,7 +14,7 @@ ignored = ["github.com/openfaas/faas/gateway/queue"] [[constraint]] name = "github.com/openfaas/nats-queue-worker" - version = "0.6.0" + version = "0.7.0" [[constraint]] name = "github.com/prometheus/client_golang" diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/.gitignore b/gateway/vendor/github.com/openfaas/nats-queue-worker/.gitignore index 02fffa44..eadb000f 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/.gitignore +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/.gitignore @@ -16,3 +16,5 @@ .DS_Store nats-queue-worker +queue-worker + diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile index 656f0f3a..9fdb79a3 100644 --- 
a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile @@ -5,6 +5,7 @@ COPY vendor vendor COPY handler handler COPY nats nats COPY main.go . +COPY types.go . COPY readconfig.go . COPY readconfig_test.go . COPY auth.go . diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64 b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64 index b214dd00..8863ac10 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64 +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.arm64 @@ -5,6 +5,7 @@ COPY vendor vendor COPY handler handler COPY nats nats COPY auth.go . +COPY types.go . COPY readconfig.go . COPY readconfig_test.go . COPY main.go . diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf index 2d9b16c9..bf7b85c3 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/Dockerfile.armhf @@ -5,6 +5,7 @@ COPY vendor vendor COPY handler handler COPY nats nats COPY main.go . +COPY types.go . COPY readconfig.go . COPY readconfig_test.go . COPY auth.go . 
diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go index d52f9637..27015e73 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/auth.go @@ -19,7 +19,7 @@ func AddBasicAuth(req *http.Request) error { credentials, err := reader.Read() if err != nil { - return fmt.Errorf("Unable to read basic auth: %s", err.Error()) + return fmt.Errorf("unable to read basic auth: %s", err.Error()) } req.SetBasicAuth(credentials.User, credentials.Password) @@ -37,7 +37,7 @@ func LoadCredentials() (*auth.BasicAuthCredentials, error) { credentials, err := reader.Read() if err != nil { - return nil, fmt.Errorf("Unable to read basic auth: %s", err.Error()) + return nil, fmt.Errorf("unable to read basic auth: %s", err.Error()) } return credentials, nil } diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go index 2881524e..13df6fdb 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/nats_queue.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/nats-io/go-nats-streaming" + stan "github.com/nats-io/go-nats-streaming" "github.com/openfaas/faas/gateway/queue" ) @@ -48,6 +48,8 @@ func (q *NATSQueue) Queue(req *queue.Request) error { } func (q *NATSQueue) connect() error { + log.Printf("Connect: %s\n", q.NATSURL) + nc, err := stan.Connect( q.ClusterID, q.ClientID, @@ -71,8 +73,10 @@ func (q *NATSQueue) connect() error { } func (q *NATSQueue) reconnect() { + log.Printf("Reconnect\n") + for i := 0; i < q.maxReconnect; i++ { - time.Sleep(time.Second * time.Duration(i) * q.reconnectDelay) + time.Sleep(time.Duration(i) * q.reconnectDelay) if err := q.connect(); err == nil { log.Printf("Reconnecting (%d/%d) to %s. 
OK\n", i+1, q.maxReconnect, q.NATSURL) diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/main.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/main.go index 6a1b0244..6648ae9f 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/main.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/main.go @@ -8,58 +8,27 @@ import ( "io/ioutil" "log" "net" + "net/http" "os" "os/signal" "strings" + "sync" "time" - "net/http" - - "github.com/nats-io/go-nats-streaming" + stan "github.com/nats-io/go-nats-streaming" "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas/gateway/queue" "github.com/openfaas/nats-queue-worker/nats" ) -// AsyncReport is the report from a function executed on a queue worker. -type AsyncReport struct { - FunctionName string `json:"name"` - StatusCode int `json:"statusCode"` - TimeTaken float64 `json:"timeTaken"` -} - -func printMsg(m *stan.Msg, i int) { - log.Printf("[#%d] Received on [%s]: '%s'\n", i, m.Subject, m) -} - -func makeClient() http.Client { - proxyClient := http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 0, - }).DialContext, - MaxIdleConns: 1, - DisableKeepAlives: true, - IdleConnTimeout: 120 * time.Millisecond, - ExpectContinueTimeout: 1500 * time.Millisecond, - }, - } - return proxyClient -} - func main() { readConfig := ReadConfig{} config := readConfig.Read() log.SetFlags(0) - clusterID := "faas-cluster" - val, _ := os.Hostname() - clientID := "faas-worker-" + nats.GetClientID(val) + hostname, _ := os.Hostname() var durable string - var qgroup string var unsubscribe bool var credentials *auth.BasicAuthCredentials var err error @@ -73,18 +42,12 @@ func main() { } client := makeClient() - sc, err := stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+config.NatsAddress+":4222")) - if err != nil { - log.Fatalf("Can't connect to %s: %v\n", "nats://"+config.NatsAddress+":4222", 
err) - } - - startOpt := stan.StartWithLastReceived() i := 0 - mcb := func(msg *stan.Msg) { + messageHandler := func(msg *stan.Msg) { i++ - printMsg(msg, i) + log.Printf("[#%d] Received on [%s]: '%s'\n", i, msg.Subject, msg) started := time.Now() @@ -196,16 +159,30 @@ func main() { } } - subj := "faas-request" - qgroup = "faas" + natsURL := "nats://" + config.NatsAddress + ":4222" - log.Println("Wait for ", config.AckWait) - sub, err := sc.QueueSubscribe(subj, qgroup, mcb, startOpt, stan.DurableName(durable), stan.MaxInflight(config.MaxInflight), stan.AckWait(config.AckWait)) - if err != nil { - log.Panicln(err) + natsQueue := NATSQueue{ + clusterID: "faas-cluster", + clientID: "faas-worker-" + nats.GetClientID(hostname), + natsURL: natsURL, + + connMutex: &sync.RWMutex{}, + maxReconnect: config.MaxReconnect, + reconnectDelay: config.ReconnectDelay, + quitCh: make(chan struct{}), + + subject: "faas-request", + qgroup: "faas", + durable: durable, + messageHandler: messageHandler, + startOption: stan.StartWithLastReceived(), + maxInFlight: stan.MaxInflight(config.MaxInflight), + ackWait: config.AckWait, } - log.Printf("Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n", subj, clientID, qgroup, durable) + if initErr := natsQueue.connect(); initErr != nil { + log.Panic(initErr) + } // Wait for a SIGINT (perhaps triggered by user with CTRL-C) // Run cleanup when signal is received @@ -217,15 +194,43 @@ func main() { fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n") // Do not unsubscribe a durable on exit, except if asked to. 
if durable == "" || unsubscribe { - sub.Unsubscribe() + if err := natsQueue.unsubscribe(); err != nil { + log.Panicf( + "Cannot unsubscribe subject: %s from %s because of an error: %v", + natsQueue.subject, + natsQueue.natsURL, + err, + ) + } + } + if err := natsQueue.closeConnection(); err != nil { + log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err) } - sc.Close() cleanupDone <- true } }() <-cleanupDone } +// makeClient constructs a HTTP client with keep-alive turned +// off and a dial-timeout of 30 seconds. +func makeClient() http.Client { + proxyClient := http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 0, + }).DialContext, + MaxIdleConns: 1, + DisableKeepAlives: true, + IdleConnTimeout: 120 * time.Millisecond, + ExpectContinueTimeout: 1500 * time.Millisecond, + }, + } + return proxyClient +} + func postResult(client *http.Client, functionRes *http.Response, result []byte, callbackURL string, xCallID string, statusCode int) (int, error) { var reader io.Reader diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go index d494d474..82c404b7 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/nats/client.go @@ -3,6 +3,7 @@ package nats import "regexp" var supportedCharacters = regexp.MustCompile("[^a-zA-Z0-9-_]+") + func GetClientID(value string) string { return supportedCharacters.ReplaceAllString(value, "_") -} \ No newline at end of file +} diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go index 2473e9a4..00b532a3 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go +++ 
b/gateway/vendor/github.com/openfaas/nats-queue-worker/readconfig.go @@ -11,6 +11,10 @@ import ( type ReadConfig struct { } +const DefaultMaxReconnect = 120 + +const DefaultReconnectDelay = time.Second * 2 + func (ReadConfig) Read() QueueWorkerConfig { cfg := QueueWorkerConfig{ AckWait: time.Second * 30, @@ -58,6 +62,31 @@ func (ReadConfig) Read() QueueWorkerConfig { } } + cfg.MaxReconnect = DefaultMaxReconnect + + if value, exists := os.LookupEnv("faas_max_reconnect"); exists { + val, err := strconv.Atoi(value) + + if err != nil { + log.Println("converting faas_max_reconnect to int error:", err) + } else { + cfg.MaxReconnect = val + } + } + + cfg.ReconnectDelay = DefaultReconnectDelay + + if value, exists := os.LookupEnv("faas_reconnect_delay"); exists { + reconnectDelayVal, durationErr := time.ParseDuration(value) + + if durationErr != nil { + log.Println("parse env var: faas_reconnect_delay as time.Duration error:", durationErr) + + } else { + cfg.ReconnectDelay = reconnectDelayVal + } + } + if val, exists := os.LookupEnv("ack_wait"); exists { ackWaitVal, durationErr := time.ParseDuration(val) if durationErr != nil { @@ -78,4 +107,6 @@ type QueueWorkerConfig struct { WriteDebug bool MaxInflight int AckWait time.Duration + MaxReconnect int + ReconnectDelay time.Duration } diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/types.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/types.go new file mode 100644 index 00000000..d5e19403 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/types.go @@ -0,0 +1,142 @@ +package main + +import ( + "fmt" + "log" + "sync" + "time" + + stan "github.com/nats-io/go-nats-streaming" +) + +// AsyncReport is the report from a function executed on a queue worker. 
+type AsyncReport struct {
+	FunctionName string  `json:"name"`
+	StatusCode   int     `json:"statusCode"`
+	TimeTaken    float64 `json:"timeTaken"`
+}
+
+// NATSQueue holds a NATS Streaming connection, the queue subscription made
+// on it, and the settings needed to re-create both after a connection loss.
+type NATSQueue struct {
+	// Arguments passed to stan.Connect.
+	clusterID string
+	clientID  string
+	natsURL   string
+
+	// Connection state. connMutex is held while conn and subscription are set
+	// or used. reconnect makes at most maxReconnect attempts, waiting a multiple
+	// of reconnectDelay before each one, and stops early once quitCh is closed.
+	maxReconnect   int
+	reconnectDelay time.Duration
+	conn           stan.Conn
+	connMutex      *sync.RWMutex
+	quitCh         chan struct{}
+
+	// Arguments passed to QueueSubscribe; subscription is the handle it returns.
+	subject        string
+	qgroup         string
+	durable        string
+	ackWait        time.Duration
+	messageHandler func(*stan.Msg)
+	startOption    stan.SubscriptionOption
+	maxInFlight    stan.SubscriptionOption
+	subscription   stan.Subscription
+}
+
+// connect opens a connection to NATS Streaming at q.natsURL and creates the
+// queue subscription on it. The connection-lost handler registered here calls
+// q.reconnect. It returns an error if connecting or subscribing fails.
+func (q *NATSQueue) connect() error {
+	log.Printf("Connect: %s\n", q.natsURL)
+
+	nc, err := stan.Connect(
+		q.clusterID,
+		q.clientID,
+		stan.NatsURL(q.natsURL),
+		stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
+			log.Printf("Disconnected from %s\n", q.natsURL)
+
+			q.reconnect()
+		}),
+	)
+	if err != nil {
+		return fmt.Errorf("can't connect to %s: %v", q.natsURL, err)
+	}
+
+	// The lock is taken only after stan.Connect has returned, and is held
+	// until the subscription has been stored (or subscribing has failed).
+	q.connMutex.Lock()
+	defer q.connMutex.Unlock()
+
+	q.conn = nc
+
+	log.Printf("Subscribing to: %s at %s\n", q.subject, q.natsURL)
+	log.Println("Wait for ", q.ackWait)
+
+	subscription, err := q.conn.QueueSubscribe(
+		q.subject,
+		q.qgroup,
+		q.messageHandler,
+		stan.DurableName(q.durable),
+		stan.AckWait(q.ackWait),
+		q.startOption,
+		q.maxInFlight,
+	)
+
+	if err != nil {
+		return fmt.Errorf("couldn't subscribe to %s at %s. Error: %v", q.subject, q.natsURL, err)
+	}
+
+	log.Printf(
+		"Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n",
+		q.subject,
+		q.clientID,
+		q.qgroup,
+		q.durable,
+	)
+
+	q.subscription = subscription
+
+	return nil
+}
+
+// reconnect retries connect up to q.maxReconnect times. Attempt i (counted
+// from 0) is preceded by a wait of i*q.reconnectDelay, so the first try is not
+// delayed. It returns on the first successful connect, or when q.quitCh
+// becomes readable while it is waiting (closeConnection closes that channel).
+func (q *NATSQueue) reconnect() {
+	log.Printf("Reconnect\n")
+
+	for i := 0; i < q.maxReconnect; i++ {
+		select {
+		case <-time.After(time.Duration(i) * q.reconnectDelay):
+			if err := q.connect(); err == nil {
+				log.Printf("Reconnecting (%d/%d) to %s succeeded\n", i+1, q.maxReconnect, q.natsURL)
+
+				return
+			}
+
+			nextTryIn := (time.Duration(i+1) * q.reconnectDelay).String()
+
+			log.Printf("Reconnecting (%d/%d) to %s failed\n", i+1, q.maxReconnect, q.natsURL)
+			log.Printf("Waiting %s before next try", nextTryIn)
+		case <-q.quitCh:
+			log.Println("Received signal to stop reconnecting...")
+
+			return
+		}
+	}
+
+	log.Printf("Reconnecting limit (%d) reached\n", q.maxReconnect)
+}
+
+// unsubscribe removes the current subscription while holding q.connMutex. It
+// returns an error when no subscription has been stored by connect.
+func (q *NATSQueue) unsubscribe() error {
+	q.connMutex.Lock()
+	defer q.connMutex.Unlock()
+
+	// A nil subscription means there is nothing to unsubscribe; calling
+	// Unsubscribe on it would panic, so report the condition instead.
+	if q.subscription == nil {
+		return fmt.Errorf("q.subscription is nil")
+	}
+
+	return q.subscription.Unsubscribe()
+}
+
+// closeConnection closes q.quitCh to stop any reconnect loop and then closes
+// the connection, all while holding q.connMutex. It returns an error, without
+// closing q.quitCh, when no connection has been stored.
+// Call it at most once: closing q.quitCh a second time panics.
+func (q *NATSQueue) closeConnection() error {
+	q.connMutex.Lock()
+	defer q.connMutex.Unlock()
+
+	if q.conn == nil {
+		return fmt.Errorf("q.conn is nil")
+	}
+
+	close(q.quitCh)
+
+	return q.conn.Close()
+}