blob: cded308cf0fd175a2ef2ef7ffebf2b7fbda7795a [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "encoding/binary"
5 "time"
6)
7
8const (
9 controlMask = 0x20
10 maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
11)
12
13type RecordHeader struct {
14 Key []byte
15 Value []byte
16}
17
18func (h *RecordHeader) encode(pe packetEncoder) error {
19 if err := pe.putVarintBytes(h.Key); err != nil {
20 return err
21 }
22 return pe.putVarintBytes(h.Value)
23}
24
25func (h *RecordHeader) decode(pd packetDecoder) (err error) {
26 if h.Key, err = pd.getVarintBytes(); err != nil {
27 return err
28 }
29
30 if h.Value, err = pd.getVarintBytes(); err != nil {
31 return err
32 }
33 return nil
34}
35
36type Record struct {
37 Attributes int8
38 TimestampDelta time.Duration
39 OffsetDelta int64
40 Key []byte
41 Value []byte
42 Headers []*RecordHeader
43
44 length varintLengthField
45}
46
47func (r *Record) encode(pe packetEncoder) error {
48 pe.push(&r.length)
49 pe.putInt8(r.Attributes)
50 pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
51 pe.putVarint(r.OffsetDelta)
52 if err := pe.putVarintBytes(r.Key); err != nil {
53 return err
54 }
55 if err := pe.putVarintBytes(r.Value); err != nil {
56 return err
57 }
58 pe.putVarint(int64(len(r.Headers)))
59
60 for _, h := range r.Headers {
61 if err := h.encode(pe); err != nil {
62 return err
63 }
64 }
65
66 return pe.pop()
67}
68
69func (r *Record) decode(pd packetDecoder) (err error) {
70 if err = pd.push(&r.length); err != nil {
71 return err
72 }
73
74 if r.Attributes, err = pd.getInt8(); err != nil {
75 return err
76 }
77
78 timestamp, err := pd.getVarint()
79 if err != nil {
80 return err
81 }
82 r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
83
84 if r.OffsetDelta, err = pd.getVarint(); err != nil {
85 return err
86 }
87
88 if r.Key, err = pd.getVarintBytes(); err != nil {
89 return err
90 }
91
92 if r.Value, err = pd.getVarintBytes(); err != nil {
93 return err
94 }
95
96 numHeaders, err := pd.getVarint()
97 if err != nil {
98 return err
99 }
100
101 if numHeaders >= 0 {
102 r.Headers = make([]*RecordHeader, numHeaders)
103 }
104 for i := int64(0); i < numHeaders; i++ {
105 hdr := new(RecordHeader)
106 if err := hdr.decode(pd); err != nil {
107 return err
108 }
109 r.Headers[i] = hdr
110 }
111
112 return pd.pop()
113}