blob: e2210c495c536b0bb5149e4f1b560a31352a47b4 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-go/common/log"
27 ca "github.com/opencord/voltha-go/protos/core_adapter"
28 "reflect"
29 "sync"
30 "time"
31)
32
33// Initialize the logger - gets the default until the main function setup the logger
34func init() {
khenaidoob9203542018-09-17 22:56:37 -040035 log.AddPackage(log.JSON, log.WarnLevel, nil)
khenaidooabad44c2018-08-03 16:58:35 -040036}
37
// Default settings for inter-container requests.
const (
	// DefaultMaxRetries is the default retry count.
	// NOTE(review): not referenced in the visible part of this file.
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is expressed in milliseconds; the value was chosen
	// to handle a wider latency range.
	DefaultRequestTimeout = 500
)
42
khenaidoo43c82122018-11-22 18:38:28 -050043// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
44// obtained from that channel, this interface is invoked. This is used to handle
45// async requests into the Core via the kafka messaging bus
46type requestHandlerChannel struct {
47 requesthandlerInterface interface{}
48 ch <-chan *ca.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040049}
50
khenaidoo43c82122018-11-22 18:38:28 -050051// transactionChannel represents a combination of a topic and a channel onto which a response received
52// on the kafka bus will be sent to
53type transactionChannel struct {
54 topic *Topic
55 ch chan *ca.InterContainerMessage
56}
57
58// InterContainerProxy represents the messaging proxy
59type InterContainerProxy struct {
60 kafkaHost string
61 kafkaPort int
62 DefaultTopic *Topic
63 defaultRequestHandlerInterface interface{}
64 kafkaClient Client
65 doneCh chan int
66
67 // This map is used to map a topic to an interface and channel. When a request is received
68 // on that channel (registered to the topic) then that interface is invoked.
69 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
70 lockTopicRequestHandlerChannelMap sync.RWMutex
71
72 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050073 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050074 // transactionIdToChannelMap.
75 topicToResponseChannelMap map[string]<-chan *ca.InterContainerMessage
76 lockTopicResponseChannelMap sync.RWMutex
77
khenaidoo4c1a5bf2018-11-29 15:53:42 -050078 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -050079 // sent out and we are waiting for a response.
80 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -040081 lockTransactionIdToChannelMap sync.RWMutex
82}
83
khenaidoo43c82122018-11-22 18:38:28 -050084type InterContainerProxyOption func(*InterContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -040085
khenaidoo43c82122018-11-22 18:38:28 -050086func InterContainerHost(host string) InterContainerProxyOption {
87 return func(args *InterContainerProxy) {
88 args.kafkaHost = host
khenaidooabad44c2018-08-03 16:58:35 -040089 }
90}
91
khenaidoo43c82122018-11-22 18:38:28 -050092func InterContainerPort(port int) InterContainerProxyOption {
93 return func(args *InterContainerProxy) {
94 args.kafkaPort = port
khenaidooabad44c2018-08-03 16:58:35 -040095 }
96}
97
khenaidoo43c82122018-11-22 18:38:28 -050098func DefaultTopic(topic *Topic) InterContainerProxyOption {
99 return func(args *InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -0400100 args.DefaultTopic = topic
101 }
102}
103
khenaidoo43c82122018-11-22 18:38:28 -0500104func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
105 return func(args *InterContainerProxy) {
106 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400107 }
108}
109
khenaidoo43c82122018-11-22 18:38:28 -0500110func MsgClient(client Client) InterContainerProxyOption {
111 return func(args *InterContainerProxy) {
112 args.kafkaClient = client
113 }
114}
115
116func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
117 proxy := &InterContainerProxy{
118 kafkaHost: DefaultKafkaHost,
119 kafkaPort: DefaultKafkaPort,
khenaidooabad44c2018-08-03 16:58:35 -0400120 }
121
122 for _, option := range opts {
123 option(proxy)
124 }
125
126 // Create the locks for all the maps
khenaidoo43c82122018-11-22 18:38:28 -0500127 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400128 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500129 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400130
131 return proxy, nil
132}
133
khenaidoo43c82122018-11-22 18:38:28 -0500134func (kp *InterContainerProxy) Start() error {
khenaidooabad44c2018-08-03 16:58:35 -0400135 log.Info("Starting-Proxy")
136
khenaidoo43c82122018-11-22 18:38:28 -0500137 // Kafka MsgClient should already have been created. If not, output fatal error
138 if kp.kafkaClient == nil {
139 log.Fatal("kafka-client-not-set")
140 }
141
khenaidooabad44c2018-08-03 16:58:35 -0400142 // Create the Done channel
143 kp.doneCh = make(chan int, 1)
144
khenaidoo43c82122018-11-22 18:38:28 -0500145 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500146 if err := kp.kafkaClient.Start(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500147 log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400148 return err
149 }
150
khenaidoo43c82122018-11-22 18:38:28 -0500151 // Create the topic to response channel map
152 kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
153 //
khenaidooabad44c2018-08-03 16:58:35 -0400154 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500155 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
156
157 // Create the topic to request channel map
158 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400159
160 return nil
161}
162
khenaidoo43c82122018-11-22 18:38:28 -0500163func (kp *InterContainerProxy) Stop() {
164 log.Info("stopping-intercontainer-proxy")
165 kp.doneCh <- 1
166 // TODO : Perform cleanup
167 //kp.kafkaClient.Stop()
168 //kp.deleteAllTopicRequestHandlerChannelMap()
169 //kp.deleteAllTopicResponseChannelMap()
170 //kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400171}
172
khenaidoo43c82122018-11-22 18:38:28 -0500173// InvokeRPC is used to send a request to a given topic
174func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
175 waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
176
177 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
178 // typically the device ID.
179 responseTopic := replyToTopic
180 if responseTopic == nil {
181 responseTopic = kp.DefaultTopic
182 }
183
khenaidooabad44c2018-08-03 16:58:35 -0400184 // Encode the request
khenaidoo43c82122018-11-22 18:38:28 -0500185 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400186 if err != nil {
187 log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
188 return false, nil
189 }
190
191 // Subscribe for response, if needed, before sending request
192 var ch <-chan *ca.InterContainerMessage
193 if waitForResponse {
194 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500195 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
196 log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400197 }
198 }
199
khenaidoo43c82122018-11-22 18:38:28 -0500200 // Send request - if the topic is formatted with a device Id then we will send the request using a
201 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
202 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
203 key := GetDeviceIdFromTopic(*toTopic)
204 log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
205 go kp.kafkaClient.Send(protoRequest, toTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400206
207 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400208 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400209 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400210 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400211 if ctx == nil {
212 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400213 } else {
214 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400215 }
khenaidoob9203542018-09-17 22:56:37 -0400216 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400217
218 // Wait for response as well as timeout or cancellation
219 // Remove the subscription for a response on return
220 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
221 select {
222 case msg := <-ch:
khenaidoo43c82122018-11-22 18:38:28 -0500223 log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400224 var responseBody *ca.InterContainerResponseBody
225 var err error
226 if responseBody, err = decodeResponse(msg); err != nil {
227 log.Errorw("decode-response-error", log.Fields{"error": err})
228 }
229 return responseBody.Success, responseBody.Result
230 case <-ctx.Done():
231 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
232 // pack the error as proto any type
233 protoError := &ca.Error{Reason: ctx.Err().Error()}
234 var marshalledArg *any.Any
235 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
236 return false, nil // Should never happen
237 }
238 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400239 case <-childCtx.Done():
240 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
241 // pack the error as proto any type
242 protoError := &ca.Error{Reason: childCtx.Err().Error()}
243 var marshalledArg *any.Any
244 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
245 return false, nil // Should never happen
246 }
247 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400248 case <-kp.doneCh:
khenaidoo43c82122018-11-22 18:38:28 -0500249 log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400250 return true, nil
251 }
252 }
253 return true, nil
254}
255
khenaidoo43c82122018-11-22 18:38:28 -0500256// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400257// when a message is received on a given topic
khenaidoo43c82122018-11-22 18:38:28 -0500258func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400259
260 // Subscribe to receive messages for that topic
261 var ch <-chan *ca.InterContainerMessage
262 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500263 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500264 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400265 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
266 }
khenaidoo43c82122018-11-22 18:38:28 -0500267
268 kp.defaultRequestHandlerInterface = handler
269 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400270 // Launch a go routine to receive and process kafka messages
khenaidoo43c82122018-11-22 18:38:28 -0500271 go kp.waitForRequest(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400272
273 return nil
274}
275
khenaidoo43c82122018-11-22 18:38:28 -0500276// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
277// when a message is received on a given topic. So far there is only 1 target registered per microservice
278func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
279 // Subscribe to receive messages for that topic
280 var ch <-chan *ca.InterContainerMessage
281 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500282 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500283 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
284 }
285 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
286
287 // Launch a go routine to receive and process kafka messages
288 go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
289
khenaidooabad44c2018-08-03 16:58:35 -0400290 return nil
291}
292
khenaidoo43c82122018-11-22 18:38:28 -0500293func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
294 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
295}
296
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500297// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
khenaidoo43c82122018-11-22 18:38:28 -0500298// responses from that topic.
299func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
300 kp.lockTopicResponseChannelMap.Lock()
301 defer kp.lockTopicResponseChannelMap.Unlock()
302 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
303 kp.topicToResponseChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400304 }
305}
306
khenaidoo43c82122018-11-22 18:38:28 -0500307func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
308 kp.lockTopicResponseChannelMap.Lock()
309 defer kp.lockTopicResponseChannelMap.Unlock()
310 _, exist := kp.topicToResponseChannelMap[topic]
311 return exist
312}
313
314func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
315 kp.lockTopicResponseChannelMap.Lock()
316 defer kp.lockTopicResponseChannelMap.Unlock()
317 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
318 // Unsubscribe to this topic first - this will close the subscribed channel
319 var err error
320 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
321 log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
322 }
323 delete(kp.topicToResponseChannelMap, topic)
324 return err
325 } else {
326 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400327 }
328}
329
khenaidoo43c82122018-11-22 18:38:28 -0500330func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
331 kp.lockTopicResponseChannelMap.Lock()
332 defer kp.lockTopicResponseChannelMap.Unlock()
333 var err error
334 for topic, _ := range kp.topicToResponseChannelMap {
335 // Unsubscribe to this topic first - this will close the subscribed channel
336 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
337 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
338 }
339 delete(kp.topicToResponseChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400340 }
khenaidoo43c82122018-11-22 18:38:28 -0500341 return err
khenaidooabad44c2018-08-03 16:58:35 -0400342}
343
khenaidoo43c82122018-11-22 18:38:28 -0500344func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
345 kp.lockTopicRequestHandlerChannelMap.Lock()
346 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
347 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
348 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400349 }
khenaidooabad44c2018-08-03 16:58:35 -0400350}
351
khenaidoo43c82122018-11-22 18:38:28 -0500352func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
353 kp.lockTopicRequestHandlerChannelMap.Lock()
354 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
355 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
356 // Close the kafka client client first by unsubscribing to this topic
357 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
358 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400359 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500360 } else {
361 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400362 }
khenaidooabad44c2018-08-03 16:58:35 -0400363}
364
khenaidoo43c82122018-11-22 18:38:28 -0500365func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
366 kp.lockTopicRequestHandlerChannelMap.Lock()
367 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
368 var err error
369 for topic, _ := range kp.topicToRequestHandlerChannelMap {
370 // Close the kafka client client first by unsubscribing to this topic
371 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
372 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
373 }
374 delete(kp.topicToRequestHandlerChannelMap, topic)
375 }
376 return err
377}
378
379func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400380 kp.lockTransactionIdToChannelMap.Lock()
381 defer kp.lockTransactionIdToChannelMap.Unlock()
382 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500383 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400384 }
385}
386
khenaidoo43c82122018-11-22 18:38:28 -0500387func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400388 kp.lockTransactionIdToChannelMap.Lock()
389 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500390 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
391 // Close the channel first
392 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400393 delete(kp.transactionIdToChannelMap, id)
394 }
395}
396
khenaidoo43c82122018-11-22 18:38:28 -0500397func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
398 kp.lockTransactionIdToChannelMap.Lock()
399 defer kp.lockTransactionIdToChannelMap.Unlock()
400 for key, value := range kp.transactionIdToChannelMap {
401 if value.topic.Name == id {
402 close(value.ch)
403 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400404 }
405 }
khenaidooabad44c2018-08-03 16:58:35 -0400406}
407
khenaidoo43c82122018-11-22 18:38:28 -0500408func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
409 kp.lockTransactionIdToChannelMap.Lock()
410 defer kp.lockTransactionIdToChannelMap.Unlock()
411 for key, value := range kp.transactionIdToChannelMap {
412 close(value.ch)
413 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400414 }
khenaidooabad44c2018-08-03 16:58:35 -0400415}
416
khenaidoo43c82122018-11-22 18:38:28 -0500417func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
418 // If we have any consumers on that topic we need to close them
419 kp.deleteFromTopicResponseChannelMap(topic.Name)
420 kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
421 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
422 return kp.kafkaClient.DeleteTopic(&topic)
423}
424
425func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400426 // Encode the response argument - needs to be a proto message
427 if returnedVal == nil {
428 return nil, nil
429 }
430 protoValue, ok := returnedVal.(proto.Message)
431 if !ok {
432 log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
433 err := errors.New("response-value-not-proto-message")
434 return nil, err
435 }
436
437 // Marshal the returned value, if any
438 var marshalledReturnedVal *any.Any
439 var err error
440 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
441 log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
442 return nil, err
443 }
444 return marshalledReturnedVal, nil
445}
446
447func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
448 responseHeader := &ca.Header{
449 Id: request.Header.Id,
450 Type: ca.MessageType_RESPONSE,
451 FromTopic: request.Header.ToTopic,
452 ToTopic: request.Header.FromTopic,
453 Timestamp: time.Now().Unix(),
454 }
455 responseBody := &ca.InterContainerResponseBody{
456 Success: false,
457 Result: nil,
458 }
459 var marshalledResponseBody *any.Any
460 var err error
461 // Error should never happen here
462 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
463 log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
464 }
465
466 return &ca.InterContainerMessage{
467 Header: responseHeader,
468 Body: marshalledResponseBody,
469 }
470
471}
472
473//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
474//or an error on failure
475func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500476 //log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidooabad44c2018-08-03 16:58:35 -0400477 responseHeader := &ca.Header{
478 Id: request.Header.Id,
479 Type: ca.MessageType_RESPONSE,
480 FromTopic: request.Header.ToTopic,
481 ToTopic: request.Header.FromTopic,
482 Timestamp: time.Now().Unix(),
483 }
484
485 // Go over all returned values
486 var marshalledReturnedVal *any.Any
487 var err error
488 for _, returnVal := range returnedValues {
khenaidoo43c82122018-11-22 18:38:28 -0500489 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400490 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
491 }
492 break // for now we support only 1 returned value - (excluding the error)
493 }
494
495 responseBody := &ca.InterContainerResponseBody{
496 Success: success,
497 Result: marshalledReturnedVal,
498 }
499
500 // Marshal the response body
501 var marshalledResponseBody *any.Any
502 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
503 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
504 return nil, err
505 }
506
507 return &ca.InterContainerMessage{
508 Header: responseHeader,
509 Body: marshalledResponseBody,
510 }, nil
511}
512
// CallFuncByName invokes, through reflection, the method funcName of myClass
// with params as arguments and returns the method's results.
//
// Instead of panicking inside the reflect package, an error (and an empty
// result slice) is returned when:
//   - myClass is nil or has no method called funcName;
//   - the number of params does not fit the method's signature;
//   - a param is not assignable to the corresponding parameter type.
//
// A nil param is passed as the zero value of the corresponding parameter type
// when that type can hold nil (pointer, interface, map, slice, channel or
// function); otherwise it is reported as an error.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	myClassValue := reflect.ValueOf(myClass)
	// reflect.ValueOf(nil) is the zero Value, on which MethodByName panics.
	if !myClassValue.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	mType := m.Type()
	numIn := mType.NumIn()
	variadic := mType.IsVariadic()
	if (!variadic && len(params) != numIn) || (variadic && len(params) < numIn-1) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-count \"%s\": got %d", funcName, len(params))
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		// Arguments in the variadic tail are matched against the element
		// type of the final slice parameter.
		var want reflect.Type
		if variadic && i >= numIn-1 {
			want = mType.In(numIn - 1).Elem()
		} else {
			want = mType.In(i)
		}

		if param == nil {
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
				in[i] = reflect.Zero(want)
				continue
			}
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": argument %d cannot be nil", funcName, i)
		}

		value := reflect.ValueOf(param)
		if !value.Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": argument %d has type %s, expected %s",
				funcName, i, value.Type(), want)
		}
		in[i] = value
	}

	return m.Call(in), nil
}
526
khenaidoo43c82122018-11-22 18:38:28 -0500527func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400528
khenaidoo43c82122018-11-22 18:38:28 -0500529 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidooabad44c2018-08-03 16:58:35 -0400530 if msg.Header.Type == ca.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400531
532 var out []reflect.Value
533 var err error
534
535 // Get the request body
536 requestBody := &ca.InterContainerRequestBody{}
537 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
538 log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
539 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500540 log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400541 // let the callee unpack the arguments as its the only one that knows the real proto type
542 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
543 if err != nil {
544 log.Warn(err)
545 }
546 }
547 // Response required?
548 if requestBody.ResponseRequired {
549 // If we already have an error before then just return that
550 var returnError *ca.Error
551 var returnedValues []interface{}
552 var success bool
553 if err != nil {
554 returnError = &ca.Error{Reason: err.Error()}
555 returnedValues = make([]interface{}, 1)
556 returnedValues[0] = returnError
557 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500558 //log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
khenaidoob9203542018-09-17 22:56:37 -0400559 returnedValues = make([]interface{}, 0)
560 // Check for errors first
561 lastIndex := len(out) - 1
562 if out[lastIndex].Interface() != nil { // Error
563 if goError, ok := out[lastIndex].Interface().(error); ok {
564 returnError = &ca.Error{Reason: goError.Error()}
565 returnedValues = append(returnedValues, returnError)
566 } else { // Should never happen
567 returnError = &ca.Error{Reason: "incorrect-error-returns"}
568 returnedValues = append(returnedValues, returnError)
569 }
570 } else { // Non-error case
571 success = true
572 for idx, val := range out {
khenaidoo43c82122018-11-22 18:38:28 -0500573 //log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400574 if idx != lastIndex {
575 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400576 }
khenaidooabad44c2018-08-03 16:58:35 -0400577 }
578 }
579 }
580
581 var icm *ca.InterContainerMessage
582 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400583 log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400584 icm = encodeDefaultFailedResponse(msg)
585 }
khenaidoo43c82122018-11-22 18:38:28 -0500586 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
587 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
588 // present then the key will be empty, hence all messages for a given topic will be sent to all
589 // partitions.
590 replyTopic := &Topic{Name: msg.Header.FromTopic}
591 key := GetDeviceIdFromTopic(*replyTopic)
592 log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "key": key})
593 // TODO: handle error response.
594 kp.kafkaClient.Send(icm, replyTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400595 }
596
khenaidooabad44c2018-08-03 16:58:35 -0400597 }
598}
599
khenaidoo43c82122018-11-22 18:38:28 -0500600func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400601 // Wait for messages
602 for msg := range ch {
khenaidoo43c82122018-11-22 18:38:28 -0500603 //log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidooabad44c2018-08-03 16:58:35 -0400604 go kp.handleRequest(msg, targetInterface)
605 }
606}
607
khenaidoo43c82122018-11-22 18:38:28 -0500608func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400609 kp.lockTransactionIdToChannelMap.Lock()
610 defer kp.lockTransactionIdToChannelMap.Unlock()
611 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
612 log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
613 return
614 }
khenaidoo43c82122018-11-22 18:38:28 -0500615 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400616}
617
khenaidoo43c82122018-11-22 18:38:28 -0500618// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500619// and then dispatches to the consumers
khenaidoo43c82122018-11-22 18:38:28 -0500620func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
621 log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400622startloop:
623 for {
624 select {
khenaidoo43c82122018-11-22 18:38:28 -0500625 case msg := <-subscribedCh:
626 //log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
627 if msg.Header.Type == ca.MessageType_RESPONSE {
628 go kp.dispatchResponse(msg)
629 }
khenaidooabad44c2018-08-03 16:58:35 -0400630 case <-kp.doneCh:
631 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
632 break startloop
633 }
634 }
khenaidoo43c82122018-11-22 18:38:28 -0500635 //log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
636 // We got an exit signal. Unsubscribe to the channel
637 //kp.kafkaClient.UnSubscribe(topic, subscribedCh)
khenaidooabad44c2018-08-03 16:58:35 -0400638}
639
640// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
641// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
642// API. There is one response channel waiting for kafka messages before dispatching the message to the
643// corresponding waiting channel
khenaidoo43c82122018-11-22 18:38:28 -0500644func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
khenaidoob9203542018-09-17 22:56:37 -0400645 log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400646
khenaidoo43c82122018-11-22 18:38:28 -0500647 // First check whether we already have a channel listening for response on that topic. If there is
648 // already one then it will be reused. If not, it will be created.
649 if !kp.isTopicSubscribedForResponse(topic.Name) {
650 var subscribedCh <-chan *ca.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400651 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500652 if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500653 log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400654 return nil, err
655 }
khenaidoo43c82122018-11-22 18:38:28 -0500656 kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
657 go kp.waitForResponseLoop(subscribedCh, &topic)
khenaidooabad44c2018-08-03 16:58:35 -0400658 }
659
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500660 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500661 // broadcast any message for this topic to all channels waiting on it.
khenaidooabad44c2018-08-03 16:58:35 -0400662 ch := make(chan *ca.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500663 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400664
665 return ch, nil
666}
667
khenaidoo43c82122018-11-22 18:38:28 -0500668func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidooabad44c2018-08-03 16:58:35 -0400669 log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo43c82122018-11-22 18:38:28 -0500670 if _, exist := kp.transactionIdToChannelMap[trnsId]; exist {
671 // The delete operation will close the channel
672 kp.deleteFromTransactionIdToChannelMap(trnsId)
673 }
khenaidooabad44c2018-08-03 16:58:35 -0400674 return nil
675}
676
677//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
678//or an error on failure
679func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
680 requestHeader := &ca.Header{
681 Id: uuid.New().String(),
682 Type: ca.MessageType_REQUEST,
683 FromTopic: replyTopic.Name,
684 ToTopic: toTopic.Name,
685 Timestamp: time.Now().Unix(),
686 }
687 requestBody := &ca.InterContainerRequestBody{
688 Rpc: rpc,
689 ResponseRequired: true,
690 ReplyToTopic: replyTopic.Name,
691 }
692
693 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400694 if arg == nil {
695 // In case the caller sends an array with empty args
696 continue
697 }
khenaidooabad44c2018-08-03 16:58:35 -0400698 var marshalledArg *any.Any
699 var err error
700 // ascertain the value interface type is a proto.Message
701 protoValue, ok := arg.Value.(proto.Message)
702 if !ok {
703 log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
704 err := errors.New("argument-value-not-proto-message")
705 return nil, err
706 }
707 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
708 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
709 return nil, err
710 }
711 protoArg := &ca.Argument{
712 Key: arg.Key,
713 Value: marshalledArg,
714 }
715 requestBody.Args = append(requestBody.Args, protoArg)
716 }
717
718 var marshalledData *any.Any
719 var err error
720 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
721 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
722 return nil, err
723 }
724 request := &ca.InterContainerMessage{
725 Header: requestHeader,
726 Body: marshalledData,
727 }
728 return request, nil
729}
730
khenaidooabad44c2018-08-03 16:58:35 -0400731func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
732 // Extract the message body
733 responseBody := ca.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400734 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
735 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
736 return nil, err
737 }
khenaidoo43c82122018-11-22 18:38:28 -0500738 //log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400739
740 return &responseBody, nil
741
742}