package sarama
2
// PartitionReplicaReassignmentsStatus carries the three replica lists that
// are serialized for a single partition entry. On the wire each list is a
// compact int32 array, written in field order and followed by an empty
// tagged-field section.
type PartitionReplicaReassignmentsStatus struct {
	// Replicas is the first list on the wire.
	// NOTE(review): presumably the partition's current replica broker IDs; confirm against the Kafka schema.
	Replicas []int32
	// AddingReplicas is the second list on the wire.
	// NOTE(review): presumably the replicas being added by the reassignment; confirm.
	AddingReplicas []int32
	// RemovingReplicas is the third list on the wire.
	// NOTE(review): presumably the replicas being removed by the reassignment; confirm.
	RemovingReplicas []int32
}
8
9func (b *PartitionReplicaReassignmentsStatus) encode(pe packetEncoder) error {
10 if err := pe.putCompactInt32Array(b.Replicas); err != nil {
11 return err
12 }
13 if err := pe.putCompactInt32Array(b.AddingReplicas); err != nil {
14 return err
15 }
16 if err := pe.putCompactInt32Array(b.RemovingReplicas); err != nil {
17 return err
18 }
19
20 pe.putEmptyTaggedFieldArray()
21
22 return nil
23}
24
25func (b *PartitionReplicaReassignmentsStatus) decode(pd packetDecoder) (err error) {
26 if b.Replicas, err = pd.getCompactInt32Array(); err != nil {
27 return err
28 }
29
30 if b.AddingReplicas, err = pd.getCompactInt32Array(); err != nil {
31 return err
32 }
33
34 if b.RemovingReplicas, err = pd.getCompactInt32Array(); err != nil {
35 return err
36 }
37
38 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
39 return err
40 }
41
42 return err
43}
44
45type ListPartitionReassignmentsResponse struct {
46 Version int16
47 ThrottleTimeMs int32
48 ErrorCode KError
49 ErrorMessage *string
50 TopicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus
51}
52
53func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, replicas, addingReplicas, removingReplicas []int32) {
54 if r.TopicStatus == nil {
55 r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus)
56 }
57 partitions := r.TopicStatus[topic]
58 if partitions == nil {
59 partitions = make(map[int32]*PartitionReplicaReassignmentsStatus)
60 r.TopicStatus[topic] = partitions
61 }
62
63 partitions[partition] = &PartitionReplicaReassignmentsStatus{Replicas: replicas, AddingReplicas: addingReplicas, RemovingReplicas: removingReplicas}
64}
65
66func (r *ListPartitionReassignmentsResponse) encode(pe packetEncoder) error {
67 pe.putInt32(r.ThrottleTimeMs)
68 pe.putInt16(int16(r.ErrorCode))
69 if err := pe.putNullableCompactString(r.ErrorMessage); err != nil {
70 return err
71 }
72
73 pe.putCompactArrayLength(len(r.TopicStatus))
74 for topic, partitions := range r.TopicStatus {
75 if err := pe.putCompactString(topic); err != nil {
76 return err
77 }
78 pe.putCompactArrayLength(len(partitions))
79 for partition, block := range partitions {
80 pe.putInt32(partition)
81
82 if err := block.encode(pe); err != nil {
83 return err
84 }
85 }
86 pe.putEmptyTaggedFieldArray()
87 }
88
89 pe.putEmptyTaggedFieldArray()
90
91 return nil
92}
93
94func (r *ListPartitionReassignmentsResponse) decode(pd packetDecoder, version int16) (err error) {
95 r.Version = version
96
97 if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
98 return err
99 }
100
101 kerr, err := pd.getInt16()
102 if err != nil {
103 return err
104 }
105
106 r.ErrorCode = KError(kerr)
107
108 if r.ErrorMessage, err = pd.getCompactNullableString(); err != nil {
109 return err
110 }
111
112 numTopics, err := pd.getCompactArrayLength()
113 if err != nil {
114 return err
115 }
116
117 r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus, numTopics)
118 for i := 0; i < numTopics; i++ {
119 topic, err := pd.getCompactString()
120 if err != nil {
121 return err
122 }
123
124 ongoingPartitionReassignments, err := pd.getCompactArrayLength()
125 if err != nil {
126 return err
127 }
128
129 r.TopicStatus[topic] = make(map[int32]*PartitionReplicaReassignmentsStatus, ongoingPartitionReassignments)
130
131 for j := 0; j < ongoingPartitionReassignments; j++ {
132 partition, err := pd.getInt32()
133 if err != nil {
134 return err
135 }
136
137 block := &PartitionReplicaReassignmentsStatus{}
138 if err := block.decode(pd); err != nil {
139 return err
140 }
141 r.TopicStatus[topic][partition] = block
142 }
143
144 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
145 return err
146 }
147 }
148 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
149 return err
150 }
151
152 return nil
153}
154
155func (r *ListPartitionReassignmentsResponse) key() int16 {
156 return 46
157}
158
159func (r *ListPartitionReassignmentsResponse) version() int16 {
160 return r.Version
161}
162
163func (r *ListPartitionReassignmentsResponse) headerVersion() int16 {
164 return 1
165}
166
167func (r *ListPartitionReassignmentsResponse) requiredVersion() KafkaVersion {
168 return V2_4_0_0
169}