[VOL-2312] Logging - Integrate voltctl with new etcd-based dynamic loglevel mechanism. Testing is in progress
Change-Id: I2e13bb79008c9a49ebb6f58e575f51efebe6dbfd
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 495a4f9..f064b73 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -42,6 +42,7 @@
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@@ -56,6 +57,8 @@
defaultServerMaxSendMessageSize = math.MaxInt32
)
+var statusOK = status.New(codes.OK, "")
+
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
@@ -90,17 +93,15 @@
mu sync.Mutex // guards following
lis map[net.Listener]bool
- conns map[io.Closer]bool
+ conns map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info
events trace.EventLog
- quit chan struct{}
- done chan struct{}
- quitOnce sync.Once
- doneOnce sync.Once
+ quit *grpcsync.Event
+ done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
@@ -386,10 +387,10 @@
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
- conns: make(map[io.Closer]bool),
+ conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
- quit: make(chan struct{}),
- done: make(chan struct{}),
+ quit: grpcsync.NewEvent(),
+ done: grpcsync.NewEvent(),
czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
@@ -556,11 +557,9 @@
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
- select {
- // Stop or GracefulStop called; block until done and return nil.
- case <-s.quit:
- <-s.done
- default:
+ if s.quit.HasFired() {
+ // Stop or GracefulStop called; block until done and return nil.
+ <-s.done.Done()
}
}()
@@ -603,7 +602,7 @@
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
- case <-s.quit:
+ case <-s.quit.Done():
timer.Stop()
return nil
}
@@ -613,10 +612,8 @@
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
- select {
- case <-s.quit:
+ if s.quit.HasFired() {
return nil
- default:
}
return err
}
@@ -637,6 +634,10 @@
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
+ if s.quit.HasFired() {
+ rawConn.Close()
+ return
+ }
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
@@ -653,14 +654,6 @@
return
}
- s.mu.Lock()
- if s.conns == nil {
- s.mu.Unlock()
- conn.Close()
- return
- }
- s.mu.Unlock()
-
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
@@ -768,6 +761,9 @@
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
+ if !EnableTracing {
+ return nil
+ }
tr, ok := trace.FromContext(stream.Context())
if !ok {
return nil
@@ -786,27 +782,27 @@
return trInfo
}
-func (s *Server) addConn(c io.Closer) bool {
+func (s *Server) addConn(st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
- c.Close()
+ st.Close()
return false
}
if s.drain {
// Transport added after we drained our existing conns: drain it
// immediately.
- c.(transport.ServerTransport).Drain()
+ st.Drain()
}
- s.conns[c] = true
+ s.conns[st] = true
return true
}
-func (s *Server) removeConn(c io.Closer) {
+func (s *Server) removeConn(st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns != nil {
- delete(s.conns, c)
+ delete(s.conns, st)
s.cv.Broadcast()
}
}
@@ -978,10 +974,11 @@
}
if sh != nil {
sh.HandleRPC(stream.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: v,
- Data: d,
- Length: len(d),
+ RecvTime: time.Now(),
+ Payload: v,
+ WireLength: payInfo.wireLength,
+ Data: d,
+ Length: len(d),
})
}
if binlog != nil {
@@ -1077,7 +1074,7 @@
// TODO: Should we be logging if writing status failed here, like above?
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
- err = t.WriteStatus(stream, status.New(codes.OK, ""))
+ err = t.WriteStatus(stream, statusOK)
if binlog != nil {
binlog.Log(&binarylog.ServerTrailer{
Trailer: stream.Trailer(),
@@ -1235,7 +1232,7 @@
ss.trInfo.tr.LazyLog(stringer("OK"), false)
ss.mu.Unlock()
}
- err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
+ err = t.WriteStatus(ss.s, statusOK)
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
@@ -1352,15 +1349,11 @@
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
- s.quitOnce.Do(func() {
- close(s.quit)
- })
+ s.quit.Fire()
defer func() {
s.serveWG.Wait()
- s.doneOnce.Do(func() {
- close(s.done)
- })
+ s.done.Fire()
}()
s.channelzRemoveOnce.Do(func() {
@@ -1397,15 +1390,8 @@
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
- s.quitOnce.Do(func() {
- close(s.quit)
- })
-
- defer func() {
- s.doneOnce.Do(func() {
- close(s.done)
- })
- }()
+ s.quit.Fire()
+ defer s.done.Fire()
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
@@ -1423,8 +1409,8 @@
}
s.lis = nil
if !s.drain {
- for c := range s.conns {
- c.(transport.ServerTransport).Drain()
+ for st := range s.conns {
+ st.Drain()
}
s.drain = true
}