blob: 042e1213c1165683f26048cb8f7156a20955633d [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
William Kurkianea869482019-04-09 15:16:11 -040022 "reflect"
23 "strings"
24 "sync"
25 "time"
William Kurkianea869482019-04-09 15:16:11 -040026
Esin Karamanccb714b2019-11-29 15:02:06 +000027 "github.com/golang/protobuf/proto"
28 "github.com/golang/protobuf/ptypes"
29 "github.com/golang/protobuf/ptypes/any"
30 "github.com/google/uuid"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
33)
William Kurkianea869482019-04-09 15:16:11 -040034
35const (
36 DefaultMaxRetries = 3
37 DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
38)
39
40const (
41 TransactionKey = "transactionID"
42 FromTopic = "fromTopic"
43)
44
kdarapub26b4502019-10-05 03:02:33 +053045var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
46var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
47
William Kurkianea869482019-04-09 15:16:11 -040048// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
49// obtained from that channel, this interface is invoked. This is used to handle
50// async requests into the Core via the kafka messaging bus
51type requestHandlerChannel struct {
52 requesthandlerInterface interface{}
53 ch <-chan *ic.InterContainerMessage
54}
55
56// transactionChannel represents a combination of a topic and a channel onto which a response received
57// on the kafka bus will be sent to
58type transactionChannel struct {
59 topic *Topic
60 ch chan *ic.InterContainerMessage
61}
62
63// InterContainerProxy represents the messaging proxy
64type InterContainerProxy struct {
65 kafkaHost string
66 kafkaPort int
67 DefaultTopic *Topic
68 defaultRequestHandlerInterface interface{}
69 deviceDiscoveryTopic *Topic
70 kafkaClient Client
71 doneCh chan int
72
73 // This map is used to map a topic to an interface and channel. When a request is received
74 // on that channel (registered to the topic) then that interface is invoked.
75 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
76 lockTopicRequestHandlerChannelMap sync.RWMutex
77
78 // This map is used to map a channel to a response topic. This channel handles all responses on that
79 // channel for that topic and forward them to the appropriate consumers channel, using the
80 // transactionIdToChannelMap.
81 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
82 lockTopicResponseChannelMap sync.RWMutex
83
84 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
85 // sent out and we are waiting for a response.
86 transactionIdToChannelMap map[string]*transactionChannel
87 lockTransactionIdToChannelMap sync.RWMutex
88}
89
90type InterContainerProxyOption func(*InterContainerProxy)
91
92func InterContainerHost(host string) InterContainerProxyOption {
93 return func(args *InterContainerProxy) {
94 args.kafkaHost = host
95 }
96}
97
98func InterContainerPort(port int) InterContainerProxyOption {
99 return func(args *InterContainerProxy) {
100 args.kafkaPort = port
101 }
102}
103
104func DefaultTopic(topic *Topic) InterContainerProxyOption {
105 return func(args *InterContainerProxy) {
106 args.DefaultTopic = topic
107 }
108}
109
110func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
111 return func(args *InterContainerProxy) {
112 args.deviceDiscoveryTopic = topic
113 }
114}
115
116func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
117 return func(args *InterContainerProxy) {
118 args.defaultRequestHandlerInterface = handler
119 }
120}
121
122func MsgClient(client Client) InterContainerProxyOption {
123 return func(args *InterContainerProxy) {
124 args.kafkaClient = client
125 }
126}
127
128func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
129 proxy := &InterContainerProxy{
130 kafkaHost: DefaultKafkaHost,
131 kafkaPort: DefaultKafkaPort,
132 }
133
134 for _, option := range opts {
135 option(proxy)
136 }
137
138 // Create the locks for all the maps
139 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
140 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
141 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
142
143 return proxy, nil
144}
145
146func (kp *InterContainerProxy) Start() error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000147 logger.Info("Starting-Proxy")
William Kurkianea869482019-04-09 15:16:11 -0400148
149 // Kafka MsgClient should already have been created. If not, output fatal error
150 if kp.kafkaClient == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000151 logger.Fatal("kafka-client-not-set")
William Kurkianea869482019-04-09 15:16:11 -0400152 }
153
154 // Create the Done channel
155 kp.doneCh = make(chan int, 1)
156
157 // Start the kafka client
158 if err := kp.kafkaClient.Start(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000159 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400160 return err
161 }
162
163 // Create the topic to response channel map
164 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
165 //
166 // Create the transactionId to Channel Map
167 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
168
169 // Create the topic to request channel map
170 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
171
172 return nil
173}
174
175func (kp *InterContainerProxy) Stop() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000176 logger.Info("stopping-intercontainer-proxy")
William Kurkianea869482019-04-09 15:16:11 -0400177 kp.doneCh <- 1
178 // TODO : Perform cleanup
179 kp.kafkaClient.Stop()
180 //kp.deleteAllTopicRequestHandlerChannelMap()
181 //kp.deleteAllTopicResponseChannelMap()
182 //kp.deleteAllTransactionIdToChannelMap()
183}
184
185// DeviceDiscovered publish the discovered device onto the kafka messaging bus
186func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000187 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
William Kurkianea869482019-04-09 15:16:11 -0400188 // Simple validation
189 if deviceId == "" || deviceType == "" {
Esin Karamanccb714b2019-11-29 15:02:06 +0000190 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
William Kurkianea869482019-04-09 15:16:11 -0400191 return errors.New("invalid-parameters")
192 }
193 // Create the device discovery message
194 header := &ic.Header{
195 Id: uuid.New().String(),
196 Type: ic.MessageType_DEVICE_DISCOVERED,
197 FromTopic: kp.DefaultTopic.Name,
198 ToTopic: kp.deviceDiscoveryTopic.Name,
199 Timestamp: time.Now().UnixNano(),
200 }
201 body := &ic.DeviceDiscovered{
202 Id: deviceId,
203 DeviceType: deviceType,
204 ParentId: parentId,
205 Publisher: publisher,
206 }
207
208 var marshalledData *any.Any
209 var err error
210 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000211 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400212 return err
213 }
214 msg := &ic.InterContainerMessage{
215 Header: header,
216 Body: marshalledData,
217 }
218
219 // Send the message
220 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000221 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400222 return err
223 }
224 return nil
225}
226
227// InvokeRPC is used to send a request to a given topic
228func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
229 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
230
231 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
232 // typically the device ID.
233 responseTopic := replyToTopic
234 if responseTopic == nil {
235 responseTopic = kp.DefaultTopic
236 }
237
238 // Encode the request
239 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
240 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000241 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400242 return false, nil
243 }
244
245 // Subscribe for response, if needed, before sending request
246 var ch <-chan *ic.InterContainerMessage
247 if waitForResponse {
248 var err error
249 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000250 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400251 }
252 }
253
254 // Send request - if the topic is formatted with a device Id then we will send the request using a
255 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
256 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
257 //key := GetDeviceIdFromTopic(*toTopic)
Esin Karamanccb714b2019-11-29 15:02:06 +0000258 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
William Kurkianea869482019-04-09 15:16:11 -0400259 go kp.kafkaClient.Send(protoRequest, toTopic, key)
260
261 if waitForResponse {
262 // Create a child context based on the parent context, if any
263 var cancel context.CancelFunc
264 childCtx := context.Background()
265 if ctx == nil {
266 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
267 } else {
268 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
269 }
270 defer cancel()
271
272 // Wait for response as well as timeout or cancellation
273 // Remove the subscription for a response on return
274 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
275 select {
276 case msg, ok := <-ch:
277 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000278 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400279 protoError := &ic.Error{Reason: "channel-closed"}
280 var marshalledArg *any.Any
281 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
282 return false, nil // Should never happen
283 }
284 return false, marshalledArg
285 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000286 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400287 var responseBody *ic.InterContainerResponseBody
288 var err error
289 if responseBody, err = decodeResponse(msg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000290 logger.Errorw("decode-response-error", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400291 }
292 return responseBody.Success, responseBody.Result
293 case <-ctx.Done():
Esin Karamanccb714b2019-11-29 15:02:06 +0000294 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
William Kurkianea869482019-04-09 15:16:11 -0400295 // pack the error as proto any type
296 protoError := &ic.Error{Reason: ctx.Err().Error()}
297 var marshalledArg *any.Any
298 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
299 return false, nil // Should never happen
300 }
301 return false, marshalledArg
302 case <-childCtx.Done():
Esin Karamanccb714b2019-11-29 15:02:06 +0000303 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
William Kurkianea869482019-04-09 15:16:11 -0400304 // pack the error as proto any type
305 protoError := &ic.Error{Reason: childCtx.Err().Error()}
306 var marshalledArg *any.Any
307 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
308 return false, nil // Should never happen
309 }
310 return false, marshalledArg
311 case <-kp.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +0000312 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
William Kurkianea869482019-04-09 15:16:11 -0400313 return true, nil
314 }
315 }
316 return true, nil
317}
318
319// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
320// when a message is received on a given topic
321func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
322
323 // Subscribe to receive messages for that topic
324 var ch <-chan *ic.InterContainerMessage
325 var err error
326 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
327 //if ch, err = kp.Subscribe(topic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000328 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Matt Jeannereteb5059f2019-07-19 06:11:00 -0400329 return err
William Kurkianea869482019-04-09 15:16:11 -0400330 }
331
332 kp.defaultRequestHandlerInterface = handler
333 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
334 // Launch a go routine to receive and process kafka messages
335 go kp.waitForMessages(ch, topic, handler)
336
337 return nil
338}
339
340// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
341// when a message is received on a given topic. So far there is only 1 target registered per microservice
342func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
343 // Subscribe to receive messages for that topic
344 var ch <-chan *ic.InterContainerMessage
345 var err error
346 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000347 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400348 return err
349 }
350 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
351
352 // Launch a go routine to receive and process kafka messages
353 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
354
355 return nil
356}
357
358func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
359 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
360}
361
362// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
363// responses from that topic.
364func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
365 kp.lockTopicResponseChannelMap.Lock()
366 defer kp.lockTopicResponseChannelMap.Unlock()
367 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
368 kp.topicToResponseChannelMap[topic] = arg
369 }
370}
371
372func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
373 kp.lockTopicResponseChannelMap.RLock()
374 defer kp.lockTopicResponseChannelMap.RUnlock()
375 _, exist := kp.topicToResponseChannelMap[topic]
376 return exist
377}
378
379func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
380 kp.lockTopicResponseChannelMap.Lock()
381 defer kp.lockTopicResponseChannelMap.Unlock()
382 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
383 // Unsubscribe to this topic first - this will close the subscribed channel
384 var err error
385 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000386 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400387 }
388 delete(kp.topicToResponseChannelMap, topic)
389 return err
390 } else {
391 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
392 }
393}
394
395func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
396 kp.lockTopicResponseChannelMap.Lock()
397 defer kp.lockTopicResponseChannelMap.Unlock()
398 var err error
399 for topic, _ := range kp.topicToResponseChannelMap {
400 // Unsubscribe to this topic first - this will close the subscribed channel
401 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000402 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400403 }
404 delete(kp.topicToResponseChannelMap, topic)
405 }
406 return err
407}
408
409func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
410 kp.lockTopicRequestHandlerChannelMap.Lock()
411 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
412 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
413 kp.topicToRequestHandlerChannelMap[topic] = arg
414 }
415}
416
417func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
418 kp.lockTopicRequestHandlerChannelMap.Lock()
419 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
420 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
421 // Close the kafka client client first by unsubscribing to this topic
422 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
423 delete(kp.topicToRequestHandlerChannelMap, topic)
424 return nil
425 } else {
426 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
427 }
428}
429
430func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
431 kp.lockTopicRequestHandlerChannelMap.Lock()
432 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
433 var err error
434 for topic, _ := range kp.topicToRequestHandlerChannelMap {
435 // Close the kafka client client first by unsubscribing to this topic
436 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000437 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400438 }
439 delete(kp.topicToRequestHandlerChannelMap, topic)
440 }
441 return err
442}
443
444func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
445 kp.lockTransactionIdToChannelMap.Lock()
446 defer kp.lockTransactionIdToChannelMap.Unlock()
447 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
448 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
449 }
450}
451
452func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
453 kp.lockTransactionIdToChannelMap.Lock()
454 defer kp.lockTransactionIdToChannelMap.Unlock()
455 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
456 // Close the channel first
457 close(transChannel.ch)
458 delete(kp.transactionIdToChannelMap, id)
459 }
460}
461
462func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
463 kp.lockTransactionIdToChannelMap.Lock()
464 defer kp.lockTransactionIdToChannelMap.Unlock()
465 for key, value := range kp.transactionIdToChannelMap {
466 if value.topic.Name == id {
467 close(value.ch)
468 delete(kp.transactionIdToChannelMap, key)
469 }
470 }
471}
472
473func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
474 kp.lockTransactionIdToChannelMap.Lock()
475 defer kp.lockTransactionIdToChannelMap.Unlock()
476 for key, value := range kp.transactionIdToChannelMap {
477 close(value.ch)
478 delete(kp.transactionIdToChannelMap, key)
479 }
480}
481
482func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
483 // If we have any consumers on that topic we need to close them
484 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000485 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400486 }
487 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000488 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400489 }
490 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
491
492 return kp.kafkaClient.DeleteTopic(&topic)
493}
494
495func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
496 // Encode the response argument - needs to be a proto message
497 if returnedVal == nil {
498 return nil, nil
499 }
500 protoValue, ok := returnedVal.(proto.Message)
501 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000502 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
William Kurkianea869482019-04-09 15:16:11 -0400503 err := errors.New("response-value-not-proto-message")
504 return nil, err
505 }
506
507 // Marshal the returned value, if any
508 var marshalledReturnedVal *any.Any
509 var err error
510 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000511 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400512 return nil, err
513 }
514 return marshalledReturnedVal, nil
515}
516
517func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
518 responseHeader := &ic.Header{
519 Id: request.Header.Id,
520 Type: ic.MessageType_RESPONSE,
521 FromTopic: request.Header.ToTopic,
522 ToTopic: request.Header.FromTopic,
523 Timestamp: time.Now().Unix(),
524 }
525 responseBody := &ic.InterContainerResponseBody{
526 Success: false,
527 Result: nil,
528 }
529 var marshalledResponseBody *any.Any
530 var err error
531 // Error should never happen here
532 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000533 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400534 }
535
536 return &ic.InterContainerMessage{
537 Header: responseHeader,
538 Body: marshalledResponseBody,
539 }
540
541}
542
543//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
544//or an error on failure
545func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000546 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
William Kurkianea869482019-04-09 15:16:11 -0400547 responseHeader := &ic.Header{
548 Id: request.Header.Id,
549 Type: ic.MessageType_RESPONSE,
550 FromTopic: request.Header.ToTopic,
551 ToTopic: request.Header.FromTopic,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400552 KeyTopic: request.Header.KeyTopic,
William Kurkianea869482019-04-09 15:16:11 -0400553 Timestamp: time.Now().UnixNano(),
554 }
555
556 // Go over all returned values
557 var marshalledReturnedVal *any.Any
558 var err error
559 for _, returnVal := range returnedValues {
560 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000561 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400562 }
563 break // for now we support only 1 returned value - (excluding the error)
564 }
565
566 responseBody := &ic.InterContainerResponseBody{
567 Success: success,
568 Result: marshalledReturnedVal,
569 }
570
571 // Marshal the response body
572 var marshalledResponseBody *any.Any
573 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000574 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400575 return nil, err
576 }
577
578 return &ic.InterContainerMessage{
579 Header: responseHeader,
580 Body: marshalledResponseBody,
581 }, nil
582}
583
584func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
585 myClassValue := reflect.ValueOf(myClass)
586 // Capitalize the first letter in the funcName to workaround the first capital letters required to
587 // invoke a function from a different package
588 funcName = strings.Title(funcName)
589 m := myClassValue.MethodByName(funcName)
590 if !m.IsValid() {
591 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
592 }
593 in := make([]reflect.Value, len(params))
594 for i, param := range params {
595 in[i] = reflect.ValueOf(param)
596 }
597 out = m.Call(in)
598 return
599}
600
601func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
602 arg := &KVArg{
603 Key: TransactionKey,
604 Value: &ic.StrType{Val: transactionId},
605 }
606
607 var marshalledArg *any.Any
608 var err error
609 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000610 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400611 return currentArgs
612 }
613 protoArg := &ic.Argument{
614 Key: arg.Key,
615 Value: marshalledArg,
616 }
617 return append(currentArgs, protoArg)
618}
619
620func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
621 var marshalledArg *any.Any
622 var err error
623 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000624 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400625 return currentArgs
626 }
627 protoArg := &ic.Argument{
628 Key: FromTopic,
629 Value: marshalledArg,
630 }
631 return append(currentArgs, protoArg)
632}
633
634func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
635
636 // First extract the header to know whether this is a request - responses are handled by a different handler
637 if msg.Header.Type == ic.MessageType_REQUEST {
638 var out []reflect.Value
639 var err error
640
641 // Get the request body
642 requestBody := &ic.InterContainerRequestBody{}
643 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000644 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400645 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000646 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400647 // let the callee unpack the arguments as its the only one that knows the real proto type
648 // Augment the requestBody with the message Id as it will be used in scenarios where cores
649 // are set in pairs and competing
650 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
651
652 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
653 // needs to send an unsollicited message to the currently requested container
654 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
655
656 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
657 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000658 logger.Warn(err)
William Kurkianea869482019-04-09 15:16:11 -0400659 }
660 }
661 // Response required?
662 if requestBody.ResponseRequired {
663 // If we already have an error before then just return that
664 var returnError *ic.Error
665 var returnedValues []interface{}
666 var success bool
667 if err != nil {
668 returnError = &ic.Error{Reason: err.Error()}
669 returnedValues = make([]interface{}, 1)
670 returnedValues[0] = returnError
671 } else {
672 returnedValues = make([]interface{}, 0)
673 // Check for errors first
674 lastIndex := len(out) - 1
675 if out[lastIndex].Interface() != nil { // Error
kdarapub26b4502019-10-05 03:02:33 +0530676 if retError, ok := out[lastIndex].Interface().(error); ok {
677 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000678 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530679 return // Ignore - process is in competing mode and ignored transaction
680 }
681 returnError = &ic.Error{Reason: retError.Error()}
William Kurkianea869482019-04-09 15:16:11 -0400682 returnedValues = append(returnedValues, returnError)
683 } else { // Should never happen
684 returnError = &ic.Error{Reason: "incorrect-error-returns"}
685 returnedValues = append(returnedValues, returnError)
686 }
687 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000688 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530689 return // Ignore - should not happen
William Kurkianea869482019-04-09 15:16:11 -0400690 } else { // Non-error case
691 success = true
692 for idx, val := range out {
Esin Karamanccb714b2019-11-29 15:02:06 +0000693 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
William Kurkianea869482019-04-09 15:16:11 -0400694 if idx != lastIndex {
695 returnedValues = append(returnedValues, val.Interface())
696 }
697 }
698 }
699 }
700
701 var icm *ic.InterContainerMessage
702 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000703 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400704 icm = encodeDefaultFailedResponse(msg)
705 }
706 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
707 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
708 // present then the key will be empty, hence all messages for a given topic will be sent to all
709 // partitions.
710 replyTopic := &Topic{Name: msg.Header.FromTopic}
711 key := msg.Header.KeyTopic
Esin Karamanccb714b2019-11-29 15:02:06 +0000712 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
William Kurkianea869482019-04-09 15:16:11 -0400713 // TODO: handle error response.
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400714 go kp.kafkaClient.Send(icm, replyTopic, key)
William Kurkianea869482019-04-09 15:16:11 -0400715 }
716 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Esin Karamanccb714b2019-11-29 15:02:06 +0000717 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400718 go kp.dispatchResponse(msg)
719 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000720 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400721 }
722}
723
724func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
725 // Wait for messages
726 for msg := range ch {
Esin Karamanccb714b2019-11-29 15:02:06 +0000727 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
William Kurkianea869482019-04-09 15:16:11 -0400728 go kp.handleMessage(msg, targetInterface)
729 }
730}
731
732func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
733 kp.lockTransactionIdToChannelMap.RLock()
734 defer kp.lockTransactionIdToChannelMap.RUnlock()
735 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Esin Karamanccb714b2019-11-29 15:02:06 +0000736 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
William Kurkianea869482019-04-09 15:16:11 -0400737 return
738 }
739 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
740}
741
742// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
743// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
744// API. There is one response channel waiting for kafka messages before dispatching the message to the
745// corresponding waiting channel
746func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000747 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400748
749 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
750 // broadcast any message for this topic to all channels waiting on it.
751 ch := make(chan *ic.InterContainerMessage)
752 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
753
754 return ch, nil
755}
756
757func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000758 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400759 kp.deleteFromTransactionIdToChannelMap(trnsId)
760 return nil
761}
762
cbabu95f21522019-11-13 14:25:18 +0100763func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
764 return kp.kafkaClient.EnableLivenessChannel(enable)
765}
766
Scott Baker86fce9a2019-12-12 09:47:17 -0800767func (kp *InterContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
768 return kp.kafkaClient.EnableHealthinessChannel(enable)
769}
770
cbabu95f21522019-11-13 14:25:18 +0100771func (kp *InterContainerProxy) SendLiveness() error {
772 return kp.kafkaClient.SendLiveness()
773}
774
William Kurkianea869482019-04-09 15:16:11 -0400775//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
776//or an error on failure
777func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
778 requestHeader := &ic.Header{
779 Id: uuid.New().String(),
780 Type: ic.MessageType_REQUEST,
781 FromTopic: replyTopic.Name,
782 ToTopic: toTopic.Name,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400783 KeyTopic: key,
William Kurkianea869482019-04-09 15:16:11 -0400784 Timestamp: time.Now().UnixNano(),
785 }
786 requestBody := &ic.InterContainerRequestBody{
787 Rpc: rpc,
788 ResponseRequired: true,
789 ReplyToTopic: replyTopic.Name,
790 }
791
792 for _, arg := range kvArgs {
793 if arg == nil {
794 // In case the caller sends an array with empty args
795 continue
796 }
797 var marshalledArg *any.Any
798 var err error
799 // ascertain the value interface type is a proto.Message
800 protoValue, ok := arg.Value.(proto.Message)
801 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000802 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
William Kurkianea869482019-04-09 15:16:11 -0400803 err := errors.New("argument-value-not-proto-message")
804 return nil, err
805 }
806 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000807 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400808 return nil, err
809 }
810 protoArg := &ic.Argument{
811 Key: arg.Key,
812 Value: marshalledArg,
813 }
814 requestBody.Args = append(requestBody.Args, protoArg)
815 }
816
817 var marshalledData *any.Any
818 var err error
819 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000820 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400821 return nil, err
822 }
823 request := &ic.InterContainerMessage{
824 Header: requestHeader,
825 Body: marshalledData,
826 }
827 return request, nil
828}
829
830func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
831 // Extract the message body
832 responseBody := ic.InterContainerResponseBody{}
833 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000834 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400835 return nil, err
836 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000837 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
William Kurkianea869482019-04-09 15:16:11 -0400838
839 return &responseBody, nil
840
841}