blob: 00086e3b803f46585f8b7bc404e32cd3b348b739 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"strconv"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/any"
	"github.com/google/uuid"
	"github.com/opencord/voltha-go/common/log"
	ca "github.com/opencord/voltha-go/protos/core_adapter"
)
33
// Initialize the logger - gets the default until the main function setup the logger
func init() {
	// Ensure a usable default logger exists so package-level logging works
	// before the application configures its own logger in main().
	log.GetLogger()
}
38
// Default configuration values used when the corresponding option is not
// supplied. Duration-like constants are bare ints whose unit is fixed at the
// point of use (noted per constant below).
const (
	DefaultKafkaHost         = "10.100.198.240" // NOTE(review): hard-coded address — confirm before deployment
	DefaultKafkaPort         = 9092
	DefaultTopicName         = "Core"
	DefaultSleepOnError      = 1 // seconds (multiplied by time.Second in the retry loops)
	DefaultFlushFrequency    = 1 // used as a raw time.Duration (nanoseconds) in createPublisher — TODO confirm intended unit
	DefaultFlushMessages     = 1
	DefaultFlushMaxmessages  = 1
	DefaultMaxRetries        = 3
	DefaultReturnSuccess     = false
	DefaultReturnErrors      = true
	DefaultConsumerMaxwait   = 50  // milliseconds (multiplied by time.Millisecond on use)
	DefaultMaxProcessingTime = 100 // milliseconds (multiplied by time.Millisecond on use)
	DefaultRequestTimeout    = 50  // 50 milliseconds
)
54
// consumerChannels pairs the sarama partition consumer for a topic with the
// per-subscriber channels that messages from that topic are fanned out to.
type consumerChannels struct {
	consumer sarama.PartitionConsumer
	channels []chan *ca.InterContainerMessage // one entry per subscriber of the topic
}
59
// KafkaMessagingProxy represents the messaging proxy
type KafkaMessagingProxy struct {
	KafkaHost                     string      // kafka broker host
	KafkaPort                     int         // kafka broker port
	DefaultTopic                  *Topic      // topic used for replies when no topic is specified
	TargetInterface               interface{} // default RPC dispatch target (set via option; not read in this file — TODO confirm use)
	producer                      sarama.AsyncProducer
	consumer                      sarama.Consumer
	doneCh                        chan int // closed by Stop() to signal all long-running loops to exit
	topicToConsumerChannelMap     map[string]*consumerChannels              // topic name -> consumer plus fan-out channels
	transactionIdToChannelMap     map[string]chan *ca.InterContainerMessage // request id -> channel awaiting its response
	lockTopicToConsumerChannelMap sync.RWMutex // guards topicToConsumerChannelMap and the channels slices inside it
	lockTransactionIdToChannelMap sync.RWMutex // guards transactionIdToChannelMap
}
74
// KafkaProxyOption is a functional option applied by NewKafkaMessagingProxy.
type KafkaProxyOption func(*KafkaMessagingProxy)
76
77func KafkaHost(host string) KafkaProxyOption {
78 return func(args *KafkaMessagingProxy) {
79 args.KafkaHost = host
80 }
81}
82
83func KafkaPort(port int) KafkaProxyOption {
84 return func(args *KafkaMessagingProxy) {
85 args.KafkaPort = port
86 }
87}
88
89func DefaultTopic(topic *Topic) KafkaProxyOption {
90 return func(args *KafkaMessagingProxy) {
91 args.DefaultTopic = topic
92 }
93}
94
95func TargetInterface(target interface{}) KafkaProxyOption {
96 return func(args *KafkaMessagingProxy) {
97 args.TargetInterface = target
98 }
99}
100
101func NewKafkaMessagingProxy(opts ...KafkaProxyOption) (*KafkaMessagingProxy, error) {
102 proxy := &KafkaMessagingProxy{
103 KafkaHost: DefaultKafkaHost,
104 KafkaPort: DefaultKafkaPort,
105 DefaultTopic: &Topic{Name: DefaultTopicName},
106 }
107
108 for _, option := range opts {
109 option(proxy)
110 }
111
112 // Create the locks for all the maps
113 proxy.lockTopicToConsumerChannelMap = sync.RWMutex{}
114 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
115
116 return proxy, nil
117}
118
119func (kp *KafkaMessagingProxy) Start() error {
120 log.Info("Starting-Proxy")
121
122 // Create the Done channel
123 kp.doneCh = make(chan int, 1)
124
125 // Create the Publisher
126 if err := kp.createPublisher(DefaultMaxRetries); err != nil {
127 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
128 return err
129 }
130
131 // Create the master consumer
132 if err := kp.createConsumer(DefaultMaxRetries); err != nil {
133 log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
134 return err
135 }
136
137 // Create the topic to consumer/channel map
138 kp.topicToConsumerChannelMap = make(map[string]*consumerChannels)
139
140 // Create the transactionId to Channel Map
141 kp.transactionIdToChannelMap = make(map[string]chan *ca.InterContainerMessage)
142
143 return nil
144}
145
146func (kp *KafkaMessagingProxy) Stop() {
147 log.Info("Stopping-Proxy")
148 if kp.producer != nil {
149 if err := kp.producer.Close(); err != nil {
150 panic(err)
151 }
152 }
153 if kp.consumer != nil {
154 if err := kp.consumer.Close(); err != nil {
155 panic(err)
156 }
157 }
158 //Close the done channel to close all long processing Go routines
159 close(kp.doneCh)
160}
161
162func (kp *KafkaMessagingProxy) InvokeRPC(ctx context.Context, rpc string, topic *Topic, waitForResponse bool,
163 kvArgs ...*KVArg) (bool, *any.Any) {
164 // Encode the request
165 protoRequest, err := encodeRequest(rpc, topic, kp.DefaultTopic, kvArgs...)
166 if err != nil {
167 log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
168 return false, nil
169 }
170
171 // Subscribe for response, if needed, before sending request
172 var ch <-chan *ca.InterContainerMessage
173 if waitForResponse {
174 var err error
175 if ch, err = kp.subscribeForResponse(*kp.DefaultTopic, protoRequest.Header.Id); err != nil {
176 log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "topic": topic.Name})
177 }
178 }
179
180 // Send request
181 go kp.sendToKafkaTopic(protoRequest, topic)
182
183 if waitForResponse {
184 // if ctx is nil use a default timeout ctx to ensure we do not wait forever
185 var cancel context.CancelFunc
186 if ctx == nil {
187 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
188 defer cancel()
189 }
190
191 // Wait for response as well as timeout or cancellation
192 // Remove the subscription for a response on return
193 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
194 select {
195 case msg := <-ch:
196 log.Debugw("received-response", log.Fields{"rpc": rpc, "msg": msg})
197
198 var responseBody *ca.InterContainerResponseBody
199 var err error
200 if responseBody, err = decodeResponse(msg); err != nil {
201 log.Errorw("decode-response-error", log.Fields{"error": err})
202 }
203 return responseBody.Success, responseBody.Result
204 case <-ctx.Done():
205 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
206 // pack the error as proto any type
207 protoError := &ca.Error{Reason: ctx.Err().Error()}
208 var marshalledArg *any.Any
209 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
210 return false, nil // Should never happen
211 }
212 return false, marshalledArg
213 case <-kp.doneCh:
214 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name, "rpc": rpc})
215 return true, nil
216 }
217 }
218 return true, nil
219}
220
221// Subscribe allows a caller to subscribe to a given topic. A channel is returned to the
222// caller to receive messages from that topic.
223func (kp *KafkaMessagingProxy) Subscribe(topic Topic) (<-chan *ca.InterContainerMessage, error) {
224
225 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
226
227 if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
228 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
229 // Create a channel specific for that consumer and add it to the consumer channel map
230 ch := make(chan *ca.InterContainerMessage)
231 kp.addChannelToConsumerChannelMap(topic, ch)
232 return ch, nil
233 }
234
235 // Register for the topic and set it up
236 var consumerListeningChannel chan *ca.InterContainerMessage
237 var err error
238 if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
239 log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
240 return nil, err
241 }
242
243 return consumerListeningChannel, nil
244}
245
246func (kp *KafkaMessagingProxy) UnSubscribe(topic Topic, ch <-chan *ca.InterContainerMessage) error {
247 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
248 err := kp.removeChannelFromConsumerChannelMap(topic, ch)
249 return err
250}
251
252// SubscribeWithTarget allows a caller to assign a target object to be invoked automatically
253// when a message is received on a given topic
254func (kp *KafkaMessagingProxy) SubscribeWithTarget(topic Topic, targetInterface interface{}) error {
255
256 // Subscribe to receive messages for that topic
257 var ch <-chan *ca.InterContainerMessage
258 var err error
259 if ch, err = kp.Subscribe(topic); err != nil {
260 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
261 }
262 // Launch a go routine to receive and process kafka messages
263 go kp.waitForRequest(ch, topic, targetInterface)
264
265 return nil
266}
267
// UnSubscribeTarget is the inverse of SubscribeWithTarget. It is currently a
// no-op placeholder and always returns nil.
func (kp *KafkaMessagingProxy) UnSubscribeTarget(ctx context.Context, topic Topic, targetInterface interface{}) error {
	// TODO - mostly relevant with multiple interfaces
	return nil
}
272
273func (kp *KafkaMessagingProxy) addToTopicToConsumerChannelMap(id string, arg *consumerChannels) {
274 kp.lockTopicToConsumerChannelMap.Lock()
275 defer kp.lockTopicToConsumerChannelMap.Unlock()
276 if _, exist := kp.topicToConsumerChannelMap[id]; !exist {
277 kp.topicToConsumerChannelMap[id] = arg
278 }
279}
280
281func (kp *KafkaMessagingProxy) deleteFromTopicToConsumerChannelMap(id string) {
282 kp.lockTopicToConsumerChannelMap.Lock()
283 defer kp.lockTopicToConsumerChannelMap.Unlock()
284 if _, exist := kp.topicToConsumerChannelMap[id]; exist {
285 delete(kp.topicToConsumerChannelMap, id)
286 }
287}
288
289func (kp *KafkaMessagingProxy) getConsumerChannel(topic Topic) *consumerChannels {
290 kp.lockTopicToConsumerChannelMap.Lock()
291 defer kp.lockTopicToConsumerChannelMap.Unlock()
292
293 if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
294 return consumerCh
295 }
296 return nil
297}
298
299func (kp *KafkaMessagingProxy) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
300 kp.lockTopicToConsumerChannelMap.Lock()
301 defer kp.lockTopicToConsumerChannelMap.Unlock()
302 if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
303 consumerCh.channels = append(consumerCh.channels, ch)
304 return
305 }
306 log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
307}
308
309func (kp *KafkaMessagingProxy) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
310 kp.lockTopicToConsumerChannelMap.Lock()
311 defer kp.lockTopicToConsumerChannelMap.Unlock()
312 if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
313 // Channel will be closed in the removeChannel method
314 consumerCh.channels = removeChannel(consumerCh.channels, ch)
315 return nil
316 }
317 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
318 return errors.New("topic-does-not-exist")
319}
320
321func (kp *KafkaMessagingProxy) addToTransactionIdToChannelMap(id string, arg chan *ca.InterContainerMessage) {
322 kp.lockTransactionIdToChannelMap.Lock()
323 defer kp.lockTransactionIdToChannelMap.Unlock()
324 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
325 kp.transactionIdToChannelMap[id] = arg
326 }
327}
328
329func (kp *KafkaMessagingProxy) deleteFromTransactionIdToChannelMap(id string) {
330 kp.lockTransactionIdToChannelMap.Lock()
331 defer kp.lockTransactionIdToChannelMap.Unlock()
332 if _, exist := kp.transactionIdToChannelMap[id]; exist {
333 delete(kp.transactionIdToChannelMap, id)
334 }
335}
336
337func (kp *KafkaMessagingProxy) createPublisher(retries int) error {
338 // This Creates the publisher
339 config := sarama.NewConfig()
340 config.Producer.Partitioner = sarama.NewRandomPartitioner
341 config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
342 config.Producer.Flush.Messages = DefaultFlushMessages
343 config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
344 config.Producer.Return.Errors = DefaultReturnErrors
345 config.Producer.Return.Successes = DefaultReturnSuccess
346 config.Producer.RequiredAcks = sarama.WaitForAll
347 kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
348 brokers := []string{kafkaFullAddr}
349
350 for {
351 producer, err := sarama.NewAsyncProducer(brokers, config)
352 if err != nil {
353 if retries == 0 {
354 log.Errorw("error-starting-publisher", log.Fields{"error": err})
355 return err
356 } else {
357 // If retries is -ve then we will retry indefinitely
358 retries--
359 }
360 log.Info("retrying-after-a-second-delay")
361 time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
362 } else {
363 kp.producer = producer
364 break
365 }
366 }
367 log.Info("Kafka-publisher-created")
368 return nil
369}
370
371func (kp *KafkaMessagingProxy) createConsumer(retries int) error {
372 config := sarama.NewConfig()
373 config.Consumer.Return.Errors = true
374 config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
375 config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
376 config.Consumer.Offsets.Initial = sarama.OffsetNewest
377 kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
378 brokers := []string{kafkaFullAddr}
379
380 for {
381 consumer, err := sarama.NewConsumer(brokers, config)
382 if err != nil {
383 if retries == 0 {
384 log.Errorw("error-starting-consumer", log.Fields{"error": err})
385 return err
386 } else {
387 // If retries is -ve then we will retry indefinitely
388 retries--
389 }
390 log.Info("retrying-after-a-second-delay")
391 time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
392 } else {
393 kp.consumer = consumer
394 break
395 }
396 }
397 log.Info("Kafka-consumer-created")
398 return nil
399}
400
401func encodeReturnedValue(request *ca.InterContainerMessage, returnedVal interface{}) (*any.Any, error) {
402 // Encode the response argument - needs to be a proto message
403 if returnedVal == nil {
404 return nil, nil
405 }
406 protoValue, ok := returnedVal.(proto.Message)
407 if !ok {
408 log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
409 err := errors.New("response-value-not-proto-message")
410 return nil, err
411 }
412
413 // Marshal the returned value, if any
414 var marshalledReturnedVal *any.Any
415 var err error
416 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
417 log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
418 return nil, err
419 }
420 return marshalledReturnedVal, nil
421}
422
423func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
424 responseHeader := &ca.Header{
425 Id: request.Header.Id,
426 Type: ca.MessageType_RESPONSE,
427 FromTopic: request.Header.ToTopic,
428 ToTopic: request.Header.FromTopic,
429 Timestamp: time.Now().Unix(),
430 }
431 responseBody := &ca.InterContainerResponseBody{
432 Success: false,
433 Result: nil,
434 }
435 var marshalledResponseBody *any.Any
436 var err error
437 // Error should never happen here
438 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
439 log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
440 }
441
442 return &ca.InterContainerMessage{
443 Header: responseHeader,
444 Body: marshalledResponseBody,
445 }
446
447}
448
449//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
450//or an error on failure
451func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
452
453 log.Infow("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
454 responseHeader := &ca.Header{
455 Id: request.Header.Id,
456 Type: ca.MessageType_RESPONSE,
457 FromTopic: request.Header.ToTopic,
458 ToTopic: request.Header.FromTopic,
459 Timestamp: time.Now().Unix(),
460 }
461
462 // Go over all returned values
463 var marshalledReturnedVal *any.Any
464 var err error
465 for _, returnVal := range returnedValues {
466 if marshalledReturnedVal, err = encodeReturnedValue(request, returnVal); err != nil {
467 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
468 }
469 break // for now we support only 1 returned value - (excluding the error)
470 }
471
472 responseBody := &ca.InterContainerResponseBody{
473 Success: success,
474 Result: marshalledReturnedVal,
475 }
476
477 // Marshal the response body
478 var marshalledResponseBody *any.Any
479 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
480 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
481 return nil, err
482 }
483
484 return &ca.InterContainerMessage{
485 Header: responseHeader,
486 Body: marshalledResponseBody,
487 }, nil
488}
489
490func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
491 myClassValue := reflect.ValueOf(myClass)
492 m := myClassValue.MethodByName(funcName)
493 if !m.IsValid() {
494 return make([]reflect.Value, 0), fmt.Errorf("Method not found \"%s\"", funcName)
495 }
496 in := make([]reflect.Value, len(params))
497 for i, param := range params {
498 in[i] = reflect.ValueOf(param)
499 }
500 out = m.Call(in)
501 return
502}
503
// handleRequest dispatches an incoming request message to the matching method
// on targetInterface via reflection and, when the request asks for a response,
// encodes the results and sends them back on the request's FromTopic.
// Response-type messages arriving here are logged and ignored.
func (kp *KafkaMessagingProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {

	// First extract the header to know whether this is a request of a response
	if msg.Header.Type == ca.MessageType_REQUEST {
		log.Debugw("received-request", log.Fields{"header": msg.Header})

		var out []reflect.Value
		var err error

		// Get the request body
		requestBody := &ca.InterContainerRequestBody{}
		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
		} else {
			// let the callee unpack the arguments as its the only one that knows the real proto type
			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
			if err != nil {
				log.Warn(err)
			}
		}
		// Response required?
		// NOTE: when the unmarshal above failed, requestBody stays zero-valued
		// and ResponseRequired is false, so no response is sent in that case.
		if requestBody.ResponseRequired {
			// If we already have an error before then just return that
			var returnError *ca.Error
			var returnedValues []interface{}
			var success bool
			if err != nil {
				// Wrap the invocation error as the single returned value.
				returnError = &ca.Error{Reason: err.Error()}
				returnedValues = make([]interface{}, 1)
				returnedValues[0] = returnError
			} else {
				log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
				// Convention (from the loop below): out[0] holds the callee's
				// error; out[1:] are the actual results, copied to
				// returnedValues[idx-1]. Minimum array size is 1 so a lone
				// error still fits.
				returnSize := 1 // Minimum array size
				if len(out) > 1 {
					returnSize = len(out) - 1
				}
				returnedValues = make([]interface{}, returnSize)
				for idx, val := range out {
					log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
					if idx == 0 {
						// First result slot: non-nil means the callee failed.
						if val.Interface() != nil {
							if goError, ok := out[0].Interface().(error); ok {
								returnError = &ca.Error{Reason: goError.Error()}
								returnedValues[0] = returnError
							} // Else should never occur - maybe never say never?
							break
						} else {
							success = true
						}
					} else {
						returnedValues[idx-1] = val.Interface()
					}
				}
			}

			var icm *ca.InterContainerMessage
			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"erroe": err})
				icm = encodeDefaultFailedResponse(msg)
			}
			// Reply on the topic the request originated from.
			kp.sendToKafkaTopic(icm, &Topic{Name: msg.Header.FromTopic})
		}

	} else if msg.Header.Type == ca.MessageType_RESPONSE {
		log.Warnw("received-response-on-request-handler", log.Fields{"header": msg.Header})
	} else {
		log.Errorw("invalid-message", log.Fields{"header": msg.Header})
	}
}
573
574func (kp *KafkaMessagingProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
575 // Wait for messages
576 for msg := range ch {
577 log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
578 go kp.handleRequest(msg, targetInterface)
579 }
580}
581
582// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
583// topic via the unique channel each subsciber received during subscription
584func (kp *KafkaMessagingProxy) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
585 // Need to go over all channels and publish messages to them - do we need to copy msg?
586 kp.lockTopicToConsumerChannelMap.Lock()
587 defer kp.lockTopicToConsumerChannelMap.Unlock()
588 for _, ch := range consumerCh.channels {
589 go func(c chan *ca.InterContainerMessage) {
590 c <- protoMessage
591 }(ch)
592 }
593}
594
// consumeMessagesLoop reads kafka messages for the topic, unmarshals each one
// into an InterContainerMessage and fans it out to all subscriber channels.
// The loop exits when the proxy's done channel is closed.
func (kp *KafkaMessagingProxy) consumeMessagesLoop(topic Topic) {
	log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
	var consumerCh *consumerChannels
	if consumerCh = kp.getConsumerChannel(topic); consumerCh == nil {
		log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
		return
	}
startloop:
	for {
		select {
		case err := <-consumerCh.consumer.Errors():
			log.Warnw("consumer-error", log.Fields{"error": err})
		case msg := <-consumerCh.consumer.Messages():
			log.Debugw("message-received", log.Fields{"msg": msg})
			// Since the only expected message is a proto intercontainermessage then extract it right away
			// instead of dispatching it to the consumers
			msgBody := msg.Value
			icm := &ca.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				// Drop messages that are not valid InterContainerMessage protos.
				log.Warnw("invalid-message", log.Fields{"error": err})
				continue
			}
			// NOTE(review): consumerCh.channels is read here without the map
			// lock while dispatch/remove mutate it under the lock — confirm.
			log.Debugw("msg-to-consumers", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})

			// Fan out asynchronously so a slow subscriber does not block this loop.
			go kp.dispatchToConsumers(consumerCh, icm)
		case <-kp.doneCh:
			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
}
626
// dispatchResponse forwards a response message to the channel registered for
// its transaction id; when no requester is waiting the message is logged and
// dropped.
func (kp *KafkaMessagingProxy) dispatchResponse(msg *ca.InterContainerMessage) {
	kp.lockTransactionIdToChannelMap.Lock()
	defer kp.lockTransactionIdToChannelMap.Unlock()
	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
		log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
		return
	}
	// NOTE(review): this unbuffered send happens while the map lock is held;
	// if the requester has already stopped receiving, it blocks until the
	// channel is closed by unSubscribeForResponse — confirm intended.
	kp.transactionIdToChannelMap[msg.Header.Id] <- msg
}
636
637func (kp *KafkaMessagingProxy) waitForResponse(ch chan *ca.InterContainerMessage, topic Topic) {
638 log.Debugw("starting-consuming-responses-loop", log.Fields{"topic": topic.Name})
639startloop:
640 for {
641 select {
642 case msg := <-ch:
643 log.Debugw("message-received", log.Fields{"topic": topic.Name, "msg": msg})
644 go kp.dispatchResponse(msg)
645 // Need to handle program exit - TODO
646 case <-kp.doneCh:
647 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
648 break startloop
649 }
650 }
651}
652
653// createConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
654// for that topic. It also starts the routine that listens for messages on that topic.
655func (kp *KafkaMessagingProxy) setupConsumerChannel(topic Topic) (chan *ca.InterContainerMessage, error) {
656
657 if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
658 return nil, nil // Already created, so just ignore
659 }
660
661 partitionList, err := kp.consumer.Partitions(topic.Name)
662 if err != nil {
663 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
664 return nil, err
665 }
666
667 log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
668 // Create a partition consumer for that topic - for now just use one partition
669 var pConsumer sarama.PartitionConsumer
670 if pConsumer, err = kp.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
671 log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
672 return nil, err
673 }
674
675 // Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
676 // unbuffered to verify race conditions.
677 consumerListeningChannel := make(chan *ca.InterContainerMessage)
678 cc := &consumerChannels{
679 consumer: pConsumer,
680 channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
681 }
682
683 // Add the consumer channel to the map
684 kp.addToTopicToConsumerChannelMap(topic.Name, cc)
685
686 //Start a consumer to listen on that specific topic
687 go kp.consumeMessagesLoop(topic)
688
689 return consumerListeningChannel, nil
690}
691
692// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
693// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
694// API. There is one response channel waiting for kafka messages before dispatching the message to the
695// corresponding waiting channel
696func (kp *KafkaMessagingProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
697 log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name})
698
699 if consumerCh := kp.getConsumerChannel(topic); consumerCh == nil {
700 log.Debugw("topic-not-subscribed", log.Fields{"topic": topic.Name})
701 var consumerListeningChannel chan *ca.InterContainerMessage
702 var err error
703 if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
704 log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
705 return nil, err
706 }
707 // Start a go routine to listen to response messages over the consumer listening channel
708 go kp.waitForResponse(consumerListeningChannel, topic)
709 }
710
711 ch := make(chan *ca.InterContainerMessage)
712 kp.addToTransactionIdToChannelMap(trnsId, ch)
713
714 return ch, nil
715}
716
717func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
718 var i int
719 var channel chan *ca.InterContainerMessage
720 for i, channel = range channels {
721 if channel == ch {
722 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
723 close(channel)
724 return channels[:len(channels)-1]
725 }
726 }
727 return channels
728}
729
730func (kp *KafkaMessagingProxy) unSubscribeForResponse(trnsId string) error {
731 log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
732 // Close the channel first
733 close(kp.transactionIdToChannelMap[trnsId])
734 kp.deleteFromTransactionIdToChannelMap(trnsId)
735 return nil
736}
737
738//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
739//or an error on failure
740func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
741 requestHeader := &ca.Header{
742 Id: uuid.New().String(),
743 Type: ca.MessageType_REQUEST,
744 FromTopic: replyTopic.Name,
745 ToTopic: toTopic.Name,
746 Timestamp: time.Now().Unix(),
747 }
748 requestBody := &ca.InterContainerRequestBody{
749 Rpc: rpc,
750 ResponseRequired: true,
751 ReplyToTopic: replyTopic.Name,
752 }
753
754 for _, arg := range kvArgs {
755 var marshalledArg *any.Any
756 var err error
757 // ascertain the value interface type is a proto.Message
758 protoValue, ok := arg.Value.(proto.Message)
759 if !ok {
760 log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
761 err := errors.New("argument-value-not-proto-message")
762 return nil, err
763 }
764 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
765 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
766 return nil, err
767 }
768 protoArg := &ca.Argument{
769 Key: arg.Key,
770 Value: marshalledArg,
771 }
772 requestBody.Args = append(requestBody.Args, protoArg)
773 }
774
775 var marshalledData *any.Any
776 var err error
777 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
778 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
779 return nil, err
780 }
781 request := &ca.InterContainerMessage{
782 Header: requestHeader,
783 Body: marshalledData,
784 }
785 return request, nil
786}
787
788// sendRequest formats and sends the request onto the kafka messaging bus. It waits for the
789// response if needed. This function must, therefore, be run in its own routine.
790func (kp *KafkaMessagingProxy) sendToKafkaTopic(msg *ca.InterContainerMessage, topic *Topic) {
791
792 // Create the Sarama producer message
793 time := time.Now().Unix()
794 marshalled, _ := proto.Marshal(msg)
795 kafkaMsg := &sarama.ProducerMessage{
796 Topic: topic.Name,
797 Key: sarama.StringEncoder(time),
798 Value: sarama.ByteEncoder(marshalled),
799 }
800
801 // Send message to kafka
802 kp.producer.Input() <- kafkaMsg
803
804}
805
806func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
807 // Extract the message body
808 responseBody := ca.InterContainerResponseBody{}
809
810 log.Debugw("decodeResponse", log.Fields{"icr": &response})
811 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
812 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
813 return nil, err
814 }
815 log.Debugw("decodeResponse", log.Fields{"icrbody": &responseBody})
816
817 return &responseBody, nil
818
819}