// blob: 5752acc8aeb78dd89926e5861e85553f2f5077ec
// blame: William Kurkian ea86948 2019-04-09 15:16:11 -0400

package sarama
2
3type JoinGroupResponse struct {
4 Version int16
5 ThrottleTime int32
6 Err KError
7 GenerationId int32
8 GroupProtocol string
9 LeaderId string
10 MemberId string
11 Members map[string][]byte
12}
13
14func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
15 members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
16 for id, bin := range r.Members {
17 meta := new(ConsumerGroupMemberMetadata)
18 if err := decode(bin, meta); err != nil {
19 return nil, err
20 }
21 members[id] = *meta
22 }
23 return members, nil
24}
25
26func (r *JoinGroupResponse) encode(pe packetEncoder) error {
27 if r.Version >= 2 {
28 pe.putInt32(r.ThrottleTime)
29 }
30 pe.putInt16(int16(r.Err))
31 pe.putInt32(r.GenerationId)
32
33 if err := pe.putString(r.GroupProtocol); err != nil {
34 return err
35 }
36 if err := pe.putString(r.LeaderId); err != nil {
37 return err
38 }
39 if err := pe.putString(r.MemberId); err != nil {
40 return err
41 }
42
43 if err := pe.putArrayLength(len(r.Members)); err != nil {
44 return err
45 }
46
47 for memberId, memberMetadata := range r.Members {
48 if err := pe.putString(memberId); err != nil {
49 return err
50 }
51
52 if err := pe.putBytes(memberMetadata); err != nil {
53 return err
54 }
55 }
56
57 return nil
58}
59
60func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) {
61 r.Version = version
62
63 if version >= 2 {
64 if r.ThrottleTime, err = pd.getInt32(); err != nil {
65 return
66 }
67 }
68
69 kerr, err := pd.getInt16()
70 if err != nil {
71 return err
72 }
73
74 r.Err = KError(kerr)
75
76 if r.GenerationId, err = pd.getInt32(); err != nil {
77 return
78 }
79
80 if r.GroupProtocol, err = pd.getString(); err != nil {
81 return
82 }
83
84 if r.LeaderId, err = pd.getString(); err != nil {
85 return
86 }
87
88 if r.MemberId, err = pd.getString(); err != nil {
89 return
90 }
91
92 n, err := pd.getArrayLength()
93 if err != nil {
94 return err
95 }
96 if n == 0 {
97 return nil
98 }
99
100 r.Members = make(map[string][]byte)
101 for i := 0; i < n; i++ {
102 memberId, err := pd.getString()
103 if err != nil {
104 return err
105 }
106
107 memberMetadata, err := pd.getBytes()
108 if err != nil {
109 return err
110 }
111
112 r.Members[memberId] = memberMetadata
113 }
114
115 return nil
116}
117
118func (r *JoinGroupResponse) key() int16 {
119 return 11
120}
121
122func (r *JoinGroupResponse) version() int16 {
123 return r.Version
124}
125
126func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
127 switch r.Version {
128 case 2:
129 return V0_11_0_0
130 case 1:
131 return V0_10_1_0
132 default:
133 return V0_9_0_0
134 }
135}