Rename Makefile targets

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alex@openfaas.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2023-10-23 11:29:19 +01:00
parent 479285caf6
commit 9ba4a73d5d
350 changed files with 22981 additions and 3972 deletions

View File

@ -0,0 +1,13 @@
issues:
max-issues-per-linter: 0
max-same-issues: 0
exclude-rules:
- linters:
- errcheck
text: "Unsubscribe"
- linters:
- errcheck
text: "msg.Ack"
- linters:
- errcheck
text: "watcher.Stop"

View File

@ -1,11 +1,12 @@
language: go
go:
- 1.18.x
- 1.17.x
- "1.21.x"
- "1.20.x"
go_import_path: github.com/nats-io/nats.go
install:
- go get -t ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.18 ]]; then
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then
go install github.com/mattn/goveralls@latest;
go install github.com/wadey/gocovmerge@latest;
go install honnef.co/go/tools/cmd/staticcheck@latest;
@ -14,12 +15,22 @@ install:
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet -modfile=go_test.mod ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.18 ]]; then
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then
find . -type f -name "*.go" | xargs misspell -error -locale US;
GOFLAGS="-mod=mod -modfile=go_test.mod" staticcheck ./...;
fi
- golangci-lint run ./jetstream/...
script:
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off
- if [[ "$TRAVIS_GO_VERSION" =~ 1.18 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.18 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi
jobs:
include:
- name: "Go: 1.21.x (nats-server@main)"
go: "1.21.x"
before_script:
- go get -modfile go_test.mod github.com/nats-io/nats-server/v2@main
allow_failures:
- name: "Go: 1.21.x (nats-server@main)"

View File

@ -29,7 +29,7 @@ When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.22.1
go get github.com/nats-io/nats.go/@v1.31.0
# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
@ -90,84 +90,47 @@ nc.Drain()
nc.Close()
```
## JetStream Basic Usage
## JetStream
JetStream is the built-in NATS persistence system. `nats.go` provides a built-in
API enabling both managing JetStream assets as well as publishing/consuming
persistent messages.
### Basic usage
```go
import "github.com/nats-io/nats.go"
// Connect to NATS
// connect to nats server
nc, _ := nats.Connect(nats.DefaultURL)
// Create JetStream Context
js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256))
// create jetstream context from nats connection
js, _ := jetstream.New(nc)
// Simple Stream Publisher
js.Publish("ORDERS.scratch", []byte("hello"))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Simple Async Stream Publisher
for i := 0; i < 500; i++ {
js.PublishAsync("ORDERS.scratch", []byte("hello"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// get existing stream handle
stream, _ := js.Stream(ctx, "foo")
// Simple Async Ephemeral Consumer
js.Subscribe("ORDERS.*", func(m *nats.Msg) {
fmt.Printf("Received a JetStream message: %s\n", string(m.Data))
// retrieve consumer handle from a stream
cons, _ := stream.Consumer(ctx, "cons")
// consume messages from the consumer in callback
cc, _ := cons.Consume(func(msg jetstream.Msg) {
fmt.Println("Received jetstream message: ", string(msg.Data()))
msg.Ack()
})
// Simple Sync Durable Consumer (optional SubOpts at the end)
sub, err := js.SubscribeSync("ORDERS.*", nats.Durable("MONITOR"), nats.MaxDeliver(3))
m, err := sub.NextMsg(timeout)
// Simple Pull Consumer
sub, err := js.PullSubscribe("ORDERS.*", "MONITOR")
msgs, err := sub.Fetch(10)
// Unsubscribe
sub.Unsubscribe()
// Drain
sub.Drain()
defer cc.Stop()
```
## JetStream Basic Management
To find more information on `nats.go` JetStream API, visit
[`jetstream/README.md`](jetstream/README.md)
```go
import "github.com/nats-io/nats.go"
> The current JetStream API replaces the [legacy JetStream API](legacy_jetstream.md)
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
## Service API
// Create JetStream Context
js, _ := nc.JetStream()
// Create a Stream
js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.*"},
})
// Update a Stream
js.UpdateStream(&nats.StreamConfig{
Name: "ORDERS",
MaxBytes: 8,
})
// Create a Consumer
js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "MONITOR",
})
// Delete Consumer
js.DeleteConsumer("ORDERS", "MONITOR")
// Delete Stream
js.DeleteStream("ORDERS")
```
The service API (`micro`) allows you to [easily build NATS services](micro/README.md) The
services API is currently in beta release.
## Encoded Connections

View File

@ -136,9 +136,8 @@ func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal,
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
return msg, nil
}
return msg, nil
default:
// If internal and we don't want to wait, signal that there is no
// message in the internal queue.
@ -218,7 +217,7 @@ func (nc *Conn) FlushWithContext(ctx context.Context) error {
// RequestWithContext will create an Inbox and perform a Request
// using the provided cancellation context with the Inbox reply
// for the data v. A response will be decoded into the vPtr last parameter.
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error {
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error {
if ctx == nil {
return ErrInvalidContext
}

View File

@ -2,12 +2,14 @@
This file lists the dependencies used in this repository.
| Dependency | License |
|-|-|
| Go | BSD 3-Clause "New" or "Revised" License |
| github.com/nats-io/nats.go | Apache License 2.0 |
| github.com/golang/protobuf v1.4.2 | BSD 3-Clause "New" or "Revised" License |
| github.com/nats-io/nats-server/v2 v2.1.8-0.20201115145023-f61fa8529a0f | Apache License 2.0 |
| github.com/nats-io/nkeys v0.2.0 | Apache License 2.0 |
| github.com/nats-io/nuid v1.0.1 | Apache License 2.0 |
| google.golang.org/protobuf v1.23.0 | BSD 3-Clause License |
| Dependency | License |
|-----------------------------------|--------------|
| Go | BSD 3-Clause |
| github.com/golang/protobuf/proto | BSD-3-Clause |
| github.com/klauspost/compress | BSD-3-Clause |
| github.com/nats-io/nats-server/v2 | Apache-2.0 |
| github.com/nats-io/nkeys | Apache-2.0 |
| github.com/nats-io/nuid | Apache-2.0 |
| go.uber.org/goleak | MIT |
| golang.org/x/text | BSD-3-Clause |
| google.golang.org/protobuf | BSD-3-Clause |

View File

@ -26,8 +26,8 @@ import (
// Encoder interface is for all register encoders
type Encoder interface {
Encode(subject string, v interface{}) ([]byte, error)
Decode(subject string, data []byte, vPtr interface{}) error
Encode(subject string, v any) ([]byte, error)
Decode(subject string, data []byte, vPtr any) error
}
var encMap map[string]Encoder
@ -88,7 +88,7 @@ func EncoderForType(encType string) Encoder {
// Publish publishes the data argument to the given subject. The data argument
// will be encoded using the associated encoder.
func (c *EncodedConn) Publish(subject string, v interface{}) error {
func (c *EncodedConn) Publish(subject string, v any) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
return err
@ -99,7 +99,7 @@ func (c *EncodedConn) Publish(subject string, v interface{}) error {
// PublishRequest will perform a Publish() expecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error {
func (c *EncodedConn) PublishRequest(subject, reply string, v any) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
return err
@ -110,7 +110,7 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error
// Request will create an Inbox and perform a Request() call
// with the Inbox reply for the data v. A response will be
// decoded into the vPtr Response.
func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Duration) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
return err
@ -129,7 +129,7 @@ func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, t
}
// Handler is a specific callback used for Subscribe. It is generalized to
// an interface{}, but we will discover its format and arguments at runtime
// an any, but we will discover its format and arguments at runtime
// and perform the correct callback, including demarshaling encoded data
// back into the appropriate struct based on the signature of the Handler.
//
@ -150,7 +150,7 @@ func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, t
// and demarshal it into the given struct, e.g. person.
// There are also variants where the callback wants either the subject, or the
// subject and the reply subject.
type Handler interface{}
type Handler any
// Dissect the cb Handler's signature
func argInfo(cb Handler) (reflect.Type, int) {
@ -265,5 +265,5 @@ func (c *EncodedConn) Drain() error {
// LastError reports the last error encountered via the Connection.
func (c *EncodedConn) LastError() error {
return c.Conn.err
return c.Conn.LastError()
}

View File

@ -35,7 +35,7 @@ var falseB = []byte("false")
var nilB = []byte("")
// Encode
func (je *DefaultEncoder) Encode(subject string, v interface{}) ([]byte, error) {
func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) {
switch arg := v.(type) {
case string:
bytes := *(*[]byte)(unsafe.Pointer(&arg))
@ -58,7 +58,7 @@ func (je *DefaultEncoder) Encode(subject string, v interface{}) ([]byte, error)
}
// Decode
func (je *DefaultEncoder) Decode(subject string, data []byte, vPtr interface{}) error {
func (je *DefaultEncoder) Decode(subject string, data []byte, vPtr any) error {
// Figure out what it's pointing to...
sData := *(*string)(unsafe.Pointer(&data))
switch arg := vPtr.(type) {

View File

@ -28,7 +28,7 @@ type GobEncoder struct {
// FIXME(dlc) - This could probably be more efficient.
// Encode
func (ge *GobEncoder) Encode(subject string, v interface{}) ([]byte, error) {
func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) {
b := new(bytes.Buffer)
enc := gob.NewEncoder(b)
if err := enc.Encode(v); err != nil {
@ -38,7 +38,7 @@ func (ge *GobEncoder) Encode(subject string, v interface{}) ([]byte, error) {
}
// Decode
func (ge *GobEncoder) Decode(subject string, data []byte, vPtr interface{}) (err error) {
func (ge *GobEncoder) Decode(subject string, data []byte, vPtr any) (err error) {
dec := gob.NewDecoder(bytes.NewBuffer(data))
err = dec.Decode(vPtr)
return

View File

@ -26,7 +26,7 @@ type JsonEncoder struct {
}
// Encode
func (je *JsonEncoder) Encode(subject string, v interface{}) ([]byte, error) {
func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) {
b, err := json.Marshal(v)
if err != nil {
return nil, err
@ -35,7 +35,7 @@ func (je *JsonEncoder) Encode(subject string, v interface{}) ([]byte, error) {
}
// Decode
func (je *JsonEncoder) Decode(subject string, data []byte, vPtr interface{}) (err error) {
func (je *JsonEncoder) Decode(subject string, data []byte, vPtr any) (err error) {
switch arg := vPtr.(type) {
case *string:
// If they want a string and it is a JSON string, strip quotes

View File

@ -1,20 +1,22 @@
module github.com/nats-io/nats.go
go 1.17
go 1.19
require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.9.6
github.com/nats-io/nkeys v0.3.0
github.com/klauspost/compress v1.17.0
github.com/nats-io/nats-server/v2 v2.10.0
github.com/nats-io/nkeys v0.4.5
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.13.0
google.golang.org/protobuf v1.23.0
)
require (
github.com/klauspost/compress v1.15.11 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
)

View File

@ -1,5 +1,4 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -11,45 +10,32 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.6 h1:RTtK+rv/4CcliOuqGsy58g7MuWkBaWmF5TUNwuUo9Uw=
github.com/nats-io/nats-server/v2 v2.9.6/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g=
github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg=
github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A=
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI=
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
@ -59,7 +45,4 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@ -0,0 +1,104 @@
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parser
import (
"errors"
"fmt"
)
const (
AckDomainTokenPos = iota + 2
AckAccHashTokenPos
AckStreamTokenPos
AckConsumerTokenPos
AckNumDeliveredTokenPos
AckStreamSeqTokenPos
AckConsumerSeqTokenPos
AckTimestampSeqTokenPos
AckNumPendingTokenPos
)
var ErrInvalidSubjectFormat = errors.New("invalid format of ACK subject")
// Quick parser for positive numbers in ack reply encoding.
// NOTE: This parser does not detect uint64 overflow
func ParseNum(d string) (n uint64) {
if len(d) == 0 {
return 0
}
// ASCII numbers 0-9
const (
asciiZero = 48
asciiNine = 57
)
for _, dec := range d {
if dec < asciiZero || dec > asciiNine {
return 0
}
n = n*10 + uint64(dec) - asciiZero
}
return
}
func GetMetadataFields(subject string) ([]string, error) {
v1TokenCounts, v2TokenCounts := 9, 12
var start int
tokens := make([]string, 0, v2TokenCounts)
for i := 0; i < len(subject); i++ {
if subject[i] == '.' {
tokens = append(tokens, subject[start:i])
start = i + 1
}
}
tokens = append(tokens, subject[start:])
//
// Newer server will include the domain name and account hash in the subject,
// and a token at the end.
//
// Old subject was:
// $JS.ACK.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>
//
// New subject would be:
// $JS.ACK.<domain>.<account hash>.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>.<a token with a random value>
//
// v1 has 9 tokens, v2 has 12, but we must not be strict on the 12th since
// it may be removed in the future. Also, the library has no use for it.
// The point is that a v2 ACK subject is valid if it has at least 11 tokens.
//
tokensLen := len(tokens)
// If lower than 9 or more than 9 but less than 11, report an error
if tokensLen < v1TokenCounts || (tokensLen > v1TokenCounts && tokensLen < v2TokenCounts-1) {
return nil, ErrInvalidSubjectFormat
}
if tokens[0] != "$JS" || tokens[1] != "ACK" {
return nil, fmt.Errorf("%w: subject should start with $JS.ACK", ErrInvalidSubjectFormat)
}
// For v1 style, we insert 2 empty tokens (domain and hash) so that the
// rest of the library references known fields at a constant location.
if tokensLen == v1TokenCounts {
// Extend the array (we know the backend is big enough)
tokens = append(tokens[:AckDomainTokenPos+2], tokens[AckDomainTokenPos:]...)
// Clear the domain and hash tokens
tokens[AckDomainTokenPos], tokens[AckAccHashTokenPos] = "", ""
} else if tokens[AckDomainTokenPos] == "_" {
// If domain is "_", replace with empty value.
tokens[AckDomainTokenPos] = ""
}
return tokens, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -33,6 +33,26 @@ var (
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}
// ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}
// ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}
// ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"}
// ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourceing with multiple subject transforms not supported by nats-server"}
// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}
@ -42,6 +62,15 @@ var (
// ErrBadRequest is returned when invalid request is sent to JetStream API.
ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}}
// ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}
// ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer.
ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}
// ErrEmptyFilter is returned when a filter in FilterSubjects is empty.
ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}}
// Client errors
// ErrConsumerNameAlreadyInUse is an error returned when consumer with given name already exists.
@ -62,6 +91,11 @@ var (
// ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}
// ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
// multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid
// configuration was already created in the server.
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}
// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"}
@ -80,10 +114,10 @@ var (
// ErrNotJSMessage is returned when attempting to get metadata from non JetStream message .
ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"}
// ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.').
// ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.' or ' ').
ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"}
// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.').
// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.' or ' ').
ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}
// ErrNoMatchingStream is returned when stream lookup by subject is unsuccessful.
@ -104,6 +138,9 @@ var (
// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"}
// ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer.
ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}
// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
@ -115,17 +152,22 @@ type ErrorCode uint16
const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeInsufficientResourcesErr ErrorCode = 10023
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139
JSErrCodeMessageNotFound ErrorCode = 10037
JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeBadRequest ErrorCode = 10003
JSStreamInvalidConfig ErrorCode = 10052
JSErrCodeStreamWrongLastSequence ErrorCode = 10071
)

View File

@ -102,30 +102,35 @@ type JetStreamManager interface {
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Compression StoreCompression `json:"compression"`
FirstSeq uint64 `json:"first_seq,omitempty"`
// Allow applying a subject transform to incoming messages before doing anything else.
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
// Allow republish of the message after being sequenced and stored.
RePublish *RePublish `json:"republish,omitempty"`
@ -134,6 +139,20 @@ type StreamConfig struct {
AllowDirect bool `json:"allow_direct"`
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct"`
// Limits for consumers on this stream.
ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`
// Metadata is additional metadata for the Stream.
// Keys starting with `_nats` are reserved.
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
}
// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.
type SubjectTransformConfig struct {
Source string `json:"src,omitempty"`
Destination string `json:"dest"`
}
// RePublish is for republishing messages once committed to a stream. The original
@ -152,12 +171,13 @@ type Placement struct {
// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}
// ExternalStream allows you to qualify access to a stream source in another
@ -167,6 +187,13 @@ type ExternalStream struct {
DeliverPrefix string `json:"deliver,omitempty"`
}
// StreamConsumerLimits are the limits for a consumer on a stream.
// These can be overridden on a per consumer basis.
type StreamConsumerLimits struct {
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
}
// Helper for copying when we do not want to change user's version.
func (ss *StreamSource) copy() *StreamSource {
nss := *ss
@ -310,7 +337,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
consumerName = cfg.Durable
}
if consumerName != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, consumerName)
consInfo, err := js.ConsumerInfo(stream, consumerName, opts...)
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}
@ -319,6 +346,8 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
sameConfig := checkConfig(&consInfo.Config, cfg)
if sameConfig != nil {
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, consumerName, stream)
} else {
return consInfo, nil
}
}
}
@ -359,20 +388,29 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
var ccSubj string
if consumerName == _EMPTY_ {
// if consumer name is empty, use the legacy ephemeral endpoint
// if consumer name is empty (neither Durable nor Name is set), use the legacy ephemeral endpoint
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
} else if err := checkConsumerName(consumerName); err != nil {
return nil, err
} else if !js.nc.serverMinVersion(2, 9, 0) || (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) {
// if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else {
// if above server version 2.9.0, use the endpoints with consumer name
if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
} else if js.nc.serverMinVersion(2, 9, 0) {
if cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate {
// if user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
// if filter subject is empty or ">", use the endpoint without filter subject
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
} else {
// if filter subject is not empty, use the endpoint with filter subject
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
}
} else {
if cfg.Durable != "" {
// if Durable is set, use the DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else {
// if Durable is not set, use the legacy ephemeral endpoint
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
}
}
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
@ -396,6 +434,11 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
}
return nil, info.Error
}
// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 {
return nil, ErrConsumerMultipleFilterSubjectsNotSupported
}
return info.ConsumerInfo, nil
}
@ -409,19 +452,20 @@ func checkStreamName(stream string) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
if strings.Contains(stream, ".") {
if strings.ContainsAny(stream, ". ") {
return ErrInvalidStreamName
}
return nil
}
// Check that the durable name exists and is valid, that is, that it does not contain any "."
// Check that the consumer name is not empty and is valid (does not contain "." and " ").
// Additional consumer name validation is done in nats-server.
// Returns ErrConsumerNameRequired if consumer name is empty, ErrInvalidConsumerName is invalid, otherwise nil
func checkConsumerName(consumer string) error {
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
}
if strings.Contains(consumer, ".") {
if strings.ContainsAny(consumer, ". ") {
return ErrInvalidConsumerName
}
return nil
@ -768,6 +812,21 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, resp.Error
}
// check that input subject transform (if used) is reflected in the returned ConsumerInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}
if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Config.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
}
}
}
return resp.StreamInfo, nil
}
@ -885,11 +944,13 @@ type StreamAlternate struct {
// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}
// StreamState is information about the given stream.
@ -961,6 +1022,23 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
}
return nil, resp.Error
}
// check that input subject transform (if used) is reflected in the returned StreamInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}
if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Config.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
}
}
}
return resp.StreamInfo, nil
}
@ -1107,7 +1185,7 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
var hdr Header
if len(msg.Header) > 0 {
hdr, err = decodeHeadersMsg(msg.Header)
hdr, err = DecodeHeadersMsg(msg.Header)
if err != nil {
return nil, err
}

View File

@ -23,6 +23,8 @@ import (
"strings"
"sync"
"time"
"github.com/nats-io/nats.go/internal/parser"
)
// KeyValueManager is used to manage KeyValue stores.
@ -121,6 +123,8 @@ type watchOpts struct {
ignoreDeletes bool
// Include all history per subject, not just last one.
includeHistory bool
// Include only updates for keys.
updatesOnly bool
// retrieve only the meta data of the entry
metaOnly bool
}
@ -134,11 +138,25 @@ func (opt watchOptFn) configureWatcher(opts *watchOpts) error {
// IncludeHistory instructs the key watcher to include historical values as well.
func IncludeHistory() WatchOpt {
return watchOptFn(func(opts *watchOpts) error {
if opts.updatesOnly {
return errors.New("nats: include history can not be used with updates only")
}
opts.includeHistory = true
return nil
})
}
// UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).
func UpdatesOnly() WatchOpt {
return watchOptFn(func(opts *watchOpts) error {
if opts.includeHistory {
return errors.New("nats: updates only can not be used with include history")
}
opts.updatesOnly = true
return nil
})
}
// IgnoreDeletes will have the key watcher not pass any deleted keys.
func IgnoreDeletes() WatchOpt {
return watchOptFn(func(opts *watchOpts) error {
@ -414,14 +432,21 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
scfg.Mirror = m
scfg.MirrorDirect = true
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
ss = ss.copy()
var sourceBucketName string
if strings.HasPrefix(ss.Name, kvBucketNamePre) {
sourceBucketName = ss.Name[len(kvBucketNamePre):]
} else {
sourceBucketName = ss.Name
ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
}
if ss.External == nil || sourceBucketName != cfg.Bucket {
ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}}
}
scfg.Sources = append(scfg.Sources, ss)
}
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
} else {
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}
@ -438,11 +463,15 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
// and we are now moving to a v2.7.2+. If that is the case
// and the only difference is the discard policy, then update
// the stream.
// The same logic applies for KVs created pre 2.9.x and
// the AllowDirect setting.
if err == ErrStreamNameAlreadyInUse {
if si, _ = js.StreamInfo(scfg.Name); si != nil {
// To compare, make the server's stream info discard
// policy same than ours.
si.Config.Discard = scfg.Discard
// Also need to set allow direct for v2.9.x+
si.Config.AllowDirect = scfg.AllowDirect
if reflect.DeepEqual(&si.Config, scfg) {
si, err = js.UpdateStream(scfg)
}
@ -616,7 +645,7 @@ func (kv *kvs) PutString(key string, value string) (revision uint64, err error)
return kv.Put(key, []byte(value))
}
// Create will add the key/value pair iff it does not exist.
// Create will add the key/value pair if it does not exist.
func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
v, err := kv.Update(key, value, 0)
if err == nil {
@ -639,7 +668,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
return 0, err
}
// Update will update the value iff the latest revision matches.
// Update will update the value if the latest revision matches.
func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) {
if !keyValid(key) {
return 0, ErrInvalidKey
@ -885,7 +914,7 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx}
update := func(m *Msg) {
tokens, err := getMetadataFields(m.Reply)
tokens, err := parser.GetMetadataFields(m.Reply)
if err != nil {
return
}
@ -903,7 +932,7 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
op = KeyValuePurge
}
}
delta := uint64(parseNum(tokens[ackNumPendingTokenPos]))
delta := parser.ParseNum(tokens[parser.AckNumPendingTokenPos])
w.mu.Lock()
defer w.mu.Unlock()
if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
@ -911,14 +940,15 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
bucket: kv.name,
key: subj,
value: m.Data,
revision: uint64(parseNum(tokens[ackStreamSeqTokenPos])),
created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])),
revision: parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]),
created: time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))),
delta: delta,
op: op,
}
w.updates <- entry
}
// Check if done and initial values.
// Skip if UpdatesOnly() is set, since there will never be updates initially.
if !w.initDone {
w.received++
// We set this on the first trip through..
@ -937,6 +967,9 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
if !o.includeHistory {
subOpts = append(subOpts, DeliverLastPerSubject())
}
if o.updatesOnly {
subOpts = append(subOpts, DeliverNew())
}
if o.metaOnly {
subOpts = append(subOpts, HeadersOnly())
}
@ -955,12 +988,18 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
sub.mu.Lock()
// If there were no pending messages at the time of the creation
// of the consumer, send the marker.
if sub.jsi != nil && sub.jsi.pending == 0 {
// Skip if UpdatesOnly() is set, since there will never be updates initially.
if !o.updatesOnly {
if sub.jsi != nil && sub.jsi.pending == 0 {
w.initDone = true
w.updates <- nil
}
} else {
// if UpdatesOnly was used, mark initialization as complete
w.initDone = true
w.updates <- nil
}
// Set us up to close when the waitForMessages func returns.
sub.pDone = func() {
sub.pDone = func(_ string) {
close(w.updates)
}
sub.mu.Unlock()
@ -1014,16 +1053,16 @@ func (kv *kvs) Status() (KeyValueStatus, error) {
// KeyValueStoreNames is used to retrieve a list of key value store names
func (js *js) KeyValueStoreNames() <-chan string {
ch := make(chan string)
l := &streamLister{js: js}
l := &streamNamesLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
go func() {
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) {
for _, name := range l.Page() {
if !strings.HasPrefix(name, kvBucketNamePre) {
continue
}
ch <- info.Config.Name
ch <- name
}
}
}()

View File

@ -0,0 +1,83 @@
# Legacy JetStream API
This is a documentation for the legacy JetStream API. A README for the current
API can be found [here](jetstream/README.md)
## JetStream Basic Usage
```go
import "github.com/nats-io/nats.go"
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Create JetStream Context
js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256))
// Simple Stream Publisher
js.Publish("ORDERS.scratch", []byte("hello"))
// Simple Async Stream Publisher
for i := 0; i < 500; i++ {
js.PublishAsync("ORDERS.scratch", []byte("hello"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Simple Async Ephemeral Consumer
js.Subscribe("ORDERS.*", func(m *nats.Msg) {
fmt.Printf("Received a JetStream message: %s\n", string(m.Data))
})
// Simple Sync Durable Consumer (optional SubOpts at the end)
sub, err := js.SubscribeSync("ORDERS.*", nats.Durable("MONITOR"), nats.MaxDeliver(3))
m, err := sub.NextMsg(timeout)
// Simple Pull Consumer
sub, err := js.PullSubscribe("ORDERS.*", "MONITOR")
msgs, err := sub.Fetch(10)
// Unsubscribe
sub.Unsubscribe()
// Drain
sub.Drain()
```
## JetStream Basic Management
```go
import "github.com/nats-io/nats.go"
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Create JetStream Context
js, _ := nc.JetStream()
// Create a Stream
js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.*"},
})
// Update a Stream
js.UpdateStream(&nats.StreamConfig{
Name: "ORDERS",
MaxBytes: 8,
})
// Create a Consumer
js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "MONITOR",
})
// Delete Consumer
js.DeleteConsumer("ORDERS", "MONITOR")
// Delete Stream
js.DeleteStream("ORDERS")
```

View File

@ -1,4 +1,4 @@
// Copyright 2012-2022 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -47,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.22.1"
Version = "1.31.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@ -61,6 +61,7 @@ const (
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
DefaultFlusherTimeout = time.Minute
LangString = "go"
)
@ -140,10 +141,6 @@ var (
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// GetDefaultOptions returns default configuration options for the client.
func GetDefaultOptions() Options {
return Options{
@ -158,6 +155,7 @@ func GetDefaultOptions() Options {
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
FlusherTimeout: DefaultFlusherTimeout,
}
}
@ -215,6 +213,13 @@ type ErrHandler func(*Conn, *Subscription, error)
// JWT for this user.
type UserJWTHandler func() (string, error)
// TLSCertHandler is used to fetch and return tls certificate.
type TLSCertHandler func() (tls.Certificate, error)
// RootCAsHandler is used to fetch and return a set of root certificate
// authorities that clients use when verifying server certificates.
type RootCAsHandler func() (*x509.CertPool, error)
// SignatureHandler is used to sign a nonce from the server while
// authenticating with nkeys. The user should sign the nonce and
// return the raw signature. The client will base64 encode this to
@ -303,6 +308,20 @@ type Options struct {
// transports.
TLSConfig *tls.Config
// TLSCertCB is used to fetch and return custom tls certificate.
TLSCertCB TLSCertHandler
// TLSHandshakeFirst is used to instruct the library perform
// the TLS handshake right after the connect and before receiving
// the INFO protocol from the server. If this option is enabled
// but the server is not configured to perform the TLS handshake
// first, the connection will fail.
TLSHandshakeFirst bool
// RootCAsCB is used to fetch and return a set of root certificate
// authorities that clients use when verifying server certificates.
RootCAsCB RootCAsHandler
// AllowReconnect enables reconnection logic to be used when we
// encounter a disconnect from the current server.
AllowReconnect bool
@ -346,6 +365,7 @@ type Options struct {
// FlusherTimeout is the maximum time to wait for write operations
// to the underlying connection to complete (including the flusher loop).
// Defaults to 1m.
FlusherTimeout time.Duration
// PingInterval is the period at which the client will be sending ping
@ -475,6 +495,9 @@ type Options struct {
// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
IgnoreAuthErrorAbort bool
// SkipHostLookup skips the DNS lookup for the server hostname.
SkipHostLookup bool
}
const (
@ -511,31 +534,32 @@ type Conn struct {
mu sync.RWMutex
// Opts holds the configuration of the Conn.
// Modifying the configuration of a running Conn is a race.
Opts Options
wg sync.WaitGroup
srvPool []*srv
current *srv
urls map[string]struct{} // Keep track of all known URLs (used by processInfo)
conn net.Conn
bw *natsWriter
br *natsReader
fch chan struct{}
info serverInfo
ssid int64
subsMu sync.RWMutex
subs map[int64]*Subscription
ach *asyncCallbacksHandler
pongs []chan struct{}
scratch [scratchSize]byte
status Status
initc bool // true if the connection is performing the initial connect
err error
ps *parseState
ptmr *time.Timer
pout int
ar bool // abort reconnect
rqch chan struct{}
ws bool // true if a websocket connection
Opts Options
wg sync.WaitGroup
srvPool []*srv
current *srv
urls map[string]struct{} // Keep track of all known URLs (used by processInfo)
conn net.Conn
bw *natsWriter
br *natsReader
fch chan struct{}
info serverInfo
ssid int64
subsMu sync.RWMutex
subs map[int64]*Subscription
ach *asyncCallbacksHandler
pongs []chan struct{}
scratch [scratchSize]byte
status Status
statListeners map[Status][]chan Status
initc bool // true if the connection is performing the initial connect
err error
ps *parseState
ptmr *time.Timer
pout int
ar bool // abort reconnect
rqch chan struct{}
ws bool // true if a websocket connection
// New style response handler
respSub string // The wildcard subject
@ -599,7 +623,7 @@ type Subscription struct {
pHead *Msg
pTail *Msg
pCond *sync.Cond
pDone func()
pDone func(subject string)
// Pending stats, async subscriptions, high-speed etc.
pMsgs int
@ -673,6 +697,15 @@ func (m *Msg) Equal(msg *Msg) bool {
return true
}
// Size returns a message size in bytes.
func (m *Msg) Size() int {
if m.wsz != 0 {
return m.wsz
}
hdr, _ := m.headerBytes()
return len(m.Subject) + len(m.Reply) + len(hdr) + len(m.Data)
}
func (m *Msg) headerBytes() ([]byte, error) {
var hdr []byte
if len(m.Header) == 0 {
@ -835,21 +868,27 @@ func Secure(tls ...*tls.Config) Option {
// If Secure is not already set this will set it as well.
func RootCAs(file ...string) Option {
return func(o *Options) error {
pool := x509.NewCertPool()
for _, f := range file {
rootPEM, err := os.ReadFile(f)
if err != nil || rootPEM == nil {
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
}
ok := pool.AppendCertsFromPEM(rootPEM)
if !ok {
return fmt.Errorf("nats: failed to parse root certificate from %q", f)
rootCAsCB := func() (*x509.CertPool, error) {
pool := x509.NewCertPool()
for _, f := range file {
rootPEM, err := os.ReadFile(f)
if err != nil || rootPEM == nil {
return nil, fmt.Errorf("nats: error loading or parsing rootCA file: %w", err)
}
ok := pool.AppendCertsFromPEM(rootPEM)
if !ok {
return nil, fmt.Errorf("nats: failed to parse root certificate from %q", f)
}
}
return pool, nil
}
if o.TLSConfig == nil {
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
}
o.TLSConfig.RootCAs = pool
if _, err := rootCAsCB(); err != nil {
return err
}
o.RootCAsCB = rootCAsCB
o.Secure = true
return nil
}
@ -859,18 +898,24 @@ func RootCAs(file ...string) Option {
// If Secure is not already set this will set it as well.
func ClientCert(certFile, keyFile string) Option {
return func(o *Options) error {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return fmt.Errorf("nats: error loading client certificate: %v", err)
}
cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return fmt.Errorf("nats: error parsing client certificate: %v", err)
tlsCertCB := func() (tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return tls.Certificate{}, fmt.Errorf("nats: error loading client certificate: %w", err)
}
cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return tls.Certificate{}, fmt.Errorf("nats: error parsing client certificate: %w", err)
}
return cert, nil
}
if o.TLSConfig == nil {
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
}
o.TLSConfig.Certificates = []tls.Certificate{cert}
if _, err := tlsCertCB(); err != nil {
return err
}
o.TLSCertCB = tlsCertCB
o.Secure = true
return nil
}
@ -911,6 +956,7 @@ func ReconnectWait(t time.Duration) Option {
}
// MaxReconnects is an Option to set the maximum number of reconnect attempts.
// If negative, it will never stop trying to reconnect.
// Defaults to 60.
func MaxReconnects(max int) Option {
return func(o *Options) error {
@ -958,7 +1004,7 @@ func MaxPingsOutstanding(max int) Option {
}
// ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.
// Defaults to 8388608 bytes (8MB).
// Defaults to 8388608 bytes (8MB). It can be disabled by setting it to -1.
func ReconnectBufSize(size int) Option {
return func(o *Options) error {
o.ReconnectBufSize = size
@ -1113,7 +1159,7 @@ func UserJWTAndSeed(jwt string, seed string) Option {
sigCB := func(nonce []byte) ([]byte, error) {
kp, err := nkeys.FromSeed([]byte(seed))
if err != nil {
return nil, fmt.Errorf("unable to extract key pair from seed: %v", err)
return nil, fmt.Errorf("unable to extract key pair from seed: %w", err)
}
// Wipe our key on exit.
defer kp.Wipe()
@ -1136,6 +1182,12 @@ func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option {
if sigCB == nil {
return ErrUserButNoSigCB
}
// Smoke test the user callback to ensure it is setup properly
// when processing options.
if _, err := userCB(); err != nil {
return err
}
o.UserJWT = userCB
o.SignatureCB = sigCB
return nil
@ -1262,6 +1314,25 @@ func IgnoreAuthErrorAbort() Option {
}
}
// SkipHostLookup is an Option to skip the host lookup when connecting to a server.
func SkipHostLookup() Option {
return func(o *Options) error {
o.SkipHostLookup = true
return nil
}
}
// TLSHandshakeFirst is an Option to perform the TLS handshake first, that is
// before receiving the INFO protocol. This requires the server to also be
// configured with such option, otherwise the connection will fail.
func TLSHandshakeFirst() Option {
return func(o *Options) error {
o.TLSHandshakeFirst = true
o.Secure = true
return nil
}
}
// Handler processing
// SetDisconnectHandler will set the disconnect event handler.
@ -1428,6 +1499,12 @@ func (o Options) Connect() (*Conn, error) {
}
}
// If the TLSHandshakeFirst option is specified, make sure that
// the Secure boolean is true.
if nc.Opts.TLSHandshakeFirst {
nc.Opts.Secure = true
}
if err := nc.setupServerPool(); err != nil {
return nil, err
}
@ -1895,7 +1972,7 @@ func (nc *Conn) createConn() (err error) {
hosts := []string{}
u := nc.current.url
if net.ParseIP(u.Hostname()) == nil {
if !nc.Opts.SkipHostLookup && net.ParseIP(u.Hostname()) == nil {
addrs, _ := net.LookupHost(u.Hostname())
for _, addr := range addrs {
hosts = append(hosts, net.JoinHostPort(addr, u.Port()))
@ -1956,11 +2033,23 @@ func (nc *Conn) makeTLSConn() error {
}
}
// Allow the user to configure their own tls.Config structure.
var tlsCopy *tls.Config
tlsCopy := &tls.Config{}
if nc.Opts.TLSConfig != nil {
tlsCopy = util.CloneTLSConfig(nc.Opts.TLSConfig)
} else {
tlsCopy = &tls.Config{}
}
if nc.Opts.TLSCertCB != nil {
cert, err := nc.Opts.TLSCertCB()
if err != nil {
return err
}
tlsCopy.Certificates = []tls.Certificate{cert}
}
if nc.Opts.RootCAsCB != nil {
rootCAs, err := nc.Opts.RootCAsCB()
if err != nil {
return err
}
tlsCopy.RootCAs = rootCAs
}
// If its blank we will override it with the current host
if tlsCopy.ServerName == _EMPTY_ {
@ -2168,7 +2257,15 @@ func (nc *Conn) processConnectInit() error {
defer nc.conn.SetDeadline(time.Time{})
// Set our status to connecting.
nc.status = CONNECTING
nc.changeConnStatus(CONNECTING)
// If we need to have a TLS connection and want the TLS handshake to occur
// first, do it now.
if nc.Opts.Secure && nc.Opts.TLSHandshakeFirst {
if err := nc.makeTLSConn(); err != nil {
return err
}
}
// Process the INFO protocol received from the server
err := nc.processExpectedInfo()
@ -2260,7 +2357,7 @@ func (nc *Conn) connect() (bool, error) {
nc.initc = false
} else if nc.Opts.RetryOnFailedConnect {
nc.setup()
nc.status = RECONNECTING
nc.changeConnStatus(RECONNECTING)
nc.bw.switchToPending()
go nc.doReconnect(ErrNoServers)
err = nil
@ -2286,8 +2383,13 @@ func (nc *Conn) checkForSecure() error {
o.Secure = true
}
// Need to rewrap with bufio
if o.Secure {
// If TLS handshake first is true, we have already done
// the handshake, so we are done here.
if o.TLSHandshakeFirst {
return nil
}
// Need to rewrap with bufio
if err := nc.makeTLSConn(); err != nil {
return err
}
@ -2382,7 +2484,7 @@ func (nc *Conn) connectProto() (string, error) {
}
sigraw, err := o.SignatureCB([]byte(nc.info.Nonce))
if err != nil {
return _EMPTY_, fmt.Errorf("error signing nonce: %v", err)
return _EMPTY_, fmt.Errorf("error signing nonce: %w", err)
}
sig = base64.RawURLEncoding.EncodeToString(sigraw)
}
@ -2439,6 +2541,9 @@ func (nc *Conn) sendConnect() error {
// Construct the CONNECT protocol string
cProto, err := nc.connectProto()
if err != nil {
if !nc.initc && nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
return err
}
@ -2453,6 +2558,9 @@ func (nc *Conn) sendConnect() error {
// reading byte-by-byte here is ok.
proto, err := nc.readProto()
if err != nil {
if !nc.initc && nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
return err
}
@ -2461,6 +2569,9 @@ func (nc *Conn) sendConnect() error {
// Read the rest now...
proto, err = nc.readProto()
if err != nil {
if !nc.initc && nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
return err
}
}
@ -2494,7 +2605,7 @@ func (nc *Conn) sendConnect() error {
}
// This is where we are truly connected.
nc.status = CONNECTED
nc.changeConnStatus(CONNECTED)
return nil
}
@ -2669,7 +2780,7 @@ func (nc *Conn) doReconnect(err error) {
if nc.ar {
break
}
nc.status = RECONNECTING
nc.changeConnStatus(RECONNECTING)
continue
}
@ -2687,7 +2798,7 @@ func (nc *Conn) doReconnect(err error) {
// Now send off and clear pending buffer
nc.err = nc.flushReconnectPendingItems()
if nc.err != nil {
nc.status = RECONNECTING
nc.changeConnStatus(RECONNECTING)
// Stop the ping timer (if set)
nc.stopPingTimer()
// Since processConnectInit() returned without error, the
@ -2740,7 +2851,7 @@ func (nc *Conn) processOpErr(err error) {
if nc.Opts.AllowReconnect && nc.status == CONNECTED {
// Set our new status
nc.status = RECONNECTING
nc.changeConnStatus(RECONNECTING)
// Stop ping timer if set
nc.stopPingTimer()
if nc.conn != nil {
@ -2759,7 +2870,7 @@ func (nc *Conn) processOpErr(err error) {
return
}
nc.status = DISCONNECTED
nc.changeConnStatus(DISCONNECTED)
nc.err = err
nc.mu.Unlock()
nc.close(CLOSED, true, nil)
@ -2958,7 +3069,7 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
s.mu.Unlock()
if done != nil {
done()
done(s.Subject)
}
}
@ -2966,28 +3077,6 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
// Return what is to be used. If we return nil the message will be dropped.
type msgFilter func(m *Msg) *Msg
func (nc *Conn) addMsgFilter(subject string, filter msgFilter) {
nc.subsMu.Lock()
defer nc.subsMu.Unlock()
if nc.filters == nil {
nc.filters = make(map[string]msgFilter)
}
nc.filters[subject] = filter
}
func (nc *Conn) removeMsgFilter(subject string) {
nc.subsMu.Lock()
defer nc.subsMu.Unlock()
if nc.filters != nil {
delete(nc.filters, subject)
if len(nc.filters) == 0 {
nc.filters = nil
}
}
}
// processMsg is called by parse and will place the msg on the
// appropriate channel/pending queue for processing. If the channel is full,
// or the pending queue is over the pending limits, the connection is
@ -3036,7 +3125,7 @@ func (nc *Conn) processMsg(data []byte) {
if nc.ps.ma.hdr > 0 {
hbuf := msgPayload[:nc.ps.ma.hdr]
msgPayload = msgPayload[nc.ps.ma.hdr:]
h, err = decodeHeadersMsg(hbuf)
h, err = DecodeHeadersMsg(hbuf)
if err != nil {
// We will pass the message through but send async error.
nc.mu.Lock()
@ -3094,8 +3183,10 @@ func (nc *Conn) processMsg(data []byte) {
}
}
// Skip processing if this is a control message.
if !ctrlMsg {
// Skip processing if this is a control message and
// if not a pull consumer heartbeat. For pull consumers,
// heartbeats have to be handled on per request basis.
if !ctrlMsg || (jsi != nil && jsi.pull) {
var chanSubCheckFC bool
// Subscription internal stats (applicable only for non ChanSubscription's)
if sub.typ != ChanSubscription {
@ -3551,9 +3642,10 @@ const (
statusLen = 3 // e.g. 20x, 40x, 50x
)
// decodeHeadersMsg will decode and headers.
func decodeHeadersMsg(data []byte) (Header, error) {
tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data)))
// DecodeHeadersMsg will decode and headers.
func DecodeHeadersMsg(data []byte) (Header, error) {
br := bufio.NewReaderSize(bytes.NewReader(data), 128)
tp := textproto.NewReader(br)
l, err := tp.ReadLine()
if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
return nil, ErrBadHeaderMsg
@ -4382,6 +4474,14 @@ func (s *Subscription) AutoUnsubscribe(max int) error {
return conn.unsubscribe(s, max, false)
}
// SetClosedHandler will set the closed handler for when a subscription
// is closed (either unsubscribed or drained).
func (s *Subscription) SetClosedHandler(handler func(subject string)) {
s.mu.Lock()
s.pDone = handler
s.mu.Unlock()
}
// unsubscribe performs the low level unsubscribe to the server.
// Use Subscription.Unsubscribe()
func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
@ -5007,15 +5107,15 @@ func (nc *Conn) close(status Status, doCBs bool, err error) {
nc.subs = nil
nc.subsMu.Unlock()
nc.status = status
nc.changeConnStatus(status)
// Perform appropriate callback if needed for a disconnect.
if doCBs {
if nc.conn != nil {
if nc.Opts.DisconnectedErrCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) })
} else if nc.Opts.DisconnectedCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
if disconnectedErrCB := nc.Opts.DisconnectedErrCB; disconnectedErrCB != nil {
nc.ach.push(func() { disconnectedErrCB(nc, err) })
} else if disconnectedCB := nc.Opts.DisconnectedCB; disconnectedCB != nil {
nc.ach.push(func() { disconnectedCB(nc) })
}
}
if nc.Opts.ClosedCB != nil {
@ -5152,7 +5252,7 @@ func (nc *Conn) drainConnection() {
// Flip State
nc.mu.Lock()
nc.status = DRAINING_PUBS
nc.changeConnStatus(DRAINING_PUBS)
nc.mu.Unlock()
// Do publish drain via Flush() call.
@ -5187,7 +5287,7 @@ func (nc *Conn) Drain() error {
nc.mu.Unlock()
return nil
}
nc.status = DRAINING_SUBS
nc.changeConnStatus(DRAINING_SUBS)
go nc.drainConnection()
nc.mu.Unlock()
@ -5397,6 +5497,68 @@ func (nc *Conn) GetClientID() (uint64, error) {
return nc.info.CID, nil
}
// StatusChanged returns a channel on which given list of connection status changes will be reported.
// If no statuses are provided, defaults will be used: CONNECTED, RECONNECTING, DISCONNECTED, CLOSED.
func (nc *Conn) StatusChanged(statuses ...Status) chan Status {
if len(statuses) == 0 {
statuses = []Status{CONNECTED, RECONNECTING, DISCONNECTED, CLOSED}
}
ch := make(chan Status, 10)
for _, s := range statuses {
nc.registerStatusChangeListener(s, ch)
}
return ch
}
// registerStatusChangeListener registers a channel waiting for a specific status change event.
// Status change events are non-blocking - if no receiver is waiting for the status change,
// it will not be sent on the channel. Closed channels are ignored.
func (nc *Conn) registerStatusChangeListener(status Status, ch chan Status) {
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.statListeners == nil {
nc.statListeners = make(map[Status][]chan Status)
}
if _, ok := nc.statListeners[status]; !ok {
nc.statListeners[status] = make([]chan Status, 0)
}
nc.statListeners[status] = append(nc.statListeners[status], ch)
}
// sendStatusEvent sends connection status event to all channels.
// If channel is closed, or there is no listener, sendStatusEvent
// will not block. Lock should be held entering.
func (nc *Conn) sendStatusEvent(s Status) {
Loop:
for i := 0; i < len(nc.statListeners[s]); i++ {
// make sure channel is not closed
select {
case <-nc.statListeners[s][i]:
// if chan is closed, remove it
nc.statListeners[s][i] = nc.statListeners[s][len(nc.statListeners[s])-1]
nc.statListeners[s] = nc.statListeners[s][:len(nc.statListeners[s])-1]
i--
continue Loop
default:
}
// only send event if someone's listening
select {
case nc.statListeners[s][i] <- s:
default:
}
}
}
// changeConnStatus changes connections status and sends events
// to all listeners. Lock should be held entering.
func (nc *Conn) changeConnStatus(status Status) {
if nc == nil {
return
}
nc.sendStatusEvent(status)
nc.status = status
}
// NkeyOptionFromSeed will load an nkey pair from a seed file.
// It will return the NKey Option and will handle
// signing of nonce challenges from the server. It will take
@ -5432,12 +5594,12 @@ func wipeSlice(buf []byte) {
func userFromFile(userFile string) (string, error) {
path, err := expandPath(userFile)
if err != nil {
return _EMPTY_, fmt.Errorf("nats: %v", err)
return _EMPTY_, fmt.Errorf("nats: %w", err)
}
contents, err := os.ReadFile(path)
if err != nil {
return _EMPTY_, fmt.Errorf("nats: %v", err)
return _EMPTY_, fmt.Errorf("nats: %w", err)
}
defer wipeSlice(contents)
return nkeys.ParseDecoratedJWT(contents)
@ -5486,7 +5648,7 @@ func expandPath(p string) (string, error) {
func nkeyPairFromSeedFile(seedFile string) (nkeys.KeyPair, error) {
contents, err := os.ReadFile(seedFile)
if err != nil {
return nil, fmt.Errorf("nats: %v", err)
return nil, fmt.Errorf("nats: %w", err)
}
defer wipeSlice(contents)
return nkeys.ParseDecoratedNKey(contents)
@ -5497,7 +5659,7 @@ func nkeyPairFromSeedFile(seedFile string) (nkeys.KeyPair, error) {
func sigHandler(nonce []byte, seedFile string) ([]byte, error) {
kp, err := nkeyPairFromSeedFile(seedFile)
if err != nil {
return nil, fmt.Errorf("unable to extract key pair from file %q: %v", seedFile, err)
return nil, fmt.Errorf("unable to extract key pair from file %q: %w", seedFile, err)
}
// Wipe our key on exit.
defer kp.Wipe()

View File

@ -23,7 +23,7 @@ import (
// Data will be encoded and decoded via the EncodedConn and its associated encoders.
// BindSendChan binds a channel for send operations to NATS.
func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error {
func (c *EncodedConn) BindSendChan(subject string, channel any) error {
chVal := reflect.ValueOf(channel)
if chVal.Kind() != reflect.Chan {
return ErrChanArg
@ -61,17 +61,17 @@ func chPublish(c *EncodedConn, chVal reflect.Value, subject string) {
}
// BindRecvChan binds a channel for receive operations from NATS.
func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error) {
func (c *EncodedConn) BindRecvChan(subject string, channel any) (*Subscription, error) {
return c.bindRecvChan(subject, _EMPTY_, channel)
}
// BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error) {
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel any) (*Subscription, error) {
return c.bindRecvChan(subject, queue, channel)
}
// Internal function to bind receive operations for a channel.
func (c *EncodedConn) bindRecvChan(subject, queue string, channel interface{}) (*Subscription, error) {
func (c *EncodedConn) bindRecvChan(subject, queue string, channel any) (*Subscription, error) {
chVal := reflect.ValueOf(channel)
if chVal.Kind() != reflect.Chan {
return nil, ErrChanArg

View File

@ -29,14 +29,11 @@ import (
"sync"
"time"
"github.com/nats-io/nats.go/internal/parser"
"github.com/nats-io/nuid"
)
// ObjectStoreManager creates, loads and deletes Object Stores
//
// Notice: Experimental Preview
//
// This functionality is EXPERIMENTAL and may be changed in later releases.
type ObjectStoreManager interface {
// ObjectStore will look up and bind to an existing object store instance.
ObjectStore(bucket string) (ObjectStore, error)
@ -52,10 +49,6 @@ type ObjectStoreManager interface {
// ObjectStore is a blob store capable of storing large objects efficiently in
// JetStream streams
//
// Notice: Experimental Preview
//
// This functionality is EXPERIMENTAL and may be changed in later releases.
type ObjectStore interface {
// Put will place the contents from the reader into a new object.
Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error)
@ -149,13 +142,17 @@ var (
// ObjectStoreConfig is the config for the object store.
type ObjectStoreConfig struct {
Bucket string
Description string
TTL time.Duration
MaxBytes int64
Storage StorageType
Replicas int
Placement *Placement
Bucket string `json:"bucket"`
Description string `json:"description,omitempty"`
TTL time.Duration `json:"max_age,omitempty"`
MaxBytes int64 `json:"max_bytes,omitempty"`
Storage StorageType `json:"storage,omitempty"`
Replicas int `json:"num_replicas,omitempty"`
Placement *Placement `json:"placement,omitempty"`
// Bucket-specific metadata
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
}
type ObjectStoreStatus interface {
@ -175,6 +172,8 @@ type ObjectStoreStatus interface {
Size() uint64
// BackingStore provides details about the underlying storage
BackingStore() string
// Metadata is the user supplied metadata for the bucket
Metadata() map[string]string
}
// ObjectMetaOptions
@ -185,9 +184,10 @@ type ObjectMetaOptions struct {
// ObjectMeta is high level information about an object.
type ObjectMeta struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Headers Header `json:"headers,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Headers Header `json:"headers,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
// Optional options.
Opts *ObjectMetaOptions `json:"options,omitempty"`
@ -279,6 +279,7 @@ func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) {
Discard: DiscardNew,
AllowRollup: true,
AllowDirect: true,
Metadata: cfg.Metadata,
}
// Create our stream.
@ -368,14 +369,23 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return perr
}
purgePartial := func() { obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) }
// Create our own JS context to handle errors etc.
js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) }))
jetStream, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) }))
if err != nil {
return nil, err
}
defer jetStream.(*js).cleanupReplySub()
purgePartial := func() {
// wait until all pubs are complete or up to default timeout before attempting purge
select {
case <-jetStream.PublishAsyncComplete():
case <-time.After(obs.js.opts.wait):
}
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}
m, h := NewMsg(chunkSubj), sha256.New()
chunk, sent, total := make([]byte, meta.Opts.ChunkSize), 0, uint64(0)
@ -416,7 +426,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
h.Write(m.Data)
// Send msg itself.
if _, err := js.PublishMsgAsync(m); err != nil {
if _, err := jetStream.PublishMsgAsync(m); err != nil {
purgePartial()
return nil, err
}
@ -451,7 +461,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
}
// Publish the meta message.
_, err = js.PublishMsgAsync(mm)
_, err = jetStream.PublishMsgAsync(mm)
if err != nil {
if r != nil {
purgePartial()
@ -461,7 +471,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Wait for all to be processed.
select {
case <-js.PublishAsyncComplete():
case <-jetStream.PublishAsyncComplete():
if err := getErr(); err != nil {
if r != nil {
purgePartial()
@ -612,6 +622,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
result.digest = sha256.New()
processChunk := func(m *Msg) {
var err error
if ctx != nil {
select {
case <-ctx.Done():
@ -628,7 +639,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
}
}
tokens, err := getMetadataFields(m.Reply)
tokens, err := parser.GetMetadataFields(m.Reply)
if err != nil {
gotErr(m, err)
return
@ -647,7 +658,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
result.digest.Write(m.Data)
// Check if we are done.
if tokens[ackNumPendingTokenPos] == objNoPending {
if tokens[parser.AckNumPendingTokenPos] == objNoPending {
pw.Close()
m.Sub.Unsubscribe()
}
@ -963,6 +974,7 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
info.Name = meta.Name
info.Description = meta.Description
info.Headers = meta.Headers
info.Metadata = meta.Metadata
// Prepare the meta message
if err = publishMeta(info, obs.js); err != nil {
@ -1045,6 +1057,8 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
w.updates <- &info
}
// if UpdatesOnly is set, no not send nil to the channel
// as it would always be triggered after initializing the watcher
if !initDoneMarker && meta.NumPending == 0 {
initDoneMarker = true
w.updates <- nil
@ -1053,9 +1067,17 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name)
_, err := obs.js.GetLastMsg(obs.stream, allMeta)
if err == ErrMsgNotFound {
// if there are no messages on the stream and we are not watching
// updates only, send nil to the channel to indicate that the initial
// watch is done
if !o.updatesOnly {
if errors.Is(err, ErrMsgNotFound) {
initDoneMarker = true
w.updates <- nil
}
} else {
// if UpdatesOnly was used, mark initialization as complete
initDoneMarker = true
w.updates <- nil
}
// Used ordered consumer to deliver results.
@ -1063,6 +1085,9 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
if !o.includeHistory {
subOpts = append(subOpts, DeliverLastPerSubject())
}
if o.updatesOnly {
subOpts = append(subOpts, DeliverNew())
}
sub, err := obs.js.Subscribe(allMeta, update, subOpts...)
if err != nil {
return nil, err
@ -1173,6 +1198,9 @@ func (s *ObjectBucketStatus) Size() uint64 { return s.nfo.State.Bytes }
// BackingStore indicates what technology is used for storage of the bucket
func (s *ObjectBucketStatus) BackingStore() string { return "JetStream" }
// Metadata is the metadata supplied when creating the bucket
func (s *ObjectBucketStatus) Metadata() map[string]string { return s.nfo.Config.Metadata }
// StreamInfo is the stream info retrieved to create the status
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
@ -1207,7 +1235,7 @@ func (o *objResult) Read(p []byte) (n int, err error) {
}
}
if o.err != nil {
return 0, err
return 0, o.err
}
if o.r == nil {
return 0, io.EOF

View File

@ -1,4 +1,4 @@
// Copyright 2012-2122 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

29
gateway/vendor/github.com/nats-io/nats.go/rand.go generated vendored Normal file
View File

@ -0,0 +1,29 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !go1.20
// +build !go1.20
// A Go client for the NATS messaging system (https://nats.io).
package nats
import (
"math/rand"
"time"
)
func init() {
// This is not needed since Go 1.20 because now rand.Seed always happens
// by default (uses runtime.fastrand64 instead as source).
rand.Seed(time.Now().UnixNano())
}

View File

@ -0,0 +1,59 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build internal_testing
// +build internal_testing
// Functions in this file are only available when building nats.go with the
// internal_testing build tag. They are used by the nats.go test suite.
package nats
// AddMsgFilter adds a message filter for the given subject
// to the connection. The filter will be called for each
// message received on the subject. If the filter returns
// nil, the message will be dropped.
func (nc *Conn) AddMsgFilter(subject string, filter msgFilter) {
nc.subsMu.Lock()
defer nc.subsMu.Unlock()
if nc.filters == nil {
nc.filters = make(map[string]msgFilter)
}
nc.filters[subject] = filter
}
// RemoveMsgFilter removes a message filter for the given subject.
func (nc *Conn) RemoveMsgFilter(subject string) {
nc.subsMu.Lock()
defer nc.subsMu.Unlock()
if nc.filters != nil {
delete(nc.filters, subject)
if len(nc.filters) == 0 {
nc.filters = nil
}
}
}
// IsJSControlMessage returns true if the message is a JetStream control message.
func IsJSControlMessage(msg *Msg) (bool, int) {
return isJSControlMessage(msg)
}
// CloseTCPConn closes the underlying TCP connection.
// It can be used to simulate a disconnect.
func (nc *Conn) CloseTCPConn() {
nc.mu.Lock()
defer nc.mu.Unlock()
nc.conn.Close()
}

View File

@ -1,4 +1,4 @@
// Copyright 2021-2022 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -16,7 +16,6 @@ package nats
import (
"bufio"
"bytes"
"compress/flate"
"crypto/rand"
"crypto/sha1"
"encoding/base64"
@ -30,6 +29,8 @@ import (
"strings"
"time"
"unicode/utf8"
"github.com/klauspost/compress/flate"
)
type wsOpCode int
@ -448,8 +449,12 @@ func (w *websocketWriter) Write(p []byte) (int, error) {
} else {
w.compressor.Reset(buf)
}
w.compressor.Write(p)
w.compressor.Close()
if n, err = w.compressor.Write(p); err != nil {
return n, err
}
if err = w.compressor.Flush(); err != nil {
return n, err
}
b := buf.Bytes()
p = b[:len(b)-4]
}
@ -550,7 +555,7 @@ func wsFillFrameHeader(fh []byte, compressed bool, frameType wsOpCode, l int) (i
func (nc *Conn) wsInitHandshake(u *url.URL) error {
compress := nc.Opts.Compression
tlsRequired := u.Scheme == wsSchemeTLS || nc.Opts.Secure || nc.Opts.TLSConfig != nil
tlsRequired := u.Scheme == wsSchemeTLS || nc.Opts.Secure || nc.Opts.TLSConfig != nil || nc.Opts.TLSCertCB != nil || nc.Opts.RootCAsCB != nil
// Do TLS here as needed.
if tlsRequired {
if err := nc.makeTLSConn(); err != nil {
@ -692,6 +697,9 @@ func (nc *Conn) wsEnqueueCloseMsgLocked(status int, payload string) {
wr.cm = frame
wr.cmDone = true
nc.bw.flush()
if c := wr.compressor; c != nil {
c.Close()
}
}
func (nc *Conn) wsEnqueueControlMsg(needsLock bool, frameType wsOpCode, payload []byte) {

View File

@ -13,3 +13,4 @@ build/
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
.idea/

View File

@ -6,23 +6,45 @@ release:
name_template: '{{.Tag}}'
draft: true
builds:
- main: ./nk/main.go
- id: nk
main: ./nk/main.go
ldflags: "-X main.Version={{.Tag}}_{{.Commit}}"
binary: nk
goos:
- linux
- darwin
- linux
- windows
- freebsd
goarch:
- amd64
- arm
- arm64
- 386
- mips64le
- s390x
goarm:
- 6
- 7
ignore:
- goos: darwin
goarch: 386
- goos: freebsd
goarch: arm
- goos: freebsd
goarch: arm64
- goos: freebsd
goarch: 386
dist: build
archive:
wrap_in_directory: true
name_template: '{{ .ProjectName }}-v{{ .Version }}-{{ .Os }}-{{ .Arch }}{{ if .Arm
archives:
- name_template: '{{ .ProjectName }}-v{{ .Version }}-{{ .Os }}-{{ .Arch }}{{ if .Arm
}}v{{ .Arm }}{{ end }}'
format: zip
wrap_in_directory: true
format: zip
files:
- README.md
- LICENSE
checksum:
name_template: '{{ .ProjectName }}-v{{ .Version }}-checksums.txt'
@ -30,9 +52,12 @@ checksum:
snapshot:
name_template: 'dev'
nfpm:
formats:
- deb
bindir: /usr/local/bin
description: NKeys utility cli program
vendor: nats-io
nfpms:
- file_name_template: '{{ .ProjectName }}-v{{ .Version }}-{{ .Arch }}{{ if .Arm
}}v{{ .Arm }}{{ end }}'
maintainer: nats.io
description: NKeys utility cli program
vendor: nats-io
bindir: /usr/local/bin
formats:
- deb

View File

@ -1,35 +0,0 @@
language: go
sudo: false
arch:
- amd64
- ppc64le
go:
- 1.16.x
- 1.15.x
install:
- go get -t ./...
- go get github.com/mattn/goveralls
- go get -u honnef.co/go/tools/cmd/staticcheck
- go get -u github.com/client9/misspell/cmd/misspell
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet ./...
- misspell -error -locale US .
- staticcheck ./...
script:
- go test -v
- go test -v --race
- go test -v -covermode=count -coverprofile=coverage.out
- $HOME/gopath/bin/goveralls -coverprofile coverage.out -service travis-ci
#deploy:
#- provider: script
# skip_cleanup: true
# script: curl -sL http://git.io/goreleaser | bash
# on:
# tags: true
# condition: $TRAVIS_OS_NAME = linux

View File

@ -1,9 +1,9 @@
# NKEYS
[![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![ReportCard](http://goreportcard.com/badge/nats-io/nkeys)](http://goreportcard.com/report/nats-io/nkeys)
[![Build Status](https://travis-ci.com/nats-io/nkeys.svg?branch=master)](http://travis-ci.com/nats-io/nkeys)
[![GoDoc](http://godoc.org/github.com/nats-io/nkeys?status.svg)](http://godoc.org/github.com/nats-io/nkeys)
[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nkeys)](https://goreportcard.com/report/github.com/nats-io/nkeys)
[![Build Status](https://app.travis-ci.com/nats-io/nkeys.svg?branch=master)](https://app.travis-ci.com/nats-io/nkeys)
[![GoDoc](https://godoc.org/github.com/nats-io/nkeys?status.svg)](https://godoc.org/github.com/nats-io/nkeys)
[![Coverage Status](https://coveralls.io/repos/github/nats-io/nkeys/badge.svg?branch=master&service=github)](https://coveralls.io/github/nats-io/nkeys?branch=master)
A public-key signature system based on [Ed25519](https://ed25519.cr.yp.to/) for the NATS ecosystem.

View File

@ -13,15 +13,8 @@
package nkeys
import (
"errors"
)
// An implementation of crc16 according to CCITT standards for XMODEM.
// ErrInvalidChecksum indicates a failed verification.
var ErrInvalidChecksum = errors.New("nkeys: invalid checksum")
var crc16tab = [256]uint16{
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,

View File

@ -2,8 +2,8 @@ package nkeys
import (
"bytes"
"errors"
"regexp"
"strings"
)
var userConfigRE = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}\r?\n))`)
@ -19,7 +19,7 @@ func ParseDecoratedJWT(contents []byte) (string, error) {
raw := items[0][1]
tmp := make([]byte, len(raw))
copy(tmp, raw)
return string(tmp), nil
return strings.TrimSpace(string(tmp)), nil
}
// ParseDecoratedNKey takes a creds file, finds the NKey portion and creates a
@ -42,12 +42,12 @@ func ParseDecoratedNKey(contents []byte) (KeyPair, error) {
}
}
if seed == nil {
return nil, errors.New("no nkey seed found")
return nil, ErrNoSeedFound
}
if !bytes.HasPrefix(seed, []byte("SO")) &&
!bytes.HasPrefix(seed, []byte("SA")) &&
!bytes.HasPrefix(seed, []byte("SU")) {
return nil, errors.New("doesn't contain a seed nkey")
return nil, ErrInvalidNkeySeed
}
kp, err := FromSeed(seed)
if err != nil {
@ -68,7 +68,7 @@ func ParseDecoratedUserNKey(contents []byte) (KeyPair, error) {
return nil, err
}
if !bytes.HasPrefix(seed, []byte("SU")) {
return nil, errors.New("doesn't contain an user seed nkey")
return nil, ErrInvalidUserSeed
}
kp, err := FromSeed(seed)
if err != nil {

View File

@ -0,0 +1,12 @@
# External Dependencies
This file lists the dependencies used in this repository.
| Dependency | License |
|-|-|
| Go | BSD 3-Clause "New" or "Revised" License |
| golang.org/x/crypto v0.3.0 | BSD 3-Clause "New" or "Revised" License |
| golang.org/x/net v0.2.0 | BSD 3-Clause "New" or "Revised" License |
| golang.org/x/sys v0.2.0 | BSD 3-Clause "New" or "Revised" License |
| golang.org/x/term v0.2.0 | BSD 3-Clause "New" or "Revised" License |
| golang.org/x/text v0.4.0 | BSD 3-Clause "New" or "Revised" License |

50
gateway/vendor/github.com/nats-io/nkeys/errors.go generated vendored Normal file
View File

@ -0,0 +1,50 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nkeys
// Errors
const (
ErrInvalidPrefixByte = nkeysError("nkeys: invalid prefix byte")
ErrInvalidKey = nkeysError("nkeys: invalid key")
ErrInvalidPublicKey = nkeysError("nkeys: invalid public key")
ErrInvalidPrivateKey = nkeysError("nkeys: invalid private key")
ErrInvalidSeedLen = nkeysError("nkeys: invalid seed length")
ErrInvalidSeed = nkeysError("nkeys: invalid seed")
ErrInvalidEncoding = nkeysError("nkeys: invalid encoded key")
ErrInvalidSignature = nkeysError("nkeys: signature verification failed")
ErrCannotSign = nkeysError("nkeys: can not sign, no private key available")
ErrPublicKeyOnly = nkeysError("nkeys: no seed or private key available")
ErrIncompatibleKey = nkeysError("nkeys: incompatible key")
ErrInvalidChecksum = nkeysError("nkeys: invalid checksum")
ErrNoSeedFound = nkeysError("nkeys: no nkey seed found")
ErrInvalidNkeySeed = nkeysError("nkeys: doesn't contain a seed nkey")
ErrInvalidUserSeed = nkeysError("nkeys: doesn't contain an user seed nkey")
ErrInvalidRecipient = nkeysError("nkeys: not a valid recipient public curve key")
ErrInvalidSender = nkeysError("nkeys: not a valid sender public curve key")
ErrInvalidCurveKey = nkeysError("nkeys: not a valid curve key")
ErrInvalidCurveSeed = nkeysError("nkeys: not a valid curve seed")
ErrInvalidEncrypted = nkeysError("nkeys: encrypted input is not valid")
ErrInvalidEncVersion = nkeysError("nkeys: encrypted input wrong version")
ErrCouldNotDecrypt = nkeysError("nkeys: could not decrypt input")
ErrInvalidCurveKeyOperation = nkeysError("nkeys: curve key is not valid for sign/verify")
ErrInvalidNKeyOperation = nkeysError("nkeys: only curve key can seal/open")
ErrCannotOpen = nkeysError("nkeys: cannot open no private curve key available")
ErrCannotSeal = nkeysError("nkeys: cannot seal no private curve key available")
)
type nkeysError string
func (e nkeysError) Error() string {
return string(e)
}

View File

@ -1,4 +1,4 @@
// Copyright 2018 The NATS Authors
// Copyright 2018-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -26,11 +26,25 @@ type kp struct {
seed []byte
}
// CreatePair will create a KeyPair based on the rand entropy and a type/prefix byte. rand can be nil.
func CreatePair(prefix PrefixByte) (KeyPair, error) {
var rawSeed [32]byte
// All seeds are 32 bytes long.
const seedLen = 32
_, err := io.ReadFull(rand.Reader, rawSeed[:])
// CreatePair will create a KeyPair based on the rand entropy and a type/prefix byte.
func CreatePair(prefix PrefixByte) (KeyPair, error) {
return CreatePairWithRand(prefix, rand.Reader)
}
// CreatePair will create a KeyPair based on the rand reader and a type/prefix byte. rand can be nil.
func CreatePairWithRand(prefix PrefixByte, rr io.Reader) (KeyPair, error) {
if prefix == PrefixByteCurve {
return CreateCurveKeysWithRand(rr)
}
if rr == nil {
rr = rand.Reader
}
var rawSeed [seedLen]byte
_, err := io.ReadFull(rr, rawSeed[:])
if err != nil {
return nil, err
}
@ -115,3 +129,18 @@ func (pair *kp) Verify(input []byte, sig []byte) error {
}
return nil
}
// Seal is only supported on CurveKeyPair
func (pair *kp) Seal(input []byte, recipient string) ([]byte, error) {
return nil, ErrInvalidNKeyOperation
}
// SealWithRand is only supported on CurveKeyPair
func (pair *kp) SealWithRand(input []byte, recipient string, rr io.Reader) ([]byte, error) {
return nil, ErrInvalidNKeyOperation
}
// Open is only supported on CurveKey
func (pair *kp) Open(input []byte, sender string) ([]byte, error) {
return nil, ErrInvalidNKeyOperation
}

View File

@ -13,37 +13,30 @@
// Package nkeys is an Ed25519 based public-key signature system that simplifies keys and seeds
// and performs signing and verification.
// It also supports encryption via x25519 keys and is compatible with https://pkg.go.dev/golang.org/x/crypto/nacl/box.
package nkeys
import (
"errors"
)
import "io"
// Version is our current version
const Version = "0.3.0"
// Errors
var (
ErrInvalidPrefixByte = errors.New("nkeys: invalid prefix byte")
ErrInvalidKey = errors.New("nkeys: invalid key")
ErrInvalidPublicKey = errors.New("nkeys: invalid public key")
ErrInvalidSeedLen = errors.New("nkeys: invalid seed length")
ErrInvalidSeed = errors.New("nkeys: invalid seed")
ErrInvalidEncoding = errors.New("nkeys: invalid encoded key")
ErrInvalidSignature = errors.New("nkeys: signature verification failed")
ErrCannotSign = errors.New("nkeys: can not sign, no private key available")
ErrPublicKeyOnly = errors.New("nkeys: no seed or private key available")
ErrIncompatibleKey = errors.New("nkeys: incompatible key")
)
const Version = "0.4.5"
// KeyPair provides the central interface to nkeys.
type KeyPair interface {
Seed() ([]byte, error)
PublicKey() (string, error)
PrivateKey() ([]byte, error)
// Sign is only supported on Non CurveKeyPairs
Sign(input []byte) ([]byte, error)
// Verify is only supported on Non CurveKeyPairs
Verify(input []byte, sig []byte) error
Wipe()
// Seal is only supported on CurveKeyPair
Seal(input []byte, recipient string) ([]byte, error)
// SealWithRand is only supported on CurveKeyPair
SealWithRand(input []byte, recipient string, rr io.Reader) ([]byte, error)
// Open is only supported on CurveKey
Open(input []byte, sender string) ([]byte, error)
}
// CreateUser will create a User typed KeyPair.
@ -86,10 +79,13 @@ func FromPublicKey(public string) (KeyPair, error) {
// FromSeed will create a KeyPair capable of signing and verifying signatures.
func FromSeed(seed []byte) (KeyPair, error) {
_, _, err := DecodeSeed(seed)
prefix, _, err := DecodeSeed(seed)
if err != nil {
return nil, err
}
if prefix == PrefixByteCurve {
return FromCurveSeed(seed)
}
copy := append([]byte{}, seed...)
return &kp{copy}, nil
}

View File

@ -64,3 +64,23 @@ func (p *pub) Wipe() {
p.pre = '0'
io.ReadFull(rand.Reader, p.pub)
}
func (p *pub) Seal(input []byte, recipient string) ([]byte, error) {
if p.pre == PrefixByteCurve {
return nil, ErrCannotSeal
}
return nil, ErrInvalidNKeyOperation
}
func (p *pub) SealWithRand(input []byte, _recipient string, rr io.Reader) ([]byte, error) {
if p.pre == PrefixByteCurve {
return nil, ErrCannotSeal
}
return nil, ErrInvalidNKeyOperation
}
func (p *pub) Open(input []byte, sender string) ([]byte, error) {
if p.pre == PrefixByteCurve {
return nil, ErrCannotOpen
}
return nil, ErrInvalidNKeyOperation
}

View File

@ -1,4 +1,4 @@
// Copyright 2018 The NATS Authors
// Copyright 2018-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -17,7 +17,6 @@ import (
"bytes"
"encoding/base32"
"encoding/binary"
"golang.org/x/crypto/ed25519"
)
// PrefixByte is a lead byte representing the type.
@ -45,8 +44,11 @@ const (
// PrefixByteUser is the version byte used for encoded NATS Users
PrefixByteUser PrefixByte = 20 << 3 // Base32-encodes to 'U...'
// PrefixByteCurve is the version byte used for encoded CurveKeys (X25519)
PrefixByteCurve PrefixByte = 23 << 3 // Base32-encodes to 'X...'
// PrefixByteUnknown is for unknown prefixes.
PrefixByteUnknown PrefixByte = 23 << 3 // Base32-encodes to 'X...'
PrefixByteUnknown PrefixByte = 25 << 3 // Base32-encodes to 'Z...'
)
// Set our encoding to not include padding '=='
@ -83,12 +85,13 @@ func Encode(prefix PrefixByte, src []byte) ([]byte, error) {
}
// EncodeSeed will encode a raw key with the prefix and then seed prefix and crc16 and then base32 encoded.
// `src` must be 32 bytes long (ed25519.SeedSize).
func EncodeSeed(public PrefixByte, src []byte) ([]byte, error) {
if err := checkValidPublicPrefixByte(public); err != nil {
return nil, err
}
if len(src) != ed25519.SeedSize {
if len(src) != seedLen {
return nil, ErrInvalidSeedLen
}
@ -134,22 +137,18 @@ func decode(src []byte) ([]byte, error) {
}
raw = raw[:n]
if len(raw) < 4 {
if n < 4 {
return nil, ErrInvalidEncoding
}
var crc uint16
checksum := bytes.NewReader(raw[len(raw)-2:])
if err := binary.Read(checksum, binary.LittleEndian, &crc); err != nil {
return nil, err
}
crc := binary.LittleEndian.Uint16(raw[n-2:])
// ensure checksum is valid
if err := validate(raw[0:len(raw)-2], crc); err != nil {
if err := validate(raw[0:n-2], crc); err != nil {
return nil, err
}
return raw[:len(raw)-2], nil
return raw[:n-2], nil
}
// Decode will decode the base32 string and check crc16 and enforce the prefix is what is expected.
@ -161,7 +160,8 @@ func Decode(expectedPrefix PrefixByte, src []byte) ([]byte, error) {
if err != nil {
return nil, err
}
if prefix := PrefixByte(raw[0]); prefix != expectedPrefix {
b1 := raw[0] & 248 // 248 = 11111000
if prefix := PrefixByte(b1); prefix != expectedPrefix {
return nil, ErrInvalidPrefixByte
}
return raw[1:], nil
@ -248,12 +248,18 @@ func IsValidPublicOperatorKey(src string) bool {
return err == nil
}
// IsValidPublicCurveKey will decode and verify the string is a valid encoded Public Curve Key.
func IsValidPublicCurveKey(src string) bool {
_, err := Decode(PrefixByteCurve, []byte(src))
return err == nil
}
// checkValidPrefixByte returns an error if the provided value
// is not one of the defined valid prefix byte constants.
func checkValidPrefixByte(prefix PrefixByte) error {
switch prefix {
case PrefixByteOperator, PrefixByteServer, PrefixByteCluster,
PrefixByteAccount, PrefixByteUser, PrefixByteSeed, PrefixBytePrivate:
PrefixByteAccount, PrefixByteUser, PrefixByteSeed, PrefixBytePrivate, PrefixByteCurve:
return nil
}
return ErrInvalidPrefixByte
@ -263,7 +269,7 @@ func checkValidPrefixByte(prefix PrefixByte) error {
// is not one of the public defined valid prefix byte constants.
func checkValidPublicPrefixByte(prefix PrefixByte) error {
switch prefix {
case PrefixByteServer, PrefixByteCluster, PrefixByteOperator, PrefixByteAccount, PrefixByteUser:
case PrefixByteOperator, PrefixByteServer, PrefixByteCluster, PrefixByteAccount, PrefixByteUser, PrefixByteCurve:
return nil
}
return ErrInvalidPrefixByte
@ -285,6 +291,8 @@ func (p PrefixByte) String() string {
return "seed"
case PrefixBytePrivate:
return "private"
case PrefixByteCurve:
return "x25519"
}
return "unknown"
}

184
gateway/vendor/github.com/nats-io/nkeys/xkeys.go generated vendored Normal file
View File

@ -0,0 +1,184 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nkeys
import (
"bytes"
"crypto/rand"
"encoding/binary"
"io"
"golang.org/x/crypto/curve25519"
"golang.org/x/crypto/nacl/box"
)
// This package will support safe use of X25519 keys for asymmetric encryption.
// We will be compatible with nacl.Box, but generate random nonces automatically.
// We may add more advanced options in the future for group recipients and better
// end to end algorithms.
const (
curveKeyLen = 32
curveDecodeLen = 35
curveNonceLen = 24
)
type ckp struct {
seed [curveKeyLen]byte // Private raw key.
}
// CreateUser will create a User typed KeyPair.
func CreateCurveKeys() (KeyPair, error) {
return CreateCurveKeysWithRand(rand.Reader)
}
// CreateUser will create a User typed KeyPair with specified rand source.
func CreateCurveKeysWithRand(rr io.Reader) (KeyPair, error) {
var kp ckp
_, err := io.ReadFull(rr, kp.seed[:])
if err != nil {
return nil, err
}
return &kp, nil
}
// Will create a curve key pair from seed.
func FromCurveSeed(seed []byte) (KeyPair, error) {
pb, raw, err := DecodeSeed(seed)
if err != nil {
return nil, err
}
if pb != PrefixByteCurve || len(raw) != curveKeyLen {
return nil, ErrInvalidCurveSeed
}
var kp ckp
copy(kp.seed[:], raw)
return &kp, nil
}
// Seed will return the encoded seed.
func (pair *ckp) Seed() ([]byte, error) {
return EncodeSeed(PrefixByteCurve, pair.seed[:])
}
// PublicKey will return the encoded public key.
func (pair *ckp) PublicKey() (string, error) {
var pub [curveKeyLen]byte
curve25519.ScalarBaseMult(&pub, &pair.seed)
key, err := Encode(PrefixByteCurve, pub[:])
return string(key), err
}
// PrivateKey will return the encoded private key.
func (pair *ckp) PrivateKey() ([]byte, error) {
return Encode(PrefixBytePrivate, pair.seed[:])
}
func decodePubCurveKey(src string, dest [curveKeyLen]byte) error {
var raw [curveDecodeLen]byte // should always be 35
n, err := b32Enc.Decode(raw[:], []byte(src))
if err != nil {
return err
}
if n != curveDecodeLen {
return ErrInvalidCurveKey
}
// Make sure it is what we expected.
if prefix := PrefixByte(raw[0]); prefix != PrefixByteCurve {
return ErrInvalidPublicKey
}
var crc uint16
end := n - 2
sum := raw[end:n]
checksum := bytes.NewReader(sum)
if err := binary.Read(checksum, binary.LittleEndian, &crc); err != nil {
return err
}
// ensure checksum is valid
if err := validate(raw[:end], crc); err != nil {
return err
}
// Copy over, ignore prefix byte.
copy(dest[:], raw[1:end])
return nil
}
// Only version for now, but could add in X3DH in the future, etc.
const XKeyVersionV1 = "xkv1"
const vlen = len(XKeyVersionV1)
// Seal is compatible with nacl.Box.Seal() and can be used in similar situations for small messages.
// We generate the nonce from crypto rand by default.
func (pair *ckp) Seal(input []byte, recipient string) ([]byte, error) {
return pair.SealWithRand(input, recipient, rand.Reader)
}
func (pair *ckp) SealWithRand(input []byte, recipient string, rr io.Reader) ([]byte, error) {
var (
rpub [curveKeyLen]byte
nonce [curveNonceLen]byte
out [vlen + curveNonceLen]byte
err error
)
if err = decodePubCurveKey(recipient, rpub); err != nil {
return nil, ErrInvalidRecipient
}
if _, err := io.ReadFull(rr, nonce[:]); err != nil {
return nil, err
}
copy(out[:vlen], []byte(XKeyVersionV1))
copy(out[vlen:], nonce[:])
return box.Seal(out[:], input, &nonce, &rpub, &pair.seed), nil
}
func (pair *ckp) Open(input []byte, sender string) ([]byte, error) {
if len(input) <= vlen+curveNonceLen {
return nil, ErrInvalidEncrypted
}
var (
spub [curveKeyLen]byte
nonce [curveNonceLen]byte
err error
)
if !bytes.Equal(input[:vlen], []byte(XKeyVersionV1)) {
return nil, ErrInvalidEncVersion
}
copy(nonce[:], input[vlen:vlen+curveNonceLen])
if err = decodePubCurveKey(sender, spub); err != nil {
return nil, ErrInvalidSender
}
decrypted, ok := box.Open(nil, input[vlen+curveNonceLen:], &nonce, &spub, &pair.seed)
if !ok {
return nil, ErrCouldNotDecrypt
}
return decrypted, nil
}
// Wipe will randomize the contents of the secret key
func (pair *ckp) Wipe() {
io.ReadFull(rand.Reader, pair.seed[:])
}
func (pair *ckp) Sign(_ []byte) ([]byte, error) {
return nil, ErrInvalidCurveKeyOperation
}
func (pair *ckp) Verify(_ []byte, _ []byte) error {
return ErrInvalidCurveKeyOperation
}