Update go.mod, Alpine to 3.20.0 and to Go 1.22

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alex@openfaas.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2024-05-31 13:54:40 +01:00
parent 65d37f2856
commit 3d2808354d
564 changed files with 15769 additions and 15925 deletions

View File

@ -5,6 +5,9 @@ issues:
- linters:
- errcheck
text: "Unsubscribe"
- linters:
- errcheck
text: "Drain"
- linters:
- errcheck
text: "msg.Ack"

View File

@ -1,12 +1,12 @@
language: go
go:
- "1.22.x"
- "1.21.x"
- "1.20.x"
go_import_path: github.com/nats-io/nats.go
install:
- go get -t ./...
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then
go install github.com/mattn/goveralls@latest;
go install github.com/wadey/gocovmerge@latest;
go install honnef.co/go/tools/cmd/staticcheck@latest;
@ -15,22 +15,22 @@ install:
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet -modfile=go_test.mod ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then
find . -type f -name "*.go" | xargs misspell -error -locale US;
GOFLAGS="-mod=mod -modfile=go_test.mod" staticcheck ./...;
fi
- golangci-lint run ./jetstream/...
script:
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi
jobs:
include:
- name: "Go: 1.21.x (nats-server@main)"
go: "1.21.x"
- name: "Go: 1.22.x (nats-server@main)"
go: "1.22.x"
before_script:
- go get -modfile go_test.mod github.com/nats-io/nats-server/v2@main
allow_failures:
- name: "Go: 1.21.x (nats-server@main)"
- name: "Go: 1.22.x (nats-server@main)"

View File

@ -0,0 +1,80 @@
# Contributing
Thanks for your interest in contributing! This document contains `nats-io/nats.go` specific contributing details. If you
are a first-time contributor, please refer to the general [NATS Contributor Guide](https://nats.io/contributing/) to get
a comprehensive overview of contributing to the NATS project.
## Getting started
There are three general ways you can contribute to this repo:
- Proposing an enhancement or new feature
- Reporting a bug or regression
- Contributing changes to the source code
For the first two, refer to the [GitHub Issues](https://github.com/nats-io/nats.go/issues/new/choose) which guides you
through the available options along with the needed information to collect.
## Contributing changes
_Prior to opening a pull request, it is recommended to open an issue first to ensure the maintainers can review intended
changes. Exceptions to this rule include fixing non-functional source such as code comments, documentation or other
supporting files._
Proposing source code changes is done through GitHub's standard pull request workflow.
If your branch is a work-in-progress then please start by creating your pull requests as draft, by clicking the
down-arrow next to the `Create pull request` button and instead selecting `Create draft pull request`.
This will defer the automatic process of requesting a review from the NATS team and significantly reduces noise until
you are ready. Once you are happy, you can click the `Ready for review` button.
### Guidelines
A good pull request includes:
- A high-level description of the changes, including links to any issues that are related by adding comments
like `Resolves #NNN` to your description.
See [Linking a Pull Request to an Issue](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue)
for more information.
- An up-to-date parent commit. Please make sure you are pulling in the latest `main` branch and rebasing your work on
top of it, i.e. `git rebase main`.
- Unit tests where appropriate. Bug fixes will benefit from the addition of regression tests. New features will not be
accepted without suitable test coverage!
- No more commits than necessary. Sometimes having multiple commits is useful for telling a story or isolating changes
from one another, but please squash down any unnecessary commits that may just be for clean-up, comments or small
changes.
- No additional external dependencies that aren't absolutely essential. Please do everything you can to avoid pulling in
additional libraries/dependencies into `go.mod` as we will be very critical of these.
### Sign-off
In order to accept a contribution, you will first need to certify that the contribution is your original work and that
you license the work to the project under
the [Apache-2.0 license](https://github.com/nats-io/nats.go/blob/main/LICENSE).
This is done by using `Signed-off-by` statements, which should appear in **both** your commit messages and your PR
description. Please note that we can only accept sign-offs under a legal name. Nicknames and aliases are not permitted.
To perform a sign-off with `git`, use `git commit -s` (or `--signoff`).
## Get help
If you have questions about the contribution process, please start
a [GitHub discussion](https://github.com/nats-io/nats.go/discussions), join the [NATS Slack](https://slack.nats.io/), or
send your question to the [NATS Google Group](https://groups.google.com/forum/#!forum/natsio).
## Testing
You should use `go_test.mod` to manage your testing dependencies. Please use the following command to update your
dependencies and avoid changing the main `go.mod` in a PR:
```shell
go mod tidy -modfile=go_test.mod
```
To the tests you can pass `-modfile=go_test.mod` flag to `go test` or instead you can also set `GOFLAGS="-modfile=go_test.mod"` as an environment variable:
```shell
go test ./... -modfile=go_test.mod
```

View File

@ -14,6 +14,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
[Coverage-Url]: https://coveralls.io/r/nats-io/nats.go?branch=main
[Coverage-image]: https://coveralls.io/repos/github/nats-io/nats.go/badge.svg?branch=main
**Check out [NATS by example](https://natsbyexample.com) - An evolving collection of runnable, cross-client reference examples for NATS.**
## Installation
```bash
@ -29,7 +31,7 @@ When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.31.0
go get github.com/nats-io/nats.go/@v1.35.0
# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
@ -472,6 +474,19 @@ resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)
```
## Backwards compatibility
In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard go compatibility guidelines.
However, it's important to clarify our stance on certain types of changes:
- **Expanding structures:**
Adding new fields to structs is not considered a breaking change.
- **Adding methods to exported interfaces:**
Extending public interfaces with new methods is also not viewed as a breaking change within the context of this project. It is important to note that no unexported methods will be added to interfaces allowing users to implement them.
Additionally, this library always supports at least 2 latest minor Go versions. For example, if the latest Go version is 1.22, the library will support Go 1.21 and 1.22.
## License
Unless otherwise noted, the NATS source files are distributed

View File

@ -1,4 +1,4 @@
// Copyright 2016-2022 The NATS Authors
// Copyright 2016-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -24,7 +24,7 @@ import (
// DefaultEncoder implementation for EncodedConn.
// This encoder will leave []byte and string untouched, but will attempt to
// turn numbers into appropriate strings that can be decoded. It will also
// propely encoded and decode bools. If will encode a struct, but if you want
// properly encoded and decode bools. If will encode a struct, but if you want
// to properly handle structures you should use JsonEncoder.
type DefaultEncoder struct {
// Empty

View File

@ -1,4 +1,4 @@
// Copyright 2013-2018 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@ -4,19 +4,20 @@ go 1.19
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.0
github.com/nats-io/nats-server/v2 v2.10.0
github.com/nats-io/nkeys v0.4.5
github.com/klauspost/compress v1.17.6
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.10.11
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.13.0
go.uber.org/goleak v1.3.0
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.23.0
)
require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
)

View File

@ -10,32 +10,40 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg=
github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0=
github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

View File

@ -32,6 +32,11 @@ import (
)
// JetStream allows persistent messaging through JetStream.
//
// NOTE: JetStream is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
type JetStream interface {
// Publish publishes a message to JetStream.
Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
@ -106,6 +111,11 @@ type JetStream interface {
}
// JetStreamContext allows JetStream messaging and stream management.
//
// NOTE: JetStreamContext is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
type JetStreamContext interface {
JetStream
JetStreamManager
@ -227,14 +237,16 @@ type js struct {
opts *jsOpts
// For async publish context.
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
replyPrefix string
replyPrefixLen int
}
type jsOpts struct {
@ -274,6 +286,11 @@ const (
// JetStream returns a JetStreamContext for messaging and stream management.
// Errors are only returned if inconsistent options are provided.
//
// NOTE: JetStreamContext is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{
nc: nc,
@ -283,6 +300,12 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
maxpa: defaultAsyncPubAckInflight,
},
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.replyPrefix = inboxPrefix
js.replyPrefixLen = len(js.replyPrefix) + aReplyTokensize + 1
for _, opt := range opts {
if err := opt.configureJSContext(js.opts); err != nil {
@ -537,7 +560,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}
if err != nil {
for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ {
for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ {
// To protect against small blips in leadership changes etc, if we get a no responders here retry.
if o.ctx != nil {
select {
@ -559,7 +582,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}
}
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrNoStreamResponse
}
return nil, err
@ -641,7 +664,6 @@ func (paf *pubAckFuture) Msg() *Msg {
}
// For quick token lookup etc.
const aReplyPreLen = 14
const aReplyTokensize = 6
func (js *js) newAsyncReply() string {
@ -654,11 +676,7 @@ func (js *js) newAsyncReply() string {
for i := 0; i < aReplyTokensize; i++ {
b[i] = rdigits[int(b[i]%base)]
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.rpre = fmt.Sprintf("%s%s.", inboxPrefix, b[:aReplyTokensize])
js.rpre = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
if err != nil {
js.mu.Unlock()
@ -694,10 +712,20 @@ func (js *js) resetPendingAcksOnReconnect() {
return
}
js.mu.Lock()
for _, paf := range js.pafs {
errCb := js.opts.aecb
for id, paf := range js.pafs {
paf.err = ErrDisconnected
if paf.errCh != nil {
paf.errCh <- paf.err
}
if errCb != nil {
// clear reply subject so that new one is created on republish
js.mu.Unlock()
errCb(js, paf.msg, ErrDisconnected)
js.mu.Lock()
}
delete(js.pafs, id)
}
js.pafs = nil
if js.dch != nil {
close(js.dch)
js.dch = nil
@ -767,10 +795,10 @@ func (js *js) asyncStall() <-chan struct{} {
// Handle an async reply from PublishAsync.
func (js *js) handleAsyncReply(m *Msg) {
if len(m.Subject) <= aReplyPreLen {
if len(m.Subject) <= js.replyPrefixLen {
return
}
id := m.Subject[aReplyPreLen:]
id := m.Subject[js.replyPrefixLen:]
js.mu.Lock()
paf := js.getPAF(id)
@ -916,7 +944,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
return nil, errors.New("nats: error creating async reply handler")
}
id := m.Reply[aReplyPreLen:]
id := m.Reply[js.replyPrefixLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)
@ -1241,6 +1269,10 @@ func (sub *Subscription) deleteConsumer() error {
sub.mu.Unlock()
return nil
}
if jsi.stream == _EMPTY_ || jsi.consumer == _EMPTY_ {
sub.mu.Unlock()
return nil
}
stream, consumer := jsi.stream, jsi.consumer
js := jsi.js
sub.mu.Unlock()
@ -1594,7 +1626,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if consumer != _EMPTY_ && !o.skipCInfo {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
lookupErr = err == ErrJetStreamNotEnabled || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded)
}
switch {
@ -1808,7 +1840,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if bl < DefaultSubPendingBytesLimit {
bl = DefaultSubPendingBytesLimit
}
sub.SetPendingLimits(maxap, bl)
if err := sub.SetPendingLimits(maxap, bl); err != nil {
return nil, err
}
}
// Do heartbeats last if needed.
@ -2047,7 +2081,16 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
js := jsi.js
sub.mu.Unlock()
consName := nuid.Next()
sub.mu.Lock()
// Attempt to delete the existing consumer.
// We don't wait for the response since even if it's unsuccessful,
// inactivity threshold will kick in and delete it.
if jsi.consumer != _EMPTY_ {
go js.DeleteConsumer(jsi.stream, jsi.consumer)
}
jsi.consumer = ""
sub.mu.Unlock()
consName := getHash(nuid.Next())
cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg)
if err != nil {
var apiErr *APIError
@ -2813,7 +2856,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// are no messages.
msg, err = sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if err == errNoMessages {
if errors.Is(err, errNoMessages) {
err = nil
}
break
@ -2828,7 +2871,14 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
var hbTimer *time.Timer
var hbErr error
if err == nil && len(msgs) < batch {
sub.mu.Lock()
subClosed := sub.closed || sub.draining
sub.mu.Unlock()
if subClosed {
err = errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
}
hbLock := sync.Mutex{}
if err == nil && len(msgs) < batch && !subClosed {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
noWait := batch-len(msgs) > 1
@ -2870,7 +2920,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if o.hb > 0 {
if hbTimer == nil {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
hbErr = ErrNoHeartbeat
hbLock.Unlock()
cancel()
})
} else {
@ -2893,13 +2945,13 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
usrMsg, err = checkMsg(msg, true, noWait)
if err == nil && usrMsg {
msgs = append(msgs, msg)
} else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 {
} else if noWait && (errors.Is(err, errNoMessages) || errors.Is(err, errRequestsPending)) && len(msgs) == 0 {
// If we have a 404/408 for our "no_wait" request and have
// not collected any message, then resend request to
// wait this time.
noWait = false
err = sendReq()
} else if err == ErrTimeout && len(msgs) == 0 {
} else if errors.Is(err, ErrTimeout) && len(msgs) == 0 {
// If we get a 408, we will bail if we already collected some
// messages, otherwise ignore and go back calling nextMsg.
err = nil
@ -2912,6 +2964,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
// If there is at least a message added to msgs, then need to return OK and no error
if err != nil && len(msgs) == 0 {
hbLock.Lock()
defer hbLock.Unlock()
if hbErr != nil {
return nil, hbErr
}
@ -3082,7 +3136,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
// are no messages.
msg, err := sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if err == errNoMessages {
if errors.Is(err, errNoMessages) {
err = nil
}
result.err = err
@ -3096,8 +3150,14 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
result.msgs <- msg
}
}
if len(result.msgs) == batch || result.err != nil {
sub.mu.Lock()
subClosed := sub.closed || sub.draining
sub.mu.Unlock()
if len(result.msgs) == batch || result.err != nil || subClosed {
close(result.msgs)
if subClosed && len(result.msgs) == 0 {
return nil, errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
}
result.done <- struct{}{}
return result, nil
}
@ -3136,9 +3196,12 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
}
var hbTimer *time.Timer
var hbErr error
hbLock := sync.Mutex{}
if o.hb > 0 {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
hbErr = ErrNoHeartbeat
hbLock.Unlock()
cancel()
})
}
@ -3159,7 +3222,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
usrMsg, err = checkMsg(msg, true, false)
if err != nil {
if err == ErrTimeout {
if errors.Is(err, ErrTimeout) {
if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) {
// ignore timeout message from server if it comes from a different pull request
continue
@ -3174,11 +3237,13 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
}
}
if err != nil {
hbLock.Lock()
if hbErr != nil {
result.err = hbErr
} else {
result.err = o.checkCtxErr(err)
}
hbLock.Unlock()
}
close(result.msgs)
result.done <- struct{}{}
@ -3188,7 +3253,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
// checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
func (o *pullOpts) checkCtxErr(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
if o.ctx == nil && errors.Is(err, context.DeadlineExceeded) {
return ErrTimeout
}
return err
@ -3204,7 +3269,7 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err

View File

@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -22,6 +22,10 @@ var (
// API errors
// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account.
//
// Note: This error will not be returned in clustered mode, even if each
// server in the cluster does not have JetStream enabled. In clustered mode,
// requests will time out instead.
ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}}
// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
@ -51,7 +55,7 @@ var (
// ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourceing with multiple subject transforms not supported by nats-server"}
ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject transforms not supported by nats-server"}
// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}
@ -120,6 +124,9 @@ var (
// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.' or ' ').
ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}
// ErrInvalidFilterSubject is returned when the provided filter subject is invalid.
ErrInvalidFilterSubject JetStreamError = &jsError{message: "invalid filter subject"}
// ErrNoMatchingStream is returned when stream lookup by subject is unsuccessful.
ErrNoMatchingStream JetStreamError = &jsError{message: "no stream matches subject"}
@ -141,6 +148,9 @@ var (
// ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer.
ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}
// ErrSubscriptionClosed is returned when attempting to send pull request to a closed subscription
ErrSubscriptionClosed JetStreamError = &jsError{message: "subscription closed"}
// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")

View File

@ -1,4 +1,4 @@
// Copyright 2021-2022 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -70,6 +70,10 @@ type JetStreamManager interface {
SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error
// AddConsumer adds a consumer to a stream.
// If the consumer already exists, and the configuration is the same, it
// will return the existing consumer.
// If the consumer already exists, and the configuration is different, it
// will return ErrConsumerNameAlreadyInUse.
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
// UpdateConsumer updates an existing consumer.
@ -102,51 +106,143 @@ type JetStreamManager interface {
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Compression StoreCompression `json:"compression"`
FirstSeq uint64 `json:"first_seq,omitempty"`
// Name is the name of the stream. It is required and must be unique
// across the JetStream account.
//
// Name Names cannot contain whitespace, ., *, >, path separators
// (forward or backwards slash), and non-printable characters.
Name string `json:"name"`
// Allow applying a subject transform to incoming messages before doing anything else.
// Description is an optional description of the stream.
Description string `json:"description,omitempty"`
// Subjects is a list of subjects that the stream is listening on.
// Wildcards are supported. Subjects cannot be set if the stream is
// created as a mirror.
Subjects []string `json:"subjects,omitempty"`
// Retention defines the message retention policy for the stream.
// Defaults to LimitsPolicy.
Retention RetentionPolicy `json:"retention"`
// MaxConsumers specifies the maximum number of consumers allowed for
// the stream.
MaxConsumers int `json:"max_consumers"`
// MaxMsgs is the maximum number of messages the stream will store.
// After reaching the limit, stream adheres to the discard policy.
// If not set, server default is -1 (unlimited).
MaxMsgs int64 `json:"max_msgs"`
// MaxBytes is the maximum total size of messages the stream will store.
// After reaching the limit, stream adheres to the discard policy.
// If not set, server default is -1 (unlimited).
MaxBytes int64 `json:"max_bytes"`
// Discard defines the policy for handling messages when the stream
// reaches its limits in terms of number of messages or total bytes.
Discard DiscardPolicy `json:"discard"`
// DiscardNewPerSubject is a flag to enable discarding new messages per
// subject when limits are reached. Requires DiscardPolicy to be
// DiscardNew and the MaxMsgsPerSubject to be set.
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
// MaxAge is the maximum age of messages that the stream will retain.
MaxAge time.Duration `json:"max_age"`
// MaxMsgsPerSubject is the maximum number of messages per subject that
// the stream will retain.
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
// MaxMsgSize is the maximum size of any single message in the stream.
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
// Storage specifies the type of storage backend used for the stream
// (file or memory).
Storage StorageType `json:"storage"`
// Replicas is the number of stream replicas in clustered JetStream.
// Defaults to 1, maximum is 5.
Replicas int `json:"num_replicas"`
// NoAck is a flag to disable acknowledging messages received by this
// stream.
//
// If set to true, publish methods from the JetStream client will not
// work as expected, since they rely on acknowledgements. Core NATS
// publish methods should be used instead. Note that this will make
// message delivery less reliable.
NoAck bool `json:"no_ack,omitempty"`
// Duplicates is the window within which to track duplicate messages.
// If not set, server default is 2 minutes.
Duplicates time.Duration `json:"duplicate_window,omitempty"`
// Placement is used to declare where the stream should be placed via
// tags and/or an explicit cluster name.
Placement *Placement `json:"placement,omitempty"`
// Mirror defines the configuration for mirroring another stream.
Mirror *StreamSource `json:"mirror,omitempty"`
// Sources is a list of other streams this stream sources messages from.
Sources []*StreamSource `json:"sources,omitempty"`
// Sealed streams do not allow messages to be published or deleted via limits or API,
// sealed streams can not be unsealed via configuration update. Can only
// be set on already created streams via the Update API.
Sealed bool `json:"sealed,omitempty"`
// DenyDelete restricts the ability to delete messages from a stream via
// the API. Defaults to false.
DenyDelete bool `json:"deny_delete,omitempty"`
// DenyPurge restricts the ability to purge messages from a stream via
// the API. Defaults to false.
DenyPurge bool `json:"deny_purge,omitempty"`
// AllowRollup allows the use of the Nats-Rollup header to replace all
// contents of a stream, or subject in a stream, with a single new
// message.
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
// Compression specifies the message storage compression algorithm.
// Defaults to NoCompression.
Compression StoreCompression `json:"compression"`
// FirstSeq is the initial sequence number of the first message in the
// stream.
FirstSeq uint64 `json:"first_seq,omitempty"`
// SubjectTransform allows applying a transformation to matching
// messages' subjects.
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
// Allow republish of the message after being sequenced and stored.
// RePublish allows immediate republishing a message to the configured
// subject after it's stored.
RePublish *RePublish `json:"republish,omitempty"`
// Allow higher performance, direct access to get individual messages. E.g. KeyValue
// AllowDirect enables direct access to individual messages using direct
// get API. Defaults to false.
AllowDirect bool `json:"allow_direct"`
// Allow higher performance and unified direct access for mirrors as well.
// MirrorDirect enables direct access to individual messages from the
// origin stream using direct get API. Defaults to false.
MirrorDirect bool `json:"mirror_direct"`
// Limits for consumers on this stream.
// ConsumerLimits defines limits of certain values that consumers can
// set, defaults for those who don't set these settings
ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`
// Metadata is additional metadata for the Stream.
// Keys starting with `_nats` are reserved.
// NOTE: Metadata requires nats-server v2.10.0+
// Metadata is a set of application-defined key-value pairs for
// associating metadata on the stream. This feature requires nats-server
// v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
// Template identifies the template that manages the Stream. DEPRECATED:
// This feature is no longer supported.
Template string `json:"template_owner,omitempty"`
}
// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.
@ -252,11 +348,13 @@ type AccountInfo struct {
}
type Tier struct {
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Limits AccountLimits `json:"limits"`
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
ReservedMemory uint64 `json:"reserved_memory"`
ReservedStore uint64 `json:"reserved_storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Limits AccountLimits `json:"limits"`
}
// APIStats reports on API calls to JetStream for this account.
@ -282,9 +380,13 @@ type accountInfoResponse struct {
AccountInfo
}
// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
// AccountInfo fetches account information from the server, containing details
// about the account associated with this JetStream connection. If account is
// not enabled for JetStream, ErrJetStreamNotEnabledForAccount is returned.
//
// If the server does not have JetStream enabled, ErrJetStreamNotEnabled is
// returned (for a single server setup). For clustered topologies, AccountInfo
// will time out.
func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
@ -297,7 +399,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
if err != nil {
// todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err
@ -327,7 +429,11 @@ type consumerResponse struct {
*ConsumerInfo
}
// AddConsumer will add a JetStream consumer.
// AddConsumer adds a consumer to a stream.
// If the consumer already exists, and the configuration is the same, it
// will return the existing consumer.
// If the consumer already exists, and the configuration is different, it
// will return ErrConsumerNameAlreadyInUse.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
cfg = &ConsumerConfig{}
@ -400,6 +506,10 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
// if filter subject is empty or ">", use the endpoint without filter subject
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
} else {
// safeguard against passing invalid filter subject in request subject
if cfg.FilterSubject[0] == '.' || cfg.FilterSubject[len(cfg.FilterSubject)-1] == '.' {
return nil, fmt.Errorf("%w: %q", ErrInvalidFilterSubject, cfg.FilterSubject)
}
// if filter subject is not empty, use the endpoint with filter subject
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
}
@ -415,7 +525,7 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err
@ -1623,7 +1733,7 @@ func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) {
resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return _EMPTY_, err

View File

@ -1,4 +1,4 @@
// Copyright 2021-2022 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -65,7 +65,10 @@ type KeyValue interface {
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// DEPRECATED: Use ListKeys instead to avoid memory issues.
Keys(opts ...WatchOpt) ([]string, error)
// ListKeys will return all keys in a channel.
ListKeys(opts ...WatchOpt) (KeyLister, error)
// History will return all historical values for the key.
History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
@ -95,6 +98,9 @@ type KeyValueStatus interface {
// Bytes returns the size in bytes of the bucket
Bytes() uint64
// IsCompressed indicates if the data is compressed on disk
IsCompressed() bool
}
// KeyWatcher is what is returned when doing a watch.
@ -107,6 +113,12 @@ type KeyWatcher interface {
Stop() error
}
// KeyLister is used to retrieve a list of key value store keys
type KeyLister interface {
Keys() <-chan string
Stop() error
}
type WatchOpt interface {
configureWatcher(opts *watchOpts) error
}
@ -237,18 +249,22 @@ func purge() DeleteOpt {
// KeyValueConfig is for configuring a KeyValue store.
type KeyValueConfig struct {
Bucket string
Description string
MaxValueSize int32
History uint8
TTL time.Duration
MaxBytes int64
Storage StorageType
Replicas int
Placement *Placement
RePublish *RePublish
Mirror *StreamSource
Sources []*StreamSource
Bucket string `json:"bucket"`
Description string `json:"description,omitempty"`
MaxValueSize int32 `json:"max_value_size,omitempty"`
History uint8 `json:"history,omitempty"`
TTL time.Duration `json:"ttl,omitempty"`
MaxBytes int64 `json:"max_bytes,omitempty"`
Storage StorageType `json:"storage,omitempty"`
Replicas int `json:"num_replicas,omitempty"`
Placement *Placement `json:"placement,omitempty"`
RePublish *RePublish `json:"republish,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
// Enable underlying stream compression.
// NOTE: Compression is supported for nats-server 2.10.0+
Compression bool `json:"compression,omitempty"`
}
// Used to watch all keys.
@ -328,8 +344,9 @@ const (
// Regex for valid keys and buckets.
var (
validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`)
validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`)
validBucketRe = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)
validKeyRe = regexp.MustCompile(`^[-/_=\.a-zA-Z0-9]+$`)
validSearchKeyRe = regexp.MustCompile(`^[-/_=\.a-zA-Z0-9*]*[>]?$`)
)
// KeyValue will lookup and bind to an existing KeyValue store.
@ -337,13 +354,13 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {
if !js.nc.serverMinVersion(2, 6, 2) {
return nil, errors.New("nats: key-value requires at least server version 2.6.2")
}
if !validBucketRe.MatchString(bucket) {
if !bucketValid(bucket) {
return nil, ErrInvalidBucketName
}
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
si, err := js.StreamInfo(stream)
if err != nil {
if err == ErrStreamNotFound {
if errors.Is(err, ErrStreamNotFound) {
err = ErrBucketNotFound
}
return nil, err
@ -365,7 +382,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
if cfg == nil {
return nil, ErrKeyValueConfigRequired
}
if !validBucketRe.MatchString(cfg.Bucket) {
if !bucketValid(cfg.Bucket) {
return nil, ErrInvalidBucketName
}
if _, err := js.AccountInfo(); err != nil {
@ -405,6 +422,10 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
duplicateWindow = cfg.TTL
}
var compression StoreCompression
if cfg.Compression {
compression = S2Compression
}
scfg := &StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
@ -422,6 +443,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
MaxConsumers: -1,
AllowDirect: true,
RePublish: cfg.RePublish,
Compression: compression,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
@ -465,7 +487,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
// the stream.
// The same logic applies for KVs created pre 2.9.x and
// the AllowDirect setting.
if err == ErrStreamNameAlreadyInUse {
if errors.Is(err, ErrStreamNameAlreadyInUse) {
if si, _ = js.StreamInfo(scfg.Name); si != nil {
// To compare, make the server's stream info discard
// policy same than ours.
@ -486,7 +508,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
// DeleteKeyValue will delete this KeyValue store (JetStream stream).
func (js *js) DeleteKeyValue(bucket string) error {
if !validBucketRe.MatchString(bucket) {
if !bucketValid(bucket) {
return ErrInvalidBucketName
}
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
@ -526,6 +548,13 @@ func (e *kve) Created() time.Time { return e.created }
func (e *kve) Delta() uint64 { return e.delta }
func (e *kve) Operation() KeyValueOp { return e.op }
func bucketValid(bucket string) bool {
if len(bucket) == 0 {
return false
}
return validBucketRe.MatchString(bucket)
}
func keyValid(key string) bool {
if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' {
return false
@ -533,11 +562,18 @@ func keyValid(key string) bool {
return validKeyRe.MatchString(key)
}
func searchKeyValid(key string) bool {
if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' {
return false
}
return validSearchKeyRe.MatchString(key)
}
// Get returns the latest value for the key.
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
e, err := kv.get(key, kvLatestRevision)
if err != nil {
if err == ErrKeyDeleted {
if errors.Is(err, ErrKeyDeleted) {
return nil, ErrKeyNotFound
}
return nil, err
@ -550,7 +586,7 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) {
func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
e, err := kv.get(key, revision)
if err != nil {
if err == ErrKeyDeleted {
if errors.Is(err, ErrKeyDeleted) {
return nil, ErrKeyNotFound
}
return nil, err
@ -587,7 +623,7 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
}
}
if err != nil {
if err == ErrMsgNotFound {
if errors.Is(err, ErrMsgNotFound) {
err = ErrKeyNotFound
}
return nil, err
@ -654,7 +690,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
// TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
// so we need to double check.
if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
if e, err := kv.get(key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) {
return kv.Update(key, value, e.Revision())
}
@ -830,6 +866,41 @@ func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) {
return keys, nil
}
type keyLister struct {
watcher KeyWatcher
keys chan string
}
// ListKeys will return all keys.
func (kv *kvs) ListKeys(opts ...WatchOpt) (KeyLister, error) {
opts = append(opts, IgnoreDeletes(), MetaOnly())
watcher, err := kv.WatchAll(opts...)
if err != nil {
return nil, err
}
kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}
go func() {
defer close(kl.keys)
defer watcher.Stop()
for entry := range watcher.Updates() {
if entry == nil {
return
}
kl.keys <- entry.Key()
}
}()
return kl, nil
}
func (kl *keyLister) Keys() <-chan string {
return kl.keys
}
func (kl *keyLister) Stop() error {
return kl.watcher.Stop()
}
// History will return all values for the key.
func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
opts = append(opts, IncludeHistory())
@ -895,6 +966,9 @@ func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
// Watch will fire the callback when a key that matches the keys pattern is updated.
// keys needs to be a valid NATS subject.
func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
if !searchKeyValid(keys) {
return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject")
}
var o watchOpts
for _, opt := range opts {
if opt != nil {
@ -1040,6 +1114,9 @@ func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
// Bytes is the size of the stream
func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }
// IsCompressed indicates if the data is compressed on disk
func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }
// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)
@ -1062,7 +1139,7 @@ func (js *js) KeyValueStoreNames() <-chan string {
if !strings.HasPrefix(name, kvBucketNamePre) {
continue
}
ch <- name
ch <- strings.TrimPrefix(name, kvBucketNamePre)
}
}
}()

View File

@ -1,4 +1,4 @@
// Copyright 2012-2023 The NATS Authors
// Copyright 2012-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -47,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.31.0"
Version = "1.35.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@ -90,55 +90,56 @@ const (
// Errors
var (
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
ErrNoUserCB = errors.New("nats: user callback not defined")
ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
ErrNoUserCB = errors.New("nats: user callback not defined")
ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
)
// GetDefaultOptions returns default configuration options for the client.
@ -565,7 +566,6 @@ type Conn struct {
respSub string // The wildcard subject
respSubPrefix string // the wildcard prefix including trailing .
respSubLen int // the length of the wildcard prefix excluding trailing .
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
@ -607,14 +607,17 @@ type Subscription struct {
// For holding information about a JetStream consumer.
jsi *jsSub
delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
draining bool
status SubStatus
statListeners map[chan SubStatus][]SubStatus
// Type of Subscription
typ SubscriptionType
@ -635,6 +638,30 @@ type Subscription struct {
dropped int
}
// Status represents the state of the connection.
type SubStatus int
const (
SubscriptionActive = SubStatus(iota)
SubscriptionDraining
SubscriptionClosed
SubscriptionSlowConsumer
)
func (s SubStatus) String() string {
switch s {
case SubscriptionActive:
return "Active"
case SubscriptionDraining:
return "Draining"
case SubscriptionClosed:
return "Closed"
case SubscriptionSlowConsumer:
return "SlowConsumer"
}
return "unknown status"
}
// Msg represents a message delivered by NATS. This structure is used
// by Subscribers and PublishMsg().
//
@ -849,7 +876,7 @@ func InProcessServer(server InProcessConnProvider) Option {
// Secure is an Option to enable TLS secure connections that skip server verification by default.
// Pass a TLS Configuration for proper TLS.
// NOTE: This should NOT be used in a production setting.
// A TLS Configuration using InsecureSkipVerify should NOT be used in a production setting.
func Secure(tls ...*tls.Config) Option {
return func(o *Options) error {
o.Secure = true
@ -864,6 +891,40 @@ func Secure(tls ...*tls.Config) Option {
}
}
// ClientTLSConfig is an Option to set the TLS configuration for secure
// connections. It can be used to e.g. set TLS config with cert and root CAs
// from memory. For simple use case of loading cert and CAs from file,
// ClientCert and RootCAs options are more convenient.
// If Secure is not already set this will set it as well.
func ClientTLSConfig(certCB TLSCertHandler, rootCAsCB RootCAsHandler) Option {
return func(o *Options) error {
o.Secure = true
if certCB == nil && rootCAsCB == nil {
return ErrClientCertOrRootCAsRequired
}
// Smoke test the callbacks to fail early
// if they are not valid.
if certCB != nil {
if _, err := certCB(); err != nil {
return err
}
}
if rootCAsCB != nil {
if _, err := rootCAsCB(); err != nil {
return err
}
}
if o.TLSConfig == nil {
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
}
o.TLSCertCB = certCB
o.RootCAsCB = rootCAsCB
return nil
}
}
// RootCAs is a helper option to provide the RootCAs pool from a list of filenames.
// If Secure is not already set this will set it as well.
func RootCAs(file ...string) Option {
@ -2100,6 +2161,47 @@ func (nc *Conn) waitForExits() {
nc.wg.Wait()
}
// ForceReconnect forces a reconnect attempt to the server.
// This is a non-blocking call and will start the reconnect
// process without waiting for it to complete.
//
// If the connection is already in the process of reconnecting,
// this call will force an immediate reconnect attempt (bypassing
// the current reconnect delay).
func (nc *Conn) ForceReconnect() error {
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.isClosed() {
return ErrConnectionClosed
}
if nc.isReconnecting() {
// if we're already reconnecting, force a reconnect attempt
// even if we're in the middle of a backoff
if nc.rqch != nil {
close(nc.rqch)
}
return nil
}
// Clear any queued pongs
nc.clearPendingFlushCalls()
// Clear any queued and blocking requests.
nc.clearPendingRequestCalls()
// Stop ping timer if set.
nc.stopPingTimer()
// Go ahead and make sure we have flushed the outbound
nc.bw.flush()
nc.conn.Close()
nc.changeConnStatus(RECONNECTING)
go nc.doReconnect(nil, true)
return nil
}
// ConnectedUrl reports the connected server's URL
func (nc *Conn) ConnectedUrl() string {
if nc == nil {
@ -2359,7 +2461,7 @@ func (nc *Conn) connect() (bool, error) {
nc.setup()
nc.changeConnStatus(RECONNECTING)
nc.bw.switchToPending()
go nc.doReconnect(ErrNoServers)
go nc.doReconnect(ErrNoServers, false)
err = nil
} else {
nc.current = nil
@ -2659,7 +2761,7 @@ func (nc *Conn) stopPingTimer() {
// Try to reconnect using the option parameters.
// This function assumes we are allowed to reconnect.
func (nc *Conn) doReconnect(err error) {
func (nc *Conn) doReconnect(err error, forceReconnect bool) {
// We want to make sure we have the other watchers shutdown properly
// here before we proceed past this point.
nc.waitForExits()
@ -2715,7 +2817,8 @@ func (nc *Conn) doReconnect(err error) {
break
}
doSleep := i+1 >= len(nc.srvPool)
doSleep := i+1 >= len(nc.srvPool) && !forceReconnect
forceReconnect = false
nc.mu.Unlock()
if !doSleep {
@ -2742,6 +2845,12 @@ func (nc *Conn) doReconnect(err error) {
select {
case <-rqch:
rt.Stop()
// we need to reset the rqch channel to avoid
// closing a closed channel in the next iteration
nc.mu.Lock()
nc.rqch = make(chan struct{})
nc.mu.Unlock()
case <-rt.C:
}
}
@ -2811,18 +2920,19 @@ func (nc *Conn) doReconnect(err error) {
// Done with the pending buffer
nc.bw.doneWithPending()
// This is where we are truly connected.
nc.status = CONNECTED
// Queue up the correct callback. If we are in initial connect state
// (using retry on failed connect), we will call the ConnectedCB,
// otherwise the ReconnectedCB.
if nc.Opts.ReconnectedCB != nil && !nc.initc {
nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
} else if nc.Opts.ConnectedCB != nil && nc.initc {
nc.ach.push(func() { nc.Opts.ConnectedCB(nc) })
}
// If we are here with a retry on failed connect, indicate that the
// initial connect is now complete.
nc.initc = false
// Queue up the reconnect callback.
if nc.Opts.ReconnectedCB != nil {
nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
}
// Release lock here, we will return below.
nc.mu.Unlock()
@ -2865,7 +2975,7 @@ func (nc *Conn) processOpErr(err error) {
// Clear any queued pongs, e.g. pending flush calls.
nc.clearPendingFlushCalls()
go nc.doReconnect(err)
go nc.doReconnect(err, false)
nc.mu.Unlock()
return
}
@ -3257,6 +3367,9 @@ func (nc *Conn) processMsg(data []byte) {
}
// Clear any SlowConsumer status.
if sub.sc {
sub.changeSubStatus(SubscriptionActive)
}
sub.sc = false
sub.mu.Unlock()
@ -3280,8 +3393,9 @@ slowConsumer:
sub.pMsgs--
sub.pBytes -= len(m.Data)
}
sub.mu.Unlock()
if sc {
sub.changeSubStatus(SubscriptionSlowConsumer)
sub.mu.Unlock()
// Now we need connection's lock and we may end-up in the situation
// that we were trying to avoid, except that in this case, the client
// is already experiencing client-side slow consumer situation.
@ -3291,6 +3405,8 @@ slowConsumer:
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrSlowConsumer) })
}
nc.mu.Unlock()
} else {
sub.mu.Unlock()
}
}
@ -3686,7 +3802,7 @@ func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
}
// Process key fetching original case.
i := bytes.IndexByte([]byte(kv), ':')
i := strings.IndexByte(kv, ':')
if i < 0 {
return nil, ErrBadHeaderMsg
}
@ -3699,8 +3815,7 @@ func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
for i < len(kv) && (kv[i] == ' ' || kv[i] == '\t') {
i++
}
value := string(kv[i:])
m[key] = append(m[key], value)
m[key] = append(m[key], kv[i:])
if err != nil {
return m, err
}
@ -3903,7 +4018,6 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
nc.mu.Unlock()
return nil, token, err
}
nc.respScanf = strings.Replace(nc.respSub, "*", "%s", -1)
nc.respMux = s
}
nc.mu.Unlock()
@ -4084,16 +4198,14 @@ func (nc *Conn) NewRespInbox() string {
}
// respToken will return the last token of a literal response inbox
// which we use for the message channel lookup. This needs to do a
// scan to protect itself against the server changing the subject.
// which we use for the message channel lookup. This needs to verify the subject
// prefix matches to protect itself against the server changing the subject.
// Lock should be held.
func (nc *Conn) respToken(respInbox string) string {
var token string
n, err := fmt.Sscanf(respInbox, nc.respScanf, &token)
if err != nil || n != 1 {
return ""
if token, found := strings.CutPrefix(respInbox, nc.respSubPrefix); found {
return token
}
return token
return ""
}
// Subscribe will express interest in the given subject. The subject
@ -4263,6 +4375,7 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
nc.kickFlusher()
}
sub.changeSubStatus(SubscriptionActive)
return sub, nil
}
@ -4298,8 +4411,15 @@ func (nc *Conn) removeSub(s *Subscription) {
}
}
if s.typ != AsyncSubscription {
done := s.pDone
if done != nil {
done(s.Subject)
}
}
// Mark as invalid
s.closed = true
s.changeSubStatus(SubscriptionClosed)
if s.pCond != nil {
s.pCond.Broadcast()
}
@ -4369,6 +4489,91 @@ func (s *Subscription) Drain() error {
return conn.unsubscribe(s, 0, true)
}
// IsDraining returns a boolean indicating whether the subscription
// is being drained.
// This will return false if the subscription has already been closed.
func (s *Subscription) IsDraining() bool {
if s == nil {
return false
}
s.mu.Lock()
defer s.mu.Unlock()
return s.draining
}
// StatusChanged returns a channel on which given list of subscription status
// changes will be sent. If no status is provided, all status changes will be sent.
// Available statuses are SubscriptionActive, SubscriptionDraining, SubscriptionClosed,
// and SubscriptionSlowConsumer.
// The returned channel will be closed when the subscription is closed.
func (s *Subscription) StatusChanged(statuses ...SubStatus) <-chan SubStatus {
if len(statuses) == 0 {
statuses = []SubStatus{SubscriptionActive, SubscriptionDraining, SubscriptionClosed, SubscriptionSlowConsumer}
}
ch := make(chan SubStatus, 10)
for _, status := range statuses {
s.registerStatusChangeListener(status, ch)
// initial status
if status == s.status {
ch <- status
}
}
return ch
}
// registerStatusChangeListener registers a channel waiting for a specific status change event.
// Status change events are non-blocking - if no receiver is waiting for the status change,
// it will not be sent on the channel. Closed channels are ignored.
func (s *Subscription) registerStatusChangeListener(status SubStatus, ch chan SubStatus) {
s.mu.Lock()
defer s.mu.Unlock()
if s.statListeners == nil {
s.statListeners = make(map[chan SubStatus][]SubStatus)
}
if _, ok := s.statListeners[ch]; !ok {
s.statListeners[ch] = make([]SubStatus, 0)
}
s.statListeners[ch] = append(s.statListeners[ch], status)
}
// sendStatusEvent sends subscription status event to all channels.
// If there is no listener, sendStatusEvent
// will not block. Lock should be held entering.
func (s *Subscription) sendStatusEvent(status SubStatus) {
for ch, statuses := range s.statListeners {
if !containsStatus(statuses, status) {
continue
}
// only send event if someone's listening
select {
case ch <- status:
default:
}
if status == SubscriptionClosed {
close(ch)
}
}
}
func containsStatus(statuses []SubStatus, status SubStatus) bool {
for _, s := range statuses {
if s == status {
return true
}
}
return false
}
// changeSubStatus changes subscription status and sends events
// to all listeners. Lock should be held entering.
func (s *Subscription) changeSubStatus(status SubStatus) {
if s == nil {
return
}
s.sendStatusEvent(status)
s.status = status
}
// Unsubscribe will remove interest in the given subject.
//
// For a JetStream subscription, if the library has created the JetStream
@ -4407,6 +4612,11 @@ func (s *Subscription) Unsubscribe() error {
// checkDrained will watch for a subscription to be fully drained
// and then remove it.
func (nc *Conn) checkDrained(sub *Subscription) {
defer func() {
sub.mu.Lock()
defer sub.mu.Unlock()
sub.draining = false
}()
if nc == nil || sub == nil {
return
}
@ -4516,6 +4726,10 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
}
if drainMode {
s.mu.Lock()
s.draining = true
sub.changeSubStatus(SubscriptionDraining)
s.mu.Unlock()
go nc.checkDrained(sub)
}
@ -4618,6 +4832,7 @@ func (s *Subscription) validateNextMsgState(pullSubInternal bool) error {
return ErrSyncSubRequired
}
if s.sc {
s.changeSubStatus(SubscriptionActive)
s.sc = false
return ErrSlowConsumer
}

View File

@ -1,4 +1,4 @@
// Copyright 2013-2022 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@ -1,4 +1,4 @@
// Copyright 2021-2022 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -153,6 +153,9 @@ type ObjectStoreConfig struct {
// Bucket-specific metadata
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
// Enable underlying stream compression.
// NOTE: Compression is supported for nats-server 2.10.0+
Compression bool `json:"compression,omitempty"`
}
type ObjectStoreStatus interface {
@ -174,6 +177,8 @@ type ObjectStoreStatus interface {
BackingStore() string
// Metadata is the user supplied metadata for the bucket
Metadata() map[string]string
// IsCompressed indicates if the data is compressed on disk
IsCompressed() bool
}
// ObjectMetaOptions
@ -266,7 +271,10 @@ func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) {
if maxBytes == 0 {
maxBytes = -1
}
var compression StoreCompression
if cfg.Compression {
compression = S2Compression
}
scfg := &StreamConfig{
Name: fmt.Sprintf(objNameTmpl, name),
Description: cfg.Description,
@ -280,6 +288,7 @@ func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) {
AllowRollup: true,
AllowDirect: true,
Metadata: cfg.Metadata,
Compression: compression,
}
// Create our stream.
@ -377,13 +386,16 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
defer jetStream.(*js).cleanupReplySub()
purgePartial := func() {
purgePartial := func() error {
// wait until all pubs are complete or up to default timeout before attempting purge
select {
case <-jetStream.PublishAsyncComplete():
case <-time.After(obs.js.opts.wait):
}
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
if err := obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}); err != nil {
return fmt.Errorf("could not cleanup bucket after erroneous put operation: %w", err)
}
return nil
}
m, h := NewMsg(chunkSubj), sha256.New()
@ -404,7 +416,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
default:
}
if err != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
return nil, err
}
}
@ -415,7 +429,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Handle all non EOF errors
if readErr != nil && readErr != io.EOF {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(readErr, purgeErr)
}
return nil, readErr
}
@ -427,11 +443,15 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Send msg itself.
if _, err := jetStream.PublishMsgAsync(m); err != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
return nil, err
}
if err := getErr(); err != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
return nil, err
}
// Update totals.
@ -455,7 +475,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
mm.Data, err = json.Marshal(info)
if err != nil {
if r != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
}
return nil, err
}
@ -464,7 +486,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
_, err = jetStream.PublishMsgAsync(mm)
if err != nil {
if r != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
}
return nil, err
}
@ -474,7 +498,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
case <-jetStream.PublishAsyncComplete():
if err := getErr(); err != nil {
if r != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
}
return nil, err
}
@ -487,7 +513,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Delete any original chunks.
if einfo != nil && !einfo.Deleted {
echunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID)
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: echunkSubj})
if err := obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: echunkSubj}); err != nil {
return info, err
}
}
// TODO would it be okay to do this to return the info with the correct time?
@ -516,11 +544,12 @@ func DecodeObjectDigest(data string) ([]byte, error) {
// ObjectResult impl.
type objResult struct {
sync.Mutex
info *ObjectInfo
r io.ReadCloser
err error
ctx context.Context
digest hash.Hash
info *ObjectInfo
r io.ReadCloser
err error
ctx context.Context
digest hash.Hash
readTimeout time.Duration
}
func (info *ObjectInfo) isLink() bool {
@ -604,7 +633,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
return lobs.Get(info.ObjectMeta.Opts.Link.Name)
}
result := &objResult{info: info, ctx: ctx}
result := &objResult{info: info, ctx: ctx, readTimeout: obs.js.opts.wait}
if info.Size == 0 {
return result, nil
}
@ -626,7 +655,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
if ctx != nil {
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
if errors.Is(ctx.Err(), context.Canceled) {
err = ctx.Err()
} else {
err = ErrTimeout
@ -665,7 +694,12 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
}
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
_, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer())
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subscribeOpts := []SubOpt{
OrderedConsumer(),
BindStream(streamName),
}
_, err = obs.js.Subscribe(chunkSubj, processChunk, subscribeOpts...)
if err != nil {
return nil, err
}
@ -926,7 +960,7 @@ func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, err
m, err := obs.js.GetLastMsg(stream, metaSubj)
if err != nil {
if err == ErrMsgNotFound {
if errors.Is(err, ErrMsgNotFound) {
err = ErrObjectNotFound
}
return nil, err
@ -1081,7 +1115,8 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
}
// Used ordered consumer to deliver results.
subOpts := []SubOpt{OrderedConsumer()}
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subOpts := []SubOpt{OrderedConsumer(), BindStream(streamName)}
if !o.includeHistory {
subOpts = append(subOpts, DeliverLastPerSubject())
}
@ -1204,6 +1239,9 @@ func (s *ObjectBucketStatus) Metadata() map[string]string { return s.nfo.Config.
// StreamInfo is the stream info retrieved to create the status
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
// IsCompressed indicates if the data is compressed on disk
func (s *ObjectBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }
// Status retrieves run-time status about a bucket
func (obs *obs) Status() (ObjectStoreStatus, error) {
nfo, err := obs.js.StreamInfo(obs.stream)
@ -1223,7 +1261,11 @@ func (obs *obs) Status() (ObjectStoreStatus, error) {
func (o *objResult) Read(p []byte) (n int, err error) {
o.Lock()
defer o.Unlock()
readDeadline := time.Now().Add(o.readTimeout)
if ctx := o.ctx; ctx != nil {
if deadline, ok := ctx.Deadline(); ok {
readDeadline = deadline
}
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
@ -1242,7 +1284,7 @@ func (o *objResult) Read(p []byte) (n int, err error) {
}
r := o.r.(net.Conn)
r.SetReadDeadline(time.Now().Add(2 * time.Second))
r.SetReadDeadline(readDeadline)
n, err = r.Read(p)
if err, ok := err.(net.Error); ok && err.Timeout() {
if ctx := o.ctx; ctx != nil {

View File

@ -29,7 +29,7 @@ type timerPool struct {
// Get returns a timer that completes after the given duration.
func (tp *timerPool) Get(d time.Duration) *time.Timer {
if t, _ := tp.p.Get().(*time.Timer); t != nil {
if t, ok := tp.p.Get().(*time.Timer); ok && t != nil {
t.Reset(d)
return t
}