[SEBA-930] Update gRPC version to 1.27 and change Kafka message producing
Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/github.com/klauspost/compress/zstd/blockdec.go b/vendor/github.com/klauspost/compress/zstd/blockdec.go
index 3e161ea..ed670bc 100644
--- a/vendor/github.com/klauspost/compress/zstd/blockdec.go
+++ b/vendor/github.com/klauspost/compress/zstd/blockdec.go
@@ -11,6 +11,7 @@
"sync"
"github.com/klauspost/compress/huff0"
+ "github.com/klauspost/compress/zstd/internal/xxhash"
)
type blockType uint8
@@ -88,6 +89,7 @@
sequenceBuf []seq
tmp [4]byte
err error
+ decWG sync.WaitGroup
}
func (b *blockDec) String() string {
@@ -104,6 +106,7 @@
input: make(chan struct{}, 1),
history: make(chan *history, 1),
}
+ b.decWG.Add(1)
go b.startDecoder()
return &b
}
@@ -160,7 +163,8 @@
b.data, err = br.readBig(cSize, b.dataStorage)
if err != nil {
if debug {
- println("Reading block:", err)
+ println("Reading block:", err, "(", cSize, ")", len(b.data))
+ printf("%T", br)
}
return err
}
@@ -181,11 +185,13 @@
close(b.input)
close(b.history)
close(b.result)
+ b.decWG.Wait()
}
// decodeAsync will prepare decoding the block when it receives input.
// This will separate output and history.
func (b *blockDec) startDecoder() {
+ defer b.decWG.Done()
for range b.input {
//println("blockDec: Got block input")
switch b.Type {
@@ -275,7 +281,7 @@
hist.b = nil
err := b.decodeCompressed(hist)
if debug {
- println("Decompressed to total", len(b.dst), "bytes, error:", err)
+ println("Decompressed to total", len(b.dst), "bytes, hash:", xxhash.Sum64(b.dst), "error:", err)
}
hist.b = b.dst
b.dst = saved
@@ -368,7 +374,7 @@
}
}
if debug {
- println("literals type:", litType, "litRegenSize:", litRegenSize, "litCompSize", litCompSize)
+ println("literals type:", litType, "litRegenSize:", litRegenSize, "litCompSize:", litCompSize, "sizeFormat:", sizeFormat, "4X:", fourStreams)
}
var literals []byte
var huff *huff0.Scratch
@@ -426,7 +432,6 @@
}
literals = in[:litCompSize]
in = in[litCompSize:]
-
huff = huffDecoderPool.Get().(*huff0.Scratch)
var err error
// Ensure we have space to store it.
@@ -637,7 +642,7 @@
hist.huffTree = huff
}
if debug {
- println("Final literals:", len(literals), "and", nSeqs, "sequences.")
+ println("Final literals:", len(literals), "hash:", xxhash.Sum64(literals), "and", nSeqs, "sequences.")
}
if nSeqs == 0 {