blob: 4a3ec92ad9c798fc4a732cd9c12d250d145e3e24 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050027 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidooabad44c2018-08-03 16:58:35 -040028 "reflect"
29 "sync"
30 "time"
31)
32
33// Initialize the logger - gets the default until the main function setup the logger
34func init() {
khenaidoob9203542018-09-17 22:56:37 -040035 log.AddPackage(log.JSON, log.WarnLevel, nil)
khenaidooabad44c2018-08-03 16:58:35 -040036}
37
38const (
khenaidoo43c82122018-11-22 18:38:28 -050039 DefaultMaxRetries = 3
40 DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
khenaidooabad44c2018-08-03 16:58:35 -040041)
42
khenaidoo43c82122018-11-22 18:38:28 -050043// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
44// obtained from that channel, this interface is invoked. This is used to handle
45// async requests into the Core via the kafka messaging bus
46type requestHandlerChannel struct {
47 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050048 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040049}
50
khenaidoo43c82122018-11-22 18:38:28 -050051// transactionChannel represents a combination of a topic and a channel onto which a response received
52// on the kafka bus will be sent to
53type transactionChannel struct {
54 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050055 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050056}
57
58// InterContainerProxy represents the messaging proxy
59type InterContainerProxy struct {
60 kafkaHost string
61 kafkaPort int
62 DefaultTopic *Topic
63 defaultRequestHandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050064 deviceDiscoveryTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050065 kafkaClient Client
66 doneCh chan int
67
68 // This map is used to map a topic to an interface and channel. When a request is received
69 // on that channel (registered to the topic) then that interface is invoked.
70 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
71 lockTopicRequestHandlerChannelMap sync.RWMutex
72
73 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050074 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050075 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050076 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050077 lockTopicResponseChannelMap sync.RWMutex
78
khenaidoo4c1a5bf2018-11-29 15:53:42 -050079 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -050080 // sent out and we are waiting for a response.
81 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -040082 lockTransactionIdToChannelMap sync.RWMutex
83}
84
khenaidoo43c82122018-11-22 18:38:28 -050085type InterContainerProxyOption func(*InterContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -040086
khenaidoo43c82122018-11-22 18:38:28 -050087func InterContainerHost(host string) InterContainerProxyOption {
88 return func(args *InterContainerProxy) {
89 args.kafkaHost = host
khenaidooabad44c2018-08-03 16:58:35 -040090 }
91}
92
khenaidoo43c82122018-11-22 18:38:28 -050093func InterContainerPort(port int) InterContainerProxyOption {
94 return func(args *InterContainerProxy) {
95 args.kafkaPort = port
khenaidooabad44c2018-08-03 16:58:35 -040096 }
97}
98
khenaidoo43c82122018-11-22 18:38:28 -050099func DefaultTopic(topic *Topic) InterContainerProxyOption {
100 return func(args *InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -0400101 args.DefaultTopic = topic
102 }
103}
104
khenaidoo79232702018-12-04 11:00:41 -0500105func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
106 return func(args *InterContainerProxy) {
107 args.deviceDiscoveryTopic = topic
108 }
109}
110
khenaidoo43c82122018-11-22 18:38:28 -0500111func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
112 return func(args *InterContainerProxy) {
113 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400114 }
115}
116
khenaidoo43c82122018-11-22 18:38:28 -0500117func MsgClient(client Client) InterContainerProxyOption {
118 return func(args *InterContainerProxy) {
119 args.kafkaClient = client
120 }
121}
122
123func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
124 proxy := &InterContainerProxy{
125 kafkaHost: DefaultKafkaHost,
126 kafkaPort: DefaultKafkaPort,
khenaidooabad44c2018-08-03 16:58:35 -0400127 }
128
129 for _, option := range opts {
130 option(proxy)
131 }
132
133 // Create the locks for all the maps
khenaidoo43c82122018-11-22 18:38:28 -0500134 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400135 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500136 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400137
138 return proxy, nil
139}
140
khenaidoo43c82122018-11-22 18:38:28 -0500141func (kp *InterContainerProxy) Start() error {
khenaidooabad44c2018-08-03 16:58:35 -0400142 log.Info("Starting-Proxy")
143
khenaidoo43c82122018-11-22 18:38:28 -0500144 // Kafka MsgClient should already have been created. If not, output fatal error
145 if kp.kafkaClient == nil {
146 log.Fatal("kafka-client-not-set")
147 }
148
khenaidooabad44c2018-08-03 16:58:35 -0400149 // Create the Done channel
150 kp.doneCh = make(chan int, 1)
151
khenaidoo43c82122018-11-22 18:38:28 -0500152 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500153 if err := kp.kafkaClient.Start(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500154 log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400155 return err
156 }
157
khenaidoo43c82122018-11-22 18:38:28 -0500158 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500159 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500160 //
khenaidooabad44c2018-08-03 16:58:35 -0400161 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500162 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
163
164 // Create the topic to request channel map
165 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400166
167 return nil
168}
169
khenaidoo43c82122018-11-22 18:38:28 -0500170func (kp *InterContainerProxy) Stop() {
171 log.Info("stopping-intercontainer-proxy")
172 kp.doneCh <- 1
173 // TODO : Perform cleanup
174 //kp.kafkaClient.Stop()
175 //kp.deleteAllTopicRequestHandlerChannelMap()
176 //kp.deleteAllTopicResponseChannelMap()
177 //kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400178}
179
khenaidoo79232702018-12-04 11:00:41 -0500180// DeviceDiscovered publish the discovered device onto the kafka messaging bus
181func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string) error {
182 log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
183 // Simple validation
184 if deviceId == "" || deviceType == "" {
185 log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
186 return errors.New("invalid-parameters")
187 }
188 // Create the device discovery message
189 header := &ic.Header{
190 Id: uuid.New().String(),
191 Type: ic.MessageType_DEVICE_DISCOVERED,
192 FromTopic: kp.DefaultTopic.Name,
193 ToTopic: kp.deviceDiscoveryTopic.Name,
194 Timestamp: time.Now().UnixNano(),
195 }
196 body := &ic.DeviceDiscovered{
197 Id: deviceId,
198 DeviceType: deviceType,
199 ParentId: parentId,
200 }
201
202 var marshalledData *any.Any
203 var err error
204 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
205 log.Errorw("cannot-marshal-request", log.Fields{"error": err})
206 return err
207 }
208 msg := &ic.InterContainerMessage{
209 Header: header,
210 Body: marshalledData,
211 }
212
213 // Send the message
214 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
215 log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
216 return err
217 }
218 return nil
219}
220
khenaidoo43c82122018-11-22 18:38:28 -0500221// InvokeRPC is used to send a request to a given topic
222func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
223 waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
224
225 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
226 // typically the device ID.
227 responseTopic := replyToTopic
228 if responseTopic == nil {
229 responseTopic = kp.DefaultTopic
230 }
231
khenaidooabad44c2018-08-03 16:58:35 -0400232 // Encode the request
khenaidoo43c82122018-11-22 18:38:28 -0500233 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400234 if err != nil {
235 log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
236 return false, nil
237 }
238
239 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500240 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400241 if waitForResponse {
242 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500243 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
244 log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400245 }
246 }
247
khenaidoo43c82122018-11-22 18:38:28 -0500248 // Send request - if the topic is formatted with a device Id then we will send the request using a
249 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
250 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
251 key := GetDeviceIdFromTopic(*toTopic)
252 log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
253 go kp.kafkaClient.Send(protoRequest, toTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400254
255 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400256 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400257 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400258 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400259 if ctx == nil {
260 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400261 } else {
262 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400263 }
khenaidoob9203542018-09-17 22:56:37 -0400264 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400265
266 // Wait for response as well as timeout or cancellation
267 // Remove the subscription for a response on return
268 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
269 select {
270 case msg := <-ch:
khenaidoo43c82122018-11-22 18:38:28 -0500271 log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500272 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400273 var err error
274 if responseBody, err = decodeResponse(msg); err != nil {
275 log.Errorw("decode-response-error", log.Fields{"error": err})
276 }
277 return responseBody.Success, responseBody.Result
278 case <-ctx.Done():
279 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
280 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500281 protoError := &ic.Error{Reason: ctx.Err().Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400282 var marshalledArg *any.Any
283 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
284 return false, nil // Should never happen
285 }
286 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400287 case <-childCtx.Done():
288 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
289 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500290 protoError := &ic.Error{Reason: childCtx.Err().Error()}
khenaidoob9203542018-09-17 22:56:37 -0400291 var marshalledArg *any.Any
292 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
293 return false, nil // Should never happen
294 }
295 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400296 case <-kp.doneCh:
khenaidoo43c82122018-11-22 18:38:28 -0500297 log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400298 return true, nil
299 }
300 }
301 return true, nil
302}
303
khenaidoo43c82122018-11-22 18:38:28 -0500304// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400305// when a message is received on a given topic
khenaidoo43c82122018-11-22 18:38:28 -0500306func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400307
308 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500309 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400310 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500311 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500312 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400313 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
314 }
khenaidoo43c82122018-11-22 18:38:28 -0500315
316 kp.defaultRequestHandlerInterface = handler
317 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400318 // Launch a go routine to receive and process kafka messages
khenaidoo43c82122018-11-22 18:38:28 -0500319 go kp.waitForRequest(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400320
321 return nil
322}
323
khenaidoo43c82122018-11-22 18:38:28 -0500324// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
325// when a message is received on a given topic. So far there is only 1 target registered per microservice
326func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
327 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500328 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500329 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500330 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500331 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
332 }
333 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
334
335 // Launch a go routine to receive and process kafka messages
336 go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
337
khenaidooabad44c2018-08-03 16:58:35 -0400338 return nil
339}
340
khenaidoo43c82122018-11-22 18:38:28 -0500341func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
342 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
343}
344
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500345// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
khenaidoo43c82122018-11-22 18:38:28 -0500346// responses from that topic.
khenaidoo79232702018-12-04 11:00:41 -0500347func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500348 kp.lockTopicResponseChannelMap.Lock()
349 defer kp.lockTopicResponseChannelMap.Unlock()
350 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
351 kp.topicToResponseChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400352 }
353}
354
khenaidoo43c82122018-11-22 18:38:28 -0500355func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
356 kp.lockTopicResponseChannelMap.Lock()
357 defer kp.lockTopicResponseChannelMap.Unlock()
358 _, exist := kp.topicToResponseChannelMap[topic]
359 return exist
360}
361
362func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
363 kp.lockTopicResponseChannelMap.Lock()
364 defer kp.lockTopicResponseChannelMap.Unlock()
365 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
366 // Unsubscribe to this topic first - this will close the subscribed channel
367 var err error
368 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
369 log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
370 }
371 delete(kp.topicToResponseChannelMap, topic)
372 return err
373 } else {
374 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400375 }
376}
377
khenaidoo43c82122018-11-22 18:38:28 -0500378func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
379 kp.lockTopicResponseChannelMap.Lock()
380 defer kp.lockTopicResponseChannelMap.Unlock()
381 var err error
382 for topic, _ := range kp.topicToResponseChannelMap {
383 // Unsubscribe to this topic first - this will close the subscribed channel
384 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
385 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
386 }
387 delete(kp.topicToResponseChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400388 }
khenaidoo43c82122018-11-22 18:38:28 -0500389 return err
khenaidooabad44c2018-08-03 16:58:35 -0400390}
391
khenaidoo43c82122018-11-22 18:38:28 -0500392func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
393 kp.lockTopicRequestHandlerChannelMap.Lock()
394 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
395 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
396 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400397 }
khenaidooabad44c2018-08-03 16:58:35 -0400398}
399
khenaidoo43c82122018-11-22 18:38:28 -0500400func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
401 kp.lockTopicRequestHandlerChannelMap.Lock()
402 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
403 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
404 // Close the kafka client client first by unsubscribing to this topic
405 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
406 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400407 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500408 } else {
409 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400410 }
khenaidooabad44c2018-08-03 16:58:35 -0400411}
412
khenaidoo43c82122018-11-22 18:38:28 -0500413func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
414 kp.lockTopicRequestHandlerChannelMap.Lock()
415 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
416 var err error
417 for topic, _ := range kp.topicToRequestHandlerChannelMap {
418 // Close the kafka client client first by unsubscribing to this topic
419 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
420 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
421 }
422 delete(kp.topicToRequestHandlerChannelMap, topic)
423 }
424 return err
425}
426
khenaidoo79232702018-12-04 11:00:41 -0500427func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400428 kp.lockTransactionIdToChannelMap.Lock()
429 defer kp.lockTransactionIdToChannelMap.Unlock()
430 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500431 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400432 }
433}
434
khenaidoo43c82122018-11-22 18:38:28 -0500435func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400436 kp.lockTransactionIdToChannelMap.Lock()
437 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500438 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
439 // Close the channel first
440 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400441 delete(kp.transactionIdToChannelMap, id)
442 }
443}
444
khenaidoo43c82122018-11-22 18:38:28 -0500445func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
446 kp.lockTransactionIdToChannelMap.Lock()
447 defer kp.lockTransactionIdToChannelMap.Unlock()
448 for key, value := range kp.transactionIdToChannelMap {
449 if value.topic.Name == id {
450 close(value.ch)
451 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400452 }
453 }
khenaidooabad44c2018-08-03 16:58:35 -0400454}
455
khenaidoo43c82122018-11-22 18:38:28 -0500456func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
457 kp.lockTransactionIdToChannelMap.Lock()
458 defer kp.lockTransactionIdToChannelMap.Unlock()
459 for key, value := range kp.transactionIdToChannelMap {
460 close(value.ch)
461 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400462 }
khenaidooabad44c2018-08-03 16:58:35 -0400463}
464
khenaidoo43c82122018-11-22 18:38:28 -0500465func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
466 // If we have any consumers on that topic we need to close them
467 kp.deleteFromTopicResponseChannelMap(topic.Name)
468 kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
469 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
470 return kp.kafkaClient.DeleteTopic(&topic)
471}
472
473func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400474 // Encode the response argument - needs to be a proto message
475 if returnedVal == nil {
476 return nil, nil
477 }
478 protoValue, ok := returnedVal.(proto.Message)
479 if !ok {
480 log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
481 err := errors.New("response-value-not-proto-message")
482 return nil, err
483 }
484
485 // Marshal the returned value, if any
486 var marshalledReturnedVal *any.Any
487 var err error
488 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
489 log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
490 return nil, err
491 }
492 return marshalledReturnedVal, nil
493}
494
khenaidoo79232702018-12-04 11:00:41 -0500495func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
496 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400497 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500498 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400499 FromTopic: request.Header.ToTopic,
500 ToTopic: request.Header.FromTopic,
501 Timestamp: time.Now().Unix(),
502 }
khenaidoo79232702018-12-04 11:00:41 -0500503 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400504 Success: false,
505 Result: nil,
506 }
507 var marshalledResponseBody *any.Any
508 var err error
509 // Error should never happen here
510 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
511 log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
512 }
513
khenaidoo79232702018-12-04 11:00:41 -0500514 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400515 Header: responseHeader,
516 Body: marshalledResponseBody,
517 }
518
519}
520
521//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
522//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500523func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500524 //log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500525 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400526 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500527 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400528 FromTopic: request.Header.ToTopic,
529 ToTopic: request.Header.FromTopic,
530 Timestamp: time.Now().Unix(),
531 }
532
533 // Go over all returned values
534 var marshalledReturnedVal *any.Any
535 var err error
536 for _, returnVal := range returnedValues {
khenaidoo43c82122018-11-22 18:38:28 -0500537 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400538 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
539 }
540 break // for now we support only 1 returned value - (excluding the error)
541 }
542
khenaidoo79232702018-12-04 11:00:41 -0500543 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400544 Success: success,
545 Result: marshalledReturnedVal,
546 }
547
548 // Marshal the response body
549 var marshalledResponseBody *any.Any
550 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
551 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
552 return nil, err
553 }
554
khenaidoo79232702018-12-04 11:00:41 -0500555 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400556 Header: responseHeader,
557 Body: marshalledResponseBody,
558 }, nil
559}
560
561func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
562 myClassValue := reflect.ValueOf(myClass)
563 m := myClassValue.MethodByName(funcName)
564 if !m.IsValid() {
khenaidoo43c82122018-11-22 18:38:28 -0500565 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400566 }
567 in := make([]reflect.Value, len(params))
568 for i, param := range params {
569 in[i] = reflect.ValueOf(param)
570 }
571 out = m.Call(in)
572 return
573}
574
khenaidoo79232702018-12-04 11:00:41 -0500575func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400576
khenaidoo43c82122018-11-22 18:38:28 -0500577 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500578 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400579
580 var out []reflect.Value
581 var err error
582
583 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500584 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400585 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
586 log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
587 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500588 log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400589 // let the callee unpack the arguments as its the only one that knows the real proto type
590 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
591 if err != nil {
592 log.Warn(err)
593 }
594 }
595 // Response required?
596 if requestBody.ResponseRequired {
597 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500598 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400599 var returnedValues []interface{}
600 var success bool
601 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500602 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400603 returnedValues = make([]interface{}, 1)
604 returnedValues[0] = returnError
605 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500606 //log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
khenaidoob9203542018-09-17 22:56:37 -0400607 returnedValues = make([]interface{}, 0)
608 // Check for errors first
609 lastIndex := len(out) - 1
610 if out[lastIndex].Interface() != nil { // Error
611 if goError, ok := out[lastIndex].Interface().(error); ok {
khenaidoo79232702018-12-04 11:00:41 -0500612 returnError = &ic.Error{Reason: goError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400613 returnedValues = append(returnedValues, returnError)
614 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500615 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400616 returnedValues = append(returnedValues, returnError)
617 }
618 } else { // Non-error case
619 success = true
620 for idx, val := range out {
khenaidoo43c82122018-11-22 18:38:28 -0500621 //log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400622 if idx != lastIndex {
623 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400624 }
khenaidooabad44c2018-08-03 16:58:35 -0400625 }
626 }
627 }
628
khenaidoo79232702018-12-04 11:00:41 -0500629 var icm *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400630 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400631 log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400632 icm = encodeDefaultFailedResponse(msg)
633 }
khenaidoo43c82122018-11-22 18:38:28 -0500634 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
635 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
636 // present then the key will be empty, hence all messages for a given topic will be sent to all
637 // partitions.
638 replyTopic := &Topic{Name: msg.Header.FromTopic}
639 key := GetDeviceIdFromTopic(*replyTopic)
khenaidoo90847922018-12-03 14:47:51 -0500640 log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500641 // TODO: handle error response.
642 kp.kafkaClient.Send(icm, replyTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400643 }
644
khenaidooabad44c2018-08-03 16:58:35 -0400645 }
646}
647
khenaidoo79232702018-12-04 11:00:41 -0500648func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400649 // Wait for messages
650 for msg := range ch {
khenaidoo43c82122018-11-22 18:38:28 -0500651 //log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidooabad44c2018-08-03 16:58:35 -0400652 go kp.handleRequest(msg, targetInterface)
653 }
654}
655
khenaidoo79232702018-12-04 11:00:41 -0500656func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400657 kp.lockTransactionIdToChannelMap.Lock()
658 defer kp.lockTransactionIdToChannelMap.Unlock()
659 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
660 log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
661 return
662 }
khenaidoo43c82122018-11-22 18:38:28 -0500663 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400664}
665
khenaidoo43c82122018-11-22 18:38:28 -0500666// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500667// and then dispatches to the consumers
khenaidoo79232702018-12-04 11:00:41 -0500668func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
khenaidoo43c82122018-11-22 18:38:28 -0500669 log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400670startloop:
671 for {
672 select {
khenaidoo43c82122018-11-22 18:38:28 -0500673 case msg := <-subscribedCh:
674 //log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
khenaidoo79232702018-12-04 11:00:41 -0500675 if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoo43c82122018-11-22 18:38:28 -0500676 go kp.dispatchResponse(msg)
677 }
khenaidooabad44c2018-08-03 16:58:35 -0400678 case <-kp.doneCh:
679 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
680 break startloop
681 }
682 }
khenaidoo43c82122018-11-22 18:38:28 -0500683 //log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
684 // We got an exit signal. Unsubscribe to the channel
685 //kp.kafkaClient.UnSubscribe(topic, subscribedCh)
khenaidooabad44c2018-08-03 16:58:35 -0400686}
687
688// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
689// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
690// API. There is one response channel waiting for kafka messages before dispatching the message to the
691// corresponding waiting channel
khenaidoo79232702018-12-04 11:00:41 -0500692func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob9203542018-09-17 22:56:37 -0400693 log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400694
khenaidoo43c82122018-11-22 18:38:28 -0500695 // First check whether we already have a channel listening for response on that topic. If there is
696 // already one then it will be reused. If not, it will be created.
697 if !kp.isTopicSubscribedForResponse(topic.Name) {
khenaidoo79232702018-12-04 11:00:41 -0500698 var subscribedCh <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400699 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500700 if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500701 log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400702 return nil, err
703 }
khenaidoo43c82122018-11-22 18:38:28 -0500704 kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
705 go kp.waitForResponseLoop(subscribedCh, &topic)
khenaidooabad44c2018-08-03 16:58:35 -0400706 }
707
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500708 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500709 // broadcast any message for this topic to all channels waiting on it.
khenaidoo79232702018-12-04 11:00:41 -0500710 ch := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500711 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400712
713 return ch, nil
714}
715
khenaidoo43c82122018-11-22 18:38:28 -0500716func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidooabad44c2018-08-03 16:58:35 -0400717 log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo43c82122018-11-22 18:38:28 -0500718 if _, exist := kp.transactionIdToChannelMap[trnsId]; exist {
719 // The delete operation will close the channel
720 kp.deleteFromTransactionIdToChannelMap(trnsId)
721 }
khenaidooabad44c2018-08-03 16:58:35 -0400722 return nil
723}
724
725//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
726//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500727func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
728 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400729 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500730 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400731 FromTopic: replyTopic.Name,
732 ToTopic: toTopic.Name,
733 Timestamp: time.Now().Unix(),
734 }
khenaidoo79232702018-12-04 11:00:41 -0500735 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400736 Rpc: rpc,
737 ResponseRequired: true,
738 ReplyToTopic: replyTopic.Name,
739 }
740
741 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400742 if arg == nil {
743 // In case the caller sends an array with empty args
744 continue
745 }
khenaidooabad44c2018-08-03 16:58:35 -0400746 var marshalledArg *any.Any
747 var err error
748 // ascertain the value interface type is a proto.Message
749 protoValue, ok := arg.Value.(proto.Message)
750 if !ok {
751 log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
752 err := errors.New("argument-value-not-proto-message")
753 return nil, err
754 }
755 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
756 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
757 return nil, err
758 }
khenaidoo79232702018-12-04 11:00:41 -0500759 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400760 Key: arg.Key,
761 Value: marshalledArg,
762 }
763 requestBody.Args = append(requestBody.Args, protoArg)
764 }
765
766 var marshalledData *any.Any
767 var err error
768 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
769 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
770 return nil, err
771 }
khenaidoo79232702018-12-04 11:00:41 -0500772 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400773 Header: requestHeader,
774 Body: marshalledData,
775 }
776 return request, nil
777}
778
khenaidoo79232702018-12-04 11:00:41 -0500779func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400780 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500781 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400782 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
783 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
784 return nil, err
785 }
khenaidoo43c82122018-11-22 18:38:28 -0500786 //log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400787
788 return &responseBody, nil
789
790}