package kafka

import (
    "context"
    "errors"
    "fmt"
    "github.com/Shopify/sarama"
    "github.com/golang/protobuf/proto"
    "github.com/golang/protobuf/ptypes"
    "github.com/golang/protobuf/ptypes/any"
    "github.com/google/uuid"
    "github.com/opencord/voltha-go/common/log"
    ca "github.com/opencord/voltha-go/protos/core_adapter"
    "reflect"
    "sync"
    "time"
)

// Initialize the logger - gets the default logger until the main function sets up the logger.
func init() {
    log.GetLogger()
}

const (
    DefaultKafkaHost         = "10.100.198.240"
    DefaultKafkaPort         = 9092
    DefaultTopicName         = "Core"
    DefaultSleepOnError      = 1
    DefaultFlushFrequency    = 1
    DefaultFlushMessages     = 1
    DefaultFlushMaxmessages  = 1
    DefaultMaxRetries        = 3
    DefaultReturnSuccess     = false
    DefaultReturnErrors      = true
    DefaultConsumerMaxwait   = 50
    DefaultMaxProcessingTime = 100
    DefaultRequestTimeout    = 50 // 50 milliseconds
)

// consumerChannels holds a partition consumer for a topic along with the channels of all
// subscribers waiting for messages on that topic.
type consumerChannels struct {
    consumer sarama.PartitionConsumer
    channels []chan *ca.InterContainerMessage
}

// KafkaMessagingProxy represents the messaging proxy
type KafkaMessagingProxy struct {
    KafkaHost                     string
    KafkaPort                     int
    DefaultTopic                  *Topic
    TargetInterface               interface{}
    producer                      sarama.AsyncProducer
    consumer                      sarama.Consumer
    doneCh                        chan int
    topicToConsumerChannelMap     map[string]*consumerChannels
    transactionIdToChannelMap     map[string]chan *ca.InterContainerMessage
    lockTopicToConsumerChannelMap sync.RWMutex
    lockTransactionIdToChannelMap sync.RWMutex
}

// KafkaProxyOption is a functional option used to configure a KafkaMessagingProxy at creation time.
type KafkaProxyOption func(*KafkaMessagingProxy)

// KafkaHost sets the Kafka broker host address.
func KafkaHost(host string) KafkaProxyOption {
    return func(args *KafkaMessagingProxy) {
        args.KafkaHost = host
    }
}

// KafkaPort sets the Kafka broker port.
func KafkaPort(port int) KafkaProxyOption {
    return func(args *KafkaMessagingProxy) {
        args.KafkaPort = port
    }
}

// DefaultTopic sets the default topic on which the proxy listens for responses.
func DefaultTopic(topic *Topic) KafkaProxyOption {
    return func(args *KafkaMessagingProxy) {
        args.DefaultTopic = topic
    }
}

// TargetInterface sets the object whose methods are invoked when requests are received.
func TargetInterface(target interface{}) KafkaProxyOption {
    return func(args *KafkaMessagingProxy) {
        args.TargetInterface = target
    }
}

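// NewKafkaMessagingProxy creates a proxy preset with the package defaults and then applies
// the supplied options. Illustrative example (the values shown are the package defaults):
//
//	kp, err := NewKafkaMessagingProxy(
//		KafkaHost(DefaultKafkaHost),
//		KafkaPort(DefaultKafkaPort),
//		DefaultTopic(&Topic{Name: DefaultTopicName}))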
func NewKafkaMessagingProxy(opts ...KafkaProxyOption) (*KafkaMessagingProxy, error) {
    proxy := &KafkaMessagingProxy{
        KafkaHost:    DefaultKafkaHost,
        KafkaPort:    DefaultKafkaPort,
        DefaultTopic: &Topic{Name: DefaultTopicName},
    }

    for _, option := range opts {
        option(proxy)
    }

    // Create the locks for all the maps
    proxy.lockTopicToConsumerChannelMap = sync.RWMutex{}
    proxy.lockTransactionIdToChannelMap = sync.RWMutex{}

    return proxy, nil
}

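// Start creates the Kafka publisher and the master consumer and initializes the internal
// topic and transaction maps. It must be called before any other method on the proxy.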
func (kp *KafkaMessagingProxy) Start() error {
    log.Info("Starting-Proxy")

    // Create the Done channel
    kp.doneCh = make(chan int, 1)

    // Create the Publisher
    if err := kp.createPublisher(DefaultMaxRetries); err != nil {
        log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
        return err
    }

    // Create the master consumer
    if err := kp.createConsumer(DefaultMaxRetries); err != nil {
        log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
        return err
    }

    // Create the topic to consumer/channel map
    kp.topicToConsumerChannelMap = make(map[string]*consumerChannels)

    // Create the transactionId to Channel Map
    kp.transactionIdToChannelMap = make(map[string]chan *ca.InterContainerMessage)

    return nil
}

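// Stop closes the producer and the consumer and then closes the done channel, signalling all
// long-running goroutines to exit.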
func (kp *KafkaMessagingProxy) Stop() {
    log.Info("Stopping-Proxy")
    if kp.producer != nil {
        if err := kp.producer.Close(); err != nil {
            panic(err)
        }
    }
    if kp.consumer != nil {
        if err := kp.consumer.Close(); err != nil {
            panic(err)
        }
    }
    // Close the done channel to close all long processing Go routines
    close(kp.doneCh)
}

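// InvokeRPC encodes the rpc and its arguments, sends the request on the given topic and, when
// waitForResponse is true, blocks until a response, a context timeout/cancellation or an exit
// signal is received. It returns a success flag and the marshalled result, if any.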
func (kp *KafkaMessagingProxy) InvokeRPC(ctx context.Context, rpc string, topic *Topic, waitForResponse bool,
    kvArgs ...*KVArg) (bool, *any.Any) {
    // Encode the request
    protoRequest, err := encodeRequest(rpc, topic, kp.DefaultTopic, kvArgs...)
    if err != nil {
        log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
        return false, nil
    }

    // Subscribe for response, if needed, before sending request
    var ch <-chan *ca.InterContainerMessage
    if waitForResponse {
        var err error
        if ch, err = kp.subscribeForResponse(*kp.DefaultTopic, protoRequest.Header.Id); err != nil {
            log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "topic": topic.Name})
        }
    }

    // Send request
    go kp.sendToKafkaTopic(protoRequest, topic)

    if waitForResponse {
        // If ctx is nil use a default timeout ctx to ensure we do not wait forever
        var cancel context.CancelFunc
        if ctx == nil {
            ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
            defer cancel()
        }

        // Wait for response as well as timeout or cancellation
        // Remove the subscription for a response on return
        defer kp.unSubscribeForResponse(protoRequest.Header.Id)
        select {
        case msg := <-ch:
            log.Debugw("received-response", log.Fields{"rpc": rpc, "msg": msg})

            var responseBody *ca.InterContainerResponseBody
            var err error
            if responseBody, err = decodeResponse(msg); err != nil {
                log.Errorw("decode-response-error", log.Fields{"error": err})
                return false, nil
            }
            return responseBody.Success, responseBody.Result
        case <-ctx.Done():
            log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
            // Pack the error as a proto any type
            protoError := &ca.Error{Reason: ctx.Err().Error()}
            var marshalledArg *any.Any
            if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
                return false, nil // Should never happen
            }
            return false, marshalledArg
        case <-kp.doneCh:
            log.Infow("received-exit-signal", log.Fields{"topic": topic.Name, "rpc": rpc})
            return true, nil
        }
    }
    return true, nil
}

// Subscribe allows a caller to subscribe to a given topic. A channel is returned to the
// caller to receive messages from that topic.
func (kp *KafkaMessagingProxy) Subscribe(topic Topic) (<-chan *ca.InterContainerMessage, error) {

    log.Debugw("subscribe", log.Fields{"topic": topic.Name})

    if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
        log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
        // Create a channel specific for that consumer and add it to the consumer channel map
        ch := make(chan *ca.InterContainerMessage)
        kp.addChannelToConsumerChannelMap(topic, ch)
        return ch, nil
    }

    // Register for the topic and set it up
    var consumerListeningChannel chan *ca.InterContainerMessage
    var err error
    if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
        log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
        return nil, err
    }

    return consumerListeningChannel, nil
}

// UnSubscribe removes the given channel from the subscriber list of the given topic; the
// channel is closed on removal.
func (kp *KafkaMessagingProxy) UnSubscribe(topic Topic, ch <-chan *ca.InterContainerMessage) error {
    log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
    err := kp.removeChannelFromConsumerChannelMap(topic, ch)
    return err
}

// SubscribeWithTarget allows a caller to assign a target object to be invoked automatically
// when a message is received on a given topic
func (kp *KafkaMessagingProxy) SubscribeWithTarget(topic Topic, targetInterface interface{}) error {

    // Subscribe to receive messages for that topic
    var ch <-chan *ca.InterContainerMessage
    var err error
    if ch, err = kp.Subscribe(topic); err != nil {
        log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
        return err
    }
    // Launch a go routine to receive and process kafka messages
    go kp.waitForRequest(ch, topic, targetInterface)

    return nil
}

// UnSubscribeTarget is the inverse of SubscribeWithTarget. It is not implemented yet.
func (kp *KafkaMessagingProxy) UnSubscribeTarget(ctx context.Context, topic Topic, targetInterface interface{}) error {
    // TODO - mostly relevant with multiple interfaces
    return nil
}

func (kp *KafkaMessagingProxy) addToTopicToConsumerChannelMap(id string, arg *consumerChannels) {
    kp.lockTopicToConsumerChannelMap.Lock()
    defer kp.lockTopicToConsumerChannelMap.Unlock()
    if _, exist := kp.topicToConsumerChannelMap[id]; !exist {
        kp.topicToConsumerChannelMap[id] = arg
    }
}

func (kp *KafkaMessagingProxy) deleteFromTopicToConsumerChannelMap(id string) {
    kp.lockTopicToConsumerChannelMap.Lock()
    defer kp.lockTopicToConsumerChannelMap.Unlock()
    if _, exist := kp.topicToConsumerChannelMap[id]; exist {
        delete(kp.topicToConsumerChannelMap, id)
    }
}

func (kp *KafkaMessagingProxy) getConsumerChannel(topic Topic) *consumerChannels {
    kp.lockTopicToConsumerChannelMap.Lock()
    defer kp.lockTopicToConsumerChannelMap.Unlock()

    if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
        return consumerCh
    }
    return nil
}

func (kp *KafkaMessagingProxy) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
    kp.lockTopicToConsumerChannelMap.Lock()
    defer kp.lockTopicToConsumerChannelMap.Unlock()
    if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
        consumerCh.channels = append(consumerCh.channels, ch)
        return
    }
    log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
}

func (kp *KafkaMessagingProxy) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
    kp.lockTopicToConsumerChannelMap.Lock()
    defer kp.lockTopicToConsumerChannelMap.Unlock()
    if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
        // Channel will be closed in the removeChannel method
        consumerCh.channels = removeChannel(consumerCh.channels, ch)
        return nil
    }
    log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
    return errors.New("topic-does-not-exist")
}

func (kp *KafkaMessagingProxy) addToTransactionIdToChannelMap(id string, arg chan *ca.InterContainerMessage) {
    kp.lockTransactionIdToChannelMap.Lock()
    defer kp.lockTransactionIdToChannelMap.Unlock()
    if _, exist := kp.transactionIdToChannelMap[id]; !exist {
        kp.transactionIdToChannelMap[id] = arg
    }
}

func (kp *KafkaMessagingProxy) deleteFromTransactionIdToChannelMap(id string) {
    kp.lockTransactionIdToChannelMap.Lock()
    defer kp.lockTransactionIdToChannelMap.Unlock()
    if _, exist := kp.transactionIdToChannelMap[id]; exist {
        delete(kp.transactionIdToChannelMap, id)
    }
}

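// createPublisher creates the Sarama async producer, retrying every DefaultSleepOnError
// seconds until it succeeds or the retry count is exhausted; a negative retry count retries
// indefinitely.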
func (kp *KafkaMessagingProxy) createPublisher(retries int) error {
    // This creates the publisher
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
    config.Producer.Flush.Messages = DefaultFlushMessages
    config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
    config.Producer.Return.Errors = DefaultReturnErrors
    config.Producer.Return.Successes = DefaultReturnSuccess
    config.Producer.RequiredAcks = sarama.WaitForAll
    kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
    brokers := []string{kafkaFullAddr}

    for {
        producer, err := sarama.NewAsyncProducer(brokers, config)
        if err != nil {
            if retries == 0 {
                log.Errorw("error-starting-publisher", log.Fields{"error": err})
                return err
            } else {
                // If retries is -ve then we will retry indefinitely
                retries--
            }
            log.Info("retrying-after-a-second-delay")
            time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
        } else {
            kp.producer = producer
            break
        }
    }
    log.Info("Kafka-publisher-created")
    return nil
}

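// createConsumer creates the master Sarama consumer, with the same retry behaviour as
// createPublisher.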
func (kp *KafkaMessagingProxy) createConsumer(retries int) error {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
    config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
    brokers := []string{kafkaFullAddr}

    for {
        consumer, err := sarama.NewConsumer(brokers, config)
        if err != nil {
            if retries == 0 {
                log.Errorw("error-starting-consumer", log.Fields{"error": err})
                return err
            } else {
                // If retries is -ve then we will retry indefinitely
                retries--
            }
            log.Info("retrying-after-a-second-delay")
            time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
        } else {
            kp.consumer = consumer
            break
        }
    }
    log.Info("Kafka-consumer-created")
    return nil
}

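// encodeReturnedValue marshals a single returned value into a proto Any. The value must be a
// proto.Message; a nil value yields a nil Any without error.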
func encodeReturnedValue(request *ca.InterContainerMessage, returnedVal interface{}) (*any.Any, error) {
    // Encode the response argument - needs to be a proto message
    if returnedVal == nil {
        return nil, nil
    }
    protoValue, ok := returnedVal.(proto.Message)
    if !ok {
        log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
        err := errors.New("response-value-not-proto-message")
        return nil, err
    }

    // Marshal the returned value, if any
    var marshalledReturnedVal *any.Any
    var err error
    if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
        log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
        return nil, err
    }
    return marshalledReturnedVal, nil
}

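// encodeDefaultFailedResponse builds a generic failure response addressed back to the sender
// of the request. It is used when encoding the real response fails.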
func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
    responseHeader := &ca.Header{
        Id:        request.Header.Id,
        Type:      ca.MessageType_RESPONSE,
        FromTopic: request.Header.ToTopic,
        ToTopic:   request.Header.FromTopic,
        Timestamp: time.Now().Unix(),
    }
    responseBody := &ca.InterContainerResponseBody{
        Success: false,
        Result:  nil,
    }
    var marshalledResponseBody *any.Any
    var err error
    // Error should never happen here
    if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
        log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
    }

    return &ca.InterContainerMessage{
        Header: responseHeader,
        Body:   marshalledResponseBody,
    }
}

// encodeResponse encodes a response to a request and returns an InterContainerMessage message
// on success or an error on failure
func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {

    log.Infow("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
    responseHeader := &ca.Header{
        Id:        request.Header.Id,
        Type:      ca.MessageType_RESPONSE,
        FromTopic: request.Header.ToTopic,
        ToTopic:   request.Header.FromTopic,
        Timestamp: time.Now().Unix(),
    }

    // Go over all returned values
    var marshalledReturnedVal *any.Any
    var err error
    for _, returnVal := range returnedValues {
        if marshalledReturnedVal, err = encodeReturnedValue(request, returnVal); err != nil {
            log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
        }
        break // for now we support only 1 returned value - (excluding the error)
    }

    responseBody := &ca.InterContainerResponseBody{
        Success: success,
        Result:  marshalledReturnedVal,
    }

    // Marshal the response body
    var marshalledResponseBody *any.Any
    if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
        log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
        return nil, err
    }

    return &ca.InterContainerMessage{
        Header: responseHeader,
        Body:   marshalledResponseBody,
    }, nil
}

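// CallFuncByName invokes, via reflection, the method named funcName on myClass with the given
// parameters and returns the reflected return values.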
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
    myClassValue := reflect.ValueOf(myClass)
    m := myClassValue.MethodByName(funcName)
    if !m.IsValid() {
        return make([]reflect.Value, 0), fmt.Errorf("Method not found \"%s\"", funcName)
    }
    in := make([]reflect.Value, len(params))
    for i, param := range params {
        in[i] = reflect.ValueOf(param)
    }
    out = m.Call(in)
    return
}

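// handleRequest processes a single request message: it unmarshals the request body, invokes
// the named rpc on the target interface and, when a response is required, encodes the outcome
// and publishes it back on the requester's topic.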
func (kp *KafkaMessagingProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {

    // First extract the header to know whether this is a request or a response
    if msg.Header.Type == ca.MessageType_REQUEST {
        log.Debugw("received-request", log.Fields{"header": msg.Header})

        var out []reflect.Value
        var err error

        // Get the request body
        requestBody := &ca.InterContainerRequestBody{}
        if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
            log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
        } else {
            // Let the callee unpack the arguments as it's the only one that knows the real proto type
            out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
            if err != nil {
                log.Warn(err)
            }
        }
        // Response required?
        if requestBody.ResponseRequired {
            // If we already have an error before then just return that
            var returnError *ca.Error
            var returnedValues []interface{}
            var success bool
            if err != nil {
                returnError = &ca.Error{Reason: err.Error()}
                returnedValues = make([]interface{}, 1)
                returnedValues[0] = returnError
            } else {
                log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
                returnSize := 1 // Minimum array size
                if len(out) > 1 {
                    returnSize = len(out) - 1
                }
                returnedValues = make([]interface{}, returnSize)
                for idx, val := range out {
                    log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
                    if idx == 0 {
                        if val.Interface() != nil {
                            if goError, ok := out[0].Interface().(error); ok {
                                returnError = &ca.Error{Reason: goError.Error()}
                                returnedValues[0] = returnError
                            } // Else should never occur - maybe never say never?
                            break
                        } else {
                            success = true
                        }
                    } else {
                        returnedValues[idx-1] = val.Interface()
                    }
                }
            }

            var icm *ca.InterContainerMessage
            if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
                log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
                icm = encodeDefaultFailedResponse(msg)
            }
            kp.sendToKafkaTopic(icm, &Topic{Name: msg.Header.FromTopic})
        }

    } else if msg.Header.Type == ca.MessageType_RESPONSE {
        log.Warnw("received-response-on-request-handler", log.Fields{"header": msg.Header})
    } else {
        log.Errorw("invalid-message", log.Fields{"header": msg.Header})
    }
}

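// waitForRequest reads messages from the subscription channel of a topic and dispatches each
// one to handleRequest in its own goroutine.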
func (kp *KafkaMessagingProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
    // Wait for messages
    for msg := range ch {
        log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
        go kp.handleRequest(msg, targetInterface)
    }
}

// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers of that
// topic via the unique channel each subscriber received during subscription
func (kp *KafkaMessagingProxy) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
    // Need to go over all channels and publish messages to them - do we need to copy msg?
    kp.lockTopicToConsumerChannelMap.Lock()
    defer kp.lockTopicToConsumerChannelMap.Unlock()
    for _, ch := range consumerCh.channels {
        go func(c chan *ca.InterContainerMessage) {
            c <- protoMessage
        }(ch)
    }
}

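// consumeMessagesLoop reads messages and errors from the partition consumer of a topic,
// unmarshals each message into an InterContainerMessage and dispatches it to all subscribers,
// until the done channel is closed.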
func (kp *KafkaMessagingProxy) consumeMessagesLoop(topic Topic) {
    log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
    var consumerCh *consumerChannels
    if consumerCh = kp.getConsumerChannel(topic); consumerCh == nil {
        log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
        return
    }
startloop:
    for {
        select {
        case err := <-consumerCh.consumer.Errors():
            log.Warnw("consumer-error", log.Fields{"error": err})
        case msg := <-consumerCh.consumer.Messages():
            log.Debugw("message-received", log.Fields{"msg": msg})
            // Since the only expected message is a proto InterContainerMessage then extract it right away
            // instead of dispatching it to the consumers
            msgBody := msg.Value
            icm := &ca.InterContainerMessage{}
            if err := proto.Unmarshal(msgBody, icm); err != nil {
                log.Warnw("invalid-message", log.Fields{"error": err})
                continue
            }
            log.Debugw("msg-to-consumers", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})

            go kp.dispatchToConsumers(consumerCh, icm)
        case <-kp.doneCh:
            log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
            break startloop
        }
    }
}

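// dispatchResponse forwards a response message to the channel waiting on its transaction id,
// if one exists.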
func (kp *KafkaMessagingProxy) dispatchResponse(msg *ca.InterContainerMessage) {
    kp.lockTransactionIdToChannelMap.Lock()
    defer kp.lockTransactionIdToChannelMap.Unlock()
    if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
        log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
        return
    }
    kp.transactionIdToChannelMap[msg.Header.Id] <- msg
}

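// waitForResponse reads messages from the consumer listening channel of a topic and dispatches
// each one to the channel waiting on its transaction id, until the done channel is closed.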
func (kp *KafkaMessagingProxy) waitForResponse(ch chan *ca.InterContainerMessage, topic Topic) {
    log.Debugw("starting-consuming-responses-loop", log.Fields{"topic": topic.Name})
startloop:
    for {
        select {
        case msg := <-ch:
            log.Debugw("message-received", log.Fields{"topic": topic.Name, "msg": msg})
            go kp.dispatchResponse(msg)
            // Need to handle program exit - TODO
        case <-kp.doneCh:
            log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
            break startloop
        }
    }
}

// setupConsumerChannel creates a consumerChannels object for a topic and adds it to the consumer channel map
// for that topic. It also starts the routine that listens for messages on that topic.
func (kp *KafkaMessagingProxy) setupConsumerChannel(topic Topic) (chan *ca.InterContainerMessage, error) {

    if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
        return nil, nil // Already created, so just ignore
    }

    partitionList, err := kp.consumer.Partitions(topic.Name)
    if err != nil {
        log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
        return nil, err
    }

    log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
    // Create a partition consumer for that topic - for now just use one partition
    var pConsumer sarama.PartitionConsumer
    if pConsumer, err = kp.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
        log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
        return nil, err
    }

    // Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
    // unbuffered to verify race conditions.
    consumerListeningChannel := make(chan *ca.InterContainerMessage)
    cc := &consumerChannels{
        consumer: pConsumer,
        channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
    }

    // Add the consumer channel to the map
    kp.addToTopicToConsumerChannelMap(topic.Name, cc)

    // Start a consumer to listen on that specific topic
    go kp.consumeMessagesLoop(topic)

    return consumerListeningChannel, nil
}

// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
// This method is built to prevent all subscribers from receiving all messages, as is the case with the
// Subscribe API. There is one response channel waiting for kafka messages before dispatching the message to the
// corresponding waiting channel
func (kp *KafkaMessagingProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
    log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name})

    if consumerCh := kp.getConsumerChannel(topic); consumerCh == nil {
        log.Debugw("topic-not-subscribed", log.Fields{"topic": topic.Name})
        var consumerListeningChannel chan *ca.InterContainerMessage
        var err error
        if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
            log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
            return nil, err
        }
        // Start a go routine to listen to response messages over the consumer listening channel
        go kp.waitForResponse(consumerListeningChannel, topic)
    }

    ch := make(chan *ca.InterContainerMessage)
    kp.addToTransactionIdToChannelMap(trnsId, ch)

    return ch, nil
}

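// removeChannel removes the given channel from the slice of channels, closing it on removal.
// The returned slice does not preserve the original ordering.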
func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
    var i int
    var channel chan *ca.InterContainerMessage
    for i, channel = range channels {
        if channel == ch {
            channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
            close(channel)
            return channels[:len(channels)-1]
        }
    }
    return channels
}

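// unSubscribeForResponse closes the channel associated with the given transaction id and
// removes it from the transaction map.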
func (kp *KafkaMessagingProxy) unSubscribeForResponse(trnsId string) error {
    log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
    // Close the channel first
    close(kp.transactionIdToChannelMap[trnsId])
    kp.deleteFromTransactionIdToChannelMap(trnsId)
    return nil
}

// encodeRequest formats a request to send over kafka and returns an InterContainerMessage message on success
// or an error on failure
func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
    requestHeader := &ca.Header{
        Id:        uuid.New().String(),
        Type:      ca.MessageType_REQUEST,
        FromTopic: replyTopic.Name,
        ToTopic:   toTopic.Name,
        Timestamp: time.Now().Unix(),
    }
    requestBody := &ca.InterContainerRequestBody{
        Rpc:              rpc,
        ResponseRequired: true,
        ReplyToTopic:     replyTopic.Name,
    }

    for _, arg := range kvArgs {
        var marshalledArg *any.Any
        var err error
        // Ascertain the value interface type is a proto.Message
        protoValue, ok := arg.Value.(proto.Message)
        if !ok {
            log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
            err := errors.New("argument-value-not-proto-message")
            return nil, err
        }
        if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
            log.Warnw("cannot-marshal-request", log.Fields{"error": err})
            return nil, err
        }
        protoArg := &ca.Argument{
            Key:   arg.Key,
            Value: marshalledArg,
        }
        requestBody.Args = append(requestBody.Args, protoArg)
    }

    var marshalledData *any.Any
    var err error
    if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
        log.Warnw("cannot-marshal-request", log.Fields{"error": err})
        return nil, err
    }
    request := &ca.InterContainerMessage{
        Header: requestHeader,
        Body:   marshalledData,
    }
    return request, nil
}

// sendToKafkaTopic marshals the given message and queues it on the async producer for the
// given kafka topic.
func (kp *KafkaMessagingProxy) sendToKafkaTopic(msg *ca.InterContainerMessage, topic *Topic) {

    // Create the Sarama producer message
    time := time.Now().Unix()
    marshalled, err := proto.Marshal(msg)
    if err != nil {
        log.Warnw("cannot-marshal-message", log.Fields{"error": err})
        return
    }
    kafkaMsg := &sarama.ProducerMessage{
        Topic: topic.Name,
        Key:   sarama.StringEncoder(time),
        Value: sarama.ByteEncoder(marshalled),
    }

    // Send message to kafka
    kp.producer.Input() <- kafkaMsg

}

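// decodeResponse unmarshals the body of a response message into an InterContainerResponseBody.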
func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
    // Extract the message body
    responseBody := ca.InterContainerResponseBody{}

    log.Debugw("decodeResponse", log.Fields{"icr": &response})
    if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
        log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
        return nil, err
    }
    log.Debugw("decodeResponse", log.Fields{"icrbody": &responseBody})

    return &responseBody, nil
}