blob: 5866a47e911976a8ceac8939d8720d0dbda7c0d9 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package kafka
2
3/**
4 * Copyright 2016 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19import (
20 "fmt"
21 "reflect"
22 "strings"
23 "unsafe"
24)
25
26/*
27#include <stdlib.h>
28#include <librdkafka/rdkafka.h>
29*/
30import "C"
31
// ConfigValue is the value type stored in a ConfigMap.
// The dynamic types accepted when a value is converted for librdkafka are
// bool, int, string, and any type providing the standard String() method.
type ConfigValue interface{}
35
36// ConfigMap is a map contaning standard librdkafka configuration properties as documented in:
37// https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
38//
39// The special property "default.topic.config" (optional) is a ConfigMap containing default topic
40// configuration properties.
41type ConfigMap map[string]ConfigValue
42
43// SetKey sets configuration property key to value.
44// For user convenience a key prefixed with {topic}. will be
45// set on the "default.topic.config" sub-map.
46func (m ConfigMap) SetKey(key string, value ConfigValue) error {
47 if strings.HasPrefix(key, "{topic}.") {
48 _, found := m["default.topic.config"]
49 if !found {
50 m["default.topic.config"] = ConfigMap{}
51 }
52 m["default.topic.config"].(ConfigMap)[strings.TrimPrefix(key, "{topic}.")] = value
53 } else {
54 m[key] = value
55 }
56
57 return nil
58}
59
60// Set implements flag.Set (command line argument parser) as a convenience
61// for `-X key=value` config.
62func (m ConfigMap) Set(kv string) error {
63 i := strings.Index(kv, "=")
64 if i == -1 {
65 return Error{ErrInvalidArg, "Expected key=value"}
66 }
67
68 k := kv[:i]
69 v := kv[i+1:]
70
71 return m.SetKey(k, v)
72}
73
74func value2string(v ConfigValue) (ret string, errstr string) {
75
76 switch x := v.(type) {
77 case bool:
78 if x {
79 ret = "true"
80 } else {
81 ret = "false"
82 }
83 case int:
84 ret = fmt.Sprintf("%d", x)
85 case string:
86 ret = x
87 case fmt.Stringer:
88 ret = x.String()
89 default:
90 return "", fmt.Sprintf("Invalid value type %T", v)
91 }
92
93 return ret, ""
94}
95
96// rdkAnyconf abstracts rd_kafka_conf_t and rd_kafka_topic_conf_t
97// into a common interface.
98type rdkAnyconf interface {
99 set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t
100}
101
102func anyconfSet(anyconf rdkAnyconf, key string, val ConfigValue) (err error) {
103 value, errstr := value2string(val)
104 if errstr != "" {
105 return Error{ErrInvalidArg, fmt.Sprintf("%s for key %s (expected string,bool,int,ConfigMap)", errstr, key)}
106 }
107 cKey := C.CString(key)
108 cVal := C.CString(value)
109 cErrstr := (*C.char)(C.malloc(C.size_t(128)))
110 defer C.free(unsafe.Pointer(cErrstr))
111
112 if anyconf.set(cKey, cVal, cErrstr, 128) != C.RD_KAFKA_CONF_OK {
113 C.free(unsafe.Pointer(cKey))
114 C.free(unsafe.Pointer(cVal))
115 return newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
116 }
117
118 return nil
119}
120
121// we need these typedefs to workaround a crash in golint
122// when parsing the set() methods below
123type rdkConf C.rd_kafka_conf_t
124type rdkTopicConf C.rd_kafka_topic_conf_t
125
126func (cConf *rdkConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
127 return C.rd_kafka_conf_set((*C.rd_kafka_conf_t)(cConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
128}
129
130func (ctopicConf *rdkTopicConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
131 return C.rd_kafka_topic_conf_set((*C.rd_kafka_topic_conf_t)(ctopicConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
132}
133
134func configConvertAnyconf(m ConfigMap, anyconf rdkAnyconf) (err error) {
135 // set plugins first, any plugin-specific configuration depends on
136 // the plugin to have already been set
137 pluginPaths, ok := m["plugin.library.paths"]
138 if ok {
139 err = anyconfSet(anyconf, "plugin.library.paths", pluginPaths)
140 if err != nil {
141 return err
142 }
143 }
144 for k, v := range m {
145 if k == "plugin.library.paths" {
146 continue
147 }
148 switch v.(type) {
149 case ConfigMap:
150 /* Special sub-ConfigMap, only used for default.topic.config */
151
152 if k != "default.topic.config" {
153 return Error{ErrInvalidArg, fmt.Sprintf("Invalid type for key %s", k)}
154 }
155
156 var cTopicConf = C.rd_kafka_topic_conf_new()
157
158 err = configConvertAnyconf(v.(ConfigMap),
159 (*rdkTopicConf)(cTopicConf))
160 if err != nil {
161 C.rd_kafka_topic_conf_destroy(cTopicConf)
162 return err
163 }
164
165 C.rd_kafka_conf_set_default_topic_conf(
166 (*C.rd_kafka_conf_t)(anyconf.(*rdkConf)),
167 (*C.rd_kafka_topic_conf_t)((*rdkTopicConf)(cTopicConf)))
168
169 default:
170 err = anyconfSet(anyconf, k, v)
171 if err != nil {
172 return err
173 }
174 }
175 }
176
177 return nil
178}
179
180// convert ConfigMap to C rd_kafka_conf_t *
181func (m ConfigMap) convert() (cConf *C.rd_kafka_conf_t, err error) {
182 cConf = C.rd_kafka_conf_new()
183
184 err = configConvertAnyconf(m, (*rdkConf)(cConf))
185 if err != nil {
186 C.rd_kafka_conf_destroy(cConf)
187 return nil, err
188 }
189 return cConf, nil
190}
191
192// get finds key in the configmap and returns its value.
193// If the key is not found defval is returned.
194// If the key is found but the type is mismatched an error is returned.
195func (m ConfigMap) get(key string, defval ConfigValue) (ConfigValue, error) {
196 if strings.HasPrefix(key, "{topic}.") {
197 defconfCv, found := m["default.topic.config"]
198 if !found {
199 return defval, nil
200 }
201 return defconfCv.(ConfigMap).get(strings.TrimPrefix(key, "{topic}."), defval)
202 }
203
204 v, ok := m[key]
205 if !ok {
206 return defval, nil
207 }
208
209 if defval != nil && reflect.TypeOf(defval) != reflect.TypeOf(v) {
210 return nil, Error{ErrInvalidArg, fmt.Sprintf("%s expects type %T, not %T", key, defval, v)}
211 }
212
213 return v, nil
214}
215
216// extract performs a get() and if found deletes the key.
217func (m ConfigMap) extract(key string, defval ConfigValue) (ConfigValue, error) {
218
219 v, err := m.get(key, defval)
220 if err != nil {
221 return nil, err
222 }
223
224 delete(m, key)
225
226 return v, nil
227}
228
229func (m ConfigMap) clone() ConfigMap {
230 m2 := make(ConfigMap)
231 for k, v := range m {
232 m2[k] = v
233 }
234 return m2
235}
236
237// Get finds the given key in the ConfigMap and returns its value.
238// If the key is not found `defval` is returned.
239// If the key is found but the type does not match that of `defval` (unless nil)
240// an ErrInvalidArg error is returned.
241func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error) {
242 return m.get(key, defval)
243}