blob: 52725758d211f6bc804418b3532cb28bc8b07bbe [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import "errors"
4
5// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
6// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
7// Methods with stricter requirements will specify the minimum broker version required.
8// You MUST call Close() on a client to avoid leaks
9type ClusterAdmin interface {
10 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
11 // It may take several seconds after CreateTopic returns success for all the brokers
12 // to become aware that the topic has been created. During this time, listTopics
13 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
14 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
15
16 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
17 // and for all the brokers to become aware that the topics are gone.
18 // During this time, listTopics may continue to return information about the deleted topic.
19 // If delete.topic.enable is false on the brokers, deleteTopic will mark
20 // the topic for deletion, but not actually delete them.
21 // This operation is supported by brokers with version 0.10.1.0 or higher.
22 DeleteTopic(topic string) error
23
24 // Increase the number of partitions of the topics according to the corresponding values.
25 // If partitions are increased for a topic that has a key, the partition logic or ordering of
26 // the messages will be affected. It may take several seconds after this method returns
27 // success for all the brokers to become aware that the partitions have been created.
28 // During this time, ClusterAdmin#describeTopics may not return information about the
29 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
30 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
31
32 // Delete records whose offset is smaller than the given offset of the corresponding partition.
33 // This operation is supported by brokers with version 0.11.0.0 or higher.
34 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
35
36 // Get the configuration for the specified resources.
37 // The returned configuration includes default values and the Default is true
38 // can be used to distinguish them from user supplied values.
39 // Config entries where ReadOnly is true cannot be updated.
40 // The value of config entries where Sensitive is true is always nil so
41 // sensitive information is not disclosed.
42 // This operation is supported by brokers with version 0.11.0.0 or higher.
43 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
44
45 // Update the configuration for the specified resources with the default options.
46 // This operation is supported by brokers with version 0.11.0.0 or higher.
47 // The resources with their configs (topic is the only resource type with configs
48 // that can be updated currently Updates are not transactional so they may succeed
49 // for some resources while fail for others. The configs for a particular resource are updated automatically.
50 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
51
52 // Creates access control lists (ACLs) which are bound to specific resources.
53 // This operation is not transactional so it may succeed for some ACLs while fail for others.
54 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
55 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
56 CreateACL(resource Resource, acl Acl) error
57
58 // Lists access control lists (ACLs) according to the supplied filter.
59 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
60 // This operation is supported by brokers with version 0.11.0.0 or higher.
61 ListAcls(filter AclFilter) ([]ResourceAcls, error)
62
63 // Deletes access control lists (ACLs) according to the supplied filters.
64 // This operation is not transactional so it may succeed for some ACLs while fail for others.
65 // This operation is supported by brokers with version 0.11.0.0 or higher.
66 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
67
68 // Close shuts down the admin and closes underlying client.
69 Close() error
70}
71
72type clusterAdmin struct {
73 client Client
74 conf *Config
75}
76
77// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
78func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
79 client, err := NewClient(addrs, conf)
80 if err != nil {
81 return nil, err
82 }
83
84 //make sure we can retrieve the controller
85 _, err = client.Controller()
86 if err != nil {
87 return nil, err
88 }
89
90 ca := &clusterAdmin{
91 client: client,
92 conf: client.Config(),
93 }
94 return ca, nil
95}
96
97func (ca *clusterAdmin) Close() error {
98 return ca.client.Close()
99}
100
101func (ca *clusterAdmin) Controller() (*Broker, error) {
102 return ca.client.Controller()
103}
104
105func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
106
107 if topic == "" {
108 return ErrInvalidTopic
109 }
110
111 if detail == nil {
112 return errors.New("You must specify topic details")
113 }
114
115 topicDetails := make(map[string]*TopicDetail)
116 topicDetails[topic] = detail
117
118 request := &CreateTopicsRequest{
119 TopicDetails: topicDetails,
120 ValidateOnly: validateOnly,
121 Timeout: ca.conf.Admin.Timeout,
122 }
123
124 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
125 request.Version = 1
126 }
127 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
128 request.Version = 2
129 }
130
131 b, err := ca.Controller()
132 if err != nil {
133 return err
134 }
135
136 rsp, err := b.CreateTopics(request)
137 if err != nil {
138 return err
139 }
140
141 topicErr, ok := rsp.TopicErrors[topic]
142 if !ok {
143 return ErrIncompleteResponse
144 }
145
146 if topicErr.Err != ErrNoError {
147 return topicErr.Err
148 }
149
150 return nil
151}
152
153func (ca *clusterAdmin) DeleteTopic(topic string) error {
154
155 if topic == "" {
156 return ErrInvalidTopic
157 }
158
159 request := &DeleteTopicsRequest{
160 Topics: []string{topic},
161 Timeout: ca.conf.Admin.Timeout,
162 }
163
164 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
165 request.Version = 1
166 }
167
168 b, err := ca.Controller()
169 if err != nil {
170 return err
171 }
172
173 rsp, err := b.DeleteTopics(request)
174 if err != nil {
175 return err
176 }
177
178 topicErr, ok := rsp.TopicErrorCodes[topic]
179 if !ok {
180 return ErrIncompleteResponse
181 }
182
183 if topicErr != ErrNoError {
184 return topicErr
185 }
186 return nil
187}
188
189func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
190 if topic == "" {
191 return ErrInvalidTopic
192 }
193
194 topicPartitions := make(map[string]*TopicPartition)
195 topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
196
197 request := &CreatePartitionsRequest{
198 TopicPartitions: topicPartitions,
199 Timeout: ca.conf.Admin.Timeout,
200 }
201
202 b, err := ca.Controller()
203 if err != nil {
204 return err
205 }
206
207 rsp, err := b.CreatePartitions(request)
208 if err != nil {
209 return err
210 }
211
212 topicErr, ok := rsp.TopicPartitionErrors[topic]
213 if !ok {
214 return ErrIncompleteResponse
215 }
216
217 if topicErr.Err != ErrNoError {
218 return topicErr.Err
219 }
220
221 return nil
222}
223
224func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
225
226 if topic == "" {
227 return ErrInvalidTopic
228 }
229
230 topics := make(map[string]*DeleteRecordsRequestTopic)
231 topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
232 request := &DeleteRecordsRequest{
233 Topics: topics,
234 Timeout: ca.conf.Admin.Timeout,
235 }
236
237 b, err := ca.Controller()
238 if err != nil {
239 return err
240 }
241
242 rsp, err := b.DeleteRecords(request)
243 if err != nil {
244 return err
245 }
246
247 _, ok := rsp.Topics[topic]
248 if !ok {
249 return ErrIncompleteResponse
250 }
251
252 //todo since we are dealing with couple of partitions it would be good if we return slice of errors
253 //for each partition instead of one error
254 return nil
255}
256
257func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
258
259 var entries []ConfigEntry
260 var resources []*ConfigResource
261 resources = append(resources, &resource)
262
263 request := &DescribeConfigsRequest{
264 Resources: resources,
265 }
266
267 b, err := ca.Controller()
268 if err != nil {
269 return nil, err
270 }
271
272 rsp, err := b.DescribeConfigs(request)
273 if err != nil {
274 return nil, err
275 }
276
277 for _, rspResource := range rsp.Resources {
278 if rspResource.Name == resource.Name {
279 if rspResource.ErrorMsg != "" {
280 return nil, errors.New(rspResource.ErrorMsg)
281 }
282 for _, cfgEntry := range rspResource.Configs {
283 entries = append(entries, *cfgEntry)
284 }
285 }
286 }
287 return entries, nil
288}
289
290func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
291
292 var resources []*AlterConfigsResource
293 resources = append(resources, &AlterConfigsResource{
294 Type: resourceType,
295 Name: name,
296 ConfigEntries: entries,
297 })
298
299 request := &AlterConfigsRequest{
300 Resources: resources,
301 ValidateOnly: validateOnly,
302 }
303
304 b, err := ca.Controller()
305 if err != nil {
306 return err
307 }
308
309 rsp, err := b.AlterConfigs(request)
310 if err != nil {
311 return err
312 }
313
314 for _, rspResource := range rsp.Resources {
315 if rspResource.Name == name {
316 if rspResource.ErrorMsg != "" {
317 return errors.New(rspResource.ErrorMsg)
318 }
319 }
320 }
321 return nil
322}
323
324func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
325 var acls []*AclCreation
326 acls = append(acls, &AclCreation{resource, acl})
327 request := &CreateAclsRequest{AclCreations: acls}
328
329 b, err := ca.Controller()
330 if err != nil {
331 return err
332 }
333
334 _, err = b.CreateAcls(request)
335 return err
336}
337
338func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
339
340 request := &DescribeAclsRequest{AclFilter: filter}
341
342 b, err := ca.Controller()
343 if err != nil {
344 return nil, err
345 }
346
347 rsp, err := b.DescribeAcls(request)
348 if err != nil {
349 return nil, err
350 }
351
352 var lAcls []ResourceAcls
353 for _, rAcl := range rsp.ResourceAcls {
354 lAcls = append(lAcls, *rAcl)
355 }
356 return lAcls, nil
357}
358
359func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
360 var filters []*AclFilter
361 filters = append(filters, &filter)
362 request := &DeleteAclsRequest{Filters: filters}
363
364 b, err := ca.Controller()
365 if err != nil {
366 return nil, err
367 }
368
369 rsp, err := b.DeleteAcls(request)
370 if err != nil {
371 return nil, err
372 }
373
374 var mAcls []MatchingAcl
375 for _, fr := range rsp.FilterResponses {
376 for _, mACL := range fr.MatchingAcls {
377 mAcls = append(mAcls, *mACL)
378 }
379
380 }
381 return mAcls, nil
382}