[SEBA-930] update GRPC version to 1.27 and change kafka message producing

Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/github.com/klauspost/compress/huff0/bitwriter.go b/vendor/github.com/klauspost/compress/huff0/bitwriter.go
index ec0c3fc..bda4021 100644
--- a/vendor/github.com/klauspost/compress/huff0/bitwriter.go
+++ b/vendor/github.com/klauspost/compress/huff0/bitwriter.go
@@ -38,7 +38,7 @@
 	b.nBits += bits
 }
 
-// addBits16Clean will add up to 16 bits. value may not contain more set bits than indicated.
+// encSymbol will add up to 16 bits: the code that ct holds for symbol.
 // It will not check if there is space for them, so the caller must ensure that it has flushed recently.
 func (b *bitWriter) encSymbol(ct cTable, symbol byte) {
 	enc := ct[symbol]
@@ -46,6 +46,17 @@
 	b.nBits += enc.nBits
 }
 
+// encTwoSymbols will add up to 32 bits: the code that ct holds for av in the low bits, with the code for bv directly above it.
+// It will not check if there is space for them, so the caller must ensure that it has flushed recently.
+func (b *bitWriter) encTwoSymbols(ct cTable, av, bv byte) {
+	encA := ct[av]
+	encB := ct[bv]
+	sh := b.nBits & 63
+	combined := uint64(encA.val) | (uint64(encB.val) << (encA.nBits & 63))
+	b.bitContainer |= combined << sh
+	b.nBits += encA.nBits + encB.nBits
+}
+
 // addBits16ZeroNC will add up to 16 bits.
 // It will not check if there is space for them,
 // so the caller must ensure that it has flushed recently.
diff --git a/vendor/github.com/klauspost/compress/huff0/compress.go b/vendor/github.com/klauspost/compress/huff0/compress.go
index dd4f7fe..0843cb0 100644
--- a/vendor/github.com/klauspost/compress/huff0/compress.go
+++ b/vendor/github.com/klauspost/compress/huff0/compress.go
@@ -54,6 +54,12 @@
 		canReuse = s.canUseTable(s.prevTable)
 	}
 
+	// We want the output size to be less than this:
+	wantSize := len(in)
+	if s.WantLogLess > 0 {
+		wantSize -= wantSize >> s.WantLogLess
+	}
+
 	// Reset for next run.
 	s.clearCount = true
 	s.maxCount = 0
@@ -74,10 +80,13 @@
 
 	if s.Reuse == ReusePolicyPrefer && canReuse {
 		keepTable := s.cTable
+		keepTL := s.actualTableLog
 		s.cTable = s.prevTable
+		s.actualTableLog = s.prevTableLog
 		s.Out, err = compressor(in)
 		s.cTable = keepTable
-		if err == nil && len(s.Out) < len(in) {
+		s.actualTableLog = keepTL
+		if err == nil && len(s.Out) < wantSize {
 			s.OutData = s.Out
 			return s.Out, true, nil
 		}
@@ -86,7 +95,6 @@
 	}
 
 	// Calculate new table.
-	s.optimalTableLog()
 	err = s.buildCTable()
 	if err != nil {
 		return nil, false, err
@@ -100,13 +108,22 @@
 		hSize := len(s.Out)
 		oldSize := s.prevTable.estimateSize(s.count[:s.symbolLen])
 		newSize := s.cTable.estimateSize(s.count[:s.symbolLen])
-		if oldSize <= hSize+newSize || hSize+12 >= len(in) {
+		if oldSize <= hSize+newSize || hSize+12 >= wantSize {
 			// Retain cTable even if we re-use.
 			keepTable := s.cTable
+			keepTL := s.actualTableLog
+
 			s.cTable = s.prevTable
+			s.actualTableLog = s.prevTableLog
 			s.Out, err = compressor(in)
+
+			// Restore ctable.
 			s.cTable = keepTable
-			if len(s.Out) >= len(in) {
+			s.actualTableLog = keepTL
+			if err != nil {
+				return nil, false, err
+			}
+			if len(s.Out) >= wantSize {
 				return nil, false, ErrIncompressible
 			}
 			s.OutData = s.Out
@@ -128,12 +145,12 @@
 		s.OutTable = nil
 		return nil, false, err
 	}
-	if len(s.Out) >= len(in) {
+	if len(s.Out) >= wantSize {
 		s.OutTable = nil
 		return nil, false, ErrIncompressible
 	}
 	// Move current table into previous.
-	s.prevTable, s.cTable = s.cTable, s.prevTable[:0]
+	s.prevTable, s.prevTableLog, s.cTable = s.cTable, s.actualTableLog, s.prevTable[:0]
 	s.OutData = s.Out[len(s.OutTable):]
 	return s.Out, false, nil
 }
@@ -154,28 +171,23 @@
 	for i := len(src) & 3; i > 0; i-- {
 		bw.encSymbol(cTable, src[n+i-1])
 	}
+	n -= 4
 	if s.actualTableLog <= 8 {
-		n -= 4
 		for ; n >= 0; n -= 4 {
 			tmp := src[n : n+4]
 			// tmp should be len 4
 			bw.flush32()
-			bw.encSymbol(cTable, tmp[3])
-			bw.encSymbol(cTable, tmp[2])
-			bw.encSymbol(cTable, tmp[1])
-			bw.encSymbol(cTable, tmp[0])
+			bw.encTwoSymbols(cTable, tmp[3], tmp[2])
+			bw.encTwoSymbols(cTable, tmp[1], tmp[0])
 		}
 	} else {
-		n -= 4
 		for ; n >= 0; n -= 4 {
 			tmp := src[n : n+4]
 			// tmp should be len 4
 			bw.flush32()
-			bw.encSymbol(cTable, tmp[3])
-			bw.encSymbol(cTable, tmp[2])
+			bw.encTwoSymbols(cTable, tmp[3], tmp[2])
 			bw.flush32()
-			bw.encSymbol(cTable, tmp[1])
-			bw.encSymbol(cTable, tmp[0])
+			bw.encTwoSymbols(cTable, tmp[1], tmp[0])
 		}
 	}
 	err := bw.close()
@@ -313,9 +325,26 @@
 	return true
 }
 
+func (s *Scratch) validateTable(c cTable) bool {
+	if len(c) < int(s.symbolLen) {
+		return false
+	}
+	for i, v := range s.count[:s.symbolLen] {
+		if v != 0 {
+			if c[i].nBits == 0 {
+				return false
+			}
+			if c[i].nBits > s.actualTableLog {
+				return false
+			}
+		}
+	}
+	return true
+}
+
 // minTableLog provides the minimum logSize to safely represent a distribution.
 func (s *Scratch) minTableLog() uint8 {
-	minBitsSrc := highBit32(uint32(s.br.remain()-1)) + 1
+	minBitsSrc := highBit32(uint32(s.br.remain())) + 1
 	minBitsSymbols := highBit32(uint32(s.symbolLen-1)) + 2
 	if minBitsSrc < minBitsSymbols {
 		return uint8(minBitsSrc)
@@ -327,7 +356,7 @@
 func (s *Scratch) optimalTableLog() {
 	tableLog := s.TableLog
 	minBits := s.minTableLog()
-	maxBitsSrc := uint8(highBit32(uint32(s.br.remain()-1))) - 2
+	maxBitsSrc := uint8(highBit32(uint32(s.br.remain()-1))) - 1
 	if maxBitsSrc < tableLog {
 		// Accuracy can be reduced
 		tableLog = maxBitsSrc
@@ -354,6 +383,7 @@
 const huffNodesMask = huffNodesLen - 1
 
 func (s *Scratch) buildCTable() error {
+	s.optimalTableLog()
 	s.huffSort()
 	if cap(s.cTable) < maxSymbolValue+1 {
 		s.cTable = make([]cTableEntry, s.symbolLen, maxSymbolValue+1)
@@ -430,7 +460,7 @@
 		return fmt.Errorf("internal error: maxNbBits (%d) > tableLogMax (%d)", maxNbBits, tableLogMax)
 	}
 	var nbPerRank [tableLogMax + 1]uint16
-	var valPerRank [tableLogMax + 1]uint16
+	var valPerRank [16]uint16
 	for _, v := range huffNode[:nonNullRank+1] {
 		nbPerRank[v.nbBits]++
 	}
@@ -446,16 +476,17 @@
 	}
 
 	// push nbBits per symbol, symbol order
-	// TODO: changed `s.symbolLen` -> `nonNullRank+1` (micro-opt)
 	for _, v := range huffNode[:nonNullRank+1] {
 		s.cTable[v.symbol].nBits = v.nbBits
 	}
 
 	// assign value within rank, symbol order
-	for n, val := range s.cTable[:s.symbolLen] {
-		v := valPerRank[val.nBits]
-		s.cTable[n].val = v
-		valPerRank[val.nBits] = v + 1
+	t := s.cTable[:s.symbolLen]
+	for n, val := range t {
+		nbits := val.nBits & 15
+		v := valPerRank[nbits]
+		t[n].val = v
+		valPerRank[nbits] = v + 1
 	}
 
 	return nil
@@ -479,10 +510,12 @@
 		r := highBit32(v+1) & 31
 		rank[r].base++
 	}
-	for n := 30; n > 0; n-- {
+	// maxBitLength is log2(BlockSizeMax) + 1
+	const maxBitLength = 18 + 1
+	for n := maxBitLength; n > 0; n-- {
 		rank[n-1].base += rank[n].base
 	}
-	for n := range rank[:] {
+	for n := range rank[:maxBitLength] {
 		rank[n].current = rank[n].base
 	}
 	for n, c := range s.count[:s.symbolLen] {
@@ -501,7 +534,7 @@
 }
 
 func (s *Scratch) setMaxHeight(lastNonNull int) uint8 {
-	maxNbBits := s.TableLog
+	maxNbBits := s.actualTableLog
 	huffNode := s.nodes[1 : huffNodesLen+1]
 	//huffNode = huffNode[: huffNodesLen]
 
diff --git a/vendor/github.com/klauspost/compress/huff0/decompress.go b/vendor/github.com/klauspost/compress/huff0/decompress.go
index 43b4815..97ae66a 100644
--- a/vendor/github.com/klauspost/compress/huff0/decompress.go
+++ b/vendor/github.com/klauspost/compress/huff0/decompress.go
@@ -15,8 +15,7 @@
 
 // single-symbols decoding
 type dEntrySingle struct {
-	byte  uint8
-	nBits uint8
+	entry uint16
 }
 
 // double-symbols decoding
@@ -76,14 +75,15 @@
 	}
 
 	// collect weight stats
-	var rankStats [tableLogMax + 1]uint32
+	var rankStats [16]uint32
 	weightTotal := uint32(0)
 	for _, v := range s.huffWeight[:s.symbolLen] {
 		if v > tableLogMax {
 			return s, nil, errors.New("corrupt input: weight too large")
 		}
-		rankStats[v]++
-		weightTotal += (1 << (v & 15)) >> 1
+		v2 := v & 15
+		rankStats[v2]++
+		weightTotal += (1 << v2) >> 1
 	}
 	if weightTotal == 0 {
 		return s, nil, errors.New("corrupt input: weights zero")
@@ -134,15 +134,17 @@
 	if len(s.dt.single) != tSize {
 		s.dt.single = make([]dEntrySingle, tSize)
 	}
-
 	for n, w := range s.huffWeight[:s.symbolLen] {
+		if w == 0 {
+			continue
+		}
 		length := (uint32(1) << w) >> 1
 		d := dEntrySingle{
-			byte:  uint8(n),
-			nBits: s.actualTableLog + 1 - w,
+			entry: uint16(s.actualTableLog+1-w) | (uint16(n) << 8),
 		}
-		for u := rankStats[w]; u < rankStats[w]+length; u++ {
-			s.dt.single[u] = d
+		single := s.dt.single[rankStats[w] : rankStats[w]+length]
+		for i := range single {
+			single[i] = d
 		}
 		rankStats[w] += length
 	}
@@ -167,12 +169,12 @@
 	decode := func() byte {
 		val := br.peekBitsFast(s.actualTableLog) /* note : actualTableLog >= 1 */
 		v := s.dt.single[val]
-		br.bitsRead += v.nBits
-		return v.byte
+		br.bitsRead += uint8(v.entry)
+		return uint8(v.entry >> 8)
 	}
 	hasDec := func(v dEntrySingle) byte {
-		br.bitsRead += v.nBits
-		return v.byte
+		br.bitsRead += uint8(v.entry)
+		return uint8(v.entry >> 8)
 	}
 
 	// Avoid bounds check by always having full sized table.
@@ -269,33 +271,81 @@
 	decode := func(br *bitReader) byte {
 		val := br.peekBitsFast(s.actualTableLog) /* note : actualTableLog >= 1 */
 		v := single[val&tlMask]
-		br.bitsRead += v.nBits
-		return v.byte
+		br.bitsRead += uint8(v.entry)
+		return uint8(v.entry >> 8)
 	}
 
 	// Use temp table to avoid bound checks/append penalty.
 	var tmp = s.huffWeight[:256]
 	var off uint8
+	var decoded int
 
 	// Decode 2 values from each decoder/loop.
 	const bufoff = 256 / 4
 bigloop:
 	for {
 		for i := range br {
-			if br[i].off < 4 {
+			br := &br[i]
+			if br.off < 4 {
 				break bigloop
 			}
-			br[i].fillFast()
+			br.fillFast()
 		}
-		tmp[off] = decode(&br[0])
-		tmp[off+bufoff] = decode(&br[1])
-		tmp[off+bufoff*2] = decode(&br[2])
-		tmp[off+bufoff*3] = decode(&br[3])
-		tmp[off+1] = decode(&br[0])
-		tmp[off+1+bufoff] = decode(&br[1])
-		tmp[off+1+bufoff*2] = decode(&br[2])
-		tmp[off+1+bufoff*3] = decode(&br[3])
+
+		{
+			const stream = 0
+			val := br[stream].peekBitsFast(s.actualTableLog)
+			v := single[val&tlMask]
+			br[stream].bitsRead += uint8(v.entry)
+
+			val2 := br[stream].peekBitsFast(s.actualTableLog)
+			v2 := single[val2&tlMask]
+			tmp[off+bufoff*stream+1] = uint8(v2.entry >> 8)
+			tmp[off+bufoff*stream] = uint8(v.entry >> 8)
+			br[stream].bitsRead += uint8(v2.entry)
+		}
+
+		{
+			const stream = 1
+			val := br[stream].peekBitsFast(s.actualTableLog)
+			v := single[val&tlMask]
+			br[stream].bitsRead += uint8(v.entry)
+
+			val2 := br[stream].peekBitsFast(s.actualTableLog)
+			v2 := single[val2&tlMask]
+			tmp[off+bufoff*stream+1] = uint8(v2.entry >> 8)
+			tmp[off+bufoff*stream] = uint8(v.entry >> 8)
+			br[stream].bitsRead += uint8(v2.entry)
+		}
+
+		{
+			const stream = 2
+			val := br[stream].peekBitsFast(s.actualTableLog)
+			v := single[val&tlMask]
+			br[stream].bitsRead += uint8(v.entry)
+
+			val2 := br[stream].peekBitsFast(s.actualTableLog)
+			v2 := single[val2&tlMask]
+			tmp[off+bufoff*stream+1] = uint8(v2.entry >> 8)
+			tmp[off+bufoff*stream] = uint8(v.entry >> 8)
+			br[stream].bitsRead += uint8(v2.entry)
+		}
+
+		{
+			const stream = 3
+			val := br[stream].peekBitsFast(s.actualTableLog)
+			v := single[val&tlMask]
+			br[stream].bitsRead += uint8(v.entry)
+
+			val2 := br[stream].peekBitsFast(s.actualTableLog)
+			v2 := single[val2&tlMask]
+			tmp[off+bufoff*stream+1] = uint8(v2.entry >> 8)
+			tmp[off+bufoff*stream] = uint8(v.entry >> 8)
+			br[stream].bitsRead += uint8(v2.entry)
+		}
+
 		off += 2
+
 		if off == bufoff {
 			if bufoff > dstEvery {
 				return nil, errors.New("corruption detected: stream overrun 1")
@@ -306,6 +356,7 @@
 			copy(dstOut[dstEvery*3:], tmp[bufoff*3:bufoff*4])
 			off = 0
 			dstOut = dstOut[bufoff:]
+			decoded += 256
 			// There must at least be 3 buffers left.
 			if len(dstOut) < dstEvery*3 {
 				return nil, errors.New("corruption detected: stream overrun 2")
@@ -321,9 +372,11 @@
 		copy(dstOut[dstEvery:dstEvery+ioff], tmp[bufoff:bufoff*2])
 		copy(dstOut[dstEvery*2:dstEvery*2+ioff], tmp[bufoff*2:bufoff*3])
 		copy(dstOut[dstEvery*3:dstEvery*3+ioff], tmp[bufoff*3:bufoff*4])
+		decoded += int(off) * 4
 		dstOut = dstOut[off:]
 	}
 
+	// Decode remaining.
 	for i := range br {
 		offset := dstEvery * i
 		br := &br[i]
@@ -335,12 +388,15 @@
 			dstOut[offset] = decode(br)
 			offset++
 		}
+		decoded += offset - dstEvery*i
 		err = br.close()
 		if err != nil {
 			return nil, err
 		}
 	}
-
+	if dstSize != decoded {
+		return nil, errors.New("corruption detected: short output block")
+	}
 	return s.Out, nil
 }
 
@@ -360,7 +416,7 @@
 		broken++
 		if enc.nBits == 0 {
 			for _, dec := range dt {
-				if dec.byte == byte(sym) {
+				if uint8(dec.entry>>8) == byte(sym) {
 					fmt.Fprintf(w, "symbol %x has decoder, but no encoder\n", sym)
 					errs++
 					break
@@ -376,12 +432,12 @@
 		top := enc.val << ub
 		// decoder looks at top bits.
 		dec := dt[top]
-		if dec.nBits != enc.nBits {
-			fmt.Fprintf(w, "symbol 0x%x bit size mismatch (enc: %d, dec:%d).\n", sym, enc.nBits, dec.nBits)
+		if uint8(dec.entry) != enc.nBits {
+			fmt.Fprintf(w, "symbol 0x%x bit size mismatch (enc: %d, dec:%d).\n", sym, enc.nBits, uint8(dec.entry))
 			errs++
 		}
-		if dec.byte != uint8(sym) {
-			fmt.Fprintf(w, "symbol 0x%x decoder output mismatch (enc: %d, dec:%d).\n", sym, sym, dec.byte)
+		if uint8(dec.entry>>8) != uint8(sym) {
+			fmt.Fprintf(w, "symbol 0x%x decoder output mismatch (enc: %d, dec:%d).\n", sym, sym, uint8(dec.entry>>8))
 			errs++
 		}
 		if errs > 0 {
@@ -392,12 +448,12 @@
 		for i := uint16(0); i < (1 << ub); i++ {
 			vval := top | i
 			dec := dt[vval]
-			if dec.nBits != enc.nBits {
-				fmt.Fprintf(w, "symbol 0x%x bit size mismatch (enc: %d, dec:%d).\n", vval, enc.nBits, dec.nBits)
+			if uint8(dec.entry) != enc.nBits {
+				fmt.Fprintf(w, "symbol 0x%x bit size mismatch (enc: %d, dec:%d).\n", vval, enc.nBits, uint8(dec.entry))
 				errs++
 			}
-			if dec.byte != uint8(sym) {
-				fmt.Fprintf(w, "symbol 0x%x decoder output mismatch (enc: %d, dec:%d).\n", vval, sym, dec.byte)
+			if uint8(dec.entry>>8) != uint8(sym) {
+				fmt.Fprintf(w, "symbol 0x%x decoder output mismatch (enc: %d, dec:%d).\n", vval, sym, uint8(dec.entry>>8))
 				errs++
 			}
 			if errs > 20 {
diff --git a/vendor/github.com/klauspost/compress/huff0/huff0.go b/vendor/github.com/klauspost/compress/huff0/huff0.go
index 6f823f9..53249df 100644
--- a/vendor/github.com/klauspost/compress/huff0/huff0.go
+++ b/vendor/github.com/klauspost/compress/huff0/huff0.go
@@ -83,12 +83,18 @@
 	MaxSymbolValue uint8
 
 	// TableLog will attempt to override the tablelog for the next block.
-	// Must be <= 11.
+	// Must be <= 11 and >= 5.
 	TableLog uint8
 
 	// Reuse will specify the reuse policy
 	Reuse ReusePolicy
 
+	// WantLogLess specifies a log 2 reduction that must at least be achieved;
+	// otherwise the block will be returned as incompressible.
+	// The output must be smaller than input size - (input size >> WantLogLess).
+	// If WantLogLess == 0, any improvement will do.
+	WantLogLess uint8
+
 	// MaxDecodedSize will set the maximum allowed output size.
 	// This value will automatically be set to BlockSizeMax if not set.
 	// Decoders will return ErrMaxDecodedSizeExceeded is this limit is exceeded.
@@ -99,6 +105,7 @@
 	maxCount       int    // count of the most probable symbol
 	clearCount     bool   // clear count
 	actualTableLog uint8  // Selected tablelog.
+	prevTableLog   uint8  // Tablelog for previous table
 	prevTable      cTable // Table used for previous compression.
 	cTable         cTable // compression table
 	dt             dTable // decompression table
@@ -121,8 +128,8 @@
 	if s.TableLog == 0 {
 		s.TableLog = tableLogDefault
 	}
-	if s.TableLog > tableLogMax {
-		return nil, fmt.Errorf("tableLog (%d) > maxTableLog (%d)", s.TableLog, tableLogMax)
+	if s.TableLog > tableLogMax || s.TableLog < minTablelog {
+		return nil, fmt.Errorf(" invalid tableLog %d (%d -> %d)", s.TableLog, minTablelog, tableLogMax)
 	}
 	if s.MaxDecodedSize <= 0 || s.MaxDecodedSize > BlockSizeMax {
 		s.MaxDecodedSize = BlockSizeMax