[VOL-5374] Go version upgrade to 1.23.1 and a few other package version upgrades
Signed-off-by: akashreddyk <akash.kankanala@radisys.com>
Change-Id: I50531e8febdc00b335ebbe5a4b1099fc3bf6d5b4
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go
index 0ccbe9b..b55547a 100644
--- a/vendor/golang.org/x/net/http2/server.go
+++ b/vendor/golang.org/x/net/http2/server.go
@@ -29,6 +29,7 @@
"bufio"
"bytes"
"context"
+ "crypto/rand"
"crypto/tls"
"errors"
"fmt"
@@ -52,10 +53,14 @@
)
const (
- prefaceTimeout = 10 * time.Second
- firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
- handlerChunkWriteSize = 4 << 10
- defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
+ prefaceTimeout = 10 * time.Second
+ firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
+ handlerChunkWriteSize = 4 << 10
+ defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
+
+ // maxQueuedControlFrames is the maximum number of control frames like
+ // SETTINGS, PING and RST_STREAM that will be queued for writing before
+ // the connection is closed to prevent memory exhaustion attacks.
maxQueuedControlFrames = 10000
)
@@ -98,6 +103,19 @@
// the HTTP/2 spec's recommendations.
MaxConcurrentStreams uint32
+ // MaxDecoderHeaderTableSize optionally specifies the http2
+ // SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
+ // informs the remote endpoint of the maximum size of the header compression
+ // table used to decode header blocks, in octets. If zero, the default value
+ // of 4096 is used.
+ MaxDecoderHeaderTableSize uint32
+
+ // MaxEncoderHeaderTableSize optionally specifies an upper limit for the
+ // header compression table used for encoding request headers. Received
+ // SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
+ // the default value of 4096 is used.
+ MaxEncoderHeaderTableSize uint32
+
// MaxReadFrameSize optionally specifies the largest frame
// this server is willing to read. A valid value is between
// 16k and 16M, inclusive. If zero or otherwise invalid, a
@@ -111,8 +129,25 @@
// IdleTimeout specifies how long until idle clients should be
// closed with a GOAWAY frame. PING frames are not considered
// activity for the purposes of IdleTimeout.
+ // If zero or negative, there is no timeout.
IdleTimeout time.Duration
+ // ReadIdleTimeout is the timeout after which a health check using a ping
+ // frame will be carried out if no frame is received on the connection.
+ // If zero, no health check is performed.
+ ReadIdleTimeout time.Duration
+
+ // PingTimeout is the timeout after which the connection will be closed
+ // if a response to a ping is not received.
+ // If zero, a default of 15 seconds is used.
+ PingTimeout time.Duration
+
+ // WriteByteTimeout is the timeout after which a connection will be
+ // closed if no data can be written to it. The timeout begins when data is
+ // available to write, and is extended whenever any bytes are written.
+ // If zero or negative, there is no timeout.
+ WriteByteTimeout time.Duration
+
// MaxUploadBufferPerConnection is the size of the initial flow
// control window for each connection. The HTTP/2 spec does not
// allow this to be smaller than 65535 or larger than 2^32-1.
@@ -130,47 +165,49 @@
// If nil, a default scheduler is chosen.
NewWriteScheduler func() WriteScheduler
+ // CountError, if non-nil, is called on HTTP/2 server errors.
+ // It's intended to increment a metric for monitoring, such
+ // as an expvar or Prometheus metric.
+ // The errType consists of only ASCII word characters.
+ CountError func(errType string)
+
// Internal state. This is a pointer (rather than embedded directly)
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *serverInternalState
+
+ // Synchronization group used for testing.
+ // Outside of tests, this is nil.
+ group synctestGroupInterface
}
-func (s *Server) initialConnRecvWindowSize() int32 {
- if s.MaxUploadBufferPerConnection > initialWindowSize {
- return s.MaxUploadBufferPerConnection
+func (s *Server) markNewGoroutine() {
+ if s.group != nil {
+ s.group.Join()
}
- return 1 << 20
}
-func (s *Server) initialStreamRecvWindowSize() int32 {
- if s.MaxUploadBufferPerStream > 0 {
- return s.MaxUploadBufferPerStream
+func (s *Server) now() time.Time {
+ if s.group != nil {
+ return s.group.Now()
}
- return 1 << 20
+ return time.Now()
}
-func (s *Server) maxReadFrameSize() uint32 {
- if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
- return v
+// newTimer creates a new time.Timer, or a synthetic timer in tests.
+func (s *Server) newTimer(d time.Duration) timer {
+ if s.group != nil {
+ return s.group.NewTimer(d)
}
- return defaultMaxReadFrameSize
+ return timeTimer{time.NewTimer(d)}
}
-func (s *Server) maxConcurrentStreams() uint32 {
- if v := s.MaxConcurrentStreams; v > 0 {
- return v
+// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
+func (s *Server) afterFunc(d time.Duration, f func()) timer {
+ if s.group != nil {
+ return s.group.AfterFunc(d, f)
}
- return defaultMaxStreams
-}
-
-// maxQueuedControlFrames is the maximum number of control frames like
-// SETTINGS, PING and RST_STREAM that will be queued for writing before
-// the connection is closed to prevent memory exhaustion attacks.
-func (s *Server) maxQueuedControlFrames() int {
- // TODO: if anybody asks, add a Server field, and remember to define the
- // behavior of negative values.
- return maxQueuedControlFrames
+ return timeTimer{time.AfterFunc(d, f)}
}
type serverInternalState struct {
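For reference, a minimal sketch (not part of this patch) of how a consumer of the bumped golang.org/x/net/http2 package could set the knobs introduced above; the field names come from the hunks above, while the address, certificate paths and concrete durations are illustrative only:

    package main

    import (
        "log"
        "net/http"
        "time"

        "golang.org/x/net/http2"
    )

    func main() {
        h2 := &http2.Server{
            MaxDecoderHeaderTableSize: 4096,             // HPACK table size we advertise for decoding
            MaxEncoderHeaderTableSize: 4096,             // cap on the table we use when encoding
            IdleTimeout:               2 * time.Minute,  // GOAWAY idle connections
            ReadIdleTimeout:           30 * time.Second, // send a health-check PING after 30s of silence
            PingTimeout:               15 * time.Second, // close the connection if the PING ACK is late
            WriteByteTimeout:          30 * time.Second, // close the connection if writes stall
        }
        srv := &http.Server{Addr: ":8443", Handler: http.DefaultServeMux}
        if err := http2.ConfigureServer(srv, h2); err != nil {
            log.Fatal(err)
        }
        log.Fatal(srv.ListenAndServeTLS("cert.pem", "key.pem"))
    }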
@@ -269,7 +306,7 @@
if s.TLSNextProto == nil {
s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
}
- protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) {
+ protoHandler := func(hs *http.Server, c net.Conn, h http.Handler, sawClientPreface bool) {
if testHookOnConn != nil {
testHookOnConn()
}
@@ -286,12 +323,31 @@
ctx = bc.BaseContext()
}
conf.ServeConn(c, &ServeConnOpts{
- Context: ctx,
- Handler: h,
- BaseConfig: hs,
+ Context: ctx,
+ Handler: h,
+ BaseConfig: hs,
+ SawClientPreface: sawClientPreface,
})
}
- s.TLSNextProto[NextProtoTLS] = protoHandler
+ s.TLSNextProto[NextProtoTLS] = func(hs *http.Server, c *tls.Conn, h http.Handler) {
+ protoHandler(hs, c, h, false)
+ }
+ // The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns.
+ //
+ // A connection passed in this method has already had the HTTP/2 preface read from it.
+ s.TLSNextProto[nextProtoUnencryptedHTTP2] = func(hs *http.Server, c *tls.Conn, h http.Handler) {
+ nc, err := unencryptedNetConnFromTLSConn(c)
+ if err != nil {
+ if lg := hs.ErrorLog; lg != nil {
+ lg.Print(err)
+ } else {
+ log.Print(err)
+ }
+ go c.Close()
+ return
+ }
+ protoHandler(hs, nc, h, true)
+ }
return nil
}
@@ -309,6 +365,20 @@
// requests. If nil, BaseConfig.Handler is used. If BaseConfig
// or BaseConfig.Handler is nil, http.DefaultServeMux is used.
Handler http.Handler
+
+ // UpgradeRequest is an initial request received on a connection
+ // undergoing an h2c upgrade. The request body must have been
+ // completely read from the connection before calling ServeConn,
+ // and the 101 Switching Protocols response written.
+ UpgradeRequest *http.Request
+
+ // Settings is the decoded contents of the HTTP2-Settings header
+ // in an h2c upgrade request.
+ Settings []byte
+
+ // SawClientPreface is set if the HTTP/2 connection preface
+ // has already been read from the connection.
+ SawClientPreface bool
}
func (o *ServeConnOpts) context() context.Context {
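A minimal sketch (not part of this patch) of the plaintext, prior-knowledge path these options serve; the listener address and handler are illustrative, and SawClientPreface is deliberately left false because nothing has read the preface yet:

    package main

    import (
        "log"
        "net"
        "net/http"

        "golang.org/x/net/http2"
    )

    func main() {
        mux := http.NewServeMux()
        mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) })

        h2 := &http2.Server{}
        ln, err := net.Listen("tcp", ":8080")
        if err != nil {
            log.Fatal(err)
        }
        for {
            conn, err := ln.Accept()
            if err != nil {
                log.Fatal(err)
            }
            // ServeConn reads the client preface itself here; an h2c upgrade path
            // that had already consumed it would set SawClientPreface: true.
            go h2.ServeConn(conn, &http2.ServeConnOpts{Handler: mux})
        }
    }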
@@ -352,16 +422,22 @@
//
// The opts parameter is optional. If nil, default values are used.
func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
+ s.serveConn(c, opts, nil)
+}
+
+func (s *Server) serveConn(c net.Conn, opts *ServeConnOpts, newf func(*serverConn)) {
baseCtx, cancel := serverConnBaseContext(c, opts)
defer cancel()
+ http1srv := opts.baseConfig()
+ conf := configFromServer(http1srv, s)
sc := &serverConn{
srv: s,
- hs: opts.baseConfig(),
+ hs: http1srv,
conn: c,
baseCtx: baseCtx,
remoteAddrStr: c.RemoteAddr().String(),
- bw: newBufferedWriter(c),
+ bw: newBufferedWriter(s.group, c, conf.WriteByteTimeout),
handler: opts.handler(),
streams: make(map[uint32]*stream),
readFrameCh: make(chan readFrameResult),
@@ -371,12 +447,18 @@
bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}),
clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
- advMaxStreams: s.maxConcurrentStreams(),
+ advMaxStreams: conf.MaxConcurrentStreams,
initialStreamSendWindowSize: initialWindowSize,
+ initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
maxFrameSize: initialMaxFrameSize,
- headerTableSize: initialHeaderTableSize,
+ pingTimeout: conf.PingTimeout,
+ countErrorFunc: conf.CountError,
serveG: newGoroutineLock(),
pushEnabled: true,
+ sawClientPreface: opts.SawClientPreface,
+ }
+ if newf != nil {
+ newf(sc)
}
s.state.registerConn(sc)
@@ -387,27 +469,31 @@
// passes the connection off to us with the deadline already set.
// Write deadlines are set per stream in serverConn.newStream.
// Disarm the net.Conn write deadline here.
- if sc.hs.WriteTimeout != 0 {
+ if sc.hs.WriteTimeout > 0 {
sc.conn.SetWriteDeadline(time.Time{})
}
if s.NewWriteScheduler != nil {
sc.writeSched = s.NewWriteScheduler()
} else {
- sc.writeSched = NewRandomWriteScheduler()
+ sc.writeSched = newRoundRobinWriteScheduler()
}
// These start at the RFC-specified defaults. If there is a higher
// configured value for inflow, that will be updated when we send a
// WINDOW_UPDATE shortly after sending SETTINGS.
sc.flow.add(initialWindowSize)
- sc.inflow.add(initialWindowSize)
+ sc.inflow.init(initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
+ sc.hpackEncoder.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
fr := NewFramer(sc.bw, c)
- fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
+ if conf.CountError != nil {
+ fr.countError = conf.CountError
+ }
+ fr.ReadMetaHeaders = hpack.NewDecoder(conf.MaxDecoderHeaderTableSize, nil)
fr.MaxHeaderListSize = sc.maxHeaderListSize()
- fr.SetMaxReadFrameSize(s.maxReadFrameSize())
+ fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
sc.framer = fr
if tc, ok := c.(connectionStater); ok {
@@ -440,7 +526,7 @@
// So for now, do nothing here again.
}
- if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
+ if !conf.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
// "Endpoints MAY choose to generate a connection error
// (Section 5.4.1) of type INADEQUATE_SECURITY if one of
// the prohibited cipher suites are negotiated."
@@ -456,10 +542,28 @@
}
}
+ if opts.Settings != nil {
+ fr := &SettingsFrame{
+ FrameHeader: FrameHeader{valid: true},
+ p: opts.Settings,
+ }
+ if err := fr.ForeachSetting(sc.processSetting); err != nil {
+ sc.rejectConn(ErrCodeProtocol, "invalid settings")
+ return
+ }
+ opts.Settings = nil
+ }
+
if hook := testHookGetServerConn; hook != nil {
hook(sc)
}
- sc.serve()
+
+ if opts.UpgradeRequest != nil {
+ sc.upgradeRequest(opts.UpgradeRequest)
+ opts.UpgradeRequest = nil
+ }
+
+ sc.serve(conf)
}
func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
@@ -494,15 +598,17 @@
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan bodyReadMsg // from handlers -> serve
serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
- flow flow // conn-wide (not stream-specific) outbound flow control
- inflow flow // conn-wide inbound flow control
+ flow outflow // conn-wide (not stream-specific) outbound flow control
+ inflow inflow // conn-wide inbound flow control
tlsState *tls.ConnectionState // shared by all handlers, like net/http
remoteAddrStr string
writeSched WriteScheduler
+ countErrorFunc func(errType string)
// Everything following is owned by the serve loop; use serveG.check():
serveG goroutineLock // used to verify funcs are on serve()
pushEnabled bool
+ sawClientPreface bool // preface has already been read, used in h2c upgrade
sawFirstSettings bool // got the initial SETTINGS frame after the preface
needToSendSettingsAck bool
unackedSettings int // how many SETTINGS have we sent without ACKs?
@@ -511,23 +617,31 @@
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised to the client
curClientStreams uint32 // number of open streams initiated by the client
curPushedStreams uint32 // number of open streams initiated by server push
+ curHandlers uint32 // number of running handler goroutines
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
streams map[uint32]*stream
+ unstartedHandlers []unstartedHandler
initialStreamSendWindowSize int32
+ initialStreamRecvWindowSize int32
maxFrameSize int32
- headerTableSize uint32
peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
+ canonHeaderKeysSize int // canonHeader keys size in bytes
writingFrame bool // started writing a frame (on serve goroutine or separate)
writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
needsFrameFlush bool // last frame write wasn't a flush
inGoAway bool // we've started to or sent GOAWAY
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
+ pingSent bool
+ sentPingData [8]byte
goAwayCode ErrCode
- shutdownTimer *time.Timer // nil until used
- idleTimer *time.Timer // nil if unused
+ shutdownTimer timer // nil until used
+ idleTimer timer // nil if unused
+ readIdleTimeout time.Duration
+ pingTimeout time.Duration
+ readIdleTimer timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -542,11 +656,7 @@
if n <= 0 {
n = http.DefaultMaxHeaderBytes
}
- // http2's count is in a slightly different unit and includes 32 bytes per pair.
- // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
- const perFieldOverhead = 32 // per http2 spec
- const typicalHeaders = 10 // conservative
- return uint32(n + typicalHeaders*perFieldOverhead)
+ return uint32(adjustHTTP1MaxHeaderSize(int64(n)))
}
func (sc *serverConn) curOpenStreams() uint32 {
@@ -571,15 +681,17 @@
cancelCtx func()
// owned by serverConn's serve loop:
- bodyBytes int64 // body bytes seen so far
- declBodyBytes int64 // or -1 if undeclared
- flow flow // limits writing from Handler to client
- inflow flow // what the client is allowed to POST/etc to us
+ bodyBytes int64 // body bytes seen so far
+ declBodyBytes int64 // or -1 if undeclared
+ flow outflow // limits writing from Handler to client
+ inflow inflow // what the client is allowed to POST/etc to us
state streamState
- resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
- gotTrailerHeader bool // HEADER frame for trailers was seen
- wroteHeaders bool // whether we wrote headers (not status 100)
- writeDeadline *time.Timer // nil if unused
+ resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
+ gotTrailerHeader bool // HEADER frame for trailers was seen
+ wroteHeaders bool // whether we wrote headers (not status 100)
+ readDeadline timer // nil if unused
+ writeDeadline timer // nil if unused
+ closeErr error // set before cw is closed
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
@@ -657,11 +769,7 @@
return false
}
- // TODO: remove this string search and be more like the Windows
- // case below. That might involve modifying the standard library
- // to return better error types.
- str := err.Error()
- if strings.Contains(str, "use of closed network connection") {
+ if errors.Is(err, net.ErrClosed) {
return true
}
@@ -695,6 +803,13 @@
}
}
+// maxCachedCanonicalHeadersKeysSize is an arbitrarily-chosen limit on the size
+// of the entries in the canonHeader cache.
+// This should be larger than the size of unique, uncommon header keys likely to
+// be sent by the peer, while not so high as to permit unreasonable memory usage
+// if the peer sends an unbounded number of unique header keys.
+const maxCachedCanonicalHeadersKeysSize = 2048
+
func (sc *serverConn) canonicalHeader(v string) string {
sc.serveG.check()
buildCommonHeaderMapsOnce()
@@ -710,7 +825,11 @@
sc.canonHeader = make(map[string]string)
}
cv = http.CanonicalHeaderKey(v)
- sc.canonHeader[v] = cv
+ size := 100 + len(v)*2 // 100 bytes of map overhead + key + value
+ if sc.canonHeaderKeysSize+size <= maxCachedCanonicalHeadersKeysSize {
+ sc.canonHeader[v] = cv
+ sc.canonHeaderKeysSize += size
+ }
return cv
}
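As a worked example of the accounting above: caching a hypothetical uncommon key such as "X-Correlation-Id" (16 bytes) is charged 100 + 2*16 = 132 bytes, so the 2048-byte budget admits roughly 15 such keys before the cache simply stops growing; keys already covered by buildCommonHeaderMapsOnce never reach this map and are never charged.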
@@ -729,8 +848,9 @@
// consumer is done with the frame.
// It's run on its own goroutine.
func (sc *serverConn) readFrames() {
- gate := make(gate)
- gateDone := gate.Done
+ sc.srv.markNewGoroutine()
+ gate := make(chan struct{})
+ gateDone := func() { gate <- struct{}{} }
for {
f, err := sc.framer.ReadFrame()
select {
@@ -760,8 +880,14 @@
// and then reports when it's done.
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
-func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
- err := wr.write.writeFrame(sc)
+func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
+ sc.srv.markNewGoroutine()
+ var err error
+ if wd == nil {
+ err = wr.write.writeFrame(sc)
+ } else {
+ err = sc.framer.endWrite()
+ }
sc.wroteFrameCh <- frameWriteResult{wr: wr, err: err}
}
@@ -794,7 +920,7 @@
}
}
-func (sc *serverConn) serve() {
+func (sc *serverConn) serve(conf http2Config) {
sc.serveG.check()
defer sc.notePanic()
defer sc.conn.Close()
@@ -806,19 +932,24 @@
sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
}
+ settings := writeSettings{
+ {SettingMaxFrameSize, conf.MaxReadFrameSize},
+ {SettingMaxConcurrentStreams, sc.advMaxStreams},
+ {SettingMaxHeaderListSize, sc.maxHeaderListSize()},
+ {SettingHeaderTableSize, conf.MaxDecoderHeaderTableSize},
+ {SettingInitialWindowSize, uint32(sc.initialStreamRecvWindowSize)},
+ }
+ if !disableExtendedConnectProtocol {
+ settings = append(settings, Setting{SettingEnableConnectProtocol, 1})
+ }
sc.writeFrame(FrameWriteRequest{
- write: writeSettings{
- {SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
- {SettingMaxConcurrentStreams, sc.advMaxStreams},
- {SettingMaxHeaderListSize, sc.maxHeaderListSize()},
- {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
- },
+ write: settings,
})
sc.unackedSettings++
- // Each connection starts with intialWindowSize inflow tokens.
+ // Each connection starts with initialWindowSize inflow tokens.
// If a higher value is configured, we add more tokens.
- if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
+ if diff := conf.MaxUploadBufferPerConnection - initialWindowSize; diff > 0 {
sc.sendWindowUpdate(nil, int(diff))
}
@@ -833,16 +964,23 @@
sc.setConnState(http.StateActive)
sc.setConnState(http.StateIdle)
- if sc.srv.IdleTimeout != 0 {
- sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
+ if sc.srv.IdleTimeout > 0 {
+ sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop()
}
+ if conf.SendPingTimeout > 0 {
+ sc.readIdleTimeout = conf.SendPingTimeout
+ sc.readIdleTimer = sc.srv.afterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
+ defer sc.readIdleTimer.Stop()
+ }
+
go sc.readFrames() // closed by defer sc.conn.Close above
- settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
+ settingsTimer := sc.srv.afterFunc(firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
+ lastFrameTime := sc.srv.now()
loopNum := 0
for {
loopNum++
@@ -856,6 +994,16 @@
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
+ lastFrameTime = sc.srv.now()
+ // Process any written frames before reading new frames from the client since a
+ // written frame could have triggered a new stream to be started.
+ if sc.writingFrameAsync {
+ select {
+ case wroteRes := <-sc.wroteFrameCh:
+ sc.wroteFrame(wroteRes)
+ default:
+ }
+ }
if !sc.processFrameFromReader(res) {
return
}
@@ -878,16 +1026,22 @@
case idleTimerMsg:
sc.vlogf("connection is idle")
sc.goAway(ErrCodeNo)
+ case readIdleTimerMsg:
+ sc.handlePingTimer(lastFrameTime)
case shutdownTimerMsg:
sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
return
case gracefulShutdownMsg:
sc.startGracefulShutdownInternal()
+ case handlerDoneMsg:
+ sc.handlerDone()
default:
panic("unknown timer")
}
case *startPushRequest:
sc.startPush(v)
+ case func(*serverConn):
+ v(sc)
default:
panic(fmt.Sprintf("unexpected type %T", v))
}
@@ -896,7 +1050,7 @@
// If the peer is causing us to generate a lot of control frames,
// but not reading them from us, assume they are trying to make us
// run out of memory.
- if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
+ if sc.queuedControlFrames > maxQueuedControlFrames {
sc.vlogf("http2: too many control frames in send queue, closing connection")
return
}
@@ -912,12 +1066,30 @@
}
}
-func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
- select {
- case <-sc.doneServing:
- case <-sharedCh:
- close(privateCh)
+func (sc *serverConn) handlePingTimer(lastFrameReadTime time.Time) {
+ if sc.pingSent {
+ sc.vlogf("timeout waiting for PING response")
+ sc.conn.Close()
+ return
}
+
+ pingAt := lastFrameReadTime.Add(sc.readIdleTimeout)
+ now := sc.srv.now()
+ if pingAt.After(now) {
+ // We received frames since arming the ping timer.
+ // Reset it for the next possible timeout.
+ sc.readIdleTimer.Reset(pingAt.Sub(now))
+ return
+ }
+
+ sc.pingSent = true
+	// Ignore crypto/rand.Read errors: It generally can't fail, and worst case if it does
+ // is we send a PING frame containing 0s.
+ _, _ = rand.Read(sc.sentPingData[:])
+ sc.writeFrame(FrameWriteRequest{
+ write: &writePing{data: sc.sentPingData},
+ })
+ sc.readIdleTimer.Reset(sc.pingTimeout)
}
type serverMessage int
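For instance, with Server.ReadIdleTimeout set to 30s and PingTimeout to 15s: after 30 seconds without a frame the serve loop sends a PING carrying random bytes from sentPingData, re-arms the timer for 15 seconds, and closes the connection if that timer fires before the matching ACK arrives; a matching ACK (compared against sentPingData in processPing below) clears pingSent and re-arms the 30-second read-idle timer.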
@@ -926,12 +1098,15 @@
var (
settingsTimerMsg = new(serverMessage)
idleTimerMsg = new(serverMessage)
+ readIdleTimerMsg = new(serverMessage)
shutdownTimerMsg = new(serverMessage)
gracefulShutdownMsg = new(serverMessage)
+ handlerDoneMsg = new(serverMessage)
)
func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
+func (sc *serverConn) onReadIdleTimer() { sc.sendServeMsg(readIdleTimerMsg) }
func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
func (sc *serverConn) sendServeMsg(msg interface{}) {
@@ -948,6 +1123,9 @@
// returns errPrefaceTimeout on timeout, or an error if the greeting
// is invalid.
func (sc *serverConn) readPreface() error {
+ if sc.sawClientPreface {
+ return nil
+ }
errc := make(chan error, 1)
go func() {
// Read the client preface
@@ -960,10 +1138,10 @@
errc <- nil
}
}()
- timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
+ timer := sc.srv.newTimer(prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
- case <-timer.C:
+ case <-timer.C():
return errPrefaceTimeout
case err := <-errc:
if err == nil {
@@ -1153,9 +1331,16 @@
sc.writingFrameAsync = false
err := wr.write.writeFrame(sc)
sc.wroteFrame(frameWriteResult{wr: wr, err: err})
+ } else if wd, ok := wr.write.(*writeData); ok {
+ // Encode the frame in the serve goroutine, to ensure we don't have
+ // any lingering asynchronous references to data passed to Write.
+ // See https://go.dev/issue/58446.
+ sc.framer.startWriteDataPadded(wd.streamID, wd.endStream, wd.p, nil)
+ sc.writingFrameAsync = true
+ go sc.writeFrameAsync(wr, wd)
} else {
sc.writingFrameAsync = true
- go sc.writeFrameAsync(wr)
+ go sc.writeFrameAsync(wr, nil)
}
}
@@ -1174,6 +1359,10 @@
sc.writingFrame = false
sc.writingFrameAsync = false
+ if res.err != nil {
+ sc.conn.Close()
+ }
+
wr := res.wr
if writeEndsStream(wr.write) {
@@ -1308,6 +1497,9 @@
func (sc *serverConn) goAway(code ErrCode) {
sc.serveG.check()
if sc.inGoAway {
+ if sc.goAwayCode == ErrCodeNo {
+ sc.goAwayCode = code
+ }
return
}
sc.inGoAway = true
@@ -1318,7 +1510,7 @@
func (sc *serverConn) shutDownIn(d time.Duration) {
sc.serveG.check()
- sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
+ sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer)
}
func (sc *serverConn) resetStream(se StreamError) {
@@ -1371,6 +1563,11 @@
sc.goAway(ErrCodeFlowControl)
return true
case ConnectionError:
+ if res.f != nil {
+ if id := res.f.Header().StreamID; id > sc.maxClientStreamID {
+ sc.maxClientStreamID = id
+ }
+ }
sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
sc.goAway(ErrCode(ev))
return true // goAway will handle shutdown
@@ -1390,11 +1587,26 @@
// First frame received must be SETTINGS.
if !sc.sawFirstSettings {
if _, ok := f.(*SettingsFrame); !ok {
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("first_settings", ConnectionError(ErrCodeProtocol))
}
sc.sawFirstSettings = true
}
+ // Discard frames for streams initiated after the identified last
+ // stream sent in a GOAWAY, or all frames after sending an error.
+ // We still need to return connection-level flow control for DATA frames.
+ // RFC 9113 Section 6.8.
+ if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || f.Header().StreamID > sc.maxClientStreamID) {
+
+ if f, ok := f.(*DataFrame); ok {
+ if !sc.inflow.take(f.Length) {
+ return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
+ }
+ sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
+ }
+ return nil
+ }
+
switch f := f.(type) {
case *SettingsFrame:
return sc.processSettings(f)
@@ -1415,7 +1627,7 @@
case *PushPromiseFrame:
// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("push_promise", ConnectionError(ErrCodeProtocol))
default:
sc.vlogf("http2: server ignoring frame: %v", f.Header())
return nil
@@ -1425,6 +1637,11 @@
func (sc *serverConn) processPing(f *PingFrame) error {
sc.serveG.check()
if f.IsAck() {
+ if sc.pingSent && sc.sentPingData == f.Data {
+ // This is a response to a PING we sent.
+ sc.pingSent = false
+ sc.readIdleTimer.Reset(sc.readIdleTimeout)
+ }
// 6.7 PING: " An endpoint MUST NOT respond to PING frames
// containing this flag."
return nil
@@ -1435,10 +1652,7 @@
// identifier field value other than 0x0, the recipient MUST
// respond with a connection error (Section 5.4.1) of type
// PROTOCOL_ERROR."
- return ConnectionError(ErrCodeProtocol)
- }
- if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
- return nil
+ return sc.countError("ping_on_stream", ConnectionError(ErrCodeProtocol))
}
sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
return nil
@@ -1454,7 +1668,7 @@
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR."
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("stream_idle", ConnectionError(ErrCodeProtocol))
}
if st == nil {
// "WINDOW_UPDATE can be sent by a peer that has sent a
@@ -1465,7 +1679,7 @@
return nil
}
if !st.flow.add(int32(f.Increment)) {
- return streamError(f.StreamID, ErrCodeFlowControl)
+ return sc.countError("bad_flow", streamError(f.StreamID, ErrCodeFlowControl))
}
default: // connection-level flow control
if !sc.flow.add(int32(f.Increment)) {
@@ -1486,7 +1700,7 @@
// identifying an idle stream is received, the
// recipient MUST treat this as a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("reset_idle_stream", ConnectionError(ErrCodeProtocol))
}
if st != nil {
st.cancelCtx()
@@ -1501,6 +1715,9 @@
panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
}
st.state = stateClosed
+ if st.readDeadline != nil {
+ st.readDeadline.Stop()
+ }
if st.writeDeadline != nil {
st.writeDeadline.Stop()
}
@@ -1512,7 +1729,7 @@
delete(sc.streams, st.id)
if len(sc.streams) == 0 {
sc.setConnState(http.StateIdle)
- if sc.srv.IdleTimeout != 0 {
+ if sc.srv.IdleTimeout > 0 && sc.idleTimer != nil {
sc.idleTimer.Reset(sc.srv.IdleTimeout)
}
if h1ServerKeepAlivesDisabled(sc.hs) {
@@ -1526,6 +1743,15 @@
p.CloseWithError(err)
}
+ if e, ok := err.(StreamError); ok {
+ if e.Cause != nil {
+ err = e.Cause
+ } else {
+ err = errStreamClosed
+ }
+ }
+ st.closeErr = err
+ st.cancelCtx()
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.CloseStream(st.id)
}
@@ -1538,7 +1764,7 @@
// Why is the peer ACKing settings we never sent?
// The spec doesn't mention this case, but
// hang up on them anyway.
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("ack_mystery", ConnectionError(ErrCodeProtocol))
}
return nil
}
@@ -1546,7 +1772,7 @@
// This isn't actually in the spec, but hang up on
// suspiciously large settings frames or those with
// duplicate entries.
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("settings_big_or_dups", ConnectionError(ErrCodeProtocol))
}
if err := f.ForeachSetting(sc.processSetting); err != nil {
return err
@@ -1568,7 +1794,6 @@
}
switch s.ID {
case SettingHeaderTableSize:
- sc.headerTableSize = s.Val
sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
case SettingEnablePush:
sc.pushEnabled = s.Val != 0
@@ -1580,6 +1805,9 @@
sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
case SettingMaxHeaderListSize:
sc.peerMaxHeaderListSize = s.Val
+ case SettingEnableConnectProtocol:
+ // Receipt of this parameter by a server does not
+ // have any impact
default:
// Unknown setting: "An endpoint that receives a SETTINGS
// frame with any unknown or unsupported identifier MUST
@@ -1613,7 +1841,7 @@
// control window to exceed the maximum size as a
// connection error (Section 5.4.1) of type
// FLOW_CONTROL_ERROR."
- return ConnectionError(ErrCodeFlowControl)
+ return sc.countError("setting_win_size", ConnectionError(ErrCodeFlowControl))
}
}
return nil
@@ -1622,16 +1850,6 @@
func (sc *serverConn) processData(f *DataFrame) error {
sc.serveG.check()
id := f.Header().StreamID
- if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || id > sc.maxClientStreamID) {
- // Discard all DATA frames if the GOAWAY is due to an
- // error, or:
- //
- // Section 6.8: After sending a GOAWAY frame, the sender
- // can discard frames for streams initiated by the
- // receiver with identifiers higher than the identified
- // last stream.
- return nil
- }
data := f.Data()
state, st := sc.state(id)
@@ -1646,7 +1864,7 @@
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR."
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("data_on_idle", ConnectionError(ErrCodeProtocol))
}
// "If a DATA frame is received whose stream is not in "open"
@@ -1662,21 +1880,16 @@
// But still enforce their connection-level flow control,
// and return any flow control bytes since we're not going
// to consume them.
- if sc.inflow.available() < int32(f.Length) {
- return streamError(id, ErrCodeFlowControl)
+ if !sc.inflow.take(f.Length) {
+ return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
}
- // Deduct the flow control from inflow, since we're
- // going to immediately add it back in
- // sendWindowUpdate, which also schedules sending the
- // frames.
- sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
if st != nil && st.resetQueued {
// Already have a stream error in flight. Don't send another.
return nil
}
- return streamError(id, ErrCodeStreamClosed)
+ return sc.countError("closed", streamError(id, ErrCodeStreamClosed))
}
if st.body == nil {
panic("internal error: should have a body in this state")
@@ -1684,37 +1897,46 @@
// Sender sending more than they'd declared?
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
+ if !sc.inflow.take(f.Length) {
+ return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
+ }
+ sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
+
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
// RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
// value of a content-length header field does not equal the sum of the
// DATA frame payload lengths that form the body.
- return streamError(id, ErrCodeProtocol)
+ return sc.countError("send_too_much", streamError(id, ErrCodeProtocol))
}
if f.Length > 0 {
// Check whether the client has flow control quota.
- if st.inflow.available() < int32(f.Length) {
- return streamError(id, ErrCodeFlowControl)
+ if !takeInflows(&sc.inflow, &st.inflow, f.Length) {
+ return sc.countError("flow_on_data_length", streamError(id, ErrCodeFlowControl))
}
- st.inflow.take(int32(f.Length))
if len(data) > 0 {
+ st.bodyBytes += int64(len(data))
wrote, err := st.body.Write(data)
if err != nil {
+ // The handler has closed the request body.
+ // Return the connection-level flow control for the discarded data,
+ // but not the stream-level flow control.
sc.sendWindowUpdate(nil, int(f.Length)-wrote)
- return streamError(id, ErrCodeStreamClosed)
+ return nil
}
if wrote != len(data) {
panic("internal error: bad Writer")
}
- st.bodyBytes += int64(len(data))
}
// Return any padded flow control now, since we won't
// refund it later on body reads.
- if pad := int32(f.Length) - int32(len(data)); pad > 0 {
- sc.sendWindowUpdate32(nil, pad)
- sc.sendWindowUpdate32(st, pad)
- }
+ // Call sendWindowUpdate even if there is no padding,
+ // to return buffered flow control credit if the sent
+ // window has shrunk.
+ pad := int32(f.Length) - int32(len(data))
+ sc.sendWindowUpdate32(nil, pad)
+ sc.sendWindowUpdate32(st, pad)
}
if f.StreamEnded() {
st.endStream()
@@ -1768,26 +1990,36 @@
}
}
+// onReadTimeout is run on its own goroutine (from time.AfterFunc)
+// when the stream's ReadTimeout has fired.
+func (st *stream) onReadTimeout() {
+ if st.body != nil {
+ // Wrap the ErrDeadlineExceeded to avoid callers depending on us
+ // returning the bare error.
+ st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
+ }
+}
+
// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
// when the stream's WriteTimeout has fired.
func (st *stream) onWriteTimeout() {
- st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
+ st.sc.writeFrameFromHandler(FrameWriteRequest{write: StreamError{
+ StreamID: st.id,
+ Code: ErrCodeInternal,
+ Cause: os.ErrDeadlineExceeded,
+ }})
}
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
sc.serveG.check()
id := f.StreamID
- if sc.inGoAway {
- // Ignore.
- return nil
- }
// http://tools.ietf.org/html/rfc7540#section-5.1.1
// Streams initiated by a client MUST use odd-numbered stream
// identifiers. [...] An endpoint that receives an unexpected
// stream identifier MUST respond with a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
if id%2 != 1 {
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("headers_even", ConnectionError(ErrCodeProtocol))
}
// A HEADERS frame can be used to create a new stream or
// send a trailer for an open one. If we already have a stream
@@ -1804,7 +2036,7 @@
// this state, it MUST respond with a stream error (Section 5.4.2) of
// type STREAM_CLOSED.
if st.state == stateHalfClosedRemote {
- return streamError(id, ErrCodeStreamClosed)
+ return sc.countError("headers_half_closed", streamError(id, ErrCodeStreamClosed))
}
return st.processTrailerHeaders(f)
}
@@ -1815,7 +2047,7 @@
// receives an unexpected stream identifier MUST respond with
// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
if id <= sc.maxClientStreamID {
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("stream_went_down", ConnectionError(ErrCodeProtocol))
}
sc.maxClientStreamID = id
@@ -1832,14 +2064,14 @@
if sc.curClientStreams+1 > sc.advMaxStreams {
if sc.unackedSettings == 0 {
// They should know better.
- return streamError(id, ErrCodeProtocol)
+ return sc.countError("over_max_streams", streamError(id, ErrCodeProtocol))
}
// Assume it's a network race, where they just haven't
// received our last SETTINGS update. But actually
// this can't happen yet, because we don't yet provide
// a way for users to adjust server parameters at
// runtime.
- return streamError(id, ErrCodeRefusedStream)
+ return sc.countError("over_max_streams_race", streamError(id, ErrCodeRefusedStream))
}
initialState := stateOpen
@@ -1849,7 +2081,7 @@
st := sc.newStream(id, 0, initialState)
if f.HasPriority() {
- if err := checkPriority(f.StreamID, f.Priority); err != nil {
+ if err := sc.checkPriority(f.StreamID, f.Priority); err != nil {
return err
}
sc.writeSched.AdjustStream(st.id, f.Priority)
@@ -1881,27 +2113,51 @@
// similar to how the http1 server works. Here it's
// technically more like the http1 Server's ReadHeaderTimeout
// (in Go 1.8), though. That's a more sane option anyway.
- if sc.hs.ReadTimeout != 0 {
+ if sc.hs.ReadTimeout > 0 {
+ sc.conn.SetReadDeadline(time.Time{})
+ st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
+ }
+
+ return sc.scheduleHandler(id, rw, req, handler)
+}
+
+func (sc *serverConn) upgradeRequest(req *http.Request) {
+ sc.serveG.check()
+ id := uint32(1)
+ sc.maxClientStreamID = id
+ st := sc.newStream(id, 0, stateHalfClosedRemote)
+ st.reqTrailer = req.Trailer
+ if st.reqTrailer != nil {
+ st.trailer = make(http.Header)
+ }
+ rw := sc.newResponseWriter(st, req)
+
+ // Disable any read deadline set by the net/http package
+ // prior to the upgrade.
+ if sc.hs.ReadTimeout > 0 {
sc.conn.SetReadDeadline(time.Time{})
}
- go sc.runHandler(rw, req, handler)
- return nil
+ // This is the first request on the connection,
+ // so start the handler directly rather than going
+ // through scheduleHandler.
+ sc.curHandlers++
+ go sc.runHandler(rw, req, sc.handler.ServeHTTP)
}
func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
sc := st.sc
sc.serveG.check()
if st.gotTrailerHeader {
- return ConnectionError(ErrCodeProtocol)
+ return sc.countError("dup_trailers", ConnectionError(ErrCodeProtocol))
}
st.gotTrailerHeader = true
if !f.StreamEnded() {
- return streamError(st.id, ErrCodeProtocol)
+ return sc.countError("trailers_not_ended", streamError(st.id, ErrCodeProtocol))
}
if len(f.PseudoFields()) > 0 {
- return streamError(st.id, ErrCodeProtocol)
+ return sc.countError("trailers_pseudo", streamError(st.id, ErrCodeProtocol))
}
if st.trailer != nil {
for _, hf := range f.RegularFields() {
@@ -1910,7 +2166,7 @@
// TODO: send more details to the peer somehow. But http2 has
// no way to send debug data at a stream level. Discuss with
// HTTP folk.
- return streamError(st.id, ErrCodeProtocol)
+ return sc.countError("trailers_bogus", streamError(st.id, ErrCodeProtocol))
}
st.trailer[key] = append(st.trailer[key], hf.Value)
}
@@ -1919,22 +2175,19 @@
return nil
}
-func checkPriority(streamID uint32, p PriorityParam) error {
+func (sc *serverConn) checkPriority(streamID uint32, p PriorityParam) error {
if streamID == p.StreamDep {
// Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
// Section 5.3.3 says that a stream can depend on one of its dependencies,
// so it's only self-dependencies that are forbidden.
- return streamError(streamID, ErrCodeProtocol)
+ return sc.countError("priority", streamError(streamID, ErrCodeProtocol))
}
return nil
}
func (sc *serverConn) processPriority(f *PriorityFrame) error {
- if sc.inGoAway {
- return nil
- }
- if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
+ if err := sc.checkPriority(f.StreamID, f.PriorityParam); err != nil {
return err
}
sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
@@ -1958,10 +2211,9 @@
st.cw.Init()
st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialStreamSendWindowSize)
- st.inflow.conn = &sc.inflow // link to conn-level counter
- st.inflow.add(sc.srv.initialStreamRecvWindowSize())
- if sc.hs.WriteTimeout != 0 {
- st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
+ st.inflow.init(sc.initialStreamRecvWindowSize)
+ if sc.hs.WriteTimeout > 0 {
+ st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
sc.streams[id] = st
@@ -1986,12 +2238,18 @@
scheme: f.PseudoValue("scheme"),
authority: f.PseudoValue("authority"),
path: f.PseudoValue("path"),
+ protocol: f.PseudoValue("protocol"),
+ }
+
+ // extended connect is disabled, so we should not see :protocol
+ if disableExtendedConnectProtocol && rp.protocol != "" {
+ return nil, nil, sc.countError("bad_connect", streamError(f.StreamID, ErrCodeProtocol))
}
isConnect := rp.method == "CONNECT"
if isConnect {
- if rp.path != "" || rp.scheme != "" || rp.authority == "" {
- return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
+ if rp.protocol == "" && (rp.path != "" || rp.scheme != "" || rp.authority == "") {
+ return nil, nil, sc.countError("bad_connect", streamError(f.StreamID, ErrCodeProtocol))
}
} else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
// See 8.1.2.6 Malformed Requests and Responses:
@@ -2004,13 +2262,7 @@
// "All HTTP/2 requests MUST include exactly one valid
// value for the :method, :scheme, and :path
// pseudo-header fields"
- return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
- }
-
- bodyOpen := !f.StreamEnded()
- if rp.method == "HEAD" && bodyOpen {
- // HEAD requests can't have bodies
- return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
+ return nil, nil, sc.countError("bad_path_method", streamError(f.StreamID, ErrCodeProtocol))
}
rp.header = make(http.Header)
@@ -2020,11 +2272,15 @@
if rp.authority == "" {
rp.authority = rp.header.Get("Host")
}
+ if rp.protocol != "" {
+ rp.header.Set(":protocol", rp.protocol)
+ }
rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
if err != nil {
return nil, nil, err
}
+ bodyOpen := !f.StreamEnded()
if bodyOpen {
if vv, ok := rp.header["Content-Length"]; ok {
if cl, err := strconv.ParseUint(vv[0], 10, 63); err == nil {
@@ -2045,6 +2301,7 @@
type requestParam struct {
method string
scheme, authority, path string
+ protocol string
header http.Header
}
@@ -2056,7 +2313,7 @@
tlsState = sc.tlsState
}
- needsContinue := rp.header.Get("Expect") == "100-continue"
+ needsContinue := httpguts.HeaderValuesContainsToken(rp.header["Expect"], "100-continue")
if needsContinue {
rp.header.Del("Expect")
}
@@ -2086,14 +2343,14 @@
var url_ *url.URL
var requestURI string
- if rp.method == "CONNECT" {
+ if rp.method == "CONNECT" && rp.protocol == "" {
url_ = &url.URL{Host: rp.authority}
requestURI = rp.authority // mimic HTTP/1 server behavior
} else {
var err error
url_, err = url.ParseRequestURI(rp.path)
if err != nil {
- return nil, nil, streamError(st.id, ErrCodeProtocol)
+ return nil, nil, sc.countError("bad_path", streamError(st.id, ErrCodeProtocol))
}
requestURI = rp.path
}
@@ -2119,6 +2376,11 @@
}
req = req.WithContext(st.ctx)
+ rw := sc.newResponseWriter(st, req)
+ return rw, req, nil
+}
+
+func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *responseWriter {
rws := responseWriterStatePool.Get().(*responseWriterState)
bwSave := rws.bw
*rws = responseWriterState{} // zero all the fields
@@ -2127,17 +2389,72 @@
rws.bw.Reset(chunkWriter{rws})
rws.stream = st
rws.req = req
- rws.body = body
+ return &responseWriter{rws: rws}
+}
- rw := &responseWriter{rws: rws}
- return rw, req, nil
+type unstartedHandler struct {
+ streamID uint32
+ rw *responseWriter
+ req *http.Request
+ handler func(http.ResponseWriter, *http.Request)
+}
+
+// scheduleHandler starts a handler goroutine,
+// or schedules one to start as soon as an existing handler finishes.
+func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error {
+ sc.serveG.check()
+ maxHandlers := sc.advMaxStreams
+ if sc.curHandlers < maxHandlers {
+ sc.curHandlers++
+ go sc.runHandler(rw, req, handler)
+ return nil
+ }
+ if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) {
+ return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
+ }
+ sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
+ streamID: streamID,
+ rw: rw,
+ req: req,
+ handler: handler,
+ })
+ return nil
+}
+
+func (sc *serverConn) handlerDone() {
+ sc.serveG.check()
+ sc.curHandlers--
+ i := 0
+ maxHandlers := sc.advMaxStreams
+ for ; i < len(sc.unstartedHandlers); i++ {
+ u := sc.unstartedHandlers[i]
+ if sc.streams[u.streamID] == nil {
+ // This stream was reset before its goroutine had a chance to start.
+ continue
+ }
+ if sc.curHandlers >= maxHandlers {
+ break
+ }
+ sc.curHandlers++
+ go sc.runHandler(u.rw, u.req, u.handler)
+ sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references
+ }
+ sc.unstartedHandlers = sc.unstartedHandlers[i:]
+ if len(sc.unstartedHandlers) == 0 {
+ sc.unstartedHandlers = nil
+ }
}
// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
+ sc.srv.markNewGoroutine()
+ defer sc.sendServeMsg(handlerDoneMsg)
didPanic := true
defer func() {
rw.rws.stream.cancelCtx()
+ if req.MultipartForm != nil {
+ req.MultipartForm.RemoveAll()
+ }
if didPanic {
e := recover()
sc.writeFrameFromHandler(FrameWriteRequest{
@@ -2241,47 +2558,28 @@
}
// st may be nil for conn-level
-func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
- sc.serveG.check()
- // "The legal range for the increment to the flow control
- // window is 1 to 2^31-1 (2,147,483,647) octets."
- // A Go Read call on 64-bit machines could in theory read
- // a larger Read than this. Very unlikely, but we handle it here
- // rather than elsewhere for now.
- const maxUint31 = 1<<31 - 1
- for n >= maxUint31 {
- sc.sendWindowUpdate32(st, maxUint31)
- n -= maxUint31
- }
- sc.sendWindowUpdate32(st, int32(n))
+func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
+ sc.sendWindowUpdate(st, int(n))
}
// st may be nil for conn-level
-func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
+func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
sc.serveG.check()
- if n == 0 {
+ var streamID uint32
+ var send int32
+ if st == nil {
+ send = sc.inflow.add(n)
+ } else {
+ streamID = st.id
+ send = st.inflow.add(n)
+ }
+ if send == 0 {
return
}
- if n < 0 {
- panic("negative update")
- }
- var streamID uint32
- if st != nil {
- streamID = st.id
- }
sc.writeFrame(FrameWriteRequest{
- write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
+ write: writeWindowUpdate{streamID: streamID, n: uint32(send)},
stream: st,
})
- var ok bool
- if st == nil {
- ok = sc.inflow.add(n)
- } else {
- ok = st.inflow.add(n)
- }
- if !ok {
- panic("internal error; sent too many window updates without decrements?")
- }
}
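For example, a padding-only refund of a few bytes no longer has to emit its own WINDOW_UPDATE: sendWindowUpdate credits the inflow accounting first and writes a frame only when inflow.add reports a non-zero amount to send, so small refunds can be batched by the inflow type defined in flow.go (outside this diff).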
// requestBody is the Handler's Request.Body type.
@@ -2290,17 +2588,18 @@
_ incomparable
stream *stream
conn *serverConn
- closed bool // for use by Close only
- sawEOF bool // for use by Read only
- pipe *pipe // non-nil if we have a HTTP entity message body
- needsContinue bool // need to send a 100-continue
+ closeOnce sync.Once // for use by Close only
+ sawEOF bool // for use by Read only
+ pipe *pipe // non-nil if we have an HTTP entity message body
+ needsContinue bool // need to send a 100-continue
}
func (b *requestBody) Close() error {
- if b.pipe != nil && !b.closed {
- b.pipe.BreakWithError(errClosedBody)
- }
- b.closed = true
+ b.closeOnce.Do(func() {
+ if b.pipe != nil {
+ b.pipe.BreakWithError(errClosedBody)
+ }
+ })
return nil
}
@@ -2344,7 +2643,6 @@
// immutable within a request:
stream *stream
req *http.Request
- body *requestBody // to close at end of request, if DATA frames didn't
conn *serverConn
// TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
@@ -2358,7 +2656,6 @@
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
sentHeader bool // have we sent the header frame?
handlerDone bool // handler has finished
- dirty bool // a Write failed; don't reuse this responseWriterState
sentContentLen int64 // non-zero if handler set a Content-Length header
wroteBytes int64
@@ -2369,7 +2666,15 @@
type chunkWriter struct{ rws *responseWriterState }
-func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
+func (cw chunkWriter) Write(p []byte) (n int, err error) {
+ n, err = cw.rws.writeChunk(p)
+ if err == errStreamClosed {
+ // If writing failed because the stream has been closed,
+ // return the reason it was closed.
+ err = cw.rws.stream.closeErr
+ }
+ return n, err
+}
func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
@@ -2408,6 +2713,10 @@
rws.writeHeader(200)
}
+ if rws.handlerDone {
+ rws.promoteUndeclaredTrailers()
+ }
+
isHeadResp := rws.req.Method == "HEAD"
if !rws.sentHeader {
rws.sentHeader = true
@@ -2420,7 +2729,8 @@
clen = ""
}
}
- if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
+ _, hasContentLength := rws.snapHeader["Content-Length"]
+ if !hasContentLength && clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
clen = strconv.Itoa(len(p))
}
_, hasContentType := rws.snapHeader["Content-Type"]
@@ -2434,7 +2744,7 @@
var date string
if _, ok := rws.snapHeader["Date"]; !ok {
// TODO(bradfitz): be faster here, like net/http? measure.
- date = time.Now().UTC().Format(http.TimeFormat)
+ date = rws.conn.srv.now().UTC().Format(http.TimeFormat)
}
for _, v := range rws.snapHeader["Trailer"] {
@@ -2465,7 +2775,6 @@
date: date,
})
if err != nil {
- rws.dirty = true
return 0, err
}
if endStream {
@@ -2479,10 +2788,6 @@
return 0, nil
}
- if rws.handlerDone {
- rws.promoteUndeclaredTrailers()
- }
-
// only send trailers if they have actually been defined by the
// server handler.
hasNonemptyTrailers := rws.hasNonemptyTrailers()
@@ -2490,7 +2795,6 @@
if len(p) > 0 || endStream {
// only send a 0 byte DATA frame if we're ending the stream.
if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
- rws.dirty = true
return 0, err
}
}
@@ -2502,9 +2806,6 @@
trailers: rws.trailers,
endStream: true,
})
- if err != nil {
- rws.dirty = true
- }
return len(p), err
}
return len(p), nil
@@ -2520,8 +2821,9 @@
// prior to the headers being written. If the set of trailers is fixed
// or known before the header is written, the normal Go trailers mechanism
// is preferred:
-// https://golang.org/pkg/net/http/#ResponseWriter
-// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
+//
+// https://golang.org/pkg/net/http/#ResponseWriter
+// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
const TrailerPrefix = "Trailer:"
// promoteUndeclaredTrailers permits http.Handlers to set trailers
@@ -2562,23 +2864,90 @@
}
}
+func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
+ st := w.rws.stream
+ if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
+ // If we're setting a deadline in the past, reset the stream immediately
+		// so reads after SetReadDeadline returns will fail.
+ st.onReadTimeout()
+ return nil
+ }
+ w.rws.conn.sendServeMsg(func(sc *serverConn) {
+ if st.readDeadline != nil {
+ if !st.readDeadline.Stop() {
+ // Deadline already exceeded, or stream has been closed.
+ return
+ }
+ }
+ if deadline.IsZero() {
+ st.readDeadline = nil
+ } else if st.readDeadline == nil {
+ st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout)
+ } else {
+ st.readDeadline.Reset(deadline.Sub(sc.srv.now()))
+ }
+ })
+ return nil
+}
+
+func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
+ st := w.rws.stream
+ if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
+ // If we're setting a deadline in the past, reset the stream immediately
+ // so writes after SetWriteDeadline returns will fail.
+ st.onWriteTimeout()
+ return nil
+ }
+ w.rws.conn.sendServeMsg(func(sc *serverConn) {
+ if st.writeDeadline != nil {
+ if !st.writeDeadline.Stop() {
+ // Deadline already exceeded, or stream has been closed.
+ return
+ }
+ }
+ if deadline.IsZero() {
+ st.writeDeadline = nil
+ } else if st.writeDeadline == nil {
+ st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout)
+ } else {
+ st.writeDeadline.Reset(deadline.Sub(sc.srv.now()))
+ }
+ })
+ return nil
+}
+
+func (w *responseWriter) EnableFullDuplex() error {
+ // We always support full duplex responses, so this is a no-op.
+ return nil
+}
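These methods are what net/http's ResponseController reaches on an HTTP/2 stream; a minimal sketch of a handler using it (Go 1.20+; the path, port and durations are illustrative):

    package main

    import (
        "io"
        "log"
        "net/http"
        "time"
    )

    func uploadHandler(w http.ResponseWriter, r *http.Request) {
        rc := http.NewResponseController(w)
        // Lands in responseWriter.SetReadDeadline above: the request body is
        // closed with a deadline error if the client is still sending after 10s.
        rc.SetReadDeadline(time.Now().Add(10 * time.Second))
        // Lands in responseWriter.SetWriteDeadline above: the stream is reset
        // if we are still writing after 30s.
        rc.SetWriteDeadline(time.Now().Add(30 * time.Second))
        if _, err := io.Copy(io.Discard, r.Body); err != nil {
            http.Error(w, err.Error(), http.StatusRequestTimeout)
            return
        }
        w.Write([]byte("received"))
    }

    func main() {
        http.HandleFunc("/upload", uploadHandler)
        log.Fatal(http.ListenAndServe(":8080", nil))
    }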
+
func (w *responseWriter) Flush() {
+ w.FlushError()
+}
+
+func (w *responseWriter) FlushError() error {
rws := w.rws
if rws == nil {
panic("Header called after Handler finished")
}
+ var err error
if rws.bw.Buffered() > 0 {
- if err := rws.bw.Flush(); err != nil {
- // Ignore the error. The frame writer already knows.
- return
- }
+ err = rws.bw.Flush()
} else {
// The bufio.Writer won't call chunkWriter.Write
- // (writeChunk with zero bytes, so we have to do it
+ // (writeChunk with zero bytes), so we have to do it
// ourselves to force the HTTP response header and/or
// final DATA frame (with END_STREAM) to be sent.
- rws.writeChunk(nil)
+ _, err = chunkWriter{rws}.Write(nil)
+ if err == nil {
+ select {
+ case <-rws.stream.cw:
+ err = rws.stream.closeErr
+ default:
+ }
+ }
}
+ return err
}
func (w *responseWriter) CloseNotify() <-chan bool {
@@ -2617,8 +2986,7 @@
// Issue 22880: require valid WriteHeader status codes.
// For now we only enforce that it's three digits.
// In the future we might block things over 599 (600 and above aren't defined
- // at http://httpwg.org/specs/rfc7231.html#status.codes)
- // and we might block under 200 (once we have more mature 1xx support).
+ // at http://httpwg.org/specs/rfc7231.html#status.codes).
// But for now any three digits.
//
// We used to send "HTTP/1.1 000 0" on the wire in responses but there's
@@ -2639,13 +3007,39 @@
}
func (rws *responseWriterState) writeHeader(code int) {
- if !rws.wroteHeader {
- checkWriteHeaderCode(code)
- rws.wroteHeader = true
- rws.status = code
- if len(rws.handlerHeader) > 0 {
- rws.snapHeader = cloneHeader(rws.handlerHeader)
+ if rws.wroteHeader {
+ return
+ }
+
+ checkWriteHeaderCode(code)
+
+ // Handle informational headers
+ if code >= 100 && code <= 199 {
+ // Per RFC 8297 we must not clear the current header map
+ h := rws.handlerHeader
+
+ _, cl := h["Content-Length"]
+ _, te := h["Transfer-Encoding"]
+ if cl || te {
+ h = h.Clone()
+ h.Del("Content-Length")
+ h.Del("Transfer-Encoding")
}
+
+ rws.conn.writeHeaders(rws.stream, &writeResHeaders{
+ streamID: rws.stream.id,
+ httpResCode: code,
+ h: h,
+ endStream: rws.handlerDone && !rws.hasTrailers(),
+ })
+
+ return
+ }
+
+ rws.wroteHeader = true
+ rws.status = code
+ if len(rws.handlerHeader) > 0 {
+ rws.snapHeader = cloneHeader(rws.handlerHeader)
}
}
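The 1xx branch above is what lets a handler emit interim responses; a minimal sketch sending 103 Early Hints (the Link target and listen address are illustrative):

    package main

    import (
        "log"
        "net/http"
    )

    func pageHandler(w http.ResponseWriter, r *http.Request) {
        // 103 Early Hints: written immediately as an interim HEADERS frame, and
        // per RFC 8297 the header map is not cleared, so later writes still see it.
        w.Header().Set("Link", "</static/app.css>; rel=preload; as=style")
        w.WriteHeader(http.StatusEarlyHints)

        // Final response; a Content-Length or Transfer-Encoding header would only
        // have been stripped from the interim HEADERS, not from this one.
        w.Header().Set("Content-Type", "text/html; charset=utf-8")
        w.WriteHeader(http.StatusOK)
        w.Write([]byte(`<!doctype html><link rel="stylesheet" href="/static/app.css">hello`))
    }

    func main() {
        http.HandleFunc("/", pageHandler)
        log.Fatal(http.ListenAndServe(":8080", nil))
    }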
@@ -2702,19 +3096,10 @@
func (w *responseWriter) handlerDone() {
rws := w.rws
- dirty := rws.dirty
rws.handlerDone = true
w.Flush()
w.rws = nil
- if !dirty {
- // Only recycle the pool if all prior Write calls to
- // the serverConn goroutine completed successfully. If
- // they returned earlier due to resets from the peer
- // there might still be write goroutines outstanding
- // from the serverConn referencing the rws memory. See
- // issue 20704.
- responseWriterStatePool.Put(rws)
- }
+ responseWriterStatePool.Put(rws)
}
// Push errors.
@@ -2897,6 +3282,7 @@
panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
}
+ sc.curHandlers++
go sc.runHandler(rw, req, sc.handler.ServeHTTP)
return promisedID, nil
}
@@ -2976,3 +3362,31 @@
}
return false
}
+
+func (sc *serverConn) countError(name string, err error) error {
+ if sc == nil || sc.srv == nil {
+ return err
+ }
+ f := sc.countErrorFunc
+ if f == nil {
+ return err
+ }
+ var typ string
+ var code ErrCode
+ switch e := err.(type) {
+ case ConnectionError:
+ typ = "conn"
+ code = ErrCode(e)
+ case StreamError:
+ typ = "stream"
+ code = ErrCode(e.Code)
+ default:
+ return err
+ }
+ codeStr := errCodeName[code]
+ if codeStr == "" {
+ codeStr = strconv.Itoa(int(code))
+ }
+ f(fmt.Sprintf("%s_%s_%s", typ, codeStr, name))
+ return err
+}
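A minimal sketch (not part of this patch) of wiring the CountError hook, with the key shape taken from the Sprintf above; the expvar map name, address and certificate paths are illustrative:

    package main

    import (
        "expvar"
        "log"
        "net/http"

        "golang.org/x/net/http2"
    )

    var http2Errors = expvar.NewMap("http2_server_errors")

    func main() {
        h2 := &http2.Server{
            // A rejected first frame, for example, is counted under the key
            // "conn_PROTOCOL_ERROR_first_settings" (type, code name, call site).
            CountError: func(errType string) { http2Errors.Add(errType, 1) },
        }
        srv := &http.Server{Addr: ":8443", Handler: http.DefaultServeMux}
        if err := http2.ConfigureServer(srv, h2); err != nil {
            log.Fatal(err)
        }
        log.Fatal(srv.ListenAndServeTLS("cert.pem", "key.pem"))
    }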