blob: 898548ee970e395a07144e7b498dadc35cdf7350 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
Scott Baker2c1c4822019-10-16 11:02:41 -070022 "reflect"
23 "strings"
24 "sync"
25 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070026
27 "github.com/golang/protobuf/proto"
28 "github.com/golang/protobuf/ptypes"
29 "github.com/golang/protobuf/ptypes/any"
30 "github.com/google/uuid"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070033)
34
Scott Baker2c1c4822019-10-16 11:02:41 -070035const (
36 DefaultMaxRetries = 3
37 DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
38)
39
40const (
41 TransactionKey = "transactionID"
42 FromTopic = "fromTopic"
43)
44
45var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
46var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
47
48// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
49// obtained from that channel, this interface is invoked. This is used to handle
50// async requests into the Core via the kafka messaging bus
51type requestHandlerChannel struct {
52 requesthandlerInterface interface{}
53 ch <-chan *ic.InterContainerMessage
54}
55
56// transactionChannel represents a combination of a topic and a channel onto which a response received
57// on the kafka bus will be sent to
58type transactionChannel struct {
59 topic *Topic
60 ch chan *ic.InterContainerMessage
61}
62
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080063type InterContainerProxy interface {
64 Start() error
65 Stop()
Matteo Scandolof346a2d2020-01-24 13:14:54 -080066 GetDefaultTopic() *Topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080067 DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
68 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
69 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
70 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
71 UnSubscribeFromRequestHandler(topic Topic) error
72 DeleteTopic(topic Topic) error
Matteo Scandolof346a2d2020-01-24 13:14:54 -080073 EnableLivenessChannel(enable bool) chan bool
74 SendLiveness() error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080075}
76
77// interContainerProxy represents the messaging proxy
78type interContainerProxy struct {
Scott Baker2c1c4822019-10-16 11:02:41 -070079 kafkaHost string
80 kafkaPort int
Matteo Scandolof346a2d2020-01-24 13:14:54 -080081 defaultTopic *Topic
Scott Baker2c1c4822019-10-16 11:02:41 -070082 defaultRequestHandlerInterface interface{}
83 deviceDiscoveryTopic *Topic
84 kafkaClient Client
85 doneCh chan int
86
87 // This map is used to map a topic to an interface and channel. When a request is received
88 // on that channel (registered to the topic) then that interface is invoked.
89 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
90 lockTopicRequestHandlerChannelMap sync.RWMutex
91
92 // This map is used to map a channel to a response topic. This channel handles all responses on that
93 // channel for that topic and forward them to the appropriate consumers channel, using the
94 // transactionIdToChannelMap.
95 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
96 lockTopicResponseChannelMap sync.RWMutex
97
98 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
99 // sent out and we are waiting for a response.
100 transactionIdToChannelMap map[string]*transactionChannel
101 lockTransactionIdToChannelMap sync.RWMutex
102}
103
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800104type InterContainerProxyOption func(*interContainerProxy)
Scott Baker2c1c4822019-10-16 11:02:41 -0700105
106func InterContainerHost(host string) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800107 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700108 args.kafkaHost = host
109 }
110}
111
112func InterContainerPort(port int) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800113 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700114 args.kafkaPort = port
115 }
116}
117
118func DefaultTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800119 return func(args *interContainerProxy) {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800120 args.defaultTopic = topic
Scott Baker2c1c4822019-10-16 11:02:41 -0700121 }
122}
123
124func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800125 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700126 args.deviceDiscoveryTopic = topic
127 }
128}
129
130func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800131 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700132 args.defaultRequestHandlerInterface = handler
133 }
134}
135
136func MsgClient(client Client) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800137 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700138 args.kafkaClient = client
139 }
140}
141
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800142func newInterContainerProxy(opts ...InterContainerProxyOption) (*interContainerProxy, error) {
143 proxy := &interContainerProxy{
Scott Baker2c1c4822019-10-16 11:02:41 -0700144 kafkaHost: DefaultKafkaHost,
145 kafkaPort: DefaultKafkaPort,
146 }
147
148 for _, option := range opts {
149 option(proxy)
150 }
151
152 // Create the locks for all the maps
153 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
154 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
155 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
156
157 return proxy, nil
158}
159
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800160func NewInterContainerProxy(opts ...InterContainerProxyOption) (InterContainerProxy, error) {
161 return newInterContainerProxy(opts...)
162}
163
164func (kp *interContainerProxy) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -0500165 logger.Info("Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700166
167 // Kafka MsgClient should already have been created. If not, output fatal error
168 if kp.kafkaClient == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500169 logger.Fatal("kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700170 }
171
172 // Create the Done channel
173 kp.doneCh = make(chan int, 1)
174
175 // Start the kafka client
176 if err := kp.kafkaClient.Start(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500177 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700178 return err
179 }
180
181 // Create the topic to response channel map
182 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
183 //
184 // Create the transactionId to Channel Map
185 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
186
187 // Create the topic to request channel map
188 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
189
190 return nil
191}
192
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800193func (kp *interContainerProxy) Stop() {
khenaidoob332f9b2020-01-16 16:25:26 -0500194 logger.Info("stopping-intercontainer-proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700195 kp.doneCh <- 1
196 // TODO : Perform cleanup
197 kp.kafkaClient.Stop()
198 //kp.deleteAllTopicRequestHandlerChannelMap()
199 //kp.deleteAllTopicResponseChannelMap()
200 //kp.deleteAllTransactionIdToChannelMap()
201}
202
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800203func (kp *interContainerProxy) GetDefaultTopic() *Topic {
204 return kp.defaultTopic
205}
206
Scott Baker2c1c4822019-10-16 11:02:41 -0700207// DeviceDiscovered publish the discovered device onto the kafka messaging bus
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800208func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
khenaidoob332f9b2020-01-16 16:25:26 -0500209 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700210 // Simple validation
211 if deviceId == "" || deviceType == "" {
khenaidoob332f9b2020-01-16 16:25:26 -0500212 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700213 return errors.New("invalid-parameters")
214 }
215 // Create the device discovery message
216 header := &ic.Header{
217 Id: uuid.New().String(),
218 Type: ic.MessageType_DEVICE_DISCOVERED,
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800219 FromTopic: kp.defaultTopic.Name,
Scott Baker2c1c4822019-10-16 11:02:41 -0700220 ToTopic: kp.deviceDiscoveryTopic.Name,
221 Timestamp: time.Now().UnixNano(),
222 }
223 body := &ic.DeviceDiscovered{
224 Id: deviceId,
225 DeviceType: deviceType,
226 ParentId: parentId,
227 Publisher: publisher,
228 }
229
230 var marshalledData *any.Any
231 var err error
232 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500233 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700234 return err
235 }
236 msg := &ic.InterContainerMessage{
237 Header: header,
238 Body: marshalledData,
239 }
240
241 // Send the message
242 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500243 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700244 return err
245 }
246 return nil
247}
248
249// InvokeRPC is used to send a request to a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800250func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
Scott Baker2c1c4822019-10-16 11:02:41 -0700251 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
252
253 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
254 // typically the device ID.
255 responseTopic := replyToTopic
256 if responseTopic == nil {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800257 responseTopic = kp.defaultTopic
Scott Baker2c1c4822019-10-16 11:02:41 -0700258 }
259
260 // Encode the request
261 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
262 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500263 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700264 return false, nil
265 }
266
267 // Subscribe for response, if needed, before sending request
268 var ch <-chan *ic.InterContainerMessage
269 if waitForResponse {
270 var err error
271 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500272 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700273 }
274 }
275
276 // Send request - if the topic is formatted with a device Id then we will send the request using a
277 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
278 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
279 //key := GetDeviceIdFromTopic(*toTopic)
khenaidoob332f9b2020-01-16 16:25:26 -0500280 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700281 go kp.kafkaClient.Send(protoRequest, toTopic, key)
282
283 if waitForResponse {
284 // Create a child context based on the parent context, if any
285 var cancel context.CancelFunc
286 childCtx := context.Background()
287 if ctx == nil {
288 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
289 } else {
290 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
291 }
292 defer cancel()
293
294 // Wait for response as well as timeout or cancellation
295 // Remove the subscription for a response on return
296 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
297 select {
298 case msg, ok := <-ch:
299 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500300 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700301 protoError := &ic.Error{Reason: "channel-closed"}
302 var marshalledArg *any.Any
303 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
304 return false, nil // Should never happen
305 }
306 return false, marshalledArg
307 }
khenaidoob332f9b2020-01-16 16:25:26 -0500308 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700309 var responseBody *ic.InterContainerResponseBody
310 var err error
311 if responseBody, err = decodeResponse(msg); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500312 logger.Errorw("decode-response-error", log.Fields{"error": err})
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800313 // FIXME we should return something
Scott Baker2c1c4822019-10-16 11:02:41 -0700314 }
315 return responseBody.Success, responseBody.Result
316 case <-ctx.Done():
khenaidoob332f9b2020-01-16 16:25:26 -0500317 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700318 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800319 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800320
Scott Baker2c1c4822019-10-16 11:02:41 -0700321 var marshalledArg *any.Any
322 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
323 return false, nil // Should never happen
324 }
325 return false, marshalledArg
326 case <-childCtx.Done():
khenaidoob332f9b2020-01-16 16:25:26 -0500327 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700328 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800329 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800330
Scott Baker2c1c4822019-10-16 11:02:41 -0700331 var marshalledArg *any.Any
332 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
333 return false, nil // Should never happen
334 }
335 return false, marshalledArg
336 case <-kp.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -0500337 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700338 return true, nil
339 }
340 }
341 return true, nil
342}
343
344// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
345// when a message is received on a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800346func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700347
348 // Subscribe to receive messages for that topic
349 var ch <-chan *ic.InterContainerMessage
350 var err error
351 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
352 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500353 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700354 return err
355 }
356
357 kp.defaultRequestHandlerInterface = handler
358 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
359 // Launch a go routine to receive and process kafka messages
360 go kp.waitForMessages(ch, topic, handler)
361
362 return nil
363}
364
365// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
366// when a message is received on a given topic. So far there is only 1 target registered per microservice
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800367func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700368 // Subscribe to receive messages for that topic
369 var ch <-chan *ic.InterContainerMessage
370 var err error
371 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500372 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700373 return err
374 }
375 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
376
377 // Launch a go routine to receive and process kafka messages
378 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
379
380 return nil
381}
382
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800383func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700384 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
385}
386
387// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
388// responses from that topic.
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800389func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700390 kp.lockTopicResponseChannelMap.Lock()
391 defer kp.lockTopicResponseChannelMap.Unlock()
392 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
393 kp.topicToResponseChannelMap[topic] = arg
394 }
395}
396
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800397func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
Scott Baker2c1c4822019-10-16 11:02:41 -0700398 kp.lockTopicResponseChannelMap.RLock()
399 defer kp.lockTopicResponseChannelMap.RUnlock()
400 _, exist := kp.topicToResponseChannelMap[topic]
401 return exist
402}
403
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800404func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700405 kp.lockTopicResponseChannelMap.Lock()
406 defer kp.lockTopicResponseChannelMap.Unlock()
407 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
408 // Unsubscribe to this topic first - this will close the subscribed channel
409 var err error
410 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500411 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700412 }
413 delete(kp.topicToResponseChannelMap, topic)
414 return err
415 } else {
416 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
417 }
418}
419
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800420func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700421 kp.lockTopicResponseChannelMap.Lock()
422 defer kp.lockTopicResponseChannelMap.Unlock()
423 var err error
424 for topic, _ := range kp.topicToResponseChannelMap {
425 // Unsubscribe to this topic first - this will close the subscribed channel
426 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500427 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700428 }
429 delete(kp.topicToResponseChannelMap, topic)
430 }
431 return err
432}
433
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800434func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700435 kp.lockTopicRequestHandlerChannelMap.Lock()
436 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
437 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
438 kp.topicToRequestHandlerChannelMap[topic] = arg
439 }
440}
441
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800442func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700443 kp.lockTopicRequestHandlerChannelMap.Lock()
444 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
445 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
446 // Close the kafka client client first by unsubscribing to this topic
447 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
448 delete(kp.topicToRequestHandlerChannelMap, topic)
449 return nil
450 } else {
451 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
452 }
453}
454
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800455func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700456 kp.lockTopicRequestHandlerChannelMap.Lock()
457 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
458 var err error
459 for topic, _ := range kp.topicToRequestHandlerChannelMap {
460 // Close the kafka client client first by unsubscribing to this topic
461 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500462 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700463 }
464 delete(kp.topicToRequestHandlerChannelMap, topic)
465 }
466 return err
467}
468
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800469func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700470 kp.lockTransactionIdToChannelMap.Lock()
471 defer kp.lockTransactionIdToChannelMap.Unlock()
472 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
473 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
474 }
475}
476
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800477func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700478 kp.lockTransactionIdToChannelMap.Lock()
479 defer kp.lockTransactionIdToChannelMap.Unlock()
480 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
481 // Close the channel first
482 close(transChannel.ch)
483 delete(kp.transactionIdToChannelMap, id)
484 }
485}
486
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800487func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700488 kp.lockTransactionIdToChannelMap.Lock()
489 defer kp.lockTransactionIdToChannelMap.Unlock()
490 for key, value := range kp.transactionIdToChannelMap {
491 if value.topic.Name == id {
492 close(value.ch)
493 delete(kp.transactionIdToChannelMap, key)
494 }
495 }
496}
497
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800498func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
Scott Baker2c1c4822019-10-16 11:02:41 -0700499 kp.lockTransactionIdToChannelMap.Lock()
500 defer kp.lockTransactionIdToChannelMap.Unlock()
501 for key, value := range kp.transactionIdToChannelMap {
502 close(value.ch)
503 delete(kp.transactionIdToChannelMap, key)
504 }
505}
506
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800507func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700508 // If we have any consumers on that topic we need to close them
509 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500510 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700511 }
512 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500513 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700514 }
515 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
516
517 return kp.kafkaClient.DeleteTopic(&topic)
518}
519
520func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
521 // Encode the response argument - needs to be a proto message
522 if returnedVal == nil {
523 return nil, nil
524 }
525 protoValue, ok := returnedVal.(proto.Message)
526 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500527 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700528 err := errors.New("response-value-not-proto-message")
529 return nil, err
530 }
531
532 // Marshal the returned value, if any
533 var marshalledReturnedVal *any.Any
534 var err error
535 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500536 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700537 return nil, err
538 }
539 return marshalledReturnedVal, nil
540}
541
542func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
543 responseHeader := &ic.Header{
544 Id: request.Header.Id,
545 Type: ic.MessageType_RESPONSE,
546 FromTopic: request.Header.ToTopic,
547 ToTopic: request.Header.FromTopic,
Kent Hagermanccfa2132019-12-17 13:29:34 -0500548 Timestamp: time.Now().UnixNano(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700549 }
550 responseBody := &ic.InterContainerResponseBody{
551 Success: false,
552 Result: nil,
553 }
554 var marshalledResponseBody *any.Any
555 var err error
556 // Error should never happen here
557 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500558 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700559 }
560
561 return &ic.InterContainerMessage{
562 Header: responseHeader,
563 Body: marshalledResponseBody,
564 }
565
566}
567
568//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
569//or an error on failure
570func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -0500571 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700572 responseHeader := &ic.Header{
573 Id: request.Header.Id,
574 Type: ic.MessageType_RESPONSE,
575 FromTopic: request.Header.ToTopic,
576 ToTopic: request.Header.FromTopic,
577 KeyTopic: request.Header.KeyTopic,
578 Timestamp: time.Now().UnixNano(),
579 }
580
581 // Go over all returned values
582 var marshalledReturnedVal *any.Any
583 var err error
584 for _, returnVal := range returnedValues {
585 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500586 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700587 }
588 break // for now we support only 1 returned value - (excluding the error)
589 }
590
591 responseBody := &ic.InterContainerResponseBody{
592 Success: success,
593 Result: marshalledReturnedVal,
594 }
595
596 // Marshal the response body
597 var marshalledResponseBody *any.Any
598 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500599 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700600 return nil, err
601 }
602
603 return &ic.InterContainerMessage{
604 Header: responseHeader,
605 Body: marshalledResponseBody,
606 }, nil
607}
608
609func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
610 myClassValue := reflect.ValueOf(myClass)
611 // Capitalize the first letter in the funcName to workaround the first capital letters required to
612 // invoke a function from a different package
613 funcName = strings.Title(funcName)
614 m := myClassValue.MethodByName(funcName)
615 if !m.IsValid() {
616 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
617 }
618 in := make([]reflect.Value, len(params))
619 for i, param := range params {
620 in[i] = reflect.ValueOf(param)
621 }
622 out = m.Call(in)
623 return
624}
625
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800626func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700627 arg := &KVArg{
628 Key: TransactionKey,
629 Value: &ic.StrType{Val: transactionId},
630 }
631
632 var marshalledArg *any.Any
633 var err error
634 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500635 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700636 return currentArgs
637 }
638 protoArg := &ic.Argument{
639 Key: arg.Key,
640 Value: marshalledArg,
641 }
642 return append(currentArgs, protoArg)
643}
644
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800645func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700646 var marshalledArg *any.Any
647 var err error
648 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500649 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700650 return currentArgs
651 }
652 protoArg := &ic.Argument{
653 Key: FromTopic,
654 Value: marshalledArg,
655 }
656 return append(currentArgs, protoArg)
657}
658
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800659func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700660
661 // First extract the header to know whether this is a request - responses are handled by a different handler
662 if msg.Header.Type == ic.MessageType_REQUEST {
663 var out []reflect.Value
664 var err error
665
666 // Get the request body
667 requestBody := &ic.InterContainerRequestBody{}
668 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500669 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700670 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500671 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700672 // let the callee unpack the arguments as its the only one that knows the real proto type
673 // Augment the requestBody with the message Id as it will be used in scenarios where cores
674 // are set in pairs and competing
675 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
676
677 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
678 // needs to send an unsollicited message to the currently requested container
679 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
680
681 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
682 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500683 logger.Warn(err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700684 }
685 }
686 // Response required?
687 if requestBody.ResponseRequired {
688 // If we already have an error before then just return that
689 var returnError *ic.Error
690 var returnedValues []interface{}
691 var success bool
692 if err != nil {
693 returnError = &ic.Error{Reason: err.Error()}
694 returnedValues = make([]interface{}, 1)
695 returnedValues[0] = returnError
696 } else {
697 returnedValues = make([]interface{}, 0)
698 // Check for errors first
699 lastIndex := len(out) - 1
700 if out[lastIndex].Interface() != nil { // Error
701 if retError, ok := out[lastIndex].Interface().(error); ok {
702 if retError.Error() == ErrorTransactionNotAcquired.Error() {
khenaidoob332f9b2020-01-16 16:25:26 -0500703 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700704 return // Ignore - process is in competing mode and ignored transaction
705 }
706 returnError = &ic.Error{Reason: retError.Error()}
707 returnedValues = append(returnedValues, returnError)
708 } else { // Should never happen
709 returnError = &ic.Error{Reason: "incorrect-error-returns"}
710 returnedValues = append(returnedValues, returnError)
711 }
712 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
khenaidoob332f9b2020-01-16 16:25:26 -0500713 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700714 return // Ignore - should not happen
715 } else { // Non-error case
716 success = true
717 for idx, val := range out {
khenaidoob332f9b2020-01-16 16:25:26 -0500718 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700719 if idx != lastIndex {
720 returnedValues = append(returnedValues, val.Interface())
721 }
722 }
723 }
724 }
725
726 var icm *ic.InterContainerMessage
727 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500728 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700729 icm = encodeDefaultFailedResponse(msg)
730 }
731 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
732 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
733 // present then the key will be empty, hence all messages for a given topic will be sent to all
734 // partitions.
735 replyTopic := &Topic{Name: msg.Header.FromTopic}
736 key := msg.Header.KeyTopic
khenaidoob332f9b2020-01-16 16:25:26 -0500737 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700738 // TODO: handle error response.
739 go kp.kafkaClient.Send(icm, replyTopic, key)
740 }
741 } else if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoob332f9b2020-01-16 16:25:26 -0500742 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700743 go kp.dispatchResponse(msg)
744 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500745 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700746 }
747}
748
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800749func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700750 // Wait for messages
751 for msg := range ch {
khenaidoob332f9b2020-01-16 16:25:26 -0500752 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
Scott Baker2c1c4822019-10-16 11:02:41 -0700753 go kp.handleMessage(msg, targetInterface)
754 }
755}
756
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800757func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700758 kp.lockTransactionIdToChannelMap.RLock()
759 defer kp.lockTransactionIdToChannelMap.RUnlock()
760 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
khenaidoob332f9b2020-01-16 16:25:26 -0500761 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700762 return
763 }
764 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
765}
766
767// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
768// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
769// API. There is one response channel waiting for kafka messages before dispatching the message to the
770// corresponding waiting channel
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800771func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -0500772 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700773
774 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
775 // broadcast any message for this topic to all channels waiting on it.
776 ch := make(chan *ic.InterContainerMessage)
777 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
778
779 return ch, nil
780}
781
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800782func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidoob332f9b2020-01-16 16:25:26 -0500783 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700784 kp.deleteFromTransactionIdToChannelMap(trnsId)
785 return nil
786}
787
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800788func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
Scott Baker104b67d2019-10-29 15:56:27 -0700789 return kp.kafkaClient.EnableLivenessChannel(enable)
790}
791
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800792func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
Scott Baker0fef6982019-12-12 09:49:42 -0800793 return kp.kafkaClient.EnableHealthinessChannel(enable)
794}
795
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800796func (kp *interContainerProxy) SendLiveness() error {
Scott Baker104b67d2019-10-29 15:56:27 -0700797 return kp.kafkaClient.SendLiveness()
798}
799
Scott Baker2c1c4822019-10-16 11:02:41 -0700800//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
801//or an error on failure
802func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
803 requestHeader := &ic.Header{
804 Id: uuid.New().String(),
805 Type: ic.MessageType_REQUEST,
806 FromTopic: replyTopic.Name,
807 ToTopic: toTopic.Name,
808 KeyTopic: key,
809 Timestamp: time.Now().UnixNano(),
810 }
811 requestBody := &ic.InterContainerRequestBody{
812 Rpc: rpc,
813 ResponseRequired: true,
814 ReplyToTopic: replyTopic.Name,
815 }
816
817 for _, arg := range kvArgs {
818 if arg == nil {
819 // In case the caller sends an array with empty args
820 continue
821 }
822 var marshalledArg *any.Any
823 var err error
824 // ascertain the value interface type is a proto.Message
825 protoValue, ok := arg.Value.(proto.Message)
826 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500827 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700828 err := errors.New("argument-value-not-proto-message")
829 return nil, err
830 }
831 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500832 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700833 return nil, err
834 }
835 protoArg := &ic.Argument{
836 Key: arg.Key,
837 Value: marshalledArg,
838 }
839 requestBody.Args = append(requestBody.Args, protoArg)
840 }
841
842 var marshalledData *any.Any
843 var err error
844 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500845 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700846 return nil, err
847 }
848 request := &ic.InterContainerMessage{
849 Header: requestHeader,
850 Body: marshalledData,
851 }
852 return request, nil
853}
854
855func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
856 // Extract the message body
857 responseBody := ic.InterContainerResponseBody{}
858 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500859 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700860 return nil, err
861 }
khenaidoob332f9b2020-01-16 16:25:26 -0500862 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700863
864 return &responseBody, nil
865
866}