mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-19 12:36:38 +00:00
Migrate to containerd 1.54
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
committed by
Alex Ellis
parent
4c9c66812a
commit
2ae8b31ac0
153
vendor/google.golang.org/grpc/stream.go
generated
vendored
153
vendor/google.golang.org/grpc/stream.go
generated
vendored
@ -31,11 +31,13 @@ import (
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/encoding"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal/balancerload"
|
||||
"google.golang.org/grpc/internal/binarylog"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
iresolver "google.golang.org/grpc/internal/resolver"
|
||||
"google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/internal/transport"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
@ -50,14 +52,20 @@ import (
|
||||
// of the RPC.
|
||||
type StreamHandler func(srv interface{}, stream ServerStream) error
|
||||
|
||||
// StreamDesc represents a streaming RPC service's method specification.
|
||||
// StreamDesc represents a streaming RPC service's method specification. Used
|
||||
// on the server when registering services and on the client when initiating
|
||||
// new streams.
|
||||
type StreamDesc struct {
|
||||
StreamName string
|
||||
Handler StreamHandler
|
||||
// StreamName and Handler are only used when registering handlers on a
|
||||
// server.
|
||||
StreamName string // the name of the method excluding the service
|
||||
Handler StreamHandler // the handler called for the method
|
||||
|
||||
// At least one of these is true.
|
||||
ServerStreams bool
|
||||
ClientStreams bool
|
||||
// ServerStreams and ClientStreams are used for registering handlers on a
|
||||
// server as well as defining RPC behavior when passed to NewClientStream
|
||||
// and ClientConn.NewStream. At least one must be true.
|
||||
ServerStreams bool // indicates the server can perform streaming sends
|
||||
ClientStreams bool // indicates the client can perform streaming sends
|
||||
}
|
||||
|
||||
// Stream defines the common interface a client or server stream has to satisfy.
|
||||
@ -164,13 +172,48 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
}
|
||||
}()
|
||||
}
|
||||
c := defaultCallInfo()
|
||||
// Provide an opportunity for the first RPC to see the first service config
|
||||
// provided by the resolver.
|
||||
if err := cc.waitForResolvedAddrs(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mc := cc.GetMethodConfig(method)
|
||||
|
||||
var mc serviceconfig.MethodConfig
|
||||
var onCommit func()
|
||||
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
|
||||
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
|
||||
}
|
||||
|
||||
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
|
||||
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
|
||||
if err != nil {
|
||||
return nil, toRPCErr(err)
|
||||
}
|
||||
|
||||
if rpcConfig != nil {
|
||||
if rpcConfig.Context != nil {
|
||||
ctx = rpcConfig.Context
|
||||
}
|
||||
mc = rpcConfig.MethodConfig
|
||||
onCommit = rpcConfig.OnCommitted
|
||||
if rpcConfig.Interceptor != nil {
|
||||
rpcInfo.Context = nil
|
||||
ns := newStream
|
||||
newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
|
||||
cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
|
||||
if err != nil {
|
||||
return nil, toRPCErr(err)
|
||||
}
|
||||
return cs, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return newStream(ctx, func() {})
|
||||
}
|
||||
|
||||
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
|
||||
c := defaultCallInfo()
|
||||
if mc.WaitForReady != nil {
|
||||
c.failFast = !*mc.WaitForReady
|
||||
}
|
||||
@ -207,6 +250,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
Host: cc.authority,
|
||||
Method: method,
|
||||
ContentSubtype: c.contentSubtype,
|
||||
DoneFunc: doneFunc,
|
||||
}
|
||||
|
||||
// Set our outgoing compression according to the UseCompressor CallOption, if
|
||||
@ -272,13 +316,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
cancel: cancel,
|
||||
beginTime: beginTime,
|
||||
firstAttempt: true,
|
||||
onCommit: onCommit,
|
||||
}
|
||||
if !cc.dopts.disableRetry {
|
||||
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
|
||||
}
|
||||
cs.binlog = binarylog.GetMethodLogger(method)
|
||||
|
||||
cs.callInfo.stream = cs
|
||||
// Only this initial attempt has stats/tracing.
|
||||
// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
|
||||
if err := cs.newAttemptLocked(sh, trInfo); err != nil {
|
||||
@ -348,7 +392,16 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (r
|
||||
if err := cs.ctx.Err(); err != nil {
|
||||
return toRPCErr(err)
|
||||
}
|
||||
t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
|
||||
|
||||
ctx := cs.ctx
|
||||
if cs.cc.parsedTarget.Scheme == "xds" {
|
||||
// Add extra metadata (metadata that will be added by transport) to context
|
||||
// so the balancer can see them.
|
||||
ctx = grpcutil.WithExtraMetadata(cs.ctx, metadata.Pairs(
|
||||
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
|
||||
))
|
||||
}
|
||||
t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -366,6 +419,11 @@ func (a *csAttempt) newStream() error {
|
||||
cs.callHdr.PreviousAttempts = cs.numRetries
|
||||
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
|
||||
if err != nil {
|
||||
if _, ok := err.(transport.PerformedIOError); ok {
|
||||
// Return without converting to an RPC error so retry code can
|
||||
// inspect.
|
||||
return err
|
||||
}
|
||||
return toRPCErr(err)
|
||||
}
|
||||
cs.attempt.s = s
|
||||
@ -419,7 +477,8 @@ type clientStream struct {
|
||||
// place where we need to check if the attempt is nil.
|
||||
attempt *csAttempt
|
||||
// TODO(hedging): hedging will have multiple attempts simultaneously.
|
||||
committed bool // active attempt committed for retry?
|
||||
committed bool // active attempt committed for retry?
|
||||
onCommit func()
|
||||
buffer []func(a *csAttempt) error // operations to replay on retry
|
||||
bufferSize int // current size of buffer
|
||||
}
|
||||
@ -448,6 +507,9 @@ type csAttempt struct {
|
||||
}
|
||||
|
||||
func (cs *clientStream) commitAttemptLocked() {
|
||||
if !cs.committed && cs.onCommit != nil {
|
||||
cs.onCommit()
|
||||
}
|
||||
cs.committed = true
|
||||
cs.buffer = nil
|
||||
}
|
||||
@ -461,11 +523,21 @@ func (cs *clientStream) commitAttempt() {
|
||||
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
|
||||
// the error that should be returned by the operation.
|
||||
func (cs *clientStream) shouldRetry(err error) error {
|
||||
if cs.attempt.s == nil && !cs.callInfo.failFast {
|
||||
// In the event of any error from NewStream (attempt.s == nil), we
|
||||
// never attempted to write anything to the wire, so we can retry
|
||||
// indefinitely for non-fail-fast RPCs.
|
||||
return nil
|
||||
unprocessed := false
|
||||
if cs.attempt.s == nil {
|
||||
pioErr, ok := err.(transport.PerformedIOError)
|
||||
if ok {
|
||||
// Unwrap error.
|
||||
err = toRPCErr(pioErr.Err)
|
||||
} else {
|
||||
unprocessed = true
|
||||
}
|
||||
if !ok && !cs.callInfo.failFast {
|
||||
// In the event of a non-IO operation error from NewStream, we
|
||||
// never attempted to write anything to the wire, so we can retry
|
||||
// indefinitely for non-fail-fast RPCs.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if cs.finished || cs.committed {
|
||||
// RPC is finished or committed; cannot retry.
|
||||
@ -474,13 +546,12 @@ func (cs *clientStream) shouldRetry(err error) error {
|
||||
// Wait for the trailers.
|
||||
if cs.attempt.s != nil {
|
||||
<-cs.attempt.s.Done()
|
||||
unprocessed = cs.attempt.s.Unprocessed()
|
||||
}
|
||||
if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
|
||||
if cs.firstAttempt && unprocessed {
|
||||
// First attempt, stream unprocessed: transparently retry.
|
||||
cs.firstAttempt = false
|
||||
return nil
|
||||
}
|
||||
cs.firstAttempt = false
|
||||
if cs.cc.dopts.disableRetry {
|
||||
return err
|
||||
}
|
||||
@ -488,7 +559,7 @@ func (cs *clientStream) shouldRetry(err error) error {
|
||||
pushback := 0
|
||||
hasPushback := false
|
||||
if cs.attempt.s != nil {
|
||||
if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
|
||||
if !cs.attempt.s.TrailersOnly() {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -498,13 +569,13 @@ func (cs *clientStream) shouldRetry(err error) error {
|
||||
if len(sps) == 1 {
|
||||
var e error
|
||||
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
|
||||
grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
|
||||
channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
|
||||
cs.retryThrottler.throttle() // This counts as a failure for throttling.
|
||||
return err
|
||||
}
|
||||
hasPushback = true
|
||||
} else if len(sps) > 1 {
|
||||
grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
|
||||
channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
|
||||
cs.retryThrottler.throttle() // This counts as a failure for throttling.
|
||||
return err
|
||||
}
|
||||
@ -517,8 +588,8 @@ func (cs *clientStream) shouldRetry(err error) error {
|
||||
code = status.Convert(err).Code()
|
||||
}
|
||||
|
||||
rp := cs.methodConfig.retryPolicy
|
||||
if rp == nil || !rp.retryableStatusCodes[code] {
|
||||
rp := cs.methodConfig.RetryPolicy
|
||||
if rp == nil || !rp.RetryableStatusCodes[code] {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -527,7 +598,7 @@ func (cs *clientStream) shouldRetry(err error) error {
|
||||
if cs.retryThrottler.throttle() {
|
||||
return err
|
||||
}
|
||||
if cs.numRetries+1 >= rp.maxAttempts {
|
||||
if cs.numRetries+1 >= rp.MaxAttempts {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -536,9 +607,9 @@ func (cs *clientStream) shouldRetry(err error) error {
|
||||
dur = time.Millisecond * time.Duration(pushback)
|
||||
cs.numRetriesSincePushback = 0
|
||||
} else {
|
||||
fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
|
||||
cur := float64(rp.initialBackoff) * fact
|
||||
if max := float64(rp.maxBackoff); cur > max {
|
||||
fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
|
||||
cur := float64(rp.InitialBackoff) * fact
|
||||
if max := float64(rp.MaxBackoff); cur > max {
|
||||
cur = max
|
||||
}
|
||||
dur = time.Duration(grpcrand.Int63n(int64(cur)))
|
||||
@ -566,6 +637,7 @@ func (cs *clientStream) retryLocked(lastErr error) error {
|
||||
cs.commitAttemptLocked()
|
||||
return err
|
||||
}
|
||||
cs.firstAttempt = false
|
||||
if err := cs.newAttemptLocked(nil, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -800,6 +872,15 @@ func (cs *clientStream) finish(err error) {
|
||||
}
|
||||
cs.finished = true
|
||||
cs.commitAttemptLocked()
|
||||
if cs.attempt != nil {
|
||||
cs.attempt.finish(err)
|
||||
// after functions all rely upon having a stream.
|
||||
if cs.attempt.s != nil {
|
||||
for _, o := range cs.opts {
|
||||
o.after(cs.callInfo, cs.attempt)
|
||||
}
|
||||
}
|
||||
}
|
||||
cs.mu.Unlock()
|
||||
// For binary logging. only log cancel in finish (could be caused by RPC ctx
|
||||
// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
|
||||
@ -821,15 +902,6 @@ func (cs *clientStream) finish(err error) {
|
||||
cs.cc.incrCallsSucceeded()
|
||||
}
|
||||
}
|
||||
if cs.attempt != nil {
|
||||
cs.attempt.finish(err)
|
||||
// after functions all rely upon having a stream.
|
||||
if cs.attempt.s != nil {
|
||||
for _, o := range cs.opts {
|
||||
o.after(cs.callInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
cs.cancel()
|
||||
}
|
||||
|
||||
@ -906,7 +978,7 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
|
||||
Payload: m,
|
||||
// TODO truncate large payload.
|
||||
Data: payInfo.uncompressedBytes,
|
||||
WireLength: payInfo.wireLength,
|
||||
WireLength: payInfo.wireLength + headerLen,
|
||||
Length: len(payInfo.uncompressedBytes),
|
||||
})
|
||||
}
|
||||
@ -1067,7 +1139,6 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
|
||||
t: t,
|
||||
}
|
||||
|
||||
as.callInfo.stream = as
|
||||
s, err := as.t.NewStream(as.ctx, as.callHdr)
|
||||
if err != nil {
|
||||
err = toRPCErr(err)
|
||||
@ -1489,7 +1560,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
||||
Payload: m,
|
||||
// TODO truncate large payload.
|
||||
Data: payInfo.uncompressedBytes,
|
||||
WireLength: payInfo.wireLength,
|
||||
WireLength: payInfo.wireLength + headerLen,
|
||||
Length: len(payInfo.uncompressedBytes),
|
||||
})
|
||||
}
|
||||
|
Reference in New Issue
Block a user