VOL-1691 Fix openolt adapter getting stuck during registration with core
Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index d038b2d..150b73e 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -35,9 +35,11 @@
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
@@ -55,6 +57,9 @@
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+ // statusRawProto is a function to get to the raw status proto wrapped in a
+ // status.Status without a proto.Clone().
+ statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
// http2Server implements the ServerTransport interface with HTTP2.
@@ -119,6 +124,7 @@
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
czData *channelzData
+ bufferPool *bufferPool
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -220,6 +226,7 @@
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
+ bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if dynamicWindow {
@@ -286,7 +293,9 @@
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
streamID := frame.Header().StreamID
- state := decodeState{serverSide: true}
+ state := &decodeState{
+ serverSide: true,
+ }
if err := state.decodeHeader(frame); err != nil {
if se, ok := status.FromError(err); ok {
t.controlBuf.put(&cleanupStream{
@@ -305,16 +314,16 @@
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
- recvCompress: state.encoding,
- method: state.method,
- contentSubtype: state.contentSubtype,
+ recvCompress: state.data.encoding,
+ method: state.data.method,
+ contentSubtype: state.data.contentSubtype,
}
if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
}
- if state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
+ if state.data.timeoutSet {
+ s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
@@ -327,19 +336,19 @@
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context.
- if len(state.mdata) > 0 {
- s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
+ if len(state.data.mdata) > 0 {
+ s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
}
- if state.statsTags != nil {
- s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
+ if state.data.statsTags != nil {
+ s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
}
- if state.statsTrace != nil {
- s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
+ if state.data.statsTrace != nil {
+ s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
}
if t.inTapHandle != nil {
var err error
info := &tap.Info{
- FullMethodName: state.method,
+ FullMethodName: state.data.method,
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
@@ -403,9 +412,10 @@
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
- ctx: s.ctx,
- ctxDone: s.ctxDone,
- recv: s.buf,
+ ctx: s.ctx,
+ ctxDone: s.ctxDone,
+ recv: s.buf,
+ freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -435,7 +445,7 @@
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
- t.closeStream(s, true, se.Code, nil, false)
+ t.closeStream(s, true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
@@ -577,7 +587,7 @@
}
if size > 0 {
if err := s.fc.onData(size); err != nil {
- t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
+ t.closeStream(s, true, http2.ErrCodeFlowControl, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -589,9 +599,10 @@
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
- data := make([]byte, len(f.Data()))
- copy(data, f.Data())
- s.write(recvMsg{data: data})
+ buffer := t.bufferPool.get()
+ buffer.Reset()
+ buffer.Write(f.Data())
+ s.write(recvMsg{buffer: buffer})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
@@ -602,11 +613,18 @@
}
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
+ // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
+ if s, ok := t.getStream(f); ok {
+ t.closeStream(s, false, 0, false)
return
}
- t.closeStream(s, false, 0, nil, false)
+ // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
+ t.controlBuf.put(&cleanupStream{
+ streamID: f.Header().StreamID,
+ rst: false,
+ rstCode: 0,
+ onWrite: func() {},
+ })
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -770,7 +788,7 @@
if err != nil {
return err
}
- t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+ t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
if t.stats != nil {
@@ -808,7 +826,7 @@
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
- if p := st.Proto(); p != nil && len(p.Details) > 0 {
+ if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
@@ -834,10 +852,12 @@
if err != nil {
return err
}
- t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+ t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
- t.closeStream(s, false, 0, trailingHeader, true)
+ // Send a RST_STREAM after the trailers if the client has not already half-closed.
+ rst := s.getState() == streamActive
+ t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
@@ -849,6 +869,9 @@
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
+ if _, ok := err.(ConnectionError); ok {
+ return err
+ }
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
return status.Errorf(codes.Internal, "transport: %v", err)
}
@@ -1006,15 +1029,17 @@
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
- t.mu.Lock()
- if _, ok := t.activeStreams[s.id]; !ok {
- t.mu.Unlock()
- return
- }
+ // In case stream sending and receiving are invoked in separate
+ // goroutines (e.g., bi-directional streaming), cancel needs to be
+ // called to interrupt the potential blocking on other goroutines.
+ s.cancel()
- delete(t.activeStreams, s.id)
- if len(t.activeStreams) == 0 {
- t.idle = time.Now()
+ t.mu.Lock()
+ if _, ok := t.activeStreams[s.id]; ok {
+ delete(t.activeStreams, s.id)
+ if len(t.activeStreams) == 0 {
+ t.idle = time.Now()
+ }
}
t.mu.Unlock()
@@ -1027,51 +1052,36 @@
}
}
-// closeStream clears the footprint of a stream when the stream is not needed
-// any more.
-func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
- // Mark the stream as done
+// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
+func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
oldState := s.swapState(streamDone)
+ if oldState == streamDone {
+ // If the stream was already done, return.
+ return
+ }
- // In case stream sending and receiving are invoked in separate
- // goroutines (e.g., bi-directional streaming), cancel needs to be
- // called to interrupt the potential blocking on other goroutines.
- s.cancel()
+ hdr.cleanup = &cleanupStream{
+ streamID: s.id,
+ rst: rst,
+ rstCode: rstCode,
+ onWrite: func() {
+ t.deleteStream(s, eosReceived)
+ },
+ }
+ t.controlBuf.put(hdr)
+}
- // Deletes the stream from active streams
+// closeStream clears the footprint of a stream when the stream is not needed any more.
+func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
+ s.swapState(streamDone)
t.deleteStream(s, eosReceived)
- cleanup := &cleanupStream{
+ t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: rst,
rstCode: rstCode,
onWrite: func() {},
- }
-
- // No trailer. Puts cleanupFrame into transport's control buffer.
- if hdr == nil {
- t.controlBuf.put(cleanup)
- return
- }
-
- // We do the check here, because of the following scenario:
- // 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item
- // is put to control buffer.
- // 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at
- // some point. So loopy can't act on trailer
- // 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as
- // the result of the received RST_STREAM.
- // If we do this check at the beginning of the closeStream, then we won't put a cleanup item in
- // response to received RST_STREAM into the control buffer and outStream in loopy writer will
- // never get cleaned up.
-
- // If the stream is already done, don't send the trailer.
- if oldState == streamDone {
- return
- }
-
- hdr.cleanup = cleanup
- t.controlBuf.put(hdr)
+ })
}
func (t *http2Server) RemoteAddr() net.Addr {