Allow dot in function name

This patch enables the use-case for multiple namepsaces by
allowing a dot to be used in the function name.

dep has been run to update OpenFaaS projects and also to
prune unused files.

Tested by doing a build.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2019-09-20 11:12:19 +01:00
committed by Alex Ellis
parent dc3c5fb9b3
commit 0a90125aba
1298 changed files with 86 additions and 629745 deletions

View File

@ -1 +0,0 @@
redirect: https://raw.githubusercontent.com/openfaas/faas/master/.DEREK.yml

View File

@ -1,35 +0,0 @@
<!--- Provide a general summary of the issue in the Title above -->
## Expected Behaviour
<!--- If you're describing a bug, tell us what should happen -->
<!--- If you're suggesting a change/improvement, tell us how it should work -->
## Current Behaviour
<!--- If describing a bug, tell us what happens instead of the expected behavior -->
<!--- If suggesting a change/improvement, explain the difference from current behavior -->
## Possible Solution
<!--- Not obligatory, but suggest a fix/reason for the bug, -->
<!--- or ideas how to implement the addition or change -->
## Steps to Reproduce (for bugs)
<!--- Provide a link to a live example, or an unambiguous set of steps to -->
<!--- reproduce this bug. Include code to reproduce, if relevant -->
1.
2.
3.
4.
## Context
<!--- How has this issue affected you? What are you trying to accomplish? -->
<!--- Providing context helps us come up with a solution that is most useful in the real world -->
## Your Environment
<!--- Include as many relevant details about the environment you experienced the bug in -->
* Docker version `docker version` (e.g. Docker 17.0.05 ):
* Are you using Docker Swarm or Kubernetes (FaaS-netes)?
* Operating System and version (e.g. Linux, Windows, MacOS):
* Link to your project or a code example to reproduce issue:

View File

@ -1,31 +0,0 @@
<!--- Provide a general summary of your changes in the Title above -->
## Description
<!--- Describe your changes in detail -->
## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->
- [ ] I have raised an issue to propose this change ([required](https://github.com/openfaas/faas/blob/master/CONTRIBUTING.md))
## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
## Types of changes
<!--- What types of changes does your code introduce? Put an `x` in all the boxes that apply: -->
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're here to help! -->
- [ ] My code follows the code style of this project.
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [ ] I've read the [CONTRIBUTION](https://github.com/openfaas/faas/blob/master/CONTRIBUTING.md) guide
- [ ] I have signed-off my commits with `git commit -s`
- [ ] I have added tests to cover my changes.
- [ ] All new and existing tests passed.

View File

@ -1,20 +0,0 @@
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Go
.glide/
.idea
.DS_Store
nats-queue-worker
queue-worker

View File

@ -1,38 +0,0 @@
sudo: required
language: go
go:
- 1.9.x
services:
- docker
addons:
apt:
packages:
- docker-ce
install:
- echo "Please don't go get"
script:
- ./build.sh
after_success:
- if [ ! -z "$TRAVIS_TAG" ] ; then
if [ -z $DOCKER_NS ] ; then
export DOCKER_NS=openfaas;
fi
docker tag $DOCKER_NS/queue-worker:latest-dev $DOCKER_NS/queue-worker:$TRAVIS_TAG;
echo $DOCKER_PASSWORD | docker login -u=$DOCKER_USERNAME --password-stdin;
docker push $DOCKER_NS/queue-worker:$TRAVIS_TAG;
docker tag $DOCKER_NS/queue-worker:latest-dev quay.io/$DOCKER_NS/queue-worker:$TRAVIS_TAG;
echo $QUAY_PASSWORD | docker login -u=$QUAY_USERNAME --password-stdin quay.io;
docker push quay.io/$DOCKER_NS/queue-worker:$TRAVIS_TAG;
fi

View File

@ -1,9 +0,0 @@
## Contributing
### License
This project is licensed under the MIT License.
## Guidelines
See guide for [FaaS](https://github.com/openfaas/faas/blob/master/CONTRIBUTING.md) here.

View File

@ -1,33 +0,0 @@
FROM golang:1.10-alpine as golang
WORKDIR /go/src/github.com/openfaas/nats-queue-worker
COPY vendor vendor
COPY handler handler
COPY nats nats
COPY main.go .
COPY types.go .
COPY readconfig.go .
COPY readconfig_test.go .
COPY auth.go .
RUN go test -v ./...
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app .
FROM alpine:3.8
RUN addgroup -S app \
&& adduser -S -g app app \
&& apk add --no-cache ca-certificates
WORKDIR /home/app
EXPOSE 8080
ENV http_proxy ""
ENV https_proxy ""
COPY --from=golang /go/src/github.com/openfaas/nats-queue-worker/app .
RUN chown -R app:app ./
USER app
CMD ["./app"]

View File

@ -1,33 +0,0 @@
FROM golang:1.10-alpine as golang
WORKDIR /go/src/github.com/openfaas/nats-queue-worker
COPY vendor vendor
COPY handler handler
COPY nats nats
COPY auth.go .
COPY types.go .
COPY readconfig.go .
COPY readconfig_test.go .
COPY main.go .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app .
FROM alpine:3.10
RUN addgroup -S app \
&& adduser -S -g app app \
&& apk add --no-cache ca-certificates
WORKDIR /home/app
EXPOSE 8080
ENV http_proxy ""
ENV https_proxy ""
COPY --from=golang /go/src/github.com/openfaas/nats-queue-worker/app .
RUN chown -R app:app ./
USER app
CMD ["./app"]

View File

@ -1,33 +0,0 @@
FROM golang:1.10-alpine as golang
WORKDIR /go/src/github.com/openfaas/nats-queue-worker
COPY vendor vendor
COPY handler handler
COPY nats nats
COPY main.go .
COPY types.go .
COPY readconfig.go .
COPY readconfig_test.go .
COPY auth.go .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app .
FROM alpine:3.10
RUN addgroup -S app \
&& adduser -S -g app app \
&& apk add --no-cache ca-certificates
WORKDIR /home/app
EXPOSE 8080
ENV http_proxy ""
ENV https_proxy ""
COPY --from=golang /go/src/github.com/openfaas/nats-queue-worker/app .
RUN chown -R app:app ./
USER app
CMD ["./app"]

View File

@ -1,56 +0,0 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
name = "github.com/gogo/protobuf"
packages = [
"gogoproto",
"proto",
"protoc-gen-gogo/descriptor"
]
revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
version = "v1.0.0"
[[projects]]
name = "github.com/nats-io/go-nats"
packages = [
".",
"encoders/builtin",
"util"
]
revision = "062418ea1c2181f52dc0f954f6204370519a868b"
version = "v1.5.0"
[[projects]]
name = "github.com/nats-io/go-nats-streaming"
packages = [
".",
"pb"
]
revision = "e15a53f85e4932540600a16b56f6c4f65f58176f"
version = "v0.4.0"
[[projects]]
name = "github.com/nats-io/nuid"
packages = ["."]
revision = "289cccf02c178dc782430d534e3c1f5b72af807f"
version = "v1.0.0"
[[projects]]
name = "github.com/openfaas/faas"
packages = ["gateway/queue"]
revision = "bfa869ec8c0c04c26c5b0ed434bc367e712dcaef"
version = "0.10.2"
[[projects]]
name = "github.com/openfaas/faas-provider"
packages = ["auth"]
revision = "9ce928bc82cbb2642e6d534f93a7904116179e6c"
version = "0.7.0"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "b568e20d336fb2625ad72caa1b479a46ed1362e320083720feabccd1abaedbee"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -1,46 +0,0 @@
# Gopkg.toml example
#
# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
#
# [prune]
# non-go = false
# go-tests = true
# unused-packages = true
[[constraint]]
name = "github.com/nats-io/go-nats-streaming"
version = "0.4.0"
[[constraint]]
name = "github.com/openfaas/faas"
version = "0.10.2"
[[constraint]]
name = "github.com/nats-io/go-nats"
version = "v1.5.0"
[prune]
go-tests = true
unused-packages = true
[[constraint]]
name = "github.com/openfaas/faas-provider"
version = "0.7.0"

View File

@ -1,29 +0,0 @@
TAG?=latest
.PHONY: build
build:
docker build --build-arg http_proxy="${http_proxy}" --build-arg https_proxy="${https_proxy}" -t openfaas/queue-worker:$(TAG) .
.PHONY: push
push:
docker push openfaas/queue-worker:$(TAG)
.PHONY: all
all: build
.PHONY: ci-armhf-build
ci-armhf-build:
docker build --build-arg http_proxy="${http_proxy}" --build-arg https_proxy="${https_proxy}" -t openfaas/queue-worker:$(TAG)-armhf . -f Dockerfile.armhf
.PHONY: ci-armhf-push
ci-armhf-push:
docker push openfaas/queue-worker:$(TAG)-armhf
.PHONY: ci-arm64-build
ci-arm64-build:
docker build --build-arg http_proxy="${http_proxy}" --build-arg https_proxy="${https_proxy}" -t openfaas/queue-worker:$(TAG)-arm64 . -f Dockerfile.arm64
.PHONY: ci-arm64-push
ci-arm64-push:
docker push openfaas/queue-worker:$(TAG)-arm64

View File

@ -1,21 +0,0 @@
## Queue worker for OpenFaaS - NATS Streaming
[![Build Status](https://travis-ci.org/openfaas/nats-queue-worker.svg?branch=master)](https://travis-ci.org/openfaas/nats-queue-worker)
This is a queue-worker to enable asynchronous processing of function requests.
> Note: A Kafka queue-worker is under-way through a PR on the main OpenFaaS repository.
* [Read more in the async guide](https://github.com/openfaas/faas/blob/master/guide/asynchronous.md)
Hub image: [openfaas/queue-worker](https://hub.docker.com/r/openfaas/queue-worker/)
License: MIT
Screenshots from keynote / video - find out more over at https://www.openfaas.com/
<img width="1440" alt="screen shot 2017-10-26 at 15 55 25" src="https://user-images.githubusercontent.com/6358735/32060207-049d4afa-ba66-11e7-8fc2-f4a0a84cbdaf.png">
<img width="1440" alt="screen shot 2017-10-26 at 15 55 19" src="https://user-images.githubusercontent.com/6358735/32060206-047eb75c-ba66-11e7-94d3-1343ea1811db.png">
<img width="1440" alt="screen shot 2017-10-26 at 15 55 06" src="https://user-images.githubusercontent.com/6358735/32060205-04545692-ba66-11e7-9e6d-b800a07b9bf5.png">

View File

@ -1,43 +0,0 @@
package main
import (
"fmt"
"net/http"
"os"
"github.com/openfaas/faas-provider/auth"
)
//AddBasicAuth to a request by reading secrets
func AddBasicAuth(req *http.Request) error {
if os.Getenv("basic_auth") == "true" {
reader := auth.ReadBasicAuthFromDisk{}
if len(os.Getenv("secret_mount_path")) > 0 {
reader.SecretMountPath = os.Getenv("secret_mount_path")
}
credentials, err := reader.Read()
if err != nil {
return fmt.Errorf("unable to read basic auth: %s", err.Error())
}
req.SetBasicAuth(credentials.User, credentials.Password)
}
return nil
}
//LoadCredentials load credentials from dis
func LoadCredentials() (*auth.BasicAuthCredentials, error) {
reader := auth.ReadBasicAuthFromDisk{}
if len(os.Getenv("secret_mount_path")) > 0 {
reader.SecretMountPath = os.Getenv("secret_mount_path")
}
credentials, err := reader.Read()
if err != nil {
return nil, fmt.Errorf("unable to read basic auth: %s", err.Error())
}
return credentials, nil
}

View File

@ -1,11 +0,0 @@
#!/bin/sh
export eTAG="latest-dev"
echo $1
if [ $1 ] ; then
eTAG=$1
fi
echo Building openfaas/queue-worker:$eTAG
docker build --build-arg http_proxy=$http_proxy -t openfaas/queue-worker:$eTAG .

View File

@ -1,40 +0,0 @@
package handler
import (
"os"
"strings"
"testing"
"github.com/openfaas/nats-queue-worker/nats"
)
func Test_GetClientID_ContainsHostname(t *testing.T) {
c := DefaultNATSConfig{}
val := c.GetClientID()
hostname, _ := os.Hostname()
encodedHostname := nats.GetClientID(hostname)
if !strings.HasSuffix(val, encodedHostname) {
t.Errorf("GetClientID should contain hostname as suffix, got: %s", val)
t.Fail()
}
}
func TestCreategetClientID(t *testing.T) {
clientID := getClientID("computer-a")
want := "faas-publisher-computer-a"
if clientID != want {
t.Logf("Want clientID: `%s`, but got: `%s`\n", want, clientID)
t.Fail()
}
}
func TestCreategetClientIDWhenHostHasUnsupportedCharacters(t *testing.T) {
clientID := getClientID("computer-a.acme.com")
want := "faas-publisher-computer-a_acme_com"
if clientID != want {
t.Logf("Want clientID: `%s`, but got: `%s`\n", want, clientID)
t.Fail()
}
}

View File

@ -1,305 +0,0 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"time"
stan "github.com/nats-io/go-nats-streaming"
"github.com/openfaas/faas-provider/auth"
"github.com/openfaas/faas/gateway/queue"
"github.com/openfaas/nats-queue-worker/nats"
)
func main() {
readConfig := ReadConfig{}
config := readConfig.Read()
log.SetFlags(0)
hostname, _ := os.Hostname()
var durable string
var unsubscribe bool
var credentials *auth.BasicAuthCredentials
var err error
if os.Getenv("basic_auth") == "true" {
log.Printf("Loading basic authentication credentials")
credentials, err = LoadCredentials()
if err != nil {
log.Printf("Error with LoadCredentials: %s ", err.Error())
}
}
client := makeClient()
i := 0
messageHandler := func(msg *stan.Msg) {
i++
log.Printf("[#%d] Received on [%s]: '%s'\n", i, msg.Subject, msg)
started := time.Now()
req := queue.Request{}
unmarshalErr := json.Unmarshal(msg.Data, &req)
if unmarshalErr != nil {
log.Printf("Unmarshal error: %s with data %s", unmarshalErr, msg.Data)
return
}
xCallID := req.Header.Get("X-Call-Id")
fmt.Printf("Request for %s.\n", req.Function)
if config.DebugPrintBody {
fmt.Println(string(req.Body))
}
queryString := ""
if len(req.QueryString) > 0 {
queryString = fmt.Sprintf("?%s", strings.TrimLeft(req.QueryString, "?"))
}
functionURL := fmt.Sprintf("http://%s%s:8080/%s", req.Function, config.FunctionSuffix, queryString)
request, err := http.NewRequest(http.MethodPost, functionURL, bytes.NewReader(req.Body))
defer request.Body.Close()
copyHeaders(request.Header, &req.Header)
res, err := client.Do(request)
var status int
var functionResult []byte
if err != nil {
status = http.StatusServiceUnavailable
log.Println(err)
timeTaken := time.Since(started).Seconds()
if req.CallbackURL != nil {
log.Printf("Callback to: %s\n", req.CallbackURL.String())
resultStatusCode, resultErr := postResult(&client,
res,
functionResult,
req.CallbackURL.String(),
xCallID,
status)
if resultErr != nil {
log.Println(resultErr)
} else {
log.Printf("Posted result: %d", resultStatusCode)
}
}
statusCode, reportErr := postReport(&client, req.Function, status, timeTaken, config.GatewayAddress, credentials)
if reportErr != nil {
log.Println(reportErr)
} else {
log.Printf("Posting report - %d\n", statusCode)
}
return
}
if res.Body != nil {
defer res.Body.Close()
resData, err := ioutil.ReadAll(res.Body)
functionResult = resData
if err != nil {
log.Println(err)
}
if config.WriteDebug {
fmt.Println(string(functionResult))
} else {
fmt.Printf("Wrote %d Bytes\n", len(string(functionResult)))
}
}
timeTaken := time.Since(started).Seconds()
fmt.Println(res.Status)
if req.CallbackURL != nil {
log.Printf("Callback to: %s\n", req.CallbackURL.String())
resultStatusCode, resultErr := postResult(&client,
res,
functionResult,
req.CallbackURL.String(),
xCallID,
res.StatusCode)
if resultErr != nil {
log.Println(resultErr)
} else {
log.Printf("Posted result: %d", resultStatusCode)
}
}
statusCode, reportErr := postReport(&client, req.Function, res.StatusCode, timeTaken, config.GatewayAddress, credentials)
if reportErr != nil {
log.Println(reportErr)
} else {
log.Printf("Posting report - %d\n", statusCode)
}
}
natsURL := "nats://" + config.NatsAddress + ":4222"
natsQueue := NATSQueue{
clusterID: "faas-cluster",
clientID: "faas-worker-" + nats.GetClientID(hostname),
natsURL: natsURL,
connMutex: &sync.RWMutex{},
maxReconnect: config.MaxReconnect,
reconnectDelay: config.ReconnectDelay,
quitCh: make(chan struct{}),
subject: "faas-request",
qgroup: "faas",
durable: durable,
messageHandler: messageHandler,
startOption: stan.StartWithLastReceived(),
maxInFlight: stan.MaxInflight(config.MaxInflight),
ackWait: config.AckWait,
}
if initErr := natsQueue.connect(); initErr != nil {
log.Panic(initErr)
}
// Wait for a SIGINT (perhaps triggered by user with CTRL-C)
// Run cleanup when signal is received
signalChan := make(chan os.Signal, 1)
cleanupDone := make(chan bool)
signal.Notify(signalChan, os.Interrupt)
go func() {
for range signalChan {
fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
// Do not unsubscribe a durable on exit, except if asked to.
if durable == "" || unsubscribe {
if err := natsQueue.unsubscribe(); err != nil {
log.Panicf(
"Cannot unsubscribe subject: %s from %s because of an error: %v",
natsQueue.subject,
natsQueue.natsURL,
err,
)
}
}
if err := natsQueue.closeConnection(); err != nil {
log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
}
cleanupDone <- true
}
}()
<-cleanupDone
}
// makeClient constructs a HTTP client with keep-alive turned
// off and a dial-timeout of 30 seconds.
func makeClient() http.Client {
proxyClient := http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 0,
}).DialContext,
MaxIdleConns: 1,
DisableKeepAlives: true,
IdleConnTimeout: 120 * time.Millisecond,
ExpectContinueTimeout: 1500 * time.Millisecond,
},
}
return proxyClient
}
func postResult(client *http.Client, functionRes *http.Response, result []byte, callbackURL string, xCallID string, statusCode int) (int, error) {
var reader io.Reader
if result != nil {
reader = bytes.NewReader(result)
}
request, err := http.NewRequest(http.MethodPost, callbackURL, reader)
if functionRes != nil {
copyHeaders(request.Header, &functionRes.Header)
}
request.Header.Set("X-Function-Status", fmt.Sprintf("%d", statusCode))
if len(xCallID) > 0 {
request.Header.Set("X-Call-Id", xCallID)
}
res, err := client.Do(request)
if err != nil {
return http.StatusBadGateway, fmt.Errorf("error posting result to URL %s %s", callbackURL, err.Error())
}
if request.Body != nil {
defer request.Body.Close()
}
if res.Body != nil {
defer res.Body.Close()
}
return res.StatusCode, nil
}
func copyHeaders(destination http.Header, source *http.Header) {
for k, v := range *source {
vClone := make([]string, len(v))
copy(vClone, v)
(destination)[k] = vClone
}
}
func postReport(client *http.Client, function string, statusCode int, timeTaken float64, gatewayAddress string, credentials *auth.BasicAuthCredentials) (int, error) {
req := AsyncReport{
FunctionName: function,
StatusCode: statusCode,
TimeTaken: timeTaken,
}
targetPostback := "http://" + gatewayAddress + ":8080/system/async-report"
reqBytes, _ := json.Marshal(req)
request, err := http.NewRequest(http.MethodPost, targetPostback, bytes.NewReader(reqBytes))
if os.Getenv("basic_auth") == "true" && credentials != nil {
request.SetBasicAuth(credentials.User, credentials.Password)
}
defer request.Body.Close()
res, err := client.Do(request)
if err != nil {
return http.StatusGatewayTimeout, fmt.Errorf("cannot post report to %s: %s", targetPostback, err)
}
if res.Body != nil {
defer res.Body.Close()
}
return res.StatusCode, nil
}

View File

@ -1,23 +0,0 @@
package nats
import (
"testing"
)
func TestGetClientID(t *testing.T) {
clientID := GetClientID("computer-a")
want := "computer-a"
if clientID != want {
t.Logf("Want clientID: `%s`, but got: `%s`\n", want, clientID)
t.Fail()
}
}
func TestGetClientIDWhenHostHasUnsupportedCharacters(t *testing.T) {
clientID := GetClientID("computer-a.acme.com")
want := "computer-a_acme_com"
if clientID != want {
t.Logf("Want clientID: `%s`, but got: `%s`\n", want, clientID)
t.Fail()
}
}

View File

@ -1,112 +0,0 @@
package main
import (
"log"
"os"
"strconv"
"time"
)
// ReadConfig constitutes config from env variables
type ReadConfig struct {
}
const DefaultMaxReconnect = 120
const DefaultReconnectDelay = time.Second * 2
func (ReadConfig) Read() QueueWorkerConfig {
cfg := QueueWorkerConfig{
AckWait: time.Second * 30,
MaxInflight: 1,
}
if val, exists := os.LookupEnv("faas_nats_address"); exists {
cfg.NatsAddress = val
} else {
cfg.NatsAddress = "nats"
}
if val, exists := os.LookupEnv("faas_gateway_address"); exists {
cfg.GatewayAddress = val
} else {
cfg.GatewayAddress = "gateway"
}
if val, exists := os.LookupEnv("faas_function_suffix"); exists {
cfg.FunctionSuffix = val
}
if val, exists := os.LookupEnv("faas_print_body"); exists {
if val == "1" || val == "true" {
cfg.DebugPrintBody = true
} else {
cfg.DebugPrintBody = false
}
}
if val, exists := os.LookupEnv("write_debug"); exists {
if val == "1" || val == "true" {
cfg.WriteDebug = true
} else {
cfg.WriteDebug = false
}
}
if value, exists := os.LookupEnv("max_inflight"); exists {
val, err := strconv.Atoi(value)
if err != nil {
log.Println("max_inflight error:", err)
} else {
cfg.MaxInflight = val
}
}
cfg.MaxReconnect = DefaultMaxReconnect
if value, exists := os.LookupEnv("faas_max_reconnect"); exists {
val, err := strconv.Atoi(value)
if err != nil {
log.Println("converting faas_max_reconnect to int error:", err)
} else {
cfg.MaxReconnect = val
}
}
cfg.ReconnectDelay = DefaultReconnectDelay
if value, exists := os.LookupEnv("faas_reconnect_delay"); exists {
reconnectDelayVal, durationErr := time.ParseDuration(value)
if durationErr != nil {
log.Println("parse env var: faas_reconnect_delay as time.Duration error:", durationErr)
} else {
cfg.ReconnectDelay = reconnectDelayVal
}
}
if val, exists := os.LookupEnv("ack_wait"); exists {
ackWaitVal, durationErr := time.ParseDuration(val)
if durationErr != nil {
log.Println("ack_wait error:", durationErr)
} else {
cfg.AckWait = ackWaitVal
}
}
return cfg
}
type QueueWorkerConfig struct {
NatsAddress string
GatewayAddress string
FunctionSuffix string
DebugPrintBody bool
WriteDebug bool
MaxInflight int
AckWait time.Duration
MaxReconnect int
ReconnectDelay time.Duration
}

View File

@ -1,96 +0,0 @@
package main
import (
"os"
"testing"
"time"
)
func Test_ReadConfig(t *testing.T) {
readConfig := ReadConfig{}
os.Setenv("faas_nats_address", "test_nats")
os.Setenv("faas_gateway_address", "test_gatewayaddr")
os.Setenv("faas_function_suffix", "test_suffix")
os.Setenv("faas_print_body", "true")
os.Setenv("write_debug", "true")
os.Setenv("max_inflight", "10")
os.Setenv("ack_wait", "10ms")
config := readConfig.Read()
expected := "test_nats"
if config.NatsAddress != expected {
t.Logf("Expected NatsAddress `%s` actual `%s`\n", expected, config.NatsAddress)
t.Fail()
}
expected = "test_gatewayaddr"
if config.GatewayAddress != expected {
t.Logf("Expected GatewayAddress `%s` actual `%s`\n", expected, config.GatewayAddress)
t.Fail()
}
expected = "test_suffix"
if config.FunctionSuffix != expected {
t.Logf("Expected FunctionSuffix `%s` actual `%s`\n", expected, config.FunctionSuffix)
t.Fail()
}
if config.DebugPrintBody != true {
t.Logf("Expected DebugPrintBody `%v` actual `%v`\n", true, config.DebugPrintBody)
t.Fail()
}
if config.WriteDebug != true {
t.Logf("Expected WriteDebug `%v` actual `%v`\n", true, config.WriteDebug)
t.Fail()
}
expectedMaxInflight := 10
if config.MaxInflight != expectedMaxInflight {
t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedMaxInflight, config.MaxInflight)
t.Fail()
}
expectedAckWait := time.Millisecond * 10
if config.AckWait != expectedAckWait {
t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedAckWait, config.AckWait)
t.Fail()
}
os.Unsetenv("max_inflight")
os.Unsetenv("ack_wait")
config = readConfig.Read()
expectedMaxInflight = 1
if config.MaxInflight != expectedMaxInflight {
t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedMaxInflight, config.MaxInflight)
t.Fail()
}
expectedAckWait = time.Second * 30
if config.AckWait != expectedAckWait {
t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedAckWait, config.AckWait)
t.Fail()
}
os.Setenv("max_inflight", "10.00")
os.Setenv("ack_wait", "10")
config = readConfig.Read()
expectedMaxInflight = 1
if config.MaxInflight != expectedMaxInflight {
t.Logf("Expected maxInflight `%v` actual `%v`\n", expectedMaxInflight, config.MaxInflight)
t.Fail()
}
expectedAckWait = time.Second * 30
if config.AckWait != expectedAckWait {
t.Logf("Expected ackWait `%v` actual `%v`\n", expectedAckWait, config.AckWait)
t.Fail()
}
}

View File

@ -1,142 +0,0 @@
package main
import (
"fmt"
"log"
"sync"
"time"
stan "github.com/nats-io/go-nats-streaming"
)
// AsyncReport is the report from a function executed on a queue worker.
type AsyncReport struct {
FunctionName string `json:"name"`
StatusCode int `json:"statusCode"`
TimeTaken float64 `json:"timeTaken"`
}
// NATSQueue represents a subscription to NATS Streaming
type NATSQueue struct {
clusterID string
clientID string
natsURL string
maxReconnect int
reconnectDelay time.Duration
conn stan.Conn
connMutex *sync.RWMutex
quitCh chan struct{}
subject string
qgroup string
durable string
ackWait time.Duration
messageHandler func(*stan.Msg)
startOption stan.SubscriptionOption
maxInFlight stan.SubscriptionOption
subscription stan.Subscription
}
// connect creates a subscription to NATS Streaming
func (q *NATSQueue) connect() error {
log.Printf("Connect: %s\n", q.natsURL)
nc, err := stan.Connect(
q.clusterID,
q.clientID,
stan.NatsURL(q.natsURL),
stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
log.Printf("Disconnected from %s\n", q.natsURL)
q.reconnect()
}),
)
if err != nil {
return fmt.Errorf("can't connect to %s: %v", q.natsURL, err)
}
q.connMutex.Lock()
defer q.connMutex.Unlock()
q.conn = nc
log.Printf("Subscribing to: %s at %s\n", q.subject, q.natsURL)
log.Println("Wait for ", q.ackWait)
subscription, err := q.conn.QueueSubscribe(
q.subject,
q.qgroup,
q.messageHandler,
stan.DurableName(q.durable),
stan.AckWait(q.ackWait),
q.startOption,
q.maxInFlight,
)
if err != nil {
return fmt.Errorf("couldn't subscribe to %s at %s. Error: %v", q.subject, q.natsURL, err)
}
log.Printf(
"Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n",
q.subject,
q.clientID,
q.qgroup,
q.durable,
)
q.subscription = subscription
return nil
}
func (q *NATSQueue) reconnect() {
log.Printf("Reconnect\n")
for i := 0; i < q.maxReconnect; i++ {
select {
case <-time.After(time.Duration(i) * q.reconnectDelay):
if err := q.connect(); err == nil {
log.Printf("Reconnecting (%d/%d) to %s succeeded\n", i+1, q.maxReconnect, q.natsURL)
return
}
nextTryIn := (time.Duration(i+1) * q.reconnectDelay).String()
log.Printf("Reconnecting (%d/%d) to %s failed\n", i+1, q.maxReconnect, q.natsURL)
log.Printf("Waiting %s before next try", nextTryIn)
case <-q.quitCh:
log.Println("Received signal to stop reconnecting...")
return
}
}
log.Printf("Reconnecting limit (%d) reached\n", q.maxReconnect)
}
func (q *NATSQueue) unsubscribe() error {
q.connMutex.Lock()
defer q.connMutex.Unlock()
if q.subscription != nil {
return fmt.Errorf("q.subscription is nil")
}
return q.subscription.Unsubscribe()
}
func (q *NATSQueue) closeConnection() error {
q.connMutex.Lock()
defer q.connMutex.Unlock()
if q.conn == nil {
return fmt.Errorf("q.conn is nil")
}
close(q.quitCh)
return q.conn.Close()
}