package sarama

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/eapache/go-xerial-snappy"
	"github.com/pierrec/lz4"
)

// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

// The lowest 3 bits contain the compression codec used for the message
const compressionCodecMask int8 = 0x07
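// For illustration only (this mirrors what decode does below): the codec is
// recovered from a message's raw attributes byte by masking off the low bits:
//
//	codec := CompressionCodec(attributes & compressionCodecMask)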

const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
	CompressionLZ4    CompressionCodec = 3
	CompressionZSTD   CompressionCodec = 4
)

// String returns a human-readable name for the compression codec.
func (cc CompressionCodec) String() string {
	return []string{
		"none",
		"gzip",
		"snappy",
		"lz4",
		"zstd",
	}[int(cc)]
}

// CompressionLevelDefault is the constant to use in CompressionLevel
// to select the default compression level for any codec. The value is
// chosen so that it does not collide with any valid compression level.
const CompressionLevelDefault = -1000

type Message struct {
	Codec            CompressionCodec // codec used to compress the message contents
	CompressionLevel int              // compression level
	Key              []byte           // the message key, may be nil
	Value            []byte           // the message contents
	Set              *MessageSet      // the message set a message might wrap
	Version          int8             // v1 requires Kafka 0.10
	Timestamp        time.Time        // the timestamp of the message (version 1+ only)

	compressedCache []byte
	compressedSize  int // used for computing the compression ratio metrics
}
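
// A minimal sketch of filling in a Message by hand (the producer normally does
// this internally); the field values here are illustrative only:
//
//	msg := &Message{
//		Codec:            CompressionGZIP,
//		CompressionLevel: CompressionLevelDefault,
//		Value:            []byte("hello"),
//		Version:          1,
//		Timestamp:        time.Now(),
//	}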

func (m *Message) encode(pe packetEncoder) error {
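	// a v0/v1 message is prefixed with a CRC32 checksum covering everything
	// that follows it in the encoded message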
	pe.push(newCRC32Field(crcIEEE))

	pe.putInt8(m.Version)

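	// the codec occupies the low 3 bits of the attributes byte; the remaining
	// bits are left unset here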
	attributes := int8(m.Codec) & compressionCodecMask
	pe.putInt8(attributes)

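	// v1 messages (Kafka 0.10+) carry a timestamp between the attributes and the key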
	if m.Version >= 1 {
		if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
			return err
		}
	}

	err := pe.putBytes(m.Key)
	if err != nil {
		return err
	}

	var payload []byte

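	// if a previous encode pass already compressed the value, reuse that result
	// instead of compressing it again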
	if m.compressedCache != nil {
		payload = m.compressedCache
		m.compressedCache = nil
	} else if m.Value != nil {
		switch m.Codec {
		case CompressionNone:
			payload = m.Value
		case CompressionGZIP:
			var buf bytes.Buffer
			var writer *gzip.Writer
			if m.CompressionLevel != CompressionLevelDefault {
				writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
				if err != nil {
					return err
				}
			} else {
				writer = gzip.NewWriter(&buf)
			}
			if _, err = writer.Write(m.Value); err != nil {
				return err
			}
			if err = writer.Close(); err != nil {
				return err
			}
			m.compressedCache = buf.Bytes()
			payload = m.compressedCache
		case CompressionSnappy:
			tmp := snappy.Encode(m.Value)
			m.compressedCache = tmp
			payload = m.compressedCache
		case CompressionLZ4:
			var buf bytes.Buffer
			writer := lz4.NewWriter(&buf)
			if _, err = writer.Write(m.Value); err != nil {
				return err
			}
			if err = writer.Close(); err != nil {
				return err
			}
			m.compressedCache = buf.Bytes()
			payload = m.compressedCache
		case CompressionZSTD:
			c, err := zstdCompressLevel(nil, m.Value, m.CompressionLevel)
			if err != nil {
				return err
			}
			m.compressedCache = c
			payload = m.compressedCache
		default:
			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
		}
		// Remember the compressed payload size for the compression ratio metrics
		m.compressedSize = len(payload)
	}

	if err = pe.putBytes(payload); err != nil {
		return err
	}

	return pe.pop()
}

func (m *Message) decode(pd packetDecoder) (err error) {
	err = pd.push(newCRC32Field(crcIEEE))
	if err != nil {
		return err
	}

	m.Version, err = pd.getInt8()
	if err != nil {
		return err
	}

	if m.Version > 1 {
		return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
	}

	attribute, err := pd.getInt8()
	if err != nil {
		return err
	}
	m.Codec = CompressionCodec(attribute & compressionCodecMask)

	if m.Version == 1 {
		if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
			return err
		}
	}

	m.Key, err = pd.getBytes()
	if err != nil {
		return err
	}

	m.Value, err = pd.getBytes()
	if err != nil {
		return err
	}

	// Required for deep equal assertion during tests but might be useful
	// for future metrics about the compression ratio in fetch requests
	m.compressedSize = len(m.Value)

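	// a compressed message wraps an entire message set in its value: decompress
	// the value and decode the inner set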
	switch m.Codec {
	case CompressionNone:
		// nothing to do
	case CompressionGZIP:
		if m.Value == nil {
			break
		}
		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
		if err != nil {
			return err
		}
		if m.Value, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
		if err := m.decodeSet(); err != nil {
			return err
		}
	case CompressionSnappy:
		if m.Value == nil {
			break
		}
		if m.Value, err = snappy.Decode(m.Value); err != nil {
			return err
		}
		if err := m.decodeSet(); err != nil {
			return err
		}
	case CompressionLZ4:
		if m.Value == nil {
			break
		}
		reader := lz4.NewReader(bytes.NewReader(m.Value))
		if m.Value, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
		if err := m.decodeSet(); err != nil {
			return err
		}
	case CompressionZSTD:
		if m.Value == nil {
			break
		}
		if m.Value, err = zstdDecompress(nil, m.Value); err != nil {
			return err
		}
		if err := m.decodeSet(); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)}
	}

	return pd.pop()
}

// decodeSet decodes a message set from a previously encoded bulk message
func (m *Message) decodeSet() (err error) {
	pd := realDecoder{raw: m.Value}
	m.Set = &MessageSet{}
	return m.Set.decode(&pd)
}