blob: 6523ec2f74d5e9767872d5e9fc41de51b4bf1374 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3type MessageBlock struct {
4 Offset int64
5 Msg *Message
6}
7
8// Messages convenience helper which returns either all the
9// messages that are wrapped in this block
10func (msb *MessageBlock) Messages() []*MessageBlock {
11 if msb.Msg.Set != nil {
12 return msb.Msg.Set.Messages
13 }
14 return []*MessageBlock{msb}
15}
16
17func (msb *MessageBlock) encode(pe packetEncoder) error {
18 pe.putInt64(msb.Offset)
19 pe.push(&lengthField{})
20 err := msb.Msg.encode(pe)
21 if err != nil {
22 return err
23 }
24 return pe.pop()
25}
26
27func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
28 if msb.Offset, err = pd.getInt64(); err != nil {
29 return err
30 }
31
32 lengthDecoder := acquireLengthField()
33 defer releaseLengthField(lengthDecoder)
34
35 if err = pd.push(lengthDecoder); err != nil {
36 return err
37 }
38
39 msb.Msg = new(Message)
40 if err = msb.Msg.decode(pd); err != nil {
41 return err
42 }
43
44 if err = pd.pop(); err != nil {
45 return err
46 }
47
48 return nil
49}
50
51type MessageSet struct {
52 PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
53 OverflowMessage bool // whether the set on the wire contained an overflow message
54 Messages []*MessageBlock
55}
56
57func (ms *MessageSet) encode(pe packetEncoder) error {
58 for i := range ms.Messages {
59 err := ms.Messages[i].encode(pe)
60 if err != nil {
61 return err
62 }
63 }
64 return nil
65}
66
67func (ms *MessageSet) decode(pd packetDecoder) (err error) {
68 ms.Messages = nil
69
70 for pd.remaining() > 0 {
71 magic, err := magicValue(pd)
72 if err != nil {
73 if err == ErrInsufficientData {
74 ms.PartialTrailingMessage = true
75 return nil
76 }
77 return err
78 }
79
80 if magic > 1 {
81 return nil
82 }
83
84 msb := new(MessageBlock)
85 err = msb.decode(pd)
86 switch err {
87 case nil:
88 ms.Messages = append(ms.Messages, msb)
89 case ErrInsufficientData:
90 // As an optimization the server is allowed to return a partial message at the
91 // end of the message set. Clients should handle this case. So we just ignore such things.
92 if msb.Offset == -1 {
93 // This is an overflow message caused by chunked down conversion
94 ms.OverflowMessage = true
95 } else {
96 ms.PartialTrailingMessage = true
97 }
98 return nil
99 default:
100 return err
101 }
102 }
103
104 return nil
105}
106
107func (ms *MessageSet) addMessage(msg *Message) {
108 block := new(MessageBlock)
109 block.Msg = msg
110 ms.Messages = append(ms.Messages, block)
111}