blob: 12cd7c3d510489522dfe489a275ba770910ed71f [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "bytes"
5 "compress/gzip"
6 "fmt"
7 "sync"
8
khenaidood948f772021-08-11 17:49:24 -04009 snappy "github.com/eapache/go-xerial-snappy"
khenaidooac637102019-01-14 15:44:34 -050010 "github.com/pierrec/lz4"
11)
12
13var (
14 lz4WriterPool = sync.Pool{
15 New: func() interface{} {
16 return lz4.NewWriter(nil)
17 },
18 }
19
20 gzipWriterPool = sync.Pool{
21 New: func() interface{} {
22 return gzip.NewWriter(nil)
23 },
24 }
khenaidood948f772021-08-11 17:49:24 -040025 gzipWriterPoolForCompressionLevel1 = sync.Pool{
26 New: func() interface{} {
27 gz, err := gzip.NewWriterLevel(nil, 1)
28 if err != nil {
29 panic(err)
30 }
31 return gz
32 },
33 }
34 gzipWriterPoolForCompressionLevel2 = sync.Pool{
35 New: func() interface{} {
36 gz, err := gzip.NewWriterLevel(nil, 2)
37 if err != nil {
38 panic(err)
39 }
40 return gz
41 },
42 }
43 gzipWriterPoolForCompressionLevel3 = sync.Pool{
44 New: func() interface{} {
45 gz, err := gzip.NewWriterLevel(nil, 3)
46 if err != nil {
47 panic(err)
48 }
49 return gz
50 },
51 }
52 gzipWriterPoolForCompressionLevel4 = sync.Pool{
53 New: func() interface{} {
54 gz, err := gzip.NewWriterLevel(nil, 4)
55 if err != nil {
56 panic(err)
57 }
58 return gz
59 },
60 }
61 gzipWriterPoolForCompressionLevel5 = sync.Pool{
62 New: func() interface{} {
63 gz, err := gzip.NewWriterLevel(nil, 5)
64 if err != nil {
65 panic(err)
66 }
67 return gz
68 },
69 }
70 gzipWriterPoolForCompressionLevel6 = sync.Pool{
71 New: func() interface{} {
72 gz, err := gzip.NewWriterLevel(nil, 6)
73 if err != nil {
74 panic(err)
75 }
76 return gz
77 },
78 }
79 gzipWriterPoolForCompressionLevel7 = sync.Pool{
80 New: func() interface{} {
81 gz, err := gzip.NewWriterLevel(nil, 7)
82 if err != nil {
83 panic(err)
84 }
85 return gz
86 },
87 }
88 gzipWriterPoolForCompressionLevel8 = sync.Pool{
89 New: func() interface{} {
90 gz, err := gzip.NewWriterLevel(nil, 8)
91 if err != nil {
92 panic(err)
93 }
94 return gz
95 },
96 }
97 gzipWriterPoolForCompressionLevel9 = sync.Pool{
98 New: func() interface{} {
99 gz, err := gzip.NewWriterLevel(nil, 9)
100 if err != nil {
101 panic(err)
102 }
103 return gz
104 },
105 }
khenaidooac637102019-01-14 15:44:34 -0500106)
107
108func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
109 switch cc {
110 case CompressionNone:
111 return data, nil
112 case CompressionGZIP:
113 var (
114 err error
115 buf bytes.Buffer
116 writer *gzip.Writer
117 )
khenaidood948f772021-08-11 17:49:24 -0400118
119 switch level {
120 case CompressionLevelDefault:
121 writer = gzipWriterPool.Get().(*gzip.Writer)
122 defer gzipWriterPool.Put(writer)
123 writer.Reset(&buf)
124 case 1:
125 writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer)
126 defer gzipWriterPoolForCompressionLevel1.Put(writer)
127 writer.Reset(&buf)
128 case 2:
129 writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer)
130 defer gzipWriterPoolForCompressionLevel2.Put(writer)
131 writer.Reset(&buf)
132 case 3:
133 writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer)
134 defer gzipWriterPoolForCompressionLevel3.Put(writer)
135 writer.Reset(&buf)
136 case 4:
137 writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer)
138 defer gzipWriterPoolForCompressionLevel4.Put(writer)
139 writer.Reset(&buf)
140 case 5:
141 writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer)
142 defer gzipWriterPoolForCompressionLevel5.Put(writer)
143 writer.Reset(&buf)
144 case 6:
145 writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer)
146 defer gzipWriterPoolForCompressionLevel6.Put(writer)
147 writer.Reset(&buf)
148 case 7:
149 writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer)
150 defer gzipWriterPoolForCompressionLevel7.Put(writer)
151 writer.Reset(&buf)
152 case 8:
153 writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer)
154 defer gzipWriterPoolForCompressionLevel8.Put(writer)
155 writer.Reset(&buf)
156 case 9:
157 writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer)
158 defer gzipWriterPoolForCompressionLevel9.Put(writer)
159 writer.Reset(&buf)
160 default:
khenaidooac637102019-01-14 15:44:34 -0500161 writer, err = gzip.NewWriterLevel(&buf, level)
162 if err != nil {
163 return nil, err
164 }
khenaidooac637102019-01-14 15:44:34 -0500165 }
166 if _, err := writer.Write(data); err != nil {
167 return nil, err
168 }
169 if err := writer.Close(); err != nil {
170 return nil, err
171 }
172 return buf.Bytes(), nil
173 case CompressionSnappy:
174 return snappy.Encode(data), nil
175 case CompressionLZ4:
176 writer := lz4WriterPool.Get().(*lz4.Writer)
177 defer lz4WriterPool.Put(writer)
178
179 var buf bytes.Buffer
180 writer.Reset(&buf)
181
182 if _, err := writer.Write(data); err != nil {
183 return nil, err
184 }
185 if err := writer.Close(); err != nil {
186 return nil, err
187 }
188 return buf.Bytes(), nil
189 case CompressionZSTD:
khenaidood948f772021-08-11 17:49:24 -0400190 return zstdCompress(nil, data)
khenaidooac637102019-01-14 15:44:34 -0500191 default:
192 return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
193 }
194}