blob: c00fb60d2a682257c18e291a94cd340dc9648c8f [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
23 "github.com/golang/protobuf/proto"
24 "github.com/golang/protobuf/ptypes"
25 "github.com/golang/protobuf/ptypes/any"
26 "github.com/google/uuid"
27 "github.com/opencord/voltha-go/common/log"
28 ca "github.com/opencord/voltha-go/protos/core_adapter"
29 "reflect"
30 "sync"
31 "time"
32)
33
34// Initialize the logger - gets the default until the main function setup the logger
35func init() {
khenaidoob9203542018-09-17 22:56:37 -040036 log.AddPackage(log.JSON, log.WarnLevel, nil)
khenaidooabad44c2018-08-03 16:58:35 -040037}
38
// Default settings used by the proxy when no option overrides them.
const (
	// Broker address and default topic.
	DefaultKafkaHost = "10.100.198.240"
	DefaultKafkaPort = 9092
	DefaultTopicName = "Core"

	// Delay, in seconds, between attempts to create the publisher/consumer.
	DefaultSleepOnError = 1

	// Producer tuning. DefaultFlushFrequency is converted with
	// time.Duration(...) as-is, i.e. it is interpreted in nanoseconds.
	DefaultFlushFrequency   = 1
	DefaultFlushMessages    = 1
	DefaultFlushMaxmessages = 1
	DefaultMaxRetries       = 3
	DefaultReturnSuccess    = false
	DefaultReturnErrors     = true

	// Consumer tuning, both in milliseconds.
	DefaultConsumerMaxwait   = 50
	DefaultMaxProcessingTime = 100

	// 200 milliseconds - to handle a wider latency range
	DefaultRequestTimeout = 200
)
54
55type consumerChannels struct {
56 consumer sarama.PartitionConsumer
57 channels []chan *ca.InterContainerMessage
58}
59
60// KafkaMessagingProxy represents the messaging proxy
61type KafkaMessagingProxy struct {
62 KafkaHost string
63 KafkaPort int
64 DefaultTopic *Topic
65 TargetInterface interface{}
66 producer sarama.AsyncProducer
67 consumer sarama.Consumer
68 doneCh chan int
khenaidoob9203542018-09-17 22:56:37 -040069 waitForResponseRoutineStarted bool
khenaidooabad44c2018-08-03 16:58:35 -040070 topicToConsumerChannelMap map[string]*consumerChannels
71 transactionIdToChannelMap map[string]chan *ca.InterContainerMessage
72 lockTopicToConsumerChannelMap sync.RWMutex
73 lockTransactionIdToChannelMap sync.RWMutex
74}
75
76type KafkaProxyOption func(*KafkaMessagingProxy)
77
78func KafkaHost(host string) KafkaProxyOption {
79 return func(args *KafkaMessagingProxy) {
80 args.KafkaHost = host
81 }
82}
83
84func KafkaPort(port int) KafkaProxyOption {
85 return func(args *KafkaMessagingProxy) {
86 args.KafkaPort = port
87 }
88}
89
90func DefaultTopic(topic *Topic) KafkaProxyOption {
91 return func(args *KafkaMessagingProxy) {
92 args.DefaultTopic = topic
93 }
94}
95
96func TargetInterface(target interface{}) KafkaProxyOption {
97 return func(args *KafkaMessagingProxy) {
98 args.TargetInterface = target
99 }
100}
101
102func NewKafkaMessagingProxy(opts ...KafkaProxyOption) (*KafkaMessagingProxy, error) {
103 proxy := &KafkaMessagingProxy{
104 KafkaHost: DefaultKafkaHost,
105 KafkaPort: DefaultKafkaPort,
106 DefaultTopic: &Topic{Name: DefaultTopicName},
107 }
108
109 for _, option := range opts {
110 option(proxy)
111 }
112
113 // Create the locks for all the maps
114 proxy.lockTopicToConsumerChannelMap = sync.RWMutex{}
115 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
116
117 return proxy, nil
118}
119
120func (kp *KafkaMessagingProxy) Start() error {
121 log.Info("Starting-Proxy")
122
123 // Create the Done channel
124 kp.doneCh = make(chan int, 1)
125
126 // Create the Publisher
127 if err := kp.createPublisher(DefaultMaxRetries); err != nil {
128 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
129 return err
130 }
131
132 // Create the master consumer
133 if err := kp.createConsumer(DefaultMaxRetries); err != nil {
134 log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
135 return err
136 }
137
138 // Create the topic to consumer/channel map
139 kp.topicToConsumerChannelMap = make(map[string]*consumerChannels)
140
141 // Create the transactionId to Channel Map
142 kp.transactionIdToChannelMap = make(map[string]chan *ca.InterContainerMessage)
143
144 return nil
145}
146
147func (kp *KafkaMessagingProxy) Stop() {
148 log.Info("Stopping-Proxy")
149 if kp.producer != nil {
150 if err := kp.producer.Close(); err != nil {
151 panic(err)
152 }
153 }
154 if kp.consumer != nil {
155 if err := kp.consumer.Close(); err != nil {
156 panic(err)
157 }
158 }
159 //Close the done channel to close all long processing Go routines
160 close(kp.doneCh)
161}
162
163func (kp *KafkaMessagingProxy) InvokeRPC(ctx context.Context, rpc string, topic *Topic, waitForResponse bool,
164 kvArgs ...*KVArg) (bool, *any.Any) {
165 // Encode the request
166 protoRequest, err := encodeRequest(rpc, topic, kp.DefaultTopic, kvArgs...)
167 if err != nil {
168 log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
169 return false, nil
170 }
171
172 // Subscribe for response, if needed, before sending request
173 var ch <-chan *ca.InterContainerMessage
174 if waitForResponse {
175 var err error
176 if ch, err = kp.subscribeForResponse(*kp.DefaultTopic, protoRequest.Header.Id); err != nil {
177 log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "topic": topic.Name})
178 }
179 }
180
181 // Send request
182 go kp.sendToKafkaTopic(protoRequest, topic)
183
184 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400185 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400186 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400187 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400188 if ctx == nil {
189 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400190 } else {
191 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400192 }
khenaidoob9203542018-09-17 22:56:37 -0400193 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400194
195 // Wait for response as well as timeout or cancellation
196 // Remove the subscription for a response on return
197 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
198 select {
199 case msg := <-ch:
200 log.Debugw("received-response", log.Fields{"rpc": rpc, "msg": msg})
201
202 var responseBody *ca.InterContainerResponseBody
203 var err error
204 if responseBody, err = decodeResponse(msg); err != nil {
205 log.Errorw("decode-response-error", log.Fields{"error": err})
206 }
207 return responseBody.Success, responseBody.Result
208 case <-ctx.Done():
209 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
210 // pack the error as proto any type
211 protoError := &ca.Error{Reason: ctx.Err().Error()}
212 var marshalledArg *any.Any
213 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
214 return false, nil // Should never happen
215 }
216 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400217 case <-childCtx.Done():
218 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
219 // pack the error as proto any type
220 protoError := &ca.Error{Reason: childCtx.Err().Error()}
221 var marshalledArg *any.Any
222 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
223 return false, nil // Should never happen
224 }
225 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400226 case <-kp.doneCh:
227 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name, "rpc": rpc})
228 return true, nil
229 }
230 }
231 return true, nil
232}
233
234// Subscribe allows a caller to subscribe to a given topic. A channel is returned to the
235// caller to receive messages from that topic.
236func (kp *KafkaMessagingProxy) Subscribe(topic Topic) (<-chan *ca.InterContainerMessage, error) {
237
238 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
239
240 if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
241 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
242 // Create a channel specific for that consumer and add it to the consumer channel map
243 ch := make(chan *ca.InterContainerMessage)
244 kp.addChannelToConsumerChannelMap(topic, ch)
245 return ch, nil
246 }
247
248 // Register for the topic and set it up
249 var consumerListeningChannel chan *ca.InterContainerMessage
250 var err error
251 if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
252 log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
253 return nil, err
254 }
255
256 return consumerListeningChannel, nil
257}
258
259func (kp *KafkaMessagingProxy) UnSubscribe(topic Topic, ch <-chan *ca.InterContainerMessage) error {
260 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
261 err := kp.removeChannelFromConsumerChannelMap(topic, ch)
262 return err
263}
264
265// SubscribeWithTarget allows a caller to assign a target object to be invoked automatically
266// when a message is received on a given topic
267func (kp *KafkaMessagingProxy) SubscribeWithTarget(topic Topic, targetInterface interface{}) error {
268
269 // Subscribe to receive messages for that topic
270 var ch <-chan *ca.InterContainerMessage
271 var err error
272 if ch, err = kp.Subscribe(topic); err != nil {
273 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
274 }
275 // Launch a go routine to receive and process kafka messages
276 go kp.waitForRequest(ch, topic, targetInterface)
277
278 return nil
279}
280
281func (kp *KafkaMessagingProxy) UnSubscribeTarget(ctx context.Context, topic Topic, targetInterface interface{}) error {
282 // TODO - mostly relevant with multiple interfaces
283 return nil
284}
285
khenaidoob9203542018-09-17 22:56:37 -0400286func (kp *KafkaMessagingProxy) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
khenaidooabad44c2018-08-03 16:58:35 -0400287 kp.lockTopicToConsumerChannelMap.Lock()
288 defer kp.lockTopicToConsumerChannelMap.Unlock()
289 if _, exist := kp.topicToConsumerChannelMap[id]; !exist {
290 kp.topicToConsumerChannelMap[id] = arg
291 }
292}
293
294func (kp *KafkaMessagingProxy) deleteFromTopicToConsumerChannelMap(id string) {
295 kp.lockTopicToConsumerChannelMap.Lock()
296 defer kp.lockTopicToConsumerChannelMap.Unlock()
297 if _, exist := kp.topicToConsumerChannelMap[id]; exist {
298 delete(kp.topicToConsumerChannelMap, id)
299 }
300}
301
302func (kp *KafkaMessagingProxy) getConsumerChannel(topic Topic) *consumerChannels {
303 kp.lockTopicToConsumerChannelMap.Lock()
304 defer kp.lockTopicToConsumerChannelMap.Unlock()
305
306 if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
307 return consumerCh
308 }
309 return nil
310}
311
312func (kp *KafkaMessagingProxy) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
313 kp.lockTopicToConsumerChannelMap.Lock()
314 defer kp.lockTopicToConsumerChannelMap.Unlock()
315 if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
316 consumerCh.channels = append(consumerCh.channels, ch)
317 return
318 }
319 log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
320}
321
322func (kp *KafkaMessagingProxy) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
323 kp.lockTopicToConsumerChannelMap.Lock()
324 defer kp.lockTopicToConsumerChannelMap.Unlock()
325 if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
326 // Channel will be closed in the removeChannel method
327 consumerCh.channels = removeChannel(consumerCh.channels, ch)
328 return nil
329 }
330 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
331 return errors.New("topic-does-not-exist")
332}
333
334func (kp *KafkaMessagingProxy) addToTransactionIdToChannelMap(id string, arg chan *ca.InterContainerMessage) {
335 kp.lockTransactionIdToChannelMap.Lock()
336 defer kp.lockTransactionIdToChannelMap.Unlock()
337 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
338 kp.transactionIdToChannelMap[id] = arg
339 }
340}
341
342func (kp *KafkaMessagingProxy) deleteFromTransactionIdToChannelMap(id string) {
343 kp.lockTransactionIdToChannelMap.Lock()
344 defer kp.lockTransactionIdToChannelMap.Unlock()
345 if _, exist := kp.transactionIdToChannelMap[id]; exist {
346 delete(kp.transactionIdToChannelMap, id)
347 }
348}
349
350func (kp *KafkaMessagingProxy) createPublisher(retries int) error {
351 // This Creates the publisher
352 config := sarama.NewConfig()
353 config.Producer.Partitioner = sarama.NewRandomPartitioner
354 config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
355 config.Producer.Flush.Messages = DefaultFlushMessages
356 config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
357 config.Producer.Return.Errors = DefaultReturnErrors
358 config.Producer.Return.Successes = DefaultReturnSuccess
359 config.Producer.RequiredAcks = sarama.WaitForAll
360 kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
361 brokers := []string{kafkaFullAddr}
362
363 for {
364 producer, err := sarama.NewAsyncProducer(brokers, config)
365 if err != nil {
366 if retries == 0 {
367 log.Errorw("error-starting-publisher", log.Fields{"error": err})
368 return err
369 } else {
370 // If retries is -ve then we will retry indefinitely
371 retries--
372 }
373 log.Info("retrying-after-a-second-delay")
374 time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
375 } else {
376 kp.producer = producer
377 break
378 }
379 }
380 log.Info("Kafka-publisher-created")
381 return nil
382}
383
384func (kp *KafkaMessagingProxy) createConsumer(retries int) error {
385 config := sarama.NewConfig()
386 config.Consumer.Return.Errors = true
387 config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
388 config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
389 config.Consumer.Offsets.Initial = sarama.OffsetNewest
390 kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
391 brokers := []string{kafkaFullAddr}
392
393 for {
394 consumer, err := sarama.NewConsumer(brokers, config)
395 if err != nil {
396 if retries == 0 {
397 log.Errorw("error-starting-consumer", log.Fields{"error": err})
398 return err
399 } else {
400 // If retries is -ve then we will retry indefinitely
401 retries--
402 }
403 log.Info("retrying-after-a-second-delay")
404 time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
405 } else {
406 kp.consumer = consumer
407 break
408 }
409 }
410 log.Info("Kafka-consumer-created")
411 return nil
412}
413
414func encodeReturnedValue(request *ca.InterContainerMessage, returnedVal interface{}) (*any.Any, error) {
415 // Encode the response argument - needs to be a proto message
416 if returnedVal == nil {
417 return nil, nil
418 }
419 protoValue, ok := returnedVal.(proto.Message)
420 if !ok {
421 log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
422 err := errors.New("response-value-not-proto-message")
423 return nil, err
424 }
425
426 // Marshal the returned value, if any
427 var marshalledReturnedVal *any.Any
428 var err error
429 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
430 log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
431 return nil, err
432 }
433 return marshalledReturnedVal, nil
434}
435
436func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
437 responseHeader := &ca.Header{
438 Id: request.Header.Id,
439 Type: ca.MessageType_RESPONSE,
440 FromTopic: request.Header.ToTopic,
441 ToTopic: request.Header.FromTopic,
442 Timestamp: time.Now().Unix(),
443 }
444 responseBody := &ca.InterContainerResponseBody{
445 Success: false,
446 Result: nil,
447 }
448 var marshalledResponseBody *any.Any
449 var err error
450 // Error should never happen here
451 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
452 log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
453 }
454
455 return &ca.InterContainerMessage{
456 Header: responseHeader,
457 Body: marshalledResponseBody,
458 }
459
460}
461
462//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
463//or an error on failure
464func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
465
466 log.Infow("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
467 responseHeader := &ca.Header{
468 Id: request.Header.Id,
469 Type: ca.MessageType_RESPONSE,
470 FromTopic: request.Header.ToTopic,
471 ToTopic: request.Header.FromTopic,
472 Timestamp: time.Now().Unix(),
473 }
474
475 // Go over all returned values
476 var marshalledReturnedVal *any.Any
477 var err error
478 for _, returnVal := range returnedValues {
479 if marshalledReturnedVal, err = encodeReturnedValue(request, returnVal); err != nil {
480 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
481 }
482 break // for now we support only 1 returned value - (excluding the error)
483 }
484
485 responseBody := &ca.InterContainerResponseBody{
486 Success: success,
487 Result: marshalledReturnedVal,
488 }
489
490 // Marshal the response body
491 var marshalledResponseBody *any.Any
492 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
493 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
494 return nil, err
495 }
496
497 return &ca.InterContainerMessage{
498 Header: responseHeader,
499 Body: marshalledResponseBody,
500 }, nil
501}
502
// CallFuncByName invokes, through reflection, the exported method funcName of
// myClass with params as its arguments and returns the method's results.
//
// An error (and an empty, non-nil slice) is returned instead of panicking
// when myClass is nil, when the method does not exist, or when the number of
// params does not match the method's parameter list. Arguments of the wrong
// type still cause reflect to panic, as does a panic raised by the method
// itself.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	myClassValue := reflect.ValueOf(myClass)
	// reflect.ValueOf(nil) is the zero Value; MethodByName would panic on it.
	if !myClassValue.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("Method not found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("Method not found \"%s\"", funcName)
	}

	// Value.Call panics on an argument count mismatch, so check it up front.
	mType := m.Type()
	expected := mType.NumIn()
	arityOK := len(params) == expected
	if mType.IsVariadic() {
		// The variadic parameter accepts zero or more trailing arguments.
		arityOK = len(params) >= expected-1
	}
	if !arityOK {
		return make([]reflect.Value, 0),
			fmt.Errorf("Method \"%s\" called with %d argument(s), expects %d", funcName, len(params), expected)
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		in[i] = reflect.ValueOf(param)
	}
	return m.Call(in), nil
}
516
517func (kp *KafkaMessagingProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
518
519 // First extract the header to know whether this is a request of a response
520 if msg.Header.Type == ca.MessageType_REQUEST {
521 log.Debugw("received-request", log.Fields{"header": msg.Header})
522
523 var out []reflect.Value
524 var err error
525
526 // Get the request body
527 requestBody := &ca.InterContainerRequestBody{}
528 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
529 log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
530 } else {
531 // let the callee unpack the arguments as its the only one that knows the real proto type
532 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
533 if err != nil {
534 log.Warn(err)
535 }
536 }
537 // Response required?
538 if requestBody.ResponseRequired {
539 // If we already have an error before then just return that
540 var returnError *ca.Error
541 var returnedValues []interface{}
542 var success bool
543 if err != nil {
544 returnError = &ca.Error{Reason: err.Error()}
545 returnedValues = make([]interface{}, 1)
546 returnedValues[0] = returnError
547 } else {
548 log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
khenaidoob9203542018-09-17 22:56:37 -0400549 returnedValues = make([]interface{}, 0)
550 // Check for errors first
551 lastIndex := len(out) - 1
552 if out[lastIndex].Interface() != nil { // Error
553 if goError, ok := out[lastIndex].Interface().(error); ok {
554 returnError = &ca.Error{Reason: goError.Error()}
555 returnedValues = append(returnedValues, returnError)
556 } else { // Should never happen
557 returnError = &ca.Error{Reason: "incorrect-error-returns"}
558 returnedValues = append(returnedValues, returnError)
559 }
560 } else { // Non-error case
561 success = true
562 for idx, val := range out {
563 log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
564 if idx != lastIndex {
565 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400566 }
khenaidooabad44c2018-08-03 16:58:35 -0400567 }
568 }
569 }
570
571 var icm *ca.InterContainerMessage
572 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400573 log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400574 icm = encodeDefaultFailedResponse(msg)
575 }
khenaidoob9203542018-09-17 22:56:37 -0400576 log.Debugw("sending-to-kafka", log.Fields{"msg": icm, "send-to-topic": msg.Header.FromTopic})
khenaidooabad44c2018-08-03 16:58:35 -0400577 kp.sendToKafkaTopic(icm, &Topic{Name: msg.Header.FromTopic})
578 }
579
580 } else if msg.Header.Type == ca.MessageType_RESPONSE {
581 log.Warnw("received-response-on-request-handler", log.Fields{"header": msg.Header})
582 } else {
583 log.Errorw("invalid-message", log.Fields{"header": msg.Header})
584 }
585}
586
587func (kp *KafkaMessagingProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
588 // Wait for messages
589 for msg := range ch {
590 log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
591 go kp.handleRequest(msg, targetInterface)
592 }
593}
594
595// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
596// topic via the unique channel each subsciber received during subscription
597func (kp *KafkaMessagingProxy) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
598 // Need to go over all channels and publish messages to them - do we need to copy msg?
599 kp.lockTopicToConsumerChannelMap.Lock()
600 defer kp.lockTopicToConsumerChannelMap.Unlock()
601 for _, ch := range consumerCh.channels {
602 go func(c chan *ca.InterContainerMessage) {
603 c <- protoMessage
604 }(ch)
605 }
606}
607
608func (kp *KafkaMessagingProxy) consumeMessagesLoop(topic Topic) {
609 log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
610 var consumerCh *consumerChannels
611 if consumerCh = kp.getConsumerChannel(topic); consumerCh == nil {
612 log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
613 return
614 }
615startloop:
616 for {
617 select {
618 case err := <-consumerCh.consumer.Errors():
619 log.Warnw("consumer-error", log.Fields{"error": err})
620 case msg := <-consumerCh.consumer.Messages():
khenaidoob9203542018-09-17 22:56:37 -0400621 //log.Debugw("message-received", log.Fields{"msg": msg})
khenaidooabad44c2018-08-03 16:58:35 -0400622 // Since the only expected message is a proto intercontainermessage then extract it right away
623 // instead of dispatching it to the consumers
624 msgBody := msg.Value
625 icm := &ca.InterContainerMessage{}
626 if err := proto.Unmarshal(msgBody, icm); err != nil {
627 log.Warnw("invalid-message", log.Fields{"error": err})
628 continue
629 }
khenaidoob9203542018-09-17 22:56:37 -0400630 if icm.Header.Type == ca.MessageType_REQUEST {
631 log.Debugw("request-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
632 go kp.dispatchToConsumers(consumerCh, icm)
633 } else if icm.Header.Type == ca.MessageType_RESPONSE {
634 log.Debugw("response-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
635 go kp.dispatchResponse(icm)
636 } else {
637 log.Debugw("unsupported-msg-received", log.Fields{"msg": *icm})
638 }
639 //// TODO: Dispatch requests and responses separately
640 //go kp.dispatchToConsumers(consumerCh, icm)
khenaidooabad44c2018-08-03 16:58:35 -0400641 case <-kp.doneCh:
642 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
643 break startloop
644 }
645 }
646}
647
648func (kp *KafkaMessagingProxy) dispatchResponse(msg *ca.InterContainerMessage) {
649 kp.lockTransactionIdToChannelMap.Lock()
650 defer kp.lockTransactionIdToChannelMap.Unlock()
651 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
652 log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
653 return
654 }
655 kp.transactionIdToChannelMap[msg.Header.Id] <- msg
656}
657
658func (kp *KafkaMessagingProxy) waitForResponse(ch chan *ca.InterContainerMessage, topic Topic) {
659 log.Debugw("starting-consuming-responses-loop", log.Fields{"topic": topic.Name})
khenaidoob9203542018-09-17 22:56:37 -0400660 kp.waitForResponseRoutineStarted = true
khenaidooabad44c2018-08-03 16:58:35 -0400661startloop:
662 for {
663 select {
664 case msg := <-ch:
665 log.Debugw("message-received", log.Fields{"topic": topic.Name, "msg": msg})
666 go kp.dispatchResponse(msg)
667 // Need to handle program exit - TODO
668 case <-kp.doneCh:
669 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
670 break startloop
671 }
672 }
673}
674
675// createConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
676// for that topic. It also starts the routine that listens for messages on that topic.
677func (kp *KafkaMessagingProxy) setupConsumerChannel(topic Topic) (chan *ca.InterContainerMessage, error) {
678
679 if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
680 return nil, nil // Already created, so just ignore
681 }
682
683 partitionList, err := kp.consumer.Partitions(topic.Name)
684 if err != nil {
685 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
686 return nil, err
687 }
688
689 log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
690 // Create a partition consumer for that topic - for now just use one partition
691 var pConsumer sarama.PartitionConsumer
692 if pConsumer, err = kp.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
693 log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
694 return nil, err
695 }
696
697 // Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
698 // unbuffered to verify race conditions.
699 consumerListeningChannel := make(chan *ca.InterContainerMessage)
700 cc := &consumerChannels{
701 consumer: pConsumer,
702 channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
703 }
704
705 // Add the consumer channel to the map
khenaidoob9203542018-09-17 22:56:37 -0400706 kp.addTopicToConsumerChannelMap(topic.Name, cc)
khenaidooabad44c2018-08-03 16:58:35 -0400707
708 //Start a consumer to listen on that specific topic
709 go kp.consumeMessagesLoop(topic)
710
711 return consumerListeningChannel, nil
712}
713
714// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
715// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
716// API. There is one response channel waiting for kafka messages before dispatching the message to the
717// corresponding waiting channel
718func (kp *KafkaMessagingProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
khenaidoob9203542018-09-17 22:56:37 -0400719 log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400720
721 if consumerCh := kp.getConsumerChannel(topic); consumerCh == nil {
722 log.Debugw("topic-not-subscribed", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400723 var err error
khenaidoob9203542018-09-17 22:56:37 -0400724
725 if _, err = kp.setupConsumerChannel(topic); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400726 log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
727 return nil, err
728 }
khenaidooabad44c2018-08-03 16:58:35 -0400729 }
730
731 ch := make(chan *ca.InterContainerMessage)
732 kp.addToTransactionIdToChannelMap(trnsId, ch)
733
734 return ch, nil
735}
736
737func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
738 var i int
739 var channel chan *ca.InterContainerMessage
740 for i, channel = range channels {
741 if channel == ch {
742 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
743 close(channel)
744 return channels[:len(channels)-1]
745 }
746 }
747 return channels
748}
749
750func (kp *KafkaMessagingProxy) unSubscribeForResponse(trnsId string) error {
751 log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
752 // Close the channel first
753 close(kp.transactionIdToChannelMap[trnsId])
754 kp.deleteFromTransactionIdToChannelMap(trnsId)
755 return nil
756}
757
758//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
759//or an error on failure
760func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
761 requestHeader := &ca.Header{
762 Id: uuid.New().String(),
763 Type: ca.MessageType_REQUEST,
764 FromTopic: replyTopic.Name,
765 ToTopic: toTopic.Name,
766 Timestamp: time.Now().Unix(),
767 }
768 requestBody := &ca.InterContainerRequestBody{
769 Rpc: rpc,
770 ResponseRequired: true,
771 ReplyToTopic: replyTopic.Name,
772 }
773
774 for _, arg := range kvArgs {
775 var marshalledArg *any.Any
776 var err error
777 // ascertain the value interface type is a proto.Message
778 protoValue, ok := arg.Value.(proto.Message)
779 if !ok {
780 log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
781 err := errors.New("argument-value-not-proto-message")
782 return nil, err
783 }
784 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
785 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
786 return nil, err
787 }
788 protoArg := &ca.Argument{
789 Key: arg.Key,
790 Value: marshalledArg,
791 }
792 requestBody.Args = append(requestBody.Args, protoArg)
793 }
794
795 var marshalledData *any.Any
796 var err error
797 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
798 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
799 return nil, err
800 }
801 request := &ca.InterContainerMessage{
802 Header: requestHeader,
803 Body: marshalledData,
804 }
805 return request, nil
806}
807
808// sendRequest formats and sends the request onto the kafka messaging bus. It waits for the
809// response if needed. This function must, therefore, be run in its own routine.
810func (kp *KafkaMessagingProxy) sendToKafkaTopic(msg *ca.InterContainerMessage, topic *Topic) {
811
812 // Create the Sarama producer message
813 time := time.Now().Unix()
814 marshalled, _ := proto.Marshal(msg)
815 kafkaMsg := &sarama.ProducerMessage{
816 Topic: topic.Name,
817 Key: sarama.StringEncoder(time),
818 Value: sarama.ByteEncoder(marshalled),
819 }
820
821 // Send message to kafka
822 kp.producer.Input() <- kafkaMsg
823
824}
825
826func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
827 // Extract the message body
828 responseBody := ca.InterContainerResponseBody{}
829
830 log.Debugw("decodeResponse", log.Fields{"icr": &response})
831 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
832 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
833 return nil, err
834 }
835 log.Debugw("decodeResponse", log.Fields{"icrbody": &responseBody})
836
837 return &responseBody, nil
838
839}