blob: aa77ffbdc1f924f316b8d53b5171c18e80199fa1 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050023 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040024 "sync"
25 "time"
khenaidooabad44c2018-08-03 16:58:35 -040026
serkant.uluderya2ae470f2020-01-21 11:13:09 -080027 "github.com/golang/protobuf/proto"
28 "github.com/golang/protobuf/ptypes"
29 "github.com/golang/protobuf/ptypes/any"
30 "github.com/google/uuid"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
33)
khenaidooabad44c2018-08-03 16:58:35 -040034
// DefaultMaxRetries is the default maximum number of retries.
const DefaultMaxRetries = 3

// DefaultRequestTimeout is the default request timeout, expressed in
// milliseconds (10000 ms), sized to handle a wider latency range. InvokeRPC
// multiplies it by time.Millisecond to bound the wait for a response.
const DefaultRequestTimeout = 10000
39
// TransactionKey is the string key "transactionID".
// NOTE(review): presumably names the argument carrying a transaction id; its
// use is not visible in this part of the file.
const TransactionKey = "transactionID"

// FromTopic is the string key "fromTopic".
// NOTE(review): presumably names the argument carrying the originating topic;
// its use is not visible in this part of the file.
const FromTopic = "fromTopic"
44
// Sentinel errors exported by this package.
var (
	// ErrorTransactionNotAcquired carries the message "transaction-not-acquired".
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
	// ErrorTransactionInvalidId carries the message "transaction-invalid-id".
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
47
khenaidoo43c82122018-11-22 18:38:28 -050048// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
49// obtained from that channel, this interface is invoked. This is used to handle
50// async requests into the Core via the kafka messaging bus
51type requestHandlerChannel struct {
52 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050053 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040054}
55
khenaidoo43c82122018-11-22 18:38:28 -050056// transactionChannel represents a combination of a topic and a channel onto which a response received
57// on the kafka bus will be sent to
58type transactionChannel struct {
59 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050060 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050061}
62
npujar467fe752020-01-16 20:17:45 +053063type InterContainerProxy interface {
64 Start() error
65 Stop()
66 GetDefaultTopic() *Topic
67 DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
68 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
69 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
70 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
71 UnSubscribeFromRequestHandler(topic Topic) error
72 DeleteTopic(topic Topic) error
73 EnableLivenessChannel(enable bool) chan bool
74 SendLiveness() error
75}
76
77// interContainerProxy represents the messaging proxy
78type interContainerProxy struct {
khenaidoo43c82122018-11-22 18:38:28 -050079 kafkaHost string
80 kafkaPort int
npujar467fe752020-01-16 20:17:45 +053081 defaultTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050082 defaultRequestHandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050083 deviceDiscoveryTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050084 kafkaClient Client
npujar467fe752020-01-16 20:17:45 +053085 doneCh chan struct{}
86 doneOnce sync.Once
khenaidoo43c82122018-11-22 18:38:28 -050087
88 // This map is used to map a topic to an interface and channel. When a request is received
89 // on that channel (registered to the topic) then that interface is invoked.
90 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
91 lockTopicRequestHandlerChannelMap sync.RWMutex
92
93 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050094 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050095 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050096 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050097 lockTopicResponseChannelMap sync.RWMutex
98
khenaidoo4c1a5bf2018-11-29 15:53:42 -050099 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -0500100 // sent out and we are waiting for a response.
101 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -0400102 lockTransactionIdToChannelMap sync.RWMutex
103}
104
npujar467fe752020-01-16 20:17:45 +0530105type InterContainerProxyOption func(*interContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -0400106
khenaidoo43c82122018-11-22 18:38:28 -0500107func InterContainerHost(host string) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530108 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500109 args.kafkaHost = host
khenaidooabad44c2018-08-03 16:58:35 -0400110 }
111}
112
khenaidoo43c82122018-11-22 18:38:28 -0500113func InterContainerPort(port int) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530114 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500115 args.kafkaPort = port
khenaidooabad44c2018-08-03 16:58:35 -0400116 }
117}
118
khenaidoo43c82122018-11-22 18:38:28 -0500119func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530120 return func(args *interContainerProxy) {
121 args.defaultTopic = topic
khenaidooabad44c2018-08-03 16:58:35 -0400122 }
123}
124
khenaidoo79232702018-12-04 11:00:41 -0500125func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530126 return func(args *interContainerProxy) {
khenaidoo79232702018-12-04 11:00:41 -0500127 args.deviceDiscoveryTopic = topic
128 }
129}
130
khenaidoo43c82122018-11-22 18:38:28 -0500131func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530132 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500133 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400134 }
135}
136
khenaidoo43c82122018-11-22 18:38:28 -0500137func MsgClient(client Client) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530138 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500139 args.kafkaClient = client
140 }
141}
142
npujar467fe752020-01-16 20:17:45 +0530143func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
144 proxy := &interContainerProxy{
khenaidoo43c82122018-11-22 18:38:28 -0500145 kafkaHost: DefaultKafkaHost,
146 kafkaPort: DefaultKafkaPort,
npujar467fe752020-01-16 20:17:45 +0530147 doneCh: make(chan struct{}),
khenaidooabad44c2018-08-03 16:58:35 -0400148 }
149
150 for _, option := range opts {
151 option(proxy)
152 }
153
npujar467fe752020-01-16 20:17:45 +0530154 return proxy
khenaidooabad44c2018-08-03 16:58:35 -0400155}
156
npujar467fe752020-01-16 20:17:45 +0530157func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
158 return newInterContainerProxy(opts...)
159}
160
161func (kp *interContainerProxy) Start() error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800162 logger.Info("Starting-Proxy")
khenaidooabad44c2018-08-03 16:58:35 -0400163
khenaidoo43c82122018-11-22 18:38:28 -0500164 // Kafka MsgClient should already have been created. If not, output fatal error
165 if kp.kafkaClient == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800166 logger.Fatal("kafka-client-not-set")
khenaidoo43c82122018-11-22 18:38:28 -0500167 }
168
khenaidoo43c82122018-11-22 18:38:28 -0500169 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500170 if err := kp.kafkaClient.Start(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800171 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400172 return err
173 }
174
khenaidoo43c82122018-11-22 18:38:28 -0500175 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500176 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500177 //
khenaidooabad44c2018-08-03 16:58:35 -0400178 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500179 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
180
181 // Create the topic to request channel map
182 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400183
184 return nil
185}
186
npujar467fe752020-01-16 20:17:45 +0530187func (kp *interContainerProxy) Stop() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800188 logger.Info("stopping-intercontainer-proxy")
npujar467fe752020-01-16 20:17:45 +0530189 kp.doneOnce.Do(func() { close(kp.doneCh) })
khenaidoo43c82122018-11-22 18:38:28 -0500190 // TODO : Perform cleanup
khenaidooca301322019-01-09 23:06:32 -0500191 kp.kafkaClient.Stop()
khenaidoo43c82122018-11-22 18:38:28 -0500192 //kp.deleteAllTopicRequestHandlerChannelMap()
193 //kp.deleteAllTopicResponseChannelMap()
194 //kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400195}
196
npujar467fe752020-01-16 20:17:45 +0530197func (kp *interContainerProxy) GetDefaultTopic() *Topic {
198 return kp.defaultTopic
199}
200
khenaidoo79232702018-12-04 11:00:41 -0500201// DeviceDiscovered publish the discovered device onto the kafka messaging bus
npujar467fe752020-01-16 20:17:45 +0530202func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800203 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
khenaidoo79232702018-12-04 11:00:41 -0500204 // Simple validation
205 if deviceId == "" || deviceType == "" {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800206 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
khenaidoo79232702018-12-04 11:00:41 -0500207 return errors.New("invalid-parameters")
208 }
209 // Create the device discovery message
210 header := &ic.Header{
211 Id: uuid.New().String(),
212 Type: ic.MessageType_DEVICE_DISCOVERED,
npujar467fe752020-01-16 20:17:45 +0530213 FromTopic: kp.defaultTopic.Name,
khenaidoo79232702018-12-04 11:00:41 -0500214 ToTopic: kp.deviceDiscoveryTopic.Name,
215 Timestamp: time.Now().UnixNano(),
216 }
217 body := &ic.DeviceDiscovered{
218 Id: deviceId,
219 DeviceType: deviceType,
220 ParentId: parentId,
khenaidood2b6df92018-12-13 16:37:20 -0500221 Publisher: publisher,
khenaidoo79232702018-12-04 11:00:41 -0500222 }
223
224 var marshalledData *any.Any
225 var err error
226 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800227 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
khenaidoo79232702018-12-04 11:00:41 -0500228 return err
229 }
230 msg := &ic.InterContainerMessage{
231 Header: header,
232 Body: marshalledData,
233 }
234
235 // Send the message
236 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800237 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
khenaidoo79232702018-12-04 11:00:41 -0500238 return err
239 }
240 return nil
241}
242
khenaidoo43c82122018-11-22 18:38:28 -0500243// InvokeRPC is used to send a request to a given topic
npujar467fe752020-01-16 20:17:45 +0530244func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
khenaidoobdcb8e02019-03-06 16:28:56 -0500245 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
khenaidoo43c82122018-11-22 18:38:28 -0500246
247 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
248 // typically the device ID.
249 responseTopic := replyToTopic
250 if responseTopic == nil {
npujar467fe752020-01-16 20:17:45 +0530251 responseTopic = kp.defaultTopic
khenaidoo43c82122018-11-22 18:38:28 -0500252 }
253
khenaidooabad44c2018-08-03 16:58:35 -0400254 // Encode the request
khenaidoobdcb8e02019-03-06 16:28:56 -0500255 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400256 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800257 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400258 return false, nil
259 }
260
261 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500262 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400263 if waitForResponse {
264 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500265 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800266 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400267 }
268 }
269
khenaidoo43c82122018-11-22 18:38:28 -0500270 // Send request - if the topic is formatted with a device Id then we will send the request using a
271 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
272 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
khenaidoobdcb8e02019-03-06 16:28:56 -0500273 //key := GetDeviceIdFromTopic(*toTopic)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800274 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000275 go func() {
276 if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
277 logger.Errorw("send-failed", log.Fields{
278 "topic": toTopic,
279 "key": key,
280 "error": err})
281 }
282 }()
khenaidooabad44c2018-08-03 16:58:35 -0400283
284 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400285 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400286 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400287 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400288 if ctx == nil {
289 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400290 } else {
291 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400292 }
khenaidoob9203542018-09-17 22:56:37 -0400293 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400294
295 // Wait for response as well as timeout or cancellation
296 // Remove the subscription for a response on return
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000297 defer func() {
298 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
299 logger.Errorw("response-unsubscribe-failed", log.Fields{
300 "id": protoRequest.Header.Id,
301 "error": err})
302 }
303 }()
khenaidooabad44c2018-08-03 16:58:35 -0400304 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500305 case msg, ok := <-ch:
306 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800307 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500308 protoError := &ic.Error{Reason: "channel-closed"}
309 var marshalledArg *any.Any
310 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
311 return false, nil // Should never happen
312 }
313 return false, marshalledArg
314 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800315 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500316 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400317 var err error
318 if responseBody, err = decodeResponse(msg); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800319 logger.Errorw("decode-response-error", log.Fields{"error": err})
npujar467fe752020-01-16 20:17:45 +0530320 // FIXME we should return something
khenaidooabad44c2018-08-03 16:58:35 -0400321 }
322 return responseBody.Success, responseBody.Result
323 case <-ctx.Done():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800324 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
khenaidooabad44c2018-08-03 16:58:35 -0400325 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530326 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
327
khenaidooabad44c2018-08-03 16:58:35 -0400328 var marshalledArg *any.Any
329 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
330 return false, nil // Should never happen
331 }
332 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400333 case <-childCtx.Done():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800334 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
khenaidoob9203542018-09-17 22:56:37 -0400335 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530336 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
337
khenaidoob9203542018-09-17 22:56:37 -0400338 var marshalledArg *any.Any
339 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
340 return false, nil // Should never happen
341 }
342 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400343 case <-kp.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800344 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400345 return true, nil
346 }
347 }
348 return true, nil
349}
350
khenaidoo43c82122018-11-22 18:38:28 -0500351// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400352// when a message is received on a given topic
npujar467fe752020-01-16 20:17:45 +0530353func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400354
355 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500356 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400357 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500358 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500359 //if ch, err = kp.Subscribe(topic); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800360 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Abhilash S.L90cd9552019-07-18 17:30:29 +0530361 return err
khenaidooabad44c2018-08-03 16:58:35 -0400362 }
khenaidoo43c82122018-11-22 18:38:28 -0500363
364 kp.defaultRequestHandlerInterface = handler
365 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400366 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500367 go kp.waitForMessages(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400368
369 return nil
370}
371
khenaidoo43c82122018-11-22 18:38:28 -0500372// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
373// when a message is received on a given topic. So far there is only 1 target registered per microservice
npujar467fe752020-01-16 20:17:45 +0530374func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500375 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500376 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500377 var err error
khenaidoo54e0ddf2019-02-27 16:21:33 -0500378 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800379 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500380 return err
khenaidoo43c82122018-11-22 18:38:28 -0500381 }
382 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
383
384 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500385 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
khenaidoo43c82122018-11-22 18:38:28 -0500386
khenaidooabad44c2018-08-03 16:58:35 -0400387 return nil
388}
389
npujar467fe752020-01-16 20:17:45 +0530390func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500391 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
392}
393
npujar467fe752020-01-16 20:17:45 +0530394func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500395 kp.lockTopicResponseChannelMap.Lock()
396 defer kp.lockTopicResponseChannelMap.Unlock()
397 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
398 // Unsubscribe to this topic first - this will close the subscribed channel
399 var err error
400 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800401 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
khenaidoo43c82122018-11-22 18:38:28 -0500402 }
403 delete(kp.topicToResponseChannelMap, topic)
404 return err
405 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000406 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400407 }
408}
409
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000410// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530411func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
khenaidoo43c82122018-11-22 18:38:28 -0500412 kp.lockTopicResponseChannelMap.Lock()
413 defer kp.lockTopicResponseChannelMap.Unlock()
414 var err error
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000415 for topic := range kp.topicToResponseChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500416 // Unsubscribe to this topic first - this will close the subscribed channel
417 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800418 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500419 }
420 delete(kp.topicToResponseChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400421 }
khenaidoo43c82122018-11-22 18:38:28 -0500422 return err
khenaidooabad44c2018-08-03 16:58:35 -0400423}
424
npujar467fe752020-01-16 20:17:45 +0530425func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
khenaidoo43c82122018-11-22 18:38:28 -0500426 kp.lockTopicRequestHandlerChannelMap.Lock()
427 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
428 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
429 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400430 }
khenaidooabad44c2018-08-03 16:58:35 -0400431}
432
npujar467fe752020-01-16 20:17:45 +0530433func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500434 kp.lockTopicRequestHandlerChannelMap.Lock()
435 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
436 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
437 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000438 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
439 return err
440 }
khenaidoo43c82122018-11-22 18:38:28 -0500441 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400442 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500443 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000444 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400445 }
khenaidooabad44c2018-08-03 16:58:35 -0400446}
447
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000448// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530449func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
khenaidoo43c82122018-11-22 18:38:28 -0500450 kp.lockTopicRequestHandlerChannelMap.Lock()
451 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
452 var err error
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000453 for topic := range kp.topicToRequestHandlerChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500454 // Close the kafka client client first by unsubscribing to this topic
455 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800456 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500457 }
458 delete(kp.topicToRequestHandlerChannelMap, topic)
459 }
460 return err
461}
462
npujar467fe752020-01-16 20:17:45 +0530463func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400464 kp.lockTransactionIdToChannelMap.Lock()
465 defer kp.lockTransactionIdToChannelMap.Unlock()
466 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500467 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400468 }
469}
470
npujar467fe752020-01-16 20:17:45 +0530471func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400472 kp.lockTransactionIdToChannelMap.Lock()
473 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500474 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
475 // Close the channel first
476 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400477 delete(kp.transactionIdToChannelMap, id)
478 }
479}
480
npujar467fe752020-01-16 20:17:45 +0530481func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
khenaidoo43c82122018-11-22 18:38:28 -0500482 kp.lockTransactionIdToChannelMap.Lock()
483 defer kp.lockTransactionIdToChannelMap.Unlock()
484 for key, value := range kp.transactionIdToChannelMap {
485 if value.topic.Name == id {
486 close(value.ch)
487 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400488 }
489 }
khenaidooabad44c2018-08-03 16:58:35 -0400490}
491
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000492// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530493func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
khenaidoo43c82122018-11-22 18:38:28 -0500494 kp.lockTransactionIdToChannelMap.Lock()
495 defer kp.lockTransactionIdToChannelMap.Unlock()
496 for key, value := range kp.transactionIdToChannelMap {
497 close(value.ch)
498 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400499 }
khenaidooabad44c2018-08-03 16:58:35 -0400500}
501
npujar467fe752020-01-16 20:17:45 +0530502func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500503 // If we have any consumers on that topic we need to close them
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500504 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800505 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500506 }
507 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800508 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500509 }
khenaidoo43c82122018-11-22 18:38:28 -0500510 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500511
khenaidoo43c82122018-11-22 18:38:28 -0500512 return kp.kafkaClient.DeleteTopic(&topic)
513}
514
515func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400516 // Encode the response argument - needs to be a proto message
517 if returnedVal == nil {
518 return nil, nil
519 }
520 protoValue, ok := returnedVal.(proto.Message)
521 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800522 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
khenaidooabad44c2018-08-03 16:58:35 -0400523 err := errors.New("response-value-not-proto-message")
524 return nil, err
525 }
526
527 // Marshal the returned value, if any
528 var marshalledReturnedVal *any.Any
529 var err error
530 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800531 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400532 return nil, err
533 }
534 return marshalledReturnedVal, nil
535}
536
khenaidoo79232702018-12-04 11:00:41 -0500537func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
538 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400539 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500540 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400541 FromTopic: request.Header.ToTopic,
542 ToTopic: request.Header.FromTopic,
npujar467fe752020-01-16 20:17:45 +0530543 Timestamp: time.Now().UnixNano(),
khenaidooabad44c2018-08-03 16:58:35 -0400544 }
khenaidoo79232702018-12-04 11:00:41 -0500545 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400546 Success: false,
547 Result: nil,
548 }
549 var marshalledResponseBody *any.Any
550 var err error
551 // Error should never happen here
552 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800553 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400554 }
555
khenaidoo79232702018-12-04 11:00:41 -0500556 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400557 Header: responseHeader,
558 Body: marshalledResponseBody,
559 }
560
561}
562
563//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
564//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500565func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800566 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500567 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400568 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500569 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400570 FromTopic: request.Header.ToTopic,
571 ToTopic: request.Header.FromTopic,
khenaidoo2c6a0992019-04-29 13:46:56 -0400572 KeyTopic: request.Header.KeyTopic,
khenaidoo8f474192019-04-03 17:20:44 -0400573 Timestamp: time.Now().UnixNano(),
khenaidooabad44c2018-08-03 16:58:35 -0400574 }
575
576 // Go over all returned values
577 var marshalledReturnedVal *any.Any
578 var err error
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000579
580 // for now we support only 1 returned value - (excluding the error)
581 if len(returnedValues) > 0 {
582 if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800583 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400584 }
khenaidooabad44c2018-08-03 16:58:35 -0400585 }
586
khenaidoo79232702018-12-04 11:00:41 -0500587 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400588 Success: success,
589 Result: marshalledReturnedVal,
590 }
591
592 // Marshal the response body
593 var marshalledResponseBody *any.Any
594 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800595 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400596 return nil, err
597 }
598
khenaidoo79232702018-12-04 11:00:41 -0500599 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400600 Header: responseHeader,
601 Body: marshalledResponseBody,
602 }, nil
603}
604
// CallFuncByName invokes, through reflection, the method called funcName on myClass and
// returns the values produced by that method.
//
// The first letter of funcName is capitalized before the lookup, since only exported
// methods can be invoked from a different package.
//
// Instead of letting the reflect package panic, an error (with an empty result slice) is
// returned when:
//   - myClass is nil or has no method with the resulting name,
//   - the number of params does not match the method signature (variadic methods accept
//     any number of trailing arguments),
//   - a param is not assignable to the corresponding method parameter.
//
// An untyped nil param is passed as the zero value of the corresponding parameter type when
// that type can hold nil (pointer, interface, map, slice, chan, func).
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// MethodByName panics on the zero Value obtained from a nil interface.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Validate the argument count up front: reflect.Value.Call panics on a mismatch.
	mType := m.Type()
	numIn := mType.NumIn()
	variadic := mType.IsVariadic()
	if (variadic && len(params) < numIn-1) || (!variadic && len(params) != numIn) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-count \"%s\": expected %d, got %d", funcName, numIn, len(params))
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		// Trailing arguments of a variadic method are checked against the element type of
		// the variadic slice, which is what Call does when packing them.
		var want reflect.Type
		if variadic && i >= numIn-1 {
			want = mType.In(numIn - 1).Elem()
		} else {
			want = mType.In(i)
		}

		v := reflect.ValueOf(param)
		if !v.IsValid() {
			// Untyped nil: Call rejects a zero Value, so substitute a typed nil when legal.
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
				v = reflect.Zero(want)
			default:
				return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": nil not assignable to parameter %d of type %s", funcName, i, want)
			}
		} else if !v.Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": %s not assignable to parameter %d of type %s", funcName, v.Type(), i, want)
		}
		in[i] = v
	}

	return m.Call(in), nil
}
621
npujar467fe752020-01-16 20:17:45 +0530622func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo297cd252019-02-07 22:10:23 -0500623 arg := &KVArg{
624 Key: TransactionKey,
625 Value: &ic.StrType{Val: transactionId},
626 }
627
628 var marshalledArg *any.Any
629 var err error
630 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800631 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
khenaidoo297cd252019-02-07 22:10:23 -0500632 return currentArgs
633 }
634 protoArg := &ic.Argument{
635 Key: arg.Key,
636 Value: marshalledArg,
637 }
638 return append(currentArgs, protoArg)
639}
640
npujar467fe752020-01-16 20:17:45 +0530641func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo54e0ddf2019-02-27 16:21:33 -0500642 var marshalledArg *any.Any
643 var err error
644 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800645 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500646 return currentArgs
647 }
648 protoArg := &ic.Argument{
649 Key: FromTopic,
650 Value: marshalledArg,
651 }
652 return append(currentArgs, protoArg)
653}
654
npujar467fe752020-01-16 20:17:45 +0530655func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400656
khenaidoo43c82122018-11-22 18:38:28 -0500657 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500658 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400659 var out []reflect.Value
660 var err error
661
662 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500663 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400664 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800665 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400666 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800667 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400668 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500669 // Augment the requestBody with the message Id as it will be used in scenarios where cores
670 // are set in pairs and competing
671 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500672
673 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
674 // needs to send an unsollicited message to the currently requested container
675 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
676
khenaidooabad44c2018-08-03 16:58:35 -0400677 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
678 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800679 logger.Warn(err)
khenaidooabad44c2018-08-03 16:58:35 -0400680 }
681 }
682 // Response required?
683 if requestBody.ResponseRequired {
684 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500685 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400686 var returnedValues []interface{}
687 var success bool
688 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500689 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400690 returnedValues = make([]interface{}, 1)
691 returnedValues[0] = returnError
692 } else {
khenaidoob9203542018-09-17 22:56:37 -0400693 returnedValues = make([]interface{}, 0)
694 // Check for errors first
695 lastIndex := len(out) - 1
696 if out[lastIndex].Interface() != nil { // Error
khenaidoo09771ef2019-10-11 14:25:02 -0400697 if retError, ok := out[lastIndex].Interface().(error); ok {
698 if retError.Error() == ErrorTransactionNotAcquired.Error() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800699 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400700 return // Ignore - process is in competing mode and ignored transaction
701 }
702 returnError = &ic.Error{Reason: retError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400703 returnedValues = append(returnedValues, returnError)
704 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500705 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400706 returnedValues = append(returnedValues, returnError)
707 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500708 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800709 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400710 return // Ignore - should not happen
khenaidoob9203542018-09-17 22:56:37 -0400711 } else { // Non-error case
712 success = true
713 for idx, val := range out {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800714 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400715 if idx != lastIndex {
716 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400717 }
khenaidooabad44c2018-08-03 16:58:35 -0400718 }
719 }
720 }
721
khenaidoo79232702018-12-04 11:00:41 -0500722 var icm *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400723 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800724 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400725 icm = encodeDefaultFailedResponse(msg)
726 }
khenaidoo43c82122018-11-22 18:38:28 -0500727 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
728 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
729 // present then the key will be empty, hence all messages for a given topic will be sent to all
730 // partitions.
731 replyTopic := &Topic{Name: msg.Header.FromTopic}
khenaidoobdcb8e02019-03-06 16:28:56 -0500732 key := msg.Header.KeyTopic
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800733 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500734 // TODO: handle error response.
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000735 go func() {
736 if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
737 logger.Errorw("send-reply-failed", log.Fields{
738 "topic": replyTopic,
739 "key": key,
740 "error": err})
741 }
742 }()
khenaidooabad44c2018-08-03 16:58:35 -0400743 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500744 } else if msg.Header.Type == ic.MessageType_RESPONSE {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800745 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500746 go kp.dispatchResponse(msg)
747 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800748 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400749 }
750}
751
npujar467fe752020-01-16 20:17:45 +0530752func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400753 // Wait for messages
754 for msg := range ch {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800755 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500756 go kp.handleMessage(msg, targetInterface)
khenaidooabad44c2018-08-03 16:58:35 -0400757 }
758}
759
npujar467fe752020-01-16 20:17:45 +0530760func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400761 kp.lockTransactionIdToChannelMap.RLock()
762 defer kp.lockTransactionIdToChannelMap.RUnlock()
khenaidooabad44c2018-08-03 16:58:35 -0400763 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800764 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
khenaidooabad44c2018-08-03 16:58:35 -0400765 return
766 }
khenaidoo43c82122018-11-22 18:38:28 -0500767 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400768}
769
khenaidooabad44c2018-08-03 16:58:35 -0400770// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
771// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
772// API. There is one response channel waiting for kafka messages before dispatching the message to the
773// corresponding waiting channel
npujar467fe752020-01-16 20:17:45 +0530774func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800775 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400776
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500777 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500778 // broadcast any message for this topic to all channels waiting on it.
khenaidoo79232702018-12-04 11:00:41 -0500779 ch := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500780 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400781
782 return ch, nil
783}
784
npujar467fe752020-01-16 20:17:45 +0530785func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800786 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500787 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -0400788 return nil
789}
790
npujar467fe752020-01-16 20:17:45 +0530791func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
Scott Bakeree6a0872019-10-29 15:59:52 -0700792 return kp.kafkaClient.EnableLivenessChannel(enable)
793}
794
npujar467fe752020-01-16 20:17:45 +0530795func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800796 return kp.kafkaClient.EnableHealthinessChannel(enable)
797}
798
npujar467fe752020-01-16 20:17:45 +0530799func (kp *interContainerProxy) SendLiveness() error {
Scott Bakeree6a0872019-10-29 15:59:52 -0700800 return kp.kafkaClient.SendLiveness()
801}
802
khenaidooabad44c2018-08-03 16:58:35 -0400803//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
804//or an error on failure
khenaidoobdcb8e02019-03-06 16:28:56 -0500805func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
khenaidoo79232702018-12-04 11:00:41 -0500806 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400807 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500808 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400809 FromTopic: replyTopic.Name,
810 ToTopic: toTopic.Name,
khenaidoo2c6a0992019-04-29 13:46:56 -0400811 KeyTopic: key,
khenaidoo8f474192019-04-03 17:20:44 -0400812 Timestamp: time.Now().UnixNano(),
khenaidooabad44c2018-08-03 16:58:35 -0400813 }
khenaidoo79232702018-12-04 11:00:41 -0500814 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400815 Rpc: rpc,
816 ResponseRequired: true,
817 ReplyToTopic: replyTopic.Name,
818 }
819
820 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400821 if arg == nil {
822 // In case the caller sends an array with empty args
823 continue
824 }
khenaidooabad44c2018-08-03 16:58:35 -0400825 var marshalledArg *any.Any
826 var err error
827 // ascertain the value interface type is a proto.Message
828 protoValue, ok := arg.Value.(proto.Message)
829 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800830 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
khenaidooabad44c2018-08-03 16:58:35 -0400831 err := errors.New("argument-value-not-proto-message")
832 return nil, err
833 }
834 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800835 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400836 return nil, err
837 }
khenaidoo79232702018-12-04 11:00:41 -0500838 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400839 Key: arg.Key,
840 Value: marshalledArg,
841 }
842 requestBody.Args = append(requestBody.Args, protoArg)
843 }
844
845 var marshalledData *any.Any
846 var err error
847 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800848 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400849 return nil, err
850 }
khenaidoo79232702018-12-04 11:00:41 -0500851 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400852 Header: requestHeader,
853 Body: marshalledData,
854 }
855 return request, nil
856}
857
khenaidoo79232702018-12-04 11:00:41 -0500858func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400859 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500860 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400861 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800862 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400863 return nil, err
864 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800865 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400866
867 return &responseBody, nil
868
869}