[VOL-5291] On demand statistics for ONU and OLT

Change-Id: I4850bb0f0d2235122cb0c1bcf835b3672bb34436
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 40abdac..1009268 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -36,8 +36,10 @@
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcrand"
 	"google.golang.org/grpc/internal/grpcutil"
+	imetadata "google.golang.org/grpc/internal/metadata"
 	iresolver "google.golang.org/grpc/internal/resolver"
 	"google.golang.org/grpc/internal/serviceconfig"
+	istatus "google.golang.org/grpc/internal/status"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/peer"
@@ -46,10 +48,12 @@
 )
 
 // StreamHandler defines the handler called by gRPC server to complete the
-// execution of a streaming RPC. If a StreamHandler returns an error, it
-// should be produced by the status package, or else gRPC will use
-// codes.Unknown as the status code and err.Error() as the status message
-// of the RPC.
+// execution of a streaming RPC.
+//
+// If a StreamHandler returns an error, it should either be produced by the
+// status package, or be one of the context errors. Otherwise, gRPC will use
+// codes.Unknown as the status code and err.Error() as the status message of the
+// RPC.
 type StreamHandler func(srv interface{}, stream ServerStream) error
 
 // StreamDesc represents a streaming RPC service's method specification.  Used
@@ -119,6 +123,9 @@
 	// calling RecvMsg on the same stream at the same time, but it is not safe
 	// to call SendMsg on the same stream in different goroutines. It is also
 	// not safe to call CloseSend concurrently with SendMsg.
+	//
+	// It is not safe to modify the message after calling SendMsg. Tracing
+	// libraries and stats handlers may use the message lazily.
 	SendMsg(m interface{}) error
 	// RecvMsg blocks until it receives a message into m or the stream is
 	// done. It returns io.EOF when the stream completes successfully. On
@@ -148,6 +155,11 @@
 // If none of the above happen, a goroutine and a context will be leaked, and grpc
 // will not call the optionally-configured stats handler with a stats.End message.
 func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
+	if err := cc.idlenessMgr.onCallBegin(); err != nil {
+		return nil, err
+	}
+	defer cc.idlenessMgr.onCallEnd()
+
 	// allow interceptor to see all applicable call options, which means those
 	// configured as defaults from dial option as well as per-call options
 	opts = combine(cc.dopts.callOptions, opts)
@@ -164,6 +176,20 @@
 }
 
 func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+		// validate md
+		if err := imetadata.Validate(md); err != nil {
+			return nil, status.Error(codes.Internal, err.Error())
+		}
+		// validate added
+		for _, kvs := range added {
+			for i := 0; i < len(kvs); i += 2 {
+				if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
+					return nil, status.Error(codes.Internal, err.Error())
+				}
+			}
+		}
+	}
 	if channelz.IsOn() {
 		cc.incrCallsStarted()
 		defer func() {
@@ -187,6 +213,13 @@
 	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
 	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
 	if err != nil {
+		if st, ok := status.FromError(err); ok {
+			// Restrict the code to the list allowed by gRFC A54.
+			if istatus.IsRestrictedControlPlaneCode(st) {
+				err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
+			}
+			return nil, err
+		}
 		return nil, toRPCErr(err)
 	}
 
@@ -293,20 +326,35 @@
 	if !cc.dopts.disableRetry {
 		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
 	}
-	cs.binlog = binarylog.GetMethodLogger(method)
-
-	if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
-		cs.finish(err)
-		return nil, err
+	if ml := binarylog.GetMethodLogger(method); ml != nil {
+		cs.binlogs = append(cs.binlogs, ml)
+	}
+	if cc.dopts.binaryLogger != nil {
+		if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
+			cs.binlogs = append(cs.binlogs, ml)
+		}
 	}
 
-	op := func(a *csAttempt) error { return a.newStream() }
+	// Pick the transport to use and create a new stream on the transport.
+	// Assign cs.attempt upon success.
+	op := func(a *csAttempt) error {
+		if err := a.getTransport(); err != nil {
+			return err
+		}
+		if err := a.newStream(); err != nil {
+			return err
+		}
+		// Because this operation is always called either here (while creating
+		// the clientStream) or by the retry code while locked when replaying
+		// the operation, it is safe to access cs.attempt directly.
+		cs.attempt = a
+		return nil
+	}
 	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
-		cs.finish(err)
 		return nil, err
 	}
 
-	if cs.binlog != nil {
+	if len(cs.binlogs) != 0 {
 		md, _ := metadata.FromOutgoingContext(ctx)
 		logEntry := &binarylog.ClientHeader{
 			OnClientSide: true,
@@ -320,7 +368,9 @@
 				logEntry.Timeout = 0
 			}
 		}
-		cs.binlog.Log(logEntry)
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, logEntry)
+		}
 	}
 
 	if desc != unaryStreamDesc {
@@ -341,14 +391,20 @@
 	return cs, nil
 }
 
-// newAttemptLocked creates a new attempt with a transport.
-// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+// newAttemptLocked creates a new csAttempt without a transport or stream.
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
+	if err := cs.ctx.Err(); err != nil {
+		return nil, toRPCErr(err)
+	}
+	if err := cs.cc.ctx.Err(); err != nil {
+		return nil, ErrClientConnClosing
+	}
+
 	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
 	method := cs.callHdr.Method
-	sh := cs.cc.dopts.copts.StatsHandler
 	var beginTime time.Time
-	if sh != nil {
+	shs := cs.cc.dopts.copts.StatsHandlers
+	for _, sh := range shs {
 		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
 		beginTime = time.Now()
 		begin := &stats.Begin{
@@ -377,58 +433,81 @@
 		ctx = trace.NewContext(ctx, trInfo.tr)
 	}
 
-	newAttempt := &csAttempt{
-		ctx:          ctx,
-		beginTime:    beginTime,
-		cs:           cs,
-		dc:           cs.cc.dopts.dc,
-		statsHandler: sh,
-		trInfo:       trInfo,
-	}
-	defer func() {
-		if retErr != nil {
-			// This attempt is not set in the clientStream, so it's finish won't
-			// be called. Call it here for stats and trace in case they are not
-			// nil.
-			newAttempt.finish(retErr)
-		}
-	}()
-
-	if err := ctx.Err(); err != nil {
-		return toRPCErr(err)
-	}
-
-	if cs.cc.parsedTarget.Scheme == "xds" {
+	if cs.cc.parsedTarget.URL.Scheme == "xds" {
 		// Add extra metadata (metadata that will be added by transport) to context
 		// so the balancer can see them.
 		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
 			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
 		))
 	}
-	t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
+
+	return &csAttempt{
+		ctx:           ctx,
+		beginTime:     beginTime,
+		cs:            cs,
+		dc:            cs.cc.dopts.dc,
+		statsHandlers: shs,
+		trInfo:        trInfo,
+	}, nil
+}
+
+func (a *csAttempt) getTransport() error {
+	cs := a.cs
+
+	var err error
+	a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
 	if err != nil {
+		if de, ok := err.(dropError); ok {
+			err = de.error
+			a.drop = true
+		}
 		return err
 	}
-	if trInfo != nil {
-		trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+	if a.trInfo != nil {
+		a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
 	}
-	newAttempt.t = t
-	newAttempt.done = done
-	cs.attempt = newAttempt
 	return nil
 }
 
 func (a *csAttempt) newStream() error {
 	cs := a.cs
 	cs.callHdr.PreviousAttempts = cs.numRetries
+
+	// Merge metadata stored in PickResult, if any, with existing call metadata.
+	// It is safe to overwrite the csAttempt's context here, since all state
+	// maintained in it are local to the attempt. When the attempt has to be
+	// retried, a new instance of csAttempt will be created.
+	if a.pickResult.Metadata != nil {
+		// We currently do not have a function it the metadata package which
+		// merges given metadata with existing metadata in a context. Existing
+		// function `AppendToOutgoingContext()` takes a variadic argument of key
+		// value pairs.
+		//
+		// TODO: Make it possible to retrieve key value pairs from metadata.MD
+		// in a form passable to AppendToOutgoingContext(), or create a version
+		// of AppendToOutgoingContext() that accepts a metadata.MD.
+		md, _ := metadata.FromOutgoingContext(a.ctx)
+		md = metadata.Join(md, a.pickResult.Metadata)
+		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
+	}
+
 	s, err := a.t.NewStream(a.ctx, cs.callHdr)
 	if err != nil {
-		// Return without converting to an RPC error so retry code can
-		// inspect.
-		return err
+		nse, ok := err.(*transport.NewStreamError)
+		if !ok {
+			// Unexpected.
+			return err
+		}
+
+		if nse.AllowTransparentRetry {
+			a.allowTransparentRetry = true
+		}
+
+		// Unwrap and convert error.
+		return toRPCErr(nse.Err)
 	}
-	cs.attempt.s = s
-	cs.attempt.p = &parser{r: s}
+	a.s = s
+	a.p = &parser{r: s}
 	return nil
 }
 
@@ -454,7 +533,7 @@
 
 	retryThrottler *retryThrottler // The throttler active when the RPC began.
 
-	binlog *binarylog.MethodLogger // Binary logger, can be nil.
+	binlogs []binarylog.MethodLogger
 	// serverHeaderBinlogged is a boolean for whether server header has been
 	// logged. Server header will be logged when the first time one of those
 	// happens: stream.Header(), stream.Recv().
@@ -486,12 +565,12 @@
 // csAttempt implements a single transport stream attempt within a
 // clientStream.
 type csAttempt struct {
-	ctx  context.Context
-	cs   *clientStream
-	t    transport.ClientTransport
-	s    *transport.Stream
-	p    *parser
-	done func(balancer.DoneInfo)
+	ctx        context.Context
+	cs         *clientStream
+	t          transport.ClientTransport
+	s          *transport.Stream
+	p          *parser
+	pickResult balancer.PickResult
 
 	finished  bool
 	dc        Decompressor
@@ -504,8 +583,13 @@
 	// and cleared when the finish method is called.
 	trInfo *traceInfo
 
-	statsHandler stats.Handler
-	beginTime    time.Time
+	statsHandlers []stats.Handler
+	beginTime     time.Time
+
+	// set for newStream errors that may be transparently retried
+	allowTransparentRetry bool
+	// set for pick errors that are returned as a status
+	drop bool
 }
 
 func (cs *clientStream) commitAttemptLocked() {
@@ -525,41 +609,21 @@
 // shouldRetry returns nil if the RPC should be retried; otherwise it returns
 // the error that should be returned by the operation.  If the RPC should be
 // retried, the bool indicates whether it is being retried transparently.
-func (cs *clientStream) shouldRetry(err error) (bool, error) {
-	if cs.attempt.s == nil {
-		// Error from NewClientStream.
-		nse, ok := err.(*transport.NewStreamError)
-		if !ok {
-			// Unexpected, but assume no I/O was performed and the RPC is not
-			// fatal, so retry indefinitely.
-			return true, nil
-		}
+func (a *csAttempt) shouldRetry(err error) (bool, error) {
+	cs := a.cs
 
-		// Unwrap and convert error.
-		err = toRPCErr(nse.Err)
-
-		// Never retry DoNotRetry errors, which indicate the RPC should not be
-		// retried due to max header list size violation, etc.
-		if nse.DoNotRetry {
-			return false, err
-		}
-
-		// In the event of a non-IO operation error from NewStream, we never
-		// attempted to write anything to the wire, so we can retry
-		// indefinitely.
-		if !nse.DoNotTransparentRetry {
-			return true, nil
-		}
-	}
-	if cs.finished || cs.committed {
-		// RPC is finished or committed; cannot retry.
+	if cs.finished || cs.committed || a.drop {
+		// RPC is finished or committed or was dropped by the picker; cannot retry.
 		return false, err
 	}
+	if a.s == nil && a.allowTransparentRetry {
+		return true, nil
+	}
 	// Wait for the trailers.
 	unprocessed := false
-	if cs.attempt.s != nil {
-		<-cs.attempt.s.Done()
-		unprocessed = cs.attempt.s.Unprocessed()
+	if a.s != nil {
+		<-a.s.Done()
+		unprocessed = a.s.Unprocessed()
 	}
 	if cs.firstAttempt && unprocessed {
 		// First attempt, stream unprocessed: transparently retry.
@@ -571,14 +635,14 @@
 
 	pushback := 0
 	hasPushback := false
-	if cs.attempt.s != nil {
-		if !cs.attempt.s.TrailersOnly() {
+	if a.s != nil {
+		if !a.s.TrailersOnly() {
 			return false, err
 		}
 
 		// TODO(retry): Move down if the spec changes to not check server pushback
 		// before considering this a failure for throttling.
-		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
+		sps := a.s.Trailer()["grpc-retry-pushback-ms"]
 		if len(sps) == 1 {
 			var e error
 			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
@@ -595,10 +659,10 @@
 	}
 
 	var code codes.Code
-	if cs.attempt.s != nil {
-		code = cs.attempt.s.Status().Code()
+	if a.s != nil {
+		code = a.s.Status().Code()
 	} else {
-		code = status.Convert(err).Code()
+		code = status.Code(err)
 	}
 
 	rp := cs.methodConfig.RetryPolicy
@@ -643,19 +707,24 @@
 }
 
 // Returns nil if a retry was performed and succeeded; error otherwise.
-func (cs *clientStream) retryLocked(lastErr error) error {
+func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
 	for {
-		cs.attempt.finish(toRPCErr(lastErr))
-		isTransparent, err := cs.shouldRetry(lastErr)
+		attempt.finish(toRPCErr(lastErr))
+		isTransparent, err := attempt.shouldRetry(lastErr)
 		if err != nil {
 			cs.commitAttemptLocked()
 			return err
 		}
 		cs.firstAttempt = false
-		if err := cs.newAttemptLocked(isTransparent); err != nil {
+		attempt, err = cs.newAttemptLocked(isTransparent)
+		if err != nil {
+			// Only returns error if the clientconn is closed or the context of
+			// the stream is canceled.
 			return err
 		}
-		if lastErr = cs.replayBufferLocked(); lastErr == nil {
+		// Note that the first op in the replay buffer always sets cs.attempt
+		// if it is able to pick a transport and create a stream.
+		if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
 			return nil
 		}
 	}
@@ -665,7 +734,10 @@
 	cs.commitAttempt()
 	// No need to lock before using attempt, since we know it is committed and
 	// cannot change.
-	return cs.attempt.s.Context()
+	if cs.attempt.s != nil {
+		return cs.attempt.s.Context()
+	}
+	return cs.ctx
 }
 
 func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
@@ -679,6 +751,18 @@
 			// already be status errors.
 			return toRPCErr(op(cs.attempt))
 		}
+		if len(cs.buffer) == 0 {
+			// For the first op, which controls creation of the stream and
+			// assigns cs.attempt, we need to create a new attempt inline
+			// before executing the first op.  On subsequent ops, the attempt
+			// is created immediately before replaying the ops.
+			var err error
+			if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
+				cs.mu.Unlock()
+				cs.finish(err)
+				return err
+			}
+		}
 		a := cs.attempt
 		cs.mu.Unlock()
 		err := op(a)
@@ -695,7 +779,7 @@
 			cs.mu.Unlock()
 			return err
 		}
-		if err := cs.retryLocked(err); err != nil {
+		if err := cs.retryLocked(a, err); err != nil {
 			cs.mu.Unlock()
 			return err
 		}
@@ -704,17 +788,25 @@
 
 func (cs *clientStream) Header() (metadata.MD, error) {
 	var m metadata.MD
+	noHeader := false
 	err := cs.withRetry(func(a *csAttempt) error {
 		var err error
 		m, err = a.s.Header()
+		if err == transport.ErrNoHeaders {
+			noHeader = true
+			return nil
+		}
 		return toRPCErr(err)
 	}, cs.commitAttemptLocked)
+
 	if err != nil {
 		cs.finish(err)
 		return nil, err
 	}
-	if cs.binlog != nil && !cs.serverHeaderBinlogged {
-		// Only log if binary log is on and header has not been logged.
+
+	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader {
+		// Only log if binary log is on and header has not been logged, and
+		// there is actually headers to log.
 		logEntry := &binarylog.ServerHeader{
 			OnClientSide: true,
 			Header:       m,
@@ -723,10 +815,12 @@
 		if peer, ok := peer.FromContext(cs.Context()); ok {
 			logEntry.PeerAddr = peer.Addr
 		}
-		cs.binlog.Log(logEntry)
 		cs.serverHeaderBinlogged = true
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, logEntry)
+		}
 	}
-	return m, err
+	return m, nil
 }
 
 func (cs *clientStream) Trailer() metadata.MD {
@@ -744,10 +838,9 @@
 	return cs.attempt.s.Trailer()
 }
 
-func (cs *clientStream) replayBufferLocked() error {
-	a := cs.attempt
+func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
 	for _, f := range cs.buffer {
-		if err := f(a); err != nil {
+		if err := f(attempt); err != nil {
 			return err
 		}
 	}
@@ -795,47 +888,48 @@
 	if len(payload) > *cs.callInfo.maxSendMessageSize {
 		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
 	}
-	msgBytes := data // Store the pointer before setting to nil. For binary logging.
 	op := func(a *csAttempt) error {
-		err := a.sendMsg(m, hdr, payload, data)
-		// nil out the message and uncomp when replaying; they are only needed for
-		// stats which is disabled for subsequent attempts.
-		m, data = nil, nil
-		return err
+		return a.sendMsg(m, hdr, payload, data)
 	}
 	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
-	if cs.binlog != nil && err == nil {
-		cs.binlog.Log(&binarylog.ClientMessage{
+	if len(cs.binlogs) != 0 && err == nil {
+		cm := &binarylog.ClientMessage{
 			OnClientSide: true,
-			Message:      msgBytes,
-		})
+			Message:      data,
+		}
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, cm)
+		}
 	}
-	return
+	return err
 }
 
 func (cs *clientStream) RecvMsg(m interface{}) error {
-	if cs.binlog != nil && !cs.serverHeaderBinlogged {
+	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
 		// Call Header() to binary log header if it's not already logged.
 		cs.Header()
 	}
 	var recvInfo *payloadInfo
-	if cs.binlog != nil {
+	if len(cs.binlogs) != 0 {
 		recvInfo = &payloadInfo{}
 	}
 	err := cs.withRetry(func(a *csAttempt) error {
 		return a.recvMsg(m, recvInfo)
 	}, cs.commitAttemptLocked)
-	if cs.binlog != nil && err == nil {
-		cs.binlog.Log(&binarylog.ServerMessage{
+	if len(cs.binlogs) != 0 && err == nil {
+		sm := &binarylog.ServerMessage{
 			OnClientSide: true,
 			Message:      recvInfo.uncompressedBytes,
-		})
+		}
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, sm)
+		}
 	}
 	if err != nil || !cs.desc.ServerStreams {
 		// err != nil or non-server-streaming indicates end of stream.
 		cs.finish(err)
 
-		if cs.binlog != nil {
+		if len(cs.binlogs) != 0 {
 			// finish will not log Trailer. Log Trailer here.
 			logEntry := &binarylog.ServerTrailer{
 				OnClientSide: true,
@@ -848,7 +942,9 @@
 			if peer, ok := peer.FromContext(cs.Context()); ok {
 				logEntry.PeerAddr = peer.Addr
 			}
-			cs.binlog.Log(logEntry)
+			for _, binlog := range cs.binlogs {
+				binlog.Log(cs.ctx, logEntry)
+			}
 		}
 	}
 	return err
@@ -869,10 +965,13 @@
 		return nil
 	}
 	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
-	if cs.binlog != nil {
-		cs.binlog.Log(&binarylog.ClientHalfClose{
+	if len(cs.binlogs) != 0 {
+		chc := &binarylog.ClientHalfClose{
 			OnClientSide: true,
-		})
+		}
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, chc)
+		}
 	}
 	// We never returned an error here for reasons.
 	return nil
@@ -889,6 +988,9 @@
 		return
 	}
 	cs.finished = true
+	for _, onFinish := range cs.callInfo.onFinish {
+		onFinish(err)
+	}
 	cs.commitAttemptLocked()
 	if cs.attempt != nil {
 		cs.attempt.finish(err)
@@ -905,10 +1007,13 @@
 	//
 	// Only one of cancel or trailer needs to be logged. In the cases where
 	// users don't call RecvMsg, users must have already canceled the RPC.
-	if cs.binlog != nil && status.Code(err) == codes.Canceled {
-		cs.binlog.Log(&binarylog.Cancel{
+	if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
+		c := &binarylog.Cancel{
 			OnClientSide: true,
-		})
+		}
+		for _, binlog := range cs.binlogs {
+			binlog.Log(cs.ctx, c)
+		}
 	}
 	if err == nil {
 		cs.retryThrottler.successfulRPC()
@@ -941,8 +1046,8 @@
 		}
 		return io.EOF
 	}
-	if a.statsHandler != nil {
-		a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
+	for _, sh := range a.statsHandlers {
+		sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
 	}
 	if channelz.IsOn() {
 		a.t.IncrMsgSent()
@@ -952,7 +1057,7 @@
 
 func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
 	cs := a.cs
-	if a.statsHandler != nil && payInfo == nil {
+	if len(a.statsHandlers) != 0 && payInfo == nil {
 		payInfo = &payloadInfo{}
 	}
 
@@ -980,6 +1085,7 @@
 			}
 			return io.EOF // indicates successful end of stream.
 		}
+
 		return toRPCErr(err)
 	}
 	if a.trInfo != nil {
@@ -989,15 +1095,16 @@
 		}
 		a.mu.Unlock()
 	}
-	if a.statsHandler != nil {
-		a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
+	for _, sh := range a.statsHandlers {
+		sh.HandleRPC(a.ctx, &stats.InPayload{
 			Client:   true,
 			RecvTime: time.Now(),
 			Payload:  m,
 			// TODO truncate large payload.
-			Data:       payInfo.uncompressedBytes,
-			WireLength: payInfo.wireLength + headerLen,
-			Length:     len(payInfo.uncompressedBytes),
+			Data:             payInfo.uncompressedBytes,
+			WireLength:       payInfo.compressedLength + headerLen,
+			CompressedLength: payInfo.compressedLength,
+			Length:           len(payInfo.uncompressedBytes),
 		})
 	}
 	if channelz.IsOn() {
@@ -1036,12 +1143,12 @@
 		tr = a.s.Trailer()
 	}
 
-	if a.done != nil {
+	if a.pickResult.Done != nil {
 		br := false
 		if a.s != nil {
 			br = a.s.BytesReceived()
 		}
-		a.done(balancer.DoneInfo{
+		a.pickResult.Done(balancer.DoneInfo{
 			Err:           err,
 			Trailer:       tr,
 			BytesSent:     a.s != nil,
@@ -1049,7 +1156,7 @@
 			ServerLoad:    balancerload.Parse(tr),
 		})
 	}
-	if a.statsHandler != nil {
+	for _, sh := range a.statsHandlers {
 		end := &stats.End{
 			Client:    true,
 			BeginTime: a.beginTime,
@@ -1057,7 +1164,7 @@
 			Trailer:   tr,
 			Error:     err,
 		}
-		a.statsHandler.HandleRPC(a.ctx, end)
+		sh.HandleRPC(a.ctx, end)
 	}
 	if a.trInfo != nil && a.trInfo.tr != nil {
 		if err == nil {
@@ -1166,14 +1273,19 @@
 	as.p = &parser{r: s}
 	ac.incrCallsStarted()
 	if desc != unaryStreamDesc {
-		// Listen on cc and stream contexts to cleanup when the user closes the
-		// ClientConn or cancels the stream context.  In all other cases, an error
-		// should already be injected into the recv buffer by the transport, which
-		// the client will eventually receive, and then we will cancel the stream's
-		// context in clientStream.finish.
+		// Listen on stream context to cleanup when the stream context is
+		// canceled.  Also listen for the addrConn's context in case the
+		// addrConn is closed or reconnects to a different address.  In all
+		// other cases, an error should already be injected into the recv
+		// buffer by the transport, which the client will eventually receive,
+		// and then we will cancel the stream's context in
+		// addrConnStream.finish.
 		go func() {
+			ac.mu.Lock()
+			acCtx := ac.ctx
+			ac.mu.Unlock()
 			select {
-			case <-ac.ctx.Done():
+			case <-acCtx.Done():
 				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
 			case <-ctx.Done():
 				as.finish(toRPCErr(ctx.Err()))
@@ -1362,8 +1474,10 @@
 
 // ServerStream defines the server-side behavior of a streaming RPC.
 //
-// All errors returned from ServerStream methods are compatible with the
-// status package.
+// Errors returned from ServerStream methods are compatible with the status
+// package.  However, the status code will often not match the RPC status as
+// seen by the client application, and therefore, should not be relied upon for
+// this purpose.
 type ServerStream interface {
 	// SetHeader sets the header metadata. It may be called multiple times.
 	// When call multiple times, all the provided metadata will be merged.
@@ -1395,6 +1509,9 @@
 	// It is safe to have a goroutine calling SendMsg and another goroutine
 	// calling RecvMsg on the same stream at the same time, but it is not safe
 	// to call SendMsg on the same stream in different goroutines.
+	//
+	// It is not safe to modify the message after calling SendMsg. Tracing
+	// libraries and stats handlers may use the message lazily.
 	SendMsg(m interface{}) error
 	// RecvMsg blocks until it receives a message into m or the stream is
 	// done. It returns io.EOF when the client has performed a CloseSend. On
@@ -1420,13 +1537,15 @@
 	comp   encoding.Compressor
 	decomp encoding.Compressor
 
+	sendCompressorName string
+
 	maxReceiveMessageSize int
 	maxSendMessageSize    int
 	trInfo                *traceInfo
 
-	statsHandler stats.Handler
+	statsHandler []stats.Handler
 
-	binlog *binarylog.MethodLogger
+	binlogs []binarylog.MethodLogger
 	// serverHeaderBinlogged indicates whether server header has been logged. It
 	// will happen when one of the following two happens: stream.SendHeader(),
 	// stream.Send().
@@ -1446,17 +1565,29 @@
 	if md.Len() == 0 {
 		return nil
 	}
+	err := imetadata.Validate(md)
+	if err != nil {
+		return status.Error(codes.Internal, err.Error())
+	}
 	return ss.s.SetHeader(md)
 }
 
 func (ss *serverStream) SendHeader(md metadata.MD) error {
-	err := ss.t.WriteHeader(ss.s, md)
-	if ss.binlog != nil && !ss.serverHeaderBinlogged {
+	err := imetadata.Validate(md)
+	if err != nil {
+		return status.Error(codes.Internal, err.Error())
+	}
+
+	err = ss.t.WriteHeader(ss.s, md)
+	if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
 		h, _ := ss.s.Header()
-		ss.binlog.Log(&binarylog.ServerHeader{
+		sh := &binarylog.ServerHeader{
 			Header: h,
-		})
+		}
 		ss.serverHeaderBinlogged = true
+		for _, binlog := range ss.binlogs {
+			binlog.Log(ss.ctx, sh)
+		}
 	}
 	return err
 }
@@ -1465,6 +1596,9 @@
 	if md.Len() == 0 {
 		return
 	}
+	if err := imetadata.Validate(md); err != nil {
+		logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
+	}
 	ss.s.SetTrailer(md)
 }
 
@@ -1497,6 +1631,13 @@
 		}
 	}()
 
+	// Server handler could have set new compressor by calling SetSendCompressor.
+	// In case it is set, we need to use it for compressing outbound message.
+	if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
+		ss.comp = encoding.GetCompressor(sendCompressorsName)
+		ss.sendCompressorName = sendCompressorsName
+	}
+
 	// load hdr, payload, data
 	hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
 	if err != nil {
@@ -1510,20 +1651,28 @@
 	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
 		return toRPCErr(err)
 	}
-	if ss.binlog != nil {
+	if len(ss.binlogs) != 0 {
 		if !ss.serverHeaderBinlogged {
 			h, _ := ss.s.Header()
-			ss.binlog.Log(&binarylog.ServerHeader{
+			sh := &binarylog.ServerHeader{
 				Header: h,
-			})
+			}
 			ss.serverHeaderBinlogged = true
+			for _, binlog := range ss.binlogs {
+				binlog.Log(ss.ctx, sh)
+			}
 		}
-		ss.binlog.Log(&binarylog.ServerMessage{
+		sm := &binarylog.ServerMessage{
 			Message: data,
-		})
+		}
+		for _, binlog := range ss.binlogs {
+			binlog.Log(ss.ctx, sm)
+		}
 	}
-	if ss.statsHandler != nil {
-		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+	if len(ss.statsHandler) != 0 {
+		for _, sh := range ss.statsHandler {
+			sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+		}
 	}
 	return nil
 }
@@ -1557,13 +1706,16 @@
 		}
 	}()
 	var payInfo *payloadInfo
-	if ss.statsHandler != nil || ss.binlog != nil {
+	if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
 		payInfo = &payloadInfo{}
 	}
 	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
 		if err == io.EOF {
-			if ss.binlog != nil {
-				ss.binlog.Log(&binarylog.ClientHalfClose{})
+			if len(ss.binlogs) != 0 {
+				chc := &binarylog.ClientHalfClose{}
+				for _, binlog := range ss.binlogs {
+					binlog.Log(ss.ctx, chc)
+				}
 			}
 			return err
 		}
@@ -1572,20 +1724,26 @@
 		}
 		return toRPCErr(err)
 	}
-	if ss.statsHandler != nil {
-		ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
-			RecvTime: time.Now(),
-			Payload:  m,
-			// TODO truncate large payload.
-			Data:       payInfo.uncompressedBytes,
-			WireLength: payInfo.wireLength + headerLen,
-			Length:     len(payInfo.uncompressedBytes),
-		})
+	if len(ss.statsHandler) != 0 {
+		for _, sh := range ss.statsHandler {
+			sh.HandleRPC(ss.s.Context(), &stats.InPayload{
+				RecvTime: time.Now(),
+				Payload:  m,
+				// TODO truncate large payload.
+				Data:             payInfo.uncompressedBytes,
+				Length:           len(payInfo.uncompressedBytes),
+				WireLength:       payInfo.compressedLength + headerLen,
+				CompressedLength: payInfo.compressedLength,
+			})
+		}
 	}
-	if ss.binlog != nil {
-		ss.binlog.Log(&binarylog.ClientMessage{
+	if len(ss.binlogs) != 0 {
+		cm := &binarylog.ClientMessage{
 			Message: payInfo.uncompressedBytes,
-		})
+		}
+		for _, binlog := range ss.binlogs {
+			binlog.Log(ss.ctx, cm)
+		}
 	}
 	return nil
 }