blob: 33261912e2da3c950e6dcb0f3774b42c758ed186 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
Scott Baker807addd2019-10-24 15:16:21 -070026 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Baker555307d2019-11-04 08:58:01 -080027 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
khenaidooabad44c2018-08-03 16:58:35 -040028 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050029 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040030 "sync"
31 "time"
32)
33
34// Initialize the logger - gets the default until the main function setup the logger
35func init() {
khenaidooca301322019-01-09 23:06:32 -050036 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidooabad44c2018-08-03 16:58:35 -040037}
38
39const (
khenaidoo43c82122018-11-22 18:38:28 -050040 DefaultMaxRetries = 3
khenaidoo6d055132019-02-12 16:51:19 -050041 DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
khenaidooabad44c2018-08-03 16:58:35 -040042)
43
khenaidoo297cd252019-02-07 22:10:23 -050044const (
45 TransactionKey = "transactionID"
khenaidoo54e0ddf2019-02-27 16:21:33 -050046 FromTopic = "fromTopic"
khenaidoo297cd252019-02-07 22:10:23 -050047)
48
khenaidoo09771ef2019-10-11 14:25:02 -040049var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
50var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
51
khenaidoo43c82122018-11-22 18:38:28 -050052// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
53// obtained from that channel, this interface is invoked. This is used to handle
54// async requests into the Core via the kafka messaging bus
55type requestHandlerChannel struct {
56 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050057 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040058}
59
khenaidoo43c82122018-11-22 18:38:28 -050060// transactionChannel represents a combination of a topic and a channel onto which a response received
61// on the kafka bus will be sent to
62type transactionChannel struct {
63 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050064 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050065}
66
67// InterContainerProxy represents the messaging proxy
68type InterContainerProxy struct {
69 kafkaHost string
70 kafkaPort int
71 DefaultTopic *Topic
72 defaultRequestHandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050073 deviceDiscoveryTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050074 kafkaClient Client
75 doneCh chan int
76
77 // This map is used to map a topic to an interface and channel. When a request is received
78 // on that channel (registered to the topic) then that interface is invoked.
79 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
80 lockTopicRequestHandlerChannelMap sync.RWMutex
81
82 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050083 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050084 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050085 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050086 lockTopicResponseChannelMap sync.RWMutex
87
khenaidoo4c1a5bf2018-11-29 15:53:42 -050088 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -050089 // sent out and we are waiting for a response.
90 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -040091 lockTransactionIdToChannelMap sync.RWMutex
92}
93
khenaidoo43c82122018-11-22 18:38:28 -050094type InterContainerProxyOption func(*InterContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -040095
khenaidoo43c82122018-11-22 18:38:28 -050096func InterContainerHost(host string) InterContainerProxyOption {
97 return func(args *InterContainerProxy) {
98 args.kafkaHost = host
khenaidooabad44c2018-08-03 16:58:35 -040099 }
100}
101
khenaidoo43c82122018-11-22 18:38:28 -0500102func InterContainerPort(port int) InterContainerProxyOption {
103 return func(args *InterContainerProxy) {
104 args.kafkaPort = port
khenaidooabad44c2018-08-03 16:58:35 -0400105 }
106}
107
khenaidoo43c82122018-11-22 18:38:28 -0500108func DefaultTopic(topic *Topic) InterContainerProxyOption {
109 return func(args *InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -0400110 args.DefaultTopic = topic
111 }
112}
113
khenaidoo79232702018-12-04 11:00:41 -0500114func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
115 return func(args *InterContainerProxy) {
116 args.deviceDiscoveryTopic = topic
117 }
118}
119
khenaidoo43c82122018-11-22 18:38:28 -0500120func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
121 return func(args *InterContainerProxy) {
122 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400123 }
124}
125
khenaidoo43c82122018-11-22 18:38:28 -0500126func MsgClient(client Client) InterContainerProxyOption {
127 return func(args *InterContainerProxy) {
128 args.kafkaClient = client
129 }
130}
131
132func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
133 proxy := &InterContainerProxy{
134 kafkaHost: DefaultKafkaHost,
135 kafkaPort: DefaultKafkaPort,
khenaidooabad44c2018-08-03 16:58:35 -0400136 }
137
138 for _, option := range opts {
139 option(proxy)
140 }
141
142 // Create the locks for all the maps
khenaidoo43c82122018-11-22 18:38:28 -0500143 proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400144 proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500145 proxy.lockTopicResponseChannelMap = sync.RWMutex{}
khenaidooabad44c2018-08-03 16:58:35 -0400146
147 return proxy, nil
148}
149
khenaidoo43c82122018-11-22 18:38:28 -0500150func (kp *InterContainerProxy) Start() error {
khenaidooabad44c2018-08-03 16:58:35 -0400151 log.Info("Starting-Proxy")
152
khenaidoo43c82122018-11-22 18:38:28 -0500153 // Kafka MsgClient should already have been created. If not, output fatal error
154 if kp.kafkaClient == nil {
155 log.Fatal("kafka-client-not-set")
156 }
157
khenaidooabad44c2018-08-03 16:58:35 -0400158 // Create the Done channel
159 kp.doneCh = make(chan int, 1)
160
khenaidoo43c82122018-11-22 18:38:28 -0500161 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500162 if err := kp.kafkaClient.Start(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500163 log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400164 return err
165 }
166
khenaidoo43c82122018-11-22 18:38:28 -0500167 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500168 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500169 //
khenaidooabad44c2018-08-03 16:58:35 -0400170 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500171 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
172
173 // Create the topic to request channel map
174 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400175
176 return nil
177}
178
khenaidoo43c82122018-11-22 18:38:28 -0500179func (kp *InterContainerProxy) Stop() {
180 log.Info("stopping-intercontainer-proxy")
181 kp.doneCh <- 1
182 // TODO : Perform cleanup
khenaidooca301322019-01-09 23:06:32 -0500183 kp.kafkaClient.Stop()
khenaidoo43c82122018-11-22 18:38:28 -0500184 //kp.deleteAllTopicRequestHandlerChannelMap()
185 //kp.deleteAllTopicResponseChannelMap()
186 //kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400187}
188
khenaidoo79232702018-12-04 11:00:41 -0500189// DeviceDiscovered publish the discovered device onto the kafka messaging bus
khenaidoo19374072018-12-11 11:05:15 -0500190func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
khenaidoo79232702018-12-04 11:00:41 -0500191 log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
192 // Simple validation
193 if deviceId == "" || deviceType == "" {
194 log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
195 return errors.New("invalid-parameters")
196 }
197 // Create the device discovery message
198 header := &ic.Header{
199 Id: uuid.New().String(),
200 Type: ic.MessageType_DEVICE_DISCOVERED,
201 FromTopic: kp.DefaultTopic.Name,
202 ToTopic: kp.deviceDiscoveryTopic.Name,
203 Timestamp: time.Now().UnixNano(),
204 }
205 body := &ic.DeviceDiscovered{
206 Id: deviceId,
207 DeviceType: deviceType,
208 ParentId: parentId,
khenaidood2b6df92018-12-13 16:37:20 -0500209 Publisher: publisher,
khenaidoo79232702018-12-04 11:00:41 -0500210 }
211
212 var marshalledData *any.Any
213 var err error
214 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
215 log.Errorw("cannot-marshal-request", log.Fields{"error": err})
216 return err
217 }
218 msg := &ic.InterContainerMessage{
219 Header: header,
220 Body: marshalledData,
221 }
222
223 // Send the message
224 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
225 log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
226 return err
227 }
228 return nil
229}
230
khenaidoo43c82122018-11-22 18:38:28 -0500231// InvokeRPC is used to send a request to a given topic
232func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
khenaidoobdcb8e02019-03-06 16:28:56 -0500233 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
khenaidoo43c82122018-11-22 18:38:28 -0500234
235 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
236 // typically the device ID.
237 responseTopic := replyToTopic
238 if responseTopic == nil {
239 responseTopic = kp.DefaultTopic
240 }
241
khenaidooabad44c2018-08-03 16:58:35 -0400242 // Encode the request
khenaidoobdcb8e02019-03-06 16:28:56 -0500243 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400244 if err != nil {
245 log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
246 return false, nil
247 }
248
249 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500250 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400251 if waitForResponse {
252 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500253 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
254 log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400255 }
256 }
257
khenaidoo43c82122018-11-22 18:38:28 -0500258 // Send request - if the topic is formatted with a device Id then we will send the request using a
259 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
260 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
khenaidoobdcb8e02019-03-06 16:28:56 -0500261 //key := GetDeviceIdFromTopic(*toTopic)
khenaidoo1ce37ad2019-03-24 22:07:24 -0400262 log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
khenaidoo8f474192019-04-03 17:20:44 -0400263 go kp.kafkaClient.Send(protoRequest, toTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400264
265 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400266 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400267 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400268 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400269 if ctx == nil {
270 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400271 } else {
272 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400273 }
khenaidoob9203542018-09-17 22:56:37 -0400274 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400275
276 // Wait for response as well as timeout or cancellation
277 // Remove the subscription for a response on return
278 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
279 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500280 case msg, ok := <-ch:
281 if !ok {
282 log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
283 protoError := &ic.Error{Reason: "channel-closed"}
284 var marshalledArg *any.Any
285 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
286 return false, nil // Should never happen
287 }
288 return false, marshalledArg
289 }
khenaidoo43c82122018-11-22 18:38:28 -0500290 log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500291 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400292 var err error
293 if responseBody, err = decodeResponse(msg); err != nil {
294 log.Errorw("decode-response-error", log.Fields{"error": err})
295 }
296 return responseBody.Success, responseBody.Result
297 case <-ctx.Done():
298 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
299 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500300 protoError := &ic.Error{Reason: ctx.Err().Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400301 var marshalledArg *any.Any
302 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
303 return false, nil // Should never happen
304 }
305 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400306 case <-childCtx.Done():
307 log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
308 // pack the error as proto any type
khenaidoo79232702018-12-04 11:00:41 -0500309 protoError := &ic.Error{Reason: childCtx.Err().Error()}
khenaidoob9203542018-09-17 22:56:37 -0400310 var marshalledArg *any.Any
311 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
312 return false, nil // Should never happen
313 }
314 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400315 case <-kp.doneCh:
khenaidoo43c82122018-11-22 18:38:28 -0500316 log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400317 return true, nil
318 }
319 }
320 return true, nil
321}
322
khenaidoo43c82122018-11-22 18:38:28 -0500323// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400324// when a message is received on a given topic
khenaidoo43c82122018-11-22 18:38:28 -0500325func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400326
327 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500328 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400329 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500330 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500331 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400332 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Abhilash S.L90cd9552019-07-18 17:30:29 +0530333 return err
khenaidooabad44c2018-08-03 16:58:35 -0400334 }
khenaidoo43c82122018-11-22 18:38:28 -0500335
336 kp.defaultRequestHandlerInterface = handler
337 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400338 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500339 go kp.waitForMessages(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400340
341 return nil
342}
343
khenaidoo43c82122018-11-22 18:38:28 -0500344// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
345// when a message is received on a given topic. So far there is only 1 target registered per microservice
khenaidoo731697e2019-01-29 16:03:29 -0500346func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500347 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500348 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500349 var err error
khenaidoo54e0ddf2019-02-27 16:21:33 -0500350 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500351 log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500352 return err
khenaidoo43c82122018-11-22 18:38:28 -0500353 }
354 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
355
356 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500357 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
khenaidoo43c82122018-11-22 18:38:28 -0500358
khenaidooabad44c2018-08-03 16:58:35 -0400359 return nil
360}
361
khenaidoo43c82122018-11-22 18:38:28 -0500362func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
363 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
364}
365
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500366// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
khenaidoo43c82122018-11-22 18:38:28 -0500367// responses from that topic.
khenaidoo79232702018-12-04 11:00:41 -0500368func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500369 kp.lockTopicResponseChannelMap.Lock()
370 defer kp.lockTopicResponseChannelMap.Unlock()
371 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
372 kp.topicToResponseChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400373 }
374}
375
khenaidoo43c82122018-11-22 18:38:28 -0500376func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400377 kp.lockTopicResponseChannelMap.RLock()
378 defer kp.lockTopicResponseChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500379 _, exist := kp.topicToResponseChannelMap[topic]
380 return exist
381}
382
383func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
384 kp.lockTopicResponseChannelMap.Lock()
385 defer kp.lockTopicResponseChannelMap.Unlock()
386 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
387 // Unsubscribe to this topic first - this will close the subscribed channel
388 var err error
389 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
390 log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
391 }
392 delete(kp.topicToResponseChannelMap, topic)
393 return err
394 } else {
395 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400396 }
397}
398
khenaidoo43c82122018-11-22 18:38:28 -0500399func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
400 kp.lockTopicResponseChannelMap.Lock()
401 defer kp.lockTopicResponseChannelMap.Unlock()
402 var err error
403 for topic, _ := range kp.topicToResponseChannelMap {
404 // Unsubscribe to this topic first - this will close the subscribed channel
405 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
406 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
407 }
408 delete(kp.topicToResponseChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400409 }
khenaidoo43c82122018-11-22 18:38:28 -0500410 return err
khenaidooabad44c2018-08-03 16:58:35 -0400411}
412
khenaidoo43c82122018-11-22 18:38:28 -0500413func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
414 kp.lockTopicRequestHandlerChannelMap.Lock()
415 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
416 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
417 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400418 }
khenaidooabad44c2018-08-03 16:58:35 -0400419}
420
khenaidoo43c82122018-11-22 18:38:28 -0500421func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
422 kp.lockTopicRequestHandlerChannelMap.Lock()
423 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
424 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
425 // Close the kafka client client first by unsubscribing to this topic
426 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
427 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400428 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500429 } else {
430 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
khenaidooabad44c2018-08-03 16:58:35 -0400431 }
khenaidooabad44c2018-08-03 16:58:35 -0400432}
433
khenaidoo43c82122018-11-22 18:38:28 -0500434func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
435 kp.lockTopicRequestHandlerChannelMap.Lock()
436 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
437 var err error
438 for topic, _ := range kp.topicToRequestHandlerChannelMap {
439 // Close the kafka client client first by unsubscribing to this topic
440 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
441 log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
442 }
443 delete(kp.topicToRequestHandlerChannelMap, topic)
444 }
445 return err
446}
447
khenaidoo79232702018-12-04 11:00:41 -0500448func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400449 kp.lockTransactionIdToChannelMap.Lock()
450 defer kp.lockTransactionIdToChannelMap.Unlock()
451 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500452 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400453 }
454}
455
khenaidoo43c82122018-11-22 18:38:28 -0500456func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400457 kp.lockTransactionIdToChannelMap.Lock()
458 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500459 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
460 // Close the channel first
461 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400462 delete(kp.transactionIdToChannelMap, id)
463 }
464}
465
khenaidoo43c82122018-11-22 18:38:28 -0500466func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
467 kp.lockTransactionIdToChannelMap.Lock()
468 defer kp.lockTransactionIdToChannelMap.Unlock()
469 for key, value := range kp.transactionIdToChannelMap {
470 if value.topic.Name == id {
471 close(value.ch)
472 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400473 }
474 }
khenaidooabad44c2018-08-03 16:58:35 -0400475}
476
khenaidoo43c82122018-11-22 18:38:28 -0500477func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
478 kp.lockTransactionIdToChannelMap.Lock()
479 defer kp.lockTransactionIdToChannelMap.Unlock()
480 for key, value := range kp.transactionIdToChannelMap {
481 close(value.ch)
482 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400483 }
khenaidooabad44c2018-08-03 16:58:35 -0400484}
485
khenaidoo43c82122018-11-22 18:38:28 -0500486func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
487 // If we have any consumers on that topic we need to close them
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500488 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
489 log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
490 }
491 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
492 log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
493 }
khenaidoo43c82122018-11-22 18:38:28 -0500494 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500495
khenaidoo43c82122018-11-22 18:38:28 -0500496 return kp.kafkaClient.DeleteTopic(&topic)
497}
498
499func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400500 // Encode the response argument - needs to be a proto message
501 if returnedVal == nil {
502 return nil, nil
503 }
504 protoValue, ok := returnedVal.(proto.Message)
505 if !ok {
506 log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
507 err := errors.New("response-value-not-proto-message")
508 return nil, err
509 }
510
511 // Marshal the returned value, if any
512 var marshalledReturnedVal *any.Any
513 var err error
514 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
515 log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
516 return nil, err
517 }
518 return marshalledReturnedVal, nil
519}
520
khenaidoo79232702018-12-04 11:00:41 -0500521func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
522 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400523 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500524 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400525 FromTopic: request.Header.ToTopic,
526 ToTopic: request.Header.FromTopic,
527 Timestamp: time.Now().Unix(),
528 }
khenaidoo79232702018-12-04 11:00:41 -0500529 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400530 Success: false,
531 Result: nil,
532 }
533 var marshalledResponseBody *any.Any
534 var err error
535 // Error should never happen here
536 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
537 log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
538 }
539
khenaidoo79232702018-12-04 11:00:41 -0500540 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400541 Header: responseHeader,
542 Body: marshalledResponseBody,
543 }
544
545}
546
547//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
548//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500549func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500550 //log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500551 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400552 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500553 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400554 FromTopic: request.Header.ToTopic,
555 ToTopic: request.Header.FromTopic,
khenaidoo2c6a0992019-04-29 13:46:56 -0400556 KeyTopic: request.Header.KeyTopic,
khenaidoo8f474192019-04-03 17:20:44 -0400557 Timestamp: time.Now().UnixNano(),
khenaidooabad44c2018-08-03 16:58:35 -0400558 }
559
560 // Go over all returned values
561 var marshalledReturnedVal *any.Any
562 var err error
563 for _, returnVal := range returnedValues {
khenaidoo43c82122018-11-22 18:38:28 -0500564 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -0400565 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
566 }
567 break // for now we support only 1 returned value - (excluding the error)
568 }
569
khenaidoo79232702018-12-04 11:00:41 -0500570 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400571 Success: success,
572 Result: marshalledReturnedVal,
573 }
574
575 // Marshal the response body
576 var marshalledResponseBody *any.Any
577 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
578 log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
579 return nil, err
580 }
581
khenaidoo79232702018-12-04 11:00:41 -0500582 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400583 Header: responseHeader,
584 Body: marshalledResponseBody,
585 }, nil
586}
587
588func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
589 myClassValue := reflect.ValueOf(myClass)
khenaidoo19374072018-12-11 11:05:15 -0500590 // Capitalize the first letter in the funcName to workaround the first capital letters required to
591 // invoke a function from a different package
592 funcName = strings.Title(funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400593 m := myClassValue.MethodByName(funcName)
594 if !m.IsValid() {
khenaidoo43c82122018-11-22 18:38:28 -0500595 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400596 }
597 in := make([]reflect.Value, len(params))
598 for i, param := range params {
599 in[i] = reflect.ValueOf(param)
600 }
601 out = m.Call(in)
602 return
603}
604
khenaidoo297cd252019-02-07 22:10:23 -0500605func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
606 arg := &KVArg{
607 Key: TransactionKey,
608 Value: &ic.StrType{Val: transactionId},
609 }
610
611 var marshalledArg *any.Any
612 var err error
613 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
614 log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
615 return currentArgs
616 }
617 protoArg := &ic.Argument{
618 Key: arg.Key,
619 Value: marshalledArg,
620 }
621 return append(currentArgs, protoArg)
622}
623
khenaidoo54e0ddf2019-02-27 16:21:33 -0500624func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
625 var marshalledArg *any.Any
626 var err error
627 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
628 log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
629 return currentArgs
630 }
631 protoArg := &ic.Argument{
632 Key: FromTopic,
633 Value: marshalledArg,
634 }
635 return append(currentArgs, protoArg)
636}
637
638func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400639
khenaidoo43c82122018-11-22 18:38:28 -0500640 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500641 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400642 var out []reflect.Value
643 var err error
644
645 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500646 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400647 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
648 log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
649 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500650 log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400651 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500652 // Augment the requestBody with the message Id as it will be used in scenarios where cores
653 // are set in pairs and competing
654 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500655
656 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
657 // needs to send an unsollicited message to the currently requested container
658 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
659
khenaidooabad44c2018-08-03 16:58:35 -0400660 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
661 if err != nil {
662 log.Warn(err)
663 }
664 }
665 // Response required?
666 if requestBody.ResponseRequired {
667 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500668 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400669 var returnedValues []interface{}
670 var success bool
671 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500672 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400673 returnedValues = make([]interface{}, 1)
674 returnedValues[0] = returnError
675 } else {
khenaidoob9203542018-09-17 22:56:37 -0400676 returnedValues = make([]interface{}, 0)
677 // Check for errors first
678 lastIndex := len(out) - 1
679 if out[lastIndex].Interface() != nil { // Error
khenaidoo09771ef2019-10-11 14:25:02 -0400680 if retError, ok := out[lastIndex].Interface().(error); ok {
681 if retError.Error() == ErrorTransactionNotAcquired.Error() {
682 log.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
683 return // Ignore - process is in competing mode and ignored transaction
684 }
685 returnError = &ic.Error{Reason: retError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400686 returnedValues = append(returnedValues, returnError)
687 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500688 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400689 returnedValues = append(returnedValues, returnError)
690 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500691 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
khenaidoo09771ef2019-10-11 14:25:02 -0400692 log.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
693 return // Ignore - should not happen
khenaidoob9203542018-09-17 22:56:37 -0400694 } else { // Non-error case
695 success = true
696 for idx, val := range out {
khenaidoo43c82122018-11-22 18:38:28 -0500697 //log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400698 if idx != lastIndex {
699 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400700 }
khenaidooabad44c2018-08-03 16:58:35 -0400701 }
702 }
703 }
704
khenaidoo79232702018-12-04 11:00:41 -0500705 var icm *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400706 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400707 log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400708 icm = encodeDefaultFailedResponse(msg)
709 }
khenaidoo43c82122018-11-22 18:38:28 -0500710 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
711 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
712 // present then the key will be empty, hence all messages for a given topic will be sent to all
713 // partitions.
714 replyTopic := &Topic{Name: msg.Header.FromTopic}
khenaidoobdcb8e02019-03-06 16:28:56 -0500715 key := msg.Header.KeyTopic
khenaidoo90847922018-12-03 14:47:51 -0500716 log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500717 // TODO: handle error response.
khenaidoo2c6a0992019-04-29 13:46:56 -0400718 go kp.kafkaClient.Send(icm, replyTopic, key)
khenaidooabad44c2018-08-03 16:58:35 -0400719 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500720 } else if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400721 log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500722 go kp.dispatchResponse(msg)
723 } else {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400724 log.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400725 }
726}
727
khenaidoo54e0ddf2019-02-27 16:21:33 -0500728func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400729 // Wait for messages
730 for msg := range ch {
khenaidoo43c82122018-11-22 18:38:28 -0500731 //log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500732 go kp.handleMessage(msg, targetInterface)
khenaidooabad44c2018-08-03 16:58:35 -0400733 }
734}
735
khenaidoo79232702018-12-04 11:00:41 -0500736func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400737 kp.lockTransactionIdToChannelMap.RLock()
738 defer kp.lockTransactionIdToChannelMap.RUnlock()
khenaidooabad44c2018-08-03 16:58:35 -0400739 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
740 log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
741 return
742 }
khenaidoo43c82122018-11-22 18:38:28 -0500743 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400744}
745
khenaidooabad44c2018-08-03 16:58:35 -0400746// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
747// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
748// API. There is one response channel waiting for kafka messages before dispatching the message to the
749// corresponding waiting channel
khenaidoo79232702018-12-04 11:00:41 -0500750func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob9203542018-09-17 22:56:37 -0400751 log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400752
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500753 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500754 // broadcast any message for this topic to all channels waiting on it.
khenaidoo79232702018-12-04 11:00:41 -0500755 ch := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500756 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400757
758 return ch, nil
759}
760
khenaidoo43c82122018-11-22 18:38:28 -0500761func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidooabad44c2018-08-03 16:58:35 -0400762 log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500763 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -0400764 return nil
765}
766
Scott Bakeree6a0872019-10-29 15:59:52 -0700767func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
768 return kp.kafkaClient.EnableLivenessChannel(enable)
769}
770
771func (kp *InterContainerProxy) SendLiveness() error {
772 return kp.kafkaClient.SendLiveness()
773}
774
khenaidooabad44c2018-08-03 16:58:35 -0400775//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
776//or an error on failure
khenaidoobdcb8e02019-03-06 16:28:56 -0500777func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
khenaidoo79232702018-12-04 11:00:41 -0500778 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400779 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500780 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400781 FromTopic: replyTopic.Name,
782 ToTopic: toTopic.Name,
khenaidoo2c6a0992019-04-29 13:46:56 -0400783 KeyTopic: key,
khenaidoo8f474192019-04-03 17:20:44 -0400784 Timestamp: time.Now().UnixNano(),
khenaidooabad44c2018-08-03 16:58:35 -0400785 }
khenaidoo79232702018-12-04 11:00:41 -0500786 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400787 Rpc: rpc,
788 ResponseRequired: true,
789 ReplyToTopic: replyTopic.Name,
790 }
791
792 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400793 if arg == nil {
794 // In case the caller sends an array with empty args
795 continue
796 }
khenaidooabad44c2018-08-03 16:58:35 -0400797 var marshalledArg *any.Any
798 var err error
799 // ascertain the value interface type is a proto.Message
800 protoValue, ok := arg.Value.(proto.Message)
801 if !ok {
802 log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
803 err := errors.New("argument-value-not-proto-message")
804 return nil, err
805 }
806 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
807 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
808 return nil, err
809 }
khenaidoo79232702018-12-04 11:00:41 -0500810 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400811 Key: arg.Key,
812 Value: marshalledArg,
813 }
814 requestBody.Args = append(requestBody.Args, protoArg)
815 }
816
817 var marshalledData *any.Any
818 var err error
819 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
820 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
821 return nil, err
822 }
khenaidoo79232702018-12-04 11:00:41 -0500823 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400824 Header: requestHeader,
825 Body: marshalledData,
826 }
827 return request, nil
828}
829
khenaidoo79232702018-12-04 11:00:41 -0500830func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400831 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500832 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400833 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
834 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
835 return nil, err
836 }
khenaidoo43c82122018-11-22 18:38:28 -0500837 //log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400838
839 return &responseBody, nil
840
841}