blob: 51d3309c05f5d26640b9d87c584dc800b6ea30d3 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

// The lowest 3 bits contain the compression codec used for the message
const compressionCodecMask int8 = 0x07

const (
	// CompressionNone applies no compression to the message payload.
	CompressionNone CompressionCodec = 0
	// CompressionGZIP compresses the payload with gzip.
	CompressionGZIP CompressionCodec = 1
	// CompressionSnappy compresses the payload with Snappy.
	CompressionSnappy CompressionCodec = 2
	// CompressionLZ4 compresses the payload with LZ4.
	CompressionLZ4 CompressionCodec = 3
	// CompressionZSTD compresses the payload with zstd.
	CompressionZSTD CompressionCodec = 4
)

// String returns the human-readable name of the codec ("none", "gzip",
// "snappy", "lz4" or "zstd"). Previously this indexed a 4-element slice,
// so CompressionZSTD (or any other value > 3) panicked with an index out
// of range; an exhaustive switch with a descriptive fallback fixes that.
func (cc CompressionCodec) String() string {
	switch cc {
	case CompressionNone:
		return "none"
	case CompressionGZIP:
		return "gzip"
	case CompressionSnappy:
		return "snappy"
	case CompressionLZ4:
		return "lz4"
	case CompressionZSTD:
		return "zstd"
	}
	return fmt.Sprintf("unknown compression codec (%d)", int(cc))
}
30
// CompressionLevelDefault is the constant to use in CompressionLevel
// to select the default compression level for any codec. The value is
// chosen so that it does not collide with any valid compression level
// of an existing codec.
const CompressionLevelDefault = -1000
35
// Message is a single Kafka message (message-set v0/v1 format), holding the
// key/value payload, compression settings, and bookkeeping used by encode
// and decode.
type Message struct {
	Codec            CompressionCodec // codec used to compress the message contents
	CompressionLevel int              // compression level
	Key              []byte           // the message key, may be nil
	Value            []byte           // the message contents
	Set              *MessageSet      // the message set a message might wrap
	Version          int8             // v1 requires Kafka 0.10
	Timestamp        time.Time        // the timestamp of the message (version 1+ only)

	compressedCache []byte // compressed form of Value, consumed (once) by encode
	compressedSize  int    // used for computing the compression ratio metrics
}
48
// encode serializes the message to the wire: CRC, magic byte (Version),
// attributes, an optional timestamp (version 1+), then key and value.
// The value is compressed with m.Codec; a payload cached from a previous
// compression is used (once) in preference to re-compressing.
func (m *Message) encode(pe packetEncoder) error {
	// The CRC field covers everything written after this push; the
	// final pe.pop() computes and fills it in.
	pe.push(newCRC32Field(crcIEEE))

	pe.putInt8(m.Version)

	// Only the low 3 bits of the attributes byte carry the codec.
	attributes := int8(m.Codec) & compressionCodecMask
	pe.putInt8(attributes)

	if m.Version >= 1 {
		if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
			return err
		}
	}

	err := pe.putBytes(m.Key)
	if err != nil {
		return err
	}

	var payload []byte

	if m.compressedCache != nil {
		// The cache is single-use: consume it and clear it.
		payload = m.compressedCache
		m.compressedCache = nil
	} else if m.Value != nil {

		payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
		if err != nil {
			return err
		}
		m.compressedCache = payload
		// Keep in mind the compressed payload size for metric gathering
		m.compressedSize = len(payload)
	}

	if err = pe.putBytes(payload); err != nil {
		return err
	}

	return pe.pop()
}
90
// decode deserializes a message from the wire: CRC, magic byte (Version,
// only 0 and 1 are accepted), attributes, an optional timestamp (version 1),
// then key and value. A compressed value is decompressed in place and the
// wrapped inner message set is decoded into m.Set.
func (m *Message) decode(pd packetDecoder) (err error) {
	err = pd.push(newCRC32Field(crcIEEE))
	if err != nil {
		return err
	}

	m.Version, err = pd.getInt8()
	if err != nil {
		return err
	}

	// This decoder only understands magic bytes 0 and 1.
	if m.Version > 1 {
		return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
	}

	attribute, err := pd.getInt8()
	if err != nil {
		return err
	}
	// The codec lives in the lowest 3 bits of the attributes byte.
	m.Codec = CompressionCodec(attribute & compressionCodecMask)

	if m.Version == 1 {
		if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
			return err
		}
	}

	m.Key, err = pd.getBytes()
	if err != nil {
		return err
	}

	m.Value, err = pd.getBytes()
	if err != nil {
		return err
	}

	// Required for deep equal assertion during tests but might be useful
	// for future metrics about the compression ratio in fetch requests
	m.compressedSize = len(m.Value)

	switch m.Codec {
	case CompressionNone:
		// nothing to do
	default:
		// A nil value with a codec set carries nothing to decompress.
		if m.Value == nil {
			break
		}

		m.Value, err = decompress(m.Codec, m.Value)
		if err != nil {
			return err
		}
		// Compressed messages wrap an inner message set; decode it into m.Set.
		if err := m.decodeSet(); err != nil {
			return err
		}
	}

	return pd.pop()
}
151
152// decodes a message set from a previousy encoded bulk-message
153func (m *Message) decodeSet() (err error) {
154 pd := realDecoder{raw: m.Value}
155 m.Set = &MessageSet{}
156 return m.Set.decode(&pd)
157}