VOL-1691 Fix openolt adapter getting stuck while registartion with core

Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index ff8f4db..c96178d 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -117,6 +117,8 @@
 
 	onGoAway func(GoAwayReason)
 	onClose  func()
+
+	bufferPool *bufferPool
 }
 
 func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -249,6 +251,7 @@
 		onGoAway:              onGoAway,
 		onClose:               onClose,
 		keepaliveEnabled:      keepaliveEnabled,
+		bufferPool:            newBufferPool(),
 	}
 	t.controlBuf = newControlBuffer(t.ctxDone)
 	if opts.InitialWindowSize >= defaultWindowSize {
@@ -367,6 +370,7 @@
 			closeStream: func(err error) {
 				t.CloseStream(s, err)
 			},
+			freeBuffer: t.bufferPool.put,
 		},
 		windowHandler: func(n int) {
 			t.updateWindow(s, uint32(n))
@@ -437,6 +441,15 @@
 
 	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
 		var k string
+		for k, vv := range md {
+			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
+			if isReservedHeader(k) {
+				continue
+			}
+			for _, v := range vv {
+				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
+			}
+		}
 		for _, vv := range added {
 			for i, v := range vv {
 				if i%2 == 0 {
@@ -450,15 +463,6 @@
 				headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
 			}
 		}
-		for k, vv := range md {
-			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
-			if isReservedHeader(k) {
-				continue
-			}
-			for _, v := range vv {
-				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
-			}
-		}
 	}
 	if md, ok := t.md.(*metadata.MD); ok {
 		for k, vv := range *md {
@@ -549,7 +553,7 @@
 		s.write(recvMsg{err: err})
 		close(s.done)
 		// If headerChan isn't closed, then close it.
-		if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
 			close(s.headerChan)
 		}
 
@@ -713,7 +717,7 @@
 		s.write(recvMsg{err: err})
 	}
 	// If headerChan isn't closed, then close it.
-	if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
 		s.noHeaders = true
 		close(s.headerChan)
 	}
@@ -794,21 +798,21 @@
 // stream is closed.  If there are no active streams, the transport is closed
 // immediately.  This does nothing if the transport is already draining or
 // closing.
-func (t *http2Client) GracefulClose() error {
+func (t *http2Client) GracefulClose() {
 	t.mu.Lock()
 	// Make sure we move to draining only from active.
 	if t.state == draining || t.state == closing {
 		t.mu.Unlock()
-		return nil
+		return
 	}
 	t.state = draining
 	active := len(t.activeStreams)
 	t.mu.Unlock()
 	if active == 0 {
-		return t.Close()
+		t.Close()
+		return
 	}
 	t.controlBuf.put(&incomingGoAway{})
-	return nil
 }
 
 // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
@@ -946,9 +950,10 @@
 		// guarantee f.Data() is consumed before the arrival of next frame.
 		// Can this copy be eliminated?
 		if len(f.Data()) > 0 {
-			data := make([]byte, len(f.Data()))
-			copy(data, f.Data())
-			s.write(recvMsg{data: data})
+			buffer := t.bufferPool.get()
+			buffer.Reset()
+			buffer.Write(f.Data())
+			s.write(recvMsg{buffer: buffer})
 		}
 	}
 	// The server has closed the stream without sending trailers.  Record that
@@ -1140,16 +1145,26 @@
 	if !ok {
 		return
 	}
+	endStream := frame.StreamEnded()
 	atomic.StoreUint32(&s.bytesReceived, 1)
-	var state decodeState
-	if err := state.decodeHeader(frame); err != nil {
-		t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
-		// Something wrong. Stops reading even when there is remaining.
+	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
+
+	if !initialHeader && !endStream {
+		// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
+		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
+		t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
 		return
 	}
 
-	endStream := frame.StreamEnded()
-	var isHeader bool
+	state := &decodeState{}
+	// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
+	state.data.isGRPC = !initialHeader
+	if err := state.decodeHeader(frame); err != nil {
+		t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
+		return
+	}
+
+	isHeader := false
 	defer func() {
 		if t.statsHandler != nil {
 			if isHeader {
@@ -1167,29 +1182,33 @@
 			}
 		}
 	}()
-	// If headers haven't been received yet.
-	if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+
+	// If headerChan hasn't been closed yet
+	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
 		if !endStream {
-			// Headers frame is not actually a trailers-only frame.
+			// HEADERS frame block carries a Response-Headers.
 			isHeader = true
 			// These values can be set without any synchronization because
 			// stream goroutine will read it only after seeing a closed
 			// headerChan which we'll close after setting this.
-			s.recvCompress = state.encoding
-			if len(state.mdata) > 0 {
-				s.header = state.mdata
+			s.recvCompress = state.data.encoding
+			if len(state.data.mdata) > 0 {
+				s.header = state.data.mdata
 			}
 		} else {
+			// HEADERS frame block carries a Trailers-Only.
 			s.noHeaders = true
 		}
 		close(s.headerChan)
 	}
+
 	if !endStream {
 		return
 	}
+
 	// if client received END_STREAM from server while stream was still active, send RST_STREAM
 	rst := s.getState() == streamActive
-	t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
+	t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
 }
 
 // reader runs as a separate goroutine in charge of reading data from network
@@ -1356,6 +1375,8 @@
 	return &s
 }
 
+func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
+
 func (t *http2Client) IncrMsgSent() {
 	atomic.AddInt64(&t.czData.msgSent, 1)
 	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())