blob: 099d0d859d3666ac7d0920adf5b8b1a1f54bea17 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050027 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidooabad44c2018-08-03 16:58:35 -040028 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050029 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040030 "sync"
31 "time"
32)
33
34// Initialize the logger - gets the default until the main function setup the logger
35func init() {
khenaidooca301322019-01-09 23:06:32 -050036 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidooabad44c2018-08-03 16:58:35 -040037}
38
39const (
khenaidoo43c82122018-11-22 18:38:28 -050040 DefaultMaxRetries = 3
khenaidoo6d055132019-02-12 16:51:19 -050041 DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
khenaidooabad44c2018-08-03 16:58:35 -040042)
43
khenaidoo297cd252019-02-07 22:10:23 -050044const (
45 TransactionKey = "transactionID"
46)
47
khenaidoo43c82122018-11-22 18:38:28 -050048// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
49// obtained from that channel, this interface is invoked. This is used to handle
50// async requests into the Core via the kafka messaging bus
51type requestHandlerChannel struct {
52 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050053 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040054}
55
khenaidoo43c82122018-11-22 18:38:28 -050056// transactionChannel represents a combination of a topic and a channel onto which a response received
57// on the kafka bus will be sent to
58type transactionChannel struct {
59 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050060 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050061}
62
63// InterContainerProxy represents the messaging proxy
64type InterContainerProxy struct {
65 kafkaHost string
66 kafkaPort int
67 DefaultTopic *Topic
68 defaultRequestHandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050069 deviceDiscoveryTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050070 kafkaClient Client
71 doneCh chan int
72
73 // This map is used to map a topic to an interface and channel. When a request is received
74 // on that channel (registered to the topic) then that interface is invoked.
75 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
76 lockTopicRequestHandlerChannelMap sync.RWMutex
77
78 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050079 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050080 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050081 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050082 lockTopicResponseChannelMap sync.RWMutex
83
khenaidoo4c1a5bf2018-11-29 15:53:42 -050084 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -050085 // sent out and we are waiting for a response.
86 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -040087 lockTransactionIdToChannelMap sync.RWMutex
88}
89
khenaidoo43c82122018-11-22 18:38:28 -050090type InterContainerProxyOption func(*InterContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -040091
khenaidoo43c82122018-11-22 18:38:28 -050092func InterContainerHost(host string) InterContainerProxyOption {
93 return func(args *InterContainerProxy) {
94 args.kafkaHost = host
khenaidooabad44c2018-08-03 16:58:35 -040095 }
96}
97
khenaidoo43c82122018-11-22 18:38:28 -050098func InterContainerPort(port int) InterContainerProxyOption {
99 return func(args *InterContainerProxy) {
100 args.kafkaPort = port
khenaidooabad44c2018-08-03 16:58:35 -0400101 }
102}
103
khenaidoo43c82122018-11-22 18:38:28 -0500104func DefaultTopic(topic *Topic) InterContainerProxyOption {
105 return func(args *InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -0400106 args.DefaultTopic = topic
107 }
108}
109
khenaidoo79232702018-12-04 11:00:41 -0500110func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
111 return func(args *InterContainerProxy) {
112 args.deviceDiscoveryTopic = topic
113 }
114}
115
khenaidoo43c82122018-11-22 18:38:28 -0500116func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
117 return func(args *InterContainerProxy) {
118 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400119 }
120}
121
khenaidoo43c82122018-11-22 18:38:28 -0500122func MsgClient(client Client) InterContainerProxyOption {
123 return func(args *InterContainerProxy) {
124 args.kafkaClient = client
125 }
126}
127
128func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
129 proxy := &InterContainerProxy{
130 kafkaHost: DefaultKafkaHost,
131 kafkaPort: DefaultKafkaPort,
khenaidooabad44c2018-08-03 16:58:35 -0400132 }
133
134 for _, option := range opts {
135 option(proxy)
136 }
137
138 // Create the locks for all the maps
khenaidoo43c82122018-11-22 18:38:28 -0500139 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400140 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500141 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400142
143 return proxy, nil
144}
145
khenaidoo43c82122018-11-22 18:38:28 -0500146func (kp *InterContainerProxy) Start() error {
khenaidooabad44c2018-08-03 16:58:35 -0400147 log.Info("Starting-Proxy")
148
khenaidoo43c82122018-11-22 18:38:28 -0500149 // Kafka MsgClient should already have been created. If not, output fatal error
150 if kp.kafkaClient == nil {
151 log.Fatal("kafka-client-not-set")
152 }
153
khenaidooabad44c2018-08-03 16:58:35 -0400154 // Create the Done channel
155 kp.doneCh = make(chan int, 1)
156
khenaidoo43c82122018-11-22 18:38:28 -0500157 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500158 if err := kp.kafkaClient.Start(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500159 log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400160 return err
161 }
162
khenaidoo43c82122018-11-22 18:38:28 -0500163 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500164 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500165 //
khenaidooabad44c2018-08-03 16:58:35 -0400166 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500167 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
168
169 // Create the topic to request channel map
170 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400171
172 return nil
173}
174
khenaidoo43c82122018-11-22 18:38:28 -0500175func (kp *InterContainerProxy) Stop() {
176 log.Info("stopping-intercontainer-proxy")
177 kp.doneCh <- 1
178 // TODO : Perform cleanup
khenaidooca301322019-01-09 23:06:32 -0500179 kp.kafkaClient.Stop()
khenaidoo43c82122018-11-22 18:38:28 -0500180 //kp.deleteAllTopicRequestHandlerChannelMap()
181 //kp.deleteAllTopicResponseChannelMap()
182 //kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400183}
184
khenaidoo79232702018-12-04 11:00:41 -0500185// DeviceDiscovered publish the discovered device onto the kafka messaging bus
khenaidoo19374072018-12-11 11:05:15 -0500186func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
khenaidoo79232702018-12-04 11:00:41 -0500187 log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
188 // Simple validation
189 if deviceId == "" || deviceType == "" {
190 log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
191 return errors.New("invalid-parameters")
192 }
193 // Create the device discovery message
194 header := &ic.Header{
195 Id: uuid.New().String(),
196 Type: ic.MessageType_DEVICE_DISCOVERED,
197 FromTopic: kp.DefaultTopic.Name,
198 ToTopic: kp.deviceDiscoveryTopic.Name,
199 Timestamp: time.Now().UnixNano(),
200 }
201 body := &ic.DeviceDiscovered{
202 Id: deviceId,
203 DeviceType: deviceType,
204 ParentId: parentId,
khenaidood2b6df92018-12-13 16:37:20 -0500205 Publisher: publisher,
khenaidoo79232702018-12-04 11:00:41 -0500206 }
207
208 var marshalledData *any.Any
209 var err error
210 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
211 log.Errorw("cannot-marshal-request", log.Fields{"error": err})
212 return err
213 }
214 msg := &ic.InterContainerMessage{
215 Header: header,
216 Body: marshalledData,
217 }
218
219 // Send the message
220 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
221 log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
222 return err
223 }
224 return nil
225}
226
khenaidoo43c82122018-11-22 18:38:28 -0500227// InvokeRPC is used to send a request to a given topic
228func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
229 waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
230
231 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
232 // typically the device ID.
233 responseTopic := replyToTopic
234 if responseTopic == nil {
235 responseTopic = kp.DefaultTopic
236 }
237
khenaidooabad44c2018-08-03 16:58:35 -0400238 // Encode the request
khenaidoo43c82122018-11-22 18:38:28 -0500239 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400240 if err != nil {
241 log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
242 return false, nil
243 }
244
245 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500246 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400247 if waitForResponse {
248 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500249 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
250 log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400251 }
252 }
253
khenaidoo43c82122018-11-22 18:38:28 -0500254 // Send request - if the topic is formatted with a device Id then we will send the request using a
255 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
256 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
257 key := GetDeviceIdFromTopic(*toTopic)
258 log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
khenaidoo6d055132019-02-12 16:51:19 -0500259 kp.kafkaClient.Send(protoRequest, toTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400260
261 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400262 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400263 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400264 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400265 if ctx == nil {
266 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400267 } else {
268 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400269 }
khenaidoob9203542018-09-17 22:56:37 -0400270 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400271
272 // Wait for response as well as timeout or cancellation
273 // Remove the subscription for a response on return
274 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
275 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500276 case msg, ok := <-ch:
277 if !ok {
278 log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
279 protoError := &ic.Error{Reason: "channel-closed"}
280 var marshalledArg *any.Any
281 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
282 return false, nil // Should never happen
283 }
284 return false, marshalledArg
285 }
khenaidoo43c82122018-11-22 18:38:28 -0500286 log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500287 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400288 var err error
289 if responseBody, err = decodeResponse(msg); err != nil {
290 log.Errorw("decode-response-error", log.Fields{"error": err})
291 }
292 return responseBody.Success, responseBody.Result
293 case <-ctx.Done():
294 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
295 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500296 protoError := &ic.Error{Reason: ctx.Err().Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400297 var marshalledArg *any.Any
298 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
299 return false, nil // Should never happen
300 }
301 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400302 case <-childCtx.Done():
303 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
304 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500305 protoError := &ic.Error{Reason: childCtx.Err().Error()}
khenaidoob9203542018-09-17 22:56:37 -0400306 var marshalledArg *any.Any
307 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
308 return false, nil // Should never happen
309 }
310 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400311 case <-kp.doneCh:
khenaidoo43c82122018-11-22 18:38:28 -0500312 log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400313 return true, nil
314 }
315 }
316 return true, nil
317}
318
khenaidoo43c82122018-11-22 18:38:28 -0500319// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400320// when a message is received on a given topic
khenaidoo43c82122018-11-22 18:38:28 -0500321func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400322
323 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500324 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400325 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500326 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500327 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400328 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
329 }
khenaidoo43c82122018-11-22 18:38:28 -0500330
331 kp.defaultRequestHandlerInterface = handler
332 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400333 // Launch a go routine to receive and process kafka messages
khenaidoo43c82122018-11-22 18:38:28 -0500334 go kp.waitForRequest(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400335
336 return nil
337}
338
khenaidoo43c82122018-11-22 18:38:28 -0500339// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
340// when a message is received on a given topic. So far there is only 1 target registered per microservice
khenaidoo731697e2019-01-29 16:03:29 -0500341func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500342 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500343 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500344 var err error
khenaidoo731697e2019-01-29 16:03:29 -0500345 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key:Offset, Value:initialOffset}); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500346 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500347 return err
khenaidoo43c82122018-11-22 18:38:28 -0500348 }
349 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
350
351 // Launch a go routine to receive and process kafka messages
352 go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
353
khenaidooabad44c2018-08-03 16:58:35 -0400354 return nil
355}
356
khenaidoo43c82122018-11-22 18:38:28 -0500357func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
358 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
359}
360
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500361// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
khenaidoo43c82122018-11-22 18:38:28 -0500362// responses from that topic.
khenaidoo79232702018-12-04 11:00:41 -0500363func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500364 kp.lockTopicResponseChannelMap.Lock()
365 defer kp.lockTopicResponseChannelMap.Unlock()
366 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
367 kp.topicToResponseChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400368 }
369}
370
khenaidoo43c82122018-11-22 18:38:28 -0500371func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
372 kp.lockTopicResponseChannelMap.Lock()
373 defer kp.lockTopicResponseChannelMap.Unlock()
374 _, exist := kp.topicToResponseChannelMap[topic]
375 return exist
376}
377
378func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
379 kp.lockTopicResponseChannelMap.Lock()
380 defer kp.lockTopicResponseChannelMap.Unlock()
381 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
382 // Unsubscribe to this topic first - this will close the subscribed channel
383 var err error
384 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
385 log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
386 }
387 delete(kp.topicToResponseChannelMap, topic)
388 return err
389 } else {
390 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400391 }
392}
393
khenaidoo43c82122018-11-22 18:38:28 -0500394func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
395 kp.lockTopicResponseChannelMap.Lock()
396 defer kp.lockTopicResponseChannelMap.Unlock()
397 var err error
398 for topic, _ := range kp.topicToResponseChannelMap {
399 // Unsubscribe to this topic first - this will close the subscribed channel
400 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
401 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
402 }
403 delete(kp.topicToResponseChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400404 }
khenaidoo43c82122018-11-22 18:38:28 -0500405 return err
khenaidooabad44c2018-08-03 16:58:35 -0400406}
407
khenaidoo43c82122018-11-22 18:38:28 -0500408func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
409 kp.lockTopicRequestHandlerChannelMap.Lock()
410 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
411 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
412 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400413 }
khenaidooabad44c2018-08-03 16:58:35 -0400414}
415
khenaidoo43c82122018-11-22 18:38:28 -0500416func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
417 kp.lockTopicRequestHandlerChannelMap.Lock()
418 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
419 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
420 // Close the kafka client client first by unsubscribing to this topic
421 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
422 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400423 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500424 } else {
425 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400426 }
khenaidooabad44c2018-08-03 16:58:35 -0400427}
428
khenaidoo43c82122018-11-22 18:38:28 -0500429func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
430 kp.lockTopicRequestHandlerChannelMap.Lock()
431 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
432 var err error
433 for topic, _ := range kp.topicToRequestHandlerChannelMap {
434 // Close the kafka client client first by unsubscribing to this topic
435 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
436 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
437 }
438 delete(kp.topicToRequestHandlerChannelMap, topic)
439 }
440 return err
441}
442
khenaidoo79232702018-12-04 11:00:41 -0500443func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400444 kp.lockTransactionIdToChannelMap.Lock()
445 defer kp.lockTransactionIdToChannelMap.Unlock()
446 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500447 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400448 }
449}
450
khenaidoo43c82122018-11-22 18:38:28 -0500451func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400452 kp.lockTransactionIdToChannelMap.Lock()
453 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500454 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
455 // Close the channel first
456 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400457 delete(kp.transactionIdToChannelMap, id)
458 }
459}
460
khenaidoo43c82122018-11-22 18:38:28 -0500461func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
462 kp.lockTransactionIdToChannelMap.Lock()
463 defer kp.lockTransactionIdToChannelMap.Unlock()
464 for key, value := range kp.transactionIdToChannelMap {
465 if value.topic.Name == id {
466 close(value.ch)
467 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400468 }
469 }
khenaidooabad44c2018-08-03 16:58:35 -0400470}
471
khenaidoo43c82122018-11-22 18:38:28 -0500472func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
473 kp.lockTransactionIdToChannelMap.Lock()
474 defer kp.lockTransactionIdToChannelMap.Unlock()
475 for key, value := range kp.transactionIdToChannelMap {
476 close(value.ch)
477 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400478 }
khenaidooabad44c2018-08-03 16:58:35 -0400479}
480
khenaidoo43c82122018-11-22 18:38:28 -0500481func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
482 // If we have any consumers on that topic we need to close them
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500483 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
484 log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
485 }
486 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
487 log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
488 }
khenaidoo43c82122018-11-22 18:38:28 -0500489 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500490
khenaidoo43c82122018-11-22 18:38:28 -0500491 return kp.kafkaClient.DeleteTopic(&topic)
492}
493
494func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400495 // Encode the response argument - needs to be a proto message
496 if returnedVal == nil {
497 return nil, nil
498 }
499 protoValue, ok := returnedVal.(proto.Message)
500 if !ok {
501 log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
502 err := errors.New("response-value-not-proto-message")
503 return nil, err
504 }
505
506 // Marshal the returned value, if any
507 var marshalledReturnedVal *any.Any
508 var err error
509 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
510 log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
511 return nil, err
512 }
513 return marshalledReturnedVal, nil
514}
515
khenaidoo79232702018-12-04 11:00:41 -0500516func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
517 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400518 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500519 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400520 FromTopic: request.Header.ToTopic,
521 ToTopic: request.Header.FromTopic,
522 Timestamp: time.Now().Unix(),
523 }
khenaidoo79232702018-12-04 11:00:41 -0500524 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400525 Success: false,
526 Result: nil,
527 }
528 var marshalledResponseBody *any.Any
529 var err error
530 // Error should never happen here
531 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
532 log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
533 }
534
khenaidoo79232702018-12-04 11:00:41 -0500535 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400536 Header: responseHeader,
537 Body: marshalledResponseBody,
538 }
539
540}
541
542//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
543//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500544func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500545 //log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500546 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400547 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500548 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400549 FromTopic: request.Header.ToTopic,
550 ToTopic: request.Header.FromTopic,
551 Timestamp: time.Now().Unix(),
552 }
553
554 // Go over all returned values
555 var marshalledReturnedVal *any.Any
556 var err error
557 for _, returnVal := range returnedValues {
khenaidoo43c82122018-11-22 18:38:28 -0500558 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400559 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
560 }
561 break // for now we support only 1 returned value - (excluding the error)
562 }
563
khenaidoo79232702018-12-04 11:00:41 -0500564 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400565 Success: success,
566 Result: marshalledReturnedVal,
567 }
568
569 // Marshal the response body
570 var marshalledResponseBody *any.Any
571 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
572 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
573 return nil, err
574 }
575
khenaidoo79232702018-12-04 11:00:41 -0500576 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400577 Header: responseHeader,
578 Body: marshalledResponseBody,
579 }, nil
580}
581
582func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
583 myClassValue := reflect.ValueOf(myClass)
khenaidoo19374072018-12-11 11:05:15 -0500584 // Capitalize the first letter in the funcName to workaround the first capital letters required to
585 // invoke a function from a different package
586 funcName = strings.Title(funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400587 m := myClassValue.MethodByName(funcName)
588 if !m.IsValid() {
khenaidoo43c82122018-11-22 18:38:28 -0500589 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400590 }
591 in := make([]reflect.Value, len(params))
592 for i, param := range params {
593 in[i] = reflect.ValueOf(param)
594 }
595 out = m.Call(in)
596 return
597}
598
khenaidoo297cd252019-02-07 22:10:23 -0500599func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
600 arg := &KVArg{
601 Key: TransactionKey,
602 Value: &ic.StrType{Val: transactionId},
603 }
604
605 var marshalledArg *any.Any
606 var err error
607 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
608 log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
609 return currentArgs
610 }
611 protoArg := &ic.Argument{
612 Key: arg.Key,
613 Value: marshalledArg,
614 }
615 return append(currentArgs, protoArg)
616}
617
khenaidoo79232702018-12-04 11:00:41 -0500618func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400619
khenaidoo43c82122018-11-22 18:38:28 -0500620 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500621 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400622
623 var out []reflect.Value
624 var err error
625
626 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500627 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400628 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
629 log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
630 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500631 log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400632 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500633 // Augment the requestBody with the message Id as it will be used in scenarios where cores
634 // are set in pairs and competing
635 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
khenaidooabad44c2018-08-03 16:58:35 -0400636 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
637 if err != nil {
638 log.Warn(err)
639 }
640 }
641 // Response required?
642 if requestBody.ResponseRequired {
643 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500644 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400645 var returnedValues []interface{}
646 var success bool
647 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500648 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400649 returnedValues = make([]interface{}, 1)
650 returnedValues[0] = returnError
651 } else {
khenaidoob9203542018-09-17 22:56:37 -0400652 returnedValues = make([]interface{}, 0)
653 // Check for errors first
654 lastIndex := len(out) - 1
655 if out[lastIndex].Interface() != nil { // Error
656 if goError, ok := out[lastIndex].Interface().(error); ok {
khenaidoo79232702018-12-04 11:00:41 -0500657 returnError = &ic.Error{Reason: goError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400658 returnedValues = append(returnedValues, returnError)
659 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500660 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400661 returnedValues = append(returnedValues, returnError)
662 }
khenaidoo297cd252019-02-07 22:10:23 -0500663 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
664 return // Ignore case - when core is in competing mode
khenaidoob9203542018-09-17 22:56:37 -0400665 } else { // Non-error case
666 success = true
667 for idx, val := range out {
khenaidoo43c82122018-11-22 18:38:28 -0500668 //log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400669 if idx != lastIndex {
670 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400671 }
khenaidooabad44c2018-08-03 16:58:35 -0400672 }
673 }
674 }
675
khenaidoo79232702018-12-04 11:00:41 -0500676 var icm *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400677 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400678 log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400679 icm = encodeDefaultFailedResponse(msg)
680 }
khenaidoo43c82122018-11-22 18:38:28 -0500681 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
682 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
683 // present then the key will be empty, hence all messages for a given topic will be sent to all
684 // partitions.
685 replyTopic := &Topic{Name: msg.Header.FromTopic}
686 key := GetDeviceIdFromTopic(*replyTopic)
khenaidoo90847922018-12-03 14:47:51 -0500687 log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500688 // TODO: handle error response.
689 kp.kafkaClient.Send(icm, replyTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400690 }
691
khenaidooabad44c2018-08-03 16:58:35 -0400692 }
693}
694
khenaidoo79232702018-12-04 11:00:41 -0500695func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400696 // Wait for messages
697 for msg := range ch {
khenaidoo43c82122018-11-22 18:38:28 -0500698 //log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidooabad44c2018-08-03 16:58:35 -0400699 go kp.handleRequest(msg, targetInterface)
700 }
701}
702
khenaidoo79232702018-12-04 11:00:41 -0500703func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400704 kp.lockTransactionIdToChannelMap.Lock()
705 defer kp.lockTransactionIdToChannelMap.Unlock()
706 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
707 log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
708 return
709 }
khenaidoo43c82122018-11-22 18:38:28 -0500710 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400711}
712
khenaidoo43c82122018-11-22 18:38:28 -0500713// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500714// and then dispatches to the consumers
khenaidoo79232702018-12-04 11:00:41 -0500715func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
khenaidoo43c82122018-11-22 18:38:28 -0500716 log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400717startloop:
718 for {
719 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500720 case msg, ok := <-subscribedCh:
721 if !ok {
722 log.Debugw("channel-closed", log.Fields{"topic": topic.Name})
723 break startloop
724 }
725 log.Debugw("message-received", log.Fields{"msg": msg})
726 //log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
khenaidoo79232702018-12-04 11:00:41 -0500727 if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoo43c82122018-11-22 18:38:28 -0500728 go kp.dispatchResponse(msg)
729 }
khenaidooabad44c2018-08-03 16:58:35 -0400730 case <-kp.doneCh:
731 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
732 break startloop
733 }
734 }
khenaidoo43c82122018-11-22 18:38:28 -0500735 //log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
736 // We got an exit signal. Unsubscribe to the channel
737 //kp.kafkaClient.UnSubscribe(topic, subscribedCh)
khenaidooabad44c2018-08-03 16:58:35 -0400738}
739
740// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
741// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
742// API. There is one response channel waiting for kafka messages before dispatching the message to the
743// corresponding waiting channel
khenaidoo79232702018-12-04 11:00:41 -0500744func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob9203542018-09-17 22:56:37 -0400745 log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400746
khenaidoo43c82122018-11-22 18:38:28 -0500747 // First check whether we already have a channel listening for response on that topic. If there is
748 // already one then it will be reused. If not, it will be created.
749 if !kp.isTopicSubscribedForResponse(topic.Name) {
khenaidooca301322019-01-09 23:06:32 -0500750 log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidoo79232702018-12-04 11:00:41 -0500751 var subscribedCh <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400752 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500753 if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500754 log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400755 return nil, err
756 }
khenaidoo43c82122018-11-22 18:38:28 -0500757 kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
758 go kp.waitForResponseLoop(subscribedCh, &topic)
khenaidoo6d055132019-02-12 16:51:19 -0500759
760 // Wait until topic is ready - it takes on average 300 ms for a topic to be created. This is a one time
761 // delay everything a device is created.
762 // TODO: Implement a mechanism to determine when a topic is ready instead of relying on a timeout
763 //kp.kafkaClient.WaitForTopicToBeReady
764 time.Sleep(400 * time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400765 }
766
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500767 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500768 // broadcast any message for this topic to all channels waiting on it.
khenaidoo79232702018-12-04 11:00:41 -0500769 ch := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500770 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400771
772 return ch, nil
773}
774
khenaidoo43c82122018-11-22 18:38:28 -0500775func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidooabad44c2018-08-03 16:58:35 -0400776 log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500777 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -0400778 return nil
779}
780
781//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
782//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500783func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
784 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400785 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500786 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400787 FromTopic: replyTopic.Name,
788 ToTopic: toTopic.Name,
789 Timestamp: time.Now().Unix(),
790 }
khenaidoo79232702018-12-04 11:00:41 -0500791 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400792 Rpc: rpc,
793 ResponseRequired: true,
794 ReplyToTopic: replyTopic.Name,
795 }
796
797 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400798 if arg == nil {
799 // In case the caller sends an array with empty args
800 continue
801 }
khenaidooabad44c2018-08-03 16:58:35 -0400802 var marshalledArg *any.Any
803 var err error
804 // ascertain the value interface type is a proto.Message
805 protoValue, ok := arg.Value.(proto.Message)
806 if !ok {
807 log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
808 err := errors.New("argument-value-not-proto-message")
809 return nil, err
810 }
811 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
812 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
813 return nil, err
814 }
khenaidoo79232702018-12-04 11:00:41 -0500815 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400816 Key: arg.Key,
817 Value: marshalledArg,
818 }
819 requestBody.Args = append(requestBody.Args, protoArg)
820 }
821
822 var marshalledData *any.Any
823 var err error
824 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
825 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
826 return nil, err
827 }
khenaidoo79232702018-12-04 11:00:41 -0500828 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400829 Header: requestHeader,
830 Body: marshalledData,
831 }
832 return request, nil
833}
834
khenaidoo79232702018-12-04 11:00:41 -0500835func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400836 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500837 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400838 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
839 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
840 return nil, err
841 }
khenaidoo43c82122018-11-22 18:38:28 -0500842 //log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400843
844 return &responseBody, nil
845
846}