[VOL-5291] - On demand PON & NNI stats

Change-Id: I1950394b08b0a76968b7e68bffd310714c24a3f3
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go
index cd057f3..6c349f3 100644
--- a/vendor/golang.org/x/net/http2/server.go
+++ b/vendor/golang.org/x/net/http2/server.go
@@ -124,6 +124,7 @@
 	// IdleTimeout specifies how long until idle clients should be
 	// closed with a GOAWAY frame. PING frames are not considered
 	// activity for the purposes of IdleTimeout.
+	// If zero or negative, there is no timeout.
 	IdleTimeout time.Duration
 
 	// MaxUploadBufferPerConnection is the size of the initial flow
@@ -153,6 +154,39 @@
 	// so that we don't embed a Mutex in this struct, which will make the
 	// struct non-copyable, which might break some callers.
 	state *serverInternalState
+
+	// Synchronization group used for testing.
+	// Outside of tests, this is nil.
+	group synctestGroupInterface
+}
+
+func (s *Server) markNewGoroutine() { // called at the top of readFrames, writeFrameAsync and runHandler
+	if s.group != nil { // group is nil outside of tests, so this is a no-op in production
+		s.group.Join()
+	}
+}
+
+func (s *Server) now() time.Time { // clock used for the Date header and for deadline arithmetic
+	if s.group != nil {
+		return s.group.Now() // time as reported by the test synchronization group
+	}
+	return time.Now()
+}
+
+// newTimer creates a new time.Timer for duration d, or a synthetic timer from s.group in tests (when s.group is non-nil).
+func (s *Server) newTimer(d time.Duration) timer {
+	if s.group != nil {
+		return s.group.NewTimer(d)
+	}
+	return timeTimer{time.NewTimer(d)} // wrap the *time.Timer so it can be returned as a timer
+}
+
+// afterFunc creates a new time.AfterFunc timer that runs f after d, or a synthetic timer from s.group in tests (when s.group is non-nil).
+func (s *Server) afterFunc(d time.Duration, f func()) timer {
+	if s.group != nil {
+		return s.group.AfterFunc(d, f)
+	}
+	return timeTimer{time.AfterFunc(d, f)} // wrap the *time.Timer so it can be returned as a timer
 }
 
 func (s *Server) initialConnRecvWindowSize() int32 {
@@ -399,6 +433,10 @@
 //
 // The opts parameter is optional. If nil, default values are used.
 func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
+	s.serveConn(c, opts, nil) // nil newf: no hook runs on the new serverConn before it is registered
+}
+
+func (s *Server) serveConn(c net.Conn, opts *ServeConnOpts, newf func(*serverConn)) {
 	baseCtx, cancel := serverConnBaseContext(c, opts)
 	defer cancel()
 
@@ -425,6 +463,9 @@
 		pushEnabled:                 true,
 		sawClientPreface:            opts.SawClientPreface,
 	}
+	if newf != nil {
+		newf(sc)
+	}
 
 	s.state.registerConn(sc)
 	defer s.state.unregisterConn(sc)
@@ -434,14 +475,14 @@
 	// passes the connection off to us with the deadline already set.
 	// Write deadlines are set per stream in serverConn.newStream.
 	// Disarm the net.Conn write deadline here.
-	if sc.hs.WriteTimeout != 0 {
+	if sc.hs.WriteTimeout > 0 {
 		sc.conn.SetWriteDeadline(time.Time{})
 	}
 
 	if s.NewWriteScheduler != nil {
 		sc.writeSched = s.NewWriteScheduler()
 	} else {
-		sc.writeSched = NewPriorityWriteScheduler(nil)
+		sc.writeSched = newRoundRobinWriteScheduler()
 	}
 
 	// These start at the RFC-specified defaults. If there is a higher
@@ -581,9 +622,11 @@
 	advMaxStreams               uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
 	curClientStreams            uint32 // number of open streams initiated by the client
 	curPushedStreams            uint32 // number of open streams initiated by server push
+	curHandlers                 uint32 // number of running handler goroutines
 	maxClientStreamID           uint32 // max ever seen from client (odd), or 0 if there have been no client requests
 	maxPushPromiseID            uint32 // ID of the last push promise (even), or 0 if there have been no pushes
 	streams                     map[uint32]*stream
+	unstartedHandlers           []unstartedHandler
 	initialStreamSendWindowSize int32
 	maxFrameSize                int32
 	peerMaxHeaderListSize       uint32            // zero means unknown (default)
@@ -596,8 +639,8 @@
 	inFrameScheduleLoop         bool              // whether we're in the scheduleFrameWrite loop
 	needToSendGoAway            bool              // we need to schedule a GOAWAY frame write
 	goAwayCode                  ErrCode
-	shutdownTimer               *time.Timer // nil until used
-	idleTimer                   *time.Timer // nil if unused
+	shutdownTimer               timer // nil until used
+	idleTimer                   timer // nil if unused
 
 	// Owned by the writeFrameAsync goroutine:
 	headerWriteBuf bytes.Buffer
@@ -646,12 +689,12 @@
 	flow             outflow // limits writing from Handler to client
 	inflow           inflow  // what the client is allowed to POST/etc to us
 	state            streamState
-	resetQueued      bool        // RST_STREAM queued for write; set by sc.resetStream
-	gotTrailerHeader bool        // HEADER frame for trailers was seen
-	wroteHeaders     bool        // whether we wrote headers (not status 100)
-	readDeadline     *time.Timer // nil if unused
-	writeDeadline    *time.Timer // nil if unused
-	closeErr         error       // set before cw is closed
+	resetQueued      bool  // RST_STREAM queued for write; set by sc.resetStream
+	gotTrailerHeader bool  // HEADER frame for trailers was seen
+	wroteHeaders     bool  // whether we wrote headers (not status 100)
+	readDeadline     timer // nil if unused
+	writeDeadline    timer // nil if unused
+	closeErr         error // set before cw is closed
 
 	trailer    http.Header // accumulated trailers
 	reqTrailer http.Header // handler's Request.Trailer
@@ -729,11 +772,7 @@
 		return false
 	}
 
-	// TODO: remove this string search and be more like the Windows
-	// case below. That might involve modifying the standard library
-	// to return better error types.
-	str := err.Error()
-	if strings.Contains(str, "use of closed network connection") {
+	if errors.Is(err, net.ErrClosed) {
 		return true
 	}
 
@@ -812,8 +851,9 @@
 // consumer is done with the frame.
 // It's run on its own goroutine.
 func (sc *serverConn) readFrames() {
-	gate := make(gate)
-	gateDone := gate.Done
+	sc.srv.markNewGoroutine()
+	gate := make(chan struct{})
+	gateDone := func() { gate <- struct{}{} }
 	for {
 		f, err := sc.framer.ReadFrame()
 		select {
@@ -844,6 +884,7 @@
 // At most one goroutine can be running writeFrameAsync at a time per
 // serverConn.
 func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
+	sc.srv.markNewGoroutine()
 	var err error
 	if wd == nil {
 		err = wr.write.writeFrame(sc)
@@ -922,14 +963,14 @@
 	sc.setConnState(http.StateActive)
 	sc.setConnState(http.StateIdle)
 
-	if sc.srv.IdleTimeout != 0 {
-		sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
+	if sc.srv.IdleTimeout > 0 {
+		sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
 		defer sc.idleTimer.Stop()
 	}
 
 	go sc.readFrames() // closed by defer sc.conn.Close above
 
-	settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
+	settingsTimer := sc.srv.afterFunc(firstSettingsTimeout, sc.onSettingsTimer)
 	defer settingsTimer.Stop()
 
 	loopNum := 0
@@ -981,6 +1022,8 @@
 					return
 				case gracefulShutdownMsg:
 					sc.startGracefulShutdownInternal()
+				case handlerDoneMsg:
+					sc.handlerDone()
 				default:
 					panic("unknown timer")
 				}
@@ -1012,14 +1055,6 @@
 	}
 }
 
-func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
-	select {
-	case <-sc.doneServing:
-	case <-sharedCh:
-		close(privateCh)
-	}
-}
-
 type serverMessage int
 
 // Message values sent to serveMsgCh.
@@ -1028,6 +1063,7 @@
 	idleTimerMsg        = new(serverMessage)
 	shutdownTimerMsg    = new(serverMessage)
 	gracefulShutdownMsg = new(serverMessage)
+	handlerDoneMsg      = new(serverMessage)
 )
 
 func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
@@ -1063,10 +1099,10 @@
 			errc <- nil
 		}
 	}()
-	timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
+	timer := sc.srv.newTimer(prefaceTimeout) // TODO: configurable on *Server?
 	defer timer.Stop()
 	select {
-	case <-timer.C:
+	case <-timer.C():
 		return errPrefaceTimeout
 	case err := <-errc:
 		if err == nil {
@@ -1431,7 +1467,7 @@
 
 func (sc *serverConn) shutDownIn(d time.Duration) {
 	sc.serveG.check()
-	sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
+	sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer) // created via the Server so tests get a synthetic timer
 }
 
 func (sc *serverConn) resetStream(se StreamError) {
@@ -1484,6 +1520,11 @@
 		sc.goAway(ErrCodeFlowControl)
 		return true
 	case ConnectionError:
+		if res.f != nil {
+			if id := res.f.Header().StreamID; id > sc.maxClientStreamID {
+				sc.maxClientStreamID = id
+			}
+		}
 		sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
 		sc.goAway(ErrCode(ev))
 		return true // goAway will handle shutdown
@@ -1640,7 +1681,7 @@
 	delete(sc.streams, st.id)
 	if len(sc.streams) == 0 {
 		sc.setConnState(http.StateIdle)
-		if sc.srv.IdleTimeout != 0 {
+		if sc.srv.IdleTimeout > 0 && sc.idleTimer != nil {
 			sc.idleTimer.Reset(sc.srv.IdleTimeout)
 		}
 		if h1ServerKeepAlivesDisabled(sc.hs) {
@@ -1662,6 +1703,7 @@
 		}
 	}
 	st.closeErr = err
+	st.cancelCtx()
 	st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
 	sc.writeSched.CloseStream(st.id)
 }
@@ -1900,9 +1942,11 @@
 // onReadTimeout is run on its own goroutine (from time.AfterFunc)
 // when the stream's ReadTimeout has fired.
 func (st *stream) onReadTimeout() {
-	// Wrap the ErrDeadlineExceeded to avoid callers depending on us
-	// returning the bare error.
-	st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
+	if st.body != nil { // the read deadline timer may be armed for a stream that has no request body
+		// Wrap the ErrDeadlineExceeded to avoid callers depending on us
+		// returning the bare error.
+		st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
+	}
 }
 
 // onWriteTimeout is run on its own goroutine (from time.AfterFunc)
@@ -2018,15 +2062,12 @@
 	// similar to how the http1 server works. Here it's
 	// technically more like the http1 Server's ReadHeaderTimeout
 	// (in Go 1.8), though. That's a more sane option anyway.
-	if sc.hs.ReadTimeout != 0 {
+	if sc.hs.ReadTimeout > 0 {
 		sc.conn.SetReadDeadline(time.Time{})
-		if st.body != nil {
-			st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
-		}
+		st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
 	}
 
-	go sc.runHandler(rw, req, handler)
-	return nil
+	return sc.scheduleHandler(id, rw, req, handler)
 }
 
 func (sc *serverConn) upgradeRequest(req *http.Request) {
@@ -2042,10 +2083,14 @@
 
 	// Disable any read deadline set by the net/http package
 	// prior to the upgrade.
-	if sc.hs.ReadTimeout != 0 {
+	if sc.hs.ReadTimeout > 0 {
 		sc.conn.SetReadDeadline(time.Time{})
 	}
 
+	// This is the first request on the connection,
+	// so start the handler directly rather than going
+	// through scheduleHandler.
+	sc.curHandlers++
 	go sc.runHandler(rw, req, sc.handler.ServeHTTP)
 }
 
@@ -2116,8 +2161,8 @@
 	st.flow.conn = &sc.flow // link to conn-level counter
 	st.flow.add(sc.initialStreamSendWindowSize)
 	st.inflow.init(sc.srv.initialStreamRecvWindowSize())
-	if sc.hs.WriteTimeout != 0 {
-		st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
+	if sc.hs.WriteTimeout > 0 {
+		st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
 	}
 
 	sc.streams[id] = st
@@ -2286,8 +2331,63 @@
 	return &responseWriter{rws: rws}
 }
 
+type unstartedHandler struct { // arguments for a runHandler call that is queued until a running handler finishes
+	streamID uint32 // used to skip the entry if the stream is no longer in sc.streams when a slot frees up
+	rw       *responseWriter
+	req      *http.Request
+	handler  func(http.ResponseWriter, *http.Request)
+}
+
+// scheduleHandler starts a handler goroutine while fewer than advMaxStreams handlers are running,
+// or schedules one (in sc.unstartedHandlers) to start as soon as an existing handler finishes.
+func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error {
+	sc.serveG.check()
+	maxHandlers := sc.advMaxStreams // concurrent handlers are capped at the advertised stream limit
+	if sc.curHandlers < maxHandlers {
+		sc.curHandlers++
+		go sc.runHandler(rw, req, handler)
+		return nil
+	}
+	if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) { // backlog bound exceeded: return a connection error instead of queueing more
+		return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
+	}
+	sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
+		streamID: streamID,
+		rw:       rw,
+		req:      req,
+		handler:  handler,
+	})
+	return nil
+}
+
+func (sc *serverConn) handlerDone() { // called when handlerDoneMsg arrives, which runHandler sends via a deferred sendServeMsg
+	sc.serveG.check()
+	sc.curHandlers-- // the handler that just finished frees one slot
+	i := 0
+	maxHandlers := sc.advMaxStreams // same cap that scheduleHandler applies
+	for ; i < len(sc.unstartedHandlers); i++ {
+		u := sc.unstartedHandlers[i]
+		if sc.streams[u.streamID] == nil {
+			// This stream was reset before its goroutine had a chance to start.
+			continue
+		}
+		if sc.curHandlers >= maxHandlers {
+			break
+		}
+		sc.curHandlers++
+		go sc.runHandler(u.rw, u.req, u.handler)
+		sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references
+	}
+	sc.unstartedHandlers = sc.unstartedHandlers[i:] // drop the entries that were started or skipped
+	if len(sc.unstartedHandlers) == 0 {
+		sc.unstartedHandlers = nil
+	}
+}
+
 // Run on its own goroutine.
 func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
+	sc.srv.markNewGoroutine()
+	defer sc.sendServeMsg(handlerDoneMsg)
 	didPanic := true
 	defer func() {
 		rw.rws.stream.cancelCtx()
@@ -2429,7 +2529,7 @@
 	conn          *serverConn
 	closeOnce     sync.Once // for use by Close only
 	sawEOF        bool      // for use by Read only
-	pipe          *pipe     // non-nil if we have a HTTP entity message body
+	pipe          *pipe     // non-nil if we have an HTTP entity message body
 	needsContinue bool      // need to send a 100-continue
 }
 
@@ -2495,7 +2595,6 @@
 	wroteHeader   bool        // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
 	sentHeader    bool        // have we sent the header frame?
 	handlerDone   bool        // handler has finished
-	dirty         bool        // a Write failed; don't reuse this responseWriterState
 
 	sentContentLen int64 // non-zero if handler set a Content-Length header
 	wroteBytes     int64
@@ -2569,7 +2668,8 @@
 				clen = ""
 			}
 		}
-		if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
+		_, hasContentLength := rws.snapHeader["Content-Length"]
+		if !hasContentLength && clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
 			clen = strconv.Itoa(len(p))
 		}
 		_, hasContentType := rws.snapHeader["Content-Type"]
@@ -2583,7 +2683,7 @@
 		var date string
 		if _, ok := rws.snapHeader["Date"]; !ok {
 			// TODO(bradfitz): be faster here, like net/http? measure.
-			date = time.Now().UTC().Format(http.TimeFormat)
+			date = rws.conn.srv.now().UTC().Format(http.TimeFormat)
 		}
 
 		for _, v := range rws.snapHeader["Trailer"] {
@@ -2614,7 +2714,6 @@
 			date:          date,
 		})
 		if err != nil {
-			rws.dirty = true
 			return 0, err
 		}
 		if endStream {
@@ -2635,7 +2734,6 @@
 	if len(p) > 0 || endStream {
 		// only send a 0 byte DATA frame if we're ending the stream.
 		if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
-			rws.dirty = true
 			return 0, err
 		}
 	}
@@ -2647,9 +2745,6 @@
 			trailers:  rws.trailers,
 			endStream: true,
 		})
-		if err != nil {
-			rws.dirty = true
-		}
 		return len(p), err
 	}
 	return len(p), nil
@@ -2710,7 +2805,7 @@
 
 func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
 	st := w.rws.stream
-	if !deadline.IsZero() && deadline.Before(time.Now()) {
+	if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
 		// If we're setting a deadline in the past, reset the stream immediately
 		// so writes after SetWriteDeadline returns will fail.
 		st.onReadTimeout()
@@ -2726,9 +2821,9 @@
 		if deadline.IsZero() {
 			st.readDeadline = nil
 		} else if st.readDeadline == nil {
-			st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
+			st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout)
 		} else {
-			st.readDeadline.Reset(deadline.Sub(time.Now()))
+			st.readDeadline.Reset(deadline.Sub(sc.srv.now()))
 		}
 	})
 	return nil
@@ -2736,7 +2831,7 @@
 
 func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
 	st := w.rws.stream
-	if !deadline.IsZero() && deadline.Before(time.Now()) {
+	if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
 		// If we're setting a deadline in the past, reset the stream immediately
 		// so writes after SetWriteDeadline returns will fail.
 		st.onWriteTimeout()
@@ -2752,9 +2847,9 @@
 		if deadline.IsZero() {
 			st.writeDeadline = nil
 		} else if st.writeDeadline == nil {
-			st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
+			st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout)
 		} else {
-			st.writeDeadline.Reset(deadline.Sub(time.Now()))
+			st.writeDeadline.Reset(deadline.Sub(sc.srv.now()))
 		}
 	})
 	return nil
@@ -2774,7 +2869,7 @@
 		err = rws.bw.Flush()
 	} else {
 		// The bufio.Writer won't call chunkWriter.Write
-		// (writeChunk with zero bytes, so we have to do it
+		// (writeChunk with zero bytes), so we have to do it
 		// ourselves to force the HTTP response header and/or
 		// final DATA frame (with END_STREAM) to be sent.
 		_, err = chunkWriter{rws}.Write(nil)
@@ -2865,14 +2960,12 @@
 			h.Del("Transfer-Encoding")
 		}
 
-		if rws.conn.writeHeaders(rws.stream, &writeResHeaders{
+		rws.conn.writeHeaders(rws.stream, &writeResHeaders{
 			streamID:    rws.stream.id,
 			httpResCode: code,
 			h:           h,
 			endStream:   rws.handlerDone && !rws.hasTrailers(),
-		}) != nil {
-			rws.dirty = true
-		}
+		})
 
 		return
 	}
@@ -2937,19 +3030,10 @@
 
 func (w *responseWriter) handlerDone() {
 	rws := w.rws
-	dirty := rws.dirty
 	rws.handlerDone = true
 	w.Flush()
 	w.rws = nil
-	if !dirty {
-		// Only recycle the pool if all prior Write calls to
-		// the serverConn goroutine completed successfully. If
-		// they returned earlier due to resets from the peer
-		// there might still be write goroutines outstanding
-		// from the serverConn referencing the rws memory. See
-		// issue 20704.
-		responseWriterStatePool.Put(rws)
-	}
+	responseWriterStatePool.Put(rws) // rws is returned to the pool unconditionally after the final Flush
 }
 
 // Push errors.
@@ -3132,6 +3216,7 @@
 			panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
 		}
 
+		sc.curHandlers++
 		go sc.runHandler(rw, req, sc.handler.ServeHTTP)
 		return promisedID, nil
 	}