gRPC migration
Change-Id: Ib390f6dde0d5a8d6db12ccd7da41135570ad1354
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index edfcdca..e54083d 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -116,8 +116,6 @@
dc Decompressor
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
- chainUnaryInts []UnaryServerInterceptor
- chainStreamInts []StreamServerInterceptor
inTapHandle tap.ServerInHandle
statsHandler stats.Handler
maxConcurrentStreams uint32
@@ -313,16 +311,6 @@
})
}
-// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
-// for unary RPCs. The first interceptor will be the outer most,
-// while the last interceptor will be the inner most wrapper around the real call.
-// All unary interceptors added by this method will be chained.
-func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
- return newFuncServerOption(func(o *serverOptions) {
- o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
- })
-}
-
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
@@ -334,16 +322,6 @@
})
}
-// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
-// for stream RPCs. The first interceptor will be the outer most,
-// while the last interceptor will be the inner most wrapper around the real call.
-// All stream interceptors added by this method will be chained.
-func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
- return newFuncServerOption(func(o *serverOptions) {
- o.chainStreamInts = append(o.chainStreamInts, interceptors...)
- })
-}
-
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
@@ -366,8 +344,8 @@
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
-// The handling function and stream interceptor (if set) have full access to
-// the ServerStream, including its Context.
+// The handling function has full access to the Context of the request and the
+// stream, and the invocation bypasses interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.unknownStreamDesc = &StreamDesc{
@@ -426,8 +404,6 @@
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
- chainUnaryServerInterceptors(s)
- chainStreamServerInterceptors(s)
s.cv = sync.NewCond(&s.mu)
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
@@ -682,7 +658,7 @@
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
- channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
+ grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()
}
rawConn.SetDeadline(time.Time{})
@@ -729,7 +705,7 @@
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
c.Close()
- channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+ grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
return nil
}
@@ -868,12 +844,12 @@
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
if err != nil {
- channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
+ grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
}
compData, err := compress(data, cp, comp)
if err != nil {
- channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
+ grpclog.Errorln("grpc: server failed to compress response: ", err)
return err
}
hdr, payload := msgHeader(data, compData)
@@ -888,93 +864,42 @@
return err
}
-// chainUnaryServerInterceptors chains all unary server interceptors into one.
-func chainUnaryServerInterceptors(s *Server) {
- // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
- // be executed before any other chained interceptors.
- interceptors := s.opts.chainUnaryInts
- if s.opts.unaryInt != nil {
- interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
- }
-
- var chainedInt UnaryServerInterceptor
- if len(interceptors) == 0 {
- chainedInt = nil
- } else if len(interceptors) == 1 {
- chainedInt = interceptors[0]
- } else {
- chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
- return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
- }
- }
-
- s.opts.unaryInt = chainedInt
-}
-
-// getChainUnaryHandler recursively generate the chained UnaryHandler
-func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
- if curr == len(interceptors)-1 {
- return finalHandler
- }
-
- return func(ctx context.Context, req interface{}) (interface{}, error) {
- return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
- }
-}
-
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
- sh := s.opts.statsHandler
- if sh != nil || trInfo != nil || channelz.IsOn() {
- if channelz.IsOn() {
- s.incrCallsStarted()
- }
- var statsBegin *stats.Begin
- if sh != nil {
- beginTime := time.Now()
- statsBegin = &stats.Begin{
- BeginTime: beginTime,
- }
- sh.HandleRPC(stream.Context(), statsBegin)
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(&trInfo.firstLine, false)
- }
- // The deferred error handling for tracing, stats handler and channelz are
- // combined into one function to reduce stack usage -- a defer takes ~56-64
- // bytes on the stack, so overflowing the stack will require a stack
- // re-allocation, which is expensive.
- //
- // To maintain behavior similar to separate deferred statements, statements
- // should be executed in the reverse order. That is, tracing first, stats
- // handler second, and channelz last. Note that panics *within* defers will
- // lead to different behavior, but that's an acceptable compromise; that
- // would be undefined behavior territory anyway.
+ if channelz.IsOn() {
+ s.incrCallsStarted()
defer func() {
- if trInfo != nil {
- if err != nil && err != io.EOF {
- trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- trInfo.tr.SetError()
- }
- trInfo.tr.Finish()
+ if err != nil && err != io.EOF {
+ s.incrCallsFailed()
+ } else {
+ s.incrCallsSucceeded()
}
-
- if sh != nil {
- end := &stats.End{
- BeginTime: statsBegin.BeginTime,
- EndTime: time.Now(),
- }
- if err != nil && err != io.EOF {
- end.Error = toRPCErr(err)
- }
- sh.HandleRPC(stream.Context(), end)
+ }()
+ }
+ sh := s.opts.statsHandler
+ if sh != nil {
+ beginTime := time.Now()
+ begin := &stats.Begin{
+ BeginTime: beginTime,
+ }
+ sh.HandleRPC(stream.Context(), begin)
+ defer func() {
+ end := &stats.End{
+ BeginTime: beginTime,
+ EndTime: time.Now(),
}
-
- if channelz.IsOn() {
- if err != nil && err != io.EOF {
- s.incrCallsFailed()
- } else {
- s.incrCallsSucceeded()
- }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ sh.HandleRPC(stream.Context(), end)
+ }()
+ }
+ if trInfo != nil {
+ defer trInfo.tr.Finish()
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ defer func() {
+ if err != nil && err != io.EOF {
+ trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ trInfo.tr.SetError()
}
}()
}
@@ -1047,7 +972,7 @@
if err != nil {
if st, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, st); e != nil {
- channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
}
return err
@@ -1092,7 +1017,7 @@
trInfo.tr.SetError()
}
if e := t.WriteStatus(stream, appStatus); e != nil {
- channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
}
if binlog != nil {
if h, _ := stream.Header(); h.Len() > 0 {
@@ -1119,9 +1044,9 @@
// The entire stream is done (for unary RPC only).
return err
}
- if sts, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, sts); e != nil {
- channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
+ if s, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, s); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
}
} else {
switch st := err.(type) {
@@ -1171,52 +1096,34 @@
return err
}
-// chainStreamServerInterceptors chains all stream server interceptors into one.
-func chainStreamServerInterceptors(s *Server) {
- // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
- // be executed before any other chained interceptors.
- interceptors := s.opts.chainStreamInts
- if s.opts.streamInt != nil {
- interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
- }
-
- var chainedInt StreamServerInterceptor
- if len(interceptors) == 0 {
- chainedInt = nil
- } else if len(interceptors) == 1 {
- chainedInt = interceptors[0]
- } else {
- chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
- return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
- }
- }
-
- s.opts.streamInt = chainedInt
-}
-
-// getChainStreamHandler recursively generate the chained StreamHandler
-func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
- if curr == len(interceptors)-1 {
- return finalHandler
- }
-
- return func(srv interface{}, ss ServerStream) error {
- return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
- }
-}
-
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
if channelz.IsOn() {
s.incrCallsStarted()
+ defer func() {
+ if err != nil && err != io.EOF {
+ s.incrCallsFailed()
+ } else {
+ s.incrCallsSucceeded()
+ }
+ }()
}
sh := s.opts.statsHandler
- var statsBegin *stats.Begin
if sh != nil {
beginTime := time.Now()
- statsBegin = &stats.Begin{
+ begin := &stats.Begin{
BeginTime: beginTime,
}
- sh.HandleRPC(stream.Context(), statsBegin)
+ sh.HandleRPC(stream.Context(), begin)
+ defer func() {
+ end := &stats.End{
+ BeginTime: beginTime,
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ sh.HandleRPC(stream.Context(), end)
+ }()
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
ss := &serverStream{
@@ -1231,41 +1138,6 @@
statsHandler: sh,
}
- if sh != nil || trInfo != nil || channelz.IsOn() {
- // See comment in processUnaryRPC on defers.
- defer func() {
- if trInfo != nil {
- ss.mu.Lock()
- if err != nil && err != io.EOF {
- ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- ss.trInfo.tr.SetError()
- }
- ss.trInfo.tr.Finish()
- ss.trInfo.tr = nil
- ss.mu.Unlock()
- }
-
- if sh != nil {
- end := &stats.End{
- BeginTime: statsBegin.BeginTime,
- EndTime: time.Now(),
- }
- if err != nil && err != io.EOF {
- end.Error = toRPCErr(err)
- }
- sh.HandleRPC(stream.Context(), end)
- }
-
- if channelz.IsOn() {
- if err != nil && err != io.EOF {
- s.incrCallsFailed()
- } else {
- s.incrCallsSucceeded()
- }
- }
- }()
- }
-
ss.binlog = binarylog.GetMethodLogger(stream.Method())
if ss.binlog != nil {
md, _ := metadata.FromIncomingContext(ctx)
@@ -1319,6 +1191,16 @@
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ defer func() {
+ ss.mu.Lock()
+ if err != nil && err != io.EOF {
+ ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ ss.trInfo.tr.SetError()
+ }
+ ss.trInfo.tr.Finish()
+ ss.trInfo.tr = nil
+ ss.mu.Unlock()
+ }()
}
var appErr error
var server interface{}
@@ -1389,7 +1271,7 @@
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
- channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+ grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
@@ -1430,7 +1312,7 @@
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
- channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
+ grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()