khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame^] | 1 | package kafka |
| 2 | |
| 3 | /** |
| 4 | * Copyright 2016 Confluent Inc. |
| 5 | * |
| 6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | * you may not use this file except in compliance with the License. |
| 8 | * You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, software |
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | * See the License for the specific language governing permissions and |
| 16 | * limitations under the License. |
| 17 | */ |
| 18 | |
| 19 | import ( |
| 20 | "fmt" |
| 21 | "time" |
| 22 | "unsafe" |
| 23 | ) |
| 24 | |
| 25 | /* |
| 26 | #include <string.h> |
| 27 | #include <stdlib.h> |
| 28 | #include <librdkafka/rdkafka.h> |
| 29 | #include "glue_rdkafka.h" |
| 30 | |
| 31 | void setup_rkmessage (rd_kafka_message_t *rkmessage, |
| 32 | rd_kafka_topic_t *rkt, int32_t partition, |
| 33 | const void *payload, size_t len, |
| 34 | void *key, size_t keyLen, void *opaque) { |
| 35 | rkmessage->rkt = rkt; |
| 36 | rkmessage->partition = partition; |
| 37 | rkmessage->payload = (void *)payload; |
| 38 | rkmessage->len = len; |
| 39 | rkmessage->key = (void *)key; |
| 40 | rkmessage->key_len = keyLen; |
| 41 | rkmessage->_private = opaque; |
| 42 | } |
| 43 | */ |
| 44 | import "C" |
| 45 | |
| 46 | // TimestampType is a the Message timestamp type or source |
| 47 | // |
| 48 | type TimestampType int |
| 49 | |
| 50 | const ( |
| 51 | // TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support |
| 52 | TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) |
| 53 | // TimestampCreateTime indicates timestamp set by producer (source time) |
| 54 | TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME) |
| 55 | // TimestampLogAppendTime indicates timestamp set set by broker (store time) |
| 56 | TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) |
| 57 | ) |
| 58 | |
| 59 | func (t TimestampType) String() string { |
| 60 | switch t { |
| 61 | case TimestampCreateTime: |
| 62 | return "CreateTime" |
| 63 | case TimestampLogAppendTime: |
| 64 | return "LogAppendTime" |
| 65 | case TimestampNotAvailable: |
| 66 | fallthrough |
| 67 | default: |
| 68 | return "NotAvailable" |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | // Message represents a Kafka message |
| 73 | type Message struct { |
| 74 | TopicPartition TopicPartition |
| 75 | Value []byte |
| 76 | Key []byte |
| 77 | Timestamp time.Time |
| 78 | TimestampType TimestampType |
| 79 | Opaque interface{} |
| 80 | Headers []Header |
| 81 | } |
| 82 | |
| 83 | // String returns a human readable representation of a Message. |
| 84 | // Key and payload are not represented. |
| 85 | func (m *Message) String() string { |
| 86 | var topic string |
| 87 | if m.TopicPartition.Topic != nil { |
| 88 | topic = *m.TopicPartition.Topic |
| 89 | } else { |
| 90 | topic = "" |
| 91 | } |
| 92 | return fmt.Sprintf("%s[%d]@%s", topic, m.TopicPartition.Partition, m.TopicPartition.Offset) |
| 93 | } |
| 94 | |
| 95 | func (h *handle) getRktFromMessage(msg *Message) (crkt *C.rd_kafka_topic_t) { |
| 96 | if msg.TopicPartition.Topic == nil { |
| 97 | return nil |
| 98 | } |
| 99 | |
| 100 | return h.getRkt(*msg.TopicPartition.Topic) |
| 101 | } |
| 102 | |
| 103 | func (h *handle) newMessageFromFcMsg(fcMsg *C.fetched_c_msg_t) (msg *Message) { |
| 104 | msg = &Message{} |
| 105 | |
| 106 | if fcMsg.ts != -1 { |
| 107 | ts := int64(fcMsg.ts) |
| 108 | msg.TimestampType = TimestampType(fcMsg.tstype) |
| 109 | msg.Timestamp = time.Unix(ts/1000, (ts%1000)*1000000) |
| 110 | } |
| 111 | |
| 112 | if fcMsg.tmphdrsCnt > 0 { |
| 113 | msg.Headers = make([]Header, fcMsg.tmphdrsCnt) |
| 114 | for n := range msg.Headers { |
| 115 | tmphdr := (*[1 << 30]C.tmphdr_t)(unsafe.Pointer(fcMsg.tmphdrs))[n] |
| 116 | msg.Headers[n].Key = C.GoString(tmphdr.key) |
| 117 | if tmphdr.val != nil { |
| 118 | msg.Headers[n].Value = C.GoBytes(unsafe.Pointer(tmphdr.val), C.int(tmphdr.size)) |
| 119 | } else { |
| 120 | msg.Headers[n].Value = nil |
| 121 | } |
| 122 | } |
| 123 | C.free(unsafe.Pointer(fcMsg.tmphdrs)) |
| 124 | } |
| 125 | |
| 126 | h.setupMessageFromC(msg, fcMsg.msg) |
| 127 | |
| 128 | return msg |
| 129 | } |
| 130 | |
| 131 | // setupMessageFromC sets up a message object from a C rd_kafka_message_t |
| 132 | func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) { |
| 133 | if cmsg.rkt != nil { |
| 134 | topic := h.getTopicNameFromRkt(cmsg.rkt) |
| 135 | msg.TopicPartition.Topic = &topic |
| 136 | } |
| 137 | msg.TopicPartition.Partition = int32(cmsg.partition) |
| 138 | if cmsg.payload != nil { |
| 139 | msg.Value = C.GoBytes(unsafe.Pointer(cmsg.payload), C.int(cmsg.len)) |
| 140 | } |
| 141 | if cmsg.key != nil { |
| 142 | msg.Key = C.GoBytes(unsafe.Pointer(cmsg.key), C.int(cmsg.key_len)) |
| 143 | } |
| 144 | msg.TopicPartition.Offset = Offset(cmsg.offset) |
| 145 | if cmsg.err != 0 { |
| 146 | msg.TopicPartition.Error = newError(cmsg.err) |
| 147 | } |
| 148 | } |
| 149 | |
| 150 | // newMessageFromC creates a new message object from a C rd_kafka_message_t |
| 151 | // NOTE: For use with Producer: does not set message timestamp fields. |
| 152 | func (h *handle) newMessageFromC(cmsg *C.rd_kafka_message_t) (msg *Message) { |
| 153 | msg = &Message{} |
| 154 | |
| 155 | h.setupMessageFromC(msg, cmsg) |
| 156 | |
| 157 | return msg |
| 158 | } |
| 159 | |
| 160 | // messageToC sets up cmsg as a clone of msg |
| 161 | func (h *handle) messageToC(msg *Message, cmsg *C.rd_kafka_message_t) { |
| 162 | var valp unsafe.Pointer |
| 163 | var keyp unsafe.Pointer |
| 164 | |
| 165 | // to circumvent Cgo constraints we need to allocate C heap memory |
| 166 | // for both Value and Key (one allocation back to back) |
| 167 | // and copy the bytes from Value and Key to the C memory. |
| 168 | // We later tell librdkafka (in produce()) to free the |
| 169 | // C memory pointer when it is done. |
| 170 | var payload unsafe.Pointer |
| 171 | |
| 172 | valueLen := 0 |
| 173 | keyLen := 0 |
| 174 | if msg.Value != nil { |
| 175 | valueLen = len(msg.Value) |
| 176 | } |
| 177 | if msg.Key != nil { |
| 178 | keyLen = len(msg.Key) |
| 179 | } |
| 180 | |
| 181 | allocLen := valueLen + keyLen |
| 182 | if allocLen > 0 { |
| 183 | payload = C.malloc(C.size_t(allocLen)) |
| 184 | if valueLen > 0 { |
| 185 | copy((*[1 << 30]byte)(payload)[0:valueLen], msg.Value) |
| 186 | valp = payload |
| 187 | } |
| 188 | if keyLen > 0 { |
| 189 | copy((*[1 << 30]byte)(payload)[valueLen:allocLen], msg.Key) |
| 190 | keyp = unsafe.Pointer(&((*[1 << 31]byte)(payload)[valueLen])) |
| 191 | } |
| 192 | } |
| 193 | |
| 194 | cmsg.rkt = h.getRktFromMessage(msg) |
| 195 | cmsg.partition = C.int32_t(msg.TopicPartition.Partition) |
| 196 | cmsg.payload = valp |
| 197 | cmsg.len = C.size_t(valueLen) |
| 198 | cmsg.key = keyp |
| 199 | cmsg.key_len = C.size_t(keyLen) |
| 200 | cmsg._private = nil |
| 201 | } |
| 202 | |
| 203 | // used for testing messageToC performance |
| 204 | func (h *handle) messageToCDummy(msg *Message) { |
| 205 | var cmsg C.rd_kafka_message_t |
| 206 | h.messageToC(msg, &cmsg) |
| 207 | } |