blob: 19a8be0f67fdaef04e5f8de6bafb38416069501d [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001/**
2 * Copyright 2018 Confluent Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kafka
18
19import (
20 "fmt"
21 "time"
22 "unsafe"
23)
24
25/*
26#include <librdkafka/rdkafka.h>
27#include <stdlib.h>
28*/
29import "C"
30
// AdminOptionOperationTimeout carries the broker-side operation timeout: how
// long the broker may spend completing an operation (for example creating
// topics on the controller) before a result is returned to the application.
//
// For CreateTopics, DeleteTopics and CreatePartitions a value of 0 returns
// immediately after the operation has been triggered, while a value > 0 waits
// up to that long for the change to propagate in the cluster.
//
// Default: 0 (return immediately).
//
// Valid for CreateTopics, DeleteTopics, CreatePartitions.
type AdminOptionOperationTimeout struct {
	// isSet records whether a value was explicitly provided.
	isSet bool
	// val is the requested timeout.
	val time.Duration
}

// The empty marker methods below are what make this option usable wherever a
// CreateTopics, DeleteTopics or CreatePartitions admin option is expected.
func (AdminOptionOperationTimeout) supportsCreateTopics() {}

func (AdminOptionOperationTimeout) supportsDeleteTopics() {}

func (AdminOptionOperationTimeout) supportsCreatePartitions() {}
54
55func (ao AdminOptionOperationTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
56 if !ao.isSet {
57 return nil
58 }
59
60 cErrstrSize := C.size_t(512)
61 cErrstr := (*C.char)(C.malloc(cErrstrSize))
62 defer C.free(unsafe.Pointer(cErrstr))
63
64 cErr := C.rd_kafka_AdminOptions_set_operation_timeout(
65 cOptions, C.int(durationToMilliseconds(ao.val)),
66 cErrstr, cErrstrSize)
67 if cErr != 0 {
68 C.rd_kafka_AdminOptions_destroy(cOptions)
69 return newCErrorFromString(cErr,
70 fmt.Sprintf("Failed to set operation timeout: %s", C.GoString(cErrstr)))
71
72 }
73
74 return nil
75}
76
77// SetAdminOperationTimeout sets the broker's operation timeout, such as the
78// timeout for CreateTopics to complete the creation of topics on the controller
79// before returning a result to the application.
80//
81// CreateTopics, DeleteTopics, CreatePartitions:
82// a value 0 will return immediately after triggering topic
83// creation, while > 0 will wait this long for topic creation to propagate
84// in cluster.
85//
86// Default: 0 (return immediately).
87//
88// Valid for CreateTopics, DeleteTopics, CreatePartitions.
89func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout) {
90 ao.isSet = true
91 ao.val = t
92 return ao
93}
94
// AdminOptionRequestTimeout carries the overall request timeout, which covers
// broker lookup, request transmission, operation time on the broker, and the
// response.
//
// Default: `socket.timeout.ms`.
//
// Valid for all Admin API methods.
type AdminOptionRequestTimeout struct {
	// isSet records whether a value was explicitly provided.
	isSet bool
	// val is the requested timeout.
	val time.Duration
}

// The empty marker methods below are what make this option usable wherever a
// CreateTopics, DeleteTopics, CreatePartitions, AlterConfigs or
// DescribeConfigs admin option is expected.
func (AdminOptionRequestTimeout) supportsCreateTopics() {}

func (AdminOptionRequestTimeout) supportsDeleteTopics() {}

func (AdminOptionRequestTimeout) supportsCreatePartitions() {}

func (AdminOptionRequestTimeout) supportsAlterConfigs() {}

func (AdminOptionRequestTimeout) supportsDescribeConfigs() {}
116
117func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
118 if !ao.isSet {
119 return nil
120 }
121
122 cErrstrSize := C.size_t(512)
123 cErrstr := (*C.char)(C.malloc(cErrstrSize))
124 defer C.free(unsafe.Pointer(cErrstr))
125
126 cErr := C.rd_kafka_AdminOptions_set_request_timeout(
127 cOptions, C.int(durationToMilliseconds(ao.val)),
128 cErrstr, cErrstrSize)
129 if cErr != 0 {
130 C.rd_kafka_AdminOptions_destroy(cOptions)
131 return newCErrorFromString(cErr,
132 fmt.Sprintf("%s", C.GoString(cErrstr)))
133
134 }
135
136 return nil
137}
138
139// SetAdminRequestTimeout sets the overall request timeout, including broker
140// lookup, request transmission, operation time on broker, and response.
141//
142// Default: `socket.timeout.ms`.
143//
144// Valid for all Admin API methods.
145func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) {
146 ao.isSet = true
147 ao.val = t
148 return ao
149}
150
// AdminOptionValidateOnly asks the broker to only validate the request,
// without performing the requested operation (create topics, etc).
//
// Default: false.
//
// Valid for CreateTopics, CreatePartitions, AlterConfigs.
type AdminOptionValidateOnly struct {
	// isSet records whether a value was explicitly provided.
	isSet bool
	// val is the requested validate-only flag.
	val bool
}

// The empty marker methods below are what make this option usable wherever a
// CreateTopics, CreatePartitions or AlterConfigs admin option is expected.
func (AdminOptionValidateOnly) supportsCreateTopics() {}

func (AdminOptionValidateOnly) supportsCreatePartitions() {}

func (AdminOptionValidateOnly) supportsAlterConfigs() {}
168
169func (ao AdminOptionValidateOnly) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
170 if !ao.isSet {
171 return nil
172 }
173
174 cErrstrSize := C.size_t(512)
175 cErrstr := (*C.char)(C.malloc(cErrstrSize))
176 defer C.free(unsafe.Pointer(cErrstr))
177
178 cErr := C.rd_kafka_AdminOptions_set_validate_only(
179 cOptions, bool2cint(ao.val),
180 cErrstr, cErrstrSize)
181 if cErr != 0 {
182 C.rd_kafka_AdminOptions_destroy(cOptions)
183 return newCErrorFromString(cErr,
184 fmt.Sprintf("%s", C.GoString(cErrstr)))
185
186 }
187
188 return nil
189}
190
191// SetAdminValidateOnly tells the broker to only validate the request,
192// without performing the requested operation (create topics, etc).
193//
194// Default: false.
195//
196// Valid for CreateTopics, DeleteTopics, CreatePartitions, AlterConfigs
197func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) {
198 ao.isSet = true
199 ao.val = validateOnly
200 return ao
201}
202
203// CreateTopicsAdminOption - see setters.
204//
205// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
206type CreateTopicsAdminOption interface {
207 supportsCreateTopics()
208 apply(cOptions *C.rd_kafka_AdminOptions_t) error
209}
210
211// DeleteTopicsAdminOption - see setters.
212//
213// See SetAdminRequestTimeout, SetAdminOperationTimeout.
214type DeleteTopicsAdminOption interface {
215 supportsDeleteTopics()
216 apply(cOptions *C.rd_kafka_AdminOptions_t) error
217}
218
219// CreatePartitionsAdminOption - see setters.
220//
221// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
222type CreatePartitionsAdminOption interface {
223 supportsCreatePartitions()
224 apply(cOptions *C.rd_kafka_AdminOptions_t) error
225}
226
227// AlterConfigsAdminOption - see setters.
228//
229// See SetAdminRequestTimeout, SetAdminValidateOnly, SetAdminIncremental.
230type AlterConfigsAdminOption interface {
231 supportsAlterConfigs()
232 apply(cOptions *C.rd_kafka_AdminOptions_t) error
233}
234
235// DescribeConfigsAdminOption - see setters.
236//
237// See SetAdminRequestTimeout.
238type DescribeConfigsAdminOption interface {
239 supportsDescribeConfigs()
240 apply(cOptions *C.rd_kafka_AdminOptions_t) error
241}
242
243// AdminOption is a generic type not to be used directly.
244//
245// See CreateTopicsAdminOption et.al.
246type AdminOption interface {
247 apply(cOptions *C.rd_kafka_AdminOptions_t) error
248}
249
250func adminOptionsSetup(h *handle, opType C.rd_kafka_admin_op_t, options []AdminOption) (*C.rd_kafka_AdminOptions_t, error) {
251
252 cOptions := C.rd_kafka_AdminOptions_new(h.rk, opType)
253 for _, opt := range options {
254 if opt == nil {
255 continue
256 }
257 err := opt.apply(cOptions)
258 if err != nil {
259 return nil, err
260 }
261 }
262
263 return cOptions, nil
264}