mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-22 06:43:28 +00:00
Upgrade containerd to 1.6.2 and CNI to 0.9.1
Upgrades containerd, and switches to the official 64-bit ARM binary. Continues to use my binary for 32-bit arm hosts. CNI upgraded to v0.9.1 Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
committed by
Alex Ellis
parent
449bcf2691
commit
912ac265f4
593
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
593
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
@ -23,6 +23,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -37,7 +38,6 @@ import (
|
||||
"google.golang.org/grpc/internal/backoff"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
iresolver "google.golang.org/grpc/internal/resolver"
|
||||
"google.golang.org/grpc/internal/transport"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@ -83,13 +83,13 @@ var (
|
||||
// errTransportCredsAndBundle indicates that creds bundle is used together
|
||||
// with other individual Transport Credentials.
|
||||
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
|
||||
// errTransportCredentialsMissing indicates that users want to transmit security
|
||||
// information (e.g., OAuth2 token) which requires secure connection on an insecure
|
||||
// connection.
|
||||
// errNoTransportCredsInBundle indicated that the configured creds bundle
|
||||
// returned a transport credentials which was nil.
|
||||
errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
|
||||
// errTransportCredentialsMissing indicates that users want to transmit
|
||||
// security information (e.g., OAuth2 token) which requires secure
|
||||
// connection on an insecure connection.
|
||||
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
|
||||
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
|
||||
// and grpc.WithInsecure() are both called for a connection.
|
||||
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
|
||||
)
|
||||
|
||||
const (
|
||||
@ -177,17 +177,20 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
cc.csMgr.channelzID = cc.channelzID
|
||||
}
|
||||
|
||||
if !cc.dopts.insecure {
|
||||
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
|
||||
return nil, errNoTransportSecurity
|
||||
}
|
||||
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
|
||||
return nil, errTransportCredsAndBundle
|
||||
}
|
||||
} else {
|
||||
if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
|
||||
return nil, errCredentialsConflict
|
||||
}
|
||||
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
|
||||
return nil, errNoTransportSecurity
|
||||
}
|
||||
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
|
||||
return nil, errTransportCredsAndBundle
|
||||
}
|
||||
if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
|
||||
return nil, errNoTransportCredsInBundle
|
||||
}
|
||||
transportCreds := cc.dopts.copts.TransportCredentials
|
||||
if transportCreds == nil {
|
||||
transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
|
||||
}
|
||||
if transportCreds.Info().SecurityProtocol == "insecure" {
|
||||
for _, cd := range cc.dopts.copts.PerRPCCredentials {
|
||||
if cd.RequireTransportSecurity() {
|
||||
return nil, errTransportCredentialsMissing
|
||||
@ -248,38 +251,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
}
|
||||
|
||||
// Determine the resolver to use.
|
||||
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
|
||||
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
|
||||
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
|
||||
if resolverBuilder == nil {
|
||||
// If resolver builder is still nil, the parsed target's scheme is
|
||||
// not registered. Fallback to default resolver and set Endpoint to
|
||||
// the original target.
|
||||
channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
|
||||
cc.parsedTarget = resolver.Target{
|
||||
Scheme: resolver.GetDefaultScheme(),
|
||||
Endpoint: target,
|
||||
}
|
||||
resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
|
||||
if resolverBuilder == nil {
|
||||
return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
|
||||
}
|
||||
resolverBuilder, err := cc.parseTargetAndFindResolver()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
creds := cc.dopts.copts.TransportCredentials
|
||||
if creds != nil && creds.Info().ServerName != "" {
|
||||
cc.authority = creds.Info().ServerName
|
||||
} else if cc.dopts.insecure && cc.dopts.authority != "" {
|
||||
cc.authority = cc.dopts.authority
|
||||
} else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
|
||||
cc.authority = "localhost"
|
||||
} else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
|
||||
cc.authority = "localhost" + cc.parsedTarget.Endpoint
|
||||
} else {
|
||||
// Use endpoint from "scheme://authority/endpoint" as the default
|
||||
// authority for ClientConn.
|
||||
cc.authority = cc.parsedTarget.Endpoint
|
||||
cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
|
||||
|
||||
if cc.dopts.scChan != nil && !scSet {
|
||||
// Blocking wait for the initial service config.
|
||||
@ -305,6 +285,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
DialCreds: credsClone,
|
||||
CredsBundle: cc.dopts.copts.CredsBundle,
|
||||
Dialer: cc.dopts.copts.Dialer,
|
||||
Authority: cc.authority,
|
||||
CustomUserAgent: cc.dopts.copts.UserAgent,
|
||||
ChannelzParentID: cc.channelzID,
|
||||
Target: cc.parsedTarget,
|
||||
@ -322,6 +303,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
// A blocking dial blocks until the clientConn is ready.
|
||||
if cc.dopts.block {
|
||||
for {
|
||||
cc.Connect()
|
||||
s := cc.GetState()
|
||||
if s == connectivity.Ready {
|
||||
break
|
||||
@ -539,12 +521,31 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connec
|
||||
//
|
||||
// Experimental
|
||||
//
|
||||
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
||||
// later release.
|
||||
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
|
||||
// release.
|
||||
func (cc *ClientConn) GetState() connectivity.State {
|
||||
return cc.csMgr.getState()
|
||||
}
|
||||
|
||||
// Connect causes all subchannels in the ClientConn to attempt to connect if
|
||||
// the channel is idle. Does not wait for the connection attempts to begin
|
||||
// before returning.
|
||||
//
|
||||
// Experimental
|
||||
//
|
||||
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
|
||||
// release.
|
||||
func (cc *ClientConn) Connect() {
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
|
||||
return
|
||||
}
|
||||
for ac := range cc.conns {
|
||||
go ac.connect()
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *ClientConn) scWatcher() {
|
||||
for {
|
||||
select {
|
||||
@ -632,7 +633,10 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
|
||||
}
|
||||
|
||||
var ret error
|
||||
if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
|
||||
if cc.dopts.disableServiceConfig {
|
||||
channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
|
||||
cc.maybeApplyDefaultServiceConfig(s.Addresses)
|
||||
} else if s.ServiceConfig == nil {
|
||||
cc.maybeApplyDefaultServiceConfig(s.Addresses)
|
||||
// TODO: do we need to apply a failing LB policy if there is no
|
||||
// default, per the error handling design?
|
||||
@ -711,7 +715,12 @@ func (cc *ClientConn) switchBalancer(name string) {
|
||||
return
|
||||
}
|
||||
if cc.balancerWrapper != nil {
|
||||
// Don't hold cc.mu while closing the balancers. The balancers may call
|
||||
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
|
||||
// would cause a deadlock in that case.
|
||||
cc.mu.Unlock()
|
||||
cc.balancerWrapper.close()
|
||||
cc.mu.Lock()
|
||||
}
|
||||
|
||||
builder := balancer.Get(name)
|
||||
@ -840,8 +849,7 @@ func (ac *addrConn) connect() error {
|
||||
ac.updateConnectivityState(connectivity.Connecting, nil)
|
||||
ac.mu.Unlock()
|
||||
|
||||
// Start a goroutine connecting to the server asynchronously.
|
||||
go ac.resetTransport()
|
||||
ac.resetTransport()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -878,6 +886,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
|
||||
// ac.state is Ready, try to find the connected address.
|
||||
var curAddrFound bool
|
||||
for _, a := range addrs {
|
||||
a.ServerName = ac.cc.getServerName(a)
|
||||
if reflect.DeepEqual(ac.curAddr, a) {
|
||||
curAddrFound = true
|
||||
break
|
||||
@ -891,6 +900,26 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
|
||||
return curAddrFound
|
||||
}
|
||||
|
||||
// getServerName determines the serverName to be used in the connection
|
||||
// handshake. The default value for the serverName is the authority on the
|
||||
// ClientConn, which either comes from the user's dial target or through an
|
||||
// authority override specified using the WithAuthority dial option. Name
|
||||
// resolvers can specify a per-address override for the serverName through the
|
||||
// resolver.Address.ServerName field which is used only if the WithAuthority
|
||||
// dial option was not used. The rationale is that per-address authority
|
||||
// overrides specified by the name resolver can represent a security risk, while
|
||||
// an override specified by the user is more dependable since they probably know
|
||||
// what they are doing.
|
||||
func (cc *ClientConn) getServerName(addr resolver.Address) string {
|
||||
if cc.dopts.authority != "" {
|
||||
return cc.dopts.authority
|
||||
}
|
||||
if addr.ServerName != "" {
|
||||
return addr.ServerName
|
||||
}
|
||||
return cc.authority
|
||||
}
|
||||
|
||||
func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
|
||||
if sc == nil {
|
||||
return MethodConfig{}
|
||||
@ -1046,12 +1075,12 @@ func (cc *ClientConn) Close() error {
|
||||
|
||||
cc.blockingpicker.close()
|
||||
|
||||
if rWrapper != nil {
|
||||
rWrapper.close()
|
||||
}
|
||||
if bWrapper != nil {
|
||||
bWrapper.close()
|
||||
}
|
||||
if rWrapper != nil {
|
||||
rWrapper.close()
|
||||
}
|
||||
|
||||
for ac := range conns {
|
||||
ac.tearDown(ErrClientConnClosing)
|
||||
@ -1130,112 +1159,86 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
|
||||
}
|
||||
|
||||
func (ac *addrConn) resetTransport() {
|
||||
for i := 0; ; i++ {
|
||||
if i > 0 {
|
||||
ac.cc.resolveNow(resolver.ResolveNowOptions{})
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
addrs := ac.addrs
|
||||
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
|
||||
// This will be the duration that dial gets to finish.
|
||||
dialDuration := minConnectTimeout
|
||||
if ac.dopts.minConnectTimeout != nil {
|
||||
dialDuration = ac.dopts.minConnectTimeout()
|
||||
}
|
||||
|
||||
if dialDuration < backoffFor {
|
||||
// Give dial more time as we keep failing to connect.
|
||||
dialDuration = backoffFor
|
||||
}
|
||||
// We can potentially spend all the time trying the first address, and
|
||||
// if the server accepts the connection and then hangs, the following
|
||||
// addresses will never be tried.
|
||||
//
|
||||
// The spec doesn't mention what should be done for multiple addresses.
|
||||
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
|
||||
connectDeadline := time.Now().Add(dialDuration)
|
||||
|
||||
ac.updateConnectivityState(connectivity.Connecting, nil)
|
||||
ac.transport = nil
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
|
||||
newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
|
||||
if err != nil {
|
||||
// After exhausting all addresses, the addrConn enters
|
||||
// TRANSIENT_FAILURE.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.TransientFailure, err)
|
||||
|
||||
// Backoff.
|
||||
b := ac.resetBackoff
|
||||
ac.mu.Unlock()
|
||||
|
||||
timer := time.NewTimer(backoffFor)
|
||||
select {
|
||||
case <-timer.C:
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx++
|
||||
ac.mu.Unlock()
|
||||
case <-b:
|
||||
timer.Stop()
|
||||
case <-ac.ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
|
||||
return
|
||||
}
|
||||
ac.curAddr = addr
|
||||
ac.transport = newTr
|
||||
ac.backoffIdx = 0
|
||||
|
||||
hctx, hcancel := context.WithCancel(ac.ctx)
|
||||
ac.startHealthCheck(hctx)
|
||||
ac.mu.Unlock()
|
||||
|
||||
// Block until the created transport is down. And when this happens,
|
||||
// we restart from the top of the addr list.
|
||||
<-reconnect.Done()
|
||||
hcancel()
|
||||
// restart connecting - the top of the loop will set state to
|
||||
// CONNECTING. This is against the current connectivity semantics doc,
|
||||
// however it allows for graceful behavior for RPCs not yet dispatched
|
||||
// - unfortunate timing would otherwise lead to the RPC failing even
|
||||
// though the TRANSIENT_FAILURE state (called for by the doc) would be
|
||||
// instantaneous.
|
||||
//
|
||||
// Ideally we should transition to Idle here and block until there is
|
||||
// RPC activity that leads to the balancer requesting a reconnect of
|
||||
// the associated SubConn.
|
||||
return
|
||||
}
|
||||
|
||||
addrs := ac.addrs
|
||||
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
|
||||
// This will be the duration that dial gets to finish.
|
||||
dialDuration := minConnectTimeout
|
||||
if ac.dopts.minConnectTimeout != nil {
|
||||
dialDuration = ac.dopts.minConnectTimeout()
|
||||
}
|
||||
|
||||
if dialDuration < backoffFor {
|
||||
// Give dial more time as we keep failing to connect.
|
||||
dialDuration = backoffFor
|
||||
}
|
||||
// We can potentially spend all the time trying the first address, and
|
||||
// if the server accepts the connection and then hangs, the following
|
||||
// addresses will never be tried.
|
||||
//
|
||||
// The spec doesn't mention what should be done for multiple addresses.
|
||||
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
|
||||
connectDeadline := time.Now().Add(dialDuration)
|
||||
|
||||
ac.updateConnectivityState(connectivity.Connecting, nil)
|
||||
ac.mu.Unlock()
|
||||
|
||||
if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
|
||||
ac.cc.resolveNow(resolver.ResolveNowOptions{})
|
||||
// After exhausting all addresses, the addrConn enters
|
||||
// TRANSIENT_FAILURE.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.TransientFailure, err)
|
||||
|
||||
// Backoff.
|
||||
b := ac.resetBackoff
|
||||
ac.mu.Unlock()
|
||||
|
||||
timer := time.NewTimer(backoffFor)
|
||||
select {
|
||||
case <-timer.C:
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx++
|
||||
ac.mu.Unlock()
|
||||
case <-b:
|
||||
timer.Stop()
|
||||
case <-ac.ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state != connectivity.Shutdown {
|
||||
ac.updateConnectivityState(connectivity.Idle, err)
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Success; reset backoff.
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx = 0
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
|
||||
// first successful one. It returns the transport, the address and a Event in
|
||||
// the successful case. The Event fires when the returned transport disconnects.
|
||||
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
|
||||
// tryAllAddrs tries to creates a connection to the addresses, and stop when at
|
||||
// the first successful one. It returns an error if no address was successfully
|
||||
// connected, or updates ac appropriately with the new transport.
|
||||
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
|
||||
var firstConnErr error
|
||||
for _, addr := range addrs {
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return nil, resolver.Address{}, nil, errConnClosing
|
||||
return errConnClosing
|
||||
}
|
||||
|
||||
ac.cc.mu.RLock()
|
||||
@ -1250,9 +1253,9 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
|
||||
|
||||
channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
|
||||
|
||||
newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
|
||||
err := ac.createTransport(addr, copts, connectDeadline)
|
||||
if err == nil {
|
||||
return newTr, addr, reconnect, nil
|
||||
return nil
|
||||
}
|
||||
if firstConnErr == nil {
|
||||
firstConnErr = err
|
||||
@ -1261,57 +1264,50 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
|
||||
}
|
||||
|
||||
// Couldn't connect to any address.
|
||||
return nil, resolver.Address{}, nil, firstConnErr
|
||||
return firstConnErr
|
||||
}
|
||||
|
||||
// createTransport creates a connection to addr. It returns the transport and a
|
||||
// Event in the successful case. The Event fires when the returned transport
|
||||
// disconnects.
|
||||
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
|
||||
prefaceReceived := make(chan struct{})
|
||||
onCloseCalled := make(chan struct{})
|
||||
reconnect := grpcsync.NewEvent()
|
||||
// createTransport creates a connection to addr. It returns an error if the
|
||||
// address was not successfully connected, or updates ac appropriately with the
|
||||
// new transport.
|
||||
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
|
||||
// TODO: Delete prefaceReceived and move the logic to wait for it into the
|
||||
// transport.
|
||||
prefaceReceived := grpcsync.NewEvent()
|
||||
connClosed := grpcsync.NewEvent()
|
||||
|
||||
// addr.ServerName takes precedent over ClientConn authority, if present.
|
||||
if addr.ServerName == "" {
|
||||
addr.ServerName = ac.cc.authority
|
||||
}
|
||||
|
||||
once := sync.Once{}
|
||||
onGoAway := func(r transport.GoAwayReason) {
|
||||
ac.mu.Lock()
|
||||
ac.adjustParams(r)
|
||||
once.Do(func() {
|
||||
if ac.state == connectivity.Ready {
|
||||
// Prevent this SubConn from being used for new RPCs by setting its
|
||||
// state to Connecting.
|
||||
//
|
||||
// TODO: this should be Idle when grpc-go properly supports it.
|
||||
ac.updateConnectivityState(connectivity.Connecting, nil)
|
||||
}
|
||||
})
|
||||
ac.mu.Unlock()
|
||||
reconnect.Fire()
|
||||
}
|
||||
addr.ServerName = ac.cc.getServerName(addr)
|
||||
hctx, hcancel := context.WithCancel(ac.ctx)
|
||||
hcStarted := false // protected by ac.mu
|
||||
|
||||
onClose := func() {
|
||||
ac.mu.Lock()
|
||||
once.Do(func() {
|
||||
if ac.state == connectivity.Ready {
|
||||
// Prevent this SubConn from being used for new RPCs by setting its
|
||||
// state to Connecting.
|
||||
//
|
||||
// TODO: this should be Idle when grpc-go properly supports it.
|
||||
ac.updateConnectivityState(connectivity.Connecting, nil)
|
||||
}
|
||||
})
|
||||
ac.mu.Unlock()
|
||||
close(onCloseCalled)
|
||||
reconnect.Fire()
|
||||
defer ac.mu.Unlock()
|
||||
defer connClosed.Fire()
|
||||
if !hcStarted || hctx.Err() != nil {
|
||||
// We didn't start the health check or set the state to READY, so
|
||||
// no need to do anything else here.
|
||||
//
|
||||
// OR, we have already cancelled the health check context, meaning
|
||||
// we have already called onClose once for this transport. In this
|
||||
// case it would be dangerous to clear the transport and update the
|
||||
// state, since there may be a new transport in this addrConn.
|
||||
return
|
||||
}
|
||||
hcancel()
|
||||
ac.transport = nil
|
||||
// Refresh the name resolver
|
||||
ac.cc.resolveNow(resolver.ResolveNowOptions{})
|
||||
if ac.state != connectivity.Shutdown {
|
||||
ac.updateConnectivityState(connectivity.Idle, nil)
|
||||
}
|
||||
}
|
||||
|
||||
onPrefaceReceipt := func() {
|
||||
close(prefaceReceived)
|
||||
onGoAway := func(r transport.GoAwayReason) {
|
||||
ac.mu.Lock()
|
||||
ac.adjustParams(r)
|
||||
ac.mu.Unlock()
|
||||
onClose()
|
||||
}
|
||||
|
||||
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
|
||||
@ -1320,27 +1316,67 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
|
||||
copts.ChannelzParentID = ac.channelzID
|
||||
}
|
||||
|
||||
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
|
||||
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
|
||||
if err != nil {
|
||||
// newTr is either nil, or closed.
|
||||
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
|
||||
return nil, nil, err
|
||||
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(time.Until(connectDeadline)):
|
||||
case <-connectCtx.Done():
|
||||
// We didn't get the preface in time.
|
||||
newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
|
||||
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
|
||||
return nil, nil, errors.New("timed out waiting for server handshake")
|
||||
case <-prefaceReceived:
|
||||
// The error we pass to Close() is immaterial since there are no open
|
||||
// streams at this point, so no trailers with error details will be sent
|
||||
// out. We just need to pass a non-nil error.
|
||||
newTr.Close(transport.ErrConnClosing)
|
||||
if connectCtx.Err() == context.DeadlineExceeded {
|
||||
err := errors.New("failed to receive server preface within timeout")
|
||||
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
case <-prefaceReceived.Done():
|
||||
// We got the preface - huzzah! things are good.
|
||||
case <-onCloseCalled:
|
||||
// The transport has already closed - noop.
|
||||
return nil, nil, errors.New("connection closed")
|
||||
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
|
||||
ac.mu.Lock()
|
||||
defer ac.mu.Unlock()
|
||||
if connClosed.HasFired() {
|
||||
// onClose called first; go idle but do nothing else.
|
||||
if ac.state != connectivity.Shutdown {
|
||||
ac.updateConnectivityState(connectivity.Idle, nil)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if ac.state == connectivity.Shutdown {
|
||||
// This can happen if the subConn was removed while in `Connecting`
|
||||
// state. tearDown() would have set the state to `Shutdown`, but
|
||||
// would not have closed the transport since ac.transport would not
|
||||
// been set at that point.
|
||||
//
|
||||
// We run this in a goroutine because newTr.Close() calls onClose()
|
||||
// inline, which requires locking ac.mu.
|
||||
//
|
||||
// The error we pass to Close() is immaterial since there are no open
|
||||
// streams at this point, so no trailers with error details will be sent
|
||||
// out. We just need to pass a non-nil error.
|
||||
go newTr.Close(transport.ErrConnClosing)
|
||||
return nil
|
||||
}
|
||||
ac.curAddr = addr
|
||||
ac.transport = newTr
|
||||
hcStarted = true
|
||||
ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
|
||||
return nil
|
||||
case <-connClosed.Done():
|
||||
// The transport has already closed. If we received the preface, too,
|
||||
// this is not an error.
|
||||
select {
|
||||
case <-prefaceReceived.Done():
|
||||
return nil
|
||||
default:
|
||||
return errors.New("connection closed before server preface received")
|
||||
}
|
||||
}
|
||||
return newTr, reconnect, nil
|
||||
}
|
||||
|
||||
// startHealthCheck starts the health checking stream (RPC) to watch the health
|
||||
@ -1424,26 +1460,14 @@ func (ac *addrConn) resetConnectBackoff() {
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
// getReadyTransport returns the transport if ac's state is READY.
|
||||
// Otherwise it returns nil, false.
|
||||
// If ac's state is IDLE, it will trigger ac to connect.
|
||||
func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
|
||||
// getReadyTransport returns the transport if ac's state is READY or nil if not.
|
||||
func (ac *addrConn) getReadyTransport() transport.ClientTransport {
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Ready && ac.transport != nil {
|
||||
t := ac.transport
|
||||
ac.mu.Unlock()
|
||||
return t, true
|
||||
defer ac.mu.Unlock()
|
||||
if ac.state == connectivity.Ready {
|
||||
return ac.transport
|
||||
}
|
||||
var idle bool
|
||||
if ac.state == connectivity.Idle {
|
||||
idle = true
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
// Trigger idle ac to connect.
|
||||
if idle {
|
||||
ac.connect()
|
||||
}
|
||||
return nil, false
|
||||
return nil
|
||||
}
|
||||
|
||||
// tearDown starts to tear down the addrConn.
|
||||
@ -1594,3 +1618,114 @@ func (cc *ClientConn) connectionError() error {
|
||||
defer cc.lceMu.Unlock()
|
||||
return cc.lastConnectionError
|
||||
}
|
||||
|
||||
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
|
||||
channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
|
||||
|
||||
var rb resolver.Builder
|
||||
parsedTarget, err := parseTarget(cc.target)
|
||||
if err != nil {
|
||||
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
|
||||
} else {
|
||||
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
|
||||
rb = cc.getResolver(parsedTarget.Scheme)
|
||||
if rb != nil {
|
||||
cc.parsedTarget = parsedTarget
|
||||
return rb, nil
|
||||
}
|
||||
}
|
||||
|
||||
// We are here because the user's dial target did not contain a scheme or
|
||||
// specified an unregistered scheme. We should fallback to the default
|
||||
// scheme, except when a custom dialer is specified in which case, we should
|
||||
// always use passthrough scheme.
|
||||
defScheme := resolver.GetDefaultScheme()
|
||||
channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
|
||||
canonicalTarget := defScheme + ":///" + cc.target
|
||||
|
||||
parsedTarget, err = parseTarget(canonicalTarget)
|
||||
if err != nil {
|
||||
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
|
||||
return nil, err
|
||||
}
|
||||
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
|
||||
rb = cc.getResolver(parsedTarget.Scheme)
|
||||
if rb == nil {
|
||||
return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
|
||||
}
|
||||
cc.parsedTarget = parsedTarget
|
||||
return rb, nil
|
||||
}
|
||||
|
||||
// parseTarget uses RFC 3986 semantics to parse the given target into a
|
||||
// resolver.Target struct containing scheme, authority and endpoint. Query
|
||||
// params are stripped from the endpoint.
|
||||
func parseTarget(target string) (resolver.Target, error) {
|
||||
u, err := url.Parse(target)
|
||||
if err != nil {
|
||||
return resolver.Target{}, err
|
||||
}
|
||||
// For targets of the form "[scheme]://[authority]/endpoint, the endpoint
|
||||
// value returned from url.Parse() contains a leading "/". Although this is
|
||||
// in accordance with RFC 3986, we do not want to break existing resolver
|
||||
// implementations which expect the endpoint without the leading "/". So, we
|
||||
// end up stripping the leading "/" here. But this will result in an
|
||||
// incorrect parsing for something like "unix:///path/to/socket". Since we
|
||||
// own the "unix" resolver, we can workaround in the unix resolver by using
|
||||
// the `URL` field instead of the `Endpoint` field.
|
||||
endpoint := u.Path
|
||||
if endpoint == "" {
|
||||
endpoint = u.Opaque
|
||||
}
|
||||
endpoint = strings.TrimPrefix(endpoint, "/")
|
||||
return resolver.Target{
|
||||
Scheme: u.Scheme,
|
||||
Authority: u.Host,
|
||||
Endpoint: endpoint,
|
||||
URL: *u,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Determine channel authority. The order of precedence is as follows:
|
||||
// - user specified authority override using `WithAuthority` dial option
|
||||
// - creds' notion of server name for the authentication handshake
|
||||
// - endpoint from dial target of the form "scheme://[authority]/endpoint"
|
||||
func determineAuthority(endpoint, target string, dopts dialOptions) (string, error) {
|
||||
// Historically, we had two options for users to specify the serverName or
|
||||
// authority for a channel. One was through the transport credentials
|
||||
// (either in its constructor, or through the OverrideServerName() method).
|
||||
// The other option (for cases where WithInsecure() dial option was used)
|
||||
// was to use the WithAuthority() dial option.
|
||||
//
|
||||
// A few things have changed since:
|
||||
// - `insecure` package with an implementation of the `TransportCredentials`
|
||||
// interface for the insecure case
|
||||
// - WithAuthority() dial option support for secure credentials
|
||||
authorityFromCreds := ""
|
||||
if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" {
|
||||
authorityFromCreds = creds.Info().ServerName
|
||||
}
|
||||
authorityFromDialOption := dopts.authority
|
||||
if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
|
||||
return "", fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
|
||||
}
|
||||
|
||||
switch {
|
||||
case authorityFromDialOption != "":
|
||||
return authorityFromDialOption, nil
|
||||
case authorityFromCreds != "":
|
||||
return authorityFromCreds, nil
|
||||
case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"):
|
||||
// TODO: remove when the unix resolver implements optional interface to
|
||||
// return channel authority.
|
||||
return "localhost", nil
|
||||
case strings.HasPrefix(endpoint, ":"):
|
||||
return "localhost" + endpoint, nil
|
||||
default:
|
||||
// TODO: Define an optional interface on the resolver builder to return
|
||||
// the channel authority given the user's dial target. For resolvers
|
||||
// which don't implement this interface, we will use the endpoint from
|
||||
// "scheme://authority/endpoint" as the default authority.
|
||||
return endpoint, nil
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user