blob: 953147d01bf9b39ef6fcb9a489321e6d26f22668 [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package sarama
2
3import (
4 "bytes"
5 "compress/gzip"
6 "fmt"
kesavandc71914f2022-03-25 11:19:03 +05307 "io"
kesavand2cde6582020-06-22 04:56:23 -04008 "sync"
9
kesavandc71914f2022-03-25 11:19:03 +053010 snappy "github.com/eapache/go-xerial-snappy"
kesavand2cde6582020-06-22 04:56:23 -040011 "github.com/pierrec/lz4"
12)
13
14var (
15 lz4ReaderPool = sync.Pool{
16 New: func() interface{} {
17 return lz4.NewReader(nil)
18 },
19 }
20
21 gzipReaderPool sync.Pool
22)
23
24func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
25 switch cc {
26 case CompressionNone:
27 return data, nil
28 case CompressionGZIP:
kesavandc71914f2022-03-25 11:19:03 +053029 var err error
30 reader, ok := gzipReaderPool.Get().(*gzip.Reader)
31 if !ok {
kesavand2cde6582020-06-22 04:56:23 -040032 reader, err = gzip.NewReader(bytes.NewReader(data))
kesavandc71914f2022-03-25 11:19:03 +053033 } else {
34 err = reader.Reset(bytes.NewReader(data))
35 }
36
37 if err != nil {
38 return nil, err
kesavand2cde6582020-06-22 04:56:23 -040039 }
40
41 defer gzipReaderPool.Put(reader)
42
kesavandc71914f2022-03-25 11:19:03 +053043 return io.ReadAll(reader)
kesavand2cde6582020-06-22 04:56:23 -040044 case CompressionSnappy:
45 return snappy.Decode(data)
46 case CompressionLZ4:
kesavandc71914f2022-03-25 11:19:03 +053047 reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
48 if !ok {
49 reader = lz4.NewReader(bytes.NewReader(data))
50 } else {
51 reader.Reset(bytes.NewReader(data))
52 }
kesavand2cde6582020-06-22 04:56:23 -040053 defer lz4ReaderPool.Put(reader)
54
kesavandc71914f2022-03-25 11:19:03 +053055 return io.ReadAll(reader)
kesavand2cde6582020-06-22 04:56:23 -040056 case CompressionZSTD:
kesavandc71914f2022-03-25 11:19:03 +053057 return zstdDecompress(ZstdDecoderParams{}, nil, data)
kesavand2cde6582020-06-22 04:56:23 -040058 default:
59 return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
60 }
61}