VOL-1691 Fix openolt adapter getting stuck while registartion with core

Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index d038b2d..150b73e 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -35,9 +35,11 @@
 	"golang.org/x/net/http2"
 	"golang.org/x/net/http2/hpack"
 
+	spb "google.golang.org/genproto/googleapis/rpc/status"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcrand"
 	"google.golang.org/grpc/keepalive"
@@ -55,6 +57,9 @@
 	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
 	// than the limit set by peer.
 	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+	// statusRawProto is a function to get to the raw status proto wrapped in a
+	// status.Status without a proto.Clone().
+	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
 )
 
 // http2Server implements the ServerTransport interface with HTTP2.
@@ -119,6 +124,7 @@
 	// Fields below are for channelz metric collection.
 	channelzID int64 // channelz unique identification number
 	czData     *channelzData
+	bufferPool *bufferPool
 }
 
 // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -220,6 +226,7 @@
 		kep:               kep,
 		initialWindowSize: iwz,
 		czData:            new(channelzData),
+		bufferPool:        newBufferPool(),
 	}
 	t.controlBuf = newControlBuffer(t.ctxDone)
 	if dynamicWindow {
@@ -286,7 +293,9 @@
 // operateHeader takes action on the decoded headers.
 func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
 	streamID := frame.Header().StreamID
-	state := decodeState{serverSide: true}
+	state := &decodeState{
+		serverSide: true,
+	}
 	if err := state.decodeHeader(frame); err != nil {
 		if se, ok := status.FromError(err); ok {
 			t.controlBuf.put(&cleanupStream{
@@ -305,16 +314,16 @@
 		st:             t,
 		buf:            buf,
 		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
-		recvCompress:   state.encoding,
-		method:         state.method,
-		contentSubtype: state.contentSubtype,
+		recvCompress:   state.data.encoding,
+		method:         state.data.method,
+		contentSubtype: state.data.contentSubtype,
 	}
 	if frame.StreamEnded() {
 		// s is just created by the caller. No lock needed.
 		s.state = streamReadDone
 	}
-	if state.timeoutSet {
-		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
+	if state.data.timeoutSet {
+		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
 	} else {
 		s.ctx, s.cancel = context.WithCancel(t.ctx)
 	}
@@ -327,19 +336,19 @@
 	}
 	s.ctx = peer.NewContext(s.ctx, pr)
 	// Attach the received metadata to the context.
-	if len(state.mdata) > 0 {
-		s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
+	if len(state.data.mdata) > 0 {
+		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
 	}
-	if state.statsTags != nil {
-		s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
+	if state.data.statsTags != nil {
+		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
 	}
-	if state.statsTrace != nil {
-		s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
+	if state.data.statsTrace != nil {
+		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
 	}
 	if t.inTapHandle != nil {
 		var err error
 		info := &tap.Info{
-			FullMethodName: state.method,
+			FullMethodName: state.data.method,
 		}
 		s.ctx, err = t.inTapHandle(s.ctx, info)
 		if err != nil {
@@ -403,9 +412,10 @@
 	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
 	s.trReader = &transportReader{
 		reader: &recvBufferReader{
-			ctx:     s.ctx,
-			ctxDone: s.ctxDone,
-			recv:    s.buf,
+			ctx:        s.ctx,
+			ctxDone:    s.ctxDone,
+			recv:       s.buf,
+			freeBuffer: t.bufferPool.put,
 		},
 		windowHandler: func(n int) {
 			t.updateWindow(s, uint32(n))
@@ -435,7 +445,7 @@
 				s := t.activeStreams[se.StreamID]
 				t.mu.Unlock()
 				if s != nil {
-					t.closeStream(s, true, se.Code, nil, false)
+					t.closeStream(s, true, se.Code, false)
 				} else {
 					t.controlBuf.put(&cleanupStream{
 						streamID: se.StreamID,
@@ -577,7 +587,7 @@
 	}
 	if size > 0 {
 		if err := s.fc.onData(size); err != nil {
-			t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
+			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
 			return
 		}
 		if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -589,9 +599,10 @@
 		// guarantee f.Data() is consumed before the arrival of next frame.
 		// Can this copy be eliminated?
 		if len(f.Data()) > 0 {
-			data := make([]byte, len(f.Data()))
-			copy(data, f.Data())
-			s.write(recvMsg{data: data})
+			buffer := t.bufferPool.get()
+			buffer.Reset()
+			buffer.Write(f.Data())
+			s.write(recvMsg{buffer: buffer})
 		}
 	}
 	if f.Header().Flags.Has(http2.FlagDataEndStream) {
@@ -602,11 +613,18 @@
 }
 
 func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
-	s, ok := t.getStream(f)
-	if !ok {
+	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
+	if s, ok := t.getStream(f); ok {
+		t.closeStream(s, false, 0, false)
 		return
 	}
-	t.closeStream(s, false, 0, nil, false)
+	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
+	t.controlBuf.put(&cleanupStream{
+		streamID: f.Header().StreamID,
+		rst:      false,
+		rstCode:  0,
+		onWrite:  func() {},
+	})
 }
 
 func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -770,7 +788,7 @@
 		if err != nil {
 			return err
 		}
-		t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+		t.closeStream(s, true, http2.ErrCodeInternal, false)
 		return ErrHeaderListSizeLimitViolation
 	}
 	if t.stats != nil {
@@ -808,7 +826,7 @@
 	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
 	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
 
-	if p := st.Proto(); p != nil && len(p.Details) > 0 {
+	if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
 		stBytes, err := proto.Marshal(p)
 		if err != nil {
 			// TODO: return error instead, when callers are able to handle it.
@@ -834,10 +852,12 @@
 		if err != nil {
 			return err
 		}
-		t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+		t.closeStream(s, true, http2.ErrCodeInternal, false)
 		return ErrHeaderListSizeLimitViolation
 	}
-	t.closeStream(s, false, 0, trailingHeader, true)
+	// Send a RST_STREAM after the trailers if the client has not already half-closed.
+	rst := s.getState() == streamActive
+	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
 	if t.stats != nil {
 		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
 	}
@@ -849,6 +869,9 @@
 func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
 	if !s.isHeaderSent() { // Headers haven't been written yet.
 		if err := t.WriteHeader(s, nil); err != nil {
+			if _, ok := err.(ConnectionError); ok {
+				return err
+			}
 			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
 			return status.Errorf(codes.Internal, "transport: %v", err)
 		}
@@ -1006,15 +1029,17 @@
 
 // deleteStream deletes the stream s from transport's active streams.
 func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
-	t.mu.Lock()
-	if _, ok := t.activeStreams[s.id]; !ok {
-		t.mu.Unlock()
-		return
-	}
+	// In case stream sending and receiving are invoked in separate
+	// goroutines (e.g., bi-directional streaming), cancel needs to be
+	// called to interrupt the potential blocking on other goroutines.
+	s.cancel()
 
-	delete(t.activeStreams, s.id)
-	if len(t.activeStreams) == 0 {
-		t.idle = time.Now()
+	t.mu.Lock()
+	if _, ok := t.activeStreams[s.id]; ok {
+		delete(t.activeStreams, s.id)
+		if len(t.activeStreams) == 0 {
+			t.idle = time.Now()
+		}
 	}
 	t.mu.Unlock()
 
@@ -1027,51 +1052,36 @@
 	}
 }
 
-// closeStream clears the footprint of a stream when the stream is not needed
-// any more.
-func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
-	// Mark the stream as done
+// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
+func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
 	oldState := s.swapState(streamDone)
+	if oldState == streamDone {
+		// If the stream was already done, return.
+		return
+	}
 
-	// In case stream sending and receiving are invoked in separate
-	// goroutines (e.g., bi-directional streaming), cancel needs to be
-	// called to interrupt the potential blocking on other goroutines.
-	s.cancel()
+	hdr.cleanup = &cleanupStream{
+		streamID: s.id,
+		rst:      rst,
+		rstCode:  rstCode,
+		onWrite: func() {
+			t.deleteStream(s, eosReceived)
+		},
+	}
+	t.controlBuf.put(hdr)
+}
 
-	// Deletes the stream from active streams
+// closeStream clears the footprint of a stream when the stream is not needed any more.
+func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
+	s.swapState(streamDone)
 	t.deleteStream(s, eosReceived)
 
-	cleanup := &cleanupStream{
+	t.controlBuf.put(&cleanupStream{
 		streamID: s.id,
 		rst:      rst,
 		rstCode:  rstCode,
 		onWrite:  func() {},
-	}
-
-	// No trailer. Puts cleanupFrame into transport's control buffer.
-	if hdr == nil {
-		t.controlBuf.put(cleanup)
-		return
-	}
-
-	// We do the check here, because of the following scenario:
-	// 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item
-	// is put to control buffer.
-	// 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at
-	// some point. So loopy can't act on trailer
-	// 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as
-	// the result of the received RST_STREAM.
-	// If we do this check at the beginning of the closeStream, then we won't put a cleanup item in
-	// response to received RST_STREAM into the control buffer and outStream in loopy writer will
-	// never get cleaned up.
-
-	// If the stream is already done, don't send the trailer.
-	if oldState == streamDone {
-		return
-	}
-
-	hdr.cleanup = cleanup
-	t.controlBuf.put(hdr)
+	})
 }
 
 func (t *http2Server) RemoteAddr() net.Addr {