blob: 3734e82e406878a84492976c4889423c67fbba70 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3type GroupProtocol struct {
4 Name string
5 Metadata []byte
6}
7
8func (p *GroupProtocol) decode(pd packetDecoder) (err error) {
9 p.Name, err = pd.getString()
10 if err != nil {
11 return err
12 }
13 p.Metadata, err = pd.getBytes()
14 return err
15}
16
17func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
18 if err := pe.putString(p.Name); err != nil {
19 return err
20 }
21 if err := pe.putBytes(p.Metadata); err != nil {
22 return err
23 }
24 return nil
25}
26
27type JoinGroupRequest struct {
28 Version int16
29 GroupId string
30 SessionTimeout int32
31 RebalanceTimeout int32
32 MemberId string
33 ProtocolType string
34 GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols
35 OrderedGroupProtocols []*GroupProtocol
36}
37
38func (r *JoinGroupRequest) encode(pe packetEncoder) error {
39 if err := pe.putString(r.GroupId); err != nil {
40 return err
41 }
42 pe.putInt32(r.SessionTimeout)
43 if r.Version >= 1 {
44 pe.putInt32(r.RebalanceTimeout)
45 }
46 if err := pe.putString(r.MemberId); err != nil {
47 return err
48 }
49 if err := pe.putString(r.ProtocolType); err != nil {
50 return err
51 }
52
53 if len(r.GroupProtocols) > 0 {
54 if len(r.OrderedGroupProtocols) > 0 {
55 return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}
56 }
57
58 if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
59 return err
60 }
61 for name, metadata := range r.GroupProtocols {
62 if err := pe.putString(name); err != nil {
63 return err
64 }
65 if err := pe.putBytes(metadata); err != nil {
66 return err
67 }
68 }
69 } else {
70 if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {
71 return err
72 }
73 for _, protocol := range r.OrderedGroupProtocols {
74 if err := protocol.encode(pe); err != nil {
75 return err
76 }
77 }
78 }
79
80 return nil
81}
82
83func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
84 r.Version = version
85
86 if r.GroupId, err = pd.getString(); err != nil {
87 return
88 }
89
90 if r.SessionTimeout, err = pd.getInt32(); err != nil {
91 return
92 }
93
94 if version >= 1 {
95 if r.RebalanceTimeout, err = pd.getInt32(); err != nil {
96 return err
97 }
98 }
99
100 if r.MemberId, err = pd.getString(); err != nil {
101 return
102 }
103
104 if r.ProtocolType, err = pd.getString(); err != nil {
105 return
106 }
107
108 n, err := pd.getArrayLength()
109 if err != nil {
110 return err
111 }
112 if n == 0 {
113 return nil
114 }
115
116 r.GroupProtocols = make(map[string][]byte)
117 for i := 0; i < n; i++ {
118 protocol := &GroupProtocol{}
119 if err := protocol.decode(pd); err != nil {
120 return err
121 }
122 r.GroupProtocols[protocol.Name] = protocol.Metadata
123 r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)
124 }
125
126 return nil
127}
128
129func (r *JoinGroupRequest) key() int16 {
130 return 11
131}
132
133func (r *JoinGroupRequest) version() int16 {
134 return r.Version
135}
136
khenaidood948f772021-08-11 17:49:24 -0400137func (r *JoinGroupRequest) headerVersion() int16 {
138 return 1
139}
140
khenaidooac637102019-01-14 15:44:34 -0500141func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
142 switch r.Version {
143 case 2:
144 return V0_11_0_0
145 case 1:
146 return V0_10_1_0
147 default:
148 return V0_9_0_0
149 }
150}
151
152func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
153 r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
154 Name: name,
155 Metadata: metadata,
156 })
157}
158
159func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
160 bin, err := encode(metadata, nil)
161 if err != nil {
162 return err
163 }
164
165 r.AddGroupProtocol(name, bin)
166 return nil
167}