[VOL-1386] This commit add "dep" as the package management tool
for voltha-go.
Change-Id: I52bc4911dd00a441756ec7c30f46d45091f3f90e
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go
new file mode 100644
index 0000000..c2ba76c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go
@@ -0,0 +1,942 @@
+/**
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+ "unsafe"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+#include <stdlib.h>
+
+static const rd_kafka_topic_result_t *
+topic_result_by_idx (const rd_kafka_topic_result_t **topics, size_t cnt, size_t idx) {
+ if (idx >= cnt)
+ return NULL;
+ return topics[idx];
+}
+
+static const rd_kafka_ConfigResource_t *
+ConfigResource_by_idx (const rd_kafka_ConfigResource_t **res, size_t cnt, size_t idx) {
+ if (idx >= cnt)
+ return NULL;
+ return res[idx];
+}
+
+static const rd_kafka_ConfigEntry_t *
+ConfigEntry_by_idx (const rd_kafka_ConfigEntry_t **entries, size_t cnt, size_t idx) {
+ if (idx >= cnt)
+ return NULL;
+ return entries[idx];
+}
+*/
+import "C"
+
+// AdminClient is derived from an existing Producer or Consumer
+type AdminClient struct {
+ handle *handle
+ isDerived bool // Derived from existing client handle
+}
+
+func durationToMilliseconds(t time.Duration) int {
+ if t > 0 {
+ return (int)(t.Seconds() * 1000.0)
+ }
+ return (int)(t)
+}
+
+// TopicResult provides per-topic operation result (error) information.
+type TopicResult struct {
+ // Topic name
+ Topic string
+ // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
+ Error Error
+}
+
+// String returns a human-readable representation of a TopicResult.
+func (t TopicResult) String() string {
+ if t.Error.code == 0 {
+ return t.Topic
+ }
+ return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str)
+}
+
+// TopicSpecification holds parameters for creating a new topic.
+// TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
+type TopicSpecification struct {
+ // Topic name to create.
+ Topic string
+ // Number of partitions in topic.
+ NumPartitions int
+ // Default replication factor for the topic's partitions, or zero
+ // if an explicit ReplicaAssignment is set.
+ ReplicationFactor int
+ // (Optional) Explicit replica assignment. The outer array is
+ // indexed by the partition number, while the inner per-partition array
+ // contains the replica broker ids. The first broker in each
+ // broker id list will be the preferred replica.
+ ReplicaAssignment [][]int32
+ // Topic configuration.
+ Config map[string]string
+}
+
+// PartitionsSpecification holds parameters for creating additional partitions for a topic.
+// PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API.
+type PartitionsSpecification struct {
+ // Topic to create more partitions for.
+ Topic string
+ // New partition count for topic, must be higher than current partition count.
+ IncreaseTo int
+ // (Optional) Explicit replica assignment. The outer array is
+ // indexed by the new partition index (i.e., 0 for the first added
+ // partition), while the inner per-partition array
+ // contains the replica broker ids. The first broker in each
+ // broker id list will be the preferred replica.
+ ReplicaAssignment [][]int32
+}
+
+// ResourceType represents an Apache Kafka resource type
+type ResourceType int
+
+const (
+ // ResourceUnknown - Unknown
+ ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
+ // ResourceAny - match any resource type (DescribeConfigs)
+ ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
+ // ResourceTopic - Topic
+ ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
+ // ResourceGroup - Group
+ ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
+ // ResourceBroker - Broker
+ ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
+)
+
+// String returns the human-readable representation of a ResourceType
+func (t ResourceType) String() string {
+ return C.GoString(C.rd_kafka_ResourceType_name(C.rd_kafka_ResourceType_t(t)))
+}
+
+// ResourceTypeFromString translates a resource type name/string to
+// a ResourceType value.
+func ResourceTypeFromString(typeString string) (ResourceType, error) {
+ switch strings.ToUpper(typeString) {
+ case "ANY":
+ return ResourceAny, nil
+ case "TOPIC":
+ return ResourceTopic, nil
+ case "GROUP":
+ return ResourceGroup, nil
+ case "BROKER":
+ return ResourceBroker, nil
+ default:
+ return ResourceUnknown, newGoError(ErrInvalidArg)
+ }
+}
+
+// ConfigSource represents an Apache Kafka config source
+type ConfigSource int
+
+const (
+ // ConfigSourceUnknown is the default value
+ ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
+ // ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
+ ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
+ // ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
+ ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
+ // ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
+ ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
+ // ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
+ ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
+ // ConfigSourceDefault is built-in default configuration for configs that have a default value
+ ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
+)
+
+// String returns the human-readable representation of a ConfigSource type
+func (t ConfigSource) String() string {
+ return C.GoString(C.rd_kafka_ConfigSource_name(C.rd_kafka_ConfigSource_t(t)))
+}
+
+// ConfigResource holds parameters for altering an Apache Kafka configuration resource
+type ConfigResource struct {
+ // Type of resource to set.
+ Type ResourceType
+ // Name of resource to set.
+ Name string
+ // Config entries to set.
+ // Configuration updates are atomic, any configuration property not provided
+ // here will be reverted (by the broker) to its default value.
+ // Use DescribeConfigs to retrieve the list of current configuration entry values.
+ Config []ConfigEntry
+}
+
+// String returns a human-readable representation of a ConfigResource
+func (c ConfigResource) String() string {
+ return fmt.Sprintf("Resource(%s, %s)", c.Type, c.Name)
+}
+
+// AlterOperation specifies the operation to perform on the ConfigEntry.
+// Currently only AlterOperationSet.
+type AlterOperation int
+
+const (
+ // AlterOperationSet sets/overwrites the configuration setting.
+ AlterOperationSet = iota
+)
+
+// String returns the human-readable representation of an AlterOperation
+func (o AlterOperation) String() string {
+ switch o {
+ case AlterOperationSet:
+ return "Set"
+ default:
+ return fmt.Sprintf("Unknown%d?", int(o))
+ }
+}
+
+// ConfigEntry holds parameters for altering a resource's configuration.
+type ConfigEntry struct {
+ // Name of configuration entry, e.g., topic configuration property name.
+ Name string
+ // Value of configuration entry.
+ Value string
+ // Operation to perform on the entry.
+ Operation AlterOperation
+}
+
+// StringMapToConfigEntries creates a new map of ConfigEntry objects from the
+// provided string map. The AlterOperation is set on each created entry.
+func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry {
+ var ceList []ConfigEntry
+
+ for k, v := range stringMap {
+ ceList = append(ceList, ConfigEntry{Name: k, Value: v, Operation: operation})
+ }
+
+ return ceList
+}
+
+// String returns a human-readable representation of a ConfigEntry.
+func (c ConfigEntry) String() string {
+ return fmt.Sprintf("%v %s=\"%s\"", c.Operation, c.Name, c.Value)
+}
+
+// ConfigEntryResult contains the result of a single configuration entry from a
+// DescribeConfigs request.
+type ConfigEntryResult struct {
+ // Name of configuration entry, e.g., topic configuration property name.
+ Name string
+ // Value of configuration entry.
+ Value string
+ // Source indicates the configuration source.
+ Source ConfigSource
+ // IsReadOnly indicates whether the configuration entry can be altered.
+ IsReadOnly bool
+ // IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset.
+ IsSensitive bool
+ // IsSynonym indicates whether the configuration entry is a synonym for another configuration property.
+ IsSynonym bool
+ // Synonyms contains a map of configuration entries that are synonyms to this configuration entry.
+ Synonyms map[string]ConfigEntryResult
+}
+
+// String returns a human-readable representation of a ConfigEntryResult.
+func (c ConfigEntryResult) String() string {
+ return fmt.Sprintf("%s=\"%s\"", c.Name, c.Value)
+}
+
+// setFromC sets up a ConfigEntryResult from a C ConfigEntry
+func configEntryResultFromC(cEntry *C.rd_kafka_ConfigEntry_t) (entry ConfigEntryResult) {
+ entry.Name = C.GoString(C.rd_kafka_ConfigEntry_name(cEntry))
+ cValue := C.rd_kafka_ConfigEntry_value(cEntry)
+ if cValue != nil {
+ entry.Value = C.GoString(cValue)
+ }
+ entry.Source = ConfigSource(C.rd_kafka_ConfigEntry_source(cEntry))
+ entry.IsReadOnly = cint2bool(C.rd_kafka_ConfigEntry_is_read_only(cEntry))
+ entry.IsSensitive = cint2bool(C.rd_kafka_ConfigEntry_is_sensitive(cEntry))
+ entry.IsSynonym = cint2bool(C.rd_kafka_ConfigEntry_is_synonym(cEntry))
+
+ var cSynCnt C.size_t
+ cSyns := C.rd_kafka_ConfigEntry_synonyms(cEntry, &cSynCnt)
+ if cSynCnt > 0 {
+ entry.Synonyms = make(map[string]ConfigEntryResult)
+ }
+
+ for si := 0; si < int(cSynCnt); si++ {
+ cSyn := C.ConfigEntry_by_idx(cSyns, cSynCnt, C.size_t(si))
+ Syn := configEntryResultFromC(cSyn)
+ entry.Synonyms[Syn.Name] = Syn
+ }
+
+ return entry
+}
+
+// ConfigResourceResult provides the result for a resource from a AlterConfigs or
+// DescribeConfigs request.
+type ConfigResourceResult struct {
+ // Type of returned result resource.
+ Type ResourceType
+ // Name of returned result resource.
+ Name string
+ // Error, if any, of returned result resource.
+ Error Error
+ // Config entries, if any, of returned result resource.
+ Config map[string]ConfigEntryResult
+}
+
+// String returns a human-readable representation of a ConfigResourceResult.
+func (c ConfigResourceResult) String() string {
+ if c.Error.Code() != 0 {
+ return fmt.Sprintf("ResourceResult(%s, %s, \"%v\")", c.Type, c.Name, c.Error)
+
+ }
+ return fmt.Sprintf("ResourceResult(%s, %s, %d config(s))", c.Type, c.Name, len(c.Config))
+}
+
+// waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
+// first.
+// The returned result event is checked for errors its error is returned if set.
+func (a *AdminClient) waitResult(ctx context.Context, cQueue *C.rd_kafka_queue_t, cEventType C.rd_kafka_event_type_t) (rkev *C.rd_kafka_event_t, err error) {
+
+ resultChan := make(chan *C.rd_kafka_event_t)
+ closeChan := make(chan bool) // never written to, just closed
+
+ go func() {
+ for {
+ select {
+ case _, ok := <-closeChan:
+ if !ok {
+ // Context cancelled/timed out
+ close(resultChan)
+ return
+ }
+
+ default:
+ // Wait for result event for at most 50ms
+ // to avoid blocking for too long if
+ // context is cancelled.
+ rkev := C.rd_kafka_queue_poll(cQueue, 50)
+ if rkev != nil {
+ resultChan <- rkev
+ close(resultChan)
+ return
+ }
+ }
+ }
+ }()
+
+ select {
+ case rkev = <-resultChan:
+ // Result type check
+ if cEventType != C.rd_kafka_event_type(rkev) {
+ err = newErrorFromString(ErrInvalidType,
+ fmt.Sprintf("Expected %d result event, not %d", (int)(cEventType), (int)(C.rd_kafka_event_type(rkev))))
+ C.rd_kafka_event_destroy(rkev)
+ return nil, err
+ }
+
+ // Generic error handling
+ cErr := C.rd_kafka_event_error(rkev)
+ if cErr != 0 {
+ err = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
+ C.rd_kafka_event_destroy(rkev)
+ return nil, err
+ }
+ close(closeChan)
+ return rkev, nil
+ case <-ctx.Done():
+ // signal close to go-routine
+ close(closeChan)
+ // wait for close from go-routine to make sure it is done
+ // using cQueue before we return.
+ rkev, ok := <-resultChan
+ if ok {
+ // throw away result since context was cancelled
+ C.rd_kafka_event_destroy(rkev)
+ }
+ return nil, ctx.Err()
+ }
+}
+
+// cToTopicResults converts a C topic_result_t array to Go TopicResult list.
+func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCnt C.size_t) (result []TopicResult, err error) {
+
+ result = make([]TopicResult, int(cCnt))
+
+ for i := 0; i < int(cCnt); i++ {
+ cTopic := C.topic_result_by_idx(cTopicRes, cCnt, C.size_t(i))
+ result[i].Topic = C.GoString(C.rd_kafka_topic_result_name(cTopic))
+ result[i].Error = newErrorFromCString(
+ C.rd_kafka_topic_result_error(cTopic),
+ C.rd_kafka_topic_result_error_string(cTopic))
+ }
+
+ return result, nil
+}
+
+// cConfigResourceToResult converts a C ConfigResource result array to Go ConfigResourceResult
+func (a *AdminClient) cConfigResourceToResult(cRes **C.rd_kafka_ConfigResource_t, cCnt C.size_t) (result []ConfigResourceResult, err error) {
+
+ result = make([]ConfigResourceResult, int(cCnt))
+
+ for i := 0; i < int(cCnt); i++ {
+ cRes := C.ConfigResource_by_idx(cRes, cCnt, C.size_t(i))
+ result[i].Type = ResourceType(C.rd_kafka_ConfigResource_type(cRes))
+ result[i].Name = C.GoString(C.rd_kafka_ConfigResource_name(cRes))
+ result[i].Error = newErrorFromCString(
+ C.rd_kafka_ConfigResource_error(cRes),
+ C.rd_kafka_ConfigResource_error_string(cRes))
+ var cConfigCnt C.size_t
+ cConfigs := C.rd_kafka_ConfigResource_configs(cRes, &cConfigCnt)
+ if cConfigCnt > 0 {
+ result[i].Config = make(map[string]ConfigEntryResult)
+ }
+ for ci := 0; ci < int(cConfigCnt); ci++ {
+ cEntry := C.ConfigEntry_by_idx(cConfigs, cConfigCnt, C.size_t(ci))
+ entry := configEntryResultFromC(cEntry)
+ result[i].Config[entry.Name] = entry
+ }
+ }
+
+ return result, nil
+}
+
+// CreateTopics creates topics in cluster.
+//
+// The list of TopicSpecification objects define the per-topic partition count, replicas, etc.
+//
+// Topic creation is non-atomic and may succeed for some topics but fail for others,
+// make sure to check the result for topic-specific errors.
+//
+// Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
+func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error) {
+ cTopics := make([]*C.rd_kafka_NewTopic_t, len(topics))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go TopicSpecifications to C TopicSpecifications
+ for i, topic := range topics {
+
+ var cReplicationFactor C.int
+ if topic.ReplicationFactor == 0 {
+ cReplicationFactor = -1
+ } else {
+ cReplicationFactor = C.int(topic.ReplicationFactor)
+ }
+ if topic.ReplicaAssignment != nil {
+ if cReplicationFactor != -1 {
+ return nil, newErrorFromString(ErrInvalidArg,
+ "TopicSpecification.ReplicationFactor and TopicSpecification.ReplicaAssignment are mutually exclusive")
+ }
+
+ if len(topic.ReplicaAssignment) != topic.NumPartitions {
+ return nil, newErrorFromString(ErrInvalidArg,
+ "TopicSpecification.ReplicaAssignment must contain exactly TopicSpecification.NumPartitions partitions")
+ }
+
+ } else if cReplicationFactor == -1 {
+ return nil, newErrorFromString(ErrInvalidArg,
+ "TopicSpecification.ReplicationFactor or TopicSpecification.ReplicaAssignment must be specified")
+ }
+
+ cTopics[i] = C.rd_kafka_NewTopic_new(
+ C.CString(topic.Topic),
+ C.int(topic.NumPartitions),
+ cReplicationFactor,
+ cErrstr, cErrstrSize)
+ if cTopics[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Topic %s: %s", topic.Topic, C.GoString(cErrstr)))
+ }
+
+ defer C.rd_kafka_NewTopic_destroy(cTopics[i])
+
+ for p, replicas := range topic.ReplicaAssignment {
+ cReplicas := make([]C.int32_t, len(replicas))
+ for ri, replica := range replicas {
+ cReplicas[ri] = C.int32_t(replica)
+ }
+ cErr := C.rd_kafka_NewTopic_set_replica_assignment(
+ cTopics[i], C.int32_t(p),
+ (*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
+ cErrstr, cErrstrSize)
+ if cErr != 0 {
+ return nil, newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to set replica assignment for topic %s partition %d: %s", topic.Topic, p, C.GoString(cErrstr)))
+ }
+ }
+
+ for key, value := range topic.Config {
+ cErr := C.rd_kafka_NewTopic_set_config(
+ cTopics[i],
+ C.CString(key), C.CString(value))
+ if cErr != 0 {
+ return nil, newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to set config %s=%s for topic %s", key, value, topic.Topic))
+ }
+ }
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATETOPICS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_CreateTopics(
+ a.handle.rk,
+ (**C.rd_kafka_NewTopic_t)(&cTopics[0]),
+ C.size_t(len(cTopics)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATETOPICS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cRes := C.rd_kafka_event_CreateTopics_result(rkev)
+
+ // Convert result from C to Go
+ var cCnt C.size_t
+ cTopicRes := C.rd_kafka_CreateTopics_result_topics(cRes, &cCnt)
+
+ return a.cToTopicResults(cTopicRes, cCnt)
+}
+
+// DeleteTopics deletes a batch of topics.
+//
+// This operation is not transactional and may succeed for a subset of topics while
+// failing others.
+// It may take several seconds after the DeleteTopics result returns success for
+// all the brokers to become aware that the topics are gone. During this time,
+// topic metadata and configuration may continue to return information about deleted topics.
+//
+// Requires broker version >= 0.10.1.0
+func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error) {
+ cTopics := make([]*C.rd_kafka_DeleteTopic_t, len(topics))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go DeleteTopics to C DeleteTopics
+ for i, topic := range topics {
+ cTopics[i] = C.rd_kafka_DeleteTopic_new(C.CString(topic))
+ if cTopics[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Invalid arguments for topic %s", topic))
+ }
+
+ defer C.rd_kafka_DeleteTopic_destroy(cTopics[i])
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DELETETOPICS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_DeleteTopics(
+ a.handle.rk,
+ (**C.rd_kafka_DeleteTopic_t)(&cTopics[0]),
+ C.size_t(len(cTopics)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETETOPICS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cRes := C.rd_kafka_event_DeleteTopics_result(rkev)
+
+ // Convert result from C to Go
+ var cCnt C.size_t
+ cTopicRes := C.rd_kafka_DeleteTopics_result_topics(cRes, &cCnt)
+
+ return a.cToTopicResults(cTopicRes, cCnt)
+}
+
+// CreatePartitions creates additional partitions for topics.
+func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error) {
+ cParts := make([]*C.rd_kafka_NewPartitions_t, len(partitions))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go PartitionsSpecification to C NewPartitions
+ for i, part := range partitions {
+ cParts[i] = C.rd_kafka_NewPartitions_new(C.CString(part.Topic), C.size_t(part.IncreaseTo), cErrstr, cErrstrSize)
+ if cParts[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Topic %s: %s", part.Topic, C.GoString(cErrstr)))
+ }
+
+ defer C.rd_kafka_NewPartitions_destroy(cParts[i])
+
+ for pidx, replicas := range part.ReplicaAssignment {
+ cReplicas := make([]C.int32_t, len(replicas))
+ for ri, replica := range replicas {
+ cReplicas[ri] = C.int32_t(replica)
+ }
+ cErr := C.rd_kafka_NewPartitions_set_replica_assignment(
+ cParts[i], C.int32_t(pidx),
+ (*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
+ cErrstr, cErrstrSize)
+ if cErr != 0 {
+ return nil, newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to set replica assignment for topic %s new partition index %d: %s", part.Topic, pidx, C.GoString(cErrstr)))
+ }
+ }
+
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_CreatePartitions(
+ a.handle.rk,
+ (**C.rd_kafka_NewPartitions_t)(&cParts[0]),
+ C.size_t(len(cParts)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cRes := C.rd_kafka_event_CreatePartitions_result(rkev)
+
+ // Convert result from C to Go
+ var cCnt C.size_t
+ cTopicRes := C.rd_kafka_CreatePartitions_result_topics(cRes, &cCnt)
+
+ return a.cToTopicResults(cTopicRes, cCnt)
+}
+
+// AlterConfigs alters/updates cluster resource configuration.
+//
+// Updates are not transactional so they may succeed for a subset
+// of the provided resources while others fail.
+// The configuration for a particular resource is updated atomically,
+// replacing values using the provided ConfigEntrys and reverting
+// unspecified ConfigEntrys to their default values.
+//
+// Requires broker version >=0.11.0.0
+//
+// AlterConfigs will replace all existing configuration for
+// the provided resources with the new configuration given,
+// reverting all other configuration to their default values.
+//
+// Multiple resources and resource types may be set, but at most one
+// resource of type ResourceBroker is allowed per call since these
+// resource requests must be sent to the broker specified in the resource.
+func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error) {
+ cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go ConfigResources to C ConfigResources
+ for i, res := range resources {
+ cRes[i] = C.rd_kafka_ConfigResource_new(
+ C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name))
+ if cRes[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Invalid arguments for resource %v", res))
+ }
+
+ defer C.rd_kafka_ConfigResource_destroy(cRes[i])
+
+ for _, entry := range res.Config {
+ var cErr C.rd_kafka_resp_err_t
+ switch entry.Operation {
+ case AlterOperationSet:
+ cErr = C.rd_kafka_ConfigResource_set_config(
+ cRes[i], C.CString(entry.Name), C.CString(entry.Value))
+ default:
+ panic(fmt.Sprintf("Invalid ConfigEntry.Operation: %v", entry.Operation))
+ }
+
+ if cErr != 0 {
+ return nil,
+ newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to add configuration %s: %s",
+ entry, C.GoString(C.rd_kafka_err2str(cErr))))
+ }
+ }
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_ALTERCONFIGS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_AlterConfigs(
+ a.handle.rk,
+ (**C.rd_kafka_ConfigResource_t)(&cRes[0]),
+ C.size_t(len(cRes)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_ALTERCONFIGS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cResult := C.rd_kafka_event_AlterConfigs_result(rkev)
+
+ // Convert results from C to Go
+ var cCnt C.size_t
+ cResults := C.rd_kafka_AlterConfigs_result_resources(cResult, &cCnt)
+
+ return a.cConfigResourceToResult(cResults, cCnt)
+}
+
+// DescribeConfigs retrieves configuration for cluster resources.
+//
+// The returned configuration includes default values, use
+// ConfigEntryResult.IsDefault or ConfigEntryResult.Source to distinguish
+// default values from manually configured settings.
+//
+// The value of config entries where .IsSensitive is true
+// will always be nil to avoid disclosing sensitive
+// information, such as security settings.
+//
+// Configuration entries where .IsReadOnly is true can't be modified
+// (with AlterConfigs).
+//
+// Synonym configuration entries are returned if the broker supports
+// it (broker version >= 1.1.0). See .Synonyms.
+//
+// Requires broker version >=0.11.0.0
+//
+// Multiple resources and resource types may be requested, but at most
+// one resource of type ResourceBroker is allowed per call
+// since these resource requests must be sent to the broker specified
+// in the resource.
+func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error) {
+ cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go ConfigResources to C ConfigResources
+ for i, res := range resources {
+ cRes[i] = C.rd_kafka_ConfigResource_new(
+ C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name))
+ if cRes[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Invalid arguments for resource %v", res))
+ }
+
+ defer C.rd_kafka_ConfigResource_destroy(cRes[i])
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_DescribeConfigs(
+ a.handle.rk,
+ (**C.rd_kafka_ConfigResource_t)(&cRes[0]),
+ C.size_t(len(cRes)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cResult := C.rd_kafka_event_DescribeConfigs_result(rkev)
+
+ // Convert results from C to Go
+ var cCnt C.size_t
+ cResults := C.rd_kafka_DescribeConfigs_result_resources(cResult, &cCnt)
+
+ return a.cConfigResourceToResult(cResults, cCnt)
+}
+
+// GetMetadata queries broker for cluster and topic metadata.
+// If topic is non-nil only information about that topic is returned, else if
+// allTopics is false only information about locally used topics is returned,
+// else information about all topics is returned.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
+func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+ return getMetadata(a, topic, allTopics, timeoutMs)
+}
+
+// String returns a human readable name for an AdminClient instance
+func (a *AdminClient) String() string {
+ return fmt.Sprintf("admin-%s", a.handle.String())
+}
+
+// get_handle implements the Handle interface
+func (a *AdminClient) gethandle() *handle {
+ return a.handle
+}
+
+// Close an AdminClient instance.
+func (a *AdminClient) Close() {
+ if a.isDerived {
+ // Derived AdminClient needs no cleanup.
+ a.handle = &handle{}
+ return
+ }
+
+ a.handle.cleanup()
+
+ C.rd_kafka_destroy(a.handle.rk)
+}
+
+// NewAdminClient creats a new AdminClient instance with a new underlying client instance
+func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
+
+ err := versionCheck()
+ if err != nil {
+ return nil, err
+ }
+
+ a := &AdminClient{}
+ a.handle = &handle{}
+
+ // Convert ConfigMap to librdkafka conf_t
+ cConf, err := conf.convert()
+ if err != nil {
+ return nil, err
+ }
+
+ cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Create librdkafka producer instance. The Producer is somewhat cheaper than
+ // the consumer, but any instance type can be used for Admin APIs.
+ a.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
+ if a.handle.rk == nil {
+ return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+ }
+
+ a.isDerived = false
+ a.handle.setup()
+
+ return a, nil
+}
+
+// NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance.
+// The AdminClient will use the same configuration and connections as the parent instance.
+func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error) {
+ if p.handle.rk == nil {
+ return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed producer")
+ }
+
+ a = &AdminClient{}
+ a.handle = &p.handle
+ a.isDerived = true
+ return a, nil
+}
+
+// NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance.
+// The AdminClient will use the same configuration and connections as the parent instance.
+func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) {
+ if c.handle.rk == nil {
+ return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed consumer")
+ }
+
+ a = &AdminClient{}
+ a.handle = &c.handle
+ a.isDerived = true
+ return a, nil
+}