blob: c09e64d8ab8f29a4d0b2f63e92d3b09ddbf4ca02 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package kafka
2
3/**
4 * Copyright 2016 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19import (
20 "fmt"
21 "sync"
22 "unsafe"
23)
24
25/*
26#include <librdkafka/rdkafka.h>
27#include <stdlib.h>
28*/
29import "C"
30
31// Handle represents a generic client handle containing common parts for
32// both Producer and Consumer.
33type Handle interface {
34 gethandle() *handle
35}
36
37// Common instance handle for both Producer and Consumer
38type handle struct {
39 rk *C.rd_kafka_t
40 rkq *C.rd_kafka_queue_t
41
42 // Termination of background go-routines
43 terminatedChan chan string // string is go-routine name
44
45 // Topic <-> rkt caches
46 rktCacheLock sync.Mutex
47 // topic name -> rkt cache
48 rktCache map[string]*C.rd_kafka_topic_t
49 // rkt -> topic name cache
50 rktNameCache map[*C.rd_kafka_topic_t]string
51
52 //
53 // cgo map
54 // Maps C callbacks based on cgoid back to its Go object
55 cgoLock sync.Mutex
56 cgoidNext uintptr
57 cgomap map[int]cgoif
58
59 //
60 // producer
61 //
62 p *Producer
63
64 // Forward delivery reports on Producer.Events channel
65 fwdDr bool
66
67 //
68 // consumer
69 //
70 c *Consumer
71
72 // Forward rebalancing ack responsibility to application (current setting)
73 currAppRebalanceEnable bool
74}
75
76func (h *handle) String() string {
77 return C.GoString(C.rd_kafka_name(h.rk))
78}
79
80func (h *handle) setup() {
81 h.rktCache = make(map[string]*C.rd_kafka_topic_t)
82 h.rktNameCache = make(map[*C.rd_kafka_topic_t]string)
83 h.cgomap = make(map[int]cgoif)
84 h.terminatedChan = make(chan string, 10)
85}
86
87func (h *handle) cleanup() {
88 for _, crkt := range h.rktCache {
89 C.rd_kafka_topic_destroy(crkt)
90 }
91
92 if h.rkq != nil {
93 C.rd_kafka_queue_destroy(h.rkq)
94 }
95}
96
97// waitTerminated waits termination of background go-routines.
98// termCnt is the number of goroutines expected to signal termination completion
99// on h.terminatedChan
100func (h *handle) waitTerminated(termCnt int) {
101 // Wait for termCnt termination-done events from goroutines
102 for ; termCnt > 0; termCnt-- {
103 _ = <-h.terminatedChan
104 }
105}
106
107// getRkt0 finds or creates and returns a C topic_t object from the local cache.
108func (h *handle) getRkt0(topic string, ctopic *C.char, doLock bool) (crkt *C.rd_kafka_topic_t) {
109 if doLock {
110 h.rktCacheLock.Lock()
111 defer h.rktCacheLock.Unlock()
112 }
113 crkt, ok := h.rktCache[topic]
114 if ok {
115 return crkt
116 }
117
118 if ctopic == nil {
119 ctopic = C.CString(topic)
120 defer C.free(unsafe.Pointer(ctopic))
121 }
122
123 crkt = C.rd_kafka_topic_new(h.rk, ctopic, nil)
124 if crkt == nil {
125 panic(fmt.Sprintf("Unable to create new C topic \"%s\": %s",
126 topic, C.GoString(C.rd_kafka_err2str(C.rd_kafka_last_error()))))
127 }
128
129 h.rktCache[topic] = crkt
130 h.rktNameCache[crkt] = topic
131
132 return crkt
133}
134
135// getRkt finds or creates and returns a C topic_t object from the local cache.
136func (h *handle) getRkt(topic string) (crkt *C.rd_kafka_topic_t) {
137 return h.getRkt0(topic, nil, true)
138}
139
140// getTopicNameFromRkt returns the topic name for a C topic_t object, preferably
141// using the local cache to avoid a cgo call.
142func (h *handle) getTopicNameFromRkt(crkt *C.rd_kafka_topic_t) (topic string) {
143 h.rktCacheLock.Lock()
144 defer h.rktCacheLock.Unlock()
145
146 topic, ok := h.rktNameCache[crkt]
147 if ok {
148 return topic
149 }
150
151 // we need our own copy/refcount of the crkt
152 ctopic := C.rd_kafka_topic_name(crkt)
153 topic = C.GoString(ctopic)
154
155 crkt = h.getRkt0(topic, ctopic, false /* dont lock */)
156
157 return topic
158}
159
// cgoif is a generic interface for Go state that is handed to the C code as
// an opaque value.
//
// Pointers to complex Go types cannot be passed to C. Instead the state is
// wrapped in a cgoif object, stored in the handle's cgomap under a freshly
// generated id (see cgoPut), and only that id is passed to C. When the C
// callback fires, the id is used to look the cgoif object up again in the
// cgomap (see cgoGet).
type cgoif interface{}
167
168// delivery report cgoif container
169type cgoDr struct {
170 deliveryChan chan Event
171 opaque interface{}
172}
173
174// cgoPut adds object cg to the handle's cgo map and returns a
175// unique id for the added entry.
176// Thread-safe.
177// FIXME: the uniquity of the id is questionable over time.
178func (h *handle) cgoPut(cg cgoif) (cgoid int) {
179 h.cgoLock.Lock()
180 defer h.cgoLock.Unlock()
181
182 h.cgoidNext++
183 if h.cgoidNext == 0 {
184 h.cgoidNext++
185 }
186 cgoid = (int)(h.cgoidNext)
187 h.cgomap[cgoid] = cg
188 return cgoid
189}
190
191// cgoGet looks up cgoid in the cgo map, deletes the reference from the map
192// and returns the object, if found. Else returns nil, false.
193// Thread-safe.
194func (h *handle) cgoGet(cgoid int) (cg cgoif, found bool) {
195 if cgoid == 0 {
196 return nil, false
197 }
198
199 h.cgoLock.Lock()
200 defer h.cgoLock.Unlock()
201 cg, found = h.cgomap[cgoid]
202 if found {
203 delete(h.cgomap, cgoid)
204 }
205
206 return cg, found
207}