VOL-1691 Fix openolt adapter getting stuck while registering with core
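
This updates the vendored google.golang.org/grpc/stream.go. Notable changes
visible in the diff: traceInfo becomes a pointer and may be nil when tracing
is disabled, stats.End now carries the trailer metadata and stats.InPayload
records WireLength, balancer.DoneInfo gains ServerLoad parsed from the
trailer via balancerload, addrConn.newClientStream is replaced by
newNonRetryClientStream, and the encode/compress/msgHeader sequence is
consolidated into a prepareMsg helper that short-circuits for *PreparedMsg.

For illustration, a rough caller-side sketch of the PreparedMsg fast path
that prepareMsg enables (sketch only, not part of this patch; assumes
variables stream, a gRPC client stream, and req, a request message):

    // Pre-encode the message once; SendMsg detects the *grpc.PreparedMsg
    // via the type assertion in prepareMsg and skips re-encoding.
    pm := &grpc.PreparedMsg{}
    if err := pm.Encode(stream, req); err != nil {
        return err
    }
    if err := stream.SendMsg(pm); err != nil {
        return err
    }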

Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index ccf996b..db14c32 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -30,9 +30,9 @@
 	"golang.org/x/net/trace"
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/encoding"
 	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal/balancerload"
 	"google.golang.org/grpc/internal/binarylog"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcrand"
@@ -230,17 +230,21 @@
 	if c.creds != nil {
 		callHdr.Creds = c.creds
 	}
-	var trInfo traceInfo
+	var trInfo *traceInfo
 	if EnableTracing {
-		trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
-		trInfo.firstLine.client = true
+		trInfo = &traceInfo{
+			tr: trace.New("grpc.Sent."+methodFamily(method), method),
+			firstLine: firstLine{
+				client: true,
+			},
+		}
 		if deadline, ok := ctx.Deadline(); ok {
 			trInfo.firstLine.deadline = time.Until(deadline)
 		}
 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 		ctx = trace.NewContext(ctx, trInfo.tr)
 	}
-	ctx = newContextWithRPCInfo(ctx, c.failFast)
+	ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
 	sh := cc.dopts.copts.StatsHandler
 	var beginTime time.Time
 	if sh != nil {
@@ -323,7 +327,7 @@
 	return cs, nil
 }
 
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
+func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
 	cs.attempt = &csAttempt{
 		cs:           cs,
 		dc:           cs.cc.dopts.dc,
@@ -338,6 +342,9 @@
 	if err != nil {
 		return err
 	}
+	if trInfo != nil {
+		trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+	}
 	cs.attempt.t = t
 	cs.attempt.done = done
 	return nil
@@ -414,9 +421,10 @@
 	decompSet bool
 
 	mu sync.Mutex // guards trInfo.tr
+	// trInfo may be nil (if EnableTracing is false).
 	// trInfo.tr is set when created (if EnableTracing is true),
 	// and cleared when the finish method is called.
-	trInfo traceInfo
+	trInfo *traceInfo
 
 	statsHandler stats.Handler
 }
@@ -540,7 +548,7 @@
 			cs.commitAttemptLocked()
 			return err
 		}
-		if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
+		if err := cs.newAttemptLocked(nil, nil); err != nil {
 			return err
 		}
 		if lastErr = cs.replayBufferLocked(); lastErr == nil {
@@ -668,15 +676,13 @@
 	if !cs.desc.ClientStreams {
 		cs.sentLast = true
 	}
-	data, err := encode(cs.codec, m)
+
+	// load hdr, payload, data
+	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
 	if err != nil {
 		return err
 	}
-	compData, err := compress(data, cs.cp, cs.comp)
-	if err != nil {
-		return err
-	}
-	hdr, payload := msgHeader(data, compData)
+
 	// TODO(dfawley): should we be checking len(data) instead?
 	if len(payload) > *cs.callInfo.maxSendMessageSize {
 		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
@@ -811,7 +817,7 @@
 
 func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
 	cs := a.cs
-	if EnableTracing {
+	if a.trInfo != nil {
 		a.mu.Lock()
 		if a.trInfo.tr != nil {
 			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
@@ -868,7 +874,7 @@
 		}
 		return toRPCErr(err)
 	}
-	if EnableTracing {
+	if a.trInfo != nil {
 		a.mu.Lock()
 		if a.trInfo.tr != nil {
 			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
@@ -881,8 +887,9 @@
 			RecvTime: time.Now(),
 			Payload:  m,
 			// TODO truncate large payload.
-			Data:   payInfo.uncompressedBytes,
-			Length: len(payInfo.uncompressedBytes),
+			Data:       payInfo.uncompressedBytes,
+			WireLength: payInfo.wireLength,
+			Length:     len(payInfo.uncompressedBytes),
 		})
 	}
 	if channelz.IsOn() {
@@ -915,22 +922,23 @@
 		// Ending a stream with EOF indicates a success.
 		err = nil
 	}
+	var tr metadata.MD
 	if a.s != nil {
 		a.t.CloseStream(a.s, err)
+		tr = a.s.Trailer()
 	}
 
 	if a.done != nil {
 		br := false
-		var tr metadata.MD
 		if a.s != nil {
 			br = a.s.BytesReceived()
-			tr = a.s.Trailer()
 		}
 		a.done(balancer.DoneInfo{
 			Err:           err,
 			Trailer:       tr,
 			BytesSent:     a.s != nil,
 			BytesReceived: br,
+			ServerLoad:    balancerload.Parse(tr),
 		})
 	}
 	if a.statsHandler != nil {
@@ -938,11 +946,12 @@
 			Client:    true,
 			BeginTime: a.cs.beginTime,
 			EndTime:   time.Now(),
+			Trailer:   tr,
 			Error:     err,
 		}
 		a.statsHandler.HandleRPC(a.cs.ctx, end)
 	}
-	if a.trInfo.tr != nil {
+	if a.trInfo != nil && a.trInfo.tr != nil {
 		if err == nil {
 			a.trInfo.tr.LazyPrintf("RPC: [OK]")
 		} else {
@@ -955,19 +964,18 @@
 	a.mu.Unlock()
 }
 
-func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
-	ac.mu.Lock()
-	if ac.transport != t {
-		ac.mu.Unlock()
-		return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
-	}
-	// transition to CONNECTING state when an attempt starts
-	if ac.state != connectivity.Connecting {
-		ac.updateConnectivityState(connectivity.Connecting)
-		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-	}
-	ac.mu.Unlock()
-
+// newNonRetryClientStream creates a ClientStream with the specified transport,
+// on the given addrConn.
+//
+// It's expected that the given transport is either the same one in addrConn, or
+// is already closed. To avoid a race, the transport is specified separately,
+// instead of using ac.transport.
+//
+// Main differences between this and ClientConn.NewStream:
+// - no retry
+// - no service config (or wait for service config)
+// - no tracing or stats
+func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
 	if t == nil {
 		// TODO: return RPC error here?
 		return nil, errors.New("transport provided is nil")
@@ -975,14 +983,6 @@
 	// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
 	c := &callInfo{}
 
-	for _, o := range opts {
-		if err := o.before(c); err != nil {
-			return nil, toRPCErr(err)
-		}
-	}
-	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
-	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
-
 	// Possible context leak:
 	// The cancel function for the child context we create will only be called
 	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
@@ -995,6 +995,13 @@
 		}
 	}()
 
+	for _, o := range opts {
+		if err := o.before(c); err != nil {
+			return nil, toRPCErr(err)
+		}
+	}
+	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
+	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
 	if err := setCallInfoCodec(c); err != nil {
 		return nil, err
 	}
@@ -1027,6 +1034,7 @@
 		callHdr.Creds = c.creds
 	}
 
+	// Use a special addrConnStream to avoid retry.
 	as := &addrConnStream{
 		callHdr:  callHdr,
 		ac:       ac,
@@ -1138,15 +1146,13 @@
 	if !as.desc.ClientStreams {
 		as.sentLast = true
 	}
-	data, err := encode(as.codec, m)
+
+	// load hdr, payload, data
+	hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
 	if err != nil {
 		return err
 	}
-	compData, err := compress(data, as.cp, as.comp)
-	if err != nil {
-		return err
-	}
-	hdr, payld := msgHeader(data, compData)
+
 	// TODO(dfawley): should we be checking len(data) instead?
 	if len(payld) > *as.callInfo.maxSendMessageSize {
 		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
@@ -1383,15 +1389,13 @@
 			ss.t.IncrMsgSent()
 		}
 	}()
-	data, err := encode(ss.codec, m)
+
+	// load hdr, payload, data
+	hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
 	if err != nil {
 		return err
 	}
-	compData, err := compress(data, ss.cp, ss.comp)
-	if err != nil {
-		return err
-	}
-	hdr, payload := msgHeader(data, compData)
+
 	// TODO(dfawley): should we be checking len(data) instead?
 	if len(payload) > ss.maxSendMessageSize {
 		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
@@ -1466,8 +1470,9 @@
 			RecvTime: time.Now(),
 			Payload:  m,
 			// TODO truncate large payload.
-			Data:   payInfo.uncompressedBytes,
-			Length: len(payInfo.uncompressedBytes),
+			Data:       payInfo.uncompressedBytes,
+			WireLength: payInfo.wireLength,
+			Length:     len(payInfo.uncompressedBytes),
 		})
 	}
 	if ss.binlog != nil {
@@ -1483,3 +1488,24 @@
 func MethodFromServerStream(stream ServerStream) (string, bool) {
 	return Method(stream.Context())
 }
+
+// prepareMsg returns the hdr, payload and data using the compressors passed, or,
+// if the message is already a *PreparedMsg, returns its pre-encoded hdr, payload
+// and data without re-encoding or re-compressing.
+func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
+	if preparedMsg, ok := m.(*PreparedMsg); ok {
+		return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
+	}
+	// The input interface is not a prepared msg.
+	// Marshal and Compress the data at this point
+	data, err = encode(codec, m)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	compData, err := compress(data, cp, comp)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	hdr, payload = msgHeader(data, compData)
+	return hdr, payload, data, nil
+}