blob: eaccbfc268ecade69ab45cd1b85485f6bad2ca5b [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "bytes"
5 "compress/gzip"
6 "fmt"
7 "io/ioutil"
8 "sync"
9
10 "github.com/eapache/go-xerial-snappy"
11 "github.com/pierrec/lz4"
12)
13
14var (
15 lz4ReaderPool = sync.Pool{
16 New: func() interface{} {
17 return lz4.NewReader(nil)
18 },
19 }
20
21 gzipReaderPool sync.Pool
22)
23
24func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
25 switch cc {
26 case CompressionNone:
27 return data, nil
28 case CompressionGZIP:
29 var (
30 err error
31 reader *gzip.Reader
32 readerIntf = gzipReaderPool.Get()
33 )
34 if readerIntf != nil {
35 reader = readerIntf.(*gzip.Reader)
36 } else {
37 reader, err = gzip.NewReader(bytes.NewReader(data))
38 if err != nil {
39 return nil, err
40 }
41 }
42
43 defer gzipReaderPool.Put(reader)
44
45 if err := reader.Reset(bytes.NewReader(data)); err != nil {
46 return nil, err
47 }
48
49 return ioutil.ReadAll(reader)
50 case CompressionSnappy:
51 return snappy.Decode(data)
52 case CompressionLZ4:
53 reader := lz4ReaderPool.Get().(*lz4.Reader)
54 defer lz4ReaderPool.Put(reader)
55
56 reader.Reset(bytes.NewReader(data))
57 return ioutil.ReadAll(reader)
58 case CompressionZSTD:
59 return zstdDecompress(nil, data)
60 default:
61 return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
62 }
63}