Bump google.golang.org/grpc from 1.53.0 to 1.56.3

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.53.0 to 1.56.3.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.53.0...v1.56.3)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-10-25 22:25:20 +00:00
committed by Alex Ellis
parent 1cb5493f72
commit 7ef56d8dae
47 changed files with 2393 additions and 1096 deletions

View File

@ -43,8 +43,8 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@ -74,10 +74,10 @@ func init() {
srv.drainServerTransports(addr)
}
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
extraServerOptions = append(extraServerOptions, opt...)
globalServerOptions = append(globalServerOptions, opt...)
}
internal.ClearGlobalServerOptions = func() {
extraServerOptions = nil
globalServerOptions = nil
}
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
@ -115,12 +115,6 @@ type serviceInfo struct {
mdata interface{}
}
type serverWorkerData struct {
st transport.ServerTransport
wg *sync.WaitGroup
stream *transport.Stream
}
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions
@ -145,7 +139,7 @@ type Server struct {
channelzID *channelz.Identifier
czData *channelzData
serverWorkerChannels []chan *serverWorkerData
serverWorkerChannel chan func()
}
type serverOptions struct {
@ -177,13 +171,14 @@ type serverOptions struct {
}
var defaultServerOptions = serverOptions{
maxConcurrentStreams: math.MaxUint32,
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
}
var extraServerOptions []ServerOption
var globalServerOptions []ServerOption
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
@ -387,6 +382,9 @@ func MaxSendMsgSize(m int) ServerOption {
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
if n == 0 {
n = math.MaxUint32
}
return newFuncServerOption(func(o *serverOptions) {
o.maxConcurrentStreams = n
})
@ -560,47 +558,40 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
const serverWorkerResetThreshold = 1 << 16
// serverWorkers blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// data to be fed by serveStreams. This allows multiple requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
// To make sure all server workers don't reset at the same time, choose a
// random number of iterations before resetting.
threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
for completed := 0; completed < threshold; completed++ {
data, ok := <-ch
func (s *Server) serverWorker() {
for completed := 0; completed < serverWorkerResetThreshold; completed++ {
f, ok := <-s.serverWorkerChannel
if !ok {
return
}
s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
data.wg.Done()
f()
}
go s.serverWorker(ch)
go s.serverWorker()
}
// initServerWorkers creates worker goroutines and channels to process incoming
// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
s.serverWorkerChannel = make(chan func())
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
s.serverWorkerChannels[i] = make(chan *serverWorkerData)
go s.serverWorker(s.serverWorkerChannels[i])
go s.serverWorker()
}
}
func (s *Server) stopServerWorkers() {
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
close(s.serverWorkerChannels[i])
}
close(s.serverWorkerChannel)
}
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range extraServerOptions {
for _, o := range globalServerOptions {
o.apply(&opts)
}
for _, o := range opt {
@ -897,7 +888,7 @@ func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain()
st.Drain("")
}
s.mu.Unlock()
}
@ -945,26 +936,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close(errors.New("finished serving streams for the server transport"))
var wg sync.WaitGroup
var roundRobinCounter uint32
streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
streamQuota.acquire()
f := func() {
defer streamQuota.release()
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
case s.serverWorkerChannel <- f:
return
default:
// If all stream workers are busy, fallback to the default code path.
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
go f()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
@ -1053,7 +1044,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
if s.drain {
// Transport added after we drained our existing conns: drain it
// immediately.
st.Drain()
st.Drain("")
}
if s.conns[addr] == nil {
@ -1252,7 +1243,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range binlogs {
binlog.Log(logEntry)
binlog.Log(ctx, logEntry)
}
}
@ -1263,6 +1254,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
var comp, decomp encoding.Compressor
var cp Compressor
var dc Decompressor
var sendCompressorName string
// If dc is set and matches the stream's compression, use it. Otherwise, try
// to find a matching registered compressor for decomp.
@ -1283,12 +1275,18 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
cp = s.opts.cp
stream.SetSendCompress(cp.Type())
sendCompressorName = cp.Type()
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
comp = encoding.GetCompressor(rc)
if comp != nil {
stream.SetSendCompress(rc)
sendCompressorName = comp.Name()
}
}
if sendCompressorName != "" {
if err := stream.SetSendCompress(sendCompressorName); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
}
}
@ -1312,11 +1310,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
for _, sh := range shs {
sh.HandleRPC(stream.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: v,
WireLength: payInfo.wireLength + headerLen,
Data: d,
Length: len(d),
RecvTime: time.Now(),
Payload: v,
Length: len(d),
WireLength: payInfo.compressedLength + headerLen,
CompressedLength: payInfo.compressedLength,
Data: d,
})
}
if len(binlogs) != 0 {
@ -1324,7 +1323,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Message: d,
}
for _, binlog := range binlogs {
binlog.Log(cm)
binlog.Log(stream.Context(), cm)
}
}
if trInfo != nil {
@ -1357,7 +1356,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Header: h,
}
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(stream.Context(), sh)
}
}
st := &binarylog.ServerTrailer{
@ -1365,7 +1364,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
binlog.Log(st)
binlog.Log(stream.Context(), st)
}
}
return appErr
@ -1375,6 +1374,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
opts := &transport.Options{Last: true}
// Server handler could have set new compressor by calling SetSendCompressor.
// In case it is set, we need to use it for compressing outbound message.
if stream.SendCompress() != sendCompressorName {
comp = encoding.GetCompressor(stream.SendCompress())
}
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
@ -1402,8 +1406,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(st)
binlog.Log(stream.Context(), sh)
binlog.Log(stream.Context(), st)
}
}
return err
@ -1417,8 +1421,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Message: reply,
}
for _, binlog := range binlogs {
binlog.Log(sh)
binlog.Log(sm)
binlog.Log(stream.Context(), sh)
binlog.Log(stream.Context(), sm)
}
}
if channelz.IsOn() {
@ -1430,17 +1434,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// TODO: Should we be logging if writing status failed here, like above?
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
err = t.WriteStatus(stream, statusOK)
if len(binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
}
for _, binlog := range binlogs {
binlog.Log(st)
binlog.Log(stream.Context(), st)
}
}
return err
return t.WriteStatus(stream, statusOK)
}
// chainStreamServerInterceptors chains all stream server interceptors into one.
@ -1574,7 +1577,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range ss.binlogs {
binlog.Log(logEntry)
binlog.Log(stream.Context(), logEntry)
}
}
@ -1597,12 +1600,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
ss.cp = s.opts.cp
stream.SetSendCompress(s.opts.cp.Type())
ss.sendCompressorName = s.opts.cp.Type()
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
ss.comp = encoding.GetCompressor(rc)
if ss.comp != nil {
stream.SetSendCompress(rc)
ss.sendCompressorName = rc
}
}
if ss.sendCompressorName != "" {
if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
}
}
@ -1640,16 +1649,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.trInfo.tr.SetError()
ss.mu.Unlock()
}
t.WriteStatus(ss.s, appStatus)
if len(ss.binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
Err: appErr,
}
for _, binlog := range ss.binlogs {
binlog.Log(st)
binlog.Log(stream.Context(), st)
}
}
t.WriteStatus(ss.s, appStatus)
// TODO: Should we log an error from WriteStatus here and below?
return appErr
}
@ -1658,17 +1667,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.trInfo.tr.LazyLog(stringer("OK"), false)
ss.mu.Unlock()
}
err = t.WriteStatus(ss.s, statusOK)
if len(ss.binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
Err: appErr,
}
for _, binlog := range ss.binlogs {
binlog.Log(st)
binlog.Log(stream.Context(), st)
}
}
return err
return t.WriteStatus(ss.s, statusOK)
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
@ -1846,7 +1854,7 @@ func (s *Server) GracefulStop() {
if !s.drain {
for _, conns := range s.conns {
for st := range conns {
st.Drain()
st.Drain("graceful_stop")
}
}
s.drain = true
@ -1935,6 +1943,60 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
return nil
}
// SetSendCompressor sets a compressor for outbound messages from the server.
// It must not be called after any event that causes headers to be sent
// (see ServerStream.SetHeader for the complete list). Provided compressor is
// used when below conditions are met:
//
// - compressor is registered via encoding.RegisterCompressor
// - compressor name must exist in the client advertised compressor names
// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
// get client supported compressor names.
//
// The context provided must be the context passed to the server's handler.
// It must be noted that compressor name encoding.Identity disables the
// outbound compression.
// By default, server messages will be sent using the same compressor with
// which request messages were sent.
//
// It is not safe to call SetSendCompressor concurrently with SendHeader and
// SendMsg.
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetSendCompressor(ctx context.Context, name string) error {
stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
if !ok || stream == nil {
return fmt.Errorf("failed to fetch the stream from the given context")
}
if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
return fmt.Errorf("unable to set send compressor: %w", err)
}
return stream.SetSendCompress(name)
}
// ClientSupportedCompressors returns compressor names advertised by the client
// via grpc-accept-encoding header.
//
// The context provided must be the context passed to the server's handler.
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
if !ok || stream == nil {
return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
}
return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
}
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
//
@ -1969,3 +2031,51 @@ type channelzServer struct {
func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
return c.s.channelzMetric()
}
// validateSendCompressor returns an error when given compressor name cannot be
// handled by the server or the client based on the advertised compressors.
func validateSendCompressor(name, clientCompressors string) error {
if name == encoding.Identity {
return nil
}
if !grpcutil.IsCompressorNameRegistered(name) {
return fmt.Errorf("compressor not registered %q", name)
}
for _, c := range strings.Split(clientCompressors, ",") {
if c == name {
return nil // found match
}
}
return fmt.Errorf("client does not support compressor %q", name)
}
// atomicSemaphore implements a blocking, counting semaphore. acquire should be
// called synchronously; release may be called asynchronously.
type atomicSemaphore struct {
n int64
wait chan struct{}
}
func (q *atomicSemaphore) acquire() {
if atomic.AddInt64(&q.n, -1) < 0 {
// We ran out of quota. Block until a release happens.
<-q.wait
}
}
func (q *atomicSemaphore) release() {
// N.B. the "<= 0" check below should allow for this to work with multiple
// concurrent calls to acquire, but also note that with synchronous calls to
// acquire, as our system does, n will never be less than -1. There are
// fairness issues (queuing) to consider if this was to be generalized.
if atomic.AddInt64(&q.n, 1) <= 0 {
// An acquire was waiting on us. Unblock it.
q.wait <- struct{}{}
}
}
func newHandlerQuota(n uint32) *atomicSemaphore {
return &atomicSemaphore{n: int64(n), wait: make(chan struct{}, 1)}
}