Migrate to containerd v1.7.0 and update dependencies

* Updates containerd to v1.7.0 and new binary for 32-bit
Arm OSes.
* Updates Go dependencies - openfaas and external

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2023-03-19 10:55:53 +00:00
committed by Alex Ellis
parent 9efd019e86
commit c41c2cd9fc
1133 changed files with 104391 additions and 75499 deletions

View File

@ -137,6 +137,7 @@ type earlyAbortStream struct {
streamID uint32
contentSubtype string
status *status.Status
rst bool
}
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
@ -190,7 +191,7 @@ type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
closeConn error // if set, loopyWriter will exit, resulting in conn closure
}
func (*goAway) isTransportResponseFrame() bool { return false }
@ -208,6 +209,14 @@ type outFlowControlSizeRequest struct {
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
// closeConnection is an instruction to tell the loopy writer to flush the
// framer and exit, which will cause the transport's connection to be closed
// (by the client or server). The transport itself will close after the reader
// encounters the EOF caused by the connection closure.
type closeConnection struct{}
func (closeConnection) isTransportResponseFrame() bool { return false }
type outStreamState int
const (
@ -407,7 +416,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
return nil, ErrConnClosing
return nil, errors.New("transport closed by client")
}
}
}
@ -518,18 +527,9 @@ const minBatchSize = 1000
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
defer func() {
if err == ErrConnClosing {
// Don't log ErrConnClosing as error since it happens
// 1. When the connection is closed by some other known issue.
// 2. User closed the connection.
// 3. A graceful close of connection.
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter.run returning. %v", err)
}
err = nil
}
}()
// Always flush the writer before exiting in case there are pending frames
// to be sent.
defer l.framer.writer.Flush()
for {
it, err := l.cbuf.get(true)
if err != nil {
@ -573,7 +573,6 @@ func (l *loopyWriter) run() (err error) {
}
l.framer.writer.Flush()
break hasdata
}
}
}
@ -654,19 +653,20 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
itl: &itemList{},
wq: h.wq,
}
str.itl.enqueue(h)
return l.originateStream(str)
return l.originateStream(str, h)
}
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
// l.draining is set when handling GoAway. In which case, we want to avoid
// creating new streams.
if l.draining {
// TODO: provide a better error with the reason we are in draining.
hdr.onOrphaned(errStreamDrain)
return nil
}
if err := hdr.initStream(str.id); err != nil {
return err
}
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
}
@ -762,8 +762,8 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
return err
}
}
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
return ErrConnClosing
if l.draining && len(l.estdStreams) == 0 {
return errors.New("finished processing active streams while in draining mode")
}
return nil
}
@ -786,6 +786,11 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
return err
}
if eas.rst {
if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
return err
}
}
return nil
}
@ -793,7 +798,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
return ErrConnClosing
return errors.New("received GOAWAY with no active streams")
}
}
return nil
@ -811,6 +816,13 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}
func (l *loopyWriter) closeConnectionHandler() error {
// Exit loopyWriter entirely by returning an error here. This will lead to
// the transport closing the connection, and, ultimately, transport
// closure.
return ErrConnClosing
}
func (l *loopyWriter) handle(i interface{}) error {
switch i := i.(type) {
case *incomingWindowUpdate:
@ -839,6 +851,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.goAwayHandler(i)
case *outFlowControlSizeRequest:
return l.outFlowControlSizeRequestHandler(i)
case closeConnection:
return l.closeConnectionHandler()
default:
return fmt.Errorf("transport: unknown control message type %T", i)
}
@ -880,9 +894,9 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
// maximum possilbe HTTP2 frame size.
// maximum possible HTTP2 frame size.
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true

View File

@ -47,3 +47,9 @@ const (
defaultClientMaxHeaderListSize = uint32(16 << 20)
defaultServerMaxHeaderListSize = uint32(16 << 20)
)
// MaxStreamID is the upper bound for the stream ID before the current
// transport gracefully closes and new transport is created for subsequent RPCs.
// This is set to 75% of 2^31-1. Streams are identified with an unsigned 31-bit
// integer. It's exported so that tests can override it.
var MaxStreamID = uint32(math.MaxInt32 * 3 / 4)

View File

@ -46,24 +46,32 @@ import (
"google.golang.org/grpc/status"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server
// supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2")
msg := "gRPC requires HTTP/2"
http.Error(w, msg, http.StatusBadRequest)
return nil, errors.New(msg)
}
if r.Method != "POST" {
return nil, errors.New("invalid gRPC request method")
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
http.Error(w, msg, http.StatusBadRequest)
return nil, errors.New(msg)
}
contentType := r.Header.Get("Content-Type")
// TODO: do we assume contentType is lowercase? we did before
contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
if !validContentType {
return nil, errors.New("invalid gRPC request content-type")
msg := fmt.Sprintf("invalid gRPC request content-type %q", contentType)
http.Error(w, msg, http.StatusUnsupportedMediaType)
return nil, errors.New(msg)
}
if _, ok := w.(http.Flusher); !ok {
return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
msg := "gRPC requires a ResponseWriter supporting http.Flusher"
http.Error(w, msg, http.StatusInternalServerError)
return nil, errors.New(msg)
}
st := &serverHandlerTransport{
@ -79,7 +87,9 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
if err != nil {
return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
msg := fmt.Sprintf("malformed grpc-timeout: %v", err)
http.Error(w, msg, http.StatusBadRequest)
return nil, status.Error(codes.Internal, msg)
}
st.timeoutSet = true
st.timeout = to
@ -97,7 +107,9 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta
for _, v := range vv {
v, err := decodeMetadataHeader(k, v)
if err != nil {
return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err)
msg := fmt.Sprintf("malformed binary metadata %q in header %q: %v", v, k, err)
http.Error(w, msg, http.StatusBadRequest)
return nil, status.Error(codes.Internal, msg)
}
metakv = append(metakv, k, v)
}
@ -138,15 +150,18 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server
contentSubtype string
stats stats.Handler
stats []stats.Handler
}
func (ht *serverHandlerTransport) Close() {
ht.closeOnce.Do(ht.closeCloseChanOnce)
func (ht *serverHandlerTransport) Close(err error) {
ht.closeOnce.Do(func() {
if logger.V(logLevel) {
logger.Infof("Closing serverHandlerTransport: %v", err)
}
close(ht.closedCh)
})
}
func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
@ -228,15 +243,15 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
})
if err == nil { // transport has not been closed
if ht.stats != nil {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
for _, sh := range ht.stats {
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
}
ht.Close()
ht.Close(errors.New("finished writing status"))
return err
}
@ -314,10 +329,10 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
if err == nil {
if ht.stats != nil {
for _, sh := range ht.stats {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
sh.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(),
Compression: s.sendCompress,
})
@ -346,7 +361,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
case <-ht.req.Context().Done():
}
cancel()
ht.Close()
ht.Close(errors.New("request is done processing"))
}()
req := ht.req
@ -369,14 +384,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr)
if ht.stats != nil {
s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
for _, sh := range ht.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: ht.RemoteAddr(),
Compression: s.recvCompress,
}
ht.stats.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
@ -442,10 +457,10 @@ func (ht *serverHandlerTransport) Drain() {
// mapRecvMsgError returns the non-nil err into the appropriate
// error value as expected by callers of *grpc.parser.recvMsg.
// In particular, in can only be:
// * io.EOF
// * io.ErrUnexpectedEOF
// * of type transport.ConnectionError
// * an error from the status package
// - io.EOF
// - io.ErrUnexpectedEOF
// - of type transport.ConnectionError
// - an error from the status package
func mapRecvMsgError(err error) error {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return err

View File

@ -38,8 +38,10 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
@ -57,11 +59,15 @@ var clientConnectionCounter uint64
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
userAgent string
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
userAgent string
// address contains the resolver returned address for this transport.
// If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
// passed to `NewStream`, when determining the :authority header.
address resolver.Address
md metadata.MD
conn net.Conn // underlying communication channel
loopy *loopyWriter
@ -78,6 +84,7 @@ type http2Client struct {
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
// Do not access controlBuf with mu held.
controlBuf *controlBuffer
fc *trInFlow
// The scheme used: https if TLS is on, http otherwise.
@ -90,7 +97,7 @@ type http2Client struct {
kp keepalive.ClientParameters
keepaliveEnabled bool
statsHandler stats.Handler
statsHandlers []stats.Handler
initialWindowSize int32
@ -98,17 +105,15 @@ type http2Client struct {
maxSendHeaderListSize *uint32
bdpEst *bdpEstimator
// onPrefaceReceipt is a callback that client transport calls upon
// receiving server preface to signal that a succefull HTTP2
// connection was established.
onPrefaceReceipt func()
maxConcurrentStreams uint32
streamQuota int64
streamsQuotaAvailable chan struct{}
waitingStreams uint32
nextID uint32
registeredCompressors string
// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables
state transportState
activeStreams map[uint32]*Stream
@ -132,11 +137,10 @@ type http2Client struct {
kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
channelzID *channelz.Identifier
czData *channelzData
onGoAway func(GoAwayReason)
onClose func()
onClose func(GoAwayReason)
bufferPool *bufferPool
@ -192,7 +196,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@ -212,14 +216,40 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
}
return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
}
// Any further errors will close the underlying connection
defer func(conn net.Conn) {
if err != nil {
conn.Close()
}
}(conn)
// The following defer and goroutine monitor the connectCtx for cancelation
// and deadline. On context expiration, the connection is hard closed and
// this function will naturally fail as a result. Otherwise, the defer
// waits for the goroutine to exit to prevent the context from being
// monitored (and to prevent the connection from ever being closed) after
// returning from this function.
ctxMonitorDone := grpcsync.NewEvent()
newClientCtx, newClientDone := context.WithCancel(connectCtx)
defer func() {
newClientDone() // Awaken the goroutine below if connectCtx hasn't expired.
<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
}()
go func(conn net.Conn) {
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
if err := connectCtx.Err(); err != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
}
conn.Close()
}
}(conn)
kp := opts.KeepaliveParams
// Validate keepalive parameters.
if kp.Time == 0 {
@ -251,15 +281,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}
if transportCreds != nil {
rawConn := conn
// Pull the deadline from the connectCtx, which will be used for
// timeouts in the authentication protocol handshake. Can ignore the
// boolean as the deadline will return the zero value, which will make
// the conn not timeout on I/O operations.
deadline, _ := connectCtx.Deadline()
rawConn.SetDeadline(deadline)
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)
rawConn.SetDeadline(time.Time{})
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
}
@ -297,6 +319,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
registeredCompressors: grpcutil.RegisteredCompressors(),
address: addr,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@ -311,19 +335,19 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
isSecure: isSecure,
perRPCCreds: perRPCCreds,
kp: kp,
statsHandler: opts.StatsHandler,
statsHandlers: opts.StatsHandlers,
initialWindowSize: initialWindowSize,
onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
}
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
@ -341,38 +365,50 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
updateFlowControl: t.updateFlowControl,
}
}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
for _, sh := range t.statsHandlers {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connBegin)
sh.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
return nil, err
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
go t.reader()
// Start the reader goroutine for incoming messages. Each transport has a
// dedicated goroutine which reads HTTP2 frames from the network. Then it
// dispatches the frame to the corresponding stream entity. When the
// server preface is received, readerErrCh is closed. If an error occurs
// first, an error is pushed to the channel. This must be checked before
// returning from this function.
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)
defer func() {
if err == nil {
err = <-readerErrCh
}
if err != nil {
t.Close(err)
}
}()
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != len(clientPreface) {
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err
}
var ss []http2.Setting
@ -392,14 +428,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}
@ -412,10 +446,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
@ -466,7 +498,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
func (t *http2Client) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo,
AuthInfo: t.authInfo, // Can be nil
}
}
@ -502,9 +534,22 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
}
registeredCompressors := t.registeredCompressors
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
// Include the outgoing compressor name when compressor is not registered
// via encoding.RegisterCompressor. This is possible when client uses
// WithCompressor dial option.
if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
if registeredCompressors != "" {
registeredCompressors += ","
}
registeredCompressors += callHdr.SendCompress
}
}
if registeredCompressors != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
}
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
@ -584,7 +629,11 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
for _, c := range t.perRPCCreds {
data, err := c.GetRequestMetadata(ctx, audience)
if err != nil {
if _, ok := status.FromError(err); ok {
if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
}
return nil, err
}
@ -613,7 +662,14 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
}
data, err := callCreds.GetRequestMetadata(ctx, audience)
if err != nil {
return nil, status.Errorf(codes.Internal, "transport: %v", err)
if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
}
return nil, err
}
return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
}
callAuthData = make(map[string]string, len(data))
for k, v := range data {
@ -629,18 +685,17 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// NewStream errors result in transparent retry, as they mean nothing went onto
// the wire. However, there are two notable exceptions:
//
// 1. If the stream headers violate the max header list size allowed by the
// server. In this case there is no reason to retry at all, as it is
// assumed the RPC would continue to fail on subsequent attempts.
// 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can
// eventually succeed, would need I/O to do so.
// 1. If the stream headers violate the max header list size allowed by the
// server. It's possible this could succeed on another transport, even if
// it's unlikely, but do not transparently retry.
// 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can
// eventually succeed, would need I/O to do so.
type NewStreamError struct {
Err error
DoNotRetry bool
DoNotTransparentRetry bool
AllowTransparentRetry bool
}
func (e NewStreamError) Error() string {
@ -649,11 +704,23 @@ func (e NewStreamError) Error() string {
// NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
ctx = peer.NewContext(ctx, t.getPeer())
// ServerName field of the resolver returned address takes precedence over
// Host field of CallHdr to determine the :authority header. This is because,
// the ServerName field takes precedence for server authentication during
// TLS handshake, and the :authority header should match the value used
// for server authentication.
if t.address.ServerName != "" {
newCallHdr := *callHdr
newCallHdr.Host = t.address.ServerName
callHdr = &newCallHdr
}
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
}
s := t.newStream(ctx, callHdr)
cleanup := func(err error) {
@ -675,17 +742,13 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
endStream: false,
initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
// TODO: handle transport closure in loopy instead and remove this
// initStream is never called when transport is draining.
if t.state == closing {
t.mu.Unlock()
// Do a quick cleanup.
err := error(errStreamDrain)
if state == closing {
err = ErrConnClosing
}
cleanup(err)
return err
cleanup(ErrConnClosing)
return ErrConnClosing
}
t.activeStreams[id] = s
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
@ -702,6 +765,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
firstTry := true
var ch chan struct{}
transportDrainRequired := false
checkForStreamQuota := func(it interface{}) bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
@ -717,8 +781,20 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
h := it.(*headerFrame)
h.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
t.activeStreams[s.id] = s
t.mu.Unlock()
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
case t.streamsQuotaAvailable <- struct{}{}:
@ -744,22 +820,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
for {
success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
if !checkForStreamQuota(it) {
return false
}
if !checkForHeaderListSize(it) {
return false
}
return true
return checkForHeaderListSize(it) && checkForStreamQuota(it)
}, hdr)
if err != nil {
return nil, &NewStreamError{Err: err}
// Connection closed.
return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
}
if success {
break
}
if hdrListSizeErr != nil {
return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
return nil, &NewStreamError{Err: hdrListSizeErr}
}
firstTry = false
select {
@ -767,29 +838,38 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
case <-ctx.Done():
return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
case <-t.goAway:
return nil, &NewStreamError{Err: errStreamDrain}
return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
case <-t.ctx.Done():
return nil, &NewStreamError{Err: ErrConnClosing}
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
if t.statsHandler != nil {
if len(t.statsHandlers) != 0 {
header, ok := metadata.FromOutgoingContext(ctx)
if ok {
header.Set("user-agent", t.userAgent)
} else {
header = metadata.Pairs("user-agent", t.userAgent)
}
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
for _, sh := range t.statsHandlers {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
// Note: Creating a new stats object to prevent pollution.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
}
sh.HandleRPC(s.ctx, outHeader)
}
t.statsHandler.HandleRPC(s.ctx, outHeader)
}
if transportDrainRequired {
if logger.V(logLevel) {
logger.Infof("transport: t.nextID > MaxStreamID. Draining")
}
t.GracefulClose()
}
return s, nil
}
@ -872,20 +952,21 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only Close once.
// Make sure we only close once.
if t.state == closing {
t.mu.Unlock()
return
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
t.onClose()
if logger.V(logLevel) {
logger.Infof("transport: closing: %v", err)
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
if t.state != draining {
t.onClose(GoAwayInvalid)
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
@ -898,9 +979,7 @@ func (t *http2Client) Close(err error) {
t.controlBuf.finish()
t.cancel()
t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
channelz.RemoveEntry(t.channelzID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
@ -917,11 +996,11 @@ func (t *http2Client) Close(err error) {
for _, s := range streams {
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
}
if t.statsHandler != nil {
for _, sh := range t.statsHandlers {
connEnd := &stats.ConnEnd{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connEnd)
sh.HandleConn(t.ctx, connEnd)
}
}
@ -937,11 +1016,15 @@ func (t *http2Client) GracefulClose() {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: GracefulClose called")
}
t.onClose(GoAwayInvalid)
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(ErrConnClosing)
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
return
}
t.controlBuf.put(&incomingGoAway{})
@ -1001,13 +1084,13 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
t.mu.Lock()
for _, s := range t.activeStreams {
s.fc.newLimit(n)
}
t.mu.Unlock()
updateIWS := func(interface{}) bool {
t.initialWindowSize = int32(n)
t.mu.Lock()
for _, s := range t.activeStreams {
s.fc.newLimit(n)
}
t.mu.Unlock()
return true
}
t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
@ -1099,7 +1182,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
statusCode, ok := http2ErrConvTab[f.ErrCode]
if !ok {
if logger.V(logLevel) {
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error: %v", f.ErrCode)
}
statusCode = codes.Unknown
}
@ -1213,12 +1296,14 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
default:
t.setGoAwayReason(f)
close(t.goAway)
t.controlBuf.put(&incomingGoAway{})
defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
// Notify the clientconn about the GOAWAY before we set the state to
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
t.state = draining
if t.state != draining {
t.onClose(t.goAwayReason)
t.state = draining
}
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@ -1226,18 +1311,29 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
}
t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
return
}
streamsToClose := make([]*Stream, 0)
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
atomic.StoreUint32(&stream.unprocessed, 1)
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
if streamID > id && streamID <= upperLimit {
atomic.StoreUint32(&stream.unprocessed, 1)
streamsToClose = append(streamsToClose, stream)
}
}
}
t.prevGoAwayID = id
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
// Called outside t.mu because closeStream can take controlBuf's mu, which
// could induce deadlock and is not allowed.
for _, stream := range streamsToClose {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
}
@ -1433,7 +1529,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
close(s.headerChan)
}
if t.statsHandler != nil {
for _, sh := range t.statsHandlers {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
@ -1441,14 +1537,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress,
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
sh.HandleRPC(s.ctx, inTrailer)
}
}
@ -1465,33 +1561,35 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
}
// reader runs as a separate goroutine in charge of reading data from network
// connection.
//
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
defer close(t.readerDone)
// Check the validity of server preface.
// readServerPreface reads and handles the initial settings frame from the
// server.
func (t *http2Client) readServerPreface() error {
frame, err := t.framer.fr.ReadFrame()
if err != nil {
err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
return connectionErrorf(true, err, "error reading server preface: %v", err)
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
// this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
}
t.handleSettings(sf, true)
return nil
}
// reader verifies the server preface and reads all subsequent data from
// network connection. If the server preface is not read successfully, an
// error is pushed to errCh; otherwise errCh is closed with no error.
func (t *http2Client) reader(errCh chan<- error) {
defer close(t.readerDone)
if err := t.readServerPreface(); err != nil {
errCh <- err
return
}
t.onPrefaceReceipt()
t.handleSettings(sf, true)
close(errCh)
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
// loop to keep reading incoming messages on this transport.
for {
@ -1694,3 +1792,9 @@ func (t *http2Client) getOutFlowWindow() int64 {
return -2
}
}
func (t *http2Client) stateForTesting() transportState {
t.mu.Lock()
defer t.mu.Unlock()
return t.state
}

View File

@ -36,11 +36,13 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@ -52,10 +54,10 @@ import (
var (
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state.
ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)
// serverConnectionCounter counts the number of connections a server has seen
@ -82,7 +84,7 @@ type http2Server struct {
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
fc *trInFlow
stats stats.Handler
stats []stats.Handler
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
// Keepalive enforcement policy.
@ -101,13 +103,13 @@ type http2Server struct {
mu sync.Mutex // guard the following
// drainChan is initialized when Drain() is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
// Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct{}
// drainEvent is initialized when Drain() is called the first time. After
// which the server writes out the first GoAway(with ID 2^31-1) frame. Then
// an independent goroutine will be launched to later send the second
// GoAway. During this time we don't want to write another first GoAway(with
// ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
// already initialized since draining is already underway.
drainEvent *grpcsync.Event
state transportState
activeStreams map[uint32]*Stream
// idle is the time instant when the connection went idle.
@ -117,7 +119,7 @@ type http2Server struct {
idle time.Time
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
channelzID *channelz.Identifier
czData *channelzData
bufferPool *bufferPool
@ -231,6 +233,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
if kp.Timeout == 0 {
kp.Timeout = defaultServerKeepaliveTimeout
}
if kp.Time != infinity {
if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
}
}
kep := config.KeepalivePolicy
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
@ -252,7 +259,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*Stream),
stats: config.StatsHandler,
stats: config.StatsHandlers,
kp: kp,
idle: time.Now(),
kep: kep,
@ -260,6 +267,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
@ -267,25 +277,25 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
for _, sh := range t.stats {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
sh.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
return nil, err
}
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
t.framer.writer.Flush()
defer func() {
if err != nil {
t.Close()
t.Close(err)
}
}()
@ -323,10 +333,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
t.conn.Close()
t.controlBuf.finish()
@ -336,8 +345,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
return t, nil
}
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// operateHeaders takes action on the decoded headers. Returns an error if fatal
// error encountered and transport needs to close, otherwise returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
@ -353,15 +363,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeFrameSize,
onWrite: func() {},
})
return false
return nil
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
return true
return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
}
t.maxStreamID = streamID
@ -373,13 +380,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = false
mdata = make(map[string][]string)
httpMethod string
// headerError is set if an error is encountered while parsing the headers
headerError bool
// if false, content-type was missing or invalid
isGRPC = false
contentType = ""
mdata = make(map[string][]string)
httpMethod string
// these are set if an error is encountered while parsing the headers
protocolError bool
headerError *status.Status
timeoutSet bool
timeout time.Duration
@ -390,6 +398,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
case "content-type":
contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
if !validContentType {
contentType = hf.Value
break
}
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
@ -405,7 +414,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
timeoutSet = true
var err error
if timeout, err = decodeTimeout(hf.Value); err != nil {
headerError = true
headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
}
// "Transports must consider requests containing the Connection header
// as malformed." - A41
@ -413,14 +422,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
}
headerError = true
protocolError = true
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = true
headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
@ -439,22 +448,43 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
logger.Errorf("transport: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 400,
httpStatus: http.StatusBadRequest,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
rst: !frame.StreamEnded(),
})
return false
return nil
}
if !isGRPC || headerError {
if protocolError {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
})
return false
return nil
}
if !isGRPC {
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusUnsupportedMediaType,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
rst: !frame.StreamEnded(),
})
return nil
}
if headerError != nil {
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusBadRequest,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: headerError,
rst: !frame.StreamEnded(),
})
return nil
}
// "If :authority is missing, Host must be renamed to :authority." - A41
@ -479,14 +509,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context.
if len(mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
@ -501,7 +524,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if t.state != reachable {
t.mu.Unlock()
s.cancel()
return false
return nil
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
@ -512,21 +535,23 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
onWrite: func() {},
})
s.cancel()
return false
return nil
}
if httpMethod != http.MethodPost {
t.mu.Unlock()
errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
if logger.V(logLevel) {
logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
logger.Infof("transport: %v", errMsg)
}
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
t.controlBuf.put(&earlyAbortStream{
httpStatus: 405,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
rst: !frame.StreamEnded(),
})
s.cancel()
return false
return nil
}
if t.inTapHandle != nil {
var err error
@ -544,8 +569,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
streamID: s.id,
contentSubtype: s.contentSubtype,
status: stat,
rst: !frame.StreamEnded(),
})
return false
return nil
}
}
t.activeStreams[streamID] = s
@ -561,8 +587,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.adjustWindow(s, uint32(n))
}
s.ctx = traceCtx(s.ctx, s.method)
if t.stats != nil {
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
for _, sh := range t.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
@ -571,7 +597,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
}
t.stats.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
@ -592,7 +618,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
wq: s.wq,
})
handle(s)
return false
return nil
}
// HandleStreams receives incoming streams using the given handler. This is
@ -625,19 +651,16 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
t.Close(err)
return
}
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
}
t.Close()
t.Close(err)
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
t.Close(err)
break
}
case *http2.DataFrame:
@ -838,8 +861,8 @@ const (
func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() {
if f.Data == goAwayPing.data && t.drainChan != nil {
close(t.drainChan)
if f.Data == goAwayPing.data && t.drainEvent != nil {
t.drainEvent.Fire()
return
}
// Maybe it's a BDP ping.
@ -881,10 +904,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
if logger.V(logLevel) {
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
}
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
}
}
@ -925,12 +945,27 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
return true
}
func (t *http2Server) streamContextErr(s *Stream) error {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
}
// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
s.hdrMu.Lock()
defer s.hdrMu.Unlock()
if s.getState() == streamDone {
return t.streamContextErr(s)
}
if s.updateHeaderSent() {
return ErrIllegalHeaderWrite
}
s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md)
@ -939,10 +974,8 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
}
if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
return err
return status.Convert(err).Err()
}
s.hdrMu.Unlock()
return nil
}
@ -973,14 +1006,14 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
if t.stats != nil {
for _, sh := range t.stats {
// Note: Headers are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
Header: s.header.Copy(),
Compression: s.sendCompress,
}
t.stats.HandleRPC(s.Context(), outHeader)
sh.HandleRPC(s.Context(), outHeader)
}
return nil
}
@ -990,17 +1023,19 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
s.hdrMu.Lock()
defer s.hdrMu.Unlock()
if s.getState() == streamDone {
return nil
}
s.hdrMu.Lock()
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
if !s.updateHeaderSent() { // No headers have been sent.
if len(s.header) > 0 { // Send a separate header frame.
if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
return err
}
} else { // Send a trailer only response.
@ -1029,7 +1064,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
endStream: true,
onWrite: t.setResetPingStrikes,
}
s.hdrMu.Unlock()
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
if !success {
if err != nil {
@ -1041,10 +1076,10 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
for _, sh := range t.stats {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
@ -1056,23 +1091,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
if _, ok := err.(ConnectionError); ok {
return err
}
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
return status.Errorf(codes.Internal, "transport: %v", err)
return err
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
return t.streamContextErr(s)
}
}
df := &dataFrame{
@ -1082,12 +1106,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
return t.streamContextErr(s)
}
return t.controlBuf.put(df)
}
@ -1149,7 +1168,7 @@ func (t *http2Server) keepalive() {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to maximum connection age.")
}
t.Close()
t.controlBuf.put(closeConnection{})
case <-t.done:
}
return
@ -1165,10 +1184,7 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
}
t.Close()
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
return
}
if !outstandingPing {
@ -1195,12 +1211,15 @@ func (t *http2Server) keepalive() {
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() {
func (t *http2Server) Close(err error) {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: closing: %v", err)
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
@ -1210,25 +1229,19 @@ func (t *http2Server) Close() {
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
logger.Infof("transport: error closing conn during Close: %v", err)
}
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
channelz.RemoveEntry(t.channelzID)
// Cancel all active streams.
for _, s := range streams {
s.cancel()
}
if t.stats != nil {
for _, sh := range t.stats {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
sh.HandleConn(t.ctx, connEnd)
}
}
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()
t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
@ -1250,6 +1263,11 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()
oldState := s.swapState(streamDone)
if oldState == streamDone {
// If the stream was already done, return.
@ -1269,6 +1287,11 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h
// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()
s.swapState(streamDone)
t.deleteStream(s, eosReceived)
@ -1287,10 +1310,10 @@ func (t *http2Server) RemoteAddr() net.Addr {
func (t *http2Server) Drain() {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainChan != nil {
if t.drainEvent != nil {
return
}
t.drainChan = make(chan struct{})
t.drainEvent = grpcsync.NewEvent()
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
}
@ -1311,19 +1334,20 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
// Stop accepting more streams now.
t.state = draining
sid := t.maxStreamID
retErr := g.closeConn
if len(t.activeStreams) == 0 {
g.closeConn = true
retErr = errors.New("second GOAWAY written and no active streams left to process")
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
if g.closeConn {
if retErr != nil {
// Abruptly close the connection following the GoAway (via
// loopywriter). But flush out what's inside the buffer first.
t.framer.writer.Flush()
return false, fmt.Errorf("transport: Connection closing")
return false, retErr
}
return true, nil
}
@ -1345,7 +1369,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-t.drainChan:
case <-t.drainEvent.Done():
case <-timer.C:
case <-t.done:
return
@ -1404,6 +1428,13 @@ func (t *http2Server) getOutFlowWindow() int64 {
}
}
func (t *http2Server) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
}
}
func getJitter(v time.Duration) time.Duration {
if v == infinity {
return 0

View File

@ -20,7 +20,6 @@ package transport
import (
"bufio"
"bytes"
"encoding/base64"
"fmt"
"io"
@ -45,14 +44,8 @@ import (
const (
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
// https://httpwg.org/specs/rfc7540.html#SettingValues
http2InitHeaderTableSize = 4096
// baseContentType is the base content-type for gRPC. This is a valid
// content-type on it's own, but can also include a content-subtype such as
// "proto" as a suffix after "+" or ";". See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
// for more details.
)
var (
@ -257,13 +250,13 @@ func encodeGrpcMessage(msg string) string {
}
func encodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
var sb strings.Builder
for len(msg) > 0 {
r, size := utf8.DecodeRuneInString(msg)
for _, b := range []byte(string(r)) {
if size > 1 {
// If size > 1, r is not ascii. Always do percent encoding.
buf.WriteString(fmt.Sprintf("%%%02X", b))
fmt.Fprintf(&sb, "%%%02X", b)
continue
}
@ -272,14 +265,14 @@ func encodeGrpcMessageUnchecked(msg string) string {
//
// fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
if b >= spaceByte && b <= tildeByte && b != percentByte {
buf.WriteByte(b)
sb.WriteByte(b)
} else {
buf.WriteString(fmt.Sprintf("%%%02X", b))
fmt.Fprintf(&sb, "%%%02X", b)
}
}
msg = msg[size:]
}
return buf.String()
return sb.String()
}
// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
@ -297,23 +290,23 @@ func decodeGrpcMessage(msg string) string {
}
func decodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
var sb strings.Builder
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c == percentByte && i+2 < lenMsg {
parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
if err != nil {
buf.WriteByte(c)
sb.WriteByte(c)
} else {
buf.WriteByte(byte(parsed))
sb.WriteByte(byte(parsed))
i += 2
}
} else {
buf.WriteByte(c)
sb.WriteByte(c)
}
}
return buf.String()
return sb.String()
}
type bufWriter struct {
@ -322,8 +315,6 @@ type bufWriter struct {
batchSize int
conn net.Conn
err error
onFlush func()
}
func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
@ -360,9 +351,6 @@ func (w *bufWriter) Flush() error {
if w.offset == 0 {
return nil
}
if w.onFlush != nil {
w.onFlush()
}
_, w.err = w.conn.Write(w.buf[:w.offset])
w.offset = 0
return w.err

View File

@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
@ -42,6 +43,10 @@ import (
"google.golang.org/grpc/tap"
)
// ErrNoHeaders is used as a signal that a trailers only response was received,
// and is not a real error.
var ErrNoHeaders = errors.New("stream has no headers")
const logLevel = 2
type bufferPool struct {
@ -365,9 +370,15 @@ func (s *Stream) Header() (metadata.MD, error) {
return s.header.Copy(), nil
}
s.waitOnHeader()
if !s.headerValid {
return nil, s.status.Err()
}
if s.noHeaders {
return nil, ErrNoHeaders
}
return s.header.Copy(), nil
}
@ -522,14 +533,14 @@ type ServerConfig struct {
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
StatsHandlers []stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
ChannelzParentID int64
ChannelzParentID *channelz.Identifier
MaxHeaderListSize *uint32
HeaderTableSize *uint32
}
@ -552,8 +563,8 @@ type ConnectOptions struct {
CredsBundle credentials.Bundle
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
// StatsHandlers stores the handler for stats.
StatsHandlers []stats.Handler
// InitialWindowSize sets the initial window size for a stream.
InitialWindowSize int32
// InitialConnWindowSize sets the initial window size for a connection.
@ -563,7 +574,7 @@ type ConnectOptions struct {
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID int64
ChannelzParentID *channelz.Identifier
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
@ -572,8 +583,8 @@ type ConnectOptions struct {
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
}
// Options provides additional hints and information for message
@ -690,7 +701,7 @@ type ServerTransport interface {
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close()
Close(err error)
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
@ -741,6 +752,12 @@ func (e ConnectionError) Origin() error {
return e.err
}
// Unwrap returns the original error of this connection error or nil when the
// origin is nil.
func (e ConnectionError) Unwrap() error {
return e.err
}
var (
// ErrConnClosing indicates that the transport is closing.
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")