[SEBA-930] Update gRPC to version 1.27 and change how Kafka messages are produced

Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index e54083d..0d75cb1 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -344,8 +344,8 @@
 // unknown service handler. The provided method is a bidi-streaming RPC service
 // handler that will be invoked instead of returning the "unimplemented" gRPC
 // error whenever a request is received for an unregistered service or method.
-// The handling function has full access to the Context of the request and the
-// stream, and the invocation bypasses interceptors.
+// The handling function and stream interceptor (if set) have full access to
+// the ServerStream, including its Context.
 func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
 	return newFuncServerOption(func(o *serverOptions) {
 		o.unknownStreamDesc = &StreamDesc{
@@ -865,41 +865,58 @@
 }
 
 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
-	if channelz.IsOn() {
-		s.incrCallsStarted()
-		defer func() {
-			if err != nil && err != io.EOF {
-				s.incrCallsFailed()
-			} else {
-				s.incrCallsSucceeded()
-			}
-		}()
-	}
 	sh := s.opts.statsHandler
-	if sh != nil {
-		beginTime := time.Now()
-		begin := &stats.Begin{
-			BeginTime: beginTime,
+	if sh != nil || trInfo != nil || channelz.IsOn() {
+		if channelz.IsOn() {
+			s.incrCallsStarted()
 		}
-		sh.HandleRPC(stream.Context(), begin)
-		defer func() {
-			end := &stats.End{
+		var statsBegin *stats.Begin
+		if sh != nil {
+			beginTime := time.Now()
+			statsBegin = &stats.Begin{
 				BeginTime: beginTime,
-				EndTime:   time.Now(),
 			}
-			if err != nil && err != io.EOF {
-				end.Error = toRPCErr(err)
-			}
-			sh.HandleRPC(stream.Context(), end)
-		}()
-	}
-	if trInfo != nil {
-		defer trInfo.tr.Finish()
-		trInfo.tr.LazyLog(&trInfo.firstLine, false)
+			sh.HandleRPC(stream.Context(), statsBegin)
+		}
+		if trInfo != nil {
+			trInfo.tr.LazyLog(&trInfo.firstLine, false)
+		}
+		// The deferred error handling for tracing, stats handler and channelz are
+		// combined into one function to reduce stack usage -- a defer takes ~56-64
+		// bytes on the stack, so overflowing the stack will require a stack
+		// re-allocation, which is expensive.
+		//
+		// To maintain behavior similar to separate deferred statements, statements
+		// should be executed in the reverse order. That is, tracing first, stats
+		// handler second, and channelz last. Note that panics *within* defers will
+		// lead to different behavior, but that's an acceptable compromise; that
+		// would be undefined behavior territory anyway.
 		defer func() {
-			if err != nil && err != io.EOF {
-				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
-				trInfo.tr.SetError()
+			if trInfo != nil {
+				if err != nil && err != io.EOF {
+					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+					trInfo.tr.SetError()
+				}
+				trInfo.tr.Finish()
+			}
+
+			if sh != nil {
+				end := &stats.End{
+					BeginTime: statsBegin.BeginTime,
+					EndTime:   time.Now(),
+				}
+				if err != nil && err != io.EOF {
+					end.Error = toRPCErr(err)
+				}
+				sh.HandleRPC(stream.Context(), end)
+			}
+
+			if channelz.IsOn() {
+				if err != nil && err != io.EOF {
+					s.incrCallsFailed()
+				} else {
+					s.incrCallsSucceeded()
+				}
 			}
 		}()
 	}
@@ -1099,31 +1116,15 @@
 func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
 	if channelz.IsOn() {
 		s.incrCallsStarted()
-		defer func() {
-			if err != nil && err != io.EOF {
-				s.incrCallsFailed()
-			} else {
-				s.incrCallsSucceeded()
-			}
-		}()
 	}
 	sh := s.opts.statsHandler
+	var statsBegin *stats.Begin
 	if sh != nil {
 		beginTime := time.Now()
-		begin := &stats.Begin{
+		statsBegin = &stats.Begin{
 			BeginTime: beginTime,
 		}
-		sh.HandleRPC(stream.Context(), begin)
-		defer func() {
-			end := &stats.End{
-				BeginTime: beginTime,
-				EndTime:   time.Now(),
-			}
-			if err != nil && err != io.EOF {
-				end.Error = toRPCErr(err)
-			}
-			sh.HandleRPC(stream.Context(), end)
-		}()
+		sh.HandleRPC(stream.Context(), statsBegin)
 	}
 	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
 	ss := &serverStream{
@@ -1138,6 +1139,41 @@
 		statsHandler:          sh,
 	}
 
+	if sh != nil || trInfo != nil || channelz.IsOn() {
+		// See comment in processUnaryRPC on defers.
+		defer func() {
+			if trInfo != nil {
+				ss.mu.Lock()
+				if err != nil && err != io.EOF {
+					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+					ss.trInfo.tr.SetError()
+				}
+				ss.trInfo.tr.Finish()
+				ss.trInfo.tr = nil
+				ss.mu.Unlock()
+			}
+
+			if sh != nil {
+				end := &stats.End{
+					BeginTime: statsBegin.BeginTime,
+					EndTime:   time.Now(),
+				}
+				if err != nil && err != io.EOF {
+					end.Error = toRPCErr(err)
+				}
+				sh.HandleRPC(stream.Context(), end)
+			}
+
+			if channelz.IsOn() {
+				if err != nil && err != io.EOF {
+					s.incrCallsFailed()
+				} else {
+					s.incrCallsSucceeded()
+				}
+			}
+		}()
+	}
+
 	ss.binlog = binarylog.GetMethodLogger(stream.Method())
 	if ss.binlog != nil {
 		md, _ := metadata.FromIncomingContext(ctx)
@@ -1191,16 +1227,6 @@
 
 	if trInfo != nil {
 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
-		defer func() {
-			ss.mu.Lock()
-			if err != nil && err != io.EOF {
-				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
-				ss.trInfo.tr.SetError()
-			}
-			ss.trInfo.tr.Finish()
-			ss.trInfo.tr = nil
-			ss.mu.Unlock()
-		}()
 	}
 	var appErr error
 	var server interface{}