Support customising the NATS Streaming channel.

Signed-off-by: Bruno Miguel Custódio <brunomcustodio@gmail.com>
This commit is contained in:
Bruno Miguel Custódio
2019-12-10 11:06:47 +00:00
committed by Alex Ellis
parent 3422bdcce9
commit 03dc8824d2
7 changed files with 39 additions and 7 deletions

6
gateway/Gopkg.lock generated
View File

@ -112,15 +112,15 @@
version = "0.12.0" version = "0.12.0"
[[projects]] [[projects]]
digest = "1:28692259e850b777eb6db1ee8937ecd14d4d1172204d583f8b71ac11455a737e" digest = "1:b7c06bdc590871ac5c88f8a85acc9f892c994d6327de311ec5871aaecb6f9489"
name = "github.com/openfaas/nats-queue-worker" name = "github.com/openfaas/nats-queue-worker"
packages = [ packages = [
"handler", "handler",
"nats", "nats",
] ]
pruneopts = "UT" pruneopts = "UT"
revision = "8dff9cb169398f4f131565702c7f695e76247ed6" revision = "dea1c90b8cc66dc73597b7531a4fd29a32b5f88c"
version = "0.8.2" version = "0.9.0"
[[projects]] [[projects]]
digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9" digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9"

View File

@ -12,7 +12,7 @@
[[constraint]] [[constraint]]
name = "github.com/openfaas/nats-queue-worker" name = "github.com/openfaas/nats-queue-worker"
version = "0.8.2" version = "0.9.0"
[[constraint]] [[constraint]]
name = "github.com/prometheus/client_golang" name = "github.com/prometheus/client_golang"

View File

@ -73,6 +73,7 @@ The gateway can be configured through the following environment variables:
| `faas_nats_address` | The host at which NATS Streaming can be reached. Required for asynchronous mode | | `faas_nats_address` | The host at which NATS Streaming can be reached. Required for asynchronous mode |
| `faas_nats_port` | The port at which NATS Streaming can be reached. Required for asynchronous mode | | `faas_nats_port` | The port at which NATS Streaming can be reached. Required for asynchronous mode |
| `faas_nats_cluster_name` | The name of the target NATS Streaming cluster. Defaults to `faas-cluster` for backwards-compatibility | | `faas_nats_cluster_name` | The name of the target NATS Streaming cluster. Defaults to `faas-cluster` for backwards-compatibility |
| `faas_nats_channel` | The name of the NATS Streaming channel to use. Defaults to `faas-request` for backwards-compatibility |
| `faas_prometheus_host` | Host to connect to Prometheus. Default: `"prometheus"` | | `faas_prometheus_host` | Host to connect to Prometheus. Default: `"prometheus"` |
| `faas_promethus_port` | Port to connect to Prometheus. Default: `9090` | | `faas_promethus_port` | Port to connect to Prometheus. Default: `9090` |
| `direct_functions` | `true` or `false` - functions are invoked directly over overlay network by DNS name without passing through the provider | | `direct_functions` | `true` or `false` - functions are invoked directly over overlay network by DNS name without passing through the provider |

View File

@ -135,7 +135,7 @@ func main() {
defaultNATSConfig := natsHandler.NewDefaultNATSConfig(maxReconnect, interval) defaultNATSConfig := natsHandler.NewDefaultNATSConfig(maxReconnect, interval)
natsQueue, queueErr := natsHandler.CreateNATSQueue(*config.NATSAddress, *config.NATSPort, *config.NATSClusterName, defaultNATSConfig) natsQueue, queueErr := natsHandler.CreateNATSQueue(*config.NATSAddress, *config.NATSPort, *config.NATSClusterName, *config.NATSChannel, defaultNATSConfig)
if queueErr != nil { if queueErr != nil {
log.Fatalln(queueErr) log.Fatalln(queueErr)
} }

View File

@ -106,6 +106,14 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) {
cfg.NATSClusterName = &v cfg.NATSClusterName = &v
} }
faasNATSChannel := hasEnv.Getenv("faas_nats_channel")
if len(faasNATSChannel) > 0 {
cfg.NATSChannel = &faasNATSChannel
} else {
v := "faas-request"
cfg.NATSChannel = &v
}
prometheusPort := hasEnv.Getenv("faas_prometheus_port") prometheusPort := hasEnv.Getenv("faas_prometheus_port")
if len(prometheusPort) > 0 { if len(prometheusPort) > 0 {
prometheusPortVal, err := strconv.Atoi(prometheusPort) prometheusPortVal, err := strconv.Atoi(prometheusPort)
@ -197,6 +205,9 @@ type GatewayConfig struct {
// The name of the NATS Streaming cluster. Required for async mode. // The name of the NATS Streaming cluster. Required for async mode.
NATSClusterName *string NATSClusterName *string
// NATSChannel is the name of the NATS Streaming channel used for asynchronous function invocations.
NATSChannel *string
// Host to connect to Prometheus. // Host to connect to Prometheus.
PrometheusHost string PrometheusHost string

View File

@ -255,6 +255,12 @@ func TestRead_UseNATS(t *testing.T) {
t.Fail() t.Fail()
} }
wantNATSChannel := "faas-request"
if *config.NATSChannel != wantNATSChannel {
t.Logf("faas_nats_channel: want %s, got %s", wantNATSChannel, *config.NATSChannel)
t.Fail()
}
defaults.Setenv("faas_nats_cluster_name", "example-nats-cluster") defaults.Setenv("faas_nats_cluster_name", "example-nats-cluster")
config, _ = readConfig.Read(defaults) config, _ = readConfig.Read(defaults)
@ -263,6 +269,15 @@ func TestRead_UseNATS(t *testing.T) {
t.Logf("faas_nats_cluster_name: want %s, got %s", wantNATSClusterName, *config.NATSClusterName) t.Logf("faas_nats_cluster_name: want %s, got %s", wantNATSClusterName, *config.NATSClusterName)
t.Fail() t.Fail()
} }
defaults.Setenv("faas_nats_channel", "foo")
config, _ = readConfig.Read(defaults)
wantNATSChannel = "foo"
if *config.NATSChannel != wantNATSChannel {
t.Logf("faas_nats_channel: want %s, got %s", wantNATSChannel, *config.NATSChannel)
t.Fail()
}
} }
func TestRead_UseNATSBadPort(t *testing.T) { func TestRead_UseNATSBadPort(t *testing.T) {

View File

@ -7,18 +7,23 @@ import (
) )
// CreateNATSQueue ready for asynchronous processing // CreateNATSQueue ready for asynchronous processing
func CreateNATSQueue(address string, port int, clusterName string, clientConfig NATSConfig) (*NATSQueue, error) { func CreateNATSQueue(address string, port int, clusterName, channel string, clientConfig NATSConfig) (*NATSQueue, error) {
var err error var err error
natsURL := fmt.Sprintf("nats://%s:%d", address, port) natsURL := fmt.Sprintf("nats://%s:%d", address, port)
log.Printf("Opening connection to %s\n", natsURL) log.Printf("Opening connection to %s\n", natsURL)
clientID := clientConfig.GetClientID() clientID := clientConfig.GetClientID()
// If 'channel' is empty, use the previous default.
if channel == "" {
channel = "faas-request"
}
queue1 := NATSQueue{ queue1 := NATSQueue{
ClientID: clientID, ClientID: clientID,
ClusterID: clusterName, ClusterID: clusterName,
NATSURL: natsURL, NATSURL: natsURL,
Topic: "faas-request", Topic: channel,
maxReconnect: clientConfig.GetMaxReconnect(), maxReconnect: clientConfig.GetMaxReconnect(),
reconnectDelay: clientConfig.GetReconnectDelay(), reconnectDelay: clientConfig.GetReconnectDelay(),
ncMutex: &sync.RWMutex{}, ncMutex: &sync.RWMutex{},