[VOL-5291] On demand statistics for ONU and OLT

Change-Id: I4850bb0f0d2235122cb0c1bcf835b3672bb34436
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 4599b51..81969e7 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -43,8 +43,8 @@
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/binarylog"
 	"google.golang.org/grpc/internal/channelz"
-	"google.golang.org/grpc/internal/grpcrand"
 	"google.golang.org/grpc/internal/grpcsync"
+	"google.golang.org/grpc/internal/grpcutil"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/metadata"
@@ -73,6 +73,14 @@
 	internal.DrainServerTransports = func(srv *Server, addr string) {
 		srv.drainServerTransports(addr)
 	}
+	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
+		globalServerOptions = append(globalServerOptions, opt...)
+	}
+	internal.ClearGlobalServerOptions = func() {
+		globalServerOptions = nil
+	}
+	internal.BinaryLogger = binaryLogger
+	internal.JoinServerOptions = newJoinServerOption
 }
 
 var statusOK = status.New(codes.OK, "")
@@ -134,10 +142,10 @@
 	channelzRemoveOnce sync.Once
 	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop
 
-	channelzID int64 // channelz unique identification number
+	channelzID *channelz.Identifier
 	czData     *channelzData
 
-	serverWorkerChannels []chan *serverWorkerData
+	serverWorkerChannel chan *serverWorkerData
 }
 
 type serverOptions struct {
@@ -149,8 +157,9 @@
 	streamInt             StreamServerInterceptor
 	chainUnaryInts        []UnaryServerInterceptor
 	chainStreamInts       []StreamServerInterceptor
+	binaryLogger          binarylog.Logger
 	inTapHandle           tap.ServerInHandle
-	statsHandler          stats.Handler
+	statsHandlers         []stats.Handler
 	maxConcurrentStreams  uint32
 	maxReceiveMessageSize int
 	maxSendMessageSize    int
@@ -174,6 +183,7 @@
 	writeBufferSize:       defaultWriteBufSize,
 	readBufferSize:        defaultReadBufSize,
 }
+var globalServerOptions []ServerOption
 
 // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
 type ServerOption interface {
@@ -207,10 +217,27 @@
 	}
 }
 
-// WriteBufferSize determines how much data can be batched before doing a write on the wire.
-// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
-// The default value for this buffer is 32KB.
-// Zero will disable the write buffer such that each write will be on underlying connection.
+// joinServerOption provides a way to combine arbitrary number of server
+// options into one.
+type joinServerOption struct {
+	opts []ServerOption
+}
+
+func (mdo *joinServerOption) apply(do *serverOptions) {
+	for _, opt := range mdo.opts {
+		opt.apply(do)
+	}
+}
+
+func newJoinServerOption(opts ...ServerOption) ServerOption {
+	return &joinServerOption{opts: opts}
+}
+
+// WriteBufferSize determines how much data can be batched before doing a write
+// on the wire. The corresponding memory allocation for this buffer will be
+// twice the size to keep syscalls low. The default value for this buffer is
+// 32KB. Zero or negative values will disable the write buffer such that each
+// write will be on underlying connection.
 // Note: A Send call may not directly translate to a write.
 func WriteBufferSize(s int) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
@@ -218,11 +245,10 @@
 	})
 }
 
-// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
-// for one read syscall.
-// The default value for this buffer is 32KB.
-// Zero will disable read buffer for a connection so data framer can access the underlying
-// conn directly.
+// ReadBufferSize lets you set the size of read buffer, this determines how much
+// data can be read at most for one read syscall. The default value for this
+// buffer is 32KB. Zero or negative values will disable read buffer for a
+// connection so data framer can access the underlying conn directly.
 func ReadBufferSize(s int) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.readBufferSize = s
@@ -435,7 +461,21 @@
 // StatsHandler returns a ServerOption that sets the stats handler for the server.
 func StatsHandler(h stats.Handler) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
-		o.statsHandler = h
+		if h == nil {
+			logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
+			// Do not allow a nil stats handler, which would otherwise cause
+			// panics.
+			return
+		}
+		o.statsHandlers = append(o.statsHandlers, h)
+	})
+}
+
+// binaryLogger returns a ServerOption that can set the binary logger for the
+// server.
+func binaryLogger(bl binarylog.Logger) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.binaryLogger = bl
 	})
 }
 
@@ -520,46 +560,47 @@
 const serverWorkerResetThreshold = 1 << 16
 
 // serverWorkers blocks on a *transport.Stream channel forever and waits for
-// data to be fed by serveStreams. This allows different requests to be
+// data to be fed by serveStreams. This allows multiple requests to be
 // processed by the same goroutine, removing the need for expensive stack
 // re-allocations (see the runtime.morestack problem [1]).
 //
 // [1] https://github.com/golang/go/issues/18138
-func (s *Server) serverWorker(ch chan *serverWorkerData) {
-	// To make sure all server workers don't reset at the same time, choose a
-	// random number of iterations before resetting.
-	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
-	for completed := 0; completed < threshold; completed++ {
-		data, ok := <-ch
+func (s *Server) serverWorker() {
+	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
+		data, ok := <-s.serverWorkerChannel
 		if !ok {
 			return
 		}
-		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
-		data.wg.Done()
+		s.handleSingleStream(data)
 	}
-	go s.serverWorker(ch)
+	go s.serverWorker()
 }
 
-// initServerWorkers creates worker goroutines and channels to process incoming
+func (s *Server) handleSingleStream(data *serverWorkerData) {
+	defer data.wg.Done()
+	s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
+}
+
+// initServerWorkers creates worker goroutines and a channel to process incoming
 // connections to reduce the time spent overall on runtime.morestack.
 func (s *Server) initServerWorkers() {
-	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
+	s.serverWorkerChannel = make(chan *serverWorkerData)
 	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
-		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
-		go s.serverWorker(s.serverWorkerChannels[i])
+		go s.serverWorker()
 	}
 }
 
 func (s *Server) stopServerWorkers() {
-	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
-		close(s.serverWorkerChannels[i])
-	}
+	close(s.serverWorkerChannel)
 }
 
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
 	opts := defaultServerOptions
+	for _, o := range globalServerOptions {
+		o.apply(&opts)
+	}
 	for _, o := range opt {
 		o.apply(&opts)
 	}
@@ -584,9 +625,8 @@
 		s.initServerWorkers()
 	}
 
-	if channelz.IsOn() {
-		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
-	}
+	s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
+	channelz.Info(logger, s.channelzID, "Server created")
 	return s
 }
 
@@ -712,7 +752,7 @@
 
 type listenSocket struct {
 	net.Listener
-	channelzID int64
+	channelzID *channelz.Identifier
 }
 
 func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
@@ -724,9 +764,8 @@
 
 func (l *listenSocket) Close() error {
 	err := l.Listener.Close()
-	if channelz.IsOn() {
-		channelz.RemoveEntry(l.channelzID)
-	}
+	channelz.RemoveEntry(l.channelzID)
+	channelz.Info(logger, l.channelzID, "ListenSocket deleted")
 	return err
 }
 
@@ -759,11 +798,6 @@
 	ls := &listenSocket{Listener: lis}
 	s.lis[ls] = true
 
-	if channelz.IsOn() {
-		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
-	}
-	s.mu.Unlock()
-
 	defer func() {
 		s.mu.Lock()
 		if s.lis != nil && s.lis[ls] {
@@ -773,8 +807,16 @@
 		s.mu.Unlock()
 	}()
 
-	var tempDelay time.Duration // how long to sleep on accept failure
+	var err error
+	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
+	if err != nil {
+		s.mu.Unlock()
+		return err
+	}
+	s.mu.Unlock()
+	channelz.Info(logger, ls.channelzID, "ListenSocket created")
 
+	var tempDelay time.Duration // how long to sleep on accept failure
 	for {
 		rawConn, err := lis.Accept()
 		if err != nil {
@@ -853,7 +895,7 @@
 	s.mu.Lock()
 	conns := s.conns[addr]
 	for st := range conns {
-		st.Drain()
+		st.Drain("")
 	}
 	s.mu.Unlock()
 }
@@ -866,7 +908,7 @@
 		ConnectionTimeout:     s.opts.connectionTimeout,
 		Credentials:           s.opts.creds,
 		InTapHandle:           s.opts.inTapHandle,
-		StatsHandler:          s.opts.statsHandler,
+		StatsHandlers:         s.opts.statsHandlers,
 		KeepaliveParams:       s.opts.keepaliveParams,
 		KeepalivePolicy:       s.opts.keepalivePolicy,
 		InitialWindowSize:     s.opts.initialWindowSize,
@@ -887,7 +929,7 @@
 		if err != credentials.ErrConnDispatched {
 			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
 			if err != io.EOF {
-				channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+				channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
 			}
 			c.Close()
 		}
@@ -898,29 +940,24 @@
 }
 
 func (s *Server) serveStreams(st transport.ServerTransport) {
-	defer st.Close()
+	defer st.Close(errors.New("finished serving streams for the server transport"))
 	var wg sync.WaitGroup
 
-	var roundRobinCounter uint32
 	st.HandleStreams(func(stream *transport.Stream) {
 		wg.Add(1)
 		if s.opts.numServerWorkers > 0 {
 			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
 			select {
-			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
+			case s.serverWorkerChannel <- data:
+				return
 			default:
 				// If all stream workers are busy, fallback to the default code path.
-				go func() {
-					s.handleStream(st, stream, s.traceInfo(st, stream))
-					wg.Done()
-				}()
 			}
-		} else {
-			go func() {
-				defer wg.Done()
-				s.handleStream(st, stream, s.traceInfo(st, stream))
-			}()
 		}
+		go func() {
+			defer wg.Done()
+			s.handleStream(st, stream, s.traceInfo(st, stream))
+		}()
 	}, func(ctx context.Context, method string) context.Context {
 		if !EnableTracing {
 			return ctx
@@ -962,9 +999,10 @@
 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
 // later release.
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
+	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
 	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
+		// Errors returned from transport.NewServerHandlerTransport have
+		// already been written to w.
 		return
 	}
 	if !s.addConn(listenerAddressForServeHTTP, st) {
@@ -1002,13 +1040,13 @@
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.conns == nil {
-		st.Close()
+		st.Close(errors.New("Server.addConn called when server has already been stopped"))
 		return false
 	}
 	if s.drain {
 		// Transport added after we drained our existing conns: drain it
 		// immediately.
-		st.Drain()
+		st.Drain("")
 	}
 
 	if s.conns[addr] == nil {
@@ -1075,8 +1113,10 @@
 		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
 	}
 	err = t.Write(stream, hdr, payload, opts)
-	if err == nil && s.opts.statsHandler != nil {
-		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
+	if err == nil {
+		for _, sh := range s.opts.statsHandlers {
+			sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
+		}
 	}
 	return err
 }
@@ -1104,32 +1144,27 @@
 
 func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
 	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
-		// the struct ensures the variables are allocated together, rather than separately, since we
-		// know they should be garbage collected together. This saves 1 allocation and decreases
-		// time/call by about 10% on the microbenchmark.
-		var state struct {
-			i    int
-			next UnaryHandler
-		}
-		state.next = func(ctx context.Context, req interface{}) (interface{}, error) {
-			if state.i == len(interceptors)-1 {
-				return interceptors[state.i](ctx, req, info, handler)
-			}
-			state.i++
-			return interceptors[state.i-1](ctx, req, info, state.next)
-		}
-		return state.next(ctx, req)
+		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
+	}
+}
+
+func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
+	if curr == len(interceptors)-1 {
+		return finalHandler
+	}
+	return func(ctx context.Context, req interface{}) (interface{}, error) {
+		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
 	}
 }
 
 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
-	sh := s.opts.statsHandler
-	if sh != nil || trInfo != nil || channelz.IsOn() {
+	shs := s.opts.statsHandlers
+	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
 		if channelz.IsOn() {
 			s.incrCallsStarted()
 		}
 		var statsBegin *stats.Begin
-		if sh != nil {
+		for _, sh := range shs {
 			beginTime := time.Now()
 			statsBegin = &stats.Begin{
 				BeginTime:      beginTime,
@@ -1160,7 +1195,7 @@
 				trInfo.tr.Finish()
 			}
 
-			if sh != nil {
+			for _, sh := range shs {
 				end := &stats.End{
 					BeginTime: statsBegin.BeginTime,
 					EndTime:   time.Now(),
@@ -1180,9 +1215,16 @@
 			}
 		}()
 	}
-
-	binlog := binarylog.GetMethodLogger(stream.Method())
-	if binlog != nil {
+	var binlogs []binarylog.MethodLogger
+	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
+		binlogs = append(binlogs, ml)
+	}
+	if s.opts.binaryLogger != nil {
+		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
+			binlogs = append(binlogs, ml)
+		}
+	}
+	if len(binlogs) != 0 {
 		ctx := stream.Context()
 		md, _ := metadata.FromIncomingContext(ctx)
 		logEntry := &binarylog.ClientHeader{
@@ -1202,7 +1244,9 @@
 		if peer, ok := peer.FromContext(ctx); ok {
 			logEntry.PeerAddr = peer.Addr
 		}
-		binlog.Log(logEntry)
+		for _, binlog := range binlogs {
+			binlog.Log(ctx, logEntry)
+		}
 	}
 
 	// comp and cp are used for compression.  decomp and dc are used for
@@ -1212,6 +1256,7 @@
 	var comp, decomp encoding.Compressor
 	var cp Compressor
 	var dc Decompressor
+	var sendCompressorName string
 
 	// If dc is set and matches the stream's compression, use it.  Otherwise, try
 	// to find a matching registered compressor for decomp.
@@ -1232,23 +1277,29 @@
 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
 	if s.opts.cp != nil {
 		cp = s.opts.cp
-		stream.SetSendCompress(cp.Type())
+		sendCompressorName = cp.Type()
 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
 		// Legacy compressor not specified; attempt to respond with same encoding.
 		comp = encoding.GetCompressor(rc)
 		if comp != nil {
-			stream.SetSendCompress(rc)
+			sendCompressorName = comp.Name()
+		}
+	}
+
+	if sendCompressorName != "" {
+		if err := stream.SetSendCompress(sendCompressorName); err != nil {
+			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
 		}
 	}
 
 	var payInfo *payloadInfo
-	if sh != nil || binlog != nil {
+	if len(shs) != 0 || len(binlogs) != 0 {
 		payInfo = &payloadInfo{}
 	}
 	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
 	if err != nil {
 		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
-			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
+			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 		}
 		return err
 	}
@@ -1259,19 +1310,23 @@
 		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
 			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
 		}
-		if sh != nil {
+		for _, sh := range shs {
 			sh.HandleRPC(stream.Context(), &stats.InPayload{
-				RecvTime:   time.Now(),
-				Payload:    v,
-				WireLength: payInfo.wireLength + headerLen,
-				Data:       d,
-				Length:     len(d),
+				RecvTime:         time.Now(),
+				Payload:          v,
+				Length:           len(d),
+				WireLength:       payInfo.compressedLength + headerLen,
+				CompressedLength: payInfo.compressedLength,
+				Data:             d,
 			})
 		}
-		if binlog != nil {
-			binlog.Log(&binarylog.ClientMessage{
+		if len(binlogs) != 0 {
+			cm := &binarylog.ClientMessage{
 				Message: d,
-			})
+			}
+			for _, binlog := range binlogs {
+				binlog.Log(stream.Context(), cm)
+			}
 		}
 		if trInfo != nil {
 			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
@@ -1283,9 +1338,10 @@
 	if appErr != nil {
 		appStatus, ok := status.FromError(appErr)
 		if !ok {
-			// Convert appErr if it is not a grpc status error.
-			appErr = status.Error(codes.Unknown, appErr.Error())
-			appStatus, _ = status.FromError(appErr)
+			// Convert non-status application error to a status error with code
+			// Unknown, but handle context errors specifically.
+			appStatus = status.FromContextError(appErr)
+			appErr = appStatus.Err()
 		}
 		if trInfo != nil {
 			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
@@ -1294,18 +1350,24 @@
 		if e := t.WriteStatus(stream, appStatus); e != nil {
 			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 		}
-		if binlog != nil {
+		if len(binlogs) != 0 {
 			if h, _ := stream.Header(); h.Len() > 0 {
 				// Only log serverHeader if there was header. Otherwise it can
 				// be trailer only.
-				binlog.Log(&binarylog.ServerHeader{
+				sh := &binarylog.ServerHeader{
 					Header: h,
-				})
+				}
+				for _, binlog := range binlogs {
+					binlog.Log(stream.Context(), sh)
+				}
 			}
-			binlog.Log(&binarylog.ServerTrailer{
+			st := &binarylog.ServerTrailer{
 				Trailer: stream.Trailer(),
 				Err:     appErr,
-			})
+			}
+			for _, binlog := range binlogs {
+				binlog.Log(stream.Context(), st)
+			}
 		}
 		return appErr
 	}
@@ -1314,6 +1376,11 @@
 	}
 	opts := &transport.Options{Last: true}
 
+	// Server handler could have set new compressor by calling SetSendCompressor.
+	// In case it is set, we need to use it for compressing outbound message.
+	if stream.SendCompress() != sendCompressorName {
+		comp = encoding.GetCompressor(stream.SendCompress())
+	}
 	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
 		if err == io.EOF {
 			// The entire stream is done (for unary RPC only).
@@ -1331,26 +1398,34 @@
 				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
 			}
 		}
-		if binlog != nil {
+		if len(binlogs) != 0 {
 			h, _ := stream.Header()
-			binlog.Log(&binarylog.ServerHeader{
+			sh := &binarylog.ServerHeader{
 				Header: h,
-			})
-			binlog.Log(&binarylog.ServerTrailer{
+			}
+			st := &binarylog.ServerTrailer{
 				Trailer: stream.Trailer(),
 				Err:     appErr,
-			})
+			}
+			for _, binlog := range binlogs {
+				binlog.Log(stream.Context(), sh)
+				binlog.Log(stream.Context(), st)
+			}
 		}
 		return err
 	}
-	if binlog != nil {
+	if len(binlogs) != 0 {
 		h, _ := stream.Header()
-		binlog.Log(&binarylog.ServerHeader{
+		sh := &binarylog.ServerHeader{
 			Header: h,
-		})
-		binlog.Log(&binarylog.ServerMessage{
+		}
+		sm := &binarylog.ServerMessage{
 			Message: reply,
-		})
+		}
+		for _, binlog := range binlogs {
+			binlog.Log(stream.Context(), sh)
+			binlog.Log(stream.Context(), sm)
+		}
 	}
 	if channelz.IsOn() {
 		t.IncrMsgSent()
@@ -1361,14 +1436,16 @@
 	// TODO: Should we be logging if writing status failed here, like above?
 	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
 	// error or allow the stats handler to see it?
-	err = t.WriteStatus(stream, statusOK)
-	if binlog != nil {
-		binlog.Log(&binarylog.ServerTrailer{
+	if len(binlogs) != 0 {
+		st := &binarylog.ServerTrailer{
 			Trailer: stream.Trailer(),
 			Err:     appErr,
-		})
+		}
+		for _, binlog := range binlogs {
+			binlog.Log(stream.Context(), st)
+		}
 	}
-	return err
+	return t.WriteStatus(stream, statusOK)
 }
 
 // chainStreamServerInterceptors chains all stream server interceptors into one.
@@ -1394,21 +1471,16 @@
 
 func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
 	return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
-		// the struct ensures the variables are allocated together, rather than separately, since we
-		// know they should be garbage collected together. This saves 1 allocation and decreases
-		// time/call by about 10% on the microbenchmark.
-		var state struct {
-			i    int
-			next StreamHandler
-		}
-		state.next = func(srv interface{}, ss ServerStream) error {
-			if state.i == len(interceptors)-1 {
-				return interceptors[state.i](srv, ss, info, handler)
-			}
-			state.i++
-			return interceptors[state.i-1](srv, ss, info, state.next)
-		}
-		return state.next(srv, ss)
+		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
+	}
+}
+
+func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
+	if curr == len(interceptors)-1 {
+		return finalHandler
+	}
+	return func(srv interface{}, stream ServerStream) error {
+		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
 	}
 }
 
@@ -1416,16 +1488,18 @@
 	if channelz.IsOn() {
 		s.incrCallsStarted()
 	}
-	sh := s.opts.statsHandler
+	shs := s.opts.statsHandlers
 	var statsBegin *stats.Begin
-	if sh != nil {
+	if len(shs) != 0 {
 		beginTime := time.Now()
 		statsBegin = &stats.Begin{
 			BeginTime:      beginTime,
 			IsClientStream: sd.ClientStreams,
 			IsServerStream: sd.ServerStreams,
 		}
-		sh.HandleRPC(stream.Context(), statsBegin)
+		for _, sh := range shs {
+			sh.HandleRPC(stream.Context(), statsBegin)
+		}
 	}
 	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
 	ss := &serverStream{
@@ -1437,10 +1511,10 @@
 		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
 		maxSendMessageSize:    s.opts.maxSendMessageSize,
 		trInfo:                trInfo,
-		statsHandler:          sh,
+		statsHandler:          shs,
 	}
 
-	if sh != nil || trInfo != nil || channelz.IsOn() {
+	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
 		// See comment in processUnaryRPC on defers.
 		defer func() {
 			if trInfo != nil {
@@ -1454,7 +1528,7 @@
 				ss.mu.Unlock()
 			}
 
-			if sh != nil {
+			if len(shs) != 0 {
 				end := &stats.End{
 					BeginTime: statsBegin.BeginTime,
 					EndTime:   time.Now(),
@@ -1462,7 +1536,9 @@
 				if err != nil && err != io.EOF {
 					end.Error = toRPCErr(err)
 				}
-				sh.HandleRPC(stream.Context(), end)
+				for _, sh := range shs {
+					sh.HandleRPC(stream.Context(), end)
+				}
 			}
 
 			if channelz.IsOn() {
@@ -1475,8 +1551,15 @@
 		}()
 	}
 
-	ss.binlog = binarylog.GetMethodLogger(stream.Method())
-	if ss.binlog != nil {
+	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
+		ss.binlogs = append(ss.binlogs, ml)
+	}
+	if s.opts.binaryLogger != nil {
+		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
+			ss.binlogs = append(ss.binlogs, ml)
+		}
+	}
+	if len(ss.binlogs) != 0 {
 		md, _ := metadata.FromIncomingContext(ctx)
 		logEntry := &binarylog.ClientHeader{
 			Header:     md,
@@ -1495,7 +1578,9 @@
 		if peer, ok := peer.FromContext(ss.Context()); ok {
 			logEntry.PeerAddr = peer.Addr
 		}
-		ss.binlog.Log(logEntry)
+		for _, binlog := range ss.binlogs {
+			binlog.Log(stream.Context(), logEntry)
+		}
 	}
 
 	// If dc is set and matches the stream's compression, use it.  Otherwise, try
@@ -1517,12 +1602,18 @@
 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
 	if s.opts.cp != nil {
 		ss.cp = s.opts.cp
-		stream.SetSendCompress(s.opts.cp.Type())
+		ss.sendCompressorName = s.opts.cp.Type()
 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
 		// Legacy compressor not specified; attempt to respond with same encoding.
 		ss.comp = encoding.GetCompressor(rc)
 		if ss.comp != nil {
-			stream.SetSendCompress(rc)
+			ss.sendCompressorName = rc
+		}
+	}
+
+	if ss.sendCompressorName != "" {
+		if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
+			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
 		}
 	}
 
@@ -1549,7 +1640,9 @@
 	if appErr != nil {
 		appStatus, ok := status.FromError(appErr)
 		if !ok {
-			appStatus = status.New(codes.Unknown, appErr.Error())
+			// Convert non-status application error to a status error with code
+			// Unknown, but handle context errors specifically.
+			appStatus = status.FromContextError(appErr)
 			appErr = appStatus.Err()
 		}
 		if trInfo != nil {
@@ -1558,13 +1651,16 @@
 			ss.trInfo.tr.SetError()
 			ss.mu.Unlock()
 		}
-		t.WriteStatus(ss.s, appStatus)
-		if ss.binlog != nil {
-			ss.binlog.Log(&binarylog.ServerTrailer{
+		if len(ss.binlogs) != 0 {
+			st := &binarylog.ServerTrailer{
 				Trailer: ss.s.Trailer(),
 				Err:     appErr,
-			})
+			}
+			for _, binlog := range ss.binlogs {
+				binlog.Log(stream.Context(), st)
+			}
 		}
+		t.WriteStatus(ss.s, appStatus)
 		// TODO: Should we log an error from WriteStatus here and below?
 		return appErr
 	}
@@ -1573,14 +1669,16 @@
 		ss.trInfo.tr.LazyLog(stringer("OK"), false)
 		ss.mu.Unlock()
 	}
-	err = t.WriteStatus(ss.s, statusOK)
-	if ss.binlog != nil {
-		ss.binlog.Log(&binarylog.ServerTrailer{
+	if len(ss.binlogs) != 0 {
+		st := &binarylog.ServerTrailer{
 			Trailer: ss.s.Trailer(),
 			Err:     appErr,
-		})
+		}
+		for _, binlog := range ss.binlogs {
+			binlog.Log(stream.Context(), st)
+		}
 	}
-	return err
+	return t.WriteStatus(ss.s, statusOK)
 }
 
 func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
@@ -1706,11 +1804,7 @@
 		s.done.Fire()
 	}()
 
-	s.channelzRemoveOnce.Do(func() {
-		if channelz.IsOn() {
-			channelz.RemoveEntry(s.channelzID)
-		}
-	})
+	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
 
 	s.mu.Lock()
 	listeners := s.lis
@@ -1726,7 +1820,7 @@
 	}
 	for _, cs := range conns {
 		for st := range cs {
-			st.Close()
+			st.Close(errors.New("Server.Stop called"))
 		}
 	}
 	if s.opts.numServerWorkers > 0 {
@@ -1748,11 +1842,7 @@
 	s.quit.Fire()
 	defer s.done.Fire()
 
-	s.channelzRemoveOnce.Do(func() {
-		if channelz.IsOn() {
-			channelz.RemoveEntry(s.channelzID)
-		}
-	})
+	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
 	s.mu.Lock()
 	if s.conns == nil {
 		s.mu.Unlock()
@@ -1766,7 +1856,7 @@
 	if !s.drain {
 		for _, conns := range s.conns {
 			for st := range conns {
-				st.Drain()
+				st.Drain("graceful_stop")
 			}
 		}
 		s.drain = true
@@ -1805,12 +1895,26 @@
 	return codec
 }
 
-// SetHeader sets the header metadata.
-// When called multiple times, all the provided metadata will be merged.
-// All the metadata will be sent out when one of the following happens:
-//   - grpc.SendHeader() is called;
-//   - The first response is sent out;
-//   - An RPC status is sent out (error or success).
+// SetHeader sets the header metadata to be sent from the server to the client.
+// The context provided must be the context passed to the server's handler.
+//
+// Streaming RPCs should prefer the SetHeader method of the ServerStream.
+//
+// When called multiple times, all the provided metadata will be merged.  All
+// the metadata will be sent out when one of the following happens:
+//
+//   - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
+//   - The first response message is sent.  For unary handlers, this occurs when
+//     the handler returns; for streaming handlers, this can happen when stream's
+//     SendMsg method is called.
+//   - An RPC status is sent out (error or success).  This occurs when the handler
+//     returns.
+//
+// SetHeader will fail if called after any of the events above.
+//
+// The error returned is compatible with the status package.  However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
 func SetHeader(ctx context.Context, md metadata.MD) error {
 	if md.Len() == 0 {
 		return nil
@@ -1822,8 +1926,14 @@
 	return stream.SetHeader(md)
 }
 
-// SendHeader sends header metadata. It may be called at most once.
-// The provided md and headers set by SetHeader() will be sent.
+// SendHeader sends header metadata. It may be called at most once, and may not
+// be called after any event that causes headers to be sent (see SetHeader for
+// a complete list).  The provided md and headers set by SetHeader() will be
+// sent.
+//
+// The error returned is compatible with the status package.  However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
 func SendHeader(ctx context.Context, md metadata.MD) error {
 	stream := ServerTransportStreamFromContext(ctx)
 	if stream == nil {
@@ -1835,8 +1945,66 @@
 	return nil
 }
 
+// SetSendCompressor sets a compressor for outbound messages from the server.
+// It must not be called after any event that causes headers to be sent
+// (see ServerStream.SetHeader for the complete list). Provided compressor is
+// used when below conditions are met:
+//
+//   - compressor is registered via encoding.RegisterCompressor
+//   - compressor name must exist in the client advertised compressor names
+//     sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
+//     get client supported compressor names.
+//
+// The context provided must be the context passed to the server's handler.
+// It must be noted that compressor name encoding.Identity disables the
+// outbound compression.
+// By default, server messages will be sent using the same compressor with
+// which request messages were sent.
+//
+// It is not safe to call SetSendCompressor concurrently with SendHeader and
+// SendMsg.
+//
+// # Experimental
+//
+// Notice: This function is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func SetSendCompressor(ctx context.Context, name string) error {
+	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
+	if !ok || stream == nil {
+		return fmt.Errorf("failed to fetch the stream from the given context")
+	}
+
+	if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
+		return fmt.Errorf("unable to set send compressor: %w", err)
+	}
+
+	return stream.SetSendCompress(name)
+}
+
+// ClientSupportedCompressors returns compressor names advertised by the client
+// via grpc-accept-encoding header.
+//
+// The context provided must be the context passed to the server's handler.
+//
+// # Experimental
+//
+// Notice: This function is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
+	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
+	if !ok || stream == nil {
+		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
+	}
+
+	return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
+}
+
 // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
 // When called more than once, all the provided metadata will be merged.
+//
+// The error returned is compatible with the status package.  However, the
+// status code will often not match the RPC status as seen by the client
+// application, and therefore, should not be relied upon for this purpose.
 func SetTrailer(ctx context.Context, md metadata.MD) error {
 	if md.Len() == 0 {
 		return nil
@@ -1865,3 +2033,22 @@
 func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
 	return c.s.channelzMetric()
 }
+
+// validateSendCompressor returns an error when given compressor name cannot be
+// handled by the server or the client based on the advertised compressors.
+func validateSendCompressor(name, clientCompressors string) error {
+	if name == encoding.Identity {
+		return nil
+	}
+
+	if !grpcutil.IsCompressorNameRegistered(name) {
+		return fmt.Errorf("compressor not registered %q", name)
+	}
+
+	for _, c := range strings.Split(clientCompressors, ",") {
+		if c == name {
+			return nil // found match
+		}
+	}
+	return fmt.Errorf("client does not support compressor %q", name)
+}