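gRPC migration

Changes to vendor/google.golang.org/grpc/server.go:
- Remove the ChainUnaryInterceptor and ChainStreamInterceptor server
  options, their serverOptions fields, and the interceptor-chaining
  helpers that NewServer called.
- Log server-side warnings and errors through grpclog instead of
  channelz.
- Split the combined deferred cleanup in processUnaryRPC and
  processStreamingRPC into separate defers for channelz counters,
  the stats handler, and tracing.
- Update the UnknownServiceHandler comment to state that the
  invocation bypasses interceptors.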

Change-Id: Ib390f6dde0d5a8d6db12ccd7da41135570ad1354
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index edfcdca..e54083d 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -116,8 +116,6 @@
 	dc                    Decompressor
 	unaryInt              UnaryServerInterceptor
 	streamInt             StreamServerInterceptor
-	chainUnaryInts        []UnaryServerInterceptor
-	chainStreamInts       []StreamServerInterceptor
 	inTapHandle           tap.ServerInHandle
 	statsHandler          stats.Handler
 	maxConcurrentStreams  uint32
@@ -313,16 +311,6 @@
 	})
 }
 
-// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
-// for unary RPCs. The first interceptor will be the outer most,
-// while the last interceptor will be the inner most wrapper around the real call.
-// All unary interceptors added by this method will be chained.
-func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
-	return newFuncServerOption(func(o *serverOptions) {
-		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
-	})
-}
-
 // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
 // server. Only one stream interceptor can be installed.
 func StreamInterceptor(i StreamServerInterceptor) ServerOption {
@@ -334,16 +322,6 @@
 	})
 }
 
-// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
-// for stream RPCs. The first interceptor will be the outer most,
-// while the last interceptor will be the inner most wrapper around the real call.
-// All stream interceptors added by this method will be chained.
-func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
-	return newFuncServerOption(func(o *serverOptions) {
-		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
-	})
-}
-
 // InTapHandle returns a ServerOption that sets the tap handle for all the server
 // transport to be created. Only one can be installed.
 func InTapHandle(h tap.ServerInHandle) ServerOption {
@@ -366,8 +344,8 @@
 // unknown service handler. The provided method is a bidi-streaming RPC service
 // handler that will be invoked instead of returning the "unimplemented" gRPC
 // error whenever a request is received for an unregistered service or method.
-// The handling function and stream interceptor (if set) have full access to
-// the ServerStream, including its Context.
+// The handling function has full access to the Context of the request and the
+// stream, and the invocation bypasses interceptors.
 func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.unknownStreamDesc = &StreamDesc{
@@ -426,8 +404,6 @@
 		done:   grpcsync.NewEvent(),
 		czData: new(channelzData),
 	}
-	chainUnaryServerInterceptors(s)
-	chainStreamServerInterceptors(s)
 	s.cv = sync.NewCond(&s.mu)
 	if EnableTracing {
 		_, file, line, _ := runtime.Caller(1)
@@ -682,7 +658,7 @@
 			s.mu.Lock()
 			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
 			s.mu.Unlock()
-			channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
+			grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
 			rawConn.Close()
 		}
 		rawConn.SetDeadline(time.Time{})
@@ -729,7 +705,7 @@
 		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
 		s.mu.Unlock()
 		c.Close()
-		channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
 		return nil
 	}
 
@@ -868,12 +844,12 @@
 func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
 	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
 	if err != nil {
-		channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
+		grpclog.Errorln("grpc: server failed to encode response: ", err)
 		return err
 	}
 	compData, err := compress(data, cp, comp)
 	if err != nil {
-		channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
+		grpclog.Errorln("grpc: server failed to compress response: ", err)
 		return err
 	}
 	hdr, payload := msgHeader(data, compData)
@@ -888,93 +864,42 @@
 	return err
 }
 
-// chainUnaryServerInterceptors chains all unary server interceptors into one.
-func chainUnaryServerInterceptors(s *Server) {
-	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
-	// be executed before any other chained interceptors.
-	interceptors := s.opts.chainUnaryInts
-	if s.opts.unaryInt != nil {
-		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
-	}
-
-	var chainedInt UnaryServerInterceptor
-	if len(interceptors) == 0 {
-		chainedInt = nil
-	} else if len(interceptors) == 1 {
-		chainedInt = interceptors[0]
-	} else {
-		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
-			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
-		}
-	}
-
-	s.opts.unaryInt = chainedInt
-}
-
-// getChainUnaryHandler recursively generate the chained UnaryHandler
-func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
-	if curr == len(interceptors)-1 {
-		return finalHandler
-	}
-
-	return func(ctx context.Context, req interface{}) (interface{}, error) {
-		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
-	}
-}
-
 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
-	sh := s.opts.statsHandler
-	if sh != nil || trInfo != nil || channelz.IsOn() {
-		if channelz.IsOn() {
-			s.incrCallsStarted()
-		}
-		var statsBegin *stats.Begin
-		if sh != nil {
-			beginTime := time.Now()
-			statsBegin = &stats.Begin{
-				BeginTime: beginTime,
-			}
-			sh.HandleRPC(stream.Context(), statsBegin)
-		}
-		if trInfo != nil {
-			trInfo.tr.LazyLog(&trInfo.firstLine, false)
-		}
-		// The deferred error handling for tracing, stats handler and channelz are
-		// combined into one function to reduce stack usage -- a defer takes ~56-64
-		// bytes on the stack, so overflowing the stack will require a stack
-		// re-allocation, which is expensive.
-		//
-		// To maintain behavior similar to separate deferred statements, statements
-		// should be executed in the reverse order. That is, tracing first, stats
-		// handler second, and channelz last. Note that panics *within* defers will
-		// lead to different behavior, but that's an acceptable compromise; that
-		// would be undefined behavior territory anyway.
+	if channelz.IsOn() {
+		s.incrCallsStarted()
 		defer func() {
-			if trInfo != nil {
-				if err != nil && err != io.EOF {
-					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
-					trInfo.tr.SetError()
-				}
-				trInfo.tr.Finish()
+			if err != nil && err != io.EOF {
+				s.incrCallsFailed()
+			} else {
+				s.incrCallsSucceeded()
 			}
-
-			if sh != nil {
-				end := &stats.End{
-					BeginTime: statsBegin.BeginTime,
-					EndTime:   time.Now(),
-				}
-				if err != nil && err != io.EOF {
-					end.Error = toRPCErr(err)
-				}
-				sh.HandleRPC(stream.Context(), end)
+		}()
+	}
+	sh := s.opts.statsHandler
+	if sh != nil {
+		beginTime := time.Now()
+		begin := &stats.Begin{
+			BeginTime: beginTime,
+		}
+		sh.HandleRPC(stream.Context(), begin)
+		defer func() {
+			end := &stats.End{
+				BeginTime: beginTime,
+				EndTime:   time.Now(),
 			}
-
-			if channelz.IsOn() {
-				if err != nil && err != io.EOF {
-					s.incrCallsFailed()
-				} else {
-					s.incrCallsSucceeded()
-				}
+			if err != nil && err != io.EOF {
+				end.Error = toRPCErr(err)
+			}
+			sh.HandleRPC(stream.Context(), end)
+		}()
+	}
+	if trInfo != nil {
+		defer trInfo.tr.Finish()
+		trInfo.tr.LazyLog(&trInfo.firstLine, false)
+		defer func() {
+			if err != nil && err != io.EOF {
+				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+				trInfo.tr.SetError()
 			}
 		}()
 	}
@@ -1047,7 +972,7 @@
 	if err != nil {
 		if st, ok := status.FromError(err); ok {
 			if e := t.WriteStatus(stream, st); e != nil {
-				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
+				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
 			}
 		}
 		return err
@@ -1092,7 +1017,7 @@
 			trInfo.tr.SetError()
 		}
 		if e := t.WriteStatus(stream, appStatus); e != nil {
-			channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
 		}
 		if binlog != nil {
 			if h, _ := stream.Header(); h.Len() > 0 {
@@ -1119,9 +1044,9 @@
 			// The entire stream is done (for unary RPC only).
 			return err
 		}
-		if sts, ok := status.FromError(err); ok {
-			if e := t.WriteStatus(stream, sts); e != nil {
-				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+		if s, ok := status.FromError(err); ok {
+			if e := t.WriteStatus(stream, s); e != nil {
+				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
 			}
 		} else {
 			switch st := err.(type) {
@@ -1171,52 +1096,34 @@
 	return err
 }
 
-// chainStreamServerInterceptors chains all stream server interceptors into one.
-func chainStreamServerInterceptors(s *Server) {
-	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
-	// be executed before any other chained interceptors.
-	interceptors := s.opts.chainStreamInts
-	if s.opts.streamInt != nil {
-		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
-	}
-
-	var chainedInt StreamServerInterceptor
-	if len(interceptors) == 0 {
-		chainedInt = nil
-	} else if len(interceptors) == 1 {
-		chainedInt = interceptors[0]
-	} else {
-		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
-			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
-		}
-	}
-
-	s.opts.streamInt = chainedInt
-}
-
-// getChainStreamHandler recursively generate the chained StreamHandler
-func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
-	if curr == len(interceptors)-1 {
-		return finalHandler
-	}
-
-	return func(srv interface{}, ss ServerStream) error {
-		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
-	}
-}
-
 func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
 	if channelz.IsOn() {
 		s.incrCallsStarted()
+		defer func() {
+			if err != nil && err != io.EOF {
+				s.incrCallsFailed()
+			} else {
+				s.incrCallsSucceeded()
+			}
+		}()
 	}
 	sh := s.opts.statsHandler
-	var statsBegin *stats.Begin
 	if sh != nil {
 		beginTime := time.Now()
-		statsBegin = &stats.Begin{
+		begin := &stats.Begin{
 			BeginTime: beginTime,
 		}
-		sh.HandleRPC(stream.Context(), statsBegin)
+		sh.HandleRPC(stream.Context(), begin)
+		defer func() {
+			end := &stats.End{
+				BeginTime: beginTime,
+				EndTime:   time.Now(),
+			}
+			if err != nil && err != io.EOF {
+				end.Error = toRPCErr(err)
+			}
+			sh.HandleRPC(stream.Context(), end)
+		}()
 	}
 	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
 	ss := &serverStream{
@@ -1231,41 +1138,6 @@
 		statsHandler:          sh,
 	}
 
-	if sh != nil || trInfo != nil || channelz.IsOn() {
-		// See comment in processUnaryRPC on defers.
-		defer func() {
-			if trInfo != nil {
-				ss.mu.Lock()
-				if err != nil && err != io.EOF {
-					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
-					ss.trInfo.tr.SetError()
-				}
-				ss.trInfo.tr.Finish()
-				ss.trInfo.tr = nil
-				ss.mu.Unlock()
-			}
-
-			if sh != nil {
-				end := &stats.End{
-					BeginTime: statsBegin.BeginTime,
-					EndTime:   time.Now(),
-				}
-				if err != nil && err != io.EOF {
-					end.Error = toRPCErr(err)
-				}
-				sh.HandleRPC(stream.Context(), end)
-			}
-
-			if channelz.IsOn() {
-				if err != nil && err != io.EOF {
-					s.incrCallsFailed()
-				} else {
-					s.incrCallsSucceeded()
-				}
-			}
-		}()
-	}
-
 	ss.binlog = binarylog.GetMethodLogger(stream.Method())
 	if ss.binlog != nil {
 		md, _ := metadata.FromIncomingContext(ctx)
@@ -1319,6 +1191,16 @@
 
 	if trInfo != nil {
 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
+		defer func() {
+			ss.mu.Lock()
+			if err != nil && err != io.EOF {
+				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+				ss.trInfo.tr.SetError()
+			}
+			ss.trInfo.tr.Finish()
+			ss.trInfo.tr = nil
+			ss.mu.Unlock()
+		}()
 	}
 	var appErr error
 	var server interface{}
@@ -1389,7 +1271,7 @@
 				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 				trInfo.tr.SetError()
 			}
-			channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
 		}
 		if trInfo != nil {
 			trInfo.tr.Finish()
@@ -1430,7 +1312,7 @@
 			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 			trInfo.tr.SetError()
 		}
-		channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
 	}
 	if trInfo != nil {
 		trInfo.tr.Finish()