Update apimachinery

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2022-10-03 16:46:59 +01:00
parent 95792f8d58
commit 7c118225b2
18 changed files with 277 additions and 165 deletions

View File

@ -315,6 +315,20 @@ type ServeConnOpts struct {
// requests. If nil, BaseConfig.Handler is used. If BaseConfig
// or BaseConfig.Handler is nil, http.DefaultServeMux is used.
Handler http.Handler
// UpgradeRequest is an initial request received on a connection
// undergoing an h2c upgrade. The request body must have been
// completely read from the connection before calling ServeConn,
// and the 101 Switching Protocols response written.
UpgradeRequest *http.Request
// Settings is the decoded contents of the HTTP2-Settings header
// in an h2c upgrade request.
Settings []byte
// SawClientPreface is set if the HTTP/2 connection preface
// has already been read from the connection.
SawClientPreface bool
}
func (o *ServeConnOpts) context() context.Context {
@ -383,6 +397,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
headerTableSize: initialHeaderTableSize,
serveG: newGoroutineLock(),
pushEnabled: true,
sawClientPreface: opts.SawClientPreface,
}
s.state.registerConn(sc)
@ -400,7 +415,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
if s.NewWriteScheduler != nil {
sc.writeSched = s.NewWriteScheduler()
} else {
sc.writeSched = NewRandomWriteScheduler()
sc.writeSched = NewPriorityWriteScheduler(nil)
}
// These start at the RFC-specified defaults. If there is a higher
@ -465,9 +480,27 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
}
}
if opts.Settings != nil {
fr := &SettingsFrame{
FrameHeader: FrameHeader{valid: true},
p: opts.Settings,
}
if err := fr.ForeachSetting(sc.processSetting); err != nil {
sc.rejectConn(ErrCodeProtocol, "invalid settings")
return
}
opts.Settings = nil
}
if hook := testHookGetServerConn; hook != nil {
hook(sc)
}
if opts.UpgradeRequest != nil {
sc.upgradeRequest(opts.UpgradeRequest)
opts.UpgradeRequest = nil
}
sc.serve()
}
@ -512,6 +545,7 @@ type serverConn struct {
// Everything following is owned by the serve loop; use serveG.check():
serveG goroutineLock // used to verify funcs are on serve()
pushEnabled bool
sawClientPreface bool // preface has already been read, used in h2c upgrade
sawFirstSettings bool // got the initial SETTINGS frame after the preface
needToSendSettingsAck bool
unackedSettings int // how many SETTINGS have we sent without ACKs?
@ -974,6 +1008,9 @@ var errPrefaceTimeout = errors.New("timeout waiting for client preface")
// returns errPrefaceTimeout on timeout, or an error if the greeting
// is invalid.
func (sc *serverConn) readPreface() error {
if sc.sawClientPreface {
return nil
}
errc := make(chan error, 1)
go func() {
// Read the client preface
@ -1915,6 +1952,26 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
return nil
}
func (sc *serverConn) upgradeRequest(req *http.Request) {
sc.serveG.check()
id := uint32(1)
sc.maxClientStreamID = id
st := sc.newStream(id, 0, stateHalfClosedRemote)
st.reqTrailer = req.Trailer
if st.reqTrailer != nil {
st.trailer = make(http.Header)
}
rw := sc.newResponseWriter(st, req)
// Disable any read deadline set by the net/http package
// prior to the upgrade.
if sc.hs.ReadTimeout != 0 {
sc.conn.SetReadDeadline(time.Time{})
}
go sc.runHandler(rw, req, sc.handler.ServeHTTP)
}
func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
sc := st.sc
sc.serveG.check()
@ -2145,6 +2202,11 @@ func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*r
}
req = req.WithContext(st.ctx)
rw := sc.newResponseWriter(st, req)
return rw, req, nil
}
func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *responseWriter {
rws := responseWriterStatePool.Get().(*responseWriterState)
bwSave := rws.bw
*rws = responseWriterState{} // zero all the fields
@ -2153,10 +2215,7 @@ func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*r
rws.bw.Reset(chunkWriter{rws})
rws.stream = st
rws.req = req
rws.body = body
rw := &responseWriter{rws: rws}
return rw, req, nil
return &responseWriter{rws: rws}
}
// Run on its own goroutine.
@ -2316,17 +2375,18 @@ type requestBody struct {
_ incomparable
stream *stream
conn *serverConn
closed bool // for use by Close only
sawEOF bool // for use by Read only
pipe *pipe // non-nil if we have a HTTP entity message body
needsContinue bool // need to send a 100-continue
closeOnce sync.Once // for use by Close only
sawEOF bool // for use by Read only
pipe *pipe // non-nil if we have a HTTP entity message body
needsContinue bool // need to send a 100-continue
}
func (b *requestBody) Close() error {
if b.pipe != nil && !b.closed {
b.pipe.BreakWithError(errClosedBody)
}
b.closed = true
b.closeOnce.Do(func() {
if b.pipe != nil {
b.pipe.BreakWithError(errClosedBody)
}
})
return nil
}
@ -2370,7 +2430,6 @@ type responseWriterState struct {
// immutable within a request:
stream *stream
req *http.Request
body *requestBody // to close at end of request, if DATA frames didn't
conn *serverConn
// TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
@ -2546,8 +2605,9 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
// prior to the headers being written. If the set of trailers is fixed
// or known before the header is written, the normal Go trailers mechanism
// is preferred:
// https://golang.org/pkg/net/http/#ResponseWriter
// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
//
// https://golang.org/pkg/net/http/#ResponseWriter
// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
const TrailerPrefix = "Trailer:"
// promoteUndeclaredTrailers permits http.Handlers to set trailers
@ -2643,8 +2703,7 @@ func checkWriteHeaderCode(code int) {
// Issue 22880: require valid WriteHeader status codes.
// For now we only enforce that it's three digits.
// In the future we might block things over 599 (600 and above aren't defined
// at http://httpwg.org/specs/rfc7231.html#status.codes)
// and we might block under 200 (once we have more mature 1xx support).
// at http://httpwg.org/specs/rfc7231.html#status.codes).
// But for now any three digits.
//
// We used to send "HTTP/1.1 000 0" on the wire in responses but there's
@ -2665,13 +2724,41 @@ func (w *responseWriter) WriteHeader(code int) {
}
func (rws *responseWriterState) writeHeader(code int) {
if !rws.wroteHeader {
checkWriteHeaderCode(code)
rws.wroteHeader = true
rws.status = code
if len(rws.handlerHeader) > 0 {
rws.snapHeader = cloneHeader(rws.handlerHeader)
if rws.wroteHeader {
return
}
checkWriteHeaderCode(code)
// Handle informational headers
if code >= 100 && code <= 199 {
// Per RFC 8297 we must not clear the current header map
h := rws.handlerHeader
_, cl := h["Content-Length"]
_, te := h["Transfer-Encoding"]
if cl || te {
h = h.Clone()
h.Del("Content-Length")
h.Del("Transfer-Encoding")
}
if rws.conn.writeHeaders(rws.stream, &writeResHeaders{
streamID: rws.stream.id,
httpResCode: code,
h: h,
endStream: rws.handlerDone && !rws.hasTrailers(),
}) != nil {
rws.dirty = true
}
return
}
rws.wroteHeader = true
rws.status = code
if len(rws.handlerHeader) > 0 {
rws.snapHeader = cloneHeader(rws.handlerHeader)
}
}