From 365f459b3f3a05ad17ce83f4edd86f4276af354a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Miguel=20Cust=C3=B3dio?= Date: Mon, 25 Nov 2019 10:40:37 +0000 Subject: [PATCH] Allow for customizing the name of the target NATS Streaming cluster. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Bruno Miguel Custódio --- gateway/Gopkg.lock | 6 ++-- gateway/Gopkg.toml | 2 +- gateway/README.md | 5 ++-- gateway/server.go | 2 +- gateway/types/readconfig.go | 11 ++++++++ gateway/types/readconfig_test.go | 28 +++++++++++++++++++ .../nats-queue-worker/handler/handler.go | 5 ++-- 7 files changed, 49 insertions(+), 10 deletions(-) diff --git a/gateway/Gopkg.lock b/gateway/Gopkg.lock index d5e93cc1..4f72a18e 100644 --- a/gateway/Gopkg.lock +++ b/gateway/Gopkg.lock @@ -112,15 +112,15 @@ version = "0.12.0" [[projects]] - digest = "1:f7b0087a32b4f017ce89562494ae510f21e7d22e70cc1911640a32ebe583e92e" + digest = "1:28692259e850b777eb6db1ee8937ecd14d4d1172204d583f8b71ac11455a737e" name = "github.com/openfaas/nats-queue-worker" packages = [ "handler", "nats", ] pruneopts = "UT" - revision = "6ac9962b0323727171ac7143a24816af421c8d22" - version = "0.8.0" + revision = "8dff9cb169398f4f131565702c7f695e76247ed6" + version = "0.8.2" [[projects]] digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9" diff --git a/gateway/Gopkg.toml b/gateway/Gopkg.toml index 2921d2e1..69c390a9 100644 --- a/gateway/Gopkg.toml +++ b/gateway/Gopkg.toml @@ -12,7 +12,7 @@ [[constraint]] name = "github.com/openfaas/nats-queue-worker" - version = "0.8.0" + version = "0.8.2" [[constraint]] name = "github.com/prometheus/client_golang" diff --git a/gateway/README.md b/gateway/README.md index fd7ba192..25352d5c 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -70,8 +70,9 @@ The gateway can be configured through the following environment variables: | `read_timeout` | HTTP timeout for reading the payload from the client caller (in seconds). Default: `8` | | `functions_provider_url` | URL of upstream [functions provider](https://github.com/openfaas/faas-provider/) - i.e. Swarm, Kubernetes, Nomad etc | | `logs_provider_url` | URL of the upstream function logs api provider, optional, when empty the `functions_provider_url` is used | -| `faas_nats_address` | Address of NATS service. Required for asynchronous mode | -| `faas_nats_port` | Port for NATS service. Requrired for asynchronous mode | +| `faas_nats_address` | The host at which NATS Streaming can be reached. Required for asynchronous mode | +| `faas_nats_port` | The port at which NATS Streaming can be reached. Required for asynchronous mode | +| `faas_nats_cluster_name` | The name of the target NATS Streaming cluster. Defaults to `faas-cluster` for backwards-compatibility | | `faas_prometheus_host` | Host to connect to Prometheus. Default: `"prometheus"` | | `faas_promethus_port` | Port to connect to Prometheus. Default: `9090` | | `direct_functions` | `true` or `false` - functions are invoked directly over overlay network by DNS name without passing through the provider | diff --git a/gateway/server.go b/gateway/server.go index 26264245..8c50cd49 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -135,7 +135,7 @@ func main() { defaultNATSConfig := natsHandler.NewDefaultNATSConfig(maxReconnect, interval) - natsQueue, queueErr := natsHandler.CreateNATSQueue(*config.NATSAddress, *config.NATSPort, defaultNATSConfig) + natsQueue, queueErr := natsHandler.CreateNATSQueue(*config.NATSAddress, *config.NATSPort, *config.NATSClusterName, defaultNATSConfig) if queueErr != nil { log.Fatalln(queueErr) } diff --git a/gateway/types/readconfig.go b/gateway/types/readconfig.go index dfb08ed9..12ccf382 100644 --- a/gateway/types/readconfig.go +++ b/gateway/types/readconfig.go @@ -98,6 +98,14 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) { } } + faasNATSClusterName := hasEnv.Getenv("faas_nats_cluster_name") + if len(faasNATSClusterName) > 0 { + cfg.NATSClusterName = &faasNATSClusterName + } else { + v := "faas-cluster" + cfg.NATSClusterName = &v + } + prometheusPort := hasEnv.Getenv("faas_prometheus_port") if len(prometheusPort) > 0 { prometheusPortVal, err := strconv.Atoi(prometheusPort) @@ -186,6 +194,9 @@ type GatewayConfig struct { // Port of the NATS Service. Required for async mode. NATSPort *int + // The name of the NATS Streaming cluster. Required for async mode. + NATSClusterName *string + // Host to connect to Prometheus. PrometheusHost string diff --git a/gateway/types/readconfig_test.go b/gateway/types/readconfig_test.go index bfdda03a..c307aa09 100644 --- a/gateway/types/readconfig_test.go +++ b/gateway/types/readconfig_test.go @@ -236,6 +236,33 @@ func TestRead_UseNATS(t *testing.T) { t.Log("NATS was requested in config, but not enabled.") t.Fail() } + + wantNATSAddress := "nats" + if *config.NATSAddress != wantNATSAddress { + t.Logf("faas_nats_address: want %s, got %s", wantNATSAddress, *config.NATSAddress) + t.Fail() + } + + wantNATSPort := 6222 + if *config.NATSPort != wantNATSPort { + t.Logf("faas_nats_port: want %d, got %d", wantNATSPort, *config.NATSPort) + t.Fail() + } + + wantNATSClusterName := "faas-cluster" + if *config.NATSClusterName != wantNATSClusterName { + t.Logf("faas_nats_cluster_name: want %s, got %s", wantNATSClusterName, *config.NATSClusterName) + t.Fail() + } + + defaults.Setenv("faas_nats_cluster_name", "example-nats-cluster") + config, _ = readConfig.Read(defaults) + + wantNATSClusterName = "example-nats-cluster" + if *config.NATSClusterName != wantNATSClusterName { + t.Logf("faas_nats_cluster_name: want %s, got %s", wantNATSClusterName, *config.NATSClusterName) + t.Fail() + } } func TestRead_UseNATSBadPort(t *testing.T) { @@ -243,6 +270,7 @@ func TestRead_UseNATSBadPort(t *testing.T) { defaults := NewEnvBucket() defaults.Setenv("faas_nats_address", "nats") defaults.Setenv("faas_nats_port", "6fff") + defaults.Setenv("faas_nats_cluster_name", "example-nats-cluster") readConfig := ReadConfig{} _, err := readConfig.Read(defaults) diff --git a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go index 64777848..59b83ca3 100644 --- a/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go +++ b/gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go @@ -7,17 +7,16 @@ import ( ) // CreateNATSQueue ready for asynchronous processing -func CreateNATSQueue(address string, port int, clientConfig NATSConfig) (*NATSQueue, error) { +func CreateNATSQueue(address string, port int, clusterName string, clientConfig NATSConfig) (*NATSQueue, error) { var err error natsURL := fmt.Sprintf("nats://%s:%d", address, port) log.Printf("Opening connection to %s\n", natsURL) clientID := clientConfig.GetClientID() - clusterID := "faas-cluster" queue1 := NATSQueue{ ClientID: clientID, - ClusterID: clusterID, + ClusterID: clusterName, NATSURL: natsURL, Topic: "faas-request", maxReconnect: clientConfig.GetMaxReconnect(),