khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame^] | 1 | package kafka |
| 2 | |
| 3 | /** |
| 4 | * Copyright 2016 Confluent Inc. |
| 5 | * |
| 6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | * you may not use this file except in compliance with the License. |
| 8 | * You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, software |
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | * See the License for the specific language governing permissions and |
| 16 | * limitations under the License. |
| 17 | */ |
| 18 | |
| 19 | import ( |
| 20 | "fmt" |
| 21 | "sync" |
| 22 | "unsafe" |
| 23 | ) |
| 24 | |
| 25 | /* |
| 26 | #include <librdkafka/rdkafka.h> |
| 27 | #include <stdlib.h> |
| 28 | */ |
| 29 | import "C" |
| 30 | |
| 31 | // Handle represents a generic client handle containing common parts for |
| 32 | // both Producer and Consumer. |
| 33 | type Handle interface { |
| 34 | gethandle() *handle |
| 35 | } |
| 36 | |
| 37 | // Common instance handle for both Producer and Consumer |
| 38 | type handle struct { |
| 39 | rk *C.rd_kafka_t |
| 40 | rkq *C.rd_kafka_queue_t |
| 41 | |
| 42 | // Termination of background go-routines |
| 43 | terminatedChan chan string // string is go-routine name |
| 44 | |
| 45 | // Topic <-> rkt caches |
| 46 | rktCacheLock sync.Mutex |
| 47 | // topic name -> rkt cache |
| 48 | rktCache map[string]*C.rd_kafka_topic_t |
| 49 | // rkt -> topic name cache |
| 50 | rktNameCache map[*C.rd_kafka_topic_t]string |
| 51 | |
| 52 | // |
| 53 | // cgo map |
| 54 | // Maps C callbacks based on cgoid back to its Go object |
| 55 | cgoLock sync.Mutex |
| 56 | cgoidNext uintptr |
| 57 | cgomap map[int]cgoif |
| 58 | |
| 59 | // |
| 60 | // producer |
| 61 | // |
| 62 | p *Producer |
| 63 | |
| 64 | // Forward delivery reports on Producer.Events channel |
| 65 | fwdDr bool |
| 66 | |
| 67 | // |
| 68 | // consumer |
| 69 | // |
| 70 | c *Consumer |
| 71 | |
| 72 | // Forward rebalancing ack responsibility to application (current setting) |
| 73 | currAppRebalanceEnable bool |
| 74 | } |
| 75 | |
| 76 | func (h *handle) String() string { |
| 77 | return C.GoString(C.rd_kafka_name(h.rk)) |
| 78 | } |
| 79 | |
| 80 | func (h *handle) setup() { |
| 81 | h.rktCache = make(map[string]*C.rd_kafka_topic_t) |
| 82 | h.rktNameCache = make(map[*C.rd_kafka_topic_t]string) |
| 83 | h.cgomap = make(map[int]cgoif) |
| 84 | h.terminatedChan = make(chan string, 10) |
| 85 | } |
| 86 | |
| 87 | func (h *handle) cleanup() { |
| 88 | for _, crkt := range h.rktCache { |
| 89 | C.rd_kafka_topic_destroy(crkt) |
| 90 | } |
| 91 | |
| 92 | if h.rkq != nil { |
| 93 | C.rd_kafka_queue_destroy(h.rkq) |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | // waitTerminated waits termination of background go-routines. |
| 98 | // termCnt is the number of goroutines expected to signal termination completion |
| 99 | // on h.terminatedChan |
| 100 | func (h *handle) waitTerminated(termCnt int) { |
| 101 | // Wait for termCnt termination-done events from goroutines |
| 102 | for ; termCnt > 0; termCnt-- { |
| 103 | _ = <-h.terminatedChan |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | // getRkt0 finds or creates and returns a C topic_t object from the local cache. |
| 108 | func (h *handle) getRkt0(topic string, ctopic *C.char, doLock bool) (crkt *C.rd_kafka_topic_t) { |
| 109 | if doLock { |
| 110 | h.rktCacheLock.Lock() |
| 111 | defer h.rktCacheLock.Unlock() |
| 112 | } |
| 113 | crkt, ok := h.rktCache[topic] |
| 114 | if ok { |
| 115 | return crkt |
| 116 | } |
| 117 | |
| 118 | if ctopic == nil { |
| 119 | ctopic = C.CString(topic) |
| 120 | defer C.free(unsafe.Pointer(ctopic)) |
| 121 | } |
| 122 | |
| 123 | crkt = C.rd_kafka_topic_new(h.rk, ctopic, nil) |
| 124 | if crkt == nil { |
| 125 | panic(fmt.Sprintf("Unable to create new C topic \"%s\": %s", |
| 126 | topic, C.GoString(C.rd_kafka_err2str(C.rd_kafka_last_error())))) |
| 127 | } |
| 128 | |
| 129 | h.rktCache[topic] = crkt |
| 130 | h.rktNameCache[crkt] = topic |
| 131 | |
| 132 | return crkt |
| 133 | } |
| 134 | |
| 135 | // getRkt finds or creates and returns a C topic_t object from the local cache. |
| 136 | func (h *handle) getRkt(topic string) (crkt *C.rd_kafka_topic_t) { |
| 137 | return h.getRkt0(topic, nil, true) |
| 138 | } |
| 139 | |
| 140 | // getTopicNameFromRkt returns the topic name for a C topic_t object, preferably |
| 141 | // using the local cache to avoid a cgo call. |
| 142 | func (h *handle) getTopicNameFromRkt(crkt *C.rd_kafka_topic_t) (topic string) { |
| 143 | h.rktCacheLock.Lock() |
| 144 | defer h.rktCacheLock.Unlock() |
| 145 | |
| 146 | topic, ok := h.rktNameCache[crkt] |
| 147 | if ok { |
| 148 | return topic |
| 149 | } |
| 150 | |
| 151 | // we need our own copy/refcount of the crkt |
| 152 | ctopic := C.rd_kafka_topic_name(crkt) |
| 153 | topic = C.GoString(ctopic) |
| 154 | |
| 155 | crkt = h.getRkt0(topic, ctopic, false /* dont lock */) |
| 156 | |
| 157 | return topic |
| 158 | } |
| 159 | |
// cgoif is the generic container type for Go state that is handed to the
// C code as an opaque value.
// Pointers to complex Go types cannot be passed to C, so the state is
// stored as a cgoif in the handle's cgomap under a generated id, and that
// id is what the C code receives. When the C callback fires, the id is
// used to look the cgoif object up in the cgomap again.
type cgoif interface{}
| 167 | |
| 168 | // delivery report cgoif container |
| 169 | type cgoDr struct { |
| 170 | deliveryChan chan Event |
| 171 | opaque interface{} |
| 172 | } |
| 173 | |
| 174 | // cgoPut adds object cg to the handle's cgo map and returns a |
| 175 | // unique id for the added entry. |
| 176 | // Thread-safe. |
| 177 | // FIXME: the uniquity of the id is questionable over time. |
| 178 | func (h *handle) cgoPut(cg cgoif) (cgoid int) { |
| 179 | h.cgoLock.Lock() |
| 180 | defer h.cgoLock.Unlock() |
| 181 | |
| 182 | h.cgoidNext++ |
| 183 | if h.cgoidNext == 0 { |
| 184 | h.cgoidNext++ |
| 185 | } |
| 186 | cgoid = (int)(h.cgoidNext) |
| 187 | h.cgomap[cgoid] = cg |
| 188 | return cgoid |
| 189 | } |
| 190 | |
| 191 | // cgoGet looks up cgoid in the cgo map, deletes the reference from the map |
| 192 | // and returns the object, if found. Else returns nil, false. |
| 193 | // Thread-safe. |
| 194 | func (h *handle) cgoGet(cgoid int) (cg cgoif, found bool) { |
| 195 | if cgoid == 0 { |
| 196 | return nil, false |
| 197 | } |
| 198 | |
| 199 | h.cgoLock.Lock() |
| 200 | defer h.cgoLock.Unlock() |
| 201 | cg, found = h.cgomap[cgoid] |
| 202 | if found { |
| 203 | delete(h.cgomap, cgoid) |
| 204 | } |
| 205 | |
| 206 | return cg, found |
| 207 | } |