blob: c0670d333124fa50f3c265440784bda68ba6f39d [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050027 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidooabad44c2018-08-03 16:58:35 -040028 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050029 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040030 "sync"
31 "time"
32)
33
34// Initialize the logger - gets the default until the main function setup the logger
35func init() {
khenaidooca301322019-01-09 23:06:32 -050036 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidooabad44c2018-08-03 16:58:35 -040037}
38
const (
	// DefaultMaxRetries is the default number of retries.
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is the default time to wait for a response, expressed in
	// milliseconds. 3000 ms was chosen to handle a wider latency range.
	DefaultRequestTimeout = 3000
)
43
khenaidoo43c82122018-11-22 18:38:28 -050044// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
45// obtained from that channel, this interface is invoked. This is used to handle
46// async requests into the Core via the kafka messaging bus
47type requestHandlerChannel struct {
48 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050049 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040050}
51
khenaidoo43c82122018-11-22 18:38:28 -050052// transactionChannel represents a combination of a topic and a channel onto which a response received
53// on the kafka bus will be sent to
54type transactionChannel struct {
55 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050056 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050057}
58
59// InterContainerProxy represents the messaging proxy
60type InterContainerProxy struct {
61 kafkaHost string
62 kafkaPort int
63 DefaultTopic *Topic
64 defaultRequestHandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050065 deviceDiscoveryTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050066 kafkaClient Client
67 doneCh chan int
68
69 // This map is used to map a topic to an interface and channel. When a request is received
70 // on that channel (registered to the topic) then that interface is invoked.
71 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
72 lockTopicRequestHandlerChannelMap sync.RWMutex
73
74 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050075 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050076 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050077 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050078 lockTopicResponseChannelMap sync.RWMutex
79
khenaidoo4c1a5bf2018-11-29 15:53:42 -050080 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -050081 // sent out and we are waiting for a response.
82 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -040083 lockTransactionIdToChannelMap sync.RWMutex
84}
85
khenaidoo43c82122018-11-22 18:38:28 -050086type InterContainerProxyOption func(*InterContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -040087
khenaidoo43c82122018-11-22 18:38:28 -050088func InterContainerHost(host string) InterContainerProxyOption {
89 return func(args *InterContainerProxy) {
90 args.kafkaHost = host
khenaidooabad44c2018-08-03 16:58:35 -040091 }
92}
93
khenaidoo43c82122018-11-22 18:38:28 -050094func InterContainerPort(port int) InterContainerProxyOption {
95 return func(args *InterContainerProxy) {
96 args.kafkaPort = port
khenaidooabad44c2018-08-03 16:58:35 -040097 }
98}
99
khenaidoo43c82122018-11-22 18:38:28 -0500100func DefaultTopic(topic *Topic) InterContainerProxyOption {
101 return func(args *InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -0400102 args.DefaultTopic = topic
103 }
104}
105
khenaidoo79232702018-12-04 11:00:41 -0500106func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
107 return func(args *InterContainerProxy) {
108 args.deviceDiscoveryTopic = topic
109 }
110}
111
khenaidoo43c82122018-11-22 18:38:28 -0500112func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
113 return func(args *InterContainerProxy) {
114 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400115 }
116}
117
khenaidoo43c82122018-11-22 18:38:28 -0500118func MsgClient(client Client) InterContainerProxyOption {
119 return func(args *InterContainerProxy) {
120 args.kafkaClient = client
121 }
122}
123
124func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
125 proxy := &InterContainerProxy{
126 kafkaHost: DefaultKafkaHost,
127 kafkaPort: DefaultKafkaPort,
khenaidooabad44c2018-08-03 16:58:35 -0400128 }
129
130 for _, option := range opts {
131 option(proxy)
132 }
133
134 // Create the locks for all the maps
khenaidoo43c82122018-11-22 18:38:28 -0500135 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400136 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500137 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400138
139 return proxy, nil
140}
141
khenaidoo43c82122018-11-22 18:38:28 -0500142func (kp *InterContainerProxy) Start() error {
khenaidooabad44c2018-08-03 16:58:35 -0400143 log.Info("Starting-Proxy")
144
khenaidoo43c82122018-11-22 18:38:28 -0500145 // Kafka MsgClient should already have been created. If not, output fatal error
146 if kp.kafkaClient == nil {
147 log.Fatal("kafka-client-not-set")
148 }
149
khenaidooabad44c2018-08-03 16:58:35 -0400150 // Create the Done channel
151 kp.doneCh = make(chan int, 1)
152
khenaidoo43c82122018-11-22 18:38:28 -0500153 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500154 if err := kp.kafkaClient.Start(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500155 log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400156 return err
157 }
158
khenaidoo43c82122018-11-22 18:38:28 -0500159 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500160 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500161 //
khenaidooabad44c2018-08-03 16:58:35 -0400162 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500163 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
164
165 // Create the topic to request channel map
166 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400167
168 return nil
169}
170
khenaidoo43c82122018-11-22 18:38:28 -0500171func (kp *InterContainerProxy) Stop() {
172 log.Info("stopping-intercontainer-proxy")
173 kp.doneCh <- 1
174 // TODO : Perform cleanup
khenaidooca301322019-01-09 23:06:32 -0500175 kp.kafkaClient.Stop()
khenaidoo43c82122018-11-22 18:38:28 -0500176 //kp.deleteAllTopicRequestHandlerChannelMap()
177 //kp.deleteAllTopicResponseChannelMap()
178 //kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400179}
180
khenaidoo79232702018-12-04 11:00:41 -0500181// DeviceDiscovered publish the discovered device onto the kafka messaging bus
khenaidoo19374072018-12-11 11:05:15 -0500182func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
khenaidoo79232702018-12-04 11:00:41 -0500183 log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
184 // Simple validation
185 if deviceId == "" || deviceType == "" {
186 log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
187 return errors.New("invalid-parameters")
188 }
189 // Create the device discovery message
190 header := &ic.Header{
191 Id: uuid.New().String(),
192 Type: ic.MessageType_DEVICE_DISCOVERED,
193 FromTopic: kp.DefaultTopic.Name,
194 ToTopic: kp.deviceDiscoveryTopic.Name,
195 Timestamp: time.Now().UnixNano(),
196 }
197 body := &ic.DeviceDiscovered{
198 Id: deviceId,
199 DeviceType: deviceType,
200 ParentId: parentId,
khenaidood2b6df92018-12-13 16:37:20 -0500201 Publisher: publisher,
khenaidoo79232702018-12-04 11:00:41 -0500202 }
203
204 var marshalledData *any.Any
205 var err error
206 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
207 log.Errorw("cannot-marshal-request", log.Fields{"error": err})
208 return err
209 }
210 msg := &ic.InterContainerMessage{
211 Header: header,
212 Body: marshalledData,
213 }
214
215 // Send the message
216 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
217 log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
218 return err
219 }
220 return nil
221}
222
khenaidoo43c82122018-11-22 18:38:28 -0500223// InvokeRPC is used to send a request to a given topic
224func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
225 waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
226
227 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
228 // typically the device ID.
229 responseTopic := replyToTopic
230 if responseTopic == nil {
231 responseTopic = kp.DefaultTopic
232 }
233
khenaidooabad44c2018-08-03 16:58:35 -0400234 // Encode the request
khenaidoo43c82122018-11-22 18:38:28 -0500235 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400236 if err != nil {
237 log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
238 return false, nil
239 }
240
241 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500242 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400243 if waitForResponse {
244 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500245 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
246 log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400247 }
248 }
249
khenaidoo43c82122018-11-22 18:38:28 -0500250 // Send request - if the topic is formatted with a device Id then we will send the request using a
251 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
252 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
253 key := GetDeviceIdFromTopic(*toTopic)
254 log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
255 go kp.kafkaClient.Send(protoRequest, toTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400256
257 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400258 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400259 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400260 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400261 if ctx == nil {
262 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400263 } else {
264 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400265 }
khenaidoob9203542018-09-17 22:56:37 -0400266 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400267
268 // Wait for response as well as timeout or cancellation
269 // Remove the subscription for a response on return
270 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
271 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500272 case msg, ok := <-ch:
273 if !ok {
274 log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
275 protoError := &ic.Error{Reason: "channel-closed"}
276 var marshalledArg *any.Any
277 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
278 return false, nil // Should never happen
279 }
280 return false, marshalledArg
281 }
khenaidoo43c82122018-11-22 18:38:28 -0500282 log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500283 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400284 var err error
285 if responseBody, err = decodeResponse(msg); err != nil {
286 log.Errorw("decode-response-error", log.Fields{"error": err})
287 }
288 return responseBody.Success, responseBody.Result
289 case <-ctx.Done():
290 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
291 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500292 protoError := &ic.Error{Reason: ctx.Err().Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400293 var marshalledArg *any.Any
294 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
295 return false, nil // Should never happen
296 }
297 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400298 case <-childCtx.Done():
299 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
300 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500301 protoError := &ic.Error{Reason: childCtx.Err().Error()}
khenaidoob9203542018-09-17 22:56:37 -0400302 var marshalledArg *any.Any
303 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
304 return false, nil // Should never happen
305 }
306 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400307 case <-kp.doneCh:
khenaidoo43c82122018-11-22 18:38:28 -0500308 log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400309 return true, nil
310 }
311 }
312 return true, nil
313}
314
khenaidoo43c82122018-11-22 18:38:28 -0500315// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400316// when a message is received on a given topic
khenaidoo43c82122018-11-22 18:38:28 -0500317func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400318
319 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500320 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400321 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500322 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500323 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400324 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
325 }
khenaidoo43c82122018-11-22 18:38:28 -0500326
327 kp.defaultRequestHandlerInterface = handler
328 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400329 // Launch a go routine to receive and process kafka messages
khenaidoo43c82122018-11-22 18:38:28 -0500330 go kp.waitForRequest(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400331
332 return nil
333}
334
khenaidoo43c82122018-11-22 18:38:28 -0500335// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
336// when a message is received on a given topic. So far there is only 1 target registered per microservice
337func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
338 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500339 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500340 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500341 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500342 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500343 return err
khenaidoo43c82122018-11-22 18:38:28 -0500344 }
345 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
346
347 // Launch a go routine to receive and process kafka messages
348 go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
349
khenaidooabad44c2018-08-03 16:58:35 -0400350 return nil
351}
352
khenaidoo43c82122018-11-22 18:38:28 -0500353func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
354 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
355}
356
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500357// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
khenaidoo43c82122018-11-22 18:38:28 -0500358// responses from that topic.
khenaidoo79232702018-12-04 11:00:41 -0500359func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500360 kp.lockTopicResponseChannelMap.Lock()
361 defer kp.lockTopicResponseChannelMap.Unlock()
362 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
363 kp.topicToResponseChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400364 }
365}
366
khenaidoo43c82122018-11-22 18:38:28 -0500367func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
368 kp.lockTopicResponseChannelMap.Lock()
369 defer kp.lockTopicResponseChannelMap.Unlock()
370 _, exist := kp.topicToResponseChannelMap[topic]
371 return exist
372}
373
374func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
375 kp.lockTopicResponseChannelMap.Lock()
376 defer kp.lockTopicResponseChannelMap.Unlock()
377 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
378 // Unsubscribe to this topic first - this will close the subscribed channel
379 var err error
380 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
381 log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
382 }
383 delete(kp.topicToResponseChannelMap, topic)
384 return err
385 } else {
386 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400387 }
388}
389
khenaidoo43c82122018-11-22 18:38:28 -0500390func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
391 kp.lockTopicResponseChannelMap.Lock()
392 defer kp.lockTopicResponseChannelMap.Unlock()
393 var err error
394 for topic, _ := range kp.topicToResponseChannelMap {
395 // Unsubscribe to this topic first - this will close the subscribed channel
396 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
397 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
398 }
399 delete(kp.topicToResponseChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400400 }
khenaidoo43c82122018-11-22 18:38:28 -0500401 return err
khenaidooabad44c2018-08-03 16:58:35 -0400402}
403
khenaidoo43c82122018-11-22 18:38:28 -0500404func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
405 kp.lockTopicRequestHandlerChannelMap.Lock()
406 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
407 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
408 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400409 }
khenaidooabad44c2018-08-03 16:58:35 -0400410}
411
khenaidoo43c82122018-11-22 18:38:28 -0500412func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
413 kp.lockTopicRequestHandlerChannelMap.Lock()
414 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
415 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
416 // Close the kafka client client first by unsubscribing to this topic
417 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
418 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400419 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500420 } else {
421 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400422 }
khenaidooabad44c2018-08-03 16:58:35 -0400423}
424
khenaidoo43c82122018-11-22 18:38:28 -0500425func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
426 kp.lockTopicRequestHandlerChannelMap.Lock()
427 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
428 var err error
429 for topic, _ := range kp.topicToRequestHandlerChannelMap {
430 // Close the kafka client client first by unsubscribing to this topic
431 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
432 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
433 }
434 delete(kp.topicToRequestHandlerChannelMap, topic)
435 }
436 return err
437}
438
khenaidoo79232702018-12-04 11:00:41 -0500439func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400440 kp.lockTransactionIdToChannelMap.Lock()
441 defer kp.lockTransactionIdToChannelMap.Unlock()
442 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500443 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400444 }
445}
446
khenaidoo43c82122018-11-22 18:38:28 -0500447func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400448 kp.lockTransactionIdToChannelMap.Lock()
449 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500450 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
451 // Close the channel first
452 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400453 delete(kp.transactionIdToChannelMap, id)
454 }
455}
456
khenaidoo43c82122018-11-22 18:38:28 -0500457func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
458 kp.lockTransactionIdToChannelMap.Lock()
459 defer kp.lockTransactionIdToChannelMap.Unlock()
460 for key, value := range kp.transactionIdToChannelMap {
461 if value.topic.Name == id {
462 close(value.ch)
463 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400464 }
465 }
khenaidooabad44c2018-08-03 16:58:35 -0400466}
467
khenaidoo43c82122018-11-22 18:38:28 -0500468func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
469 kp.lockTransactionIdToChannelMap.Lock()
470 defer kp.lockTransactionIdToChannelMap.Unlock()
471 for key, value := range kp.transactionIdToChannelMap {
472 close(value.ch)
473 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400474 }
khenaidooabad44c2018-08-03 16:58:35 -0400475}
476
khenaidoo43c82122018-11-22 18:38:28 -0500477func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
478 // If we have any consumers on that topic we need to close them
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500479 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
480 log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
481 }
482 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
483 log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
484 }
khenaidoo43c82122018-11-22 18:38:28 -0500485 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500486
khenaidoo43c82122018-11-22 18:38:28 -0500487 return kp.kafkaClient.DeleteTopic(&topic)
488}
489
490func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400491 // Encode the response argument - needs to be a proto message
492 if returnedVal == nil {
493 return nil, nil
494 }
495 protoValue, ok := returnedVal.(proto.Message)
496 if !ok {
497 log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
498 err := errors.New("response-value-not-proto-message")
499 return nil, err
500 }
501
502 // Marshal the returned value, if any
503 var marshalledReturnedVal *any.Any
504 var err error
505 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
506 log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
507 return nil, err
508 }
509 return marshalledReturnedVal, nil
510}
511
khenaidoo79232702018-12-04 11:00:41 -0500512func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
513 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400514 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500515 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400516 FromTopic: request.Header.ToTopic,
517 ToTopic: request.Header.FromTopic,
518 Timestamp: time.Now().Unix(),
519 }
khenaidoo79232702018-12-04 11:00:41 -0500520 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400521 Success: false,
522 Result: nil,
523 }
524 var marshalledResponseBody *any.Any
525 var err error
526 // Error should never happen here
527 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
528 log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
529 }
530
khenaidoo79232702018-12-04 11:00:41 -0500531 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400532 Header: responseHeader,
533 Body: marshalledResponseBody,
534 }
535
536}
537
538//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
539//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500540func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500541 //log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500542 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400543 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500544 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400545 FromTopic: request.Header.ToTopic,
546 ToTopic: request.Header.FromTopic,
547 Timestamp: time.Now().Unix(),
548 }
549
550 // Go over all returned values
551 var marshalledReturnedVal *any.Any
552 var err error
553 for _, returnVal := range returnedValues {
khenaidoo43c82122018-11-22 18:38:28 -0500554 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400555 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
556 }
557 break // for now we support only 1 returned value - (excluding the error)
558 }
559
khenaidoo79232702018-12-04 11:00:41 -0500560 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400561 Success: success,
562 Result: marshalledReturnedVal,
563 }
564
565 // Marshal the response body
566 var marshalledResponseBody *any.Any
567 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
568 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
569 return nil, err
570 }
571
khenaidoo79232702018-12-04 11:00:41 -0500572 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400573 Header: responseHeader,
574 Body: marshalledResponseBody,
575 }, nil
576}
577
// CallFuncByName invokes, through reflection, the method of myClass named funcName with
// params as its arguments and returns the values the method produced.
//
// funcName is title-cased before the lookup to work around the leading capital letter
// required to invoke a method from a different package.
//
// Instead of letting the reflect package panic, an error (with an empty result slice) is
// returned when:
//   - myClass is nil;
//   - no such method exists;
//   - the number of params does not match the method signature;
//   - a nil param is supplied for a parameter whose type cannot be nil;
//   - a param is not assignable to the corresponding parameter type.
//
// A nil param supplied for a pointer, interface, slice, map, channel or function parameter
// is passed as the zero value of that type. A panic raised by the invoked method itself is
// not intercepted.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("target-not-set \"%s\"", funcName)
	}

	funcName = strings.Title(funcName)
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Check the argument count against the signature; a variadic method accepts any number
	// of arguments beyond its fixed ones.
	mType := m.Type()
	fixed := mType.NumIn()
	if mType.IsVariadic() {
		fixed--
	}
	if len(params) < fixed || (!mType.IsVariadic() && len(params) > fixed) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-count \"%s\": expected %d, got %d", funcName, fixed, len(params))
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		// Expected type of this argument; trailing variadic arguments share the element type
		var want reflect.Type
		if i < fixed {
			want = mType.In(i)
		} else {
			want = mType.In(fixed).Elem()
		}

		if param == nil {
			// reflect.ValueOf(nil) is the zero Value, which Call rejects; substitute a typed nil
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
				in[i] = reflect.Zero(want)
				continue
			}
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": nil for parameter %d of type %s", funcName, i, want)
		}

		in[i] = reflect.ValueOf(param)
		if !in[i].Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": parameter %d is %s, expected %s", funcName, i, in[i].Type(), want)
		}
	}

	return m.Call(in), nil
}
594
khenaidoo79232702018-12-04 11:00:41 -0500595func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400596
khenaidoo43c82122018-11-22 18:38:28 -0500597 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500598 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400599
600 var out []reflect.Value
601 var err error
602
603 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500604 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400605 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
606 log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
607 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500608 log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400609 // let the callee unpack the arguments as its the only one that knows the real proto type
610 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
611 if err != nil {
612 log.Warn(err)
613 }
614 }
615 // Response required?
616 if requestBody.ResponseRequired {
617 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500618 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400619 var returnedValues []interface{}
620 var success bool
621 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500622 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400623 returnedValues = make([]interface{}, 1)
624 returnedValues[0] = returnError
625 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500626 //log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
khenaidoob9203542018-09-17 22:56:37 -0400627 returnedValues = make([]interface{}, 0)
628 // Check for errors first
629 lastIndex := len(out) - 1
630 if out[lastIndex].Interface() != nil { // Error
631 if goError, ok := out[lastIndex].Interface().(error); ok {
khenaidoo79232702018-12-04 11:00:41 -0500632 returnError = &ic.Error{Reason: goError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400633 returnedValues = append(returnedValues, returnError)
634 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500635 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400636 returnedValues = append(returnedValues, returnError)
637 }
638 } else { // Non-error case
639 success = true
640 for idx, val := range out {
khenaidoo43c82122018-11-22 18:38:28 -0500641 //log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400642 if idx != lastIndex {
643 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400644 }
khenaidooabad44c2018-08-03 16:58:35 -0400645 }
646 }
647 }
648
khenaidoo79232702018-12-04 11:00:41 -0500649 var icm *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400650 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400651 log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400652 icm = encodeDefaultFailedResponse(msg)
653 }
khenaidoo43c82122018-11-22 18:38:28 -0500654 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
655 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
656 // present then the key will be empty, hence all messages for a given topic will be sent to all
657 // partitions.
658 replyTopic := &Topic{Name: msg.Header.FromTopic}
659 key := GetDeviceIdFromTopic(*replyTopic)
khenaidoo90847922018-12-03 14:47:51 -0500660 log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500661 // TODO: handle error response.
662 kp.kafkaClient.Send(icm, replyTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400663 }
664
khenaidooabad44c2018-08-03 16:58:35 -0400665 }
666}
667
khenaidoo79232702018-12-04 11:00:41 -0500668func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400669 // Wait for messages
670 for msg := range ch {
khenaidoo43c82122018-11-22 18:38:28 -0500671 //log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidooabad44c2018-08-03 16:58:35 -0400672 go kp.handleRequest(msg, targetInterface)
673 }
674}
675
khenaidoo79232702018-12-04 11:00:41 -0500676func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400677 kp.lockTransactionIdToChannelMap.Lock()
678 defer kp.lockTransactionIdToChannelMap.Unlock()
679 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
680 log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
681 return
682 }
khenaidoo43c82122018-11-22 18:38:28 -0500683 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400684}
685
khenaidoo43c82122018-11-22 18:38:28 -0500686// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500687// and then dispatches to the consumers
khenaidoo79232702018-12-04 11:00:41 -0500688func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
khenaidoo43c82122018-11-22 18:38:28 -0500689 log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400690startloop:
691 for {
692 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500693 case msg, ok := <-subscribedCh:
694 if !ok {
695 log.Debugw("channel-closed", log.Fields{"topic": topic.Name})
696 break startloop
697 }
698 log.Debugw("message-received", log.Fields{"msg": msg})
699 //log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
khenaidoo79232702018-12-04 11:00:41 -0500700 if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoo43c82122018-11-22 18:38:28 -0500701 go kp.dispatchResponse(msg)
702 }
khenaidooabad44c2018-08-03 16:58:35 -0400703 case <-kp.doneCh:
704 log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
705 break startloop
706 }
707 }
khenaidoo43c82122018-11-22 18:38:28 -0500708 //log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
709 // We got an exit signal. Unsubscribe to the channel
710 //kp.kafkaClient.UnSubscribe(topic, subscribedCh)
khenaidooabad44c2018-08-03 16:58:35 -0400711}
712
713// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
714// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
715// API. There is one response channel waiting for kafka messages before dispatching the message to the
716// corresponding waiting channel
khenaidoo79232702018-12-04 11:00:41 -0500717func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob9203542018-09-17 22:56:37 -0400718 log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400719
khenaidoo43c82122018-11-22 18:38:28 -0500720 // First check whether we already have a channel listening for response on that topic. If there is
721 // already one then it will be reused. If not, it will be created.
722 if !kp.isTopicSubscribedForResponse(topic.Name) {
khenaidooca301322019-01-09 23:06:32 -0500723 log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidoo79232702018-12-04 11:00:41 -0500724 var subscribedCh <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400725 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500726 if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500727 log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400728 return nil, err
729 }
khenaidoo43c82122018-11-22 18:38:28 -0500730 kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
731 go kp.waitForResponseLoop(subscribedCh, &topic)
khenaidooabad44c2018-08-03 16:58:35 -0400732 }
733
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500734 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500735 // broadcast any message for this topic to all channels waiting on it.
khenaidoo79232702018-12-04 11:00:41 -0500736 ch := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500737 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400738
739 return ch, nil
740}
741
khenaidoo43c82122018-11-22 18:38:28 -0500742func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidooabad44c2018-08-03 16:58:35 -0400743 log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500744 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -0400745 return nil
746}
747
748//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
749//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500750func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
751 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400752 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500753 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400754 FromTopic: replyTopic.Name,
755 ToTopic: toTopic.Name,
756 Timestamp: time.Now().Unix(),
757 }
khenaidoo79232702018-12-04 11:00:41 -0500758 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400759 Rpc: rpc,
760 ResponseRequired: true,
761 ReplyToTopic: replyTopic.Name,
762 }
763
764 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400765 if arg == nil {
766 // In case the caller sends an array with empty args
767 continue
768 }
khenaidooabad44c2018-08-03 16:58:35 -0400769 var marshalledArg *any.Any
770 var err error
771 // ascertain the value interface type is a proto.Message
772 protoValue, ok := arg.Value.(proto.Message)
773 if !ok {
774 log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
775 err := errors.New("argument-value-not-proto-message")
776 return nil, err
777 }
778 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
779 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
780 return nil, err
781 }
khenaidoo79232702018-12-04 11:00:41 -0500782 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400783 Key: arg.Key,
784 Value: marshalledArg,
785 }
786 requestBody.Args = append(requestBody.Args, protoArg)
787 }
788
789 var marshalledData *any.Any
790 var err error
791 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
792 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
793 return nil, err
794 }
khenaidoo79232702018-12-04 11:00:41 -0500795 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400796 Header: requestHeader,
797 Body: marshalledData,
798 }
799 return request, nil
800}
801
khenaidoo79232702018-12-04 11:00:41 -0500802func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400803 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500804 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400805 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
806 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
807 return nil, err
808 }
khenaidoo43c82122018-11-22 18:38:28 -0500809 //log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400810
811 return &responseBody, nil
812
813}