blob: 600c7c4dfb7b1dc91a74df5d27b77c1cc3269d07 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3type MessageBlock struct {
4 Offset int64
5 Msg *Message
6}
7
8// Messages convenience helper which returns either all the
9// messages that are wrapped in this block
10func (msb *MessageBlock) Messages() []*MessageBlock {
11 if msb.Msg.Set != nil {
12 return msb.Msg.Set.Messages
13 }
14 return []*MessageBlock{msb}
15}
16
17func (msb *MessageBlock) encode(pe packetEncoder) error {
18 pe.putInt64(msb.Offset)
19 pe.push(&lengthField{})
20 err := msb.Msg.encode(pe)
21 if err != nil {
22 return err
23 }
24 return pe.pop()
25}
26
27func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
28 if msb.Offset, err = pd.getInt64(); err != nil {
29 return err
30 }
31
32 if err = pd.push(&lengthField{}); err != nil {
33 return err
34 }
35
36 msb.Msg = new(Message)
37 if err = msb.Msg.decode(pd); err != nil {
38 return err
39 }
40
41 if err = pd.pop(); err != nil {
42 return err
43 }
44
45 return nil
46}
47
48type MessageSet struct {
49 PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
50 OverflowMessage bool // whether the set on the wire contained an overflow message
51 Messages []*MessageBlock
52}
53
54func (ms *MessageSet) encode(pe packetEncoder) error {
55 for i := range ms.Messages {
56 err := ms.Messages[i].encode(pe)
57 if err != nil {
58 return err
59 }
60 }
61 return nil
62}
63
64func (ms *MessageSet) decode(pd packetDecoder) (err error) {
65 ms.Messages = nil
66
67 for pd.remaining() > 0 {
68 magic, err := magicValue(pd)
69 if err != nil {
70 if err == ErrInsufficientData {
71 ms.PartialTrailingMessage = true
72 return nil
73 }
74 return err
75 }
76
77 if magic > 1 {
78 return nil
79 }
80
81 msb := new(MessageBlock)
82 err = msb.decode(pd)
83 switch err {
84 case nil:
85 ms.Messages = append(ms.Messages, msb)
86 case ErrInsufficientData:
87 // As an optimization the server is allowed to return a partial message at the
88 // end of the message set. Clients should handle this case. So we just ignore such things.
89 if msb.Offset == -1 {
90 // This is an overflow message caused by chunked down conversion
91 ms.OverflowMessage = true
92 } else {
93 ms.PartialTrailingMessage = true
94 }
95 return nil
96 default:
97 return err
98 }
99 }
100
101 return nil
102}
103
104func (ms *MessageSet) addMessage(msg *Message) {
105 block := new(MessageBlock)
106 block.Msg = msg
107 ms.Messages = append(ms.Messages, block)
108}