VOL-1691 Fix openolt adapter getting stuck while registering with core

Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 33272a4..1766136 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -86,11 +86,11 @@
 
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
-	opts options
+	opts serverOptions
 
 	mu     sync.Mutex // guards following
 	lis    map[net.Listener]bool
-	conns  map[io.Closer]bool
+	conns  map[transport.ServerTransport]bool
 	serve  bool
 	drain  bool
 	cv     *sync.Cond          // signaled when connections close for GracefulStop
@@ -108,7 +108,7 @@
 	czData     *channelzData
 }
 
-type options struct {
+type serverOptions struct {
 	creds                 credentials.TransportCredentials
 	codec                 baseCodec
 	cp                    Compressor
@@ -131,7 +131,7 @@
 	maxHeaderListSize     *uint32
 }
 
-var defaultServerOptions = options{
+var defaultServerOptions = serverOptions{
 	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
 	maxSendMessageSize:    defaultServerMaxSendMessageSize,
 	connectionTimeout:     120 * time.Second,
@@ -140,7 +140,33 @@
 }
 
 // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
-type ServerOption func(*options)
+type ServerOption interface {
+	apply(*serverOptions)
+}
+
+// EmptyServerOption does not alter the server configuration. It can be embedded
+// in another structure to build custom server options.
+//
+// This API is EXPERIMENTAL.
+type EmptyServerOption struct{}
+
+func (EmptyServerOption) apply(*serverOptions) {}
+
+// funcServerOption wraps a function that modifies serverOptions into an
+// implementation of the ServerOption interface.
+type funcServerOption struct {
+	f func(*serverOptions)
+}
+
+func (fdo *funcServerOption) apply(do *serverOptions) {
+	fdo.f(do)
+}
+
+func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
+	return &funcServerOption{
+		f: f,
+	}
+}
 
 // WriteBufferSize determines how much data can be batched before doing a write on the wire.
 // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
@@ -148,9 +174,9 @@
 // Zero will disable the write buffer such that each write will be on underlying connection.
 // Note: A Send call may not directly translate to a write.
 func WriteBufferSize(s int) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.writeBufferSize = s
-	}
+	})
 }
 
 // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
@@ -159,25 +185,25 @@
 // Zero will disable read buffer for a connection so data framer can access the underlying
 // conn directly.
 func ReadBufferSize(s int) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.readBufferSize = s
-	}
+	})
 }
 
 // InitialWindowSize returns a ServerOption that sets window size for stream.
 // The lower bound for window size is 64K and any value smaller than that will be ignored.
 func InitialWindowSize(s int32) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.initialWindowSize = s
-	}
+	})
 }
 
 // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
 // The lower bound for window size is 64K and any value smaller than that will be ignored.
 func InitialConnWindowSize(s int32) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.initialConnWindowSize = s
-	}
+	})
 }
 
 // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
@@ -187,25 +213,25 @@
 		kp.Time = time.Second
 	}
 
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.keepaliveParams = kp
-	}
+	})
 }
 
 // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
 func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.keepalivePolicy = kep
-	}
+	})
 }
 
 // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
 //
 // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
 func CustomCodec(codec Codec) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.codec = codec
-	}
+	})
 }
 
 // RPCCompressor returns a ServerOption that sets a compressor for outbound
@@ -216,9 +242,9 @@
 //
 // Deprecated: use encoding.RegisterCompressor instead.
 func RPCCompressor(cp Compressor) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.cp = cp
-	}
+	})
 }
 
 // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
@@ -227,9 +253,9 @@
 //
 // Deprecated: use encoding.RegisterCompressor instead.
 func RPCDecompressor(dc Decompressor) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.dc = dc
-	}
+	})
 }
 
 // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
@@ -243,73 +269,73 @@
 // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
 // If this is not set, gRPC uses the default 4MB.
 func MaxRecvMsgSize(m int) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.maxReceiveMessageSize = m
-	}
+	})
 }
 
 // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
 // If this is not set, gRPC uses the default `math.MaxInt32`.
 func MaxSendMsgSize(m int) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.maxSendMessageSize = m
-	}
+	})
 }
 
 // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
 // of concurrent streams to each ServerTransport.
 func MaxConcurrentStreams(n uint32) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.maxConcurrentStreams = n
-	}
+	})
 }
 
 // Creds returns a ServerOption that sets credentials for server connections.
 func Creds(c credentials.TransportCredentials) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.creds = c
-	}
+	})
 }
 
 // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
 // server. Only one unary interceptor can be installed. The construction of multiple
 // interceptors (e.g., chaining) can be implemented at the caller.
 func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		if o.unaryInt != nil {
 			panic("The unary server interceptor was already set and may not be reset.")
 		}
 		o.unaryInt = i
-	}
+	})
 }
 
 // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
 // server. Only one stream interceptor can be installed.
 func StreamInterceptor(i StreamServerInterceptor) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		if o.streamInt != nil {
 			panic("The stream server interceptor was already set and may not be reset.")
 		}
 		o.streamInt = i
-	}
+	})
 }
 
 // InTapHandle returns a ServerOption that sets the tap handle for all the server
 // transport to be created. Only one can be installed.
 func InTapHandle(h tap.ServerInHandle) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		if o.inTapHandle != nil {
 			panic("The tap handle was already set and may not be reset.")
 		}
 		o.inTapHandle = h
-	}
+	})
 }
 
 // StatsHandler returns a ServerOption that sets the stats handler for the server.
 func StatsHandler(h stats.Handler) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.statsHandler = h
-	}
+	})
 }
 
 // UnknownServiceHandler returns a ServerOption that allows for adding a custom
@@ -319,7 +345,7 @@
 // The handling function has full access to the Context of the request and the
 // stream, and the invocation bypasses interceptors.
 func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.unknownStreamDesc = &StreamDesc{
 			StreamName: "unknown_service_handler",
 			Handler:    streamHandler,
@@ -327,7 +353,7 @@
 			ClientStreams: true,
 			ServerStreams: true,
 		}
-	}
+	})
 }
 
 // ConnectionTimeout returns a ServerOption that sets the timeout for
@@ -337,17 +363,17 @@
 //
 // This API is EXPERIMENTAL.
 func ConnectionTimeout(d time.Duration) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.connectionTimeout = d
-	}
+	})
 }
 
 // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
 // of header list that the server is prepared to accept.
 func MaxHeaderListSize(s uint32) ServerOption {
-	return func(o *options) {
+	return newFuncServerOption(func(o *serverOptions) {
 		o.maxHeaderListSize = &s
-	}
+	})
 }
 
 // NewServer creates a gRPC server which has no service registered and has not
@@ -355,12 +381,12 @@
 func NewServer(opt ...ServerOption) *Server {
 	opts := defaultServerOptions
 	for _, o := range opt {
-		o(&opts)
+		o.apply(&opts)
 	}
 	s := &Server{
 		lis:    make(map[net.Listener]bool),
 		opts:   opts,
-		conns:  make(map[io.Closer]bool),
+		conns:  make(map[transport.ServerTransport]bool),
 		m:      make(map[string]*service),
 		quit:   make(chan struct{}),
 		done:   make(chan struct{}),
@@ -614,12 +640,13 @@
 	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
 	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
 	if err != nil {
-		s.mu.Lock()
-		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
-		s.mu.Unlock()
-		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
-		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
+		// ErrConnDispatched means that the connection was dispatched away from
+		// gRPC; those connections should be left open.
 		if err != credentials.ErrConnDispatched {
+			s.mu.Lock()
+			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
+			s.mu.Unlock()
+			grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
 			rawConn.Close()
 		}
 		rawConn.SetDeadline(time.Time{})
@@ -748,37 +775,38 @@
 
 	trInfo = &traceInfo{
 		tr: tr,
+		firstLine: firstLine{
+			client:     false,
+			remoteAddr: st.RemoteAddr(),
+		},
 	}
-	trInfo.firstLine.client = false
-	trInfo.firstLine.remoteAddr = st.RemoteAddr()
-
 	if dl, ok := stream.Context().Deadline(); ok {
 		trInfo.firstLine.deadline = time.Until(dl)
 	}
 	return trInfo
 }
 
-func (s *Server) addConn(c io.Closer) bool {
+func (s *Server) addConn(st transport.ServerTransport) bool {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.conns == nil {
-		c.Close()
+		st.Close()
 		return false
 	}
 	if s.drain {
 		// Transport added after we drained our existing conns: drain it
 		// immediately.
-		c.(transport.ServerTransport).Drain()
+		st.Drain()
 	}
-	s.conns[c] = true
+	s.conns[st] = true
 	return true
 }
 
-func (s *Server) removeConn(c io.Closer) {
+func (s *Server) removeConn(st transport.ServerTransport) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if s.conns != nil {
-		delete(s.conns, c)
+		delete(s.conns, st)
 		s.cv.Broadcast()
 	}
 }
@@ -859,7 +887,6 @@
 	}
 	if trInfo != nil {
 		defer trInfo.tr.Finish()
-		trInfo.firstLine.client = false
 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 		defer func() {
 			if err != nil && err != io.EOF {
@@ -1245,7 +1272,8 @@
 	service := sm[:pos]
 	method := sm[pos+1:]
 
-	if srv, ok := s.m[service]; ok {
+	srv, knownService := s.m[service]
+	if knownService {
 		if md, ok := srv.md[method]; ok {
 			s.processUnaryRPC(t, stream, srv, md, trInfo)
 			return
@@ -1260,11 +1288,16 @@
 		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
 		return
 	}
+	var errDesc string
+	if !knownService {
+		errDesc = fmt.Sprintf("unknown service %v", service)
+	} else {
+		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
+	}
 	if trInfo != nil {
-		trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
+		trInfo.tr.LazyPrintf("%s", errDesc)
 		trInfo.tr.SetError()
 	}
-	errDesc := fmt.Sprintf("unknown service %v", service)
 	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
 		if trInfo != nil {
 			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@@ -1390,8 +1423,8 @@
 	}
 	s.lis = nil
 	if !s.drain {
-		for c := range s.conns {
-			c.(transport.ServerTransport).Drain()
+		for st := range s.conns {
+			st.Drain()
 		}
 		s.drain = true
 	}