blob: f4100f5606265ca1b4e3d30520fce1c29967ce3d [file] [log] [blame]
package sarama
2
kesavandc71914f2022-03-25 11:19:03 +05303// ConsumerGroupMemberMetadata holds the metadata for consumer group
4// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
kesavand2cde6582020-06-22 04:56:23 -04005type ConsumerGroupMemberMetadata struct {
kesavandc71914f2022-03-25 11:19:03 +05306 Version int16
7 Topics []string
8 UserData []byte
9 OwnedPartitions []*OwnedPartition
kesavand2cde6582020-06-22 04:56:23 -040010}
11
12func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
13 pe.putInt16(m.Version)
14
15 if err := pe.putStringArray(m.Topics); err != nil {
16 return err
17 }
18
19 if err := pe.putBytes(m.UserData); err != nil {
20 return err
21 }
22
23 return nil
24}
25
26func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
27 if m.Version, err = pd.getInt16(); err != nil {
28 return
29 }
30
31 if m.Topics, err = pd.getStringArray(); err != nil {
32 return
33 }
34
35 if m.UserData, err = pd.getBytes(); err != nil {
36 return
37 }
kesavandc71914f2022-03-25 11:19:03 +053038 if m.Version >= 1 {
39 n, err := pd.getArrayLength()
40 if err != nil {
41 // permit missing data here in case of misbehaving 3rd party
42 // clients who incorrectly marked the member metadata as V1 in
43 // their JoinGroup request
44 if err == ErrInsufficientData {
45 return nil
46 }
47 return err
48 }
49 if n == 0 {
50 return nil
51 }
52 m.OwnedPartitions = make([]*OwnedPartition, n)
53 for i := 0; i < n; i++ {
54 m.OwnedPartitions[i] = &OwnedPartition{}
55 if err := m.OwnedPartitions[i].decode(pd); err != nil {
56 return err
57 }
58 }
59 }
kesavand2cde6582020-06-22 04:56:23 -040060
61 return nil
62}
63
// OwnedPartition pairs a topic name with a list of partition ids of that
// topic.
type OwnedPartition struct {
	// Topic is the topic name.
	Topic string
	// Partitions holds the partition ids belonging to Topic.
	Partitions []int32
}
68
69func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
70 if m.Topic, err = pd.getString(); err != nil {
71 return err
72 }
73 if m.Partitions, err = pd.getInt32Array(); err != nil {
74 return err
75 }
76
77 return nil
78}
79
// ConsumerGroupMemberAssignment holds the member assignment for a consumer group
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
type ConsumerGroupMemberAssignment struct {
	// Version is the version number stored at the start of the encoded
	// assignment.
	Version int16
	// Topics maps each topic name to its list of partition ids.
	Topics map[string][]int32
	// UserData is an opaque byte payload carried after the topics.
	UserData []byte
}
87
88func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
89 pe.putInt16(m.Version)
90
91 if err := pe.putArrayLength(len(m.Topics)); err != nil {
92 return err
93 }
94
95 for topic, partitions := range m.Topics {
96 if err := pe.putString(topic); err != nil {
97 return err
98 }
99 if err := pe.putInt32Array(partitions); err != nil {
100 return err
101 }
102 }
103
104 if err := pe.putBytes(m.UserData); err != nil {
105 return err
106 }
107
108 return nil
109}
110
111func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
112 if m.Version, err = pd.getInt16(); err != nil {
113 return
114 }
115
116 var topicLen int
117 if topicLen, err = pd.getArrayLength(); err != nil {
118 return
119 }
120
121 m.Topics = make(map[string][]int32, topicLen)
122 for i := 0; i < topicLen; i++ {
123 var topic string
124 if topic, err = pd.getString(); err != nil {
125 return
126 }
127 if m.Topics[topic], err = pd.getInt32Array(); err != nil {
128 return
129 }
130 }
131
132 if m.UserData, err = pd.getBytes(); err != nil {
133 return
134 }
135
136 return nil
137}