mirror of
https://github.com/openfaas/faas.git
synced 2025-06-19 12:36:40 +00:00
Allow for customizing the name of the target NATS Streaming cluster.
Signed-off-by: Bruno Miguel Custódio <brunomcustodio@gmail.com>
This commit is contained in:
committed by
Alex Ellis
parent
ff5ec39e4d
commit
365f459b3f
6
gateway/Gopkg.lock
generated
6
gateway/Gopkg.lock
generated
@ -112,15 +112,15 @@
|
|||||||
version = "0.12.0"
|
version = "0.12.0"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:f7b0087a32b4f017ce89562494ae510f21e7d22e70cc1911640a32ebe583e92e"
|
digest = "1:28692259e850b777eb6db1ee8937ecd14d4d1172204d583f8b71ac11455a737e"
|
||||||
name = "github.com/openfaas/nats-queue-worker"
|
name = "github.com/openfaas/nats-queue-worker"
|
||||||
packages = [
|
packages = [
|
||||||
"handler",
|
"handler",
|
||||||
"nats",
|
"nats",
|
||||||
]
|
]
|
||||||
pruneopts = "UT"
|
pruneopts = "UT"
|
||||||
revision = "6ac9962b0323727171ac7143a24816af421c8d22"
|
revision = "8dff9cb169398f4f131565702c7f695e76247ed6"
|
||||||
version = "0.8.0"
|
version = "0.8.2"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9"
|
digest = "1:eb04f69c8991e52eff33c428bd729e04208bf03235be88e4df0d88497c6861b9"
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
|
|
||||||
[[constraint]]
|
[[constraint]]
|
||||||
name = "github.com/openfaas/nats-queue-worker"
|
name = "github.com/openfaas/nats-queue-worker"
|
||||||
version = "0.8.0"
|
version = "0.8.2"
|
||||||
|
|
||||||
[[constraint]]
|
[[constraint]]
|
||||||
name = "github.com/prometheus/client_golang"
|
name = "github.com/prometheus/client_golang"
|
||||||
|
@ -70,8 +70,9 @@ The gateway can be configured through the following environment variables:
|
|||||||
| `read_timeout` | HTTP timeout for reading the payload from the client caller (in seconds). Default: `8` |
|
| `read_timeout` | HTTP timeout for reading the payload from the client caller (in seconds). Default: `8` |
|
||||||
| `functions_provider_url` | URL of upstream [functions provider](https://github.com/openfaas/faas-provider/) - i.e. Swarm, Kubernetes, Nomad etc |
|
| `functions_provider_url` | URL of upstream [functions provider](https://github.com/openfaas/faas-provider/) - i.e. Swarm, Kubernetes, Nomad etc |
|
||||||
| `logs_provider_url` | URL of the upstream function logs api provider, optional, when empty the `functions_provider_url` is used |
|
| `logs_provider_url` | URL of the upstream function logs api provider, optional, when empty the `functions_provider_url` is used |
|
||||||
| `faas_nats_address` | Address of NATS service. Required for asynchronous mode |
|
| `faas_nats_address` | The host at which NATS Streaming can be reached. Required for asynchronous mode |
|
||||||
| `faas_nats_port` | Port for NATS service. Requrired for asynchronous mode |
|
| `faas_nats_port` | The port at which NATS Streaming can be reached. Required for asynchronous mode |
|
||||||
|
| `faas_nats_cluster_name` | The name of the target NATS Streaming cluster. Defaults to `faas-cluster` for backwards-compatibility |
|
||||||
| `faas_prometheus_host` | Host to connect to Prometheus. Default: `"prometheus"` |
|
| `faas_prometheus_host` | Host to connect to Prometheus. Default: `"prometheus"` |
|
||||||
| `faas_promethus_port` | Port to connect to Prometheus. Default: `9090` |
|
| `faas_promethus_port` | Port to connect to Prometheus. Default: `9090` |
|
||||||
| `direct_functions` | `true` or `false` - functions are invoked directly over overlay network by DNS name without passing through the provider |
|
| `direct_functions` | `true` or `false` - functions are invoked directly over overlay network by DNS name without passing through the provider |
|
||||||
|
@ -135,7 +135,7 @@ func main() {
|
|||||||
|
|
||||||
defaultNATSConfig := natsHandler.NewDefaultNATSConfig(maxReconnect, interval)
|
defaultNATSConfig := natsHandler.NewDefaultNATSConfig(maxReconnect, interval)
|
||||||
|
|
||||||
natsQueue, queueErr := natsHandler.CreateNATSQueue(*config.NATSAddress, *config.NATSPort, defaultNATSConfig)
|
natsQueue, queueErr := natsHandler.CreateNATSQueue(*config.NATSAddress, *config.NATSPort, *config.NATSClusterName, defaultNATSConfig)
|
||||||
if queueErr != nil {
|
if queueErr != nil {
|
||||||
log.Fatalln(queueErr)
|
log.Fatalln(queueErr)
|
||||||
}
|
}
|
||||||
|
@ -98,6 +98,14 @@ func (ReadConfig) Read(hasEnv HasEnv) (*GatewayConfig, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
faasNATSClusterName := hasEnv.Getenv("faas_nats_cluster_name")
|
||||||
|
if len(faasNATSClusterName) > 0 {
|
||||||
|
cfg.NATSClusterName = &faasNATSClusterName
|
||||||
|
} else {
|
||||||
|
v := "faas-cluster"
|
||||||
|
cfg.NATSClusterName = &v
|
||||||
|
}
|
||||||
|
|
||||||
prometheusPort := hasEnv.Getenv("faas_prometheus_port")
|
prometheusPort := hasEnv.Getenv("faas_prometheus_port")
|
||||||
if len(prometheusPort) > 0 {
|
if len(prometheusPort) > 0 {
|
||||||
prometheusPortVal, err := strconv.Atoi(prometheusPort)
|
prometheusPortVal, err := strconv.Atoi(prometheusPort)
|
||||||
@ -186,6 +194,9 @@ type GatewayConfig struct {
|
|||||||
// Port of the NATS Service. Required for async mode.
|
// Port of the NATS Service. Required for async mode.
|
||||||
NATSPort *int
|
NATSPort *int
|
||||||
|
|
||||||
|
// The name of the NATS Streaming cluster. Required for async mode.
|
||||||
|
NATSClusterName *string
|
||||||
|
|
||||||
// Host to connect to Prometheus.
|
// Host to connect to Prometheus.
|
||||||
PrometheusHost string
|
PrometheusHost string
|
||||||
|
|
||||||
|
@ -236,6 +236,33 @@ func TestRead_UseNATS(t *testing.T) {
|
|||||||
t.Log("NATS was requested in config, but not enabled.")
|
t.Log("NATS was requested in config, but not enabled.")
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wantNATSAddress := "nats"
|
||||||
|
if *config.NATSAddress != wantNATSAddress {
|
||||||
|
t.Logf("faas_nats_address: want %s, got %s", wantNATSAddress, *config.NATSAddress)
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
wantNATSPort := 6222
|
||||||
|
if *config.NATSPort != wantNATSPort {
|
||||||
|
t.Logf("faas_nats_port: want %d, got %d", wantNATSPort, *config.NATSPort)
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
wantNATSClusterName := "faas-cluster"
|
||||||
|
if *config.NATSClusterName != wantNATSClusterName {
|
||||||
|
t.Logf("faas_nats_cluster_name: want %s, got %s", wantNATSClusterName, *config.NATSClusterName)
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
defaults.Setenv("faas_nats_cluster_name", "example-nats-cluster")
|
||||||
|
config, _ = readConfig.Read(defaults)
|
||||||
|
|
||||||
|
wantNATSClusterName = "example-nats-cluster"
|
||||||
|
if *config.NATSClusterName != wantNATSClusterName {
|
||||||
|
t.Logf("faas_nats_cluster_name: want %s, got %s", wantNATSClusterName, *config.NATSClusterName)
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRead_UseNATSBadPort(t *testing.T) {
|
func TestRead_UseNATSBadPort(t *testing.T) {
|
||||||
@ -243,6 +270,7 @@ func TestRead_UseNATSBadPort(t *testing.T) {
|
|||||||
defaults := NewEnvBucket()
|
defaults := NewEnvBucket()
|
||||||
defaults.Setenv("faas_nats_address", "nats")
|
defaults.Setenv("faas_nats_address", "nats")
|
||||||
defaults.Setenv("faas_nats_port", "6fff")
|
defaults.Setenv("faas_nats_port", "6fff")
|
||||||
|
defaults.Setenv("faas_nats_cluster_name", "example-nats-cluster")
|
||||||
readConfig := ReadConfig{}
|
readConfig := ReadConfig{}
|
||||||
|
|
||||||
_, err := readConfig.Read(defaults)
|
_, err := readConfig.Read(defaults)
|
||||||
|
5
gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go
generated
vendored
5
gateway/vendor/github.com/openfaas/nats-queue-worker/handler/handler.go
generated
vendored
@ -7,17 +7,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// CreateNATSQueue ready for asynchronous processing
|
// CreateNATSQueue ready for asynchronous processing
|
||||||
func CreateNATSQueue(address string, port int, clientConfig NATSConfig) (*NATSQueue, error) {
|
func CreateNATSQueue(address string, port int, clusterName string, clientConfig NATSConfig) (*NATSQueue, error) {
|
||||||
var err error
|
var err error
|
||||||
natsURL := fmt.Sprintf("nats://%s:%d", address, port)
|
natsURL := fmt.Sprintf("nats://%s:%d", address, port)
|
||||||
log.Printf("Opening connection to %s\n", natsURL)
|
log.Printf("Opening connection to %s\n", natsURL)
|
||||||
|
|
||||||
clientID := clientConfig.GetClientID()
|
clientID := clientConfig.GetClientID()
|
||||||
clusterID := "faas-cluster"
|
|
||||||
|
|
||||||
queue1 := NATSQueue{
|
queue1 := NATSQueue{
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
ClusterID: clusterID,
|
ClusterID: clusterName,
|
||||||
NATSURL: natsURL,
|
NATSURL: natsURL,
|
||||||
Topic: "faas-request",
|
Topic: "faas-request",
|
||||||
maxReconnect: clientConfig.GetMaxReconnect(),
|
maxReconnect: clientConfig.GetMaxReconnect(),
|
||||||
|
Reference in New Issue
Block a user