blob: 652bdfa3b02bbf3fe47b1e8f6246df2ec7c44c1f [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
Scott Bakerce767002019-10-23 13:30:24 -070026 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Bakerf1b096c2019-11-01 12:36:30 -070027 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070028 "reflect"
29 "strings"
30 "sync"
31 "time"
32)
33
// Default settings for inter-container requests.
const (
	// DefaultMaxRetries is the default retry count. It is not referenced in
	// the visible part of this file.
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is the request timeout in milliseconds (10000 ms),
	// chosen to handle a wider latency range. InvokeRPC multiplies it by
	// time.Millisecond.
	DefaultRequestTimeout = 10000
)
38
// Keys of the extra arguments appended to every incoming request before the
// target handler is invoked (see addTransactionId and addFromTopic).
const (
	// TransactionKey is the argument key carrying the message (transaction) id.
	TransactionKey = "transactionID"

	// FromTopic is the argument key carrying the name of the sender's topic.
	FromTopic = "fromTopic"
)
43
// Sentinel errors that request handlers may return.
var (
	// ErrorTransactionNotAcquired makes handleMessage silently drop the
	// request: no response is sent back to the requester.
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")

	// ErrorTransactionInvalidId is not referenced in the visible part of this
	// file.
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
46
47// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
48// obtained from that channel, this interface is invoked. This is used to handle
49// async requests into the Core via the kafka messaging bus
50type requestHandlerChannel struct {
51 requesthandlerInterface interface{}
52 ch <-chan *ic.InterContainerMessage
53}
54
55// transactionChannel represents a combination of a topic and a channel onto which a response received
56// on the kafka bus will be sent to
57type transactionChannel struct {
58 topic *Topic
59 ch chan *ic.InterContainerMessage
60}
61
62// InterContainerProxy represents the messaging proxy
63type InterContainerProxy struct {
64 kafkaHost string
65 kafkaPort int
66 DefaultTopic *Topic
67 defaultRequestHandlerInterface interface{}
68 deviceDiscoveryTopic *Topic
69 kafkaClient Client
70 doneCh chan int
71
72 // This map is used to map a topic to an interface and channel. When a request is received
73 // on that channel (registered to the topic) then that interface is invoked.
74 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
75 lockTopicRequestHandlerChannelMap sync.RWMutex
76
77 // This map is used to map a channel to a response topic. This channel handles all responses on that
78 // channel for that topic and forward them to the appropriate consumers channel, using the
79 // transactionIdToChannelMap.
80 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
81 lockTopicResponseChannelMap sync.RWMutex
82
83 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
84 // sent out and we are waiting for a response.
85 transactionIdToChannelMap map[string]*transactionChannel
86 lockTransactionIdToChannelMap sync.RWMutex
87}
88
89type InterContainerProxyOption func(*InterContainerProxy)
90
91func InterContainerHost(host string) InterContainerProxyOption {
92 return func(args *InterContainerProxy) {
93 args.kafkaHost = host
94 }
95}
96
97func InterContainerPort(port int) InterContainerProxyOption {
98 return func(args *InterContainerProxy) {
99 args.kafkaPort = port
100 }
101}
102
103func DefaultTopic(topic *Topic) InterContainerProxyOption {
104 return func(args *InterContainerProxy) {
105 args.DefaultTopic = topic
106 }
107}
108
109func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
110 return func(args *InterContainerProxy) {
111 args.deviceDiscoveryTopic = topic
112 }
113}
114
115func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
116 return func(args *InterContainerProxy) {
117 args.defaultRequestHandlerInterface = handler
118 }
119}
120
121func MsgClient(client Client) InterContainerProxyOption {
122 return func(args *InterContainerProxy) {
123 args.kafkaClient = client
124 }
125}
126
127func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
128 proxy := &InterContainerProxy{
129 kafkaHost: DefaultKafkaHost,
130 kafkaPort: DefaultKafkaPort,
131 }
132
133 for _, option := range opts {
134 option(proxy)
135 }
136
137 // Create the locks for all the maps
138 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
139 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
140 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
141
142 return proxy, nil
143}
144
145func (kp *InterContainerProxy) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -0500146 logger.Info("Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700147
148 // Kafka MsgClient should already have been created. If not, output fatal error
149 if kp.kafkaClient == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500150 logger.Fatal("kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700151 }
152
153 // Create the Done channel
154 kp.doneCh = make(chan int, 1)
155
156 // Start the kafka client
157 if err := kp.kafkaClient.Start(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500158 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700159 return err
160 }
161
162 // Create the topic to response channel map
163 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
164 //
165 // Create the transactionId to Channel Map
166 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
167
168 // Create the topic to request channel map
169 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
170
171 return nil
172}
173
174func (kp *InterContainerProxy) Stop() {
khenaidoob332f9b2020-01-16 16:25:26 -0500175 logger.Info("stopping-intercontainer-proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700176 kp.doneCh <- 1
177 // TODO : Perform cleanup
178 kp.kafkaClient.Stop()
179 //kp.deleteAllTopicRequestHandlerChannelMap()
180 //kp.deleteAllTopicResponseChannelMap()
181 //kp.deleteAllTransactionIdToChannelMap()
182}
183
184// DeviceDiscovered publish the discovered device onto the kafka messaging bus
185func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
khenaidoob332f9b2020-01-16 16:25:26 -0500186 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700187 // Simple validation
188 if deviceId == "" || deviceType == "" {
khenaidoob332f9b2020-01-16 16:25:26 -0500189 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700190 return errors.New("invalid-parameters")
191 }
192 // Create the device discovery message
193 header := &ic.Header{
194 Id: uuid.New().String(),
195 Type: ic.MessageType_DEVICE_DISCOVERED,
196 FromTopic: kp.DefaultTopic.Name,
197 ToTopic: kp.deviceDiscoveryTopic.Name,
198 Timestamp: time.Now().UnixNano(),
199 }
200 body := &ic.DeviceDiscovered{
201 Id: deviceId,
202 DeviceType: deviceType,
203 ParentId: parentId,
204 Publisher: publisher,
205 }
206
207 var marshalledData *any.Any
208 var err error
209 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500210 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700211 return err
212 }
213 msg := &ic.InterContainerMessage{
214 Header: header,
215 Body: marshalledData,
216 }
217
218 // Send the message
219 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500220 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700221 return err
222 }
223 return nil
224}
225
226// InvokeRPC is used to send a request to a given topic
227func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
228 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
229
230 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
231 // typically the device ID.
232 responseTopic := replyToTopic
233 if responseTopic == nil {
234 responseTopic = kp.DefaultTopic
235 }
236
237 // Encode the request
238 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
239 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500240 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700241 return false, nil
242 }
243
244 // Subscribe for response, if needed, before sending request
245 var ch <-chan *ic.InterContainerMessage
246 if waitForResponse {
247 var err error
248 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500249 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700250 }
251 }
252
253 // Send request - if the topic is formatted with a device Id then we will send the request using a
254 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
255 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
256 //key := GetDeviceIdFromTopic(*toTopic)
khenaidoob332f9b2020-01-16 16:25:26 -0500257 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700258 go kp.kafkaClient.Send(protoRequest, toTopic, key)
259
260 if waitForResponse {
261 // Create a child context based on the parent context, if any
262 var cancel context.CancelFunc
263 childCtx := context.Background()
264 if ctx == nil {
265 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
266 } else {
267 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
268 }
269 defer cancel()
270
271 // Wait for response as well as timeout or cancellation
272 // Remove the subscription for a response on return
273 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
274 select {
275 case msg, ok := <-ch:
276 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500277 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700278 protoError := &ic.Error{Reason: "channel-closed"}
279 var marshalledArg *any.Any
280 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
281 return false, nil // Should never happen
282 }
283 return false, marshalledArg
284 }
khenaidoob332f9b2020-01-16 16:25:26 -0500285 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700286 var responseBody *ic.InterContainerResponseBody
287 var err error
288 if responseBody, err = decodeResponse(msg); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500289 logger.Errorw("decode-response-error", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700290 }
291 return responseBody.Success, responseBody.Result
292 case <-ctx.Done():
khenaidoob332f9b2020-01-16 16:25:26 -0500293 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700294 // pack the error as proto any type
295 protoError := &ic.Error{Reason: ctx.Err().Error()}
296 var marshalledArg *any.Any
297 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
298 return false, nil // Should never happen
299 }
300 return false, marshalledArg
301 case <-childCtx.Done():
khenaidoob332f9b2020-01-16 16:25:26 -0500302 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700303 // pack the error as proto any type
304 protoError := &ic.Error{Reason: childCtx.Err().Error()}
305 var marshalledArg *any.Any
306 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
307 return false, nil // Should never happen
308 }
309 return false, marshalledArg
310 case <-kp.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -0500311 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700312 return true, nil
313 }
314 }
315 return true, nil
316}
317
318// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
319// when a message is received on a given topic
320func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
321
322 // Subscribe to receive messages for that topic
323 var ch <-chan *ic.InterContainerMessage
324 var err error
325 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
326 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500327 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700328 return err
329 }
330
331 kp.defaultRequestHandlerInterface = handler
332 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
333 // Launch a go routine to receive and process kafka messages
334 go kp.waitForMessages(ch, topic, handler)
335
336 return nil
337}
338
339// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
340// when a message is received on a given topic. So far there is only 1 target registered per microservice
341func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
342 // Subscribe to receive messages for that topic
343 var ch <-chan *ic.InterContainerMessage
344 var err error
345 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500346 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700347 return err
348 }
349 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
350
351 // Launch a go routine to receive and process kafka messages
352 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
353
354 return nil
355}
356
357func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
358 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
359}
360
361// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
362// responses from that topic.
363func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
364 kp.lockTopicResponseChannelMap.Lock()
365 defer kp.lockTopicResponseChannelMap.Unlock()
366 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
367 kp.topicToResponseChannelMap[topic] = arg
368 }
369}
370
371func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
372 kp.lockTopicResponseChannelMap.RLock()
373 defer kp.lockTopicResponseChannelMap.RUnlock()
374 _, exist := kp.topicToResponseChannelMap[topic]
375 return exist
376}
377
378func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
379 kp.lockTopicResponseChannelMap.Lock()
380 defer kp.lockTopicResponseChannelMap.Unlock()
381 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
382 // Unsubscribe to this topic first - this will close the subscribed channel
383 var err error
384 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500385 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700386 }
387 delete(kp.topicToResponseChannelMap, topic)
388 return err
389 } else {
390 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
391 }
392}
393
394func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
395 kp.lockTopicResponseChannelMap.Lock()
396 defer kp.lockTopicResponseChannelMap.Unlock()
397 var err error
398 for topic, _ := range kp.topicToResponseChannelMap {
399 // Unsubscribe to this topic first - this will close the subscribed channel
400 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500401 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700402 }
403 delete(kp.topicToResponseChannelMap, topic)
404 }
405 return err
406}
407
408func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
409 kp.lockTopicRequestHandlerChannelMap.Lock()
410 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
411 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
412 kp.topicToRequestHandlerChannelMap[topic] = arg
413 }
414}
415
416func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
417 kp.lockTopicRequestHandlerChannelMap.Lock()
418 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
419 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
420 // Close the kafka client client first by unsubscribing to this topic
421 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
422 delete(kp.topicToRequestHandlerChannelMap, topic)
423 return nil
424 } else {
425 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
426 }
427}
428
429func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
430 kp.lockTopicRequestHandlerChannelMap.Lock()
431 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
432 var err error
433 for topic, _ := range kp.topicToRequestHandlerChannelMap {
434 // Close the kafka client client first by unsubscribing to this topic
435 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500436 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700437 }
438 delete(kp.topicToRequestHandlerChannelMap, topic)
439 }
440 return err
441}
442
443func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
444 kp.lockTransactionIdToChannelMap.Lock()
445 defer kp.lockTransactionIdToChannelMap.Unlock()
446 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
447 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
448 }
449}
450
451func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
452 kp.lockTransactionIdToChannelMap.Lock()
453 defer kp.lockTransactionIdToChannelMap.Unlock()
454 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
455 // Close the channel first
456 close(transChannel.ch)
457 delete(kp.transactionIdToChannelMap, id)
458 }
459}
460
461func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
462 kp.lockTransactionIdToChannelMap.Lock()
463 defer kp.lockTransactionIdToChannelMap.Unlock()
464 for key, value := range kp.transactionIdToChannelMap {
465 if value.topic.Name == id {
466 close(value.ch)
467 delete(kp.transactionIdToChannelMap, key)
468 }
469 }
470}
471
472func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
473 kp.lockTransactionIdToChannelMap.Lock()
474 defer kp.lockTransactionIdToChannelMap.Unlock()
475 for key, value := range kp.transactionIdToChannelMap {
476 close(value.ch)
477 delete(kp.transactionIdToChannelMap, key)
478 }
479}
480
481func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
482 // If we have any consumers on that topic we need to close them
483 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500484 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700485 }
486 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500487 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700488 }
489 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
490
491 return kp.kafkaClient.DeleteTopic(&topic)
492}
493
494func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
495 // Encode the response argument - needs to be a proto message
496 if returnedVal == nil {
497 return nil, nil
498 }
499 protoValue, ok := returnedVal.(proto.Message)
500 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500501 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700502 err := errors.New("response-value-not-proto-message")
503 return nil, err
504 }
505
506 // Marshal the returned value, if any
507 var marshalledReturnedVal *any.Any
508 var err error
509 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500510 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700511 return nil, err
512 }
513 return marshalledReturnedVal, nil
514}
515
516func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
517 responseHeader := &ic.Header{
518 Id: request.Header.Id,
519 Type: ic.MessageType_RESPONSE,
520 FromTopic: request.Header.ToTopic,
521 ToTopic: request.Header.FromTopic,
522 Timestamp: time.Now().Unix(),
523 }
524 responseBody := &ic.InterContainerResponseBody{
525 Success: false,
526 Result: nil,
527 }
528 var marshalledResponseBody *any.Any
529 var err error
530 // Error should never happen here
531 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500532 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700533 }
534
535 return &ic.InterContainerMessage{
536 Header: responseHeader,
537 Body: marshalledResponseBody,
538 }
539
540}
541
542//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
543//or an error on failure
544func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -0500545 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700546 responseHeader := &ic.Header{
547 Id: request.Header.Id,
548 Type: ic.MessageType_RESPONSE,
549 FromTopic: request.Header.ToTopic,
550 ToTopic: request.Header.FromTopic,
551 KeyTopic: request.Header.KeyTopic,
552 Timestamp: time.Now().UnixNano(),
553 }
554
555 // Go over all returned values
556 var marshalledReturnedVal *any.Any
557 var err error
558 for _, returnVal := range returnedValues {
559 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500560 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700561 }
562 break // for now we support only 1 returned value - (excluding the error)
563 }
564
565 responseBody := &ic.InterContainerResponseBody{
566 Success: success,
567 Result: marshalledReturnedVal,
568 }
569
570 // Marshal the response body
571 var marshalledResponseBody *any.Any
572 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500573 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700574 return nil, err
575 }
576
577 return &ic.InterContainerMessage{
578 Header: responseHeader,
579 Body: marshalledResponseBody,
580 }, nil
581}
582
// CallFuncByName invokes, through reflection, the method of myClass named
// funcName with params as arguments and returns the values the method
// returned.
//
// funcName is passed through strings.Title first, so that a name given in
// lower case still resolves to the exported method. When myClass is nil or has
// no such method, an empty slice and a "method-not-found" error are returned.
// Arguments that do not match the method signature still cause the underlying
// reflect call to panic.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first
	// capital letters required to invoke a function from a different package
	funcName = strings.Title(funcName)

	// A nil target yields the zero reflect.Value, on which MethodByName
	// would panic; report it as a missing method instead.
	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		in[i] = reflect.ValueOf(param)
	}
	return m.Call(in), nil
}
599
600func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
601 arg := &KVArg{
602 Key: TransactionKey,
603 Value: &ic.StrType{Val: transactionId},
604 }
605
606 var marshalledArg *any.Any
607 var err error
608 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500609 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700610 return currentArgs
611 }
612 protoArg := &ic.Argument{
613 Key: arg.Key,
614 Value: marshalledArg,
615 }
616 return append(currentArgs, protoArg)
617}
618
619func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
620 var marshalledArg *any.Any
621 var err error
622 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500623 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700624 return currentArgs
625 }
626 protoArg := &ic.Argument{
627 Key: FromTopic,
628 Value: marshalledArg,
629 }
630 return append(currentArgs, protoArg)
631}
632
633func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
634
635 // First extract the header to know whether this is a request - responses are handled by a different handler
636 if msg.Header.Type == ic.MessageType_REQUEST {
637 var out []reflect.Value
638 var err error
639
640 // Get the request body
641 requestBody := &ic.InterContainerRequestBody{}
642 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500643 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700644 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500645 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700646 // let the callee unpack the arguments as its the only one that knows the real proto type
647 // Augment the requestBody with the message Id as it will be used in scenarios where cores
648 // are set in pairs and competing
649 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
650
651 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
652 // needs to send an unsollicited message to the currently requested container
653 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
654
655 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
656 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500657 logger.Warn(err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700658 }
659 }
660 // Response required?
661 if requestBody.ResponseRequired {
662 // If we already have an error before then just return that
663 var returnError *ic.Error
664 var returnedValues []interface{}
665 var success bool
666 if err != nil {
667 returnError = &ic.Error{Reason: err.Error()}
668 returnedValues = make([]interface{}, 1)
669 returnedValues[0] = returnError
670 } else {
671 returnedValues = make([]interface{}, 0)
672 // Check for errors first
673 lastIndex := len(out) - 1
674 if out[lastIndex].Interface() != nil { // Error
675 if retError, ok := out[lastIndex].Interface().(error); ok {
676 if retError.Error() == ErrorTransactionNotAcquired.Error() {
khenaidoob332f9b2020-01-16 16:25:26 -0500677 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700678 return // Ignore - process is in competing mode and ignored transaction
679 }
680 returnError = &ic.Error{Reason: retError.Error()}
681 returnedValues = append(returnedValues, returnError)
682 } else { // Should never happen
683 returnError = &ic.Error{Reason: "incorrect-error-returns"}
684 returnedValues = append(returnedValues, returnError)
685 }
686 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
khenaidoob332f9b2020-01-16 16:25:26 -0500687 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700688 return // Ignore - should not happen
689 } else { // Non-error case
690 success = true
691 for idx, val := range out {
khenaidoob332f9b2020-01-16 16:25:26 -0500692 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700693 if idx != lastIndex {
694 returnedValues = append(returnedValues, val.Interface())
695 }
696 }
697 }
698 }
699
700 var icm *ic.InterContainerMessage
701 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500702 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700703 icm = encodeDefaultFailedResponse(msg)
704 }
705 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
706 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
707 // present then the key will be empty, hence all messages for a given topic will be sent to all
708 // partitions.
709 replyTopic := &Topic{Name: msg.Header.FromTopic}
710 key := msg.Header.KeyTopic
khenaidoob332f9b2020-01-16 16:25:26 -0500711 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700712 // TODO: handle error response.
713 go kp.kafkaClient.Send(icm, replyTopic, key)
714 }
715 } else if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoob332f9b2020-01-16 16:25:26 -0500716 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700717 go kp.dispatchResponse(msg)
718 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500719 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700720 }
721}
722
723func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
724 // Wait for messages
725 for msg := range ch {
khenaidoob332f9b2020-01-16 16:25:26 -0500726 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
Scott Baker2c1c4822019-10-16 11:02:41 -0700727 go kp.handleMessage(msg, targetInterface)
728 }
729}
730
731func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
732 kp.lockTransactionIdToChannelMap.RLock()
733 defer kp.lockTransactionIdToChannelMap.RUnlock()
734 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
khenaidoob332f9b2020-01-16 16:25:26 -0500735 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700736 return
737 }
738 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
739}
740
741// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
742// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
743// API. There is one response channel waiting for kafka messages before dispatching the message to the
744// corresponding waiting channel
745func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -0500746 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700747
748 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
749 // broadcast any message for this topic to all channels waiting on it.
750 ch := make(chan *ic.InterContainerMessage)
751 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
752
753 return ch, nil
754}
755
756func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidoob332f9b2020-01-16 16:25:26 -0500757 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700758 kp.deleteFromTransactionIdToChannelMap(trnsId)
759 return nil
760}
761
Scott Baker104b67d2019-10-29 15:56:27 -0700762func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
763 return kp.kafkaClient.EnableLivenessChannel(enable)
764}
765
Scott Baker0fef6982019-12-12 09:49:42 -0800766func (kp *InterContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
767 return kp.kafkaClient.EnableHealthinessChannel(enable)
768}
769
Scott Baker104b67d2019-10-29 15:56:27 -0700770func (kp *InterContainerProxy) SendLiveness() error {
771 return kp.kafkaClient.SendLiveness()
772}
773
Scott Baker2c1c4822019-10-16 11:02:41 -0700774//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
775//or an error on failure
776func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
777 requestHeader := &ic.Header{
778 Id: uuid.New().String(),
779 Type: ic.MessageType_REQUEST,
780 FromTopic: replyTopic.Name,
781 ToTopic: toTopic.Name,
782 KeyTopic: key,
783 Timestamp: time.Now().UnixNano(),
784 }
785 requestBody := &ic.InterContainerRequestBody{
786 Rpc: rpc,
787 ResponseRequired: true,
788 ReplyToTopic: replyTopic.Name,
789 }
790
791 for _, arg := range kvArgs {
792 if arg == nil {
793 // In case the caller sends an array with empty args
794 continue
795 }
796 var marshalledArg *any.Any
797 var err error
798 // ascertain the value interface type is a proto.Message
799 protoValue, ok := arg.Value.(proto.Message)
800 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500801 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700802 err := errors.New("argument-value-not-proto-message")
803 return nil, err
804 }
805 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500806 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700807 return nil, err
808 }
809 protoArg := &ic.Argument{
810 Key: arg.Key,
811 Value: marshalledArg,
812 }
813 requestBody.Args = append(requestBody.Args, protoArg)
814 }
815
816 var marshalledData *any.Any
817 var err error
818 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500819 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700820 return nil, err
821 }
822 request := &ic.InterContainerMessage{
823 Header: requestHeader,
824 Body: marshalledData,
825 }
826 return request, nil
827}
828
829func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
830 // Extract the message body
831 responseBody := ic.InterContainerResponseBody{}
832 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500833 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700834 return nil, err
835 }
khenaidoob332f9b2020-01-16 16:25:26 -0500836 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700837
838 return &responseBody, nil
839
840}