mirror of
https://github.com/openfaas/faas.git
synced 2025-06-08 16:26:47 +00:00
Note: not all `alexellis/github` references should be changed. A number of repos are not part of the openfaas org, and this commit excludes those. Signed-off-by: John McCabe <john@johnmccabe.net>
49 lines
947 B
Go
49 lines
947 B
Go
package handler
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
|
|
"encoding/json"
|
|
|
|
"github.com/openfaas/faas/gateway/queue"
|
|
"github.com/nats-io/go-nats-streaming"
|
|
)
|
|
|
|
// NatsQueue queue for work
|
|
type NatsQueue struct {
|
|
nc stan.Conn
|
|
}
|
|
|
|
// CreateNatsQueue ready for asynchronous processing
|
|
func CreateNatsQueue(address string, port int) (*NatsQueue, error) {
|
|
queue1 := NatsQueue{}
|
|
var err error
|
|
natsURL := fmt.Sprintf("nats://%s:%d", address, port)
|
|
log.Printf("Opening connection to %s\n", natsURL)
|
|
|
|
clientID := "faas-publisher"
|
|
clusterID := "faas-cluster"
|
|
|
|
nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
|
|
queue1.nc = nc
|
|
|
|
return &queue1, err
|
|
}
|
|
|
|
// Queue request for processing
|
|
func (q *NatsQueue) Queue(req *queue.Request) error {
|
|
var err error
|
|
|
|
fmt.Printf("NatsQueue - submitting request: %s.\n", req.Function)
|
|
|
|
out, err := json.Marshal(req)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
|
|
err = q.nc.Publish("faas-request", out)
|
|
|
|
return err
|
|
}
|