blob: 061147d8b1695cfb1735cb83ce7ab49ac5369afd [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001/**
2 * Copyright 2016 Confluent Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kafka
18
19import (
20 "unsafe"
21)
22
23/*
24#include <stdlib.h>
25#include <librdkafka/rdkafka.h>
26
27struct rd_kafka_metadata_broker *_getMetadata_broker_element(struct rd_kafka_metadata *m, int i) {
28 return &m->brokers[i];
29}
30
31struct rd_kafka_metadata_topic *_getMetadata_topic_element(struct rd_kafka_metadata *m, int i) {
32 return &m->topics[i];
33}
34
35struct rd_kafka_metadata_partition *_getMetadata_partition_element(struct rd_kafka_metadata *m, int topic_idx, int partition_idx) {
36 return &m->topics[topic_idx].partitions[partition_idx];
37}
38
39int32_t _get_int32_element (int32_t *arr, int i) {
40 return arr[i];
41}
42
43*/
44import "C"
45
46// BrokerMetadata contains per-broker metadata
47type BrokerMetadata struct {
48 ID int32
49 Host string
50 Port int
51}
52
53// PartitionMetadata contains per-partition metadata
54type PartitionMetadata struct {
55 ID int32
56 Error Error
57 Leader int32
58 Replicas []int32
59 Isrs []int32
60}
61
62// TopicMetadata contains per-topic metadata
63type TopicMetadata struct {
64 Topic string
65 Partitions []PartitionMetadata
66 Error Error
67}
68
69// Metadata contains broker and topic metadata for all (matching) topics
70type Metadata struct {
71 Brokers []BrokerMetadata
72 Topics map[string]TopicMetadata
73
74 OriginatingBroker BrokerMetadata
75}
76
77// getMetadata queries broker for cluster and topic metadata.
78// If topic is non-nil only information about that topic is returned, else if
79// allTopics is false only information about locally used topics is returned,
80// else information about all topics is returned.
81func getMetadata(H Handle, topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
82 h := H.gethandle()
83
84 var rkt *C.rd_kafka_topic_t
85 if topic != nil {
86 rkt = h.getRkt(*topic)
87 }
88
89 var cMd *C.struct_rd_kafka_metadata
90 cErr := C.rd_kafka_metadata(h.rk, bool2cint(allTopics),
91 rkt, &cMd, C.int(timeoutMs))
92 if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
93 return nil, newError(cErr)
94 }
95
96 m := Metadata{}
97 defer C.rd_kafka_metadata_destroy(cMd)
98
99 m.Brokers = make([]BrokerMetadata, cMd.broker_cnt)
100 for i := 0; i < int(cMd.broker_cnt); i++ {
101 b := C._getMetadata_broker_element(cMd, C.int(i))
102 m.Brokers[i] = BrokerMetadata{int32(b.id), C.GoString(b.host),
103 int(b.port)}
104 }
105
106 m.Topics = make(map[string]TopicMetadata, int(cMd.topic_cnt))
107 for i := 0; i < int(cMd.topic_cnt); i++ {
108 t := C._getMetadata_topic_element(cMd, C.int(i))
109
110 thisTopic := C.GoString(t.topic)
111 m.Topics[thisTopic] = TopicMetadata{Topic: thisTopic,
112 Error: newError(t.err),
113 Partitions: make([]PartitionMetadata, int(t.partition_cnt))}
114
115 for j := 0; j < int(t.partition_cnt); j++ {
116 p := C._getMetadata_partition_element(cMd, C.int(i), C.int(j))
117 m.Topics[thisTopic].Partitions[j] = PartitionMetadata{
118 ID: int32(p.id),
119 Error: newError(p.err),
120 Leader: int32(p.leader)}
121 m.Topics[thisTopic].Partitions[j].Replicas = make([]int32, int(p.replica_cnt))
122 for ir := 0; ir < int(p.replica_cnt); ir++ {
123 m.Topics[thisTopic].Partitions[j].Replicas[ir] = int32(C._get_int32_element(p.replicas, C.int(ir)))
124 }
125
126 m.Topics[thisTopic].Partitions[j].Isrs = make([]int32, int(p.isr_cnt))
127 for ii := 0; ii < int(p.isr_cnt); ii++ {
128 m.Topics[thisTopic].Partitions[j].Isrs[ii] = int32(C._get_int32_element(p.isrs, C.int(ii)))
129 }
130 }
131 }
132
133 m.OriginatingBroker = BrokerMetadata{int32(cMd.orig_broker_id),
134 C.GoString(cMd.orig_broker_name), 0}
135
136 return &m, nil
137}
138
139// queryWatermarkOffsets returns the broker's low and high offsets for the given topic
140// and partition.
141func queryWatermarkOffsets(H Handle, topic string, partition int32, timeoutMs int) (low, high int64, err error) {
142 h := H.gethandle()
143
144 ctopic := C.CString(topic)
145 defer C.free(unsafe.Pointer(ctopic))
146
147 var cLow, cHigh C.int64_t
148
149 e := C.rd_kafka_query_watermark_offsets(h.rk, ctopic, C.int32_t(partition),
150 &cLow, &cHigh, C.int(timeoutMs))
151 if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
152 return 0, 0, newError(e)
153 }
154
155 low = int64(cLow)
156 high = int64(cHigh)
157 return low, high, nil
158}