[SEBA-930] update GRPC version to 1.27 and change kafka message producing
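
The vendored zstd update below, among other changes, starts tracking the block-header offset
(bhOffset) so the 3-byte block header can be rewritten in place once the compressed size is
known, via an append on the zero-length slice b.output[bhOffset:bhOffset]. A minimal standalone
sketch of that reserve-then-rewrite idiom follows; appendHeader and the 3-byte little-endian
layout are hypothetical illustrations, not the zstd block-header format:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // appendHeader appends a hypothetical 3-byte little-endian size header to dst.
    func appendHeader(dst []byte, size uint32) []byte {
        var tmp [4]byte
        binary.LittleEndian.PutUint32(tmp[:], size)
        return append(dst, tmp[:3]...)
    }

    func main() {
        out := []byte("frame:") // buffer may already hold earlier blocks
        hdrOffset := len(out)   // remember where this block's header starts

        out = appendHeader(out, 0)                    // reserve 3 bytes; size unknown yet
        out = append(out, []byte("payload bytes")...) // append the block body

        // Rewrite the reserved header now that the size is known.
        // out[hdrOffset:hdrOffset] is a zero-length slice backed by the same
        // array, so append overwrites the 3 reserved bytes in place instead
        // of growing a new buffer.
        size := uint32(len(out)-hdrOffset) - 3
        _ = appendHeader(out[hdrOffset:hdrOffset], size)

        fmt.Printf("%q\n", out)
    }

The vendored code applies the same trick with blockHeader.appendTo: the reserved bytes are
overwritten without disturbing the rest of b.output.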

Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/github.com/klauspost/compress/zstd/blockenc.go b/vendor/github.com/klauspost/compress/zstd/blockenc.go
index 9d9151a..507757d 100644
--- a/vendor/github.com/klauspost/compress/zstd/blockenc.go
+++ b/vendor/github.com/klauspost/compress/zstd/blockenc.go
@@ -51,7 +51,7 @@
 		b.coders.llEnc = &fseEncoder{}
 		b.coders.llPrev = &fseEncoder{}
 	}
-	b.litEnc = &huff0.Scratch{}
+	b.litEnc = &huff0.Scratch{WantLogLess: 4}
 	b.reset(nil)
 }
 
@@ -299,14 +299,28 @@
 	}
 }
 
+// encodeRawTo appends a raw (uncompressed) block containing src to dst and returns the result.
+func (b *blockEnc) encodeRawTo(dst, src []byte) []byte {
+	var bh blockHeader
+	bh.setLast(b.last)
+	bh.setSize(uint32(len(src)))
+	bh.setType(blockTypeRaw)
+	dst = bh.appendTo(dst)
+	dst = append(dst, src...)
+	if debug {
+		println("Adding RAW block, length", len(src))
+	}
+	return dst
+}
+
 // encodeLits can be used if the block is only litLen.
-func (b *blockEnc) encodeLits() error {
+func (b *blockEnc) encodeLits(raw bool) error {
 	var bh blockHeader
 	bh.setLast(b.last)
 	bh.setSize(uint32(len(b.literals)))
 
 	// Don't compress extremely small blocks
-	if len(b.literals) < 32 {
+	if len(b.literals) < 32 || raw {
 		if debug {
 			println("Adding RAW block, length", len(b.literals))
 		}
@@ -324,18 +338,10 @@
 	if len(b.literals) >= 1024 {
 		// Use 4 Streams.
 		out, reUsed, err = huff0.Compress4X(b.literals, b.litEnc)
-		if len(out) > len(b.literals)-len(b.literals)>>4 {
-			// Bail out of compression is too little.
-			err = huff0.ErrIncompressible
-		}
 	} else if len(b.literals) > 32 {
 		// Use 1 stream
 		single = true
 		out, reUsed, err = huff0.Compress1X(b.literals, b.litEnc)
-		if len(out) > len(b.literals)-len(b.literals)>>4 {
-			// Bail out of compression is too little.
-			err = huff0.ErrIncompressible
-		}
 	} else {
 		err = huff0.ErrIncompressible
 	}
@@ -391,10 +397,56 @@
 	return nil
 }
 
-// encode will encode the block and put the output in b.output.
-func (b *blockEnc) encode() error {
+// fuzzFseEncoder can be used to fuzz the FSE encoder.
+func fuzzFseEncoder(data []byte) int {
+	if len(data) > maxSequences || len(data) < 2 {
+		return 0
+	}
+	enc := fseEncoder{}
+	hist := enc.Histogram()[:256]
+	maxSym := uint8(0)
+	for i, v := range data {
+		v = v & 63
+		data[i] = v
+		hist[v]++
+		if v > maxSym {
+			maxSym = v
+		}
+	}
+	if maxSym == 0 {
+		// All 0
+		return 0
+	}
+	maxCount := func(a []uint32) int {
+		var max uint32
+		for _, v := range a {
+			if v > max {
+				max = v
+			}
+		}
+		return int(max)
+	}
+	cnt := maxCount(hist[:maxSym])
+	if cnt == len(data) {
+		// RLE
+		return 0
+	}
+	enc.HistogramFinished(maxSym, cnt)
+	err := enc.normalizeCount(len(data))
+	if err != nil {
+		return 0
+	}
+	_, err = enc.writeCount(nil)
+	if err != nil {
+		panic(err)
+	}
+	return 1
+}
+
+// encode will encode the block and append the output to b.output.
+func (b *blockEnc) encode(raw bool) error {
 	if len(b.sequences) == 0 {
-		return b.encodeLits()
+		return b.encodeLits(raw)
 	}
 	// We want some difference
 	if len(b.literals) > (b.size - (b.size >> 5)) {
@@ -405,6 +457,8 @@
 	var lh literalsHeader
 	bh.setLast(b.last)
 	bh.setType(blockTypeCompressed)
+	// Store offset of the block header. Needed when we know the size.
+	bhOffset := len(b.output)
 	b.output = bh.appendTo(b.output)
 
 	var (
@@ -412,22 +466,17 @@
 		reUsed, single bool
 		err            error
 	)
-	if len(b.literals) >= 1024 {
+	if len(b.literals) >= 1024 && !raw {
 		// Use 4 Streams.
 		out, reUsed, err = huff0.Compress4X(b.literals, b.litEnc)
-		if len(out) > len(b.literals)-len(b.literals)>>4 {
-			err = huff0.ErrIncompressible
-		}
-	} else if len(b.literals) > 32 {
+	} else if len(b.literals) > 32 && !raw {
 		// Use 1 stream
 		single = true
 		out, reUsed, err = huff0.Compress1X(b.literals, b.litEnc)
-		if len(out) > len(b.literals)-len(b.literals)>>4 {
-			err = huff0.ErrIncompressible
-		}
 	} else {
 		err = huff0.ErrIncompressible
 	}
+
 	switch err {
 	case huff0.ErrIncompressible:
 		lh.setType(literalsBlockRaw)
@@ -695,23 +744,23 @@
 	}
 	b.output = wr.out
 
-	if len(b.output)-3 >= b.size {
+	if len(b.output)-3-bhOffset >= b.size {
 		// Maybe even add a bigger margin.
 		b.litEnc.Reuse = huff0.ReusePolicyNone
 		return errIncompressible
 	}
 
 	// Size is output minus block header.
-	bh.setSize(uint32(len(b.output)) - 3)
+	bh.setSize(uint32(len(b.output)-bhOffset) - 3)
 	if debug {
 		println("Rewriting block header", bh)
 	}
-	_ = bh.appendTo(b.output[:0])
+	_ = bh.appendTo(b.output[bhOffset:bhOffset])
 	b.coders.setPrev(llEnc, mlEnc, ofEnc)
 	return nil
 }
 
-var errIncompressible = errors.New("uncompressible")
+var errIncompressible = errors.New("incompressible")
 
 func (b *blockEnc) genCodes() {
 	if len(b.sequences) == 0 {