Migrate to Go modules

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2021-01-20 12:12:11 +00:00
committed by Alex Ellis
parent 2e2250afe8
commit 7ce266adc0
667 changed files with 211781 additions and 17546 deletions

View File

@ -1,8 +1,7 @@
language: go
sudo: false
go:
- 1.14.x
- 1.13.x
- 1.12.x
env:
- GO111MODULE=off
go_import_path: github.com/nats-io/nats.go
@ -21,4 +20,4 @@ before_script:
script:
- go test -i -race ./...
- go test -v -run=TestNoRace -p=1 ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.13 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.14 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi

View File

@ -20,7 +20,7 @@ When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.9.2
go get github.com/nats-io/nats.go/@v1.10.0
# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
@ -325,6 +325,18 @@ nc, err := nats.Connect(servers)
// This example means 10 seconds total per backend.
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))
// You can also add some jitter for the reconnection.
// This call will add up to 500 milliseconds for non TLS connections and 2 seconds for TLS connections.
// If not specified, the library defaults to 100 milliseconds and 1 second, respectively.
nc, err = nats.Connect(servers, nats.ReconnectJitter(500*time.Millisecond, 2*time.Second))
// You can also specify a custom reconnect delay handler. If set, the library will invoke it when it has tried
// all URLs in its list. The value returned will be used as the total sleep time, so add your own jitter.
// The library will pass the number of times it went through the whole list.
nc, err = nats.Connect(servers, nats.CustomReconnectDelay(func(attempts int) time.Duration {
return someBackoffFunction(attempts)
}))
// Optionally disable randomization of the server pool
nc, err = nats.Connect(servers, nats.DontRandomize())

View File

@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -45,19 +45,21 @@ import (
// Default Constants
const (
Version = "1.9.2"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"
Version = "1.10.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultReconnectJitter = 100 * time.Millisecond
DefaultReconnectJitterTLS = time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"
)
const (
@ -116,6 +118,8 @@ var (
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
)
func init() {
@ -125,15 +129,17 @@ func init() {
// GetDefaultOptions returns default configuration options for the client.
func GetDefaultOptions() Options {
return Options{
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
ReconnectJitter: DefaultReconnectJitter,
ReconnectJitterTLS: DefaultReconnectJitterTLS,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
}
}
@ -180,6 +186,12 @@ type SignatureHandler func([]byte) ([]byte, error)
// AuthTokenHandler is used to generate a new token.
type AuthTokenHandler func() string
// ReconnectDelayHandler is used to get from the user the desired
// delay the library should pause before attempting to reconnect
// again. Note that this is invoked after the library tried the
// whole list of URLs and failed to reconnect.
type ReconnectDelayHandler func(attempts int) time.Duration
// asyncCB is used to preserve order for async callbacks.
type asyncCB struct {
f func()
@ -256,6 +268,24 @@ type Options struct {
// to a server that we were already connected to previously.
ReconnectWait time.Duration
// CustomReconnectDelayCB is invoked after the library tried every
// URL in the server list and failed to reconnect. It passes to the
// user the current number of attempts. This function returns the
// amount of time the library will sleep before attempting to reconnect
// again. It is strongly recommended that this value contains some
// jitter to prevent all connections to attempt reconnecting at the same time.
CustomReconnectDelayCB ReconnectDelayHandler
// ReconnectJitter sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when no TLS is used.
// Note that any jitter is capped with ReconnectJitterMax.
ReconnectJitter time.Duration
// ReconnectJitterTLS sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when TLS is used.
// Note that any jitter is capped with ReconnectJitterMax.
ReconnectJitterTLS time.Duration
// Timeout sets the timeout for a Dial operation on a connection.
Timeout time.Duration
@ -409,6 +439,7 @@ type Conn struct {
ptmr *time.Timer
pout int
ar bool // abort reconnect
rqch chan struct{}
// New style response handler
respSub string // The wildcard subject
@ -486,17 +517,16 @@ type Statistics struct {
// Tracks individual backend servers.
type srv struct {
url *url.URL
didConnect bool
reconnects int
lastAttempt time.Time
lastErr error
isImplicit bool
tlsName string
url *url.URL
didConnect bool
reconnects int
lastErr error
isImplicit bool
tlsName string
}
type serverInfo struct {
Id string `json:"server_id"`
ID string `json:"server_id"`
Host string `json:"host"`
Port uint `json:"port"`
Version string `json:"version"`
@ -506,6 +536,7 @@ type serverInfo struct {
ConnectURLs []string `json:"connect_urls,omitempty"`
Proto int `json:"proto,omitempty"`
CID uint64 `json:"client_id,omitempty"`
ClientIP string `json:"client_ip,omitempty"`
Nonce string `json:"nonce,omitempty"`
}
@ -669,6 +700,24 @@ func MaxReconnects(max int) Option {
}
}
// ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait.
func ReconnectJitter(jitter, jitterForTLS time.Duration) Option {
return func(o *Options) error {
o.ReconnectJitter = jitter
o.ReconnectJitterTLS = jitterForTLS
return nil
}
}
// CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option.
// See CustomReconnectDelayCB Option for more details.
func CustomReconnectDelay(cb ReconnectDelayHandler) Option {
return func(o *Options) error {
o.CustomReconnectDelayCB = cb
return nil
}
}
// PingInterval is an Option to set the period for client ping commands.
func PingInterval(t time.Duration) Option {
return func(o *Options) error {
@ -1123,7 +1172,7 @@ func (nc *Conn) setupServerPool() error {
// Randomize if allowed to
if !nc.Opts.NoRandomize {
nc.shufflePool()
nc.shufflePool(0)
}
// Normally, if this one is set, Options.Servers should not be,
@ -1220,14 +1269,16 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
}
// shufflePool swaps randomly elements in the server pool
func (nc *Conn) shufflePool() {
if len(nc.srvPool) <= 1 {
// The `offset` value indicates that the shuffling should start at
// this offset and leave the elements from [0..offset) intact.
func (nc *Conn) shufflePool(offset int) {
if len(nc.srvPool) <= offset+1 {
return
}
source := rand.NewSource(time.Now().UnixNano())
r := rand.New(source)
for i := range nc.srvPool {
j := r.Intn(i + 1)
for i := offset; i < len(nc.srvPool); i++ {
j := offset + r.Intn(i+1-offset)
nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
}
}
@ -1249,8 +1300,6 @@ func (nc *Conn) createConn() (err error) {
}
if _, cur := nc.currentServer(); cur == nil {
return ErrNoServers
} else {
cur.lastAttempt = time.Now()
}
// We will auto-expand host names if they resolve to multiple IPs
@ -1384,7 +1433,7 @@ func (nc *Conn) ConnectedServerId() string {
if nc.status != CONNECTED {
return _EMPTY_
}
return nc.info.Id
return nc.info.ID
}
// Low level setup for structs, etc
@ -1393,6 +1442,7 @@ func (nc *Conn) setup() {
nc.pongs = make([]chan struct{}, 0, 8)
nc.fch = make(chan struct{}, flushChanSize)
nc.rqch = make(chan struct{})
// Setup scratch outbound buffer for PUB
pub := nc.scratch[:len(_PUB_P_)]
@ -1450,6 +1500,7 @@ func (nc *Conn) connect() error {
// For first connect we walk all servers in the pool and try
// to connect immediately.
nc.mu.Lock()
defer nc.mu.Unlock()
nc.initc = true
// The pool may change inside the loop iteration due to INFO protocol.
for i := 0; i < len(nc.srvPool); i++ {
@ -1463,8 +1514,8 @@ func (nc *Conn) connect() error {
err = nc.processConnectInit()
if err == nil {
nc.srvPool[i].didConnect = true
nc.srvPool[i].reconnects = 0
nc.current.didConnect = true
nc.current.reconnects = 0
nc.current.lastErr = nil
returnedErr = nil
break
@ -1487,7 +1538,7 @@ func (nc *Conn) connect() error {
if returnedErr == nil && nc.status != CONNECTED {
returnedErr = ErrNoServers
}
nc.mu.Unlock()
return returnedErr
}
@ -1814,33 +1865,63 @@ func (nc *Conn) doReconnect(err error) {
// This is used to wait on go routines exit if we start them in the loop
// but an error occurs after that.
waitForGoRoutines := false
var rt *time.Timer
// Channel used to kick routine out of sleep when conn is closed.
rqch := nc.rqch
// Counter that is increased when the whole list of servers has been tried.
var wlf int
for len(nc.srvPool) > 0 {
var jitter time.Duration
var rw time.Duration
// If a custom reconnect delay handler is set, this takes precedence.
crd := nc.Opts.CustomReconnectDelayCB
if crd == nil {
rw = nc.Opts.ReconnectWait
// TODO: since we sleep only after the whole list has been tried, we can't
// rely on individual *srv to know if it is a TLS or non-TLS url.
// We have to pick which type of jitter to use, for now, we use these hints:
jitter = nc.Opts.ReconnectJitter
if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
jitter = nc.Opts.ReconnectJitterTLS
}
}
for i := 0; len(nc.srvPool) > 0; {
cur, err := nc.selectNextServer()
if err != nil {
nc.err = err
break
}
sleepTime := int64(0)
// Sleep appropriate amount of time before the
// connection attempt if connecting to same server
// we just got disconnected from..
if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait {
sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt))
}
// On Windows, createConn() will take more than a second when no
// server is running at that address. So it could be that the
// time elapsed between reconnect attempts is always > than
// the set option. Release the lock to give a chance to a parallel
// nc.Close() to break the loop.
doSleep := i+1 >= len(nc.srvPool)
nc.mu.Unlock()
if sleepTime <= 0 {
if !doSleep {
i++
// Release the lock to give a chance to a concurrent nc.Close() to break the loop.
runtime.Gosched()
} else {
time.Sleep(time.Duration(sleepTime))
i = 0
var st time.Duration
if crd != nil {
wlf++
st = crd(wlf)
} else {
st = rw
if jitter > 0 {
st += time.Duration(rand.Int63n(int64(jitter)))
}
}
if rt == nil {
rt = time.NewTimer(st)
} else {
rt.Reset(st)
}
select {
case <-rqch:
rt.Stop()
case <-rt.C:
}
}
// If the readLoop, etc.. go routines were started, wait for them to complete.
if waitForGoRoutines {
@ -2427,8 +2508,14 @@ func (nc *Conn) processInfo(info string) error {
}
nc.addURLToPool(fmt.Sprintf("%s://%s", nc.connScheme(), curl), true, saveTLS)
}
if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
if hasNew {
// Randomize the pool if allowed but leave the first URL in place.
if !nc.Opts.NoRandomize {
nc.shufflePool(1)
}
if !nc.initc && nc.Opts.DiscoveredServersCB != nil {
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
}
}
return nil
@ -2748,7 +2835,7 @@ func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Ms
inbox := NewInbox()
ch := make(chan *Msg, RequestChanLen)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, false)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true)
if err != nil {
return nil, err
}
@ -3541,10 +3628,25 @@ func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) {
return
}
// RTT calculates the round trip time between this client and the server.
func (nc *Conn) RTT() (time.Duration, error) {
if nc.IsClosed() {
return 0, ErrConnectionClosed
}
if nc.IsReconnecting() {
return 0, ErrDisconnected
}
start := time.Now()
if err := nc.FlushTimeout(10 * time.Second); err != nil {
return 0, err
}
return time.Since(start), nil
}
// Flush will perform a round trip to the server and return when it
// receives the internal reply.
func (nc *Conn) Flush() error {
return nc.FlushTimeout(60 * time.Second)
return nc.FlushTimeout(10 * time.Second)
}
// Buffered will return the number of bytes buffered to be sent to the server.
@ -3636,9 +3738,13 @@ func (nc *Conn) close(status Status, doCBs bool, err error) {
// Kick the Go routines so they fall out.
nc.kickFlusher()
nc.mu.Unlock()
nc.mu.Lock()
// If the reconnect timer is waiting between a reconnect attempt,
// this will kick it out.
if nc.rqch != nil {
close(nc.rqch)
nc.rqch = nil
}
// Clear any queued pongs, e.g. pending flush calls.
nc.clearPendingFlushCalls()
@ -4005,10 +4111,25 @@ func (nc *Conn) Barrier(f func()) error {
return nil
}
// GetClientIP returns the client IP as known by the server.
// Supported as of server version 2.1.6.
func (nc *Conn) GetClientIP() (net.IP, error) {
nc.mu.RLock()
defer nc.mu.RUnlock()
if nc.isClosed() {
return nil, ErrConnectionClosed
}
if nc.info.ClientIP == "" {
return nil, ErrClientIPNotSupported
}
ip := net.ParseIP(nc.info.ClientIP)
return ip, nil
}
// GetClientID returns the client ID assigned by the server to which
// the client is currently connected to. Note that the value may change if
// the client reconnects.
// This function returns ErrNoClientIDReturned if the server is of a
// This function returns ErrClientIDNotSupported if the server is of a
// version prior to 1.2.0.
func (nc *Conn) GetClientID() (uint64, error) {
nc.mu.RLock()

View File

@ -24,7 +24,7 @@ type msgArg struct {
size int
}
const MAX_CONTROL_LINE_SIZE = 1024
const MAX_CONTROL_LINE_SIZE = 4096
type parseState struct {
state int