blob: c2ba76c9332bb6cbb5c189b637c14eb183b314de [file] [log] [blame]
/**
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kafka
18
19import (
20 "context"
21 "fmt"
22 "strings"
23 "time"
24 "unsafe"
25)
26
27/*
28#include <librdkafka/rdkafka.h>
29#include <stdlib.h>
30
31static const rd_kafka_topic_result_t *
32topic_result_by_idx (const rd_kafka_topic_result_t **topics, size_t cnt, size_t idx) {
33 if (idx >= cnt)
34 return NULL;
35 return topics[idx];
36}
37
38static const rd_kafka_ConfigResource_t *
39ConfigResource_by_idx (const rd_kafka_ConfigResource_t **res, size_t cnt, size_t idx) {
40 if (idx >= cnt)
41 return NULL;
42 return res[idx];
43}
44
45static const rd_kafka_ConfigEntry_t *
46ConfigEntry_by_idx (const rd_kafka_ConfigEntry_t **entries, size_t cnt, size_t idx) {
47 if (idx >= cnt)
48 return NULL;
49 return entries[idx];
50}
51*/
52import "C"
53
// AdminClient is derived from an existing Producer or Consumer, or created
// standalone via NewAdminClient. It wraps librdkafka's Admin API.
type AdminClient struct {
	// handle is the shared librdkafka instance handle.
	handle *handle
	// isDerived is true when the handle is borrowed from an existing
	// Producer/Consumer, in which case Close() must not destroy it.
	isDerived bool // Derived from existing client handle
}
59
60func durationToMilliseconds(t time.Duration) int {
61 if t > 0 {
62 return (int)(t.Seconds() * 1000.0)
63 }
64 return (int)(t)
65}
66
// TopicResult provides per-topic operation result (error) information.
type TopicResult struct {
	// Topic name the result pertains to.
	Topic string
	// Error, if any, of result. Check with `Error.Code() != ErrNoError`.
	Error Error
}
74
75// String returns a human-readable representation of a TopicResult.
76func (t TopicResult) String() string {
77 if t.Error.code == 0 {
78 return t.Topic
79 }
80 return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str)
81}
82
// TopicSpecification holds parameters for creating a new topic.
// TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
type TopicSpecification struct {
	// Topic name to create.
	Topic string
	// Number of partitions in topic.
	NumPartitions int
	// Default replication factor for the topic's partitions, or zero
	// if an explicit ReplicaAssignment is set.
	ReplicationFactor int
	// (Optional) Explicit replica assignment. The outer array is
	// indexed by the partition number, while the inner per-partition array
	// contains the replica broker ids. The first broker in each
	// broker id list will be the preferred replica.
	ReplicaAssignment [][]int32
	// Topic configuration (e.g., "cleanup.policy": "compact").
	Config map[string]string
}
101
// PartitionsSpecification holds parameters for creating additional partitions for a topic.
// PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API.
type PartitionsSpecification struct {
	// Topic to create more partitions for.
	Topic string
	// New partition count for topic, must be higher than current partition count.
	IncreaseTo int
	// (Optional) Explicit replica assignment. The outer array is
	// indexed by the new partition index (i.e., 0 for the first added
	// partition), while the inner per-partition array
	// contains the replica broker ids. The first broker in each
	// broker id list will be the preferred replica.
	ReplicaAssignment [][]int32
}
116
// ResourceType represents an Apache Kafka resource type.
// The values mirror librdkafka's rd_kafka_ResourceType_t enum.
type ResourceType int

const (
	// ResourceUnknown - Unknown
	ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
	// ResourceAny - match any resource type (DescribeConfigs)
	ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
	// ResourceTopic - Topic
	ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
	// ResourceGroup - Group
	ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
	// ResourceBroker - Broker
	ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
)
132
// String returns the human-readable representation of a ResourceType,
// as provided by librdkafka.
func (t ResourceType) String() string {
	return C.GoString(C.rd_kafka_ResourceType_name(C.rd_kafka_ResourceType_t(t)))
}
137
138// ResourceTypeFromString translates a resource type name/string to
139// a ResourceType value.
140func ResourceTypeFromString(typeString string) (ResourceType, error) {
141 switch strings.ToUpper(typeString) {
142 case "ANY":
143 return ResourceAny, nil
144 case "TOPIC":
145 return ResourceTopic, nil
146 case "GROUP":
147 return ResourceGroup, nil
148 case "BROKER":
149 return ResourceBroker, nil
150 default:
151 return ResourceUnknown, newGoError(ErrInvalidArg)
152 }
153}
154
// ConfigSource represents an Apache Kafka config source.
// The values mirror librdkafka's rd_kafka_ConfigSource_t enum.
type ConfigSource int

const (
	// ConfigSourceUnknown is the default value
	ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
	// ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
	ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
	// ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
	ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
	// ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
	ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
	// ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
	ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
	// ConfigSourceDefault is built-in default configuration for configs that have a default value
	ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
)
172
// String returns the human-readable representation of a ConfigSource type,
// as provided by librdkafka.
func (t ConfigSource) String() string {
	return C.GoString(C.rd_kafka_ConfigSource_name(C.rd_kafka_ConfigSource_t(t)))
}
177
// ConfigResource holds parameters for altering an Apache Kafka configuration resource
type ConfigResource struct {
	// Type of resource to set.
	Type ResourceType
	// Name of resource to set.
	Name string
	// Config entries to set.
	// Configuration updates are atomic, any configuration property not provided
	// here will be reverted (by the broker) to its default value.
	// Use DescribeConfigs to retrieve the list of current configuration entry values.
	Config []ConfigEntry
}
190
191// String returns a human-readable representation of a ConfigResource
192func (c ConfigResource) String() string {
193 return fmt.Sprintf("Resource(%s, %s)", c.Type, c.Name)
194}
195
// AlterOperation specifies the operation to perform on the ConfigEntry.
// Currently only AlterOperationSet.
type AlterOperation int

const (
	// AlterOperationSet sets/overwrites the configuration setting.
	// Explicitly typed as AlterOperation (the bare `= iota` form would
	// make this an untyped int constant).
	AlterOperationSet AlterOperation = iota
)

// String returns the human-readable representation of an AlterOperation.
func (o AlterOperation) String() string {
	switch o {
	case AlterOperationSet:
		return "Set"
	default:
		// Unrecognized values are rendered with a trailing '?' marker.
		return fmt.Sprintf("Unknown%d?", int(o))
	}
}
214
// ConfigEntry holds parameters for altering a resource's configuration.
type ConfigEntry struct {
	// Name of configuration entry, e.g., topic configuration property name.
	Name string
	// Value of configuration entry.
	Value string
	// Operation to perform on the entry (currently only AlterOperationSet).
	Operation AlterOperation
}
224
225// StringMapToConfigEntries creates a new map of ConfigEntry objects from the
226// provided string map. The AlterOperation is set on each created entry.
227func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry {
228 var ceList []ConfigEntry
229
230 for k, v := range stringMap {
231 ceList = append(ceList, ConfigEntry{Name: k, Value: v, Operation: operation})
232 }
233
234 return ceList
235}
236
237// String returns a human-readable representation of a ConfigEntry.
238func (c ConfigEntry) String() string {
239 return fmt.Sprintf("%v %s=\"%s\"", c.Operation, c.Name, c.Value)
240}
241
// ConfigEntryResult contains the result of a single configuration entry from a
// DescribeConfigs request.
type ConfigEntryResult struct {
	// Name of configuration entry, e.g., topic configuration property name.
	Name string
	// Value of configuration entry.
	Value string
	// Source indicates the configuration source.
	Source ConfigSource
	// IsReadOnly indicates whether the configuration entry can be altered.
	IsReadOnly bool
	// IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset.
	IsSensitive bool
	// IsSynonym indicates whether the configuration entry is a synonym for another configuration property.
	IsSynonym bool
	// Synonyms contains a map of configuration entries that are synonyms to this configuration entry.
	Synonyms map[string]ConfigEntryResult
}
260
261// String returns a human-readable representation of a ConfigEntryResult.
262func (c ConfigEntryResult) String() string {
263 return fmt.Sprintf("%s=\"%s\"", c.Name, c.Value)
264}
265
// configEntryResultFromC builds a ConfigEntryResult from a C
// rd_kafka_ConfigEntry_t, recursively converting its synonym entries too.
func configEntryResultFromC(cEntry *C.rd_kafka_ConfigEntry_t) (entry ConfigEntryResult) {
	entry.Name = C.GoString(C.rd_kafka_ConfigEntry_name(cEntry))
	// The C value may be NULL (e.g., for sensitive entries); Value is
	// left as the empty string in that case.
	cValue := C.rd_kafka_ConfigEntry_value(cEntry)
	if cValue != nil {
		entry.Value = C.GoString(cValue)
	}
	entry.Source = ConfigSource(C.rd_kafka_ConfigEntry_source(cEntry))
	entry.IsReadOnly = cint2bool(C.rd_kafka_ConfigEntry_is_read_only(cEntry))
	entry.IsSensitive = cint2bool(C.rd_kafka_ConfigEntry_is_sensitive(cEntry))
	entry.IsSynonym = cint2bool(C.rd_kafka_ConfigEntry_is_synonym(cEntry))

	// Synonyms map is only allocated when there is at least one synonym;
	// otherwise it stays nil.
	var cSynCnt C.size_t
	cSyns := C.rd_kafka_ConfigEntry_synonyms(cEntry, &cSynCnt)
	if cSynCnt > 0 {
		entry.Synonyms = make(map[string]ConfigEntryResult)
	}

	for si := 0; si < int(cSynCnt); si++ {
		cSyn := C.ConfigEntry_by_idx(cSyns, cSynCnt, C.size_t(si))
		Syn := configEntryResultFromC(cSyn)
		entry.Synonyms[Syn.Name] = Syn
	}

	return entry
}
292
// ConfigResourceResult provides the result for a resource from a AlterConfigs or
// DescribeConfigs request.
type ConfigResourceResult struct {
	// Type of returned result resource.
	Type ResourceType
	// Name of returned result resource.
	Name string
	// Error, if any, of returned result resource.
	Error Error
	// Config entries, if any, of returned result resource, keyed by entry name.
	Config map[string]ConfigEntryResult
}
305
306// String returns a human-readable representation of a ConfigResourceResult.
307func (c ConfigResourceResult) String() string {
308 if c.Error.Code() != 0 {
309 return fmt.Sprintf("ResourceResult(%s, %s, \"%v\")", c.Type, c.Name, c.Error)
310
311 }
312 return fmt.Sprintf("ResourceResult(%s, %s, %d config(s))", c.Type, c.Name, len(c.Config))
313}
314
// waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
// first.
// The returned result event is checked for errors; its error is returned if set.
// On success the caller owns rkev and must destroy it with rd_kafka_event_destroy().
// A helper goroutine polls cQueue in short intervals so that a context
// cancellation is noticed promptly; the channel dance below guarantees the
// goroutine has stopped using cQueue before this function returns.
func (a *AdminClient) waitResult(ctx context.Context, cQueue *C.rd_kafka_queue_t, cEventType C.rd_kafka_event_type_t) (rkev *C.rd_kafka_event_t, err error) {

	resultChan := make(chan *C.rd_kafka_event_t)
	closeChan := make(chan bool) // never written to, just closed

	go func() {
		for {
			select {
			case _, ok := <-closeChan:
				if !ok {
					// Context cancelled/timed out
					close(resultChan)
					return
				}

			default:
				// Wait for result event for at most 50ms
				// to avoid blocking for too long if
				// context is cancelled.
				rkev := C.rd_kafka_queue_poll(cQueue, 50)
				if rkev != nil {
					resultChan <- rkev
					close(resultChan)
					return
				}
			}
		}
	}()

	select {
	case rkev = <-resultChan:
		// Result type check
		if cEventType != C.rd_kafka_event_type(rkev) {
			err = newErrorFromString(ErrInvalidType,
				fmt.Sprintf("Expected %d result event, not %d", (int)(cEventType), (int)(C.rd_kafka_event_type(rkev))))
			C.rd_kafka_event_destroy(rkev)
			return nil, err
		}

		// Generic error handling
		cErr := C.rd_kafka_event_error(rkev)
		if cErr != 0 {
			err = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
			C.rd_kafka_event_destroy(rkev)
			return nil, err
		}
		// The goroutine has already exited after delivering the event;
		// closing closeChan here is a harmless no-op for it.
		close(closeChan)
		return rkev, nil
	case <-ctx.Done():
		// signal close to go-routine
		close(closeChan)
		// wait for close from go-routine to make sure it is done
		// using cQueue before we return.
		rkev, ok := <-resultChan
		if ok {
			// throw away result since context was cancelled
			C.rd_kafka_event_destroy(rkev)
		}
		return nil, ctx.Err()
	}
}
379
// cToTopicResults converts a C topic_result_t array to Go TopicResult list.
// The C array remains owned by the caller (it belongs to the rd_kafka event).
func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCnt C.size_t) (result []TopicResult, err error) {

	result = make([]TopicResult, int(cCnt))

	for i := 0; i < int(cCnt); i++ {
		// topic_result_by_idx is a C helper that bounds-checks the index.
		cTopic := C.topic_result_by_idx(cTopicRes, cCnt, C.size_t(i))
		result[i].Topic = C.GoString(C.rd_kafka_topic_result_name(cTopic))
		result[i].Error = newErrorFromCString(
			C.rd_kafka_topic_result_error(cTopic),
			C.rd_kafka_topic_result_error_string(cTopic))
	}

	return result, nil
}
395
396// cConfigResourceToResult converts a C ConfigResource result array to Go ConfigResourceResult
397func (a *AdminClient) cConfigResourceToResult(cRes **C.rd_kafka_ConfigResource_t, cCnt C.size_t) (result []ConfigResourceResult, err error) {
398
399 result = make([]ConfigResourceResult, int(cCnt))
400
401 for i := 0; i < int(cCnt); i++ {
402 cRes := C.ConfigResource_by_idx(cRes, cCnt, C.size_t(i))
403 result[i].Type = ResourceType(C.rd_kafka_ConfigResource_type(cRes))
404 result[i].Name = C.GoString(C.rd_kafka_ConfigResource_name(cRes))
405 result[i].Error = newErrorFromCString(
406 C.rd_kafka_ConfigResource_error(cRes),
407 C.rd_kafka_ConfigResource_error_string(cRes))
408 var cConfigCnt C.size_t
409 cConfigs := C.rd_kafka_ConfigResource_configs(cRes, &cConfigCnt)
410 if cConfigCnt > 0 {
411 result[i].Config = make(map[string]ConfigEntryResult)
412 }
413 for ci := 0; ci < int(cConfigCnt); ci++ {
414 cEntry := C.ConfigEntry_by_idx(cConfigs, cConfigCnt, C.size_t(ci))
415 entry := configEntryResultFromC(cEntry)
416 result[i].Config[entry.Name] = entry
417 }
418 }
419
420 return result, nil
421}
422
423// CreateTopics creates topics in cluster.
424//
425// The list of TopicSpecification objects define the per-topic partition count, replicas, etc.
426//
427// Topic creation is non-atomic and may succeed for some topics but fail for others,
428// make sure to check the result for topic-specific errors.
429//
430// Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
431func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error) {
432 cTopics := make([]*C.rd_kafka_NewTopic_t, len(topics))
433
434 cErrstrSize := C.size_t(512)
435 cErrstr := (*C.char)(C.malloc(cErrstrSize))
436 defer C.free(unsafe.Pointer(cErrstr))
437
438 // Convert Go TopicSpecifications to C TopicSpecifications
439 for i, topic := range topics {
440
441 var cReplicationFactor C.int
442 if topic.ReplicationFactor == 0 {
443 cReplicationFactor = -1
444 } else {
445 cReplicationFactor = C.int(topic.ReplicationFactor)
446 }
447 if topic.ReplicaAssignment != nil {
448 if cReplicationFactor != -1 {
449 return nil, newErrorFromString(ErrInvalidArg,
450 "TopicSpecification.ReplicationFactor and TopicSpecification.ReplicaAssignment are mutually exclusive")
451 }
452
453 if len(topic.ReplicaAssignment) != topic.NumPartitions {
454 return nil, newErrorFromString(ErrInvalidArg,
455 "TopicSpecification.ReplicaAssignment must contain exactly TopicSpecification.NumPartitions partitions")
456 }
457
458 } else if cReplicationFactor == -1 {
459 return nil, newErrorFromString(ErrInvalidArg,
460 "TopicSpecification.ReplicationFactor or TopicSpecification.ReplicaAssignment must be specified")
461 }
462
463 cTopics[i] = C.rd_kafka_NewTopic_new(
464 C.CString(topic.Topic),
465 C.int(topic.NumPartitions),
466 cReplicationFactor,
467 cErrstr, cErrstrSize)
468 if cTopics[i] == nil {
469 return nil, newErrorFromString(ErrInvalidArg,
470 fmt.Sprintf("Topic %s: %s", topic.Topic, C.GoString(cErrstr)))
471 }
472
473 defer C.rd_kafka_NewTopic_destroy(cTopics[i])
474
475 for p, replicas := range topic.ReplicaAssignment {
476 cReplicas := make([]C.int32_t, len(replicas))
477 for ri, replica := range replicas {
478 cReplicas[ri] = C.int32_t(replica)
479 }
480 cErr := C.rd_kafka_NewTopic_set_replica_assignment(
481 cTopics[i], C.int32_t(p),
482 (*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
483 cErrstr, cErrstrSize)
484 if cErr != 0 {
485 return nil, newCErrorFromString(cErr,
486 fmt.Sprintf("Failed to set replica assignment for topic %s partition %d: %s", topic.Topic, p, C.GoString(cErrstr)))
487 }
488 }
489
490 for key, value := range topic.Config {
491 cErr := C.rd_kafka_NewTopic_set_config(
492 cTopics[i],
493 C.CString(key), C.CString(value))
494 if cErr != 0 {
495 return nil, newCErrorFromString(cErr,
496 fmt.Sprintf("Failed to set config %s=%s for topic %s", key, value, topic.Topic))
497 }
498 }
499 }
500
501 // Convert Go AdminOptions (if any) to C AdminOptions
502 genericOptions := make([]AdminOption, len(options))
503 for i := range options {
504 genericOptions[i] = options[i]
505 }
506 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATETOPICS, genericOptions)
507 if err != nil {
508 return nil, err
509 }
510 defer C.rd_kafka_AdminOptions_destroy(cOptions)
511
512 // Create temporary queue for async operation
513 cQueue := C.rd_kafka_queue_new(a.handle.rk)
514 defer C.rd_kafka_queue_destroy(cQueue)
515
516 // Asynchronous call
517 C.rd_kafka_CreateTopics(
518 a.handle.rk,
519 (**C.rd_kafka_NewTopic_t)(&cTopics[0]),
520 C.size_t(len(cTopics)),
521 cOptions,
522 cQueue)
523
524 // Wait for result, error or context timeout
525 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATETOPICS_RESULT)
526 if err != nil {
527 return nil, err
528 }
529 defer C.rd_kafka_event_destroy(rkev)
530
531 cRes := C.rd_kafka_event_CreateTopics_result(rkev)
532
533 // Convert result from C to Go
534 var cCnt C.size_t
535 cTopicRes := C.rd_kafka_CreateTopics_result_topics(cRes, &cCnt)
536
537 return a.cToTopicResults(cTopicRes, cCnt)
538}
539
540// DeleteTopics deletes a batch of topics.
541//
542// This operation is not transactional and may succeed for a subset of topics while
543// failing others.
544// It may take several seconds after the DeleteTopics result returns success for
545// all the brokers to become aware that the topics are gone. During this time,
546// topic metadata and configuration may continue to return information about deleted topics.
547//
548// Requires broker version >= 0.10.1.0
549func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error) {
550 cTopics := make([]*C.rd_kafka_DeleteTopic_t, len(topics))
551
552 cErrstrSize := C.size_t(512)
553 cErrstr := (*C.char)(C.malloc(cErrstrSize))
554 defer C.free(unsafe.Pointer(cErrstr))
555
556 // Convert Go DeleteTopics to C DeleteTopics
557 for i, topic := range topics {
558 cTopics[i] = C.rd_kafka_DeleteTopic_new(C.CString(topic))
559 if cTopics[i] == nil {
560 return nil, newErrorFromString(ErrInvalidArg,
561 fmt.Sprintf("Invalid arguments for topic %s", topic))
562 }
563
564 defer C.rd_kafka_DeleteTopic_destroy(cTopics[i])
565 }
566
567 // Convert Go AdminOptions (if any) to C AdminOptions
568 genericOptions := make([]AdminOption, len(options))
569 for i := range options {
570 genericOptions[i] = options[i]
571 }
572 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DELETETOPICS, genericOptions)
573 if err != nil {
574 return nil, err
575 }
576 defer C.rd_kafka_AdminOptions_destroy(cOptions)
577
578 // Create temporary queue for async operation
579 cQueue := C.rd_kafka_queue_new(a.handle.rk)
580 defer C.rd_kafka_queue_destroy(cQueue)
581
582 // Asynchronous call
583 C.rd_kafka_DeleteTopics(
584 a.handle.rk,
585 (**C.rd_kafka_DeleteTopic_t)(&cTopics[0]),
586 C.size_t(len(cTopics)),
587 cOptions,
588 cQueue)
589
590 // Wait for result, error or context timeout
591 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETETOPICS_RESULT)
592 if err != nil {
593 return nil, err
594 }
595 defer C.rd_kafka_event_destroy(rkev)
596
597 cRes := C.rd_kafka_event_DeleteTopics_result(rkev)
598
599 // Convert result from C to Go
600 var cCnt C.size_t
601 cTopicRes := C.rd_kafka_DeleteTopics_result_topics(cRes, &cCnt)
602
603 return a.cToTopicResults(cTopicRes, cCnt)
604}
605
606// CreatePartitions creates additional partitions for topics.
607func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error) {
608 cParts := make([]*C.rd_kafka_NewPartitions_t, len(partitions))
609
610 cErrstrSize := C.size_t(512)
611 cErrstr := (*C.char)(C.malloc(cErrstrSize))
612 defer C.free(unsafe.Pointer(cErrstr))
613
614 // Convert Go PartitionsSpecification to C NewPartitions
615 for i, part := range partitions {
616 cParts[i] = C.rd_kafka_NewPartitions_new(C.CString(part.Topic), C.size_t(part.IncreaseTo), cErrstr, cErrstrSize)
617 if cParts[i] == nil {
618 return nil, newErrorFromString(ErrInvalidArg,
619 fmt.Sprintf("Topic %s: %s", part.Topic, C.GoString(cErrstr)))
620 }
621
622 defer C.rd_kafka_NewPartitions_destroy(cParts[i])
623
624 for pidx, replicas := range part.ReplicaAssignment {
625 cReplicas := make([]C.int32_t, len(replicas))
626 for ri, replica := range replicas {
627 cReplicas[ri] = C.int32_t(replica)
628 }
629 cErr := C.rd_kafka_NewPartitions_set_replica_assignment(
630 cParts[i], C.int32_t(pidx),
631 (*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
632 cErrstr, cErrstrSize)
633 if cErr != 0 {
634 return nil, newCErrorFromString(cErr,
635 fmt.Sprintf("Failed to set replica assignment for topic %s new partition index %d: %s", part.Topic, pidx, C.GoString(cErrstr)))
636 }
637 }
638
639 }
640
641 // Convert Go AdminOptions (if any) to C AdminOptions
642 genericOptions := make([]AdminOption, len(options))
643 for i := range options {
644 genericOptions[i] = options[i]
645 }
646 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, genericOptions)
647 if err != nil {
648 return nil, err
649 }
650 defer C.rd_kafka_AdminOptions_destroy(cOptions)
651
652 // Create temporary queue for async operation
653 cQueue := C.rd_kafka_queue_new(a.handle.rk)
654 defer C.rd_kafka_queue_destroy(cQueue)
655
656 // Asynchronous call
657 C.rd_kafka_CreatePartitions(
658 a.handle.rk,
659 (**C.rd_kafka_NewPartitions_t)(&cParts[0]),
660 C.size_t(len(cParts)),
661 cOptions,
662 cQueue)
663
664 // Wait for result, error or context timeout
665 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT)
666 if err != nil {
667 return nil, err
668 }
669 defer C.rd_kafka_event_destroy(rkev)
670
671 cRes := C.rd_kafka_event_CreatePartitions_result(rkev)
672
673 // Convert result from C to Go
674 var cCnt C.size_t
675 cTopicRes := C.rd_kafka_CreatePartitions_result_topics(cRes, &cCnt)
676
677 return a.cToTopicResults(cTopicRes, cCnt)
678}
679
680// AlterConfigs alters/updates cluster resource configuration.
681//
682// Updates are not transactional so they may succeed for a subset
683// of the provided resources while others fail.
684// The configuration for a particular resource is updated atomically,
685// replacing values using the provided ConfigEntrys and reverting
686// unspecified ConfigEntrys to their default values.
687//
688// Requires broker version >=0.11.0.0
689//
690// AlterConfigs will replace all existing configuration for
691// the provided resources with the new configuration given,
692// reverting all other configuration to their default values.
693//
694// Multiple resources and resource types may be set, but at most one
695// resource of type ResourceBroker is allowed per call since these
696// resource requests must be sent to the broker specified in the resource.
697func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error) {
698 cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
699
700 cErrstrSize := C.size_t(512)
701 cErrstr := (*C.char)(C.malloc(cErrstrSize))
702 defer C.free(unsafe.Pointer(cErrstr))
703
704 // Convert Go ConfigResources to C ConfigResources
705 for i, res := range resources {
706 cRes[i] = C.rd_kafka_ConfigResource_new(
707 C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name))
708 if cRes[i] == nil {
709 return nil, newErrorFromString(ErrInvalidArg,
710 fmt.Sprintf("Invalid arguments for resource %v", res))
711 }
712
713 defer C.rd_kafka_ConfigResource_destroy(cRes[i])
714
715 for _, entry := range res.Config {
716 var cErr C.rd_kafka_resp_err_t
717 switch entry.Operation {
718 case AlterOperationSet:
719 cErr = C.rd_kafka_ConfigResource_set_config(
720 cRes[i], C.CString(entry.Name), C.CString(entry.Value))
721 default:
722 panic(fmt.Sprintf("Invalid ConfigEntry.Operation: %v", entry.Operation))
723 }
724
725 if cErr != 0 {
726 return nil,
727 newCErrorFromString(cErr,
728 fmt.Sprintf("Failed to add configuration %s: %s",
729 entry, C.GoString(C.rd_kafka_err2str(cErr))))
730 }
731 }
732 }
733
734 // Convert Go AdminOptions (if any) to C AdminOptions
735 genericOptions := make([]AdminOption, len(options))
736 for i := range options {
737 genericOptions[i] = options[i]
738 }
739 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_ALTERCONFIGS, genericOptions)
740 if err != nil {
741 return nil, err
742 }
743 defer C.rd_kafka_AdminOptions_destroy(cOptions)
744
745 // Create temporary queue for async operation
746 cQueue := C.rd_kafka_queue_new(a.handle.rk)
747 defer C.rd_kafka_queue_destroy(cQueue)
748
749 // Asynchronous call
750 C.rd_kafka_AlterConfigs(
751 a.handle.rk,
752 (**C.rd_kafka_ConfigResource_t)(&cRes[0]),
753 C.size_t(len(cRes)),
754 cOptions,
755 cQueue)
756
757 // Wait for result, error or context timeout
758 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_ALTERCONFIGS_RESULT)
759 if err != nil {
760 return nil, err
761 }
762 defer C.rd_kafka_event_destroy(rkev)
763
764 cResult := C.rd_kafka_event_AlterConfigs_result(rkev)
765
766 // Convert results from C to Go
767 var cCnt C.size_t
768 cResults := C.rd_kafka_AlterConfigs_result_resources(cResult, &cCnt)
769
770 return a.cConfigResourceToResult(cResults, cCnt)
771}
772
773// DescribeConfigs retrieves configuration for cluster resources.
774//
775// The returned configuration includes default values, use
776// ConfigEntryResult.IsDefault or ConfigEntryResult.Source to distinguish
777// default values from manually configured settings.
778//
779// The value of config entries where .IsSensitive is true
780// will always be nil to avoid disclosing sensitive
781// information, such as security settings.
782//
783// Configuration entries where .IsReadOnly is true can't be modified
784// (with AlterConfigs).
785//
786// Synonym configuration entries are returned if the broker supports
787// it (broker version >= 1.1.0). See .Synonyms.
788//
789// Requires broker version >=0.11.0.0
790//
791// Multiple resources and resource types may be requested, but at most
792// one resource of type ResourceBroker is allowed per call
793// since these resource requests must be sent to the broker specified
794// in the resource.
795func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error) {
796 cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
797
798 cErrstrSize := C.size_t(512)
799 cErrstr := (*C.char)(C.malloc(cErrstrSize))
800 defer C.free(unsafe.Pointer(cErrstr))
801
802 // Convert Go ConfigResources to C ConfigResources
803 for i, res := range resources {
804 cRes[i] = C.rd_kafka_ConfigResource_new(
805 C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name))
806 if cRes[i] == nil {
807 return nil, newErrorFromString(ErrInvalidArg,
808 fmt.Sprintf("Invalid arguments for resource %v", res))
809 }
810
811 defer C.rd_kafka_ConfigResource_destroy(cRes[i])
812 }
813
814 // Convert Go AdminOptions (if any) to C AdminOptions
815 genericOptions := make([]AdminOption, len(options))
816 for i := range options {
817 genericOptions[i] = options[i]
818 }
819 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, genericOptions)
820 if err != nil {
821 return nil, err
822 }
823 defer C.rd_kafka_AdminOptions_destroy(cOptions)
824
825 // Create temporary queue for async operation
826 cQueue := C.rd_kafka_queue_new(a.handle.rk)
827 defer C.rd_kafka_queue_destroy(cQueue)
828
829 // Asynchronous call
830 C.rd_kafka_DescribeConfigs(
831 a.handle.rk,
832 (**C.rd_kafka_ConfigResource_t)(&cRes[0]),
833 C.size_t(len(cRes)),
834 cOptions,
835 cQueue)
836
837 // Wait for result, error or context timeout
838 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT)
839 if err != nil {
840 return nil, err
841 }
842 defer C.rd_kafka_event_destroy(rkev)
843
844 cResult := C.rd_kafka_event_DescribeConfigs_result(rkev)
845
846 // Convert results from C to Go
847 var cCnt C.size_t
848 cResults := C.rd_kafka_DescribeConfigs_result_resources(cResult, &cCnt)
849
850 return a.cConfigResourceToResult(cResults, cCnt)
851}
852
// GetMetadata queries broker for cluster and topic metadata.
// If topic is non-nil only information about that topic is returned, else if
// allTopics is false only information about locally used topics is returned,
// else information about all topics is returned.
// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
	return getMetadata(a, topic, allTopics, timeoutMs)
}
861
862// String returns a human readable name for an AdminClient instance
863func (a *AdminClient) String() string {
864 return fmt.Sprintf("admin-%s", a.handle.String())
865}
866
// gethandle implements the Handle interface, exposing the underlying
// librdkafka handle wrapper.
func (a *AdminClient) gethandle() *handle {
	return a.handle
}
871
// Close an AdminClient instance.
// For a derived client the shared handle belongs to the parent
// Producer/Consumer and is left untouched; for a standalone client the
// underlying librdkafka instance is destroyed.
func (a *AdminClient) Close() {
	if a.isDerived {
		// Derived AdminClient needs no cleanup.
		// Detach from the parent's handle by replacing it with an
		// empty one.
		a.handle = &handle{}
		return
	}

	a.handle.cleanup()

	C.rd_kafka_destroy(a.handle.rk)
}
884
885// NewAdminClient creats a new AdminClient instance with a new underlying client instance
886func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
887
888 err := versionCheck()
889 if err != nil {
890 return nil, err
891 }
892
893 a := &AdminClient{}
894 a.handle = &handle{}
895
896 // Convert ConfigMap to librdkafka conf_t
897 cConf, err := conf.convert()
898 if err != nil {
899 return nil, err
900 }
901
902 cErrstr := (*C.char)(C.malloc(C.size_t(256)))
903 defer C.free(unsafe.Pointer(cErrstr))
904
905 // Create librdkafka producer instance. The Producer is somewhat cheaper than
906 // the consumer, but any instance type can be used for Admin APIs.
907 a.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
908 if a.handle.rk == nil {
909 return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
910 }
911
912 a.isDerived = false
913 a.handle.setup()
914
915 return a, nil
916}
917
918// NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance.
919// The AdminClient will use the same configuration and connections as the parent instance.
920func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error) {
921 if p.handle.rk == nil {
922 return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed producer")
923 }
924
925 a = &AdminClient{}
926 a.handle = &p.handle
927 a.isDerived = true
928 return a, nil
929}
930
931// NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance.
932// The AdminClient will use the same configuration and connections as the parent instance.
933func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) {
934 if c.handle.rk == nil {
935 return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed consumer")
936 }
937
938 a = &AdminClient{}
939 a.handle = &c.handle
940 a.isDerived = true
941 return a, nil
942}