blob: 3472d1c7d6fa720594297194753d9812d32b3674 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package kafka
2
3/**
4 * Copyright 2016 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19import (
20 "fmt"
21 "time"
22 "unsafe"
23)
24
25/*
26#include <string.h>
27#include <stdlib.h>
28#include <librdkafka/rdkafka.h>
29#include "glue_rdkafka.h"
30
31void setup_rkmessage (rd_kafka_message_t *rkmessage,
32 rd_kafka_topic_t *rkt, int32_t partition,
33 const void *payload, size_t len,
34 void *key, size_t keyLen, void *opaque) {
35 rkmessage->rkt = rkt;
36 rkmessage->partition = partition;
37 rkmessage->payload = (void *)payload;
38 rkmessage->len = len;
39 rkmessage->key = (void *)key;
40 rkmessage->key_len = keyLen;
41 rkmessage->_private = opaque;
42}
43*/
44import "C"
45
46// TimestampType is a the Message timestamp type or source
47//
48type TimestampType int
49
50const (
51 // TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
52 TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
53 // TimestampCreateTime indicates timestamp set by producer (source time)
54 TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
55 // TimestampLogAppendTime indicates timestamp set set by broker (store time)
56 TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
57)
58
59func (t TimestampType) String() string {
60 switch t {
61 case TimestampCreateTime:
62 return "CreateTime"
63 case TimestampLogAppendTime:
64 return "LogAppendTime"
65 case TimestampNotAvailable:
66 fallthrough
67 default:
68 return "NotAvailable"
69 }
70}
71
72// Message represents a Kafka message
73type Message struct {
74 TopicPartition TopicPartition
75 Value []byte
76 Key []byte
77 Timestamp time.Time
78 TimestampType TimestampType
79 Opaque interface{}
80 Headers []Header
81}
82
83// String returns a human readable representation of a Message.
84// Key and payload are not represented.
85func (m *Message) String() string {
86 var topic string
87 if m.TopicPartition.Topic != nil {
88 topic = *m.TopicPartition.Topic
89 } else {
90 topic = ""
91 }
92 return fmt.Sprintf("%s[%d]@%s", topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
93}
94
95func (h *handle) getRktFromMessage(msg *Message) (crkt *C.rd_kafka_topic_t) {
96 if msg.TopicPartition.Topic == nil {
97 return nil
98 }
99
100 return h.getRkt(*msg.TopicPartition.Topic)
101}
102
103func (h *handle) newMessageFromFcMsg(fcMsg *C.fetched_c_msg_t) (msg *Message) {
104 msg = &Message{}
105
106 if fcMsg.ts != -1 {
107 ts := int64(fcMsg.ts)
108 msg.TimestampType = TimestampType(fcMsg.tstype)
109 msg.Timestamp = time.Unix(ts/1000, (ts%1000)*1000000)
110 }
111
112 if fcMsg.tmphdrsCnt > 0 {
113 msg.Headers = make([]Header, fcMsg.tmphdrsCnt)
114 for n := range msg.Headers {
115 tmphdr := (*[1 << 30]C.tmphdr_t)(unsafe.Pointer(fcMsg.tmphdrs))[n]
116 msg.Headers[n].Key = C.GoString(tmphdr.key)
117 if tmphdr.val != nil {
118 msg.Headers[n].Value = C.GoBytes(unsafe.Pointer(tmphdr.val), C.int(tmphdr.size))
119 } else {
120 msg.Headers[n].Value = nil
121 }
122 }
123 C.free(unsafe.Pointer(fcMsg.tmphdrs))
124 }
125
126 h.setupMessageFromC(msg, fcMsg.msg)
127
128 return msg
129}
130
131// setupMessageFromC sets up a message object from a C rd_kafka_message_t
132func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) {
133 if cmsg.rkt != nil {
134 topic := h.getTopicNameFromRkt(cmsg.rkt)
135 msg.TopicPartition.Topic = &topic
136 }
137 msg.TopicPartition.Partition = int32(cmsg.partition)
138 if cmsg.payload != nil {
139 msg.Value = C.GoBytes(unsafe.Pointer(cmsg.payload), C.int(cmsg.len))
140 }
141 if cmsg.key != nil {
142 msg.Key = C.GoBytes(unsafe.Pointer(cmsg.key), C.int(cmsg.key_len))
143 }
144 msg.TopicPartition.Offset = Offset(cmsg.offset)
145 if cmsg.err != 0 {
146 msg.TopicPartition.Error = newError(cmsg.err)
147 }
148}
149
150// newMessageFromC creates a new message object from a C rd_kafka_message_t
151// NOTE: For use with Producer: does not set message timestamp fields.
152func (h *handle) newMessageFromC(cmsg *C.rd_kafka_message_t) (msg *Message) {
153 msg = &Message{}
154
155 h.setupMessageFromC(msg, cmsg)
156
157 return msg
158}
159
160// messageToC sets up cmsg as a clone of msg
161func (h *handle) messageToC(msg *Message, cmsg *C.rd_kafka_message_t) {
162 var valp unsafe.Pointer
163 var keyp unsafe.Pointer
164
165 // to circumvent Cgo constraints we need to allocate C heap memory
166 // for both Value and Key (one allocation back to back)
167 // and copy the bytes from Value and Key to the C memory.
168 // We later tell librdkafka (in produce()) to free the
169 // C memory pointer when it is done.
170 var payload unsafe.Pointer
171
172 valueLen := 0
173 keyLen := 0
174 if msg.Value != nil {
175 valueLen = len(msg.Value)
176 }
177 if msg.Key != nil {
178 keyLen = len(msg.Key)
179 }
180
181 allocLen := valueLen + keyLen
182 if allocLen > 0 {
183 payload = C.malloc(C.size_t(allocLen))
184 if valueLen > 0 {
185 copy((*[1 << 30]byte)(payload)[0:valueLen], msg.Value)
186 valp = payload
187 }
188 if keyLen > 0 {
189 copy((*[1 << 30]byte)(payload)[valueLen:allocLen], msg.Key)
190 keyp = unsafe.Pointer(&((*[1 << 31]byte)(payload)[valueLen]))
191 }
192 }
193
194 cmsg.rkt = h.getRktFromMessage(msg)
195 cmsg.partition = C.int32_t(msg.TopicPartition.Partition)
196 cmsg.payload = valp
197 cmsg.len = C.size_t(valueLen)
198 cmsg.key = keyp
199 cmsg.key_len = C.size_t(keyLen)
200 cmsg._private = nil
201}
202
203// used for testing messageToC performance
204func (h *handle) messageToCDummy(msg *Message) {
205 var cmsg C.rd_kafka_message_t
206 h.messageToC(msg, &cmsg)
207}