blob: 56b5fa102605d696cbe36e44ad4f4a471fc8d434 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-go/common/log"
William Kurkiandaa6bb22019-03-07 12:26:28 -050027 ic "github.com/opencord/voltha-protos/go/inter_container"
khenaidooabad44c2018-08-03 16:58:35 -040028 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050029 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040030 "sync"
31 "time"
32)
33
34// Initialize the logger - gets the default until the main function setup the logger
35func init() {
khenaidooca301322019-01-09 23:06:32 -050036 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidooabad44c2018-08-03 16:58:35 -040037}
38
// Default settings for inter-container requests.
const (
	// DefaultMaxRetries is presumably the default retry limit; it is not
	// referenced in the visible part of this file.
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is expressed in milliseconds (10000 ms) - chosen
	// to handle a wider latency range.
	DefaultRequestTimeout = 10000
)
43
// Keys of the arguments that are appended to every incoming request before it
// is dispatched to its handler.
const (
	// TransactionKey is the key of the argument carrying the transaction ID.
	TransactionKey = "transactionID"

	// FromTopic is the key of the argument carrying the sender's topic name.
	FromTopic = "fromTopic"
)
48
khenaidoo43c82122018-11-22 18:38:28 -050049// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
50// obtained from that channel, this interface is invoked. This is used to handle
51// async requests into the Core via the kafka messaging bus
52type requestHandlerChannel struct {
53 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050054 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040055}
56
khenaidoo43c82122018-11-22 18:38:28 -050057// transactionChannel represents a combination of a topic and a channel onto which a response received
58// on the kafka bus will be sent to
59type transactionChannel struct {
60 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050061 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050062}
63
64// InterContainerProxy represents the messaging proxy
65type InterContainerProxy struct {
66 kafkaHost string
67 kafkaPort int
68 DefaultTopic *Topic
69 defaultRequestHandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050070 deviceDiscoveryTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050071 kafkaClient Client
72 doneCh chan int
73
74 // This map is used to map a topic to an interface and channel. When a request is received
75 // on that channel (registered to the topic) then that interface is invoked.
76 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
77 lockTopicRequestHandlerChannelMap sync.RWMutex
78
79 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050080 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050081 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050082 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050083 lockTopicResponseChannelMap sync.RWMutex
84
khenaidoo4c1a5bf2018-11-29 15:53:42 -050085 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -050086 // sent out and we are waiting for a response.
87 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -040088 lockTransactionIdToChannelMap sync.RWMutex
89}
90
khenaidoo43c82122018-11-22 18:38:28 -050091type InterContainerProxyOption func(*InterContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -040092
khenaidoo43c82122018-11-22 18:38:28 -050093func InterContainerHost(host string) InterContainerProxyOption {
94 return func(args *InterContainerProxy) {
95 args.kafkaHost = host
khenaidooabad44c2018-08-03 16:58:35 -040096 }
97}
98
khenaidoo43c82122018-11-22 18:38:28 -050099func InterContainerPort(port int) InterContainerProxyOption {
100 return func(args *InterContainerProxy) {
101 args.kafkaPort = port
khenaidooabad44c2018-08-03 16:58:35 -0400102 }
103}
104
khenaidoo43c82122018-11-22 18:38:28 -0500105func DefaultTopic(topic *Topic) InterContainerProxyOption {
106 return func(args *InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -0400107 args.DefaultTopic = topic
108 }
109}
110
khenaidoo79232702018-12-04 11:00:41 -0500111func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
112 return func(args *InterContainerProxy) {
113 args.deviceDiscoveryTopic = topic
114 }
115}
116
khenaidoo43c82122018-11-22 18:38:28 -0500117func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
118 return func(args *InterContainerProxy) {
119 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400120 }
121}
122
khenaidoo43c82122018-11-22 18:38:28 -0500123func MsgClient(client Client) InterContainerProxyOption {
124 return func(args *InterContainerProxy) {
125 args.kafkaClient = client
126 }
127}
128
129func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
130 proxy := &InterContainerProxy{
131 kafkaHost: DefaultKafkaHost,
132 kafkaPort: DefaultKafkaPort,
khenaidooabad44c2018-08-03 16:58:35 -0400133 }
134
135 for _, option := range opts {
136 option(proxy)
137 }
138
139 // Create the locks for all the maps
khenaidoo43c82122018-11-22 18:38:28 -0500140 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400141 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500142 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400143
144 return proxy, nil
145}
146
khenaidoo43c82122018-11-22 18:38:28 -0500147func (kp *InterContainerProxy) Start() error {
khenaidooabad44c2018-08-03 16:58:35 -0400148 log.Info("Starting-Proxy")
149
khenaidoo43c82122018-11-22 18:38:28 -0500150 // Kafka MsgClient should already have been created. If not, output fatal error
151 if kp.kafkaClient == nil {
152 log.Fatal("kafka-client-not-set")
153 }
154
khenaidooabad44c2018-08-03 16:58:35 -0400155 // Create the Done channel
156 kp.doneCh = make(chan int, 1)
157
khenaidoo43c82122018-11-22 18:38:28 -0500158 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500159 if err := kp.kafkaClient.Start(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500160 log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400161 return err
162 }
163
khenaidoo43c82122018-11-22 18:38:28 -0500164 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500165 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500166 //
khenaidooabad44c2018-08-03 16:58:35 -0400167 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500168 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
169
170 // Create the topic to request channel map
171 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400172
173 return nil
174}
175
khenaidoo43c82122018-11-22 18:38:28 -0500176func (kp *InterContainerProxy) Stop() {
177 log.Info("stopping-intercontainer-proxy")
178 kp.doneCh <- 1
179 // TODO : Perform cleanup
khenaidooca301322019-01-09 23:06:32 -0500180 kp.kafkaClient.Stop()
khenaidoo43c82122018-11-22 18:38:28 -0500181 //kp.deleteAllTopicRequestHandlerChannelMap()
182 //kp.deleteAllTopicResponseChannelMap()
183 //kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400184}
185
khenaidoo79232702018-12-04 11:00:41 -0500186// DeviceDiscovered publish the discovered device onto the kafka messaging bus
khenaidoo19374072018-12-11 11:05:15 -0500187func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
khenaidoo79232702018-12-04 11:00:41 -0500188 log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
189 // Simple validation
190 if deviceId == "" || deviceType == "" {
191 log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
192 return errors.New("invalid-parameters")
193 }
194 // Create the device discovery message
195 header := &ic.Header{
196 Id: uuid.New().String(),
197 Type: ic.MessageType_DEVICE_DISCOVERED,
198 FromTopic: kp.DefaultTopic.Name,
199 ToTopic: kp.deviceDiscoveryTopic.Name,
200 Timestamp: time.Now().UnixNano(),
201 }
202 body := &ic.DeviceDiscovered{
203 Id: deviceId,
204 DeviceType: deviceType,
205 ParentId: parentId,
khenaidood2b6df92018-12-13 16:37:20 -0500206 Publisher: publisher,
khenaidoo79232702018-12-04 11:00:41 -0500207 }
208
209 var marshalledData *any.Any
210 var err error
211 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
212 log.Errorw("cannot-marshal-request", log.Fields{"error": err})
213 return err
214 }
215 msg := &ic.InterContainerMessage{
216 Header: header,
217 Body: marshalledData,
218 }
219
220 // Send the message
221 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
222 log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
223 return err
224 }
225 return nil
226}
227
khenaidoo43c82122018-11-22 18:38:28 -0500228// InvokeRPC is used to send a request to a given topic
229func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
khenaidoobdcb8e02019-03-06 16:28:56 -0500230 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
khenaidoo43c82122018-11-22 18:38:28 -0500231
232 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
233 // typically the device ID.
234 responseTopic := replyToTopic
235 if responseTopic == nil {
236 responseTopic = kp.DefaultTopic
237 }
238
khenaidooabad44c2018-08-03 16:58:35 -0400239 // Encode the request
khenaidoobdcb8e02019-03-06 16:28:56 -0500240 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400241 if err != nil {
242 log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
243 return false, nil
244 }
245
246 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500247 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400248 if waitForResponse {
249 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500250 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
251 log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400252 }
253 }
254
khenaidoo43c82122018-11-22 18:38:28 -0500255 // Send request - if the topic is formatted with a device Id then we will send the request using a
256 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
257 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
khenaidoobdcb8e02019-03-06 16:28:56 -0500258 //key := GetDeviceIdFromTopic(*toTopic)
khenaidoo1ce37ad2019-03-24 22:07:24 -0400259 log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
khenaidoo8f474192019-04-03 17:20:44 -0400260 go kp.kafkaClient.Send(protoRequest, toTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400261
262 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400263 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400264 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400265 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400266 if ctx == nil {
267 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400268 } else {
269 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400270 }
khenaidoob9203542018-09-17 22:56:37 -0400271 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400272
273 // Wait for response as well as timeout or cancellation
274 // Remove the subscription for a response on return
275 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
276 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500277 case msg, ok := <-ch:
278 if !ok {
279 log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
280 protoError := &ic.Error{Reason: "channel-closed"}
281 var marshalledArg *any.Any
282 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
283 return false, nil // Should never happen
284 }
285 return false, marshalledArg
286 }
khenaidoo43c82122018-11-22 18:38:28 -0500287 log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500288 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400289 var err error
290 if responseBody, err = decodeResponse(msg); err != nil {
291 log.Errorw("decode-response-error", log.Fields{"error": err})
292 }
293 return responseBody.Success, responseBody.Result
294 case <-ctx.Done():
295 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
296 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500297 protoError := &ic.Error{Reason: ctx.Err().Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400298 var marshalledArg *any.Any
299 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
300 return false, nil // Should never happen
301 }
302 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400303 case <-childCtx.Done():
304 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
305 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500306 protoError := &ic.Error{Reason: childCtx.Err().Error()}
khenaidoob9203542018-09-17 22:56:37 -0400307 var marshalledArg *any.Any
308 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
309 return false, nil // Should never happen
310 }
311 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400312 case <-kp.doneCh:
khenaidoo43c82122018-11-22 18:38:28 -0500313 log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400314 return true, nil
315 }
316 }
317 return true, nil
318}
319
khenaidoo43c82122018-11-22 18:38:28 -0500320// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400321// when a message is received on a given topic
khenaidoo43c82122018-11-22 18:38:28 -0500322func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400323
324 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500325 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400326 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500327 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500328 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400329 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Abhilash S.L90cd9552019-07-18 17:30:29 +0530330 return err
khenaidooabad44c2018-08-03 16:58:35 -0400331 }
khenaidoo43c82122018-11-22 18:38:28 -0500332
333 kp.defaultRequestHandlerInterface = handler
334 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400335 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500336 go kp.waitForMessages(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400337
338 return nil
339}
340
khenaidoo43c82122018-11-22 18:38:28 -0500341// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
342// when a message is received on a given topic. So far there is only 1 target registered per microservice
khenaidoo731697e2019-01-29 16:03:29 -0500343func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500344 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500345 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500346 var err error
khenaidoo54e0ddf2019-02-27 16:21:33 -0500347 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500348 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500349 return err
khenaidoo43c82122018-11-22 18:38:28 -0500350 }
351 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
352
353 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500354 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
khenaidoo43c82122018-11-22 18:38:28 -0500355
khenaidooabad44c2018-08-03 16:58:35 -0400356 return nil
357}
358
khenaidoo43c82122018-11-22 18:38:28 -0500359func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
360 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
361}
362
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500363// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
khenaidoo43c82122018-11-22 18:38:28 -0500364// responses from that topic.
khenaidoo79232702018-12-04 11:00:41 -0500365func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500366 kp.lockTopicResponseChannelMap.Lock()
367 defer kp.lockTopicResponseChannelMap.Unlock()
368 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
369 kp.topicToResponseChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400370 }
371}
372
khenaidoo43c82122018-11-22 18:38:28 -0500373func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400374 kp.lockTopicResponseChannelMap.RLock()
375 defer kp.lockTopicResponseChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500376 _, exist := kp.topicToResponseChannelMap[topic]
377 return exist
378}
379
380func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
381 kp.lockTopicResponseChannelMap.Lock()
382 defer kp.lockTopicResponseChannelMap.Unlock()
383 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
384 // Unsubscribe to this topic first - this will close the subscribed channel
385 var err error
386 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
387 log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
388 }
389 delete(kp.topicToResponseChannelMap, topic)
390 return err
391 } else {
392 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400393 }
394}
395
khenaidoo43c82122018-11-22 18:38:28 -0500396func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
397 kp.lockTopicResponseChannelMap.Lock()
398 defer kp.lockTopicResponseChannelMap.Unlock()
399 var err error
400 for topic, _ := range kp.topicToResponseChannelMap {
401 // Unsubscribe to this topic first - this will close the subscribed channel
402 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
403 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
404 }
405 delete(kp.topicToResponseChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400406 }
khenaidoo43c82122018-11-22 18:38:28 -0500407 return err
khenaidooabad44c2018-08-03 16:58:35 -0400408}
409
khenaidoo43c82122018-11-22 18:38:28 -0500410func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
411 kp.lockTopicRequestHandlerChannelMap.Lock()
412 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
413 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
414 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400415 }
khenaidooabad44c2018-08-03 16:58:35 -0400416}
417
khenaidoo43c82122018-11-22 18:38:28 -0500418func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
419 kp.lockTopicRequestHandlerChannelMap.Lock()
420 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
421 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
422 // Close the kafka client client first by unsubscribing to this topic
423 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
424 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400425 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500426 } else {
427 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400428 }
khenaidooabad44c2018-08-03 16:58:35 -0400429}
430
khenaidoo43c82122018-11-22 18:38:28 -0500431func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
432 kp.lockTopicRequestHandlerChannelMap.Lock()
433 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
434 var err error
435 for topic, _ := range kp.topicToRequestHandlerChannelMap {
436 // Close the kafka client client first by unsubscribing to this topic
437 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
438 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
439 }
440 delete(kp.topicToRequestHandlerChannelMap, topic)
441 }
442 return err
443}
444
khenaidoo79232702018-12-04 11:00:41 -0500445func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400446 kp.lockTransactionIdToChannelMap.Lock()
447 defer kp.lockTransactionIdToChannelMap.Unlock()
448 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500449 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400450 }
451}
452
khenaidoo43c82122018-11-22 18:38:28 -0500453func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400454 kp.lockTransactionIdToChannelMap.Lock()
455 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500456 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
457 // Close the channel first
458 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400459 delete(kp.transactionIdToChannelMap, id)
460 }
461}
462
khenaidoo43c82122018-11-22 18:38:28 -0500463func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
464 kp.lockTransactionIdToChannelMap.Lock()
465 defer kp.lockTransactionIdToChannelMap.Unlock()
466 for key, value := range kp.transactionIdToChannelMap {
467 if value.topic.Name == id {
468 close(value.ch)
469 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400470 }
471 }
khenaidooabad44c2018-08-03 16:58:35 -0400472}
473
khenaidoo43c82122018-11-22 18:38:28 -0500474func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
475 kp.lockTransactionIdToChannelMap.Lock()
476 defer kp.lockTransactionIdToChannelMap.Unlock()
477 for key, value := range kp.transactionIdToChannelMap {
478 close(value.ch)
479 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400480 }
khenaidooabad44c2018-08-03 16:58:35 -0400481}
482
khenaidoo43c82122018-11-22 18:38:28 -0500483func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
484 // If we have any consumers on that topic we need to close them
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500485 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
486 log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
487 }
488 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
489 log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
490 }
khenaidoo43c82122018-11-22 18:38:28 -0500491 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500492
khenaidoo43c82122018-11-22 18:38:28 -0500493 return kp.kafkaClient.DeleteTopic(&topic)
494}
495
496func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400497 // Encode the response argument - needs to be a proto message
498 if returnedVal == nil {
499 return nil, nil
500 }
501 protoValue, ok := returnedVal.(proto.Message)
502 if !ok {
503 log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
504 err := errors.New("response-value-not-proto-message")
505 return nil, err
506 }
507
508 // Marshal the returned value, if any
509 var marshalledReturnedVal *any.Any
510 var err error
511 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
512 log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
513 return nil, err
514 }
515 return marshalledReturnedVal, nil
516}
517
khenaidoo79232702018-12-04 11:00:41 -0500518func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
519 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400520 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500521 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400522 FromTopic: request.Header.ToTopic,
523 ToTopic: request.Header.FromTopic,
524 Timestamp: time.Now().Unix(),
525 }
khenaidoo79232702018-12-04 11:00:41 -0500526 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400527 Success: false,
528 Result: nil,
529 }
530 var marshalledResponseBody *any.Any
531 var err error
532 // Error should never happen here
533 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
534 log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
535 }
536
khenaidoo79232702018-12-04 11:00:41 -0500537 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400538 Header: responseHeader,
539 Body: marshalledResponseBody,
540 }
541
542}
543
544//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
545//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500546func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500547 //log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500548 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400549 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500550 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400551 FromTopic: request.Header.ToTopic,
552 ToTopic: request.Header.FromTopic,
khenaidoo2c6a0992019-04-29 13:46:56 -0400553 KeyTopic: request.Header.KeyTopic,
khenaidoo8f474192019-04-03 17:20:44 -0400554 Timestamp: time.Now().UnixNano(),
khenaidooabad44c2018-08-03 16:58:35 -0400555 }
556
557 // Go over all returned values
558 var marshalledReturnedVal *any.Any
559 var err error
560 for _, returnVal := range returnedValues {
khenaidoo43c82122018-11-22 18:38:28 -0500561 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400562 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
563 }
564 break // for now we support only 1 returned value - (excluding the error)
565 }
566
khenaidoo79232702018-12-04 11:00:41 -0500567 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400568 Success: success,
569 Result: marshalledReturnedVal,
570 }
571
572 // Marshal the response body
573 var marshalledResponseBody *any.Any
574 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
575 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
576 return nil, err
577 }
578
khenaidoo79232702018-12-04 11:00:41 -0500579 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400580 Header: responseHeader,
581 Body: marshalledResponseBody,
582 }, nil
583}
584
// CallFuncByName invokes, through reflection, the method called funcName on
// myClass with the given params and returns the values the method returned.
//
// The first letter of funcName is capitalized before the lookup to work around
// the leading capital letter required to invoke a method from a different
// package.
//
// Instead of panicking inside the reflect package, an error (with an empty
// result) is returned when:
//   - myClass is nil,
//   - the method does not exist,
//   - the number of params does not fit the method's parameter list,
//   - an untyped nil is passed for a parameter whose type cannot be nil.
//
// An untyped nil passed for a nillable parameter (pointer, interface, map,
// slice, channel or function) is converted to the zero value of that
// parameter's type. Parameters of a mismatching type are not checked here.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// reflect.ValueOf(nil) is the zero Value; MethodByName would panic on it.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\": nil target", funcName)
	}

	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Validate the argument count up front: Call panics on a mismatch.
	mType := m.Type()
	numIn := mType.NumIn()
	variadic := mType.IsVariadic()
	if (!variadic && len(params) != numIn) || (variadic && len(params) < numIn-1) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-count \"%s\": expected %d, got %d", funcName, numIn, len(params))
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		if param != nil {
			in[i] = reflect.ValueOf(param)
			continue
		}
		// An untyped nil has no reflect.Value of its own; derive one from the
		// type of the parameter it is destined for.
		var paramType reflect.Type
		if variadic && i >= numIn-1 {
			paramType = mType.In(numIn - 1).Elem()
		} else {
			paramType = mType.In(i)
		}
		switch paramType.Kind() {
		case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
			in[i] = reflect.Zero(paramType)
		default:
			return make([]reflect.Value, 0), fmt.Errorf("invalid-nil-argument \"%s\": parameter %d is of type %s", funcName, i, paramType)
		}
	}

	out = m.Call(in)
	return out, nil
}
601
khenaidoo297cd252019-02-07 22:10:23 -0500602func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
603 arg := &KVArg{
604 Key: TransactionKey,
605 Value: &ic.StrType{Val: transactionId},
606 }
607
608 var marshalledArg *any.Any
609 var err error
610 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
611 log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
612 return currentArgs
613 }
614 protoArg := &ic.Argument{
615 Key: arg.Key,
616 Value: marshalledArg,
617 }
618 return append(currentArgs, protoArg)
619}
620
khenaidoo54e0ddf2019-02-27 16:21:33 -0500621func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
622 var marshalledArg *any.Any
623 var err error
624 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
625 log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
626 return currentArgs
627 }
628 protoArg := &ic.Argument{
629 Key: FromTopic,
630 Value: marshalledArg,
631 }
632 return append(currentArgs, protoArg)
633}
634
635func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400636
khenaidoo43c82122018-11-22 18:38:28 -0500637 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500638 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400639 var out []reflect.Value
640 var err error
641
642 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500643 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400644 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
645 log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
646 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500647 log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400648 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500649 // Augment the requestBody with the message Id as it will be used in scenarios where cores
650 // are set in pairs and competing
651 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500652
653 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
654 // needs to send an unsollicited message to the currently requested container
655 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
656
khenaidooabad44c2018-08-03 16:58:35 -0400657 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
658 if err != nil {
659 log.Warn(err)
660 }
661 }
662 // Response required?
663 if requestBody.ResponseRequired {
664 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500665 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400666 var returnedValues []interface{}
667 var success bool
668 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500669 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400670 returnedValues = make([]interface{}, 1)
671 returnedValues[0] = returnError
672 } else {
khenaidoob9203542018-09-17 22:56:37 -0400673 returnedValues = make([]interface{}, 0)
674 // Check for errors first
675 lastIndex := len(out) - 1
676 if out[lastIndex].Interface() != nil { // Error
677 if goError, ok := out[lastIndex].Interface().(error); ok {
khenaidoo79232702018-12-04 11:00:41 -0500678 returnError = &ic.Error{Reason: goError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400679 returnedValues = append(returnedValues, returnError)
680 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500681 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400682 returnedValues = append(returnedValues, returnError)
683 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500684 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
khenaidoo297cd252019-02-07 22:10:23 -0500685 return // Ignore case - when core is in competing mode
khenaidoob9203542018-09-17 22:56:37 -0400686 } else { // Non-error case
687 success = true
688 for idx, val := range out {
khenaidoo43c82122018-11-22 18:38:28 -0500689 //log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400690 if idx != lastIndex {
691 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400692 }
khenaidooabad44c2018-08-03 16:58:35 -0400693 }
694 }
695 }
696
khenaidoo79232702018-12-04 11:00:41 -0500697 var icm *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400698 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400699 log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400700 icm = encodeDefaultFailedResponse(msg)
701 }
khenaidoo43c82122018-11-22 18:38:28 -0500702 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
703 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
704 // present then the key will be empty, hence all messages for a given topic will be sent to all
705 // partitions.
706 replyTopic := &Topic{Name: msg.Header.FromTopic}
khenaidoobdcb8e02019-03-06 16:28:56 -0500707 key := msg.Header.KeyTopic
khenaidoo90847922018-12-03 14:47:51 -0500708 log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500709 // TODO: handle error response.
khenaidoo2c6a0992019-04-29 13:46:56 -0400710 go kp.kafkaClient.Send(icm, replyTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400711 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500712 } else if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400713 log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500714 go kp.dispatchResponse(msg)
715 } else {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400716 log.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400717 }
718}
719
khenaidoo54e0ddf2019-02-27 16:21:33 -0500720func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400721 // Wait for messages
722 for msg := range ch {
khenaidoo43c82122018-11-22 18:38:28 -0500723 //log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500724 go kp.handleMessage(msg, targetInterface)
khenaidooabad44c2018-08-03 16:58:35 -0400725 }
726}
727
khenaidoo79232702018-12-04 11:00:41 -0500728func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400729 kp.lockTransactionIdToChannelMap.RLock()
730 defer kp.lockTransactionIdToChannelMap.RUnlock()
khenaidooabad44c2018-08-03 16:58:35 -0400731 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
732 log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
733 return
734 }
khenaidoo43c82122018-11-22 18:38:28 -0500735 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400736}
737
khenaidooabad44c2018-08-03 16:58:35 -0400738// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
739// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
740// API. There is one response channel waiting for kafka messages before dispatching the message to the
741// corresponding waiting channel
khenaidoo79232702018-12-04 11:00:41 -0500742func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob9203542018-09-17 22:56:37 -0400743 log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400744
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500745 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500746 // broadcast any message for this topic to all channels waiting on it.
khenaidoo79232702018-12-04 11:00:41 -0500747 ch := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500748 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400749
750 return ch, nil
751}
752
khenaidoo43c82122018-11-22 18:38:28 -0500753func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidooabad44c2018-08-03 16:58:35 -0400754 log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500755 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -0400756 return nil
757}
758
759//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
760//or an error on failure
khenaidoobdcb8e02019-03-06 16:28:56 -0500761func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
khenaidoo79232702018-12-04 11:00:41 -0500762 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400763 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500764 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400765 FromTopic: replyTopic.Name,
766 ToTopic: toTopic.Name,
khenaidoo2c6a0992019-04-29 13:46:56 -0400767 KeyTopic: key,
khenaidoo8f474192019-04-03 17:20:44 -0400768 Timestamp: time.Now().UnixNano(),
khenaidooabad44c2018-08-03 16:58:35 -0400769 }
khenaidoo79232702018-12-04 11:00:41 -0500770 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400771 Rpc: rpc,
772 ResponseRequired: true,
773 ReplyToTopic: replyTopic.Name,
774 }
775
776 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400777 if arg == nil {
778 // In case the caller sends an array with empty args
779 continue
780 }
khenaidooabad44c2018-08-03 16:58:35 -0400781 var marshalledArg *any.Any
782 var err error
783 // ascertain the value interface type is a proto.Message
784 protoValue, ok := arg.Value.(proto.Message)
785 if !ok {
786 log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
787 err := errors.New("argument-value-not-proto-message")
788 return nil, err
789 }
790 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
791 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
792 return nil, err
793 }
khenaidoo79232702018-12-04 11:00:41 -0500794 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400795 Key: arg.Key,
796 Value: marshalledArg,
797 }
798 requestBody.Args = append(requestBody.Args, protoArg)
799 }
800
801 var marshalledData *any.Any
802 var err error
803 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
804 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
805 return nil, err
806 }
khenaidoo79232702018-12-04 11:00:41 -0500807 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400808 Header: requestHeader,
809 Body: marshalledData,
810 }
811 return request, nil
812}
813
khenaidoo79232702018-12-04 11:00:41 -0500814func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400815 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500816 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400817 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
818 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
819 return nil, err
820 }
khenaidoo43c82122018-11-22 18:38:28 -0500821 //log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400822
823 return &responseBody, nil
824
825}