[SEBA-930] update GRPC version to 1.27 and change kafka message producing

Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index 78f9ddc..c3c32da 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -227,7 +227,9 @@
 
 	if err == nil { // transport has not been closed
 		if ht.stats != nil {
-			ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
+			ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
+				Trailer: s.trailer.Copy(),
+			})
 		}
 	}
 	ht.Close()
@@ -289,7 +291,9 @@
 
 	if err == nil {
 		if ht.stats != nil {
-			ht.stats.HandleRPC(s.Context(), &stats.OutHeader{})
+			ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
+				Header: md.Copy(),
+			})
 		}
 	}
 	return err
@@ -334,7 +338,7 @@
 		Addr: ht.RemoteAddr(),
 	}
 	if req.TLS != nil {
-		pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
+		pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{credentials.PrivacyAndIntegrity}}
 	}
 	ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
 	s.ctx = peer.NewContext(ctx, pr)
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 294661a..2d6feeb 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -45,9 +45,14 @@
 	"google.golang.org/grpc/status"
 )
 
+// clientConnectionCounter counts the number of connections a client has
+// initiated (equal to the number of http2Clients created). Must be accessed
+// atomically.
+var clientConnectionCounter uint64
+
 // http2Client implements the ClientTransport interface with HTTP2.
 type http2Client struct {
-	lastRead   int64 // keep this field 64-bit aligned
+	lastRead   int64 // Keep this field 64-bit aligned. Accessed atomically.
 	ctx        context.Context
 	cancel     context.CancelFunc
 	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
@@ -126,6 +131,8 @@
 	onClose  func()
 
 	bufferPool *bufferPool
+
+	connectionID uint64
 }
 
 func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -329,6 +336,8 @@
 		}
 	}
 
+	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
+
 	if err := t.framer.writer.Flush(); err != nil {
 		return nil, err
 	}
@@ -394,7 +403,8 @@
 func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
 	aud := t.createAudience(callHdr)
 	ri := credentials.RequestInfo{
-		Method: callHdr.Method,
+		Method:   callHdr.Method,
+		AuthInfo: t.authInfo,
 	}
 	ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
 	authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
@@ -424,6 +434,7 @@
 
 	if callHdr.SendCompress != "" {
 		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
+		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
 	}
 	if dl, ok := ctx.Deadline(); ok {
 		// Send out timeout regardless its value. The server can detect timeout context by itself.
@@ -669,12 +680,14 @@
 		}
 	}
 	if t.statsHandler != nil {
+		header, _, _ := metadata.FromOutgoingContextRaw(ctx)
 		outHeader := &stats.OutHeader{
 			Client:      true,
 			FullMethod:  callHdr.Method,
 			RemoteAddr:  t.remoteAddr,
 			LocalAddr:   t.localAddr,
 			Compression: callHdr.SendCompress,
+			Header:      header.Copy(),
 		}
 		t.statsHandler.HandleRPC(s.ctx, outHeader)
 	}
@@ -1177,12 +1190,14 @@
 				inHeader := &stats.InHeader{
 					Client:     true,
 					WireLength: int(frame.Header().Length),
+					Header:     s.header.Copy(),
 				}
 				t.statsHandler.HandleRPC(s.ctx, inHeader)
 			} else {
 				inTrailer := &stats.InTrailer{
 					Client:     true,
 					WireLength: int(frame.Header().Length),
+					Trailer:    s.trailer.Copy(),
 				}
 				t.statsHandler.HandleRPC(s.ctx, inTrailer)
 			}
@@ -1369,7 +1384,6 @@
 			// acked).
 			sleepDuration := minTime(t.kp.Time, timeoutLeft)
 			timeoutLeft -= sleepDuration
-			prevNano = lastRead
 			timer.Reset(sleepDuration)
 		case <-t.ctx.Done():
 			if !timer.Stop() {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 0760383..8b04b03 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -62,8 +62,13 @@
 	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
 )
 
+// serverConnectionCounter counts the number of connections a server has seen
+// (equal to the number of http2Servers created). Must be accessed atomically.
+var serverConnectionCounter uint64
+
 // http2Server implements the ServerTransport interface with HTTP2.
 type http2Server struct {
+	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
 	ctx         context.Context
 	done        chan struct{}
 	conn        net.Conn
@@ -83,12 +88,8 @@
 	controlBuf *controlBuffer
 	fc         *trInFlow
 	stats      stats.Handler
-	// Flag to keep track of reading activity on transport.
-	// 1 is true and 0 is false.
-	activity uint32 // Accessed atomically.
 	// Keepalive and max-age parameters for the server.
 	kp keepalive.ServerParameters
-
 	// Keepalive enforcement policy.
 	kep keepalive.EnforcementPolicy
 	// The time instance last ping was received.
@@ -124,6 +125,8 @@
 	channelzID int64 // channelz unique identification number
 	czData     *channelzData
 	bufferPool *bufferPool
+
+	connectionID uint64
 }
 
 // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -253,6 +256,9 @@
 	if channelz.IsOn() {
 		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
 	}
+
+	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
+
 	t.framer.writer.Flush()
 
 	defer func() {
@@ -277,7 +283,7 @@
 	if err != nil {
 		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
 	}
-	atomic.StoreUint32(&t.activity, 1)
+	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
 	sf, ok := frame.(*http2.SettingsFrame)
 	if !ok {
 		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@@ -416,6 +422,7 @@
 			LocalAddr:   t.localAddr,
 			Compression: s.recvCompress,
 			WireLength:  int(frame.Header().Length),
+			Header:      metadata.MD(state.data.mdata).Copy(),
 		}
 		t.stats.HandleRPC(s.ctx, inHeader)
 	}
@@ -449,7 +456,7 @@
 	for {
 		t.controlBuf.throttle()
 		frame, err := t.framer.fr.ReadFrame()
-		atomic.StoreUint32(&t.activity, 1)
+		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
 		if err != nil {
 			if se, ok := err.(http2.StreamError); ok {
 				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
@@ -808,7 +815,9 @@
 	if t.stats != nil {
 		// Note: WireLength is not set in outHeader.
 		// TODO(mmukhi): Revisit this later, if needed.
-		outHeader := &stats.OutHeader{}
+		outHeader := &stats.OutHeader{
+			Header: s.header.Copy(),
+		}
 		t.stats.HandleRPC(s.Context(), outHeader)
 	}
 	return nil
@@ -871,7 +880,9 @@
 	rst := s.getState() == streamActive
 	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
 	if t.stats != nil {
-		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
+		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
+			Trailer: s.trailer.Copy(),
+		})
 	}
 	return nil
 }
@@ -932,32 +943,35 @@
 // after an additional duration of keepalive.Timeout.
 func (t *http2Server) keepalive() {
 	p := &ping{}
-	var pingSent bool
-	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
-	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
-	keepalive := time.NewTimer(t.kp.Time)
-	// NOTE: All exit paths of this function should reset their
-	// respective timers. A failure to do so will cause the
-	// following clean-up to deadlock and eventually leak.
+	// True iff a ping has been sent, and no data has been received since then.
+	outstandingPing := false
+	// Amount of time remaining before which we should receive an ACK for the
+	// last sent ping.
+	kpTimeoutLeft := time.Duration(0)
+	// Records the last value of t.lastRead before we go block on the timer.
+	// This is required to check for read activity since then.
+	prevNano := time.Now().UnixNano()
+	// Initialize the different timers to their default values.
+	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
+	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
+	kpTimer := time.NewTimer(t.kp.Time)
 	defer func() {
-		if !maxIdle.Stop() {
-			<-maxIdle.C
-		}
-		if !maxAge.Stop() {
-			<-maxAge.C
-		}
-		if !keepalive.Stop() {
-			<-keepalive.C
-		}
+		// We need to drain the underlying channel in these timers after a call
+		// to Stop(), only if we are interested in resetting them. Clearly we
+		// are not interested in resetting them here.
+		idleTimer.Stop()
+		ageTimer.Stop()
+		kpTimer.Stop()
 	}()
+
 	for {
 		select {
-		case <-maxIdle.C:
+		case <-idleTimer.C:
 			t.mu.Lock()
 			idle := t.idle
 			if idle.IsZero() { // The connection is non-idle.
 				t.mu.Unlock()
-				maxIdle.Reset(t.kp.MaxConnectionIdle)
+				idleTimer.Reset(t.kp.MaxConnectionIdle)
 				continue
 			}
 			val := t.kp.MaxConnectionIdle - time.Since(idle)
@@ -966,43 +980,51 @@
 				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
 				// Gracefully close the connection.
 				t.drain(http2.ErrCodeNo, []byte{})
-				// Resetting the timer so that the clean-up doesn't deadlock.
-				maxIdle.Reset(infinity)
 				return
 			}
-			maxIdle.Reset(val)
-		case <-maxAge.C:
+			idleTimer.Reset(val)
+		case <-ageTimer.C:
 			t.drain(http2.ErrCodeNo, []byte{})
-			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
+			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
 			select {
-			case <-maxAge.C:
+			case <-ageTimer.C:
 				// Close the connection after grace period.
 				infof("transport: closing server transport due to maximum connection age.")
 				t.Close()
-				// Resetting the timer so that the clean-up doesn't deadlock.
-				maxAge.Reset(infinity)
 			case <-t.done:
 			}
 			return
-		case <-keepalive.C:
-			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
-				pingSent = false
-				keepalive.Reset(t.kp.Time)
+		case <-kpTimer.C:
+			lastRead := atomic.LoadInt64(&t.lastRead)
+			if lastRead > prevNano {
+				// There has been read activity since the last time we were
+				// here. Setup the timer to fire at kp.Time seconds from
+				// lastRead time and continue.
+				outstandingPing = false
+				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
+				prevNano = lastRead
 				continue
 			}
-			if pingSent {
+			if outstandingPing && kpTimeoutLeft <= 0 {
 				infof("transport: closing server transport due to idleness.")
 				t.Close()
-				// Resetting the timer so that the clean-up doesn't deadlock.
-				keepalive.Reset(infinity)
 				return
 			}
-			pingSent = true
-			if channelz.IsOn() {
-				atomic.AddInt64(&t.czData.kpCount, 1)
+			if !outstandingPing {
+				if channelz.IsOn() {
+					atomic.AddInt64(&t.czData.kpCount, 1)
+				}
+				t.controlBuf.put(p)
+				kpTimeoutLeft = t.kp.Timeout
+				outstandingPing = true
 			}
-			t.controlBuf.put(p)
-			keepalive.Reset(t.kp.Timeout)
+			// The amount of time to sleep here is the minimum of kp.Time and
+			// timeoutLeft. This will ensure that we wait only for kp.Time
+			// before sending out the next ping (for cases where the ping is
+			// acked).
+			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
+			kpTimeoutLeft -= sleepDuration
+			kpTimer.Reset(sleepDuration)
 		case <-t.done:
 			return
 		}
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index bfab940..a30da9e 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -73,10 +73,11 @@
 }
 
 // recvBuffer is an unbounded channel of recvMsg structs.
-// Note recvBuffer differs from controlBuffer only in that recvBuffer
-// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
-// recvBuffer is written to much more often than
-// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
+//
+// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
+// holds a channel of recvMsg structs instead of objects implementing "item"
+// interface. recvBuffer is written to much more often and using strict recvMsg
+// structs helps avoid allocation in "recvBuffer.put"
 type recvBuffer struct {
 	c       chan recvMsg
 	mu      sync.Mutex