blob: a75c1b6b8667c72ba79986f4b84cd38ca258940b [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
Scott Baker2c1c4822019-10-16 11:02:41 -070022 "reflect"
23 "strings"
24 "sync"
25 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070026
27 "github.com/golang/protobuf/proto"
28 "github.com/golang/protobuf/ptypes"
29 "github.com/golang/protobuf/ptypes/any"
30 "github.com/google/uuid"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070033)
34
Scott Baker2c1c4822019-10-16 11:02:41 -070035const (
36 DefaultMaxRetries = 3
37 DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
38)
39
40const (
41 TransactionKey = "transactionID"
42 FromTopic = "fromTopic"
43)
44
45var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
46var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
47
48// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
49// obtained from that channel, this interface is invoked. This is used to handle
50// async requests into the Core via the kafka messaging bus
51type requestHandlerChannel struct {
52 requesthandlerInterface interface{}
53 ch <-chan *ic.InterContainerMessage
54}
55
56// transactionChannel represents a combination of a topic and a channel onto which a response received
57// on the kafka bus will be sent to
58type transactionChannel struct {
59 topic *Topic
60 ch chan *ic.InterContainerMessage
61}
62
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080063type InterContainerProxy interface {
64 Start() error
65 Stop()
66 DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
67 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
68 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
69 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
70 UnSubscribeFromRequestHandler(topic Topic) error
71 DeleteTopic(topic Topic) error
72}
73
74// interContainerProxy represents the messaging proxy
75type interContainerProxy struct {
Scott Baker2c1c4822019-10-16 11:02:41 -070076 kafkaHost string
77 kafkaPort int
78 DefaultTopic *Topic
79 defaultRequestHandlerInterface interface{}
80 deviceDiscoveryTopic *Topic
81 kafkaClient Client
82 doneCh chan int
83
84 // This map is used to map a topic to an interface and channel. When a request is received
85 // on that channel (registered to the topic) then that interface is invoked.
86 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
87 lockTopicRequestHandlerChannelMap sync.RWMutex
88
89 // This map is used to map a channel to a response topic. This channel handles all responses on that
90 // channel for that topic and forward them to the appropriate consumers channel, using the
91 // transactionIdToChannelMap.
92 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
93 lockTopicResponseChannelMap sync.RWMutex
94
95 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
96 // sent out and we are waiting for a response.
97 transactionIdToChannelMap map[string]*transactionChannel
98 lockTransactionIdToChannelMap sync.RWMutex
99}
100
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800101type InterContainerProxyOption func(*interContainerProxy)
Scott Baker2c1c4822019-10-16 11:02:41 -0700102
103func InterContainerHost(host string) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800104 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700105 args.kafkaHost = host
106 }
107}
108
109func InterContainerPort(port int) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800110 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700111 args.kafkaPort = port
112 }
113}
114
115func DefaultTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800116 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700117 args.DefaultTopic = topic
118 }
119}
120
121func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800122 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700123 args.deviceDiscoveryTopic = topic
124 }
125}
126
127func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800128 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700129 args.defaultRequestHandlerInterface = handler
130 }
131}
132
133func MsgClient(client Client) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800134 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700135 args.kafkaClient = client
136 }
137}
138
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800139func newInterContainerProxy(opts ...InterContainerProxyOption) (*interContainerProxy, error) {
140 proxy := &interContainerProxy{
Scott Baker2c1c4822019-10-16 11:02:41 -0700141 kafkaHost: DefaultKafkaHost,
142 kafkaPort: DefaultKafkaPort,
143 }
144
145 for _, option := range opts {
146 option(proxy)
147 }
148
149 // Create the locks for all the maps
150 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
151 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
152 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
153
154 return proxy, nil
155}
156
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800157func NewInterContainerProxy(opts ...InterContainerProxyOption) (InterContainerProxy, error) {
158 return newInterContainerProxy(opts...)
159}
160
161func (kp *interContainerProxy) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -0500162 logger.Info("Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700163
164 // Kafka MsgClient should already have been created. If not, output fatal error
165 if kp.kafkaClient == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500166 logger.Fatal("kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700167 }
168
169 // Create the Done channel
170 kp.doneCh = make(chan int, 1)
171
172 // Start the kafka client
173 if err := kp.kafkaClient.Start(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500174 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700175 return err
176 }
177
178 // Create the topic to response channel map
179 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
180 //
181 // Create the transactionId to Channel Map
182 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
183
184 // Create the topic to request channel map
185 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
186
187 return nil
188}
189
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800190func (kp *interContainerProxy) Stop() {
khenaidoob332f9b2020-01-16 16:25:26 -0500191 logger.Info("stopping-intercontainer-proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700192 kp.doneCh <- 1
193 // TODO : Perform cleanup
194 kp.kafkaClient.Stop()
195 //kp.deleteAllTopicRequestHandlerChannelMap()
196 //kp.deleteAllTopicResponseChannelMap()
197 //kp.deleteAllTransactionIdToChannelMap()
198}
199
200// DeviceDiscovered publish the discovered device onto the kafka messaging bus
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800201func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
khenaidoob332f9b2020-01-16 16:25:26 -0500202 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700203 // Simple validation
204 if deviceId == "" || deviceType == "" {
khenaidoob332f9b2020-01-16 16:25:26 -0500205 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700206 return errors.New("invalid-parameters")
207 }
208 // Create the device discovery message
209 header := &ic.Header{
210 Id: uuid.New().String(),
211 Type: ic.MessageType_DEVICE_DISCOVERED,
212 FromTopic: kp.DefaultTopic.Name,
213 ToTopic: kp.deviceDiscoveryTopic.Name,
214 Timestamp: time.Now().UnixNano(),
215 }
216 body := &ic.DeviceDiscovered{
217 Id: deviceId,
218 DeviceType: deviceType,
219 ParentId: parentId,
220 Publisher: publisher,
221 }
222
223 var marshalledData *any.Any
224 var err error
225 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500226 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700227 return err
228 }
229 msg := &ic.InterContainerMessage{
230 Header: header,
231 Body: marshalledData,
232 }
233
234 // Send the message
235 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500236 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700237 return err
238 }
239 return nil
240}
241
242// InvokeRPC is used to send a request to a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800243func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
Scott Baker2c1c4822019-10-16 11:02:41 -0700244 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
245
246 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
247 // typically the device ID.
248 responseTopic := replyToTopic
249 if responseTopic == nil {
250 responseTopic = kp.DefaultTopic
251 }
252
253 // Encode the request
254 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
255 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500256 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700257 return false, nil
258 }
259
260 // Subscribe for response, if needed, before sending request
261 var ch <-chan *ic.InterContainerMessage
262 if waitForResponse {
263 var err error
264 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500265 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700266 }
267 }
268
269 // Send request - if the topic is formatted with a device Id then we will send the request using a
270 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
271 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
272 //key := GetDeviceIdFromTopic(*toTopic)
khenaidoob332f9b2020-01-16 16:25:26 -0500273 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700274 go kp.kafkaClient.Send(protoRequest, toTopic, key)
275
276 if waitForResponse {
277 // Create a child context based on the parent context, if any
278 var cancel context.CancelFunc
279 childCtx := context.Background()
280 if ctx == nil {
281 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
282 } else {
283 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
284 }
285 defer cancel()
286
287 // Wait for response as well as timeout or cancellation
288 // Remove the subscription for a response on return
289 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
290 select {
291 case msg, ok := <-ch:
292 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500293 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700294 protoError := &ic.Error{Reason: "channel-closed"}
295 var marshalledArg *any.Any
296 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
297 return false, nil // Should never happen
298 }
299 return false, marshalledArg
300 }
khenaidoob332f9b2020-01-16 16:25:26 -0500301 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700302 var responseBody *ic.InterContainerResponseBody
303 var err error
304 if responseBody, err = decodeResponse(msg); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500305 logger.Errorw("decode-response-error", log.Fields{"error": err})
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800306 // FIXME we should return something
Scott Baker2c1c4822019-10-16 11:02:41 -0700307 }
308 return responseBody.Success, responseBody.Result
309 case <-ctx.Done():
khenaidoob332f9b2020-01-16 16:25:26 -0500310 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700311 // pack the error as proto any type
312 protoError := &ic.Error{Reason: ctx.Err().Error()}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800313
314 // FIXME we need to return a Code together with the reason
315 //protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
Scott Baker2c1c4822019-10-16 11:02:41 -0700316 var marshalledArg *any.Any
317 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
318 return false, nil // Should never happen
319 }
320 return false, marshalledArg
321 case <-childCtx.Done():
khenaidoob332f9b2020-01-16 16:25:26 -0500322 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700323 // pack the error as proto any type
324 protoError := &ic.Error{Reason: childCtx.Err().Error()}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800325
326 // FIXME we need to return a Code together with the reason
327 //protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
Scott Baker2c1c4822019-10-16 11:02:41 -0700328 var marshalledArg *any.Any
329 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
330 return false, nil // Should never happen
331 }
332 return false, marshalledArg
333 case <-kp.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -0500334 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700335 return true, nil
336 }
337 }
338 return true, nil
339}
340
341// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
342// when a message is received on a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800343func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700344
345 // Subscribe to receive messages for that topic
346 var ch <-chan *ic.InterContainerMessage
347 var err error
348 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
349 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500350 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700351 return err
352 }
353
354 kp.defaultRequestHandlerInterface = handler
355 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
356 // Launch a go routine to receive and process kafka messages
357 go kp.waitForMessages(ch, topic, handler)
358
359 return nil
360}
361
362// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
363// when a message is received on a given topic. So far there is only 1 target registered per microservice
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800364func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700365 // Subscribe to receive messages for that topic
366 var ch <-chan *ic.InterContainerMessage
367 var err error
368 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500369 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700370 return err
371 }
372 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
373
374 // Launch a go routine to receive and process kafka messages
375 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
376
377 return nil
378}
379
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800380func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700381 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
382}
383
384// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
385// responses from that topic.
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800386func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700387 kp.lockTopicResponseChannelMap.Lock()
388 defer kp.lockTopicResponseChannelMap.Unlock()
389 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
390 kp.topicToResponseChannelMap[topic] = arg
391 }
392}
393
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800394func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
Scott Baker2c1c4822019-10-16 11:02:41 -0700395 kp.lockTopicResponseChannelMap.RLock()
396 defer kp.lockTopicResponseChannelMap.RUnlock()
397 _, exist := kp.topicToResponseChannelMap[topic]
398 return exist
399}
400
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800401func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700402 kp.lockTopicResponseChannelMap.Lock()
403 defer kp.lockTopicResponseChannelMap.Unlock()
404 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
405 // Unsubscribe to this topic first - this will close the subscribed channel
406 var err error
407 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500408 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700409 }
410 delete(kp.topicToResponseChannelMap, topic)
411 return err
412 } else {
413 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
414 }
415}
416
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800417func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700418 kp.lockTopicResponseChannelMap.Lock()
419 defer kp.lockTopicResponseChannelMap.Unlock()
420 var err error
421 for topic, _ := range kp.topicToResponseChannelMap {
422 // Unsubscribe to this topic first - this will close the subscribed channel
423 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500424 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700425 }
426 delete(kp.topicToResponseChannelMap, topic)
427 }
428 return err
429}
430
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800431func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700432 kp.lockTopicRequestHandlerChannelMap.Lock()
433 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
434 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
435 kp.topicToRequestHandlerChannelMap[topic] = arg
436 }
437}
438
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800439func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700440 kp.lockTopicRequestHandlerChannelMap.Lock()
441 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
442 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
443 // Close the kafka client client first by unsubscribing to this topic
444 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
445 delete(kp.topicToRequestHandlerChannelMap, topic)
446 return nil
447 } else {
448 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
449 }
450}
451
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800452func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700453 kp.lockTopicRequestHandlerChannelMap.Lock()
454 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
455 var err error
456 for topic, _ := range kp.topicToRequestHandlerChannelMap {
457 // Close the kafka client client first by unsubscribing to this topic
458 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500459 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700460 }
461 delete(kp.topicToRequestHandlerChannelMap, topic)
462 }
463 return err
464}
465
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800466func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700467 kp.lockTransactionIdToChannelMap.Lock()
468 defer kp.lockTransactionIdToChannelMap.Unlock()
469 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
470 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
471 }
472}
473
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800474func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700475 kp.lockTransactionIdToChannelMap.Lock()
476 defer kp.lockTransactionIdToChannelMap.Unlock()
477 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
478 // Close the channel first
479 close(transChannel.ch)
480 delete(kp.transactionIdToChannelMap, id)
481 }
482}
483
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800484func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700485 kp.lockTransactionIdToChannelMap.Lock()
486 defer kp.lockTransactionIdToChannelMap.Unlock()
487 for key, value := range kp.transactionIdToChannelMap {
488 if value.topic.Name == id {
489 close(value.ch)
490 delete(kp.transactionIdToChannelMap, key)
491 }
492 }
493}
494
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800495func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
Scott Baker2c1c4822019-10-16 11:02:41 -0700496 kp.lockTransactionIdToChannelMap.Lock()
497 defer kp.lockTransactionIdToChannelMap.Unlock()
498 for key, value := range kp.transactionIdToChannelMap {
499 close(value.ch)
500 delete(kp.transactionIdToChannelMap, key)
501 }
502}
503
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800504func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700505 // If we have any consumers on that topic we need to close them
506 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500507 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700508 }
509 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500510 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700511 }
512 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
513
514 return kp.kafkaClient.DeleteTopic(&topic)
515}
516
517func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
518 // Encode the response argument - needs to be a proto message
519 if returnedVal == nil {
520 return nil, nil
521 }
522 protoValue, ok := returnedVal.(proto.Message)
523 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500524 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700525 err := errors.New("response-value-not-proto-message")
526 return nil, err
527 }
528
529 // Marshal the returned value, if any
530 var marshalledReturnedVal *any.Any
531 var err error
532 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500533 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700534 return nil, err
535 }
536 return marshalledReturnedVal, nil
537}
538
539func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
540 responseHeader := &ic.Header{
541 Id: request.Header.Id,
542 Type: ic.MessageType_RESPONSE,
543 FromTopic: request.Header.ToTopic,
544 ToTopic: request.Header.FromTopic,
Kent Hagermanccfa2132019-12-17 13:29:34 -0500545 Timestamp: time.Now().UnixNano(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700546 }
547 responseBody := &ic.InterContainerResponseBody{
548 Success: false,
549 Result: nil,
550 }
551 var marshalledResponseBody *any.Any
552 var err error
553 // Error should never happen here
554 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500555 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700556 }
557
558 return &ic.InterContainerMessage{
559 Header: responseHeader,
560 Body: marshalledResponseBody,
561 }
562
563}
564
565//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
566//or an error on failure
567func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -0500568 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700569 responseHeader := &ic.Header{
570 Id: request.Header.Id,
571 Type: ic.MessageType_RESPONSE,
572 FromTopic: request.Header.ToTopic,
573 ToTopic: request.Header.FromTopic,
574 KeyTopic: request.Header.KeyTopic,
575 Timestamp: time.Now().UnixNano(),
576 }
577
578 // Go over all returned values
579 var marshalledReturnedVal *any.Any
580 var err error
581 for _, returnVal := range returnedValues {
582 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500583 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700584 }
585 break // for now we support only 1 returned value - (excluding the error)
586 }
587
588 responseBody := &ic.InterContainerResponseBody{
589 Success: success,
590 Result: marshalledReturnedVal,
591 }
592
593 // Marshal the response body
594 var marshalledResponseBody *any.Any
595 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500596 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700597 return nil, err
598 }
599
600 return &ic.InterContainerMessage{
601 Header: responseHeader,
602 Body: marshalledResponseBody,
603 }, nil
604}
605
606func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
607 myClassValue := reflect.ValueOf(myClass)
608 // Capitalize the first letter in the funcName to workaround the first capital letters required to
609 // invoke a function from a different package
610 funcName = strings.Title(funcName)
611 m := myClassValue.MethodByName(funcName)
612 if !m.IsValid() {
613 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
614 }
615 in := make([]reflect.Value, len(params))
616 for i, param := range params {
617 in[i] = reflect.ValueOf(param)
618 }
619 out = m.Call(in)
620 return
621}
622
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800623func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700624 arg := &KVArg{
625 Key: TransactionKey,
626 Value: &ic.StrType{Val: transactionId},
627 }
628
629 var marshalledArg *any.Any
630 var err error
631 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500632 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700633 return currentArgs
634 }
635 protoArg := &ic.Argument{
636 Key: arg.Key,
637 Value: marshalledArg,
638 }
639 return append(currentArgs, protoArg)
640}
641
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800642func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700643 var marshalledArg *any.Any
644 var err error
645 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500646 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700647 return currentArgs
648 }
649 protoArg := &ic.Argument{
650 Key: FromTopic,
651 Value: marshalledArg,
652 }
653 return append(currentArgs, protoArg)
654}
655
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800656func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700657
658 // First extract the header to know whether this is a request - responses are handled by a different handler
659 if msg.Header.Type == ic.MessageType_REQUEST {
660 var out []reflect.Value
661 var err error
662
663 // Get the request body
664 requestBody := &ic.InterContainerRequestBody{}
665 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500666 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700667 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500668 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700669 // let the callee unpack the arguments as its the only one that knows the real proto type
670 // Augment the requestBody with the message Id as it will be used in scenarios where cores
671 // are set in pairs and competing
672 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
673
674 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
675 // needs to send an unsollicited message to the currently requested container
676 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
677
678 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
679 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500680 logger.Warn(err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700681 }
682 }
683 // Response required?
684 if requestBody.ResponseRequired {
685 // If we already have an error before then just return that
686 var returnError *ic.Error
687 var returnedValues []interface{}
688 var success bool
689 if err != nil {
690 returnError = &ic.Error{Reason: err.Error()}
691 returnedValues = make([]interface{}, 1)
692 returnedValues[0] = returnError
693 } else {
694 returnedValues = make([]interface{}, 0)
695 // Check for errors first
696 lastIndex := len(out) - 1
697 if out[lastIndex].Interface() != nil { // Error
698 if retError, ok := out[lastIndex].Interface().(error); ok {
699 if retError.Error() == ErrorTransactionNotAcquired.Error() {
khenaidoob332f9b2020-01-16 16:25:26 -0500700 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700701 return // Ignore - process is in competing mode and ignored transaction
702 }
703 returnError = &ic.Error{Reason: retError.Error()}
704 returnedValues = append(returnedValues, returnError)
705 } else { // Should never happen
706 returnError = &ic.Error{Reason: "incorrect-error-returns"}
707 returnedValues = append(returnedValues, returnError)
708 }
709 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
khenaidoob332f9b2020-01-16 16:25:26 -0500710 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700711 return // Ignore - should not happen
712 } else { // Non-error case
713 success = true
714 for idx, val := range out {
khenaidoob332f9b2020-01-16 16:25:26 -0500715 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700716 if idx != lastIndex {
717 returnedValues = append(returnedValues, val.Interface())
718 }
719 }
720 }
721 }
722
723 var icm *ic.InterContainerMessage
724 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500725 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700726 icm = encodeDefaultFailedResponse(msg)
727 }
728 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
729 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
730 // present then the key will be empty, hence all messages for a given topic will be sent to all
731 // partitions.
732 replyTopic := &Topic{Name: msg.Header.FromTopic}
733 key := msg.Header.KeyTopic
khenaidoob332f9b2020-01-16 16:25:26 -0500734 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700735 // TODO: handle error response.
736 go kp.kafkaClient.Send(icm, replyTopic, key)
737 }
738 } else if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoob332f9b2020-01-16 16:25:26 -0500739 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700740 go kp.dispatchResponse(msg)
741 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500742 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700743 }
744}
745
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800746func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700747 // Wait for messages
748 for msg := range ch {
khenaidoob332f9b2020-01-16 16:25:26 -0500749 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
Scott Baker2c1c4822019-10-16 11:02:41 -0700750 go kp.handleMessage(msg, targetInterface)
751 }
752}
753
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800754func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700755 kp.lockTransactionIdToChannelMap.RLock()
756 defer kp.lockTransactionIdToChannelMap.RUnlock()
757 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
khenaidoob332f9b2020-01-16 16:25:26 -0500758 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700759 return
760 }
761 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
762}
763
764// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
765// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
766// API. There is one response channel waiting for kafka messages before dispatching the message to the
767// corresponding waiting channel
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800768func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -0500769 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700770
771 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
772 // broadcast any message for this topic to all channels waiting on it.
773 ch := make(chan *ic.InterContainerMessage)
774 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
775
776 return ch, nil
777}
778
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800779func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidoob332f9b2020-01-16 16:25:26 -0500780 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700781 kp.deleteFromTransactionIdToChannelMap(trnsId)
782 return nil
783}
784
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800785func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
Scott Baker104b67d2019-10-29 15:56:27 -0700786 return kp.kafkaClient.EnableLivenessChannel(enable)
787}
788
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800789func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
Scott Baker0fef6982019-12-12 09:49:42 -0800790 return kp.kafkaClient.EnableHealthinessChannel(enable)
791}
792
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800793func (kp *interContainerProxy) SendLiveness() error {
Scott Baker104b67d2019-10-29 15:56:27 -0700794 return kp.kafkaClient.SendLiveness()
795}
796
Scott Baker2c1c4822019-10-16 11:02:41 -0700797//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
798//or an error on failure
799func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
800 requestHeader := &ic.Header{
801 Id: uuid.New().String(),
802 Type: ic.MessageType_REQUEST,
803 FromTopic: replyTopic.Name,
804 ToTopic: toTopic.Name,
805 KeyTopic: key,
806 Timestamp: time.Now().UnixNano(),
807 }
808 requestBody := &ic.InterContainerRequestBody{
809 Rpc: rpc,
810 ResponseRequired: true,
811 ReplyToTopic: replyTopic.Name,
812 }
813
814 for _, arg := range kvArgs {
815 if arg == nil {
816 // In case the caller sends an array with empty args
817 continue
818 }
819 var marshalledArg *any.Any
820 var err error
821 // ascertain the value interface type is a proto.Message
822 protoValue, ok := arg.Value.(proto.Message)
823 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500824 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700825 err := errors.New("argument-value-not-proto-message")
826 return nil, err
827 }
828 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500829 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700830 return nil, err
831 }
832 protoArg := &ic.Argument{
833 Key: arg.Key,
834 Value: marshalledArg,
835 }
836 requestBody.Args = append(requestBody.Args, protoArg)
837 }
838
839 var marshalledData *any.Any
840 var err error
841 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500842 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700843 return nil, err
844 }
845 request := &ic.InterContainerMessage{
846 Header: requestHeader,
847 Body: marshalledData,
848 }
849 return request, nil
850}
851
852func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
853 // Extract the message body
854 responseBody := ic.InterContainerResponseBody{}
855 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500856 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700857 return nil, err
858 }
khenaidoob332f9b2020-01-16 16:25:26 -0500859 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700860
861 return &responseBody, nil
862
863}