blob: a3fe8c0614eb1e249afbd734caeac26c4fe477fd [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "encoding/binary"
5 "time"
6)
7
8const (
Scott Baker8461e152019-10-01 14:44:30 -07009 isTransactionalMask = 0x10
khenaidooac637102019-01-14 15:44:34 -050010 controlMask = 0x20
11 maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
12)
13
khenaidood948f772021-08-11 17:49:24 -040014// RecordHeader stores key and value for a record header
khenaidooac637102019-01-14 15:44:34 -050015type RecordHeader struct {
16 Key []byte
17 Value []byte
18}
19
20func (h *RecordHeader) encode(pe packetEncoder) error {
21 if err := pe.putVarintBytes(h.Key); err != nil {
22 return err
23 }
24 return pe.putVarintBytes(h.Value)
25}
26
27func (h *RecordHeader) decode(pd packetDecoder) (err error) {
28 if h.Key, err = pd.getVarintBytes(); err != nil {
29 return err
30 }
31
32 if h.Value, err = pd.getVarintBytes(); err != nil {
33 return err
34 }
35 return nil
36}
37
khenaidood948f772021-08-11 17:49:24 -040038// Record is kafka record type
khenaidooac637102019-01-14 15:44:34 -050039type Record struct {
Scott Baker8461e152019-10-01 14:44:30 -070040 Headers []*RecordHeader
41
khenaidooac637102019-01-14 15:44:34 -050042 Attributes int8
43 TimestampDelta time.Duration
44 OffsetDelta int64
45 Key []byte
46 Value []byte
Scott Baker8461e152019-10-01 14:44:30 -070047 length varintLengthField
khenaidooac637102019-01-14 15:44:34 -050048}
49
50func (r *Record) encode(pe packetEncoder) error {
51 pe.push(&r.length)
52 pe.putInt8(r.Attributes)
53 pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
54 pe.putVarint(r.OffsetDelta)
55 if err := pe.putVarintBytes(r.Key); err != nil {
56 return err
57 }
58 if err := pe.putVarintBytes(r.Value); err != nil {
59 return err
60 }
61 pe.putVarint(int64(len(r.Headers)))
62
63 for _, h := range r.Headers {
64 if err := h.encode(pe); err != nil {
65 return err
66 }
67 }
68
69 return pe.pop()
70}
71
72func (r *Record) decode(pd packetDecoder) (err error) {
73 if err = pd.push(&r.length); err != nil {
74 return err
75 }
76
77 if r.Attributes, err = pd.getInt8(); err != nil {
78 return err
79 }
80
81 timestamp, err := pd.getVarint()
82 if err != nil {
83 return err
84 }
85 r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
86
87 if r.OffsetDelta, err = pd.getVarint(); err != nil {
88 return err
89 }
90
91 if r.Key, err = pd.getVarintBytes(); err != nil {
92 return err
93 }
94
95 if r.Value, err = pd.getVarintBytes(); err != nil {
96 return err
97 }
98
99 numHeaders, err := pd.getVarint()
100 if err != nil {
101 return err
102 }
103
104 if numHeaders >= 0 {
105 r.Headers = make([]*RecordHeader, numHeaders)
106 }
107 for i := int64(0); i < numHeaders; i++ {
108 hdr := new(RecordHeader)
109 if err := hdr.decode(pd); err != nil {
110 return err
111 }
112 r.Headers[i] = hdr
113 }
114
115 return pd.pop()
116}