mirror of
https://github.com/openfaas/faas.git
synced 2025-06-18 03:56:37 +00:00
Sync async_nats work with master
Signed-off-by: Alex Ellis <alexellis2@gmail.com>
This commit is contained in:
20
gateway/vendor/github.com/nats-io/go-nats/LICENSE
generated
vendored
Normal file
20
gateway/vendor/github.com/nats-io/go-nats/LICENSE
generated
vendored
Normal file
@ -0,0 +1,20 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2012-2016 Apcera Inc.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
350
gateway/vendor/github.com/nats-io/go-nats/README.md
generated
vendored
Normal file
350
gateway/vendor/github.com/nats-io/go-nats/README.md
generated
vendored
Normal file
@ -0,0 +1,350 @@
|
||||
# NATS - Go Client
|
||||
A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io).
|
||||
|
||||
[](http://opensource.org/licenses/MIT)
|
||||
[](https://goreportcard.com/report/github.com/nats-io/go-nats) [](http://travis-ci.org/nats-io/go-nats) [](http://godoc.org/github.com/nats-io/go-nats) [](https://coveralls.io/r/nats-io/go-nats?branch=master)
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
# Go client
|
||||
go get github.com/nats-io/go-nats
|
||||
|
||||
# Server
|
||||
go get github.com/nats-io/gnatsd
|
||||
```
|
||||
|
||||
## Basic Usage
|
||||
|
||||
```go
|
||||
|
||||
nc, _ := nats.Connect(nats.DefaultURL)
|
||||
|
||||
// Simple Publisher
|
||||
nc.Publish("foo", []byte("Hello World"))
|
||||
|
||||
// Simple Async Subscriber
|
||||
nc.Subscribe("foo", func(m *nats.Msg) {
|
||||
fmt.Printf("Received a message: %s\n", string(m.Data))
|
||||
})
|
||||
|
||||
// Simple Sync Subscriber
|
||||
sub, err := nc.SubscribeSync("foo")
|
||||
m, err := sub.NextMsg(timeout)
|
||||
|
||||
// Channel Subscriber
|
||||
ch := make(chan *nats.Msg, 64)
|
||||
sub, err := nc.ChanSubscribe("foo", ch)
|
||||
msg := <- ch
|
||||
|
||||
// Unsubscribe
|
||||
sub.Unsubscribe()
|
||||
|
||||
// Requests
|
||||
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)
|
||||
|
||||
// Replies
|
||||
nc.Subscribe("help", func(m *Msg) {
|
||||
nc.Publish(m.Reply, []byte("I can help!"))
|
||||
})
|
||||
|
||||
// Close connection
|
||||
nc, _ := nats.Connect("nats://localhost:4222")
|
||||
nc.Close();
|
||||
```
|
||||
|
||||
## Encoded Connections
|
||||
|
||||
```go
|
||||
|
||||
nc, _ := nats.Connect(nats.DefaultURL)
|
||||
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
|
||||
defer c.Close()
|
||||
|
||||
// Simple Publisher
|
||||
c.Publish("foo", "Hello World")
|
||||
|
||||
// Simple Async Subscriber
|
||||
c.Subscribe("foo", func(s string) {
|
||||
fmt.Printf("Received a message: %s\n", s)
|
||||
})
|
||||
|
||||
// EncodedConn can Publish any raw Go type using the registered Encoder
|
||||
type person struct {
|
||||
Name string
|
||||
Address string
|
||||
Age int
|
||||
}
|
||||
|
||||
// Go type Subscriber
|
||||
c.Subscribe("hello", func(p *person) {
|
||||
fmt.Printf("Received a person: %+v\n", p)
|
||||
})
|
||||
|
||||
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}
|
||||
|
||||
// Go type Publisher
|
||||
c.Publish("hello", me)
|
||||
|
||||
// Unsubscribe
|
||||
sub, err := c.Subscribe("foo", nil)
|
||||
...
|
||||
sub.Unsubscribe()
|
||||
|
||||
// Requests
|
||||
var response string
|
||||
err := c.Request("help", "help me", &response, 10*time.Millisecond)
|
||||
if err != nil {
|
||||
fmt.Printf("Request failed: %v\n", err)
|
||||
}
|
||||
|
||||
// Replying
|
||||
c.Subscribe("help", func(subj, reply string, msg string) {
|
||||
c.Publish(reply, "I can help!")
|
||||
})
|
||||
|
||||
// Close connection
|
||||
c.Close();
|
||||
```
|
||||
|
||||
## TLS
|
||||
|
||||
```go
|
||||
// tls as a scheme will enable secure connections by default. This will also verify the server name.
|
||||
nc, err := nats.Connect("tls://nats.demo.io:4443")
|
||||
|
||||
// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
|
||||
// We provide a helper method to make this case easier.
|
||||
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))
|
||||
|
||||
// If the server requires client certificate, there is an helper function for that too:
|
||||
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
|
||||
nc, err = nats.Connect("tls://localhost:4443", cert)
|
||||
|
||||
// You can also supply a complete tls.Config
|
||||
|
||||
certFile := "./configs/certs/client-cert.pem"
|
||||
keyFile := "./configs/certs/client-key.pem"
|
||||
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
|
||||
if err != nil {
|
||||
t.Fatalf("error parsing X509 certificate/key pair: %v", err)
|
||||
}
|
||||
|
||||
config := &tls.Config{
|
||||
ServerName: opts.Host,
|
||||
Certificates: []tls.Certificate{cert},
|
||||
RootCAs: pool,
|
||||
MinVersion: tls.VersionTLS12,
|
||||
}
|
||||
|
||||
nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
|
||||
if err != nil {
|
||||
t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
## Using Go Channels (netchan)
|
||||
|
||||
```go
|
||||
nc, _ := nats.Connect(nats.DefaultURL)
|
||||
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
|
||||
defer ec.Close()
|
||||
|
||||
type person struct {
|
||||
Name string
|
||||
Address string
|
||||
Age int
|
||||
}
|
||||
|
||||
recvCh := make(chan *person)
|
||||
ec.BindRecvChan("hello", recvCh)
|
||||
|
||||
sendCh := make(chan *person)
|
||||
ec.BindSendChan("hello", sendCh)
|
||||
|
||||
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}
|
||||
|
||||
// Send via Go channels
|
||||
sendCh <- me
|
||||
|
||||
// Receive via Go channels
|
||||
who := <- recvCh
|
||||
```
|
||||
|
||||
## Wildcard Subscriptions
|
||||
|
||||
```go
|
||||
|
||||
// "*" matches any token, at any level of the subject.
|
||||
nc.Subscribe("foo.*.baz", func(m *Msg) {
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||
})
|
||||
|
||||
nc.Subscribe("foo.bar.*", func(m *Msg) {
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||
})
|
||||
|
||||
// ">" matches any length of the tail of a subject, and can only be the last token
|
||||
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
|
||||
nc.Subscribe("foo.>", func(m *Msg) {
|
||||
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||
})
|
||||
|
||||
// Matches all of the above
|
||||
nc.Publish("foo.bar.baz", []byte("Hello World"))
|
||||
|
||||
```
|
||||
|
||||
## Queue Groups
|
||||
|
||||
```go
|
||||
// All subscriptions with the same queue name will form a queue group.
|
||||
// Each message will be delivered to only one subscriber per queue group,
|
||||
// using queuing semantics. You can have as many queue groups as you wish.
|
||||
// Normal subscribers will continue to work as expected.
|
||||
|
||||
nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
|
||||
received += 1;
|
||||
})
|
||||
|
||||
```
|
||||
|
||||
## Advanced Usage
|
||||
|
||||
```go
|
||||
|
||||
// Flush connection to server, returns when all messages have been processed.
|
||||
nc.Flush()
|
||||
fmt.Println("All clear!")
|
||||
|
||||
// FlushTimeout specifies a timeout value as well.
|
||||
err := nc.FlushTimeout(1*time.Second)
|
||||
if err != nil {
|
||||
fmt.Println("All clear!")
|
||||
} else {
|
||||
fmt.Println("Flushed timed out!")
|
||||
}
|
||||
|
||||
// Auto-unsubscribe after MAX_WANTED messages received
|
||||
const MAX_WANTED = 10
|
||||
sub, err := nc.Subscribe("foo")
|
||||
sub.AutoUnsubscribe(MAX_WANTED)
|
||||
|
||||
// Multiple connections
|
||||
nc1 := nats.Connect("nats://host1:4222")
|
||||
nc2 := nats.Connect("nats://host2:4222")
|
||||
|
||||
nc1.Subscribe("foo", func(m *Msg) {
|
||||
fmt.Printf("Received a message: %s\n", string(m.Data))
|
||||
})
|
||||
|
||||
nc2.Publish("foo", []byte("Hello World!"));
|
||||
|
||||
```
|
||||
|
||||
## Clustered Usage
|
||||
|
||||
```go
|
||||
|
||||
var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"
|
||||
|
||||
nc, err := nats.Connect(servers)
|
||||
|
||||
// Optionally set ReconnectWait and MaxReconnect attempts.
|
||||
// This example means 10 seconds total per backend.
|
||||
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))
|
||||
|
||||
// Optionally disable randomization of the server pool
|
||||
nc, err = nats.Connect(servers, nats.DontRandomize())
|
||||
|
||||
// Setup callbacks to be notified on disconnects, reconnects and connection closed.
|
||||
nc, err = nats.Connect(servers,
|
||||
nats.DisconnectHandler(func(nc *nats.Conn) {
|
||||
fmt.Printf("Got disconnected!\n")
|
||||
}),
|
||||
nats.ReconnectHandler(func(_ *nats.Conn) {
|
||||
fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
|
||||
}),
|
||||
nats.ClosedHandler(func(nc *nats.Conn) {
|
||||
fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
|
||||
})
|
||||
)
|
||||
|
||||
// When connecting to a mesh of servers with auto-discovery capabilities,
|
||||
// you may need to provide a username/password or token in order to connect
|
||||
// to any server in that mesh when authentication is required.
|
||||
// Instead of providing the credentials in the initial URL, you will use
|
||||
// new option setters:
|
||||
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))
|
||||
|
||||
// For token based authentication:
|
||||
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))
|
||||
|
||||
// You can even pass the two at the same time in case one of the server
|
||||
// in the mesh requires token instead of user name and password.
|
||||
nc, err = nats.Connect("nats://localhost:4222",
|
||||
nats.UserInfo("foo", "bar"),
|
||||
nats.Token("S3cretT0ken"))
|
||||
|
||||
// Note that if credentials are specified in the initial URLs, they take
|
||||
// precedence on the credentials specfied through the options.
|
||||
// For instance, in the connect call below, the client library will use
|
||||
// the user "my" and password "pwd" to connect to locahost:4222, however,
|
||||
// it will use username "foo" and password "bar" when (re)connecting to
|
||||
// a different server URL that it got as part of the auto-discovery.
|
||||
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))
|
||||
|
||||
```
|
||||
|
||||
## Context support (+Go 1.7)
|
||||
|
||||
```go
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
|
||||
// Request with context
|
||||
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))
|
||||
|
||||
// Synchronous subscriber with context
|
||||
sub, err := nc.SubscribeSync("foo")
|
||||
msg, err := sub.NextMsgWithContext(ctx)
|
||||
|
||||
// Encoded Request with context
|
||||
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
|
||||
type request struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
type response struct {
|
||||
Code int `json:"code"`
|
||||
}
|
||||
req := &request{Message: "Hello"}
|
||||
resp := &response{}
|
||||
err := c.RequestWithContext(ctx, "foo", req, resp)
|
||||
```
|
||||
|
||||
## License
|
||||
|
||||
(The MIT License)
|
||||
|
||||
Copyright (c) 2012-2017 Apcera Inc.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to
|
||||
deal in the Software without restriction, including without limitation the
|
||||
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
sell copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
IN THE SOFTWARE.
|
166
gateway/vendor/github.com/nats-io/go-nats/context.go
generated
vendored
Normal file
166
gateway/vendor/github.com/nats-io/go-nats/context.go
generated
vendored
Normal file
@ -0,0 +1,166 @@
|
||||
// Copyright 2012-2017 Apcera Inc. All rights reserved.
|
||||
|
||||
// +build go1.7
|
||||
|
||||
// A Go client for the NATS messaging system (https://nats.io).
|
||||
package nats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// RequestWithContext takes a context, a subject and payload
|
||||
// in bytes and request expecting a single response.
|
||||
func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
|
||||
if ctx == nil {
|
||||
return nil, ErrInvalidContext
|
||||
}
|
||||
if nc == nil {
|
||||
return nil, ErrInvalidConnection
|
||||
}
|
||||
|
||||
nc.mu.Lock()
|
||||
// If user wants the old style.
|
||||
if nc.Opts.UseOldRequestStyle {
|
||||
nc.mu.Unlock()
|
||||
return nc.oldRequestWithContext(ctx, subj, data)
|
||||
}
|
||||
|
||||
// Do setup for the new style.
|
||||
if nc.respMap == nil {
|
||||
// _INBOX wildcard
|
||||
nc.respSub = fmt.Sprintf("%s.*", NewInbox())
|
||||
nc.respMap = make(map[string]chan *Msg)
|
||||
}
|
||||
// Create literal Inbox and map to a chan msg.
|
||||
mch := make(chan *Msg, RequestChanLen)
|
||||
respInbox := nc.newRespInbox()
|
||||
token := respToken(respInbox)
|
||||
nc.respMap[token] = mch
|
||||
createSub := nc.respMux == nil
|
||||
ginbox := nc.respSub
|
||||
nc.mu.Unlock()
|
||||
|
||||
if createSub {
|
||||
// Make sure scoped subscription is setup only once.
|
||||
var err error
|
||||
nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err := nc.PublishRequest(subj, respInbox, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
var msg *Msg
|
||||
|
||||
select {
|
||||
case msg, ok = <-mch:
|
||||
if !ok {
|
||||
return nil, ErrConnectionClosed
|
||||
}
|
||||
case <-ctx.Done():
|
||||
nc.mu.Lock()
|
||||
delete(nc.respMap, token)
|
||||
nc.mu.Unlock()
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
// oldRequestWithContext utilizes inbox and subscription per request.
|
||||
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
|
||||
inbox := NewInbox()
|
||||
ch := make(chan *Msg, RequestChanLen)
|
||||
|
||||
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.AutoUnsubscribe(1)
|
||||
defer s.Unsubscribe()
|
||||
|
||||
err = nc.PublishRequest(subj, inbox, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.NextMsgWithContext(ctx)
|
||||
}
|
||||
|
||||
// NextMsgWithContext takes a context and returns the next message
|
||||
// available to a synchronous subscriber, blocking until it is delivered
|
||||
// or context gets canceled.
|
||||
func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
|
||||
if ctx == nil {
|
||||
return nil, ErrInvalidContext
|
||||
}
|
||||
if s == nil {
|
||||
return nil, ErrBadSubscription
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
err := s.validateNextMsgState()
|
||||
if err != nil {
|
||||
s.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// snapshot
|
||||
mch := s.mch
|
||||
s.mu.Unlock()
|
||||
|
||||
var ok bool
|
||||
var msg *Msg
|
||||
|
||||
select {
|
||||
case msg, ok = <-mch:
|
||||
if !ok {
|
||||
return nil, ErrConnectionClosed
|
||||
}
|
||||
err := s.processNextMsgDelivered(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
// RequestWithContext will create an Inbox and perform a Request
|
||||
// using the provided cancellation context with the Inbox reply
|
||||
// for the data v. A response will be decoded into the vPtrResponse.
|
||||
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error {
|
||||
if ctx == nil {
|
||||
return ErrInvalidContext
|
||||
}
|
||||
|
||||
b, err := c.Enc.Encode(subject, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m, err := c.Conn.RequestWithContext(ctx, subject, b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if reflect.TypeOf(vPtr) == emptyMsgType {
|
||||
mPtr := vPtr.(*Msg)
|
||||
*mPtr = *m
|
||||
} else {
|
||||
err := c.Enc.Decode(m.Subject, m.Data, vPtr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
249
gateway/vendor/github.com/nats-io/go-nats/enc.go
generated
vendored
Normal file
249
gateway/vendor/github.com/nats-io/go-nats/enc.go
generated
vendored
Normal file
@ -0,0 +1,249 @@
|
||||
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
|
||||
package nats
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
// Default Encoders
|
||||
. "github.com/nats-io/go-nats/encoders/builtin"
|
||||
)
|
||||
|
||||
// Encoder interface is for all register encoders
|
||||
type Encoder interface {
|
||||
Encode(subject string, v interface{}) ([]byte, error)
|
||||
Decode(subject string, data []byte, vPtr interface{}) error
|
||||
}
|
||||
|
||||
var encMap map[string]Encoder
|
||||
var encLock sync.Mutex
|
||||
|
||||
// Indexe names into the Registered Encoders.
|
||||
const (
|
||||
JSON_ENCODER = "json"
|
||||
GOB_ENCODER = "gob"
|
||||
DEFAULT_ENCODER = "default"
|
||||
)
|
||||
|
||||
func init() {
|
||||
encMap = make(map[string]Encoder)
|
||||
// Register json, gob and default encoder
|
||||
RegisterEncoder(JSON_ENCODER, &JsonEncoder{})
|
||||
RegisterEncoder(GOB_ENCODER, &GobEncoder{})
|
||||
RegisterEncoder(DEFAULT_ENCODER, &DefaultEncoder{})
|
||||
}
|
||||
|
||||
// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
|
||||
// a nats server and have an extendable encoder system that will encode and decode messages
|
||||
// from raw Go types.
|
||||
type EncodedConn struct {
|
||||
Conn *Conn
|
||||
Enc Encoder
|
||||
}
|
||||
|
||||
// NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
|
||||
// encoder.
|
||||
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
|
||||
if c == nil {
|
||||
return nil, errors.New("nats: Nil Connection")
|
||||
}
|
||||
if c.IsClosed() {
|
||||
return nil, ErrConnectionClosed
|
||||
}
|
||||
ec := &EncodedConn{Conn: c, Enc: EncoderForType(encType)}
|
||||
if ec.Enc == nil {
|
||||
return nil, fmt.Errorf("No encoder registered for '%s'", encType)
|
||||
}
|
||||
return ec, nil
|
||||
}
|
||||
|
||||
// RegisterEncoder will register the encType with the given Encoder. Useful for customization.
|
||||
func RegisterEncoder(encType string, enc Encoder) {
|
||||
encLock.Lock()
|
||||
defer encLock.Unlock()
|
||||
encMap[encType] = enc
|
||||
}
|
||||
|
||||
// EncoderForType will return the registered Encoder for the encType.
|
||||
func EncoderForType(encType string) Encoder {
|
||||
encLock.Lock()
|
||||
defer encLock.Unlock()
|
||||
return encMap[encType]
|
||||
}
|
||||
|
||||
// Publish publishes the data argument to the given subject. The data argument
|
||||
// will be encoded using the associated encoder.
|
||||
func (c *EncodedConn) Publish(subject string, v interface{}) error {
|
||||
b, err := c.Enc.Encode(subject, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.Conn.publish(subject, _EMPTY_, b)
|
||||
}
|
||||
|
||||
// PublishRequest will perform a Publish() expecting a response on the
|
||||
// reply subject. Use Request() for automatically waiting for a response
|
||||
// inline.
|
||||
func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error {
|
||||
b, err := c.Enc.Encode(subject, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.Conn.publish(subject, reply, b)
|
||||
}
|
||||
|
||||
// Request will create an Inbox and perform a Request() call
|
||||
// with the Inbox reply for the data v. A response will be
|
||||
// decoded into the vPtrResponse.
|
||||
func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
|
||||
b, err := c.Enc.Encode(subject, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m, err := c.Conn.Request(subject, b, timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if reflect.TypeOf(vPtr) == emptyMsgType {
|
||||
mPtr := vPtr.(*Msg)
|
||||
*mPtr = *m
|
||||
} else {
|
||||
err = c.Enc.Decode(m.Subject, m.Data, vPtr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Handler is a specific callback used for Subscribe. It is generalized to
|
||||
// an interface{}, but we will discover its format and arguments at runtime
|
||||
// and perform the correct callback, including de-marshaling JSON strings
|
||||
// back into the appropriate struct based on the signature of the Handler.
|
||||
//
|
||||
// Handlers are expected to have one of four signatures.
|
||||
//
|
||||
// type person struct {
|
||||
// Name string `json:"name,omitempty"`
|
||||
// Age uint `json:"age,omitempty"`
|
||||
// }
|
||||
//
|
||||
// handler := func(m *Msg)
|
||||
// handler := func(p *person)
|
||||
// handler := func(subject string, o *obj)
|
||||
// handler := func(subject, reply string, o *obj)
|
||||
//
|
||||
// These forms allow a callback to request a raw Msg ptr, where the processing
|
||||
// of the message from the wire is untouched. Process a JSON representation
|
||||
// and demarshal it into the given struct, e.g. person.
|
||||
// There are also variants where the callback wants either the subject, or the
|
||||
// subject and the reply subject.
|
||||
type Handler interface{}
|
||||
|
||||
// Dissect the cb Handler's signature
|
||||
func argInfo(cb Handler) (reflect.Type, int) {
|
||||
cbType := reflect.TypeOf(cb)
|
||||
if cbType.Kind() != reflect.Func {
|
||||
panic("nats: Handler needs to be a func")
|
||||
}
|
||||
numArgs := cbType.NumIn()
|
||||
if numArgs == 0 {
|
||||
return nil, numArgs
|
||||
}
|
||||
return cbType.In(numArgs - 1), numArgs
|
||||
}
|
||||
|
||||
var emptyMsgType = reflect.TypeOf(&Msg{})
|
||||
|
||||
// Subscribe will create a subscription on the given subject and process incoming
|
||||
// messages using the specified Handler. The Handler should be a func that matches
|
||||
// a signature from the description of Handler from above.
|
||||
func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) {
|
||||
return c.subscribe(subject, _EMPTY_, cb)
|
||||
}
|
||||
|
||||
// QueueSubscribe will create a queue subscription on the given subject and process
|
||||
// incoming messages using the specified Handler. The Handler should be a func that
|
||||
// matches a signature from the description of Handler from above.
|
||||
func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) {
|
||||
return c.subscribe(subject, queue, cb)
|
||||
}
|
||||
|
||||
// Internal implementation that all public functions will use.
|
||||
func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscription, error) {
|
||||
if cb == nil {
|
||||
return nil, errors.New("nats: Handler required for EncodedConn Subscription")
|
||||
}
|
||||
argType, numArgs := argInfo(cb)
|
||||
if argType == nil {
|
||||
return nil, errors.New("nats: Handler requires at least one argument")
|
||||
}
|
||||
|
||||
cbValue := reflect.ValueOf(cb)
|
||||
wantsRaw := (argType == emptyMsgType)
|
||||
|
||||
natsCB := func(m *Msg) {
|
||||
var oV []reflect.Value
|
||||
if wantsRaw {
|
||||
oV = []reflect.Value{reflect.ValueOf(m)}
|
||||
} else {
|
||||
var oPtr reflect.Value
|
||||
if argType.Kind() != reflect.Ptr {
|
||||
oPtr = reflect.New(argType)
|
||||
} else {
|
||||
oPtr = reflect.New(argType.Elem())
|
||||
}
|
||||
if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
|
||||
if c.Conn.Opts.AsyncErrorCB != nil {
|
||||
c.Conn.ach <- func() {
|
||||
c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, errors.New("nats: Got an error trying to unmarshal: "+err.Error()))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
if argType.Kind() != reflect.Ptr {
|
||||
oPtr = reflect.Indirect(oPtr)
|
||||
}
|
||||
|
||||
// Callback Arity
|
||||
switch numArgs {
|
||||
case 1:
|
||||
oV = []reflect.Value{oPtr}
|
||||
case 2:
|
||||
subV := reflect.ValueOf(m.Subject)
|
||||
oV = []reflect.Value{subV, oPtr}
|
||||
case 3:
|
||||
subV := reflect.ValueOf(m.Subject)
|
||||
replyV := reflect.ValueOf(m.Reply)
|
||||
oV = []reflect.Value{subV, replyV, oPtr}
|
||||
}
|
||||
|
||||
}
|
||||
cbValue.Call(oV)
|
||||
}
|
||||
|
||||
return c.Conn.subscribe(subject, queue, natsCB, nil)
|
||||
}
|
||||
|
||||
// FlushTimeout allows a Flush operation to have an associated timeout.
|
||||
func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) {
|
||||
return c.Conn.FlushTimeout(timeout)
|
||||
}
|
||||
|
||||
// Flush will perform a round trip to the server and return when it
|
||||
// receives the internal reply.
|
||||
func (c *EncodedConn) Flush() error {
|
||||
return c.Conn.Flush()
|
||||
}
|
||||
|
||||
// Close will close the connection to the server. This call will release
|
||||
// all blocking calls, such as Flush(), etc.
|
||||
func (c *EncodedConn) Close() {
|
||||
c.Conn.Close()
|
||||
}
|
||||
|
||||
// LastError reports the last error encountered via the Connection.
|
||||
func (c *EncodedConn) LastError() error {
|
||||
return c.Conn.err
|
||||
}
|
106
gateway/vendor/github.com/nats-io/go-nats/encoders/builtin/default_enc.go
generated
vendored
Normal file
106
gateway/vendor/github.com/nats-io/go-nats/encoders/builtin/default_enc.go
generated
vendored
Normal file
@ -0,0 +1,106 @@
|
||||
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
|
||||
package builtin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// DefaultEncoder implementation for EncodedConn.
|
||||
// This encoder will leave []byte and string untouched, but will attempt to
|
||||
// turn numbers into appropriate strings that can be decoded. It will also
|
||||
// propely encoded and decode bools. If will encode a struct, but if you want
|
||||
// to properly handle structures you should use JsonEncoder.
|
||||
type DefaultEncoder struct {
|
||||
// Empty
|
||||
}
|
||||
|
||||
var trueB = []byte("true")
|
||||
var falseB = []byte("false")
|
||||
var nilB = []byte("")
|
||||
|
||||
// Encode
|
||||
func (je *DefaultEncoder) Encode(subject string, v interface{}) ([]byte, error) {
|
||||
switch arg := v.(type) {
|
||||
case string:
|
||||
bytes := *(*[]byte)(unsafe.Pointer(&arg))
|
||||
return bytes, nil
|
||||
case []byte:
|
||||
return arg, nil
|
||||
case bool:
|
||||
if arg {
|
||||
return trueB, nil
|
||||
} else {
|
||||
return falseB, nil
|
||||
}
|
||||
case nil:
|
||||
return nilB, nil
|
||||
default:
|
||||
var buf bytes.Buffer
|
||||
fmt.Fprintf(&buf, "%+v", arg)
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
}
|
||||
|
||||
// Decode
|
||||
func (je *DefaultEncoder) Decode(subject string, data []byte, vPtr interface{}) error {
|
||||
// Figure out what it's pointing to...
|
||||
sData := *(*string)(unsafe.Pointer(&data))
|
||||
switch arg := vPtr.(type) {
|
||||
case *string:
|
||||
*arg = sData
|
||||
return nil
|
||||
case *[]byte:
|
||||
*arg = data
|
||||
return nil
|
||||
case *int:
|
||||
n, err := strconv.ParseInt(sData, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*arg = int(n)
|
||||
return nil
|
||||
case *int32:
|
||||
n, err := strconv.ParseInt(sData, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*arg = int32(n)
|
||||
return nil
|
||||
case *int64:
|
||||
n, err := strconv.ParseInt(sData, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*arg = int64(n)
|
||||
return nil
|
||||
case *float32:
|
||||
n, err := strconv.ParseFloat(sData, 32)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*arg = float32(n)
|
||||
return nil
|
||||
case *float64:
|
||||
n, err := strconv.ParseFloat(sData, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*arg = float64(n)
|
||||
return nil
|
||||
case *bool:
|
||||
b, err := strconv.ParseBool(sData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*arg = b
|
||||
return nil
|
||||
default:
|
||||
vt := reflect.TypeOf(arg).Elem()
|
||||
return fmt.Errorf("nats: Default Encoder can't decode to type %s", vt)
|
||||
}
|
||||
}
|
34
gateway/vendor/github.com/nats-io/go-nats/encoders/builtin/gob_enc.go
generated
vendored
Normal file
34
gateway/vendor/github.com/nats-io/go-nats/encoders/builtin/gob_enc.go
generated
vendored
Normal file
@ -0,0 +1,34 @@
|
||||
// Copyright 2013-2015 Apcera Inc. All rights reserved.
|
||||
|
||||
package builtin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
)
|
||||
|
||||
// GobEncoder is a Go specific GOB Encoder implementation for EncodedConn.
|
||||
// This encoder will use the builtin encoding/gob to Marshal
|
||||
// and Unmarshal most types, including structs.
|
||||
type GobEncoder struct {
|
||||
// Empty
|
||||
}
|
||||
|
||||
// FIXME(dlc) - This could probably be more efficient.
|
||||
|
||||
// Encode
|
||||
func (ge *GobEncoder) Encode(subject string, v interface{}) ([]byte, error) {
|
||||
b := new(bytes.Buffer)
|
||||
enc := gob.NewEncoder(b)
|
||||
if err := enc.Encode(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
// Decode
|
||||
func (ge *GobEncoder) Decode(subject string, data []byte, vPtr interface{}) (err error) {
|
||||
dec := gob.NewDecoder(bytes.NewBuffer(data))
|
||||
err = dec.Decode(vPtr)
|
||||
return
|
||||
}
|
45
gateway/vendor/github.com/nats-io/go-nats/encoders/builtin/json_enc.go
generated
vendored
Normal file
45
gateway/vendor/github.com/nats-io/go-nats/encoders/builtin/json_enc.go
generated
vendored
Normal file
@ -0,0 +1,45 @@
|
||||
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
|
||||
package builtin
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// JsonEncoder is a JSON Encoder implementation for EncodedConn.
|
||||
// This encoder will use the builtin encoding/json to Marshal
|
||||
// and Unmarshal most types, including structs.
|
||||
type JsonEncoder struct {
|
||||
// Empty
|
||||
}
|
||||
|
||||
// Encode
|
||||
func (je *JsonEncoder) Encode(subject string, v interface{}) ([]byte, error) {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// Decode
|
||||
func (je *JsonEncoder) Decode(subject string, data []byte, vPtr interface{}) (err error) {
|
||||
switch arg := vPtr.(type) {
|
||||
case *string:
|
||||
// If they want a string and it is a JSON string, strip quotes
|
||||
// This allows someone to send a struct but receive as a plain string
|
||||
// This cast should be efficient for Go 1.3 and beyond.
|
||||
str := string(data)
|
||||
if strings.HasPrefix(str, `"`) && strings.HasSuffix(str, `"`) {
|
||||
*arg = str[1 : len(str)-1]
|
||||
} else {
|
||||
*arg = str
|
||||
}
|
||||
case *[]byte:
|
||||
*arg = data
|
||||
default:
|
||||
err = json.Unmarshal(data, arg)
|
||||
}
|
||||
return
|
||||
}
|
2975
gateway/vendor/github.com/nats-io/go-nats/nats.go
generated
vendored
Normal file
2975
gateway/vendor/github.com/nats-io/go-nats/nats.go
generated
vendored
Normal file
File diff suppressed because it is too large
Load Diff
100
gateway/vendor/github.com/nats-io/go-nats/netchan.go
generated
vendored
Normal file
100
gateway/vendor/github.com/nats-io/go-nats/netchan.go
generated
vendored
Normal file
@ -0,0 +1,100 @@
|
||||
// Copyright 2013-2017 Apcera Inc. All rights reserved.
|
||||
|
||||
package nats
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// This allows the functionality for network channels by binding send and receive Go chans
|
||||
// to subjects and optionally queue groups.
|
||||
// Data will be encoded and decoded via the EncodedConn and its associated encoders.
|
||||
|
||||
// BindSendChan binds a channel for send operations to NATS.
|
||||
func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error {
|
||||
chVal := reflect.ValueOf(channel)
|
||||
if chVal.Kind() != reflect.Chan {
|
||||
return ErrChanArg
|
||||
}
|
||||
go chPublish(c, chVal, subject)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publish all values that arrive on the channel until it is closed or we
|
||||
// encounter an error.
|
||||
func chPublish(c *EncodedConn, chVal reflect.Value, subject string) {
|
||||
for {
|
||||
val, ok := chVal.Recv()
|
||||
if !ok {
|
||||
// Channel has most likely been closed.
|
||||
return
|
||||
}
|
||||
if e := c.Publish(subject, val.Interface()); e != nil {
|
||||
// Do this under lock.
|
||||
c.Conn.mu.Lock()
|
||||
defer c.Conn.mu.Unlock()
|
||||
|
||||
if c.Conn.Opts.AsyncErrorCB != nil {
|
||||
// FIXME(dlc) - Not sure this is the right thing to do.
|
||||
// FIXME(ivan) - If the connection is not yet closed, try to schedule the callback
|
||||
if c.Conn.isClosed() {
|
||||
go c.Conn.Opts.AsyncErrorCB(c.Conn, nil, e)
|
||||
} else {
|
||||
c.Conn.ach <- func() { c.Conn.Opts.AsyncErrorCB(c.Conn, nil, e) }
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BindRecvChan binds a channel for receive operations from NATS.
|
||||
func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error) {
|
||||
return c.bindRecvChan(subject, _EMPTY_, channel)
|
||||
}
|
||||
|
||||
// BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
|
||||
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error) {
|
||||
return c.bindRecvChan(subject, queue, channel)
|
||||
}
|
||||
|
||||
// Internal function to bind receive operations for a channel.
|
||||
func (c *EncodedConn) bindRecvChan(subject, queue string, channel interface{}) (*Subscription, error) {
|
||||
chVal := reflect.ValueOf(channel)
|
||||
if chVal.Kind() != reflect.Chan {
|
||||
return nil, ErrChanArg
|
||||
}
|
||||
argType := chVal.Type().Elem()
|
||||
|
||||
cb := func(m *Msg) {
|
||||
var oPtr reflect.Value
|
||||
if argType.Kind() != reflect.Ptr {
|
||||
oPtr = reflect.New(argType)
|
||||
} else {
|
||||
oPtr = reflect.New(argType.Elem())
|
||||
}
|
||||
if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
|
||||
c.Conn.err = errors.New("nats: Got an error trying to unmarshal: " + err.Error())
|
||||
if c.Conn.Opts.AsyncErrorCB != nil {
|
||||
c.Conn.ach <- func() { c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, c.Conn.err) }
|
||||
}
|
||||
return
|
||||
}
|
||||
if argType.Kind() != reflect.Ptr {
|
||||
oPtr = reflect.Indirect(oPtr)
|
||||
}
|
||||
// This is a bit hacky, but in this instance we may be trying to send to a closed channel.
|
||||
// and the user does not know when it is safe to close the channel.
|
||||
defer func() {
|
||||
// If we have panicked, recover and close the subscription.
|
||||
if r := recover(); r != nil {
|
||||
m.Sub.Unsubscribe()
|
||||
}
|
||||
}()
|
||||
// Actually do the send to the channel.
|
||||
chVal.Send(oPtr)
|
||||
}
|
||||
|
||||
return c.Conn.subscribe(subject, queue, cb, nil)
|
||||
}
|
470
gateway/vendor/github.com/nats-io/go-nats/parser.go
generated
vendored
Normal file
470
gateway/vendor/github.com/nats-io/go-nats/parser.go
generated
vendored
Normal file
@ -0,0 +1,470 @@
|
||||
// Copyright 2012-2017 Apcera Inc. All rights reserved.
|
||||
|
||||
package nats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type msgArg struct {
|
||||
subject []byte
|
||||
reply []byte
|
||||
sid int64
|
||||
size int
|
||||
}
|
||||
|
||||
const MAX_CONTROL_LINE_SIZE = 1024
|
||||
|
||||
type parseState struct {
|
||||
state int
|
||||
as int
|
||||
drop int
|
||||
ma msgArg
|
||||
argBuf []byte
|
||||
msgBuf []byte
|
||||
scratch [MAX_CONTROL_LINE_SIZE]byte
|
||||
}
|
||||
|
||||
const (
|
||||
OP_START = iota
|
||||
OP_PLUS
|
||||
OP_PLUS_O
|
||||
OP_PLUS_OK
|
||||
OP_MINUS
|
||||
OP_MINUS_E
|
||||
OP_MINUS_ER
|
||||
OP_MINUS_ERR
|
||||
OP_MINUS_ERR_SPC
|
||||
MINUS_ERR_ARG
|
||||
OP_M
|
||||
OP_MS
|
||||
OP_MSG
|
||||
OP_MSG_SPC
|
||||
MSG_ARG
|
||||
MSG_PAYLOAD
|
||||
MSG_END
|
||||
OP_P
|
||||
OP_PI
|
||||
OP_PIN
|
||||
OP_PING
|
||||
OP_PO
|
||||
OP_PON
|
||||
OP_PONG
|
||||
OP_I
|
||||
OP_IN
|
||||
OP_INF
|
||||
OP_INFO
|
||||
OP_INFO_SPC
|
||||
INFO_ARG
|
||||
)
|
||||
|
||||
// parse is the fast protocol parser engine.
|
||||
func (nc *Conn) parse(buf []byte) error {
|
||||
var i int
|
||||
var b byte
|
||||
|
||||
// Move to loop instead of range syntax to allow jumping of i
|
||||
for i = 0; i < len(buf); i++ {
|
||||
b = buf[i]
|
||||
|
||||
switch nc.ps.state {
|
||||
case OP_START:
|
||||
switch b {
|
||||
case 'M', 'm':
|
||||
nc.ps.state = OP_M
|
||||
case 'P', 'p':
|
||||
nc.ps.state = OP_P
|
||||
case '+':
|
||||
nc.ps.state = OP_PLUS
|
||||
case '-':
|
||||
nc.ps.state = OP_MINUS
|
||||
case 'I', 'i':
|
||||
nc.ps.state = OP_I
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_M:
|
||||
switch b {
|
||||
case 'S', 's':
|
||||
nc.ps.state = OP_MS
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_MS:
|
||||
switch b {
|
||||
case 'G', 'g':
|
||||
nc.ps.state = OP_MSG
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_MSG:
|
||||
switch b {
|
||||
case ' ', '\t':
|
||||
nc.ps.state = OP_MSG_SPC
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_MSG_SPC:
|
||||
switch b {
|
||||
case ' ', '\t':
|
||||
continue
|
||||
default:
|
||||
nc.ps.state = MSG_ARG
|
||||
nc.ps.as = i
|
||||
}
|
||||
case MSG_ARG:
|
||||
switch b {
|
||||
case '\r':
|
||||
nc.ps.drop = 1
|
||||
case '\n':
|
||||
var arg []byte
|
||||
if nc.ps.argBuf != nil {
|
||||
arg = nc.ps.argBuf
|
||||
} else {
|
||||
arg = buf[nc.ps.as : i-nc.ps.drop]
|
||||
}
|
||||
if err := nc.processMsgArgs(arg); err != nil {
|
||||
return err
|
||||
}
|
||||
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD
|
||||
|
||||
// jump ahead with the index. If this overruns
|
||||
// what is left we fall out and process split
|
||||
// buffer.
|
||||
i = nc.ps.as + nc.ps.ma.size - 1
|
||||
default:
|
||||
if nc.ps.argBuf != nil {
|
||||
nc.ps.argBuf = append(nc.ps.argBuf, b)
|
||||
}
|
||||
}
|
||||
case MSG_PAYLOAD:
|
||||
if nc.ps.msgBuf != nil {
|
||||
if len(nc.ps.msgBuf) >= nc.ps.ma.size {
|
||||
nc.processMsg(nc.ps.msgBuf)
|
||||
nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
|
||||
} else {
|
||||
// copy as much as we can to the buffer and skip ahead.
|
||||
toCopy := nc.ps.ma.size - len(nc.ps.msgBuf)
|
||||
avail := len(buf) - i
|
||||
|
||||
if avail < toCopy {
|
||||
toCopy = avail
|
||||
}
|
||||
|
||||
if toCopy > 0 {
|
||||
start := len(nc.ps.msgBuf)
|
||||
// This is needed for copy to work.
|
||||
nc.ps.msgBuf = nc.ps.msgBuf[:start+toCopy]
|
||||
copy(nc.ps.msgBuf[start:], buf[i:i+toCopy])
|
||||
// Update our index
|
||||
i = (i + toCopy) - 1
|
||||
} else {
|
||||
nc.ps.msgBuf = append(nc.ps.msgBuf, b)
|
||||
}
|
||||
}
|
||||
} else if i-nc.ps.as >= nc.ps.ma.size {
|
||||
nc.processMsg(buf[nc.ps.as:i])
|
||||
nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
|
||||
}
|
||||
case MSG_END:
|
||||
switch b {
|
||||
case '\n':
|
||||
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
|
||||
default:
|
||||
continue
|
||||
}
|
||||
case OP_PLUS:
|
||||
switch b {
|
||||
case 'O', 'o':
|
||||
nc.ps.state = OP_PLUS_O
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PLUS_O:
|
||||
switch b {
|
||||
case 'K', 'k':
|
||||
nc.ps.state = OP_PLUS_OK
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PLUS_OK:
|
||||
switch b {
|
||||
case '\n':
|
||||
nc.processOK()
|
||||
nc.ps.drop, nc.ps.state = 0, OP_START
|
||||
}
|
||||
case OP_MINUS:
|
||||
switch b {
|
||||
case 'E', 'e':
|
||||
nc.ps.state = OP_MINUS_E
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_MINUS_E:
|
||||
switch b {
|
||||
case 'R', 'r':
|
||||
nc.ps.state = OP_MINUS_ER
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_MINUS_ER:
|
||||
switch b {
|
||||
case 'R', 'r':
|
||||
nc.ps.state = OP_MINUS_ERR
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_MINUS_ERR:
|
||||
switch b {
|
||||
case ' ', '\t':
|
||||
nc.ps.state = OP_MINUS_ERR_SPC
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_MINUS_ERR_SPC:
|
||||
switch b {
|
||||
case ' ', '\t':
|
||||
continue
|
||||
default:
|
||||
nc.ps.state = MINUS_ERR_ARG
|
||||
nc.ps.as = i
|
||||
}
|
||||
case MINUS_ERR_ARG:
|
||||
switch b {
|
||||
case '\r':
|
||||
nc.ps.drop = 1
|
||||
case '\n':
|
||||
var arg []byte
|
||||
if nc.ps.argBuf != nil {
|
||||
arg = nc.ps.argBuf
|
||||
nc.ps.argBuf = nil
|
||||
} else {
|
||||
arg = buf[nc.ps.as : i-nc.ps.drop]
|
||||
}
|
||||
nc.processErr(string(arg))
|
||||
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
|
||||
default:
|
||||
if nc.ps.argBuf != nil {
|
||||
nc.ps.argBuf = append(nc.ps.argBuf, b)
|
||||
}
|
||||
}
|
||||
case OP_P:
|
||||
switch b {
|
||||
case 'I', 'i':
|
||||
nc.ps.state = OP_PI
|
||||
case 'O', 'o':
|
||||
nc.ps.state = OP_PO
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PO:
|
||||
switch b {
|
||||
case 'N', 'n':
|
||||
nc.ps.state = OP_PON
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PON:
|
||||
switch b {
|
||||
case 'G', 'g':
|
||||
nc.ps.state = OP_PONG
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PONG:
|
||||
switch b {
|
||||
case '\n':
|
||||
nc.processPong()
|
||||
nc.ps.drop, nc.ps.state = 0, OP_START
|
||||
}
|
||||
case OP_PI:
|
||||
switch b {
|
||||
case 'N', 'n':
|
||||
nc.ps.state = OP_PIN
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PIN:
|
||||
switch b {
|
||||
case 'G', 'g':
|
||||
nc.ps.state = OP_PING
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PING:
|
||||
switch b {
|
||||
case '\n':
|
||||
nc.processPing()
|
||||
nc.ps.drop, nc.ps.state = 0, OP_START
|
||||
}
|
||||
case OP_I:
|
||||
switch b {
|
||||
case 'N', 'n':
|
||||
nc.ps.state = OP_IN
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_IN:
|
||||
switch b {
|
||||
case 'F', 'f':
|
||||
nc.ps.state = OP_INF
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_INF:
|
||||
switch b {
|
||||
case 'O', 'o':
|
||||
nc.ps.state = OP_INFO
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_INFO:
|
||||
switch b {
|
||||
case ' ', '\t':
|
||||
nc.ps.state = OP_INFO_SPC
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_INFO_SPC:
|
||||
switch b {
|
||||
case ' ', '\t':
|
||||
continue
|
||||
default:
|
||||
nc.ps.state = INFO_ARG
|
||||
nc.ps.as = i
|
||||
}
|
||||
case INFO_ARG:
|
||||
switch b {
|
||||
case '\r':
|
||||
nc.ps.drop = 1
|
||||
case '\n':
|
||||
var arg []byte
|
||||
if nc.ps.argBuf != nil {
|
||||
arg = nc.ps.argBuf
|
||||
nc.ps.argBuf = nil
|
||||
} else {
|
||||
arg = buf[nc.ps.as : i-nc.ps.drop]
|
||||
}
|
||||
nc.processAsyncInfo(arg)
|
||||
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
|
||||
default:
|
||||
if nc.ps.argBuf != nil {
|
||||
nc.ps.argBuf = append(nc.ps.argBuf, b)
|
||||
}
|
||||
}
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
}
|
||||
// Check for split buffer scenarios
|
||||
if (nc.ps.state == MSG_ARG || nc.ps.state == MINUS_ERR_ARG || nc.ps.state == INFO_ARG) && nc.ps.argBuf == nil {
|
||||
nc.ps.argBuf = nc.ps.scratch[:0]
|
||||
nc.ps.argBuf = append(nc.ps.argBuf, buf[nc.ps.as:i-nc.ps.drop]...)
|
||||
// FIXME, check max len
|
||||
}
|
||||
// Check for split msg
|
||||
if nc.ps.state == MSG_PAYLOAD && nc.ps.msgBuf == nil {
|
||||
// We need to clone the msgArg if it is still referencing the
|
||||
// read buffer and we are not able to process the msg.
|
||||
if nc.ps.argBuf == nil {
|
||||
nc.cloneMsgArg()
|
||||
}
|
||||
|
||||
// If we will overflow the scratch buffer, just create a
|
||||
// new buffer to hold the split message.
|
||||
if nc.ps.ma.size > cap(nc.ps.scratch)-len(nc.ps.argBuf) {
|
||||
lrem := len(buf[nc.ps.as:])
|
||||
|
||||
nc.ps.msgBuf = make([]byte, lrem, nc.ps.ma.size)
|
||||
copy(nc.ps.msgBuf, buf[nc.ps.as:])
|
||||
} else {
|
||||
nc.ps.msgBuf = nc.ps.scratch[len(nc.ps.argBuf):len(nc.ps.argBuf)]
|
||||
nc.ps.msgBuf = append(nc.ps.msgBuf, (buf[nc.ps.as:])...)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
parseErr:
|
||||
return fmt.Errorf("nats: Parse Error [%d]: '%s'", nc.ps.state, buf[i:])
|
||||
}
|
||||
|
||||
// cloneMsgArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
|
||||
// we need to hold onto it into the next read.
|
||||
func (nc *Conn) cloneMsgArg() {
|
||||
nc.ps.argBuf = nc.ps.scratch[:0]
|
||||
nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.subject...)
|
||||
nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.reply...)
|
||||
nc.ps.ma.subject = nc.ps.argBuf[:len(nc.ps.ma.subject)]
|
||||
if nc.ps.ma.reply != nil {
|
||||
nc.ps.ma.reply = nc.ps.argBuf[len(nc.ps.ma.subject):]
|
||||
}
|
||||
}
|
||||
|
||||
const argsLenMax = 4
|
||||
|
||||
func (nc *Conn) processMsgArgs(arg []byte) error {
|
||||
// Unroll splitArgs to avoid runtime/heap issues
|
||||
a := [argsLenMax][]byte{}
|
||||
args := a[:0]
|
||||
start := -1
|
||||
for i, b := range arg {
|
||||
switch b {
|
||||
case ' ', '\t', '\r', '\n':
|
||||
if start >= 0 {
|
||||
args = append(args, arg[start:i])
|
||||
start = -1
|
||||
}
|
||||
default:
|
||||
if start < 0 {
|
||||
start = i
|
||||
}
|
||||
}
|
||||
}
|
||||
if start >= 0 {
|
||||
args = append(args, arg[start:])
|
||||
}
|
||||
|
||||
switch len(args) {
|
||||
case 3:
|
||||
nc.ps.ma.subject = args[0]
|
||||
nc.ps.ma.sid = parseInt64(args[1])
|
||||
nc.ps.ma.reply = nil
|
||||
nc.ps.ma.size = int(parseInt64(args[2]))
|
||||
case 4:
|
||||
nc.ps.ma.subject = args[0]
|
||||
nc.ps.ma.sid = parseInt64(args[1])
|
||||
nc.ps.ma.reply = args[2]
|
||||
nc.ps.ma.size = int(parseInt64(args[3]))
|
||||
default:
|
||||
return fmt.Errorf("nats: processMsgArgs Parse Error: '%s'", arg)
|
||||
}
|
||||
if nc.ps.ma.sid < 0 {
|
||||
return fmt.Errorf("nats: processMsgArgs Bad or Missing Sid: '%s'", arg)
|
||||
}
|
||||
if nc.ps.ma.size < 0 {
|
||||
return fmt.Errorf("nats: processMsgArgs Bad or Missing Size: '%s'", arg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ascii numbers 0-9
|
||||
const (
|
||||
ascii_0 = 48
|
||||
ascii_9 = 57
|
||||
)
|
||||
|
||||
// parseInt64 expects decimal positive numbers. We
|
||||
// return -1 to signal error
|
||||
func parseInt64(d []byte) (n int64) {
|
||||
if len(d) == 0 {
|
||||
return -1
|
||||
}
|
||||
for _, dec := range d {
|
||||
if dec < ascii_0 || dec > ascii_9 {
|
||||
return -1
|
||||
}
|
||||
n = n*10 + (int64(dec) - ascii_0)
|
||||
}
|
||||
return n
|
||||
}
|
43
gateway/vendor/github.com/nats-io/go-nats/timer.go
generated
vendored
Normal file
43
gateway/vendor/github.com/nats-io/go-nats/timer.go
generated
vendored
Normal file
@ -0,0 +1,43 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// global pool of *time.Timer's. can be used by multiple goroutines concurrently.
|
||||
var globalTimerPool timerPool
|
||||
|
||||
// timerPool provides GC-able pooling of *time.Timer's.
|
||||
// can be used by multiple goroutines concurrently.
|
||||
type timerPool struct {
|
||||
p sync.Pool
|
||||
}
|
||||
|
||||
// Get returns a timer that completes after the given duration.
|
||||
func (tp *timerPool) Get(d time.Duration) *time.Timer {
|
||||
if t, _ := tp.p.Get().(*time.Timer); t != nil {
|
||||
t.Reset(d)
|
||||
return t
|
||||
}
|
||||
|
||||
return time.NewTimer(d)
|
||||
}
|
||||
|
||||
// Put pools the given timer.
|
||||
//
|
||||
// There is no need to call t.Stop() before calling Put.
|
||||
//
|
||||
// Put will try to stop the timer before pooling. If the
|
||||
// given timer already expired, Put will read the unreceived
|
||||
// value if there is one.
|
||||
func (tp *timerPool) Put(t *time.Timer) {
|
||||
if !t.Stop() {
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
tp.p.Put(t)
|
||||
}
|
37
gateway/vendor/github.com/nats-io/go-nats/util/tls.go
generated
vendored
Normal file
37
gateway/vendor/github.com/nats-io/go-nats/util/tls.go
generated
vendored
Normal file
@ -0,0 +1,37 @@
|
||||
// Copyright 2016 Apcera Inc. All rights reserved.
|
||||
// +build go1.7
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
)
|
||||
|
||||
// CloneTLSConfig returns a copy of c. Only the exported fields are copied.
|
||||
// This is temporary, until this is provided by the language.
|
||||
// https://go-review.googlesource.com/#/c/28075/
|
||||
func CloneTLSConfig(c *tls.Config) *tls.Config {
|
||||
return &tls.Config{
|
||||
Rand: c.Rand,
|
||||
Time: c.Time,
|
||||
Certificates: c.Certificates,
|
||||
NameToCertificate: c.NameToCertificate,
|
||||
GetCertificate: c.GetCertificate,
|
||||
RootCAs: c.RootCAs,
|
||||
NextProtos: c.NextProtos,
|
||||
ServerName: c.ServerName,
|
||||
ClientAuth: c.ClientAuth,
|
||||
ClientCAs: c.ClientCAs,
|
||||
InsecureSkipVerify: c.InsecureSkipVerify,
|
||||
CipherSuites: c.CipherSuites,
|
||||
PreferServerCipherSuites: c.PreferServerCipherSuites,
|
||||
SessionTicketsDisabled: c.SessionTicketsDisabled,
|
||||
SessionTicketKey: c.SessionTicketKey,
|
||||
ClientSessionCache: c.ClientSessionCache,
|
||||
MinVersion: c.MinVersion,
|
||||
MaxVersion: c.MaxVersion,
|
||||
CurvePreferences: c.CurvePreferences,
|
||||
DynamicRecordSizingDisabled: c.DynamicRecordSizingDisabled,
|
||||
Renegotiation: c.Renegotiation,
|
||||
}
|
||||
}
|
35
gateway/vendor/github.com/nats-io/go-nats/util/tls_pre17.go
generated
vendored
Normal file
35
gateway/vendor/github.com/nats-io/go-nats/util/tls_pre17.go
generated
vendored
Normal file
@ -0,0 +1,35 @@
|
||||
// Copyright 2016 Apcera Inc. All rights reserved.
|
||||
// +build go1.5,!go1.7
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
)
|
||||
|
||||
// CloneTLSConfig returns a copy of c. Only the exported fields are copied.
|
||||
// This is temporary, until this is provided by the language.
|
||||
// https://go-review.googlesource.com/#/c/28075/
|
||||
func CloneTLSConfig(c *tls.Config) *tls.Config {
|
||||
return &tls.Config{
|
||||
Rand: c.Rand,
|
||||
Time: c.Time,
|
||||
Certificates: c.Certificates,
|
||||
NameToCertificate: c.NameToCertificate,
|
||||
GetCertificate: c.GetCertificate,
|
||||
RootCAs: c.RootCAs,
|
||||
NextProtos: c.NextProtos,
|
||||
ServerName: c.ServerName,
|
||||
ClientAuth: c.ClientAuth,
|
||||
ClientCAs: c.ClientCAs,
|
||||
InsecureSkipVerify: c.InsecureSkipVerify,
|
||||
CipherSuites: c.CipherSuites,
|
||||
PreferServerCipherSuites: c.PreferServerCipherSuites,
|
||||
SessionTicketsDisabled: c.SessionTicketsDisabled,
|
||||
SessionTicketKey: c.SessionTicketKey,
|
||||
ClientSessionCache: c.ClientSessionCache,
|
||||
MinVersion: c.MinVersion,
|
||||
MaxVersion: c.MaxVersion,
|
||||
CurvePreferences: c.CurvePreferences,
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user