blob: f64c79bbd1d137b45427871a5ed745ac9c3be7b4 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// Bit masks applied to the attributes byte of a message.
const (
	// compressionCodecMask selects the lowest 3 bits, which hold the
	// compression codec used for the message.
	compressionCodecMask int8 = 0x07

	// timestampTypeMask selects bit 3, which is set for "LogAppend" timestamps.
	timestampTypeMask = 0x08
)
13
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

// The compression codecs that can be carried in a message's attributes byte.
const (
	// CompressionNone means the message contents are not compressed.
	CompressionNone CompressionCodec = 0
	// CompressionGZIP selects the gzip codec.
	CompressionGZIP CompressionCodec = 1
	// CompressionSnappy selects the snappy codec.
	CompressionSnappy CompressionCodec = 2
	// CompressionLZ4 selects the lz4 codec.
	CompressionLZ4 CompressionCodec = 3
	// CompressionZSTD selects the zstd codec.
	CompressionZSTD CompressionCodec = 4
)

// String returns the lowercase name of the codec ("none", "gzip", "snappy",
// "lz4" or "zstd").
//
// Values that do not correspond to a declared codec are rendered as
// "unknown(N)" rather than panicking: the codec is extracted from a 3-bit
// field, so values with no declared constant can be produced from wire data.
func (cc CompressionCodec) String() string {
	switch cc {
	case CompressionNone:
		return "none"
	case CompressionGZIP:
		return "gzip"
	case CompressionSnappy:
		return "snappy"
	case CompressionLZ4:
		return "lz4"
	case CompressionZSTD:
		return "zstd"
	default:
		// Convert to the underlying type so formatting can never recurse
		// back into String.
		return fmt.Sprintf("unknown(%d)", int8(cc))
	}
}
33
// CompressionLevelDefault is the value to put in CompressionLevel to request
// the default compression level of whichever codec is in use. The value was
// picked so that it does not collide with any existing compression level.
const CompressionLevelDefault = -1000
38
39type Message struct {
40 Codec CompressionCodec // codec used to compress the message contents
41 CompressionLevel int // compression level
42 LogAppendTime bool // the used timestamp is LogAppendTime
43 Key []byte // the message key, may be nil
44 Value []byte // the message contents
45 Set *MessageSet // the message set a message might wrap
46 Version int8 // v1 requires Kafka 0.10
47 Timestamp time.Time // the timestamp of the message (version 1+ only)
48
49 compressedCache []byte
50 compressedSize int // used for computing the compression ratio metrics
51}
52
53func (m *Message) encode(pe packetEncoder) error {
54 pe.push(newCRC32Field(crcIEEE))
55
56 pe.putInt8(m.Version)
57
58 attributes := int8(m.Codec) & compressionCodecMask
59 if m.LogAppendTime {
60 attributes |= timestampTypeMask
61 }
62 pe.putInt8(attributes)
63
64 if m.Version >= 1 {
65 if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
66 return err
67 }
68 }
69
70 err := pe.putBytes(m.Key)
71 if err != nil {
72 return err
73 }
74
75 var payload []byte
76
77 if m.compressedCache != nil {
78 payload = m.compressedCache
79 m.compressedCache = nil
80 } else if m.Value != nil {
81
82 payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
83 if err != nil {
84 return err
85 }
86 m.compressedCache = payload
87 // Keep in mind the compressed payload size for metric gathering
88 m.compressedSize = len(payload)
89 }
90
91 if err = pe.putBytes(payload); err != nil {
92 return err
93 }
94
95 return pe.pop()
96}
97
98func (m *Message) decode(pd packetDecoder) (err error) {
99 err = pd.push(newCRC32Field(crcIEEE))
100 if err != nil {
101 return err
102 }
103
104 m.Version, err = pd.getInt8()
105 if err != nil {
106 return err
107 }
108
109 if m.Version > 1 {
110 return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
111 }
112
113 attribute, err := pd.getInt8()
114 if err != nil {
115 return err
116 }
117 m.Codec = CompressionCodec(attribute & compressionCodecMask)
118 m.LogAppendTime = attribute&timestampTypeMask == timestampTypeMask
119
120 if m.Version == 1 {
121 if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
122 return err
123 }
124 }
125
126 m.Key, err = pd.getBytes()
127 if err != nil {
128 return err
129 }
130
131 m.Value, err = pd.getBytes()
132 if err != nil {
133 return err
134 }
135
136 // Required for deep equal assertion during tests but might be useful
137 // for future metrics about the compression ratio in fetch requests
138 m.compressedSize = len(m.Value)
139
140 switch m.Codec {
141 case CompressionNone:
142 // nothing to do
143 default:
144 if m.Value == nil {
145 break
146 }
147
148 m.Value, err = decompress(m.Codec, m.Value)
149 if err != nil {
150 return err
151 }
152 if err := m.decodeSet(); err != nil {
153 return err
154 }
155 }
156
157 return pd.pop()
158}
159
160// decodes a message set from a previously encoded bulk-message
161func (m *Message) decodeSet() (err error) {
162 pd := realDecoder{raw: m.Value}
163 m.Set = &MessageSet{}
164 return m.Set.decode(&pd)
165}