Updating to latest protos and device-management interface, releasing 2.0

Change-Id: I2d2ebf5b305d6d06b8d01c49d4d67e7ff050f5d4
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index edfcdca..eadf9e0 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -40,8 +40,10 @@
 	"google.golang.org/grpc/encoding"
 	"google.golang.org/grpc/encoding/proto"
 	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/binarylog"
 	"google.golang.org/grpc/internal/channelz"
+	"google.golang.org/grpc/internal/grpcrand"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
@@ -55,9 +57,26 @@
 const (
 	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
 	defaultServerMaxSendMessageSize    = math.MaxInt32
+
+	// Server transports are tracked in a map which is keyed on listener
+	// address. For regular gRPC traffic, connections are accepted in Serve()
+	// through a call to Accept(), and we use the actual listener address as key
+	// when we add it to the map. But for connections received through
+	// ServeHTTP(), we do not have a listener and hence use this dummy value.
+	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
 )
 
+func init() {
+	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
+		return srv.opts.creds
+	}
+	internal.DrainServerTransports = func(srv *Server, addr string) {
+		srv.drainServerTransports(addr)
+	}
+}
+
 var statusOK = status.New(codes.OK, "")
+var logger = grpclog.Component("core")
 
 type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
 
@@ -78,27 +97,37 @@
 	Metadata    interface{}
 }
 
-// service consists of the information of the server serving this service and
-// the methods in this service.
-type service struct {
-	server interface{} // the server for service methods
-	md     map[string]*MethodDesc
-	sd     map[string]*StreamDesc
-	mdata  interface{}
+// serviceInfo wraps information about a service. It is very similar to
+// ServiceDesc and is constructed from it for internal purposes.
+type serviceInfo struct {
+	// Contains the implementation for the methods in this service.
+	serviceImpl interface{}
+	methods     map[string]*MethodDesc
+	streams     map[string]*StreamDesc
+	mdata       interface{}
+}
+
+type serverWorkerData struct {
+	st     transport.ServerTransport
+	wg     *sync.WaitGroup
+	stream *transport.Stream
 }
 
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
 	opts serverOptions
 
-	mu     sync.Mutex // guards following
-	lis    map[net.Listener]bool
-	conns  map[transport.ServerTransport]bool
-	serve  bool
-	drain  bool
-	cv     *sync.Cond          // signaled when connections close for GracefulStop
-	m      map[string]*service // service name -> service info
-	events trace.EventLog
+	mu  sync.Mutex // guards following
+	lis map[net.Listener]bool
+	// conns contains all active server transports. It is a map keyed on a
+	// listener address with the value being the set of active transports
+	// belonging to that listener.
+	conns    map[string]map[transport.ServerTransport]bool
+	serve    bool
+	drain    bool
+	cv       *sync.Cond              // signaled when connections close for GracefulStop
+	services map[string]*serviceInfo // service name -> service info
+	events   trace.EventLog
 
 	quit               *grpcsync.Event
 	done               *grpcsync.Event
@@ -107,6 +136,8 @@
 
 	channelzID int64 // channelz unique identification number
 	czData     *channelzData
+
+	serverWorkerChannels []chan *serverWorkerData
 }
 
 type serverOptions struct {
@@ -133,6 +164,7 @@
 	connectionTimeout     time.Duration
 	maxHeaderListSize     *uint32
 	headerTableSize       *uint32
+	numServerWorkers      uint32
 }
 
 var defaultServerOptions = serverOptions{
@@ -151,7 +183,10 @@
 // EmptyServerOption does not alter the server configuration. It can be embedded
 // in another structure to build custom server options.
 //
-// This API is EXPERIMENTAL.
+// Experimental
+//
+// Notice: This type is EXPERIMENTAL and may be changed or removed in a
+// later release.
 type EmptyServerOption struct{}
 
 func (EmptyServerOption) apply(*serverOptions) {}
@@ -213,7 +248,7 @@
 // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
 func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
 	if kp.Time > 0 && kp.Time < time.Second {
-		grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
+		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
 		kp.Time = time.Second
 	}
 
@@ -232,19 +267,55 @@
 // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
 //
 // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
+//
+// Deprecated: register codecs using encoding.RegisterCodec. The server will
+// automatically use registered codecs based on the incoming requests' headers.
+// See also
+// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
+// Will be supported throughout 1.x.
 func CustomCodec(codec Codec) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.codec = codec
 	})
 }
 
+// ForceServerCodec returns a ServerOption that sets a codec for message
+// marshaling and unmarshaling.
+//
+// This will override any lookups by content-subtype for Codecs registered
+// with RegisterCodec.
+//
+// See Content-Type on
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
+// more details. Also see the documentation on RegisterCodec and
+// CallContentSubtype for more details on the interaction between encoding.Codec
+// and content-subtype.
+//
+// This function is provided for advanced users; prefer to register codecs
+// using encoding.RegisterCodec.
+// The server will automatically use registered codecs based on the incoming
+// requests' headers. See also
+// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
+// Will be supported throughout 1.x.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func ForceServerCodec(codec encoding.Codec) ServerOption {
+	return newFuncServerOption(func(o *serverOptions) {
+		o.codec = codec
+	})
+}
+
 // RPCCompressor returns a ServerOption that sets a compressor for outbound
 // messages.  For backward compatibility, all outbound messages will be sent
 // using this compressor, regardless of incoming message compression.  By
 // default, server messages will be sent using the same compressor with which
 // request messages were sent.
 //
-// Deprecated: use encoding.RegisterCompressor instead.
+// Deprecated: use encoding.RegisterCompressor instead. Will be supported
+// throughout 1.x.
 func RPCCompressor(cp Compressor) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.cp = cp
@@ -255,7 +326,8 @@
 // messages.  It has higher priority than decompressors registered via
 // encoding.RegisterCompressor.
 //
-// Deprecated: use encoding.RegisterCompressor instead.
+// Deprecated: use encoding.RegisterCompressor instead. Will be supported
+// throughout 1.x.
 func RPCDecompressor(dc Decompressor) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.dc = dc
@@ -265,7 +337,7 @@
 // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
 // If this is not set, gRPC uses the default limit.
 //
-// Deprecated: use MaxRecvMsgSize instead.
+// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
 func MaxMsgSize(m int) ServerOption {
 	return MaxRecvMsgSize(m)
 }
@@ -335,7 +407,7 @@
 }
 
 // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
-// for stream RPCs. The first interceptor will be the outer most,
+// for streaming RPCs. The first interceptor will be the outer most,
 // while the last interceptor will be the inner most wrapper around the real call.
 // All stream interceptors added by this method will be chained.
 func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
@@ -346,6 +418,11 @@
 
 // InTapHandle returns a ServerOption that sets the tap handle for all the server
 // transport to be created. Only one can be installed.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
 func InTapHandle(h tap.ServerInHandle) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		if o.inTapHandle != nil {
@@ -385,7 +462,10 @@
 // new connections.  If this is not set, the default is 120 seconds.  A zero or
 // negative value will result in an immediate timeout.
 //
-// This API is EXPERIMENTAL.
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
 func ConnectionTimeout(d time.Duration) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.connectionTimeout = d
@@ -403,13 +483,79 @@
 // HeaderTableSize returns a ServerOption that sets the size of dynamic
 // header table for stream.
 //
-// This API is EXPERIMENTAL.
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
 func HeaderTableSize(s uint32) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.headerTableSize = &s
 	})
 }
 
+// NumStreamWorkers returns a ServerOption that sets the number of worker
+// goroutines that should be used to process incoming streams. Setting this to
+// zero (default) will disable workers and spawn a new goroutine for each
+// stream.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func NumStreamWorkers(numServerWorkers uint32) ServerOption {
+	// TODO: If/when this API gets stabilized (i.e. stream workers become the
+	// only way streams are processed), change the behavior of the zero value to
+	// a sane default. Preliminary experiments suggest that a value equal to the
+	// number of CPUs available is most performant; requires thorough testing.
+	return newFuncServerOption(func(o *serverOptions) {
+		o.numServerWorkers = numServerWorkers
+	})
+}
+
+// serverWorkerResetThreshold defines how often the stack must be reset. Every
+// N requests, by spawning a new goroutine in its place, a worker can reset its
+// stack so that large stacks don't live in memory forever. 2^16 should allow
+// each goroutine stack to live for at least a few seconds in a typical
+// workload (assuming a QPS of a few thousand requests/sec).
+const serverWorkerResetThreshold = 1 << 16
+
+// serverWorkers blocks on a *transport.Stream channel forever and waits for
+// data to be fed by serveStreams. This allows different requests to be
+// processed by the same goroutine, removing the need for expensive stack
+// re-allocations (see the runtime.morestack problem [1]).
+//
+// [1] https://github.com/golang/go/issues/18138
+func (s *Server) serverWorker(ch chan *serverWorkerData) {
+	// To make sure all server workers don't reset at the same time, choose a
+	// random number of iterations before resetting.
+	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
+	for completed := 0; completed < threshold; completed++ {
+		data, ok := <-ch
+		if !ok {
+			return
+		}
+		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
+		data.wg.Done()
+	}
+	go s.serverWorker(ch)
+}
+
+// initServerWorkers creates worker goroutines and channels to process incoming
+// connections to reduce the time spent overall on runtime.morestack.
+func (s *Server) initServerWorkers() {
+	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
+	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
+		go s.serverWorker(s.serverWorkerChannels[i])
+	}
+}
+
+func (s *Server) stopServerWorkers() {
+	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+		close(s.serverWorkerChannels[i])
+	}
+}
+
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
@@ -418,13 +564,13 @@
 		o.apply(&opts)
 	}
 	s := &Server{
-		lis:    make(map[net.Listener]bool),
-		opts:   opts,
-		conns:  make(map[transport.ServerTransport]bool),
-		m:      make(map[string]*service),
-		quit:   grpcsync.NewEvent(),
-		done:   grpcsync.NewEvent(),
-		czData: new(channelzData),
+		lis:      make(map[net.Listener]bool),
+		opts:     opts,
+		conns:    make(map[string]map[transport.ServerTransport]bool),
+		services: make(map[string]*serviceInfo),
+		quit:     grpcsync.NewEvent(),
+		done:     grpcsync.NewEvent(),
+		czData:   new(channelzData),
 	}
 	chainUnaryServerInterceptors(s)
 	chainStreamServerInterceptors(s)
@@ -434,6 +580,10 @@
 		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
 	}
 
+	if s.opts.numServerWorkers > 0 {
+		s.initServerWorkers()
+	}
+
 	if channelz.IsOn() {
 		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
 	}
@@ -456,14 +606,29 @@
 	}
 }
 
+// ServiceRegistrar wraps a single method that supports service registration. It
+// enables users to pass concrete types other than grpc.Server to the service
+// registration methods exported by the IDL generated code.
+type ServiceRegistrar interface {
+	// RegisterService registers a service and its implementation to the
+	// concrete type implementing this interface.  It may not be called
+	// once the server has started serving.
+	// desc describes the service and its methods and handlers. impl is the
+	// service implementation which is passed to the method handlers.
+	RegisterService(desc *ServiceDesc, impl interface{})
+}
+
 // RegisterService registers a service and its implementation to the gRPC
 // server. It is called from the IDL generated code. This must be called before
-// invoking Serve.
+// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
+// ensure it implements sd.HandlerType.
 func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
-	ht := reflect.TypeOf(sd.HandlerType).Elem()
-	st := reflect.TypeOf(ss)
-	if !st.Implements(ht) {
-		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
+	if ss != nil {
+		ht := reflect.TypeOf(sd.HandlerType).Elem()
+		st := reflect.TypeOf(ss)
+		if !st.Implements(ht) {
+			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
+		}
 	}
 	s.register(sd, ss)
 }
@@ -473,26 +638,26 @@
 	defer s.mu.Unlock()
 	s.printf("RegisterService(%q)", sd.ServiceName)
 	if s.serve {
-		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
+		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
 	}
-	if _, ok := s.m[sd.ServiceName]; ok {
-		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
+	if _, ok := s.services[sd.ServiceName]; ok {
+		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
 	}
-	srv := &service{
-		server: ss,
-		md:     make(map[string]*MethodDesc),
-		sd:     make(map[string]*StreamDesc),
-		mdata:  sd.Metadata,
+	info := &serviceInfo{
+		serviceImpl: ss,
+		methods:     make(map[string]*MethodDesc),
+		streams:     make(map[string]*StreamDesc),
+		mdata:       sd.Metadata,
 	}
 	for i := range sd.Methods {
 		d := &sd.Methods[i]
-		srv.md[d.MethodName] = d
+		info.methods[d.MethodName] = d
 	}
 	for i := range sd.Streams {
 		d := &sd.Streams[i]
-		srv.sd[d.StreamName] = d
+		info.streams[d.StreamName] = d
 	}
-	s.m[sd.ServiceName] = srv
+	s.services[sd.ServiceName] = info
 }
 
 // MethodInfo contains the information of an RPC including its method name and type.
@@ -516,16 +681,16 @@
 // Service names include the package names, in the form of <package>.<service>.
 func (s *Server) GetServiceInfo() map[string]ServiceInfo {
 	ret := make(map[string]ServiceInfo)
-	for n, srv := range s.m {
-		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
-		for m := range srv.md {
+	for n, srv := range s.services {
+		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
+		for m := range srv.methods {
 			methods = append(methods, MethodInfo{
 				Name:           m,
 				IsClientStream: false,
 				IsServerStream: false,
 			})
 		}
-		for m, d := range srv.sd {
+		for m, d := range srv.streams {
 			methods = append(methods, MethodInfo{
 				Name:           m,
 				IsClientStream: d.ClientStreams,
@@ -545,13 +710,6 @@
 // the server being stopped.
 var ErrServerStopped = errors.New("grpc: the server has been stopped")
 
-func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
-	if s.opts.creds == nil {
-		return rawConn, nil, nil
-	}
-	return s.opts.creds.ServerHandshake(rawConn)
-}
-
 type listenSocket struct {
 	net.Listener
 	channelzID int64
@@ -660,7 +818,7 @@
 		// s.conns before this conn can be added.
 		s.serveWG.Add(1)
 		go func() {
-			s.handleRawConn(rawConn)
+			s.handleRawConn(lis.Addr().String(), rawConn)
 			s.serveWG.Done()
 		}()
 	}
@@ -668,49 +826,45 @@
 
 // handleRawConn forks a goroutine to handle a just-accepted connection that
 // has not had any I/O performed on it yet.
-func (s *Server) handleRawConn(rawConn net.Conn) {
+func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
 	if s.quit.HasFired() {
 		rawConn.Close()
 		return
 	}
 	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
-	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
-	if err != nil {
-		// ErrConnDispatched means that the connection was dispatched away from
-		// gRPC; those connections should be left open.
-		if err != credentials.ErrConnDispatched {
-			s.mu.Lock()
-			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
-			s.mu.Unlock()
-			channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
-			rawConn.Close()
-		}
-		rawConn.SetDeadline(time.Time{})
-		return
-	}
 
 	// Finish handshaking (HTTP2)
-	st := s.newHTTP2Transport(conn, authInfo)
+	st := s.newHTTP2Transport(rawConn)
+	rawConn.SetDeadline(time.Time{})
 	if st == nil {
 		return
 	}
 
-	rawConn.SetDeadline(time.Time{})
-	if !s.addConn(st) {
+	if !s.addConn(lisAddr, st) {
 		return
 	}
 	go func() {
 		s.serveStreams(st)
-		s.removeConn(st)
+		s.removeConn(lisAddr, st)
 	}()
 }
 
+func (s *Server) drainServerTransports(addr string) {
+	s.mu.Lock()
+	conns := s.conns[addr]
+	for st := range conns {
+		st.Drain()
+	}
+	s.mu.Unlock()
+}
+
 // newHTTP2Transport sets up a http/2 transport (using the
 // gRPC http2 server transport in transport/http2_server.go).
-func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
+func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
 	config := &transport.ServerConfig{
 		MaxStreams:            s.opts.maxConcurrentStreams,
-		AuthInfo:              authInfo,
+		ConnectionTimeout:     s.opts.connectionTimeout,
+		Credentials:           s.opts.creds,
 		InTapHandle:           s.opts.inTapHandle,
 		StatsHandler:          s.opts.statsHandler,
 		KeepaliveParams:       s.opts.keepaliveParams,
@@ -723,13 +877,20 @@
 		MaxHeaderListSize:     s.opts.maxHeaderListSize,
 		HeaderTableSize:       s.opts.headerTableSize,
 	}
-	st, err := transport.NewServerTransport("http2", c, config)
+	st, err := transport.NewServerTransport(c, config)
 	if err != nil {
 		s.mu.Lock()
 		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
 		s.mu.Unlock()
-		c.Close()
-		channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+		// ErrConnDispatched means that the connection was dispatched away from
+		// gRPC; those connections should be left open.
+		if err != credentials.ErrConnDispatched {
+			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
+			if err != io.EOF {
+				channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+			}
+			c.Close()
+		}
 		return nil
 	}
 
@@ -739,12 +900,27 @@
 func (s *Server) serveStreams(st transport.ServerTransport) {
 	defer st.Close()
 	var wg sync.WaitGroup
+
+	var roundRobinCounter uint32
 	st.HandleStreams(func(stream *transport.Stream) {
 		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			s.handleStream(st, stream, s.traceInfo(st, stream))
-		}()
+		if s.opts.numServerWorkers > 0 {
+			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
+			select {
+			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
+			default:
+				// If all stream workers are busy, fallback to the default code path.
+				go func() {
+					s.handleStream(st, stream, s.traceInfo(st, stream))
+					wg.Done()
+				}()
+			}
+		} else {
+			go func() {
+				defer wg.Done()
+				s.handleStream(st, stream, s.traceInfo(st, stream))
+			}()
+		}
 	}, func(ctx context.Context, method string) context.Context {
 		if !EnableTracing {
 			return ctx
@@ -779,18 +955,22 @@
 // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
 // separate from grpc-go's HTTP/2 server. Performance and features may vary
 // between the two paths. ServeHTTP does not support some gRPC features
-// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
-// and subject to change.
+// available through grpc-go's HTTP/2 server.
+//
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
-	if !s.addConn(st) {
+	if !s.addConn(listenerAddressForServeHTTP, st) {
 		return
 	}
-	defer s.removeConn(st)
+	defer s.removeConn(listenerAddressForServeHTTP, st)
 	s.serveStreams(st)
 }
 
@@ -818,7 +998,7 @@
 	return trInfo
 }
 
-func (s *Server) addConn(st transport.ServerTransport) bool {
+func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.conns == nil {
@@ -830,15 +1010,28 @@
 		// immediately.
 		st.Drain()
 	}
-	s.conns[st] = true
+
+	if s.conns[addr] == nil {
+		// Create a map entry if this is the first connection on this listener.
+		s.conns[addr] = make(map[transport.ServerTransport]bool)
+	}
+	s.conns[addr][st] = true
 	return true
 }
 
-func (s *Server) removeConn(st transport.ServerTransport) {
+func (s *Server) removeConn(addr string, st transport.ServerTransport) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if s.conns != nil {
-		delete(s.conns, st)
+
+	conns := s.conns[addr]
+	if conns != nil {
+		delete(conns, st)
+		if len(conns) == 0 {
+			// If the last connection for this address is being removed, also
+			// remove the map entry corresponding to the address. This is used
+			// in GracefulStop() when waiting for all connections to be closed.
+			delete(s.conns, addr)
+		}
 		s.cv.Broadcast()
 	}
 }
@@ -868,12 +1061,12 @@
 func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
 	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
 	if err != nil {
-		channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
+		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
 		return err
 	}
 	compData, err := compress(data, cp, comp)
 	if err != nil {
-		channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
+		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
 		return err
 	}
 	hdr, payload := msgHeader(data, compData)
@@ -903,26 +1096,33 @@
 	} else if len(interceptors) == 1 {
 		chainedInt = interceptors[0]
 	} else {
-		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
-			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
-		}
+		chainedInt = chainUnaryInterceptors(interceptors)
 	}
 
 	s.opts.unaryInt = chainedInt
 }
 
-// getChainUnaryHandler recursively generate the chained UnaryHandler
-func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
-	if curr == len(interceptors)-1 {
-		return finalHandler
-	}
-
-	return func(ctx context.Context, req interface{}) (interface{}, error) {
-		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
+func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
+	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
+		// the struct ensures the variables are allocated together, rather than separately, since we
+		// know they should be garbage collected together. This saves 1 allocation and decreases
+		// time/call by about 10% on the microbenchmark.
+		var state struct {
+			i    int
+			next UnaryHandler
+		}
+		state.next = func(ctx context.Context, req interface{}) (interface{}, error) {
+			if state.i == len(interceptors)-1 {
+				return interceptors[state.i](ctx, req, info, handler)
+			}
+			state.i++
+			return interceptors[state.i-1](ctx, req, info, state.next)
+		}
+		return state.next(ctx, req)
 	}
 }
 
-func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
+func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
 	sh := s.opts.statsHandler
 	if sh != nil || trInfo != nil || channelz.IsOn() {
 		if channelz.IsOn() {
@@ -932,7 +1132,9 @@
 		if sh != nil {
 			beginTime := time.Now()
 			statsBegin = &stats.Begin{
-				BeginTime: beginTime,
+				BeginTime:      beginTime,
+				IsClientStream: false,
+				IsServerStream: false,
 			}
 			sh.HandleRPC(stream.Context(), statsBegin)
 		}
@@ -1045,10 +1247,8 @@
 	}
 	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
 	if err != nil {
-		if st, ok := status.FromError(err); ok {
-			if e := t.WriteStatus(stream, st); e != nil {
-				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
-			}
+		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
+			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
 		}
 		return err
 	}
@@ -1063,7 +1263,7 @@
 			sh.HandleRPC(stream.Context(), &stats.InPayload{
 				RecvTime:   time.Now(),
 				Payload:    v,
-				WireLength: payInfo.wireLength,
+				WireLength: payInfo.wireLength + headerLen,
 				Data:       d,
 				Length:     len(d),
 			})
@@ -1079,7 +1279,7 @@
 		return nil
 	}
 	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
-	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
+	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
 	if appErr != nil {
 		appStatus, ok := status.FromError(appErr)
 		if !ok {
@@ -1092,7 +1292,7 @@
 			trInfo.tr.SetError()
 		}
 		if e := t.WriteStatus(stream, appStatus); e != nil {
-			channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 		}
 		if binlog != nil {
 			if h, _ := stream.Header(); h.Len() > 0 {
@@ -1121,7 +1321,7 @@
 		}
 		if sts, ok := status.FromError(err); ok {
 			if e := t.WriteStatus(stream, sts); e != nil {
-				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
 			}
 		} else {
 			switch st := err.(type) {
@@ -1186,26 +1386,33 @@
 	} else if len(interceptors) == 1 {
 		chainedInt = interceptors[0]
 	} else {
-		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
-			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
-		}
+		chainedInt = chainStreamInterceptors(interceptors)
 	}
 
 	s.opts.streamInt = chainedInt
 }
 
-// getChainStreamHandler recursively generate the chained StreamHandler
-func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
-	if curr == len(interceptors)-1 {
-		return finalHandler
-	}
-
-	return func(srv interface{}, ss ServerStream) error {
-		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
+func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
+	return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
+		// the struct ensures the variables are allocated together, rather than separately, since we
+		// know they should be garbage collected together. This saves 1 allocation and decreases
+		// time/call by about 10% on the microbenchmark.
+		var state struct {
+			i    int
+			next StreamHandler
+		}
+		state.next = func(srv interface{}, ss ServerStream) error {
+			if state.i == len(interceptors)-1 {
+				return interceptors[state.i](srv, ss, info, handler)
+			}
+			state.i++
+			return interceptors[state.i-1](srv, ss, info, state.next)
+		}
+		return state.next(srv, ss)
 	}
 }
 
-func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
+func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
 	if channelz.IsOn() {
 		s.incrCallsStarted()
 	}
@@ -1214,7 +1421,9 @@
 	if sh != nil {
 		beginTime := time.Now()
 		statsBegin = &stats.Begin{
-			BeginTime: beginTime,
+			BeginTime:      beginTime,
+			IsClientStream: sd.ClientStreams,
+			IsServerStream: sd.ServerStreams,
 		}
 		sh.HandleRPC(stream.Context(), statsBegin)
 	}
@@ -1317,13 +1526,15 @@
 		}
 	}
 
+	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
+
 	if trInfo != nil {
 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 	}
 	var appErr error
 	var server interface{}
-	if srv != nil {
-		server = srv.server
+	if info != nil {
+		server = info.serviceImpl
 	}
 	if s.opts.streamInt == nil {
 		appErr = sd.Handler(server, ss)
@@ -1384,12 +1595,12 @@
 			trInfo.tr.SetError()
 		}
 		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
-		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
+		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
 			if trInfo != nil {
 				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 				trInfo.tr.SetError()
 			}
-			channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
 		}
 		if trInfo != nil {
 			trInfo.tr.Finish()
@@ -1399,13 +1610,13 @@
 	service := sm[:pos]
 	method := sm[pos+1:]
 
-	srv, knownService := s.m[service]
+	srv, knownService := s.services[service]
 	if knownService {
-		if md, ok := srv.md[method]; ok {
+		if md, ok := srv.methods[method]; ok {
 			s.processUnaryRPC(t, stream, srv, md, trInfo)
 			return
 		}
-		if sd, ok := srv.sd[method]; ok {
+		if sd, ok := srv.streams[method]; ok {
 			s.processStreamingRPC(t, stream, srv, sd, trInfo)
 			return
 		}
@@ -1430,7 +1641,7 @@
 			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 			trInfo.tr.SetError()
 		}
-		channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
 	}
 	if trInfo != nil {
 		trInfo.tr.Finish()
@@ -1443,7 +1654,10 @@
 // NewContextWithServerTransportStream creates a new context from ctx and
 // attaches stream to it.
 //
-// This API is EXPERIMENTAL.
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
 func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
 	return context.WithValue(ctx, streamKey{}, stream)
 }
@@ -1455,7 +1669,10 @@
 //
 // See also NewContextWithServerTransportStream.
 //
-// This API is EXPERIMENTAL.
+// Experimental
+//
+// Notice: This type is EXPERIMENTAL and may be changed or removed in a
+// later release.
 type ServerTransportStream interface {
 	Method() string
 	SetHeader(md metadata.MD) error
@@ -1467,7 +1684,10 @@
 // ctx. Returns nil if the given context has no stream associated with it
 // (which implies it is not an RPC invocation context).
 //
-// This API is EXPERIMENTAL.
+// Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
 func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
 	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
 	return s
@@ -1495,7 +1715,7 @@
 	s.mu.Lock()
 	listeners := s.lis
 	s.lis = nil
-	st := s.conns
+	conns := s.conns
 	s.conns = nil
 	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
 	s.cv.Broadcast()
@@ -1504,8 +1724,13 @@
 	for lis := range listeners {
 		lis.Close()
 	}
-	for c := range st {
-		c.Close()
+	for _, cs := range conns {
+		for st := range cs {
+			st.Close()
+		}
+	}
+	if s.opts.numServerWorkers > 0 {
+		s.stopServerWorkers()
 	}
 
 	s.mu.Lock()
@@ -1539,8 +1764,10 @@
 	}
 	s.lis = nil
 	if !s.drain {
-		for st := range s.conns {
-			st.Drain()
+		for _, conns := range s.conns {
+			for st := range conns {
+				st.Drain()
+			}
 		}
 		s.drain = true
 	}