mirror of
https://github.com/openfaas/faas.git
synced 2025-06-21 14:23:25 +00:00
Update/delete vendor files
Refreshed / updated vendor dependencies Tested with "make". Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
committed by
Alex Ellis
parent
f62bcb0736
commit
d6cf72bb39
151
gateway/vendor/github.com/nats-io/go-nats-streaming/stan.go
generated
vendored
151
gateway/vendor/github.com/nats-io/go-nats-streaming/stan.go
generated
vendored
@ -1,4 +1,4 @@
|
||||
// Copyright 2016-2018 The NATS Authors
|
||||
// Copyright 2016-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@ -17,7 +17,6 @@ package stan
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -27,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
// Version is the NATS Streaming Go Client version
|
||||
const Version = "0.4.0"
|
||||
const Version = "0.4.4"
|
||||
|
||||
const (
|
||||
// DefaultNatsURL is the default URL the client connects to
|
||||
@ -50,17 +49,39 @@ const (
|
||||
// Conn represents a connection to the NATS Streaming subsystem. It can Publish and
|
||||
// Subscribe to messages within the NATS Streaming cluster.
|
||||
type Conn interface {
|
||||
// Publish
|
||||
// Publish will publish to the cluster and wait for an ACK.
|
||||
Publish(subject string, data []byte) error
|
||||
|
||||
// PublishAsync will publish to the cluster and asynchronously process
|
||||
// the ACK or error state. It will return the GUID for the message being sent.
|
||||
PublishAsync(subject string, data []byte, ah AckHandler) (string, error)
|
||||
|
||||
// Subscribe
|
||||
// Subscribe will perform a subscription with the given options to the cluster.
|
||||
//
|
||||
// If no option is specified, DefaultSubscriptionOptions are used. The default start
|
||||
// position is to receive new messages only (messages published after the subscription is
|
||||
// registered in the cluster).
|
||||
Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)
|
||||
|
||||
// QueueSubscribe
|
||||
// QueueSubscribe will perform a queue subscription with the given options to the cluster.
|
||||
//
|
||||
// If no option is specified, DefaultSubscriptionOptions are used. The default start
|
||||
// position is to receive new messages only (messages published after the subscription is
|
||||
// registered in the cluster).
|
||||
QueueSubscribe(subject, qgroup string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)
|
||||
|
||||
// Close
|
||||
// Close a connection to the cluster.
|
||||
//
|
||||
// If there are active subscriptions at the time of the close, they are implicitly closed
|
||||
// (not unsubscribed) by the cluster. This means that durable subscriptions are maintained.
|
||||
//
|
||||
// The wait on asynchronous publish calls are canceled and ErrConnectionClosed will be
|
||||
// reported to the registered AckHandler. It is possible that the cluster received and
|
||||
// persisted these messages.
|
||||
//
|
||||
// If a NATS connection is provided as an option to the Connect() call, the NATS
|
||||
// connection is NOT closed when this call is invoked. This connection needs to be
|
||||
// managed by the application.
|
||||
Close() error
|
||||
|
||||
// NatsConn returns the underlying NATS conn. Use this with care. For
|
||||
@ -106,15 +127,46 @@ type ConnectionLostHandler func(Conn, error)
|
||||
|
||||
// Options can be used to a create a customized connection.
|
||||
type Options struct {
|
||||
NatsURL string
|
||||
NatsConn *nats.Conn
|
||||
ConnectTimeout time.Duration
|
||||
AckTimeout time.Duration
|
||||
DiscoverPrefix string
|
||||
// NatsURL is an URL (or comma separated list of URLs) to a node or nodes
|
||||
// in the cluster.
|
||||
NatsURL string
|
||||
|
||||
// NatsConn is a user provided low-level NATS connection that the streaming
|
||||
// connection will use to communicate with the cluster. When set, closing
|
||||
// the NATS streaming connection does NOT close this NATS connection.
|
||||
// It is the responsibility of the application to manage the lifetime of
|
||||
// the supplied NATS connection.
|
||||
NatsConn *nats.Conn
|
||||
|
||||
// ConnectTimeout is the timeout for the initial Connect(). This value is also
|
||||
// used for some of the internal request/replies with the cluster.
|
||||
ConnectTimeout time.Duration
|
||||
|
||||
// AckTimeout is how long to wait when a message is published for an ACK from
|
||||
// the cluster. If the library does not receive an ACK after this timeout,
|
||||
// the Publish() call (or the AckHandler) will return ErrTimeout.
|
||||
AckTimeout time.Duration
|
||||
|
||||
// DiscoverPrefix is the prefix connect requests are sent to for this cluster.
|
||||
// The default is "_STAN.discover".
|
||||
DiscoverPrefix string
|
||||
|
||||
// MaxPubAcksInflight specifies how many messages can be published without
|
||||
// getting ACKs back from the cluster before the Publish() or PublishAsync()
|
||||
// calls block.
|
||||
MaxPubAcksInflight int
|
||||
PingIterval int // In seconds
|
||||
PingMaxOut int
|
||||
ConnectionLostCB ConnectionLostHandler
|
||||
|
||||
// PingInterval is the interval at which client sends PINGs to the server
|
||||
// to detect the loss of a connection.
|
||||
PingIterval int
|
||||
|
||||
// PingMaxOut specifies the maximum number of PINGs without a corresponding
|
||||
// PONG before declaring the connection permanently lost.
|
||||
PingMaxOut int
|
||||
|
||||
// ConnectionLostCB specifies the handler to be invoked when the connection
|
||||
// is permanently lost.
|
||||
ConnectionLostCB ConnectionLostHandler
|
||||
}
|
||||
|
||||
// DefaultOptions are the NATS Streaming client's default options
|
||||
@ -132,6 +184,8 @@ var DefaultOptions = Options{
|
||||
type Option func(*Options) error
|
||||
|
||||
// NatsURL is an Option to set the URL the client should connect to.
|
||||
// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
|
||||
// Comma separated arrays are also supported, e.g. urlA, urlB.
|
||||
func NatsURL(u string) Option {
|
||||
return func(o *Options) error {
|
||||
o.NatsURL = u
|
||||
@ -166,7 +220,8 @@ func MaxPubAcksInflight(max int) Option {
|
||||
}
|
||||
|
||||
// NatsConn is an Option to set the underlying NATS connection to be used
|
||||
// by a NATS Streaming Conn object.
|
||||
// by a streaming connection object. When such option is set, closing the
|
||||
// streaming connection does not close the provided NATS connection.
|
||||
func NatsConn(nc *nats.Conn) Option {
|
||||
return func(o *Options) error {
|
||||
o.NatsConn = nc
|
||||
@ -188,7 +243,7 @@ func Pings(interval, maxOut int) Option {
|
||||
// do not check values.
|
||||
if !testAllowMillisecInPings {
|
||||
if interval < 1 || maxOut <= 2 {
|
||||
return fmt.Errorf("Invalid ping values: interval=%v (min>0) maxOut=%v (min=2)", interval, maxOut)
|
||||
return fmt.Errorf("invalid ping values: interval=%v (min>0) maxOut=%v (min=2)", interval, maxOut)
|
||||
}
|
||||
}
|
||||
o.PingIterval = interval
|
||||
@ -285,16 +340,15 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
|
||||
hbInbox := nats.NewInbox()
|
||||
var err error
|
||||
if c.hbSubscription, err = c.nc.Subscribe(hbInbox, c.processHeartBeat); err != nil {
|
||||
c.Close()
|
||||
c.failConnect(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Prepare a subscription on ping responses, even if we are not
|
||||
// going to need it, so that if that fails, it fails before initiating
|
||||
// a connection.
|
||||
pingSub, err := c.nc.Subscribe(nats.NewInbox(), c.processPingResponse)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
if c.pingSub, err = c.nc.Subscribe(nats.NewInbox(), c.processPingResponse); err != nil {
|
||||
c.failConnect(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -311,7 +365,7 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
|
||||
b, _ := req.Marshal()
|
||||
reply, err := c.nc.Request(discoverSubject, b, c.opts.ConnectTimeout)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
c.failConnect(err)
|
||||
if err == nats.ErrTimeout {
|
||||
return nil, ErrConnectReqTimeout
|
||||
}
|
||||
@ -321,14 +375,17 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
|
||||
cr := &pb.ConnectResponse{}
|
||||
err = cr.Unmarshal(reply.Data)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
c.failConnect(err)
|
||||
return nil, err
|
||||
}
|
||||
if cr.Error != "" {
|
||||
c.Close()
|
||||
c.failConnect(err)
|
||||
return nil, errors.New(cr.Error)
|
||||
}
|
||||
|
||||
// Past this point, we need to call Close() on error because the server
|
||||
// has accepted our connection.
|
||||
|
||||
// Capture cluster configuration endpoints to publish and subscribe/unsubscribe.
|
||||
c.pubPrefix = cr.PubPrefix
|
||||
c.subRequests = cr.SubRequests
|
||||
@ -354,7 +411,7 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
|
||||
c.connLostCB = c.opts.ConnectionLostCB
|
||||
|
||||
unsubPingSub := true
|
||||
// Do this with servers which are at least at protcolOne.
|
||||
// Do this with servers which are at least at protocolOne.
|
||||
if cr.Protocol >= protocolOne {
|
||||
// Note that in the future server may override client ping
|
||||
// interval value sent in ConnectRequest, so use the
|
||||
@ -367,7 +424,7 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
|
||||
|
||||
// These will be immutable.
|
||||
c.pingRequests = cr.PingRequests
|
||||
c.pingInbox = pingSub.Subject
|
||||
c.pingInbox = c.pingSub.Subject
|
||||
// In test, it is possible that we get a negative value
|
||||
// to represent milliseconds.
|
||||
if testAllowMillisecInPings && cr.PingInterval < 0 {
|
||||
@ -378,7 +435,6 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
|
||||
}
|
||||
c.pingMaxOut = int(cr.PingMaxOut)
|
||||
c.pingBytes, _ = (&pb.Ping{ConnID: c.connID}).Marshal()
|
||||
c.pingSub = pingSub
|
||||
// Set the timer now that we are set. Use lock to create
|
||||
// synchronization point.
|
||||
c.pingMu.Lock()
|
||||
@ -387,15 +443,23 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
|
||||
}
|
||||
}
|
||||
if unsubPingSub {
|
||||
pingSub.Unsubscribe()
|
||||
c.pingSub.Unsubscribe()
|
||||
c.pingSub = nil
|
||||
}
|
||||
|
||||
// Attach a finalizer
|
||||
runtime.SetFinalizer(&c, func(sc *conn) { sc.Close() })
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
// Invoked on a failed connect.
|
||||
// Perform appropriate cleanup operations but do not attempt to send
|
||||
// a close request.
|
||||
func (sc *conn) failConnect(err error) {
|
||||
sc.cleanupOnClose(err)
|
||||
if sc.nc != nil && sc.ncOwned {
|
||||
sc.nc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Sends a PING (containing the connection's ID) to the server at intervals
|
||||
// specified by PingInterval option when connection is created.
|
||||
// Everytime a PING is sent, the number of outstanding PINGs is increased.
|
||||
@ -460,7 +524,7 @@ func (sc *conn) closeDueToPing(err error) {
|
||||
}
|
||||
// Stop timer, unsubscribe, fail the pubs, etc..
|
||||
sc.cleanupOnClose(err)
|
||||
// No need to send Close prototol, so simply close the underlying
|
||||
// No need to send Close protocol, so simply close the underlying
|
||||
// NATS connection (if we own it, and if not already closed)
|
||||
if sc.ncOwned && !sc.nc.IsClosed() {
|
||||
sc.nc.Close()
|
||||
@ -488,15 +552,21 @@ func (sc *conn) cleanupOnClose(err error) {
|
||||
}
|
||||
sc.pingMu.Unlock()
|
||||
|
||||
// Unsubscribe only if the NATS connection is not already closed...
|
||||
if !sc.nc.IsClosed() {
|
||||
if sc.ackSubscription != nil {
|
||||
sc.ackSubscription.Unsubscribe()
|
||||
// Unsubscribe only if the NATS connection is not already closed
|
||||
// and we don't own it (otherwise connection is going to be closed
|
||||
// so no need for explicit unsubscribe).
|
||||
if !sc.ncOwned && !sc.nc.IsClosed() {
|
||||
if sc.hbSubscription != nil {
|
||||
sc.hbSubscription.Unsubscribe()
|
||||
}
|
||||
if sc.pingSub != nil {
|
||||
sc.pingSub.Unsubscribe()
|
||||
}
|
||||
if sc.ackSubscription != nil {
|
||||
sc.ackSubscription.Unsubscribe()
|
||||
}
|
||||
}
|
||||
|
||||
// Fail all pending pubs
|
||||
for guid, pubAck := range sc.pubAckMap {
|
||||
if pubAck.t != nil {
|
||||
@ -586,7 +656,7 @@ func (sc *conn) processAck(m *nats.Msg) {
|
||||
pa := &pb.PubAck{}
|
||||
err := pa.Unmarshal(m.Data)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Error during ack unmarshal: %v", err))
|
||||
panic(fmt.Errorf("error during ack unmarshal: %v", err))
|
||||
}
|
||||
|
||||
// Remove
|
||||
@ -722,13 +792,16 @@ func (sc *conn) processMsg(raw *nats.Msg) {
|
||||
msg := &Msg{}
|
||||
err := msg.Unmarshal(raw.Data)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Error processing unmarshal for msg: %v", err))
|
||||
panic(fmt.Errorf("error processing unmarshal for msg: %v", err))
|
||||
}
|
||||
var sub *subscription
|
||||
// Lookup the subscription
|
||||
sc.RLock()
|
||||
nc := sc.nc
|
||||
isClosed := nc == nil
|
||||
sub := sc.subMap[raw.Subject]
|
||||
if !isClosed {
|
||||
sub = sc.subMap[raw.Subject]
|
||||
}
|
||||
sc.RUnlock()
|
||||
|
||||
// Check if sub is no longer valid or connection has been closed.
|
||||
|
Reference in New Issue
Block a user