Alex Ellis (VMware) 223c561706 Vendor new queue-worker version
Introduces 0.4.6 of queue-worker - see upstream repo for changes.

Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
2018-06-18 20:10:46 +01:00

762 lines
22 KiB
Go

// Copyright 2016-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package stan is a Go client for the NATS Streaming messaging system (https://nats.io).
package stan
import (
"errors"
"fmt"
"runtime"
"sync"
"time"
"github.com/nats-io/go-nats"
"github.com/nats-io/go-nats-streaming/pb"
"github.com/nats-io/nuid"
)
// Version is the NATS Streaming Go Client version
const Version = "0.4.0"

// Defaults used for a connection when the corresponding Option is not given.
const (
	// DefaultNatsURL is the URL the client connects to by default.
	DefaultNatsURL = "nats://localhost:4222"
	// DefaultConnectWait is the default timeout of the connect operation.
	DefaultConnectWait = 2 * time.Second
	// DefaultDiscoverPrefix is the subject prefix used to reach the NATS
	// Streaming server when connecting.
	DefaultDiscoverPrefix = "_STAN.discover"
	// DefaultACKPrefix is the subject prefix on which the client receives
	// publish ACKs from the NATS Streaming server.
	DefaultACKPrefix = "_STAN.acks"
	// DefaultMaxPubAcksInflight is the default maximum number of published
	// messages that may still be awaiting an ACK from the server.
	DefaultMaxPubAcksInflight = 16384
)

// Defaults for the client PINGs sent to the server.
const (
	// DefaultPingInterval is the default interval, in seconds, between two
	// PINGs sent by a connection to the server.
	DefaultPingInterval = 5
	// DefaultPingMaxOut is the default number of PINGs without a response
	// before the connection is considered lost.
	DefaultPingMaxOut = 3
)
// Conn represents a connection to the NATS Streaming subsystem. It can Publish and
// Subscribe to messages within the NATS Streaming cluster.
type Conn interface {
	// Publish sends data on subject and returns once the outcome of the
	// publish (ACK or error) is known.
	Publish(subject string, data []byte) error
	// PublishAsync sends data on subject without waiting for the ACK; the
	// outcome is reported through ah. It returns the GUID of the message.
	PublishAsync(subject string, data []byte, ah AckHandler) (string, error)

	// Subscribe registers cb as the handler for messages on subject.
	Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)

	// QueueSubscribe registers cb as the handler for messages on subject,
	// as a member of the queue group qgroup.
	QueueSubscribe(subject, qgroup string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)

	// Close closes the connection.
	Close() error

	// NatsConn returns the underlying NATS conn. Use this with care. For
	// example, closing the wrapped NATS conn will put the NATS Streaming Conn
	// in an invalid state.
	NatsConn() *nats.Conn
}
// Protocol versions exchanged between client and server on connect.
const (
	// protocolOne: the client sends connID in ConnectRequest and PubMsg,
	// and the server listens and responds to client PINGs. The validity
	// of the connection (based on connID) is checked on incoming PINGs.
	protocolOne int32 = 1
)
// Errors returned by the NATS Streaming client.
var (
	// Request timeouts.
	ErrConnectReqTimeout = errors.New("stan: connect request timeout")
	ErrCloseReqTimeout   = errors.New("stan: close request timeout")
	ErrSubReqTimeout     = errors.New("stan: subscribe request timeout")
	ErrUnsubReqTimeout   = errors.New("stan: unsubscribe request timeout")

	// Connection state.
	ErrConnectionClosed = errors.New("stan: connection closed")
	ErrBadConnection    = errors.New("stan: invalid connection")
	ErrMaxPings         = errors.New("stan: connection lost due to PING failure")

	// Publishing and acknowledgements.
	ErrTimeout   = errors.New("stan: publish ack timeout")
	ErrBadAck    = errors.New("stan: malformed ack")
	ErrManualAck = errors.New("stan: cannot manually ack in auto-ack mode")
	ErrNilMsg    = errors.New("stan: nil message")

	// Subscriptions and server capabilities.
	ErrBadSubscription = errors.New("stan: invalid subscription")
	ErrNoServerSupport = errors.New("stan: not supported by server")
)
// testAllowMillisecInPings, when set (by tests), disables the validation in
// Pings and lets a negative ping interval be interpreted as milliseconds.
var testAllowMillisecInPings bool
// AckHandler is the callback used by asynchronous publishing to report the
// status of the ack. It receives the GUID of the published message and any
// error state; a nil error means the message was successfully received by
// NATS Streaming.
type AckHandler func(guid string, err error)
// ConnectionLostHandler is the callback invoked when the Streaming
// connection is closed due to unexpected errors. It receives the affected
// connection and the error that caused the closure.
type ConnectionLostHandler func(sc Conn, err error)
// Options can be used to create a customized connection.
type Options struct {
	// NatsURL is the URL used when the library creates the NATS connection.
	NatsURL string
	// NatsConn, when set, is used instead of creating a new NATS connection.
	NatsConn *nats.Conn
	// ConnectTimeout bounds the connect request (and the close request).
	ConnectTimeout time.Duration
	// AckTimeout is how long a publish waits for its ACK.
	AckTimeout time.Duration
	// DiscoverPrefix is the subject prefix of the connect request.
	DiscoverPrefix string
	// MaxPubAcksInflight caps the number of published messages awaiting an ACK.
	MaxPubAcksInflight int
	// PingIterval is the interval between PINGs, in seconds.
	PingIterval int
	// PingMaxOut is the number of PINGs sent without a response before the
	// connection is considered lost.
	PingMaxOut int
	// ConnectionLostCB is invoked when the connection is lost.
	ConnectionLostCB ConnectionLostHandler
}
// DefaultOptions holds the option values a connection starts with before
// any Option passed to Connect is applied.
var DefaultOptions = Options{
	// Where and how to connect.
	NatsURL:        DefaultNatsURL,
	DiscoverPrefix: DefaultDiscoverPrefix,
	ConnectTimeout: DefaultConnectWait,

	// Publishing.
	AckTimeout:         DefaultAckWait,
	MaxPubAcksInflight: DefaultMaxPubAcksInflight,

	// Client PINGs.
	PingIterval: DefaultPingInterval,
	PingMaxOut:  DefaultPingMaxOut,
}
// Option is a function that customizes the Options of a connection. A
// non-nil error makes Connect fail with that error.
type Option func(opts *Options) error
// NatsURL returns an Option that sets the URL the client should connect to.
func NatsURL(u string) Option {
	setURL := func(opts *Options) error {
		opts.NatsURL = u
		return nil
	}
	return setURL
}
// ConnectWait returns an Option that sets the timeout for establishing a
// connection.
func ConnectWait(t time.Duration) Option {
	setTimeout := func(opts *Options) error {
		opts.ConnectTimeout = t
		return nil
	}
	return setTimeout
}
// PubAckWait returns an Option that sets how long to wait for the ACK of a
// published message.
func PubAckWait(t time.Duration) Option {
	setAckWait := func(opts *Options) error {
		opts.AckTimeout = t
		return nil
	}
	return setAckWait
}
// MaxPubAcksInflight returns an Option that sets the maximum number of
// published messages that may still be awaiting an ACK from the server.
func MaxPubAcksInflight(max int) Option {
	setMax := func(opts *Options) error {
		opts.MaxPubAcksInflight = max
		return nil
	}
	return setMax
}
// NatsConn returns an Option that sets the underlying NATS connection to be
// used by a NATS Streaming Conn object.
func NatsConn(nc *nats.Conn) Option {
	setConn := func(opts *Options) error {
		opts.NatsConn = nc
		return nil
	}
	return setConn
}
// Pings is an Option to set the ping interval and max out values.
// The interval needs to be at least 1 and represents the number
// of seconds.
// The maxOut needs to be at least 2, since the count of sent PINGs
// increase whenever a PING is sent and reset to 0 when a response
// is received. Setting to 1 would cause the library to close the
// connection right away.
//
// The returned Option fails with an error when either value is below its
// minimum, unless the test-only flag testAllowMillisecInPings is set.
func Pings(interval, maxOut int) Option {
	return func(o *Options) error {
		// For tests, we may pass negative value that will be interpreted
		// by the library as milliseconds. If this test boolean is set,
		// do not check values.
		if !testAllowMillisecInPings {
			// The documented (and reported) minimum for maxOut is 2, so 2
			// itself must be accepted: only values below 2 are invalid.
			if interval < 1 || maxOut < 2 {
				return fmt.Errorf("Invalid ping values: interval=%v (min>0) maxOut=%v (min=2)", interval, maxOut)
			}
		}
		o.PingIterval = interval
		o.PingMaxOut = maxOut
		return nil
	}
}
// SetConnectionLostHandler returns an Option that installs the connection
// lost handler. The handler is invoked should the client permanently lose
// contact with the server (or another client replaces it while being
// disconnected). It is not invoked on normal Conn.Close().
func SetConnectionLostHandler(handler ConnectionLostHandler) Option {
	setHandler := func(opts *Options) error {
		opts.ConnectionLostCB = handler
		return nil
	}
	return setHandler
}
// A conn represents a bare connection to a stan cluster.
// The embedded RWMutex guards the connection state; the ping related
// fields that change after Connect are guarded by pingMu.
type conn struct {
	sync.RWMutex

	// Identity.
	clientID string
	connID   []byte // This is a NUID that uniquely identify connections.

	// Subjects handed out by the server in the connect response.
	pubPrefix        string // Publish prefix set by stan, append our subject.
	subRequests      string // Subject to send subscription requests.
	unsubRequests    string // Subject to send unsubscribe requests.
	subCloseRequests string // Subject to send subscription close requests.
	closeRequests    string // Subject to send close requests.

	// Publish acks and internal subscriptions.
	ackSubject      string // publish acks
	ackSubscription *nats.Subscription
	hbSubscription  *nats.Subscription
	subMap          map[string]*subscription
	pubAckMap       map[string]*ack
	pubAckChan      chan struct{}

	// Options and underlying NATS connection.
	opts       Options
	nc         *nats.Conn
	ncOwned    bool       // NATS Streaming created the connection, so needs to close it.
	pubNUID    *nuid.NUID // NUID generator for published messages.
	connLostCB ConnectionLostHandler

	// Client PINGs.
	pingMu       sync.Mutex
	pingSub      *nats.Subscription
	pingTimer    *time.Timer
	pingBytes    []byte
	pingRequests string
	pingInbox    string
	pingInterval time.Duration
	pingMaxOut   int
	pingOut      int
}
// ack is the context kept for a published message until its outcome is known.
type ack struct {
	// t is the expiration timer of the publish, if one has been set.
	t *time.Timer
	// ah is the user callback of an asynchronous publish (may be nil).
	ah AckHandler
	// ch receives the outcome of a synchronous publish (may be nil).
	ch chan error
}
// Connect will form a connection to the NATS Streaming subsystem.
// Note that clientID can contain only alphanumeric and `-` or `_` characters.
//
// The options are applied on top of DefaultOptions. If no NATS connection
// is supplied through the options, one is created (and owned, i.e. closed
// on Close) by the library; a supplied connection must be connected or
// ErrBadConnection is returned. ErrConnectReqTimeout is returned when the
// server does not answer the connect request within the connect timeout,
// and an error reported by the server in its response is returned as is.
//
// On any failure after the NATS connection is available, the partially
// built connection is closed so that resources acquired so far are released.
func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
	// Process Options
	c := conn{clientID: clientID, opts: DefaultOptions, connID: []byte(nuid.Next()), pubNUID: nuid.New()}
	for _, opt := range options {
		if err := opt(&c.opts); err != nil {
			return nil, err
		}
	}
	// Check if the user has provided a connection as an option
	c.nc = c.opts.NatsConn
	// Create a NATS connection if it doesn't exist.
	if c.nc == nil {
		// We will set the max reconnect attempts to -1 (infinite)
		// and the reconnect buffer to -1 to prevent any buffering
		// (which may cause a published message to be flushed on
		// reconnect while the API may have returned an error due
		// to PubAck timeout.
		nc, err := nats.Connect(c.opts.NatsURL,
			nats.Name(clientID),
			nats.MaxReconnects(-1),
			nats.ReconnectBufSize(-1))
		if err != nil {
			return nil, err
		}
		c.nc = nc
		c.ncOwned = true
	} else if !c.nc.IsConnected() {
		// Bail if the custom NATS connection is disconnected
		return nil, ErrBadConnection
	}

	// Create a heartbeat inbox
	hbInbox := nats.NewInbox()
	var err error
	if c.hbSubscription, err = c.nc.Subscribe(hbInbox, c.processHeartBeat); err != nil {
		c.Close()
		return nil, err
	}

	// Prepare a subscription on ping responses, even if we are not
	// going to need it, so that if that fails, it fails before initiating
	// a connection.
	// It is recorded on the connection right away so that the c.Close()
	// calls on the failure paths below unsubscribe it; otherwise it would
	// stay registered on a user-provided NATS connection.
	if c.pingSub, err = c.nc.Subscribe(nats.NewInbox(), c.processPingResponse); err != nil {
		c.Close()
		return nil, err
	}

	// Send Request to discover the cluster
	discoverSubject := c.opts.DiscoverPrefix + "." + stanClusterID
	req := &pb.ConnectRequest{
		ClientID:       clientID,
		HeartbeatInbox: hbInbox,
		ConnID:         c.connID,
		Protocol:       protocolOne,
		PingInterval:   int32(c.opts.PingIterval),
		PingMaxOut:     int32(c.opts.PingMaxOut),
	}
	b, err := req.Marshal()
	if err != nil {
		c.Close()
		return nil, err
	}
	reply, err := c.nc.Request(discoverSubject, b, c.opts.ConnectTimeout)
	if err != nil {
		c.Close()
		if err == nats.ErrTimeout {
			return nil, ErrConnectReqTimeout
		}
		return nil, err
	}
	// Process the response, grab server pubPrefix
	cr := &pb.ConnectResponse{}
	err = cr.Unmarshal(reply.Data)
	if err != nil {
		c.Close()
		return nil, err
	}
	if cr.Error != "" {
		c.Close()
		return nil, errors.New(cr.Error)
	}

	// Capture cluster configuration endpoints to publish and subscribe/unsubscribe.
	c.pubPrefix = cr.PubPrefix
	c.subRequests = cr.SubRequests
	c.unsubRequests = cr.UnsubRequests
	c.subCloseRequests = cr.SubCloseRequests
	c.closeRequests = cr.CloseRequests

	// Setup the ACK subscription
	c.ackSubject = DefaultACKPrefix + "." + nuid.Next()
	if c.ackSubscription, err = c.nc.Subscribe(c.ackSubject, c.processAck); err != nil {
		c.Close()
		return nil, err
	}
	c.ackSubscription.SetPendingLimits(1024*1024, 32*1024*1024)
	c.pubAckMap = make(map[string]*ack)

	// Create Subscription map
	c.subMap = make(map[string]*subscription)

	c.pubAckChan = make(chan struct{}, c.opts.MaxPubAcksInflight)

	// Capture the connection error cb
	c.connLostCB = c.opts.ConnectionLostCB

	unsubPingSub := true
	// Do this with servers which are at least at protocolOne.
	if cr.Protocol >= protocolOne {
		// Note that in the future server may override client ping
		// interval value sent in ConnectRequest, so use the
		// value in ConnectResponse to decide if we send PINGs
		// and at what interval.
		// In tests, the interval could be negative to indicate
		// milliseconds.
		if cr.PingInterval != 0 {
			unsubPingSub = false

			// These will be immutable.
			c.pingRequests = cr.PingRequests
			c.pingInbox = c.pingSub.Subject
			// In test, it is possible that we get a negative value
			// to represent milliseconds.
			if testAllowMillisecInPings && cr.PingInterval < 0 {
				c.pingInterval = time.Duration(cr.PingInterval*-1) * time.Millisecond
			} else {
				// PingInterval is otherwise assumed to be in seconds.
				c.pingInterval = time.Duration(cr.PingInterval) * time.Second
			}
			c.pingMaxOut = int(cr.PingMaxOut)
			c.pingBytes, _ = (&pb.Ping{ConnID: c.connID}).Marshal()
			// Set the timer now that we are set. Use lock to create
			// synchronization point.
			c.pingMu.Lock()
			c.pingTimer = time.AfterFunc(c.pingInterval, c.pingServer)
			c.pingMu.Unlock()
		}
	}
	if unsubPingSub {
		// PINGs are not used with this server: drop the subscription and
		// forget it so that it is not unsubscribed a second time on close.
		c.pingSub.Unsubscribe()
		c.pingSub = nil
	}

	// Attach a finalizer
	runtime.SetFinalizer(&c, func(sc *conn) { sc.Close() })

	return &c, nil
}
// pingServer is the ping timer callback. It sends a PING (containing the
// connection's ID) to the server and re-arms the timer with the ping
// interval. Each PING sent increases the number of outstanding PINGs; once
// that number exceeds pingMaxOut the connection is closed with ErrMaxPings
// (and the connection error callback invoked if one was specified).
func (sc *conn) pingServer() {
	sc.pingMu.Lock()
	timer := sc.pingTimer
	// In case the timer fired while we were stopping it.
	if timer == nil {
		sc.pingMu.Unlock()
		return
	}
	sc.pingOut++
	exceeded := sc.pingOut > sc.pingMaxOut
	var natsConn *nats.Conn
	if !exceeded {
		timer.Reset(sc.pingInterval)
		natsConn = sc.nc
	}
	sc.pingMu.Unlock()

	if exceeded {
		sc.closeDueToPing(ErrMaxPings)
		return
	}
	// Send the PING now. If the NATS connection is reported closed,
	// we are done.
	err := natsConn.PublishRequest(sc.pingRequests, sc.pingInbox, sc.pingBytes)
	if err == nats.ErrConnectionClosed {
		sc.closeDueToPing(err)
	}
}
// processPingResponse handles PING responses from the server.
// If the response contains an error message, the connection is closed
// and the connection error callback is invoked (if one is specified).
// A response that cannot be decoded is ignored.
// Otherwise the number of outstanding PINGs is reset to 0. It is not
// decremented by one since, for a given PING, the client may receive
// many responses when servers are running in channel partitioning mode.
// Regardless, any positive response from the server ought to signal
// that the connection is ok.
func (sc *conn) processPingResponse(m *nats.Msg) {
	// No data means OK (we don't have to call Unmarshal)
	if payload := m.Data; len(payload) != 0 {
		var resp pb.PingResponse
		if resp.Unmarshal(payload) != nil {
			return
		}
		if reason := resp.Error; reason != "" {
			sc.closeDueToPing(errors.New(reason))
			return
		}
	}
	// Do not attempt to decrement, simply reset to 0.
	sc.pingMu.Lock()
	sc.pingOut = 0
	sc.pingMu.Unlock()
}
// closeDueToPing closes the connection because of a PING failure and
// invokes, in its own goroutine, the connection error callback if one was
// registered when the connection was created. It does nothing if the
// connection is already closed.
func (sc *conn) closeDueToPing(err error) {
	sc.Lock()
	natsConn := sc.nc
	if natsConn == nil {
		sc.Unlock()
		return
	}
	// Stop timer, unsubscribe, fail the pubs, etc..
	sc.cleanupOnClose(err)
	// No need to send Close protocol, so simply close the underlying
	// NATS connection (if we own it, and if not already closed)
	if sc.ncOwned && !natsConn.IsClosed() {
		natsConn.Close()
	}
	// Mark this streaming connection as closed. Do this under pingMu lock.
	sc.pingMu.Lock()
	sc.nc = nil
	sc.pingMu.Unlock()
	// Capture callback (even though this is immutable).
	lostCB := sc.connLostCB
	sc.Unlock()

	if lostCB == nil {
		return
	}
	// Execute in separate go routine.
	go lostCB(sc, err)
}
// cleanupOnClose does some cleanup when the connection is lost or closed:
// it stops the ping timer, unsubscribes the internal subscriptions
// (heartbeat, publish acks, ping responses) and fails every pending publish
// with err.
// Connection lock is held on entry, and sc.nc is guaranteed not to be nil.
func (sc *conn) cleanupOnClose(err error) {
	sc.pingMu.Lock()
	if sc.pingTimer != nil {
		sc.pingTimer.Stop()
		sc.pingTimer = nil
	}
	sc.pingMu.Unlock()

	// Unsubscribe only if the NATS connection is not already closed...
	if !sc.nc.IsClosed() {
		// The heartbeat subscription must be released too, otherwise it
		// stays registered on a NATS connection that we do not own (and
		// therefore do not close).
		if sc.hbSubscription != nil {
			sc.hbSubscription.Unsubscribe()
		}
		if sc.ackSubscription != nil {
			sc.ackSubscription.Unsubscribe()
		}
		if sc.pingSub != nil {
			sc.pingSub.Unsubscribe()
		}
	}

	// Fail all pending pubs
	for guid, pubAck := range sc.pubAckMap {
		if pubAck.t != nil {
			pubAck.t.Stop()
		}
		if pubAck.ah != nil {
			pubAck.ah(guid, err)
		} else if pubAck.ch != nil {
			pubAck.ch <- err
		}
		delete(sc.pubAckMap, guid)
		// Free a slot for a publisher that may be blocked on the channel.
		if len(sc.pubAckChan) > 0 {
			<-sc.pubAckChan
		}
	}
}
// Close closes the connection to the stan system: local state is cleaned
// up, then a close request is sent to the server. Closing an already
// closed connection is a no-op returning nil. ErrCloseReqTimeout is
// returned when the server does not answer the close request in time, and
// an error reported by the server in its response is returned as is.
func (sc *conn) Close() error {
	sc.Lock()
	defer sc.Unlock()

	// Capture for NATS calls below.
	natsConn := sc.nc
	if natsConn == nil {
		// We are already closed.
		return nil
	}
	if sc.ncOwned {
		defer natsConn.Close()
	}

	// Now close ourselves.
	sc.cleanupOnClose(ErrConnectionClosed)

	// Signals we are closed.
	// Do this also under pingMu lock so that we don't need
	// to grab sc's lock in pingServer.
	sc.pingMu.Lock()
	sc.nc = nil
	sc.pingMu.Unlock()

	payload, _ := (&pb.CloseRequest{ClientID: sc.clientID}).Marshal()
	resp, err := natsConn.Request(sc.closeRequests, payload, sc.opts.ConnectTimeout)
	switch {
	case err == nats.ErrTimeout:
		return ErrCloseReqTimeout
	case err != nil:
		return err
	}

	var closeResp pb.CloseResponse
	if err := closeResp.Unmarshal(resp.Data); err != nil {
		return err
	}
	if closeResp.Error != "" {
		return errors.New(closeResp.Error)
	}
	return nil
}
// NatsConn returns the underlying NATS conn (nil once the streaming
// connection is closed). Use this with care. For example, closing the
// wrapped NATS conn will put the NATS Streaming Conn in an invalid state.
func (sc *conn) NatsConn() *nats.Conn {
	sc.RLock()
	defer sc.RUnlock()
	return sc.nc
}
// processHeartBeat answers a heartbeat from the NATS Streaming cluster by
// publishing an empty message on the heartbeat's reply subject. Nothing is
// sent once the connection is closed.
func (sc *conn) processHeartBeat(m *nats.Msg) {
	sc.RLock()
	natsConn := sc.nc
	sc.RUnlock()

	if natsConn == nil {
		return
	}
	// No payload assumed, just reply.
	natsConn.Publish(m.Reply, nil)
}
// processAck handles a publish ack from the NATS Streaming cluster: the
// pending publish matching the ack's GUID is removed and its outcome (nil,
// or the error carried by the ack) is delivered to the ack handler or to
// the waiting channel. Acks for unknown GUIDs are ignored. It panics if
// the ack cannot be decoded.
func (sc *conn) processAck(m *nats.Msg) {
	var pubAck pb.PubAck
	if uerr := pubAck.Unmarshal(m.Data); uerr != nil {
		panic(fmt.Errorf("Error during ack unmarshal: %v", uerr))
	}

	// Remove
	pending := sc.removeAck(pubAck.Guid)
	if pending == nil {
		return
	}

	// Capture error if it exists.
	var ackErr error
	if pubAck.Error != "" {
		ackErr = errors.New(pubAck.Error)
	}

	switch {
	case pending.ah != nil:
		// Perform the ackHandler callback
		pending.ah(pubAck.Guid, ackErr)
	case pending.ch != nil:
		// Send to channel directly
		pending.ch <- ackErr
	}
}
// Publish will publish to the cluster and wait for an ACK. It returns the
// error of the publish itself, or otherwise the outcome delivered for the
// message (nil on success).
func (sc *conn) Publish(subject string, data []byte) error {
	// Need to make this a buffered channel of 1 in case
	// a publish call is blocked in pubAckChan but cleanupOnClose()
	// is trying to push the error to this channel.
	outcome := make(chan error, 1)
	if _, err := sc.publishAsync(subject, data, nil, outcome); err != nil {
		return err
	}
	return <-outcome
}
// PublishAsync will publish to the cluster on pubPrefix+subject and
// asynchronously process the ACK or error state, reporting it through ah.
// It will return the GUID for the message being sent.
func (sc *conn) PublishAsync(subject string, data []byte, ah AckHandler) (string, error) {
	guid, err := sc.publishAsync(subject, data, ah, nil)
	return guid, err
}
// publishAsync is the common implementation of Publish and PublishAsync.
// It registers an ack context for a freshly generated GUID, sends the
// message on pubPrefix+"."+subject with the connection's ack subject as the
// reply subject, and arms a timer that fails the publish with ErrTimeout
// after the configured AckTimeout.
// The outcome is delivered to ah when it is not nil, otherwise to ch when it
// is not nil.
// It returns the GUID of the message, or an error if the connection is
// closed or the message could not be sent.
func (sc *conn) publishAsync(subject string, data []byte, ah AckHandler, ch chan error) (string, error) {
a := &ack{ah: ah, ch: ch}
sc.Lock()
if sc.nc == nil {
sc.Unlock()
return "", ErrConnectionClosed
}
subj := sc.pubPrefix + "." + subject
// This is only what we need from PubMsg in the timer below,
// so do this so that pe doesn't escape.
peGUID := sc.pubNUID.Next()
// We send connID regardless of server we connect to. Older server
// will simply not decode it.
pe := &pb.PubMsg{ClientID: sc.clientID, Guid: peGUID, Subject: subject, Data: data, ConnID: sc.connID}
b, _ := pe.Marshal()
// Map ack to guid.
sc.pubAckMap[peGUID] = a
// snapshot
ackSubject := sc.ackSubject
ackTimeout := sc.opts.AckTimeout
pac := sc.pubAckChan
nc := sc.nc
sc.Unlock()
// Use the buffered channel to control the number of outstanding acks.
// This send is done without holding the connection lock and blocks while
// the channel is full.
pac <- struct{}{}
err := nc.PublishRequest(subj, ackSubject, b)
// Setup the timer for expiration.
sc.Lock()
if err != nil || sc.nc == nil {
// removeAck acquires the connection lock itself, so release it first.
sc.Unlock()
// If we got an error on publish or the connection has been closed,
// we need to return an error only if:
// - we can remove the pubAck from the map
// - we can't, but this is an async pub with no provided AckHandler
removed := sc.removeAck(peGUID) != nil
if removed || (ch == nil && ah == nil) {
if err == nil {
err = ErrConnectionClosed
}
return "", err
}
// pubAck was removed from cleanupOnClose() and error will be sent
// to appropriate go channel (ah or ch).
return peGUID, nil
}
// The timer is stored while holding the connection lock; removeAck reads
// it under the same lock.
a.t = time.AfterFunc(ackTimeout, func() {
pubAck := sc.removeAck(peGUID)
// processAck could get here before and handle the ack.
// If that's the case, we would get nil here and simply return.
if pubAck == nil {
return
}
if pubAck.ah != nil {
pubAck.ah(peGUID, ErrTimeout)
} else if a.ch != nil {
// pubAck is the entry registered above for peGUID, i.e. the same
// ack as a, so a.ch and pubAck.ch are the same channel.
pubAck.ch <- ErrTimeout
}
})
sc.Unlock()
return peGUID, nil
}
// removeAck removes the ack registered for guid from the pubAckMap and
// cancels any state, e.g. timers. It returns the removed ack, or nil when
// no ack is registered for guid. The connection lock must not be held by
// the caller.
func (sc *conn) removeAck(guid string) *ack {
	sc.Lock()
	pending := sc.pubAckMap[guid]
	inflight := sc.pubAckChan
	var timer *time.Timer
	if pending != nil {
		timer = pending.t
		delete(sc.pubAckMap, guid)
	}
	sc.Unlock()

	if pending == nil {
		return nil
	}
	// Cancel timer if needed.
	if timer != nil {
		timer.Stop()
	}
	// Remove from channel to unblock PublishAsync
	if len(inflight) > 0 {
		<-inflight
	}
	return pending
}
// processMsg handles a message from the NATS Streaming cluster: it decodes
// the message, looks up the subscription registered for the subject the
// message arrived on, invokes the subscription's callback and, unless the
// subscription uses manual acks, sends the ack for the message.
// Messages are dropped when the connection is closed or no subscription
// matches. It panics if the message cannot be decoded.
func (sc *conn) processMsg(raw *nats.Msg) {
	msg := &Msg{}
	if err := msg.Unmarshal(raw.Data); err != nil {
		panic(fmt.Errorf("Error processing unmarshal for msg: %v", err))
	}

	// Lookup the subscription
	sc.RLock()
	natsConn := sc.nc
	sub := sc.subMap[raw.Subject]
	sc.RUnlock()

	// Check if sub is no longer valid or connection has been closed.
	if natsConn == nil || sub == nil {
		return
	}

	// Store in msg for backlink
	msg.Sub = sub

	sub.RLock()
	handler := sub.cb
	ackInbox := sub.ackInbox
	manualAcks := sub.opts.ManualAcks
	owner := sub.sc // Can be nil if sub has been unsubscribed.
	sub.RUnlock()

	// Perform the callback
	if handler != nil && owner != nil {
		handler(msg)
	}

	// Process auto-ack. natsConn is known to be non-nil at this point.
	if manualAcks {
		return
	}
	autoAck := &pb.Ack{Subject: msg.Subject, Sequence: msg.Sequence}
	payload, _ := autoAck.Marshal()
	// FIXME(dlc) - Async error handler? Retry?
	natsConn.Publish(ackInbox, payload)
}