blob: 411da38ad204570a1a65f883477849af08466c39 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001package sarama
2
3import "time"
4
5type DescribeLogDirsResponse struct {
6 ThrottleTime time.Duration
7
8 // Version 0 and 1 are equal
9 // The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
10 Version int16
11
12 LogDirs []DescribeLogDirsResponseDirMetadata
13}
14
15func (r *DescribeLogDirsResponse) encode(pe packetEncoder) error {
16 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
17
18 if err := pe.putArrayLength(len(r.LogDirs)); err != nil {
19 return err
20 }
21
22 for _, dir := range r.LogDirs {
23 if err := dir.encode(pe); err != nil {
24 return err
25 }
26 }
27
28 return nil
29}
30
31func (r *DescribeLogDirsResponse) decode(pd packetDecoder, version int16) error {
32 throttleTime, err := pd.getInt32()
33 if err != nil {
34 return err
35 }
36 r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
37
38 // Decode array of DescribeLogDirsResponseDirMetadata
39 n, err := pd.getArrayLength()
40 if err != nil {
41 return err
42 }
43
44 r.LogDirs = make([]DescribeLogDirsResponseDirMetadata, n)
45 for i := 0; i < n; i++ {
46 dir := DescribeLogDirsResponseDirMetadata{}
47 if err := dir.decode(pd, version); err != nil {
48 return err
49 }
50 r.LogDirs[i] = dir
51 }
52
53 return nil
54}
55
56func (r *DescribeLogDirsResponse) key() int16 {
57 return 35
58}
59
60func (r *DescribeLogDirsResponse) version() int16 {
61 return r.Version
62}
63
64func (r *DescribeLogDirsResponse) headerVersion() int16 {
65 return 0
66}
67
68func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
69 return V1_0_0_0
70}
71
72type DescribeLogDirsResponseDirMetadata struct {
73 ErrorCode KError
74
75 // The absolute log directory path
76 Path string
77 Topics []DescribeLogDirsResponseTopic
78}
79
80func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
81 pe.putInt16(int16(r.ErrorCode))
82
83 if err := pe.putString(r.Path); err != nil {
84 return err
85 }
86
87 if err := pe.putArrayLength(len(r.Topics)); err != nil {
88 return err
89 }
90 for _, topic := range r.Topics {
91 if err := topic.encode(pe); err != nil {
92 return err
93 }
94 }
95
96 return nil
97}
98
99func (r *DescribeLogDirsResponseDirMetadata) decode(pd packetDecoder, version int16) error {
100 errCode, err := pd.getInt16()
101 if err != nil {
102 return err
103 }
104 r.ErrorCode = KError(errCode)
105
106 path, err := pd.getString()
107 if err != nil {
108 return err
109 }
110 r.Path = path
111
112 // Decode array of DescribeLogDirsResponseTopic
113 n, err := pd.getArrayLength()
114 if err != nil {
115 return err
116 }
117
118 r.Topics = make([]DescribeLogDirsResponseTopic, n)
119 for i := 0; i < n; i++ {
120 t := DescribeLogDirsResponseTopic{}
121
122 if err := t.decode(pd, version); err != nil {
123 return err
124 }
125
126 r.Topics[i] = t
127 }
128
129 return nil
130}
131
132// DescribeLogDirsResponseTopic contains a topic's partitions descriptions
133type DescribeLogDirsResponseTopic struct {
134 Topic string
135 Partitions []DescribeLogDirsResponsePartition
136}
137
138func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
139 if err := pe.putString(r.Topic); err != nil {
140 return err
141 }
142
143 if err := pe.putArrayLength(len(r.Partitions)); err != nil {
144 return err
145 }
146 for _, partition := range r.Partitions {
147 if err := partition.encode(pe); err != nil {
148 return err
149 }
150 }
151
152 return nil
153}
154
155func (r *DescribeLogDirsResponseTopic) decode(pd packetDecoder, version int16) error {
156 t, err := pd.getString()
157 if err != nil {
158 return err
159 }
160 r.Topic = t
161
162 n, err := pd.getArrayLength()
163 if err != nil {
164 return err
165 }
166 r.Partitions = make([]DescribeLogDirsResponsePartition, n)
167 for i := 0; i < n; i++ {
168 p := DescribeLogDirsResponsePartition{}
169 if err := p.decode(pd, version); err != nil {
170 return err
171 }
172 r.Partitions[i] = p
173 }
174
175 return nil
176}
177
178// DescribeLogDirsResponsePartition describes a partition's log directory
179type DescribeLogDirsResponsePartition struct {
180 PartitionID int32
181
182 // The size of the log segments of the partition in bytes.
183 Size int64
184
185 // The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or
186 // current replica's LEO (if it is the future log for the partition)
187 OffsetLag int64
188
189 // True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of
190 // the replica in the future.
191 IsTemporary bool
192}
193
194func (r *DescribeLogDirsResponsePartition) encode(pe packetEncoder) error {
195 pe.putInt32(r.PartitionID)
196 pe.putInt64(r.Size)
197 pe.putInt64(r.OffsetLag)
198 pe.putBool(r.IsTemporary)
199
200 return nil
201}
202
203func (r *DescribeLogDirsResponsePartition) decode(pd packetDecoder, version int16) error {
204 pID, err := pd.getInt32()
205 if err != nil {
206 return err
207 }
208 r.PartitionID = pID
209
210 size, err := pd.getInt64()
211 if err != nil {
212 return err
213 }
214 r.Size = size
215
216 lag, err := pd.getInt64()
217 if err != nil {
218 return err
219 }
220 r.OffsetLag = lag
221
222 isTemp, err := pd.getBool()
223 if err != nil {
224 return err
225 }
226 r.IsTemporary = isTemp
227
228 return nil
229}