blob: 19a8be0f67fdaef04e5f8de6bafb38416069501d [file] [log] [blame]
/**
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka
import (
"fmt"
"time"
"unsafe"
)
/*
#include <librdkafka/rdkafka.h>
#include <stdlib.h>
*/
import "C"
// AdminOptionOperationTimeout sets the broker's operation timeout, such as the
// timeout for CreateTopics to complete the creation of topics on the controller
// before returning a result to the application.
//
// CreateTopics, DeleteTopics, CreatePartitions:
// a value 0 will return immediately after triggering topic
// creation, while > 0 will wait this long for topic creation to propagate
// in cluster.
//
// Default: 0 (return immediately).
//
// Valid for CreateTopics, DeleteTopics, CreatePartitions.
type AdminOptionOperationTimeout struct {
isSet bool
val time.Duration
}
func (ao AdminOptionOperationTimeout) supportsCreateTopics() {
}
func (ao AdminOptionOperationTimeout) supportsDeleteTopics() {
}
func (ao AdminOptionOperationTimeout) supportsCreatePartitions() {
}
func (ao AdminOptionOperationTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet {
return nil
}
cErrstrSize := C.size_t(512)
cErrstr := (*C.char)(C.malloc(cErrstrSize))
defer C.free(unsafe.Pointer(cErrstr))
cErr := C.rd_kafka_AdminOptions_set_operation_timeout(
cOptions, C.int(durationToMilliseconds(ao.val)),
cErrstr, cErrstrSize)
if cErr != 0 {
C.rd_kafka_AdminOptions_destroy(cOptions)
return newCErrorFromString(cErr,
fmt.Sprintf("Failed to set operation timeout: %s", C.GoString(cErrstr)))
}
return nil
}
// SetAdminOperationTimeout sets the broker's operation timeout, such as the
// timeout for CreateTopics to complete the creation of topics on the controller
// before returning a result to the application.
//
// CreateTopics, DeleteTopics, CreatePartitions:
// a value 0 will return immediately after triggering topic
// creation, while > 0 will wait this long for topic creation to propagate
// in cluster.
//
// Default: 0 (return immediately).
//
// Valid for CreateTopics, DeleteTopics, CreatePartitions.
func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout) {
ao.isSet = true
ao.val = t
return ao
}
// AdminOptionRequestTimeout sets the overall request timeout, including broker
// lookup, request transmission, operation time on broker, and response.
//
// Default: `socket.timeout.ms`.
//
// Valid for all Admin API methods.
type AdminOptionRequestTimeout struct {
isSet bool
val time.Duration
}
func (ao AdminOptionRequestTimeout) supportsCreateTopics() {
}
func (ao AdminOptionRequestTimeout) supportsDeleteTopics() {
}
func (ao AdminOptionRequestTimeout) supportsCreatePartitions() {
}
func (ao AdminOptionRequestTimeout) supportsAlterConfigs() {
}
func (ao AdminOptionRequestTimeout) supportsDescribeConfigs() {
}
func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet {
return nil
}
cErrstrSize := C.size_t(512)
cErrstr := (*C.char)(C.malloc(cErrstrSize))
defer C.free(unsafe.Pointer(cErrstr))
cErr := C.rd_kafka_AdminOptions_set_request_timeout(
cOptions, C.int(durationToMilliseconds(ao.val)),
cErrstr, cErrstrSize)
if cErr != 0 {
C.rd_kafka_AdminOptions_destroy(cOptions)
return newCErrorFromString(cErr,
fmt.Sprintf("%s", C.GoString(cErrstr)))
}
return nil
}
// SetAdminRequestTimeout sets the overall request timeout, including broker
// lookup, request transmission, operation time on broker, and response.
//
// Default: `socket.timeout.ms`.
//
// Valid for all Admin API methods.
func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) {
ao.isSet = true
ao.val = t
return ao
}
// AdminOptionValidateOnly tells the broker to only validate the request,
// without performing the requested operation (create topics, etc).
//
// Default: false.
//
// Valid for CreateTopics, CreatePartitions, AlterConfigs
type AdminOptionValidateOnly struct {
isSet bool
val bool
}
func (ao AdminOptionValidateOnly) supportsCreateTopics() {
}
func (ao AdminOptionValidateOnly) supportsCreatePartitions() {
}
func (ao AdminOptionValidateOnly) supportsAlterConfigs() {
}
func (ao AdminOptionValidateOnly) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet {
return nil
}
cErrstrSize := C.size_t(512)
cErrstr := (*C.char)(C.malloc(cErrstrSize))
defer C.free(unsafe.Pointer(cErrstr))
cErr := C.rd_kafka_AdminOptions_set_validate_only(
cOptions, bool2cint(ao.val),
cErrstr, cErrstrSize)
if cErr != 0 {
C.rd_kafka_AdminOptions_destroy(cOptions)
return newCErrorFromString(cErr,
fmt.Sprintf("%s", C.GoString(cErrstr)))
}
return nil
}
// SetAdminValidateOnly tells the broker to only validate the request,
// without performing the requested operation (create topics, etc).
//
// Default: false.
//
// Valid for CreateTopics, DeleteTopics, CreatePartitions, AlterConfigs
func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) {
ao.isSet = true
ao.val = validateOnly
return ao
}
// CreateTopicsAdminOption - see setters.
//
// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
type CreateTopicsAdminOption interface {
supportsCreateTopics()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}
// DeleteTopicsAdminOption - see setters.
//
// See SetAdminRequestTimeout, SetAdminOperationTimeout.
type DeleteTopicsAdminOption interface {
supportsDeleteTopics()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}
// CreatePartitionsAdminOption - see setters.
//
// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
type CreatePartitionsAdminOption interface {
supportsCreatePartitions()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}
// AlterConfigsAdminOption - see setters.
//
// See SetAdminRequestTimeout, SetAdminValidateOnly, SetAdminIncremental.
type AlterConfigsAdminOption interface {
supportsAlterConfigs()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}
// DescribeConfigsAdminOption - see setters.
//
// See SetAdminRequestTimeout.
type DescribeConfigsAdminOption interface {
supportsDescribeConfigs()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}
// AdminOption is a generic type not to be used directly.
//
// See CreateTopicsAdminOption et.al.
type AdminOption interface {
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}
func adminOptionsSetup(h *handle, opType C.rd_kafka_admin_op_t, options []AdminOption) (*C.rd_kafka_AdminOptions_t, error) {
cOptions := C.rd_kafka_AdminOptions_new(h.rk, opType)
for _, opt := range options {
if opt == nil {
continue
}
err := opt.apply(cOptions)
if err != nil {
return nil, err
}
}
return cOptions, nil
}