Update vendor

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2024-11-18 21:45:59 +00:00
parent cfcd4f05ad
commit ab2c34bb34
268 changed files with 37236 additions and 3001 deletions

View File

@ -31,7 +31,7 @@ When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.36.0
go get github.com/nats-io/nats.go/@v1.37.0
# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
@ -93,11 +93,13 @@ nc.Close()
```
## JetStream
[![JetStream API Reference](https://pkg.go.dev/badge/github.com/nats-io/nats.go/jetstream.svg)](https://pkg.go.dev/github.com/nats-io/nats.go/jetstream)
JetStream is the built-in NATS persistence system. `nats.go` provides a built-in
API enabling both managing JetStream assets as well as publishing/consuming
persistent messages.
### Basic usage
```go
@ -134,60 +136,6 @@ To find more information on `nats.go` JetStream API, visit
The service API (`micro`) allows you to [easily build NATS services](micro/README.md) The
services API is currently in beta release.
## Encoded Connections
```go
nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()
// Simple Publisher
c.Publish("foo", "Hello World")
// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
fmt.Printf("Received a message: %s\n", s)
})
// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
Name string
Address string
Age int
}
// Go type Subscriber
c.Subscribe("hello", func(p *person) {
fmt.Printf("Received a person: %+v\n", p)
})
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}
// Go type Publisher
c.Publish("hello", me)
// Unsubscribe
sub, err := c.Subscribe("foo", nil)
// ...
sub.Unsubscribe()
// Requests
var response string
err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
fmt.Printf("Request failed: %v\n", err)
}
// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
c.Publish(reply, "I can help!")
})
// Close connection
c.Close();
```
## New Authentication (Nkeys and User Credentials)
This requires server with version >= 2.0.0
@ -267,34 +215,6 @@ if err != nil {
```
## Using Go Channels (netchan)
```go
nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()
type person struct {
Name string
Address string
Age int
}
recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)
sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}
// Send via Go channels
sendCh <- me
// Receive via Go channels
who := <- recvCh
```
## Wildcard Subscriptions
```go
@ -461,17 +381,6 @@ msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)
// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
Message string `json:"message"`
}
type response struct {
Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)
```
## Backwards compatibility

View File

@ -217,6 +217,8 @@ func (nc *Conn) FlushWithContext(ctx context.Context) error {
// RequestWithContext will create an Inbox and perform a Request
// using the provided cancellation context with the Inbox reply
// for the data v. A response will be decoded into the vPtr last parameter.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error {
if ctx == nil {
return ErrInvalidContext

View File

@ -24,7 +24,11 @@ import (
"github.com/nats-io/nats.go/encoders/builtin"
)
//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn
// Encoder interface is for all register encoders
//
// Deprecated: Encoded connections are no longer supported.
type Encoder interface {
Encode(subject string, v any) ([]byte, error)
Decode(subject string, data []byte, vPtr any) error
@ -51,6 +55,8 @@ func init() {
// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
// a nats server and have an extendable encoder system that will encode and decode messages
// from raw Go types.
//
// Deprecated: Encoded connections are no longer supported.
type EncodedConn struct {
Conn *Conn
Enc Encoder
@ -58,6 +64,8 @@ type EncodedConn struct {
// NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
// encoder.
//
// Deprecated: Encoded connections are no longer supported.
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
if c == nil {
return nil, errors.New("nats: Nil Connection")
@ -73,6 +81,8 @@ func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
}
// RegisterEncoder will register the encType with the given Encoder. Useful for customization.
//
// Deprecated: Encoded connections are no longer supported.
func RegisterEncoder(encType string, enc Encoder) {
encLock.Lock()
defer encLock.Unlock()
@ -80,6 +90,8 @@ func RegisterEncoder(encType string, enc Encoder) {
}
// EncoderForType will return the registered Encoder for the encType.
//
// Deprecated: Encoded connections are no longer supported.
func EncoderForType(encType string) Encoder {
encLock.Lock()
defer encLock.Unlock()
@ -88,6 +100,8 @@ func EncoderForType(encType string) Encoder {
// Publish publishes the data argument to the given subject. The data argument
// will be encoded using the associated encoder.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Publish(subject string, v any) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
@ -99,6 +113,8 @@ func (c *EncodedConn) Publish(subject string, v any) error {
// PublishRequest will perform a Publish() expecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) PublishRequest(subject, reply string, v any) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
@ -110,6 +126,8 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v any) error {
// Request will create an Inbox and perform a Request() call
// with the Inbox reply for the data v. A response will be
// decoded into the vPtr Response.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Duration) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
@ -150,6 +168,8 @@ func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Dura
// and demarshal it into the given struct, e.g. person.
// There are also variants where the callback wants either the subject, or the
// subject and the reply subject.
//
// Deprecated: Encoded connections are no longer supported.
type Handler any
// Dissect the cb Handler's signature
@ -170,6 +190,8 @@ var emptyMsgType = reflect.TypeOf(&Msg{})
// Subscribe will create a subscription on the given subject and process incoming
// messages using the specified Handler. The Handler should be a func that matches
// a signature from the description of Handler from above.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) {
return c.subscribe(subject, _EMPTY_, cb)
}
@ -177,6 +199,8 @@ func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, erro
// QueueSubscribe will create a queue subscription on the given subject and process
// incoming messages using the specified Handler. The Handler should be a func that
// matches a signature from the description of Handler from above.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) {
return c.subscribe(subject, queue, cb)
}
@ -238,18 +262,24 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
}
// FlushTimeout allows a Flush operation to have an associated timeout.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) {
return c.Conn.FlushTimeout(timeout)
}
// Flush will perform a round trip to the server and return when it
// receives the internal reply.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Flush() error {
return c.Conn.Flush()
}
// Close will close the connection to the server. This call will release
// all blocking calls, such as Flush(), etc.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Close() {
c.Conn.Close()
}
@ -259,11 +289,15 @@ func (c *EncodedConn) Close() {
// will be drained and can not publish any additional messages. Upon draining
// of the publishers, the connection will be closed. Use the ClosedCB()
// option to know when the connection has moved from draining to closed.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Drain() error {
return c.Conn.Drain()
}
// LastError reports the last error encountered via the Connection.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) LastError() error {
return c.Conn.LastError()
}

View File

@ -26,6 +26,8 @@ import (
// turn numbers into appropriate strings that can be decoded. It will also
// properly encoded and decode bools. If will encode a struct, but if you want
// to properly handle structures you should use JsonEncoder.
//
// Deprecated: Encoded connections are no longer supported.
type DefaultEncoder struct {
// Empty
}
@ -35,6 +37,8 @@ var falseB = []byte("false")
var nilB = []byte("")
// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) {
switch arg := v.(type) {
case string:
@ -58,6 +62,8 @@ func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) {
}
// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (je *DefaultEncoder) Decode(subject string, data []byte, vPtr any) error {
// Figure out what it's pointing to...
sData := *(*string)(unsafe.Pointer(&data))

View File

@ -21,6 +21,8 @@ import (
// GobEncoder is a Go specific GOB Encoder implementation for EncodedConn.
// This encoder will use the builtin encoding/gob to Marshal
// and Unmarshal most types, including structs.
//
// Deprecated: Encoded connections are no longer supported.
type GobEncoder struct {
// Empty
}
@ -28,6 +30,8 @@ type GobEncoder struct {
// FIXME(dlc) - This could probably be more efficient.
// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) {
b := new(bytes.Buffer)
enc := gob.NewEncoder(b)
@ -38,6 +42,8 @@ func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) {
}
// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (ge *GobEncoder) Decode(subject string, data []byte, vPtr any) (err error) {
dec := gob.NewDecoder(bytes.NewBuffer(data))
err = dec.Decode(vPtr)

View File

@ -21,11 +21,15 @@ import (
// JsonEncoder is a JSON Encoder implementation for EncodedConn.
// This encoder will use the builtin encoding/json to Marshal
// and Unmarshal most types, including structs.
//
// Deprecated: Encoded connections are no longer supported.
type JsonEncoder struct {
// Empty
}
// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) {
b, err := json.Marshal(v)
if err != nil {
@ -35,6 +39,8 @@ func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) {
}
// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (je *JsonEncoder) Decode(subject string, data []byte, vPtr any) (err error) {
switch arg := vPtr.(type) {
case *string:

View File

@ -1,23 +1,25 @@
module github.com/nats-io/nats.go
go 1.19
go 1.21
toolchain go1.22.5
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.8
github.com/klauspost/compress v1.17.9
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.10.16
github.com/nats-io/nats-server/v2 v2.10.17
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.15.0
golang.org/x/text v0.16.0
google.golang.org/protobuf v1.23.0
)
require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.7 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/time v0.5.0 // indirect
)

View File

@ -1,4 +1,5 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -10,38 +11,40 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c=
github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0=
github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU=
github.com/nats-io/nats-server/v2 v2.10.17 h1:PTVObNBD3TZSNUDgzFb1qQsQX4mOgFmOuG9vhT+KBUY=
github.com/nats-io/nats-server/v2 v2.10.17/go.mod h1:5OUyc4zg42s/p2i92zbbqXvUNsbF0ivdTLKshVMn2YQ=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
@ -54,3 +57,4 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,4 +1,4 @@
// Copyright 2020-2023 The NATS Authors
// Copyright 2020-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -58,6 +58,19 @@ type JetStream interface {
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
PublishAsyncComplete() <-chan struct{}
// CleanupPublisher will cleanup the publishing side of JetStreamContext.
//
// This will unsubscribe from the internal reply subject if needed.
// All pending async publishes will fail with ErrJetStreamPublisherClosed.
//
// If an error handler was provided, it will be called for each pending async
// publish and PublishAsyncComplete will be closed.
//
// After completing JetStreamContext is still usable - internal subscription
// will be recreated on next publish, but the acks from previous publishes will
// be lost.
CleanupPublisher()
// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
@ -719,10 +732,7 @@ func (js *js) resetPendingAcksOnReconnect() {
paf.errCh <- paf.err
}
if errCb != nil {
// clear reply subject so that new one is created on republish
js.mu.Unlock()
errCb(js, paf.msg, ErrDisconnected)
js.mu.Lock()
defer errCb(js, paf.msg, ErrDisconnected)
}
delete(js.pafs, id)
}
@ -734,6 +744,38 @@ func (js *js) resetPendingAcksOnReconnect() {
}
}
// CleanupPublisher will cleanup the publishing side of JetStreamContext.
//
// This will unsubscribe from the internal reply subject if needed.
// All pending async publishes will fail with ErrJetStreamContextClosed.
//
// If an error handler was provided, it will be called for each pending async
// publish and PublishAsyncComplete will be closed.
//
// After completing JetStreamContext is still usable - internal subscription
// will be recreated on next publish, but the acks from previous publishes will
// be lost.
func (js *js) CleanupPublisher() {
js.cleanupReplySub()
js.mu.Lock()
errCb := js.opts.aecb
for id, paf := range js.pafs {
paf.err = ErrJetStreamPublisherClosed
if paf.errCh != nil {
paf.errCh <- paf.err
}
if errCb != nil {
defer errCb(js, paf.msg, ErrJetStreamPublisherClosed)
}
delete(js.pafs, id)
}
if js.dch != nil {
close(js.dch)
js.dch = nil
}
js.mu.Unlock()
}
func (js *js) cleanupReplySub() {
js.mu.Lock()
if js.rsub != nil {
@ -2899,10 +2941,11 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
expires = ttl - 10*time.Millisecond
expiresDiff := time.Duration(float64(ttl) * 0.1)
if expiresDiff > 5*time.Second {
expiresDiff = 5 * time.Second
}
expires := ttl - expiresDiff
nr.Batch = batch - len(msgs)
nr.Expires = expires
@ -3166,10 +3209,11 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
ttl = time.Until(deadline)
// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
expires = ttl - 10*time.Millisecond
expiresDiff := time.Duration(float64(ttl) * 0.1)
if expiresDiff > 5*time.Second {
expiresDiff = 5 * time.Second
}
expires := ttl - expiresDiff
requestBatch := batch - len(result.msgs)
req := nextRequest{

View File

@ -151,7 +151,10 @@ var (
// ErrSubscriptionClosed is returned when attempting to send pull request to a closed subscription
ErrSubscriptionClosed JetStreamError = &jsError{message: "subscription closed"}
// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called.
ErrJetStreamPublisherClosed JetStreamError = &jsError{message: "jetstream context closed"}
// Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
)

View File

@ -41,7 +41,7 @@ type JetStreamManager interface {
PurgeStream(name string, opts ...JSOpt) error
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// DEPRECATED: Use Streams() instead.
// Deprecated: Use Streams() instead.
StreamsInfo(opts ...JSOpt) <-chan *StreamInfo
// Streams can be used to retrieve a list of StreamInfo objects.
@ -86,7 +86,7 @@ type JetStreamManager interface {
ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// DEPRECATED: Use Consumers() instead.
// Deprecated: Use Consumers() instead.
ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
// Consumers is used to retrieve a list of ConsumerInfo objects.
@ -240,7 +240,7 @@ type StreamConfig struct {
// v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
// Template identifies the template that manages the Stream. DEPRECATED:
// Template identifies the template that manages the Stream. Deprecated:
// This feature is no longer supported.
Template string `json:"template_owner,omitempty"`
}
@ -747,7 +747,7 @@ func (jsc *js) Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
}
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// DEPRECATED: Use Consumers() instead.
// Deprecated: Use Consumers() instead.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
return jsc.Consumers(stream, opts...)
}
@ -1617,7 +1617,7 @@ func (jsc *js) Streams(opts ...JSOpt) <-chan *StreamInfo {
}
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// DEPRECATED: Use Streams() instead.
// Deprecated: Use Streams() instead.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
return jsc.Streams(opts...)
}

View File

@ -65,7 +65,7 @@ type KeyValue interface {
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// DEPRECATED: Use ListKeys instead to avoid memory issues.
// Deprecated: Use ListKeys instead to avoid memory issues.
Keys(opts ...WatchOpt) ([]string, error)
// ListKeys will return all keys in a channel.
ListKeys(opts ...WatchOpt) (KeyLister, error)

View File

@ -47,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.36.0"
Version = "1.37.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@ -160,7 +160,7 @@ func GetDefaultOptions() Options {
}
}
// DEPRECATED: Use GetDefaultOptions() instead.
// Deprecated: Use GetDefaultOptions() instead.
// DefaultOptions is not safe for use by multiple clients.
// For details see #308.
var DefaultOptions = GetDefaultOptions()
@ -386,7 +386,7 @@ type Options struct {
// DisconnectedCB sets the disconnected handler that is called
// whenever the connection is disconnected.
// Will not be called if DisconnectedErrCB is set
// DEPRECATED. Use DisconnectedErrCB which passes error that caused
// Deprecated. Use DisconnectedErrCB which passes error that caused
// the disconnect event.
DisconnectedCB ConnHandler
@ -450,7 +450,7 @@ type Options struct {
TokenHandler AuthTokenHandler
// Dialer allows a custom net.Dialer when forming connections.
// DEPRECATED: should use CustomDialer instead.
// Deprecated: should use CustomDialer instead.
Dialer *net.Dialer
// CustomDialer allows to specify a custom dialer (not necessarily
@ -1108,7 +1108,7 @@ func DisconnectErrHandler(cb ConnErrHandler) Option {
}
// DisconnectHandler is an Option to set the disconnected handler.
// DEPRECATED: Use DisconnectErrHandler.
// Deprecated: Use DisconnectErrHandler.
func DisconnectHandler(cb ConnHandler) Option {
return func(o *Options) error {
o.DisconnectedCB = cb
@ -1280,7 +1280,7 @@ func SyncQueueLen(max int) Option {
// Dialer is an Option to set the dialer which will be used when
// attempting to establish a connection.
// DEPRECATED: Should use CustomDialer instead.
// Deprecated: Should use CustomDialer instead.
func Dialer(dialer *net.Dialer) Option {
return func(o *Options) error {
o.Dialer = dialer
@ -1397,7 +1397,7 @@ func TLSHandshakeFirst() Option {
// Handler processing
// SetDisconnectHandler will set the disconnect event handler.
// DEPRECATED: Use SetDisconnectErrHandler
// Deprecated: Use SetDisconnectErrHandler
func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) {
if nc == nil {
return
@ -1513,7 +1513,7 @@ func processUrlString(url string) []string {
urls := strings.Split(url, ",")
var j int
for _, s := range urls {
u := strings.TrimSpace(s)
u := strings.TrimSuffix(strings.TrimSpace(s), "/")
if len(u) > 0 {
urls[j] = u
j++
@ -4902,7 +4902,8 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
}
// Queued returns the number of queued messages in the client for this subscription.
// DEPRECATED: Use Pending()
//
// Deprecated: Use Pending()
func (s *Subscription) QueuedMsgs() (int, error) {
m, _, err := s.Pending()
return int(m), err

View File

@ -23,6 +23,8 @@ import (
// Data will be encoded and decoded via the EncodedConn and its associated encoders.
// BindSendChan binds a channel for send operations to NATS.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) BindSendChan(subject string, channel any) error {
chVal := reflect.ValueOf(channel)
if chVal.Kind() != reflect.Chan {
@ -61,11 +63,15 @@ func chPublish(c *EncodedConn, chVal reflect.Value, subject string) {
}
// BindRecvChan binds a channel for receive operations from NATS.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) BindRecvChan(subject string, channel any) (*Subscription, error) {
return c.bindRecvChan(subject, _EMPTY_, channel)
}
// BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel any) (*Subscription, error) {
return c.bindRecvChan(subject, queue, channel)
}