blob: d207312efbfca2535154777e1955404fd5c0f3e5 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import "time"
4
5type DescribeLogDirsResponse struct {
6 ThrottleTime time.Duration
7
8 // Version 0 and 1 are equal
9 // The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
10 Version int16
11
12 LogDirs []DescribeLogDirsResponseDirMetadata
13}
14
15func (r *DescribeLogDirsResponse) encode(pe packetEncoder) error {
16 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
17
18 if err := pe.putArrayLength(len(r.LogDirs)); err != nil {
19 return err
20 }
21
22 for _, dir := range r.LogDirs {
23 if err := dir.encode(pe); err != nil {
24 return err
25 }
26 }
27
28 return nil
29}
30
31func (r *DescribeLogDirsResponse) decode(pd packetDecoder, version int16) error {
32 throttleTime, err := pd.getInt32()
33 if err != nil {
34 return err
35 }
36 r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
37
38 // Decode array of DescribeLogDirsResponseDirMetadata
39 n, err := pd.getArrayLength()
40 if err != nil {
41 return err
42 }
43
44 r.LogDirs = make([]DescribeLogDirsResponseDirMetadata, n)
45 for i := 0; i < n; i++ {
46 dir := DescribeLogDirsResponseDirMetadata{}
47 if err := dir.decode(pd, version); err != nil {
48 return err
49 }
50 r.LogDirs[i] = dir
51 }
52
53 return nil
54}
55
56func (r *DescribeLogDirsResponse) key() int16 {
57 return 35
58}
59
60func (r *DescribeLogDirsResponse) version() int16 {
61 return r.Version
62}
63
64func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
65 return V1_0_0_0
66}
67
68type DescribeLogDirsResponseDirMetadata struct {
69 ErrorCode KError
70
71 // The absolute log directory path
72 Path string
73 Topics []DescribeLogDirsResponseTopic
74}
75
76func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
77 pe.putInt16(int16(r.ErrorCode))
78
79 if err := pe.putString(r.Path); err != nil {
80 return err
81 }
82
83 for _, topic := range r.Topics {
84 if err := topic.encode(pe); err != nil {
85 return err
86 }
87 }
88
89 return nil
90}
91
92func (r *DescribeLogDirsResponseDirMetadata) decode(pd packetDecoder, version int16) error {
93 errCode, err := pd.getInt16()
94 if err != nil {
95 return err
96 }
97 r.ErrorCode = KError(errCode)
98
99 path, err := pd.getString()
100 if err != nil {
101 return err
102 }
103 r.Path = path
104
105 // Decode array of DescribeLogDirsResponseTopic
106 n, err := pd.getArrayLength()
107 if err != nil {
108 return err
109 }
110
111 r.Topics = make([]DescribeLogDirsResponseTopic, n)
112 for i := 0; i < n; i++ {
113 t := DescribeLogDirsResponseTopic{}
114
115 if err := t.decode(pd, version); err != nil {
116 return err
117 }
118
119 r.Topics[i] = t
120 }
121
122 return nil
123}
124
125// DescribeLogDirsResponseTopic contains a topic's partitions descriptions
126type DescribeLogDirsResponseTopic struct {
127 Topic string
128 Partitions []DescribeLogDirsResponsePartition
129}
130
131func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
132 if err := pe.putString(r.Topic); err != nil {
133 return err
134 }
135
136 for _, partition := range r.Partitions {
137 if err := partition.encode(pe); err != nil {
138 return err
139 }
140 }
141
142 return nil
143}
144
145func (r *DescribeLogDirsResponseTopic) decode(pd packetDecoder, version int16) error {
146 t, err := pd.getString()
147 if err != nil {
148 return err
149 }
150 r.Topic = t
151
152 n, err := pd.getArrayLength()
153 if err != nil {
154 return err
155 }
156 r.Partitions = make([]DescribeLogDirsResponsePartition, n)
157 for i := 0; i < n; i++ {
158 p := DescribeLogDirsResponsePartition{}
159 if err := p.decode(pd, version); err != nil {
160 return err
161 }
162 r.Partitions[i] = p
163 }
164
165 return nil
166}
167
// DescribeLogDirsResponsePartition holds the log directory details reported
// for a single partition.
type DescribeLogDirsResponsePartition struct {
	// PartitionID identifies the partition.
	PartitionID int32

	// Size is the size, in bytes, of the partition's log segments.
	Size int64

	// OffsetLag is the lag of the log's LEO with respect to the partition's
	// HW (if this is the current log for the partition) or to the current
	// replica's LEO (if this is the future log for the partition).
	OffsetLag int64

	// IsTemporary is true if this log was created by
	// AlterReplicaLogDirsRequest and will replace the replica's current log
	// in the future.
	IsTemporary bool
}
183
184func (r *DescribeLogDirsResponsePartition) encode(pe packetEncoder) error {
185 pe.putInt32(r.PartitionID)
186 pe.putInt64(r.Size)
187 pe.putInt64(r.OffsetLag)
188 pe.putBool(r.IsTemporary)
189
190 return nil
191}
192
193func (r *DescribeLogDirsResponsePartition) decode(pd packetDecoder, version int16) error {
194 pID, err := pd.getInt32()
195 if err != nil {
196 return err
197 }
198 r.PartitionID = pID
199
200 size, err := pd.getInt64()
201 if err != nil {
202 return err
203 }
204 r.Size = size
205
206 lag, err := pd.getInt64()
207 if err != nil {
208 return err
209 }
210 r.OffsetLag = lag
211
212 isTemp, err := pd.getBool()
213 if err != nil {
214 return err
215 }
216 r.IsTemporary = isTemp
217
218 return nil
219}