VOL-2112 move to voltha-lib-go
Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 33272a4..f064b73 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -42,6 +42,7 @@
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@@ -56,6 +57,8 @@
defaultServerMaxSendMessageSize = math.MaxInt32
)
+var statusOK = status.New(codes.OK, "")
+
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
@@ -86,21 +89,19 @@
// Server is a gRPC server to serve RPC requests.
type Server struct {
- opts options
+ opts serverOptions
mu sync.Mutex // guards following
lis map[net.Listener]bool
- conns map[io.Closer]bool
+ conns map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info
events trace.EventLog
- quit chan struct{}
- done chan struct{}
- quitOnce sync.Once
- doneOnce sync.Once
+ quit *grpcsync.Event
+ done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
@@ -108,7 +109,7 @@
czData *channelzData
}
-type options struct {
+type serverOptions struct {
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
@@ -131,7 +132,7 @@
maxHeaderListSize *uint32
}
-var defaultServerOptions = options{
+var defaultServerOptions = serverOptions{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
@@ -140,7 +141,33 @@
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
-type ServerOption func(*options)
+type ServerOption interface {
+ apply(*serverOptions)
+}
+
+// EmptyServerOption does not alter the server configuration. It can be embedded
+// in another structure to build custom server options.
+//
+// This API is EXPERIMENTAL.
+type EmptyServerOption struct{}
+
+func (EmptyServerOption) apply(*serverOptions) {}
+
+// funcServerOption wraps a function that modifies serverOptions into an
+// implementation of the ServerOption interface.
+type funcServerOption struct {
+ f func(*serverOptions)
+}
+
+func (fdo *funcServerOption) apply(do *serverOptions) {
+ fdo.f(do)
+}
+
+func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
+ return &funcServerOption{
+ f: f,
+ }
+}
// WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
@@ -148,9 +175,9 @@
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.writeBufferSize = s
- }
+ })
}
// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
@@ -159,25 +186,25 @@
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func ReadBufferSize(s int) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.readBufferSize = s
- }
+ })
}
// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
- }
+ })
}
// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
- }
+ })
}
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
@@ -187,25 +214,25 @@
kp.Time = time.Second
}
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.keepaliveParams = kp
- }
+ })
}
// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.keepalivePolicy = kep
- }
+ })
}
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
func CustomCodec(codec Codec) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.codec = codec
- }
+ })
}
// RPCCompressor returns a ServerOption that sets a compressor for outbound
@@ -216,9 +243,9 @@
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.cp = cp
- }
+ })
}
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
@@ -227,9 +254,9 @@
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.dc = dc
- }
+ })
}
// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
@@ -243,73 +270,73 @@
// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.maxReceiveMessageSize = m
- }
+ })
}
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.maxSendMessageSize = m
- }
+ })
}
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.maxConcurrentStreams = n
- }
+ })
}
// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.creds = c
- }
+ })
}
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
if o.unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
- }
+ })
}
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
if o.streamInt != nil {
panic("The stream server interceptor was already set and may not be reset.")
}
o.streamInt = i
- }
+ })
}
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
if o.inTapHandle != nil {
panic("The tap handle was already set and may not be reset.")
}
o.inTapHandle = h
- }
+ })
}
// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.statsHandler = h
- }
+ })
}
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
@@ -319,7 +346,7 @@
// The handling function has full access to the Context of the request and the
// stream, and the invocation bypasses interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.unknownStreamDesc = &StreamDesc{
StreamName: "unknown_service_handler",
Handler: streamHandler,
@@ -327,7 +354,7 @@
ClientStreams: true,
ServerStreams: true,
}
- }
+ })
}
// ConnectionTimeout returns a ServerOption that sets the timeout for
@@ -337,17 +364,17 @@
//
// This API is EXPERIMENTAL.
func ConnectionTimeout(d time.Duration) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.connectionTimeout = d
- }
+ })
}
// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
- return func(o *options) {
+ return newFuncServerOption(func(o *serverOptions) {
o.maxHeaderListSize = &s
- }
+ })
}
// NewServer creates a gRPC server which has no service registered and has not
@@ -355,15 +382,15 @@
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range opt {
- o(&opts)
+ o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
- conns: make(map[io.Closer]bool),
+ conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
- quit: make(chan struct{}),
- done: make(chan struct{}),
+ quit: grpcsync.NewEvent(),
+ done: grpcsync.NewEvent(),
czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
@@ -530,11 +557,9 @@
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
- select {
- // Stop or GracefulStop called; block until done and return nil.
- case <-s.quit:
- <-s.done
- default:
+ if s.quit.HasFired() {
+ // Stop or GracefulStop called; block until done and return nil.
+ <-s.done.Done()
}
}()
@@ -577,7 +602,7 @@
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
- case <-s.quit:
+ case <-s.quit.Done():
timer.Stop()
return nil
}
@@ -587,10 +612,8 @@
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
- select {
- case <-s.quit:
+ if s.quit.HasFired() {
return nil
- default:
}
return err
}
@@ -611,29 +634,26 @@
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
+ if s.quit.HasFired() {
+ rawConn.Close()
+ return
+ }
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
- s.mu.Lock()
- s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
- s.mu.Unlock()
- grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
- // If serverHandshake returns ErrConnDispatched, keep rawConn open.
+ // ErrConnDispatched means that the connection was dispatched away from
+ // gRPC; those connections should be left open.
if err != credentials.ErrConnDispatched {
+ s.mu.Lock()
+ s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
+ s.mu.Unlock()
+ grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()
}
rawConn.SetDeadline(time.Time{})
return
}
- s.mu.Lock()
- if s.conns == nil {
- s.mu.Unlock()
- conn.Close()
- return
- }
- s.mu.Unlock()
-
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
@@ -741,6 +761,9 @@
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
+ if !EnableTracing {
+ return nil
+ }
tr, ok := trace.FromContext(stream.Context())
if !ok {
return nil
@@ -748,37 +771,38 @@
trInfo = &traceInfo{
tr: tr,
+ firstLine: firstLine{
+ client: false,
+ remoteAddr: st.RemoteAddr(),
+ },
}
- trInfo.firstLine.client = false
- trInfo.firstLine.remoteAddr = st.RemoteAddr()
-
if dl, ok := stream.Context().Deadline(); ok {
trInfo.firstLine.deadline = time.Until(dl)
}
return trInfo
}
-func (s *Server) addConn(c io.Closer) bool {
+func (s *Server) addConn(st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
- c.Close()
+ st.Close()
return false
}
if s.drain {
// Transport added after we drained our existing conns: drain it
// immediately.
- c.(transport.ServerTransport).Drain()
+ st.Drain()
}
- s.conns[c] = true
+ s.conns[st] = true
return true
}
-func (s *Server) removeConn(c io.Closer) {
+func (s *Server) removeConn(st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns != nil {
- delete(s.conns, c)
+ delete(s.conns, st)
s.cv.Broadcast()
}
}
@@ -859,7 +883,6 @@
}
if trInfo != nil {
defer trInfo.tr.Finish()
- trInfo.firstLine.client = false
trInfo.tr.LazyLog(&trInfo.firstLine, false)
defer func() {
if err != nil && err != io.EOF {
@@ -951,10 +974,11 @@
}
if sh != nil {
sh.HandleRPC(stream.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: v,
- Data: d,
- Length: len(d),
+ RecvTime: time.Now(),
+ Payload: v,
+ WireLength: payInfo.wireLength,
+ Data: d,
+ Length: len(d),
})
}
if binlog != nil {
@@ -1050,7 +1074,7 @@
// TODO: Should we be logging if writing status failed here, like above?
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
- err = t.WriteStatus(stream, status.New(codes.OK, ""))
+ err = t.WriteStatus(stream, statusOK)
if binlog != nil {
binlog.Log(&binarylog.ServerTrailer{
Trailer: stream.Trailer(),
@@ -1208,7 +1232,7 @@
ss.trInfo.tr.LazyLog(stringer("OK"), false)
ss.mu.Unlock()
}
- err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
+ err = t.WriteStatus(ss.s, statusOK)
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
@@ -1245,7 +1269,8 @@
service := sm[:pos]
method := sm[pos+1:]
- if srv, ok := s.m[service]; ok {
+ srv, knownService := s.m[service]
+ if knownService {
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
@@ -1260,11 +1285,16 @@
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
+ var errDesc string
+ if !knownService {
+ errDesc = fmt.Sprintf("unknown service %v", service)
+ } else {
+ errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
+ }
if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
+ trInfo.tr.LazyPrintf("%s", errDesc)
trInfo.tr.SetError()
}
- errDesc := fmt.Sprintf("unknown service %v", service)
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@@ -1319,15 +1349,11 @@
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
- s.quitOnce.Do(func() {
- close(s.quit)
- })
+ s.quit.Fire()
defer func() {
s.serveWG.Wait()
- s.doneOnce.Do(func() {
- close(s.done)
- })
+ s.done.Fire()
}()
s.channelzRemoveOnce.Do(func() {
@@ -1364,15 +1390,8 @@
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
- s.quitOnce.Do(func() {
- close(s.quit)
- })
-
- defer func() {
- s.doneOnce.Do(func() {
- close(s.done)
- })
- }()
+ s.quit.Fire()
+ defer s.done.Fire()
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
@@ -1390,8 +1409,8 @@
}
s.lis = nil
if !s.drain {
- for c := range s.conns {
- c.(transport.ServerTransport).Drain()
+ for st := range s.conns {
+ st.Drain()
}
s.drain = true
}