blob: cbde83421c893cf5980f8eb9731b9f2e103e7284 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050023 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040024 "sync"
25 "time"
khenaidooabad44c2018-08-03 16:58:35 -040026
David Bainbridge9ae13132020-06-22 17:28:01 -070027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29
serkant.uluderya2ae470f2020-01-21 11:13:09 -080030 "github.com/golang/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/any"
33 "github.com/google/uuid"
34 "github.com/opencord/voltha-lib-go/v3/pkg/log"
35 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
36)
khenaidooabad44c2018-08-03 16:58:35 -040037
38const (
khenaidoo43c82122018-11-22 18:38:28 -050039 DefaultMaxRetries = 3
Scott Bakerb9635992020-03-11 21:11:28 -070040 DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
khenaidooabad44c2018-08-03 16:58:35 -040041)
42
khenaidoo297cd252019-02-07 22:10:23 -050043const (
44 TransactionKey = "transactionID"
khenaidoo54e0ddf2019-02-27 16:21:33 -050045 FromTopic = "fromTopic"
khenaidoo297cd252019-02-07 22:10:23 -050046)
47
khenaidoo09771ef2019-10-11 14:25:02 -040048var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
49var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
50
khenaidoo43c82122018-11-22 18:38:28 -050051// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
52// obtained from that channel, this interface is invoked. This is used to handle
53// async requests into the Core via the kafka messaging bus
54type requestHandlerChannel struct {
55 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050056 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040057}
58
khenaidoo43c82122018-11-22 18:38:28 -050059// transactionChannel represents a combination of a topic and a channel onto which a response received
60// on the kafka bus will be sent to
61type transactionChannel struct {
62 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050063 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050064}
65
npujar467fe752020-01-16 20:17:45 +053066type InterContainerProxy interface {
67 Start() error
68 Stop()
69 GetDefaultTopic() *Topic
npujar467fe752020-01-16 20:17:45 +053070 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Scott Bakerb9635992020-03-11 21:11:28 -070071 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
npujar467fe752020-01-16 20:17:45 +053072 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
73 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
74 UnSubscribeFromRequestHandler(topic Topic) error
75 DeleteTopic(topic Topic) error
76 EnableLivenessChannel(enable bool) chan bool
77 SendLiveness() error
78}
79
80// interContainerProxy represents the messaging proxy
81type interContainerProxy struct {
Neha Sharmad1387da2020-05-07 20:07:28 +000082 kafkaAddress string
npujar467fe752020-01-16 20:17:45 +053083 defaultTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050084 defaultRequestHandlerInterface interface{}
85 kafkaClient Client
npujar467fe752020-01-16 20:17:45 +053086 doneCh chan struct{}
87 doneOnce sync.Once
khenaidoo43c82122018-11-22 18:38:28 -050088
89 // This map is used to map a topic to an interface and channel. When a request is received
90 // on that channel (registered to the topic) then that interface is invoked.
91 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
92 lockTopicRequestHandlerChannelMap sync.RWMutex
93
94 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050095 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050096 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050097 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050098 lockTopicResponseChannelMap sync.RWMutex
99
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500100 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -0500101 // sent out and we are waiting for a response.
102 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -0400103 lockTransactionIdToChannelMap sync.RWMutex
104}
105
npujar467fe752020-01-16 20:17:45 +0530106type InterContainerProxyOption func(*interContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -0400107
Neha Sharmad1387da2020-05-07 20:07:28 +0000108func InterContainerAddress(address string) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530109 return func(args *interContainerProxy) {
Neha Sharmad1387da2020-05-07 20:07:28 +0000110 args.kafkaAddress = address
khenaidooabad44c2018-08-03 16:58:35 -0400111 }
112}
113
khenaidoo43c82122018-11-22 18:38:28 -0500114func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530115 return func(args *interContainerProxy) {
116 args.defaultTopic = topic
khenaidooabad44c2018-08-03 16:58:35 -0400117 }
118}
119
khenaidoo43c82122018-11-22 18:38:28 -0500120func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530121 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500122 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400123 }
124}
125
khenaidoo43c82122018-11-22 18:38:28 -0500126func MsgClient(client Client) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530127 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500128 args.kafkaClient = client
129 }
130}
131
npujar467fe752020-01-16 20:17:45 +0530132func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
133 proxy := &interContainerProxy{
Neha Sharmad1387da2020-05-07 20:07:28 +0000134 kafkaAddress: DefaultKafkaAddress,
135 doneCh: make(chan struct{}),
khenaidooabad44c2018-08-03 16:58:35 -0400136 }
137
138 for _, option := range opts {
139 option(proxy)
140 }
141
npujar467fe752020-01-16 20:17:45 +0530142 return proxy
khenaidooabad44c2018-08-03 16:58:35 -0400143}
144
npujar467fe752020-01-16 20:17:45 +0530145func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
146 return newInterContainerProxy(opts...)
147}
148
149func (kp *interContainerProxy) Start() error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800150 logger.Info("Starting-Proxy")
khenaidooabad44c2018-08-03 16:58:35 -0400151
khenaidoo43c82122018-11-22 18:38:28 -0500152 // Kafka MsgClient should already have been created. If not, output fatal error
153 if kp.kafkaClient == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800154 logger.Fatal("kafka-client-not-set")
khenaidoo43c82122018-11-22 18:38:28 -0500155 }
156
khenaidoo43c82122018-11-22 18:38:28 -0500157 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500158 if err := kp.kafkaClient.Start(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800159 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400160 return err
161 }
162
khenaidoo43c82122018-11-22 18:38:28 -0500163 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500164 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500165 //
khenaidooabad44c2018-08-03 16:58:35 -0400166 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500167 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
168
169 // Create the topic to request channel map
170 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400171
172 return nil
173}
174
npujar467fe752020-01-16 20:17:45 +0530175func (kp *interContainerProxy) Stop() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800176 logger.Info("stopping-intercontainer-proxy")
npujar467fe752020-01-16 20:17:45 +0530177 kp.doneOnce.Do(func() { close(kp.doneCh) })
khenaidoo43c82122018-11-22 18:38:28 -0500178 // TODO : Perform cleanup
khenaidooca301322019-01-09 23:06:32 -0500179 kp.kafkaClient.Stop()
Scott Baker0e78ba22020-02-24 17:58:47 -0800180 err := kp.deleteAllTopicRequestHandlerChannelMap()
181 if err != nil {
Scott Baker432f9be2020-03-26 11:56:30 -0700182 logger.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800183 }
184 err = kp.deleteAllTopicResponseChannelMap()
185 if err != nil {
Scott Baker432f9be2020-03-26 11:56:30 -0700186 logger.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800187 }
188 kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400189}
190
npujar467fe752020-01-16 20:17:45 +0530191func (kp *interContainerProxy) GetDefaultTopic() *Topic {
192 return kp.defaultTopic
193}
194
Scott Bakerb9635992020-03-11 21:11:28 -0700195// InvokeAsyncRPC is used to make an RPC request asynchronously
196func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
197 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
198
199 logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
200 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
201 // typically the device ID.
202 responseTopic := replyToTopic
203 if responseTopic == nil {
204 responseTopic = kp.GetDefaultTopic()
205 }
206
207 chnl := make(chan *RpcResponse)
208
209 go func() {
210
211 // once we're done,
212 // close the response channel
213 defer close(chnl)
214
215 var err error
216 var protoRequest *ic.InterContainerMessage
217
218 // Encode the request
219 protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
220 if err != nil {
221 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
222 chnl <- NewResponse(RpcFormattingError, err, nil)
223 return
224 }
225
226 // Subscribe for response, if needed, before sending request
227 var ch <-chan *ic.InterContainerMessage
228 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
229 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
230 chnl <- NewResponse(RpcTransportError, err, nil)
231 return
232 }
233
234 // Send request - if the topic is formatted with a device Id then we will send the request using a
235 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
236 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
237 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
238
239 // if the message is not sent on kafka publish an event an close the channel
240 if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
241 chnl <- NewResponse(RpcTransportError, err, nil)
242 return
243 }
244
245 // if the client is not waiting for a response send the ack and close the channel
246 chnl <- NewResponse(RpcSent, nil, nil)
247 if !waitForResponse {
248 return
249 }
250
251 defer func() {
252 // Remove the subscription for a response on return
253 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
254 logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
255 }
256 }()
257
258 // Wait for response as well as timeout or cancellation
259 select {
260 case msg, ok := <-ch:
261 if !ok {
262 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
263 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
264 }
265 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
266 if responseBody, err := decodeResponse(msg); err != nil {
267 chnl <- NewResponse(RpcReply, err, nil)
268 } else {
269 if responseBody.Success {
270 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
271 } else {
272 // response body contains an error
273 unpackErr := &ic.Error{}
274 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
275 chnl <- NewResponse(RpcReply, err, nil)
276 } else {
277 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
278 }
279 }
280 }
281 case <-ctx.Done():
282 logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
283 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
284 chnl <- NewResponse(RpcTimeout, err, nil)
285 case <-kp.doneCh:
286 chnl <- NewResponse(RpcSystemClosing, nil, nil)
287 logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
288 }
289 }()
290 return chnl
291}
292
khenaidoo43c82122018-11-22 18:38:28 -0500293// InvokeRPC is used to send a request to a given topic
npujar467fe752020-01-16 20:17:45 +0530294func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
khenaidoobdcb8e02019-03-06 16:28:56 -0500295 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
khenaidoo43c82122018-11-22 18:38:28 -0500296
297 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
298 // typically the device ID.
299 responseTopic := replyToTopic
300 if responseTopic == nil {
npujar467fe752020-01-16 20:17:45 +0530301 responseTopic = kp.defaultTopic
khenaidoo43c82122018-11-22 18:38:28 -0500302 }
303
khenaidooabad44c2018-08-03 16:58:35 -0400304 // Encode the request
khenaidoobdcb8e02019-03-06 16:28:56 -0500305 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400306 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800307 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400308 return false, nil
309 }
310
311 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500312 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400313 if waitForResponse {
314 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500315 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800316 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400317 }
318 }
319
khenaidoo43c82122018-11-22 18:38:28 -0500320 // Send request - if the topic is formatted with a device Id then we will send the request using a
321 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
322 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
khenaidoobdcb8e02019-03-06 16:28:56 -0500323 //key := GetDeviceIdFromTopic(*toTopic)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800324 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000325 go func() {
326 if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
327 logger.Errorw("send-failed", log.Fields{
328 "topic": toTopic,
329 "key": key,
330 "error": err})
331 }
332 }()
khenaidooabad44c2018-08-03 16:58:35 -0400333
334 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400335 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400336 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400337 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400338 if ctx == nil {
339 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400340 } else {
341 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400342 }
khenaidoob9203542018-09-17 22:56:37 -0400343 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400344
345 // Wait for response as well as timeout or cancellation
346 // Remove the subscription for a response on return
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000347 defer func() {
348 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
349 logger.Errorw("response-unsubscribe-failed", log.Fields{
350 "id": protoRequest.Header.Id,
351 "error": err})
352 }
353 }()
khenaidooabad44c2018-08-03 16:58:35 -0400354 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500355 case msg, ok := <-ch:
356 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800357 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500358 protoError := &ic.Error{Reason: "channel-closed"}
359 var marshalledArg *any.Any
360 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
361 return false, nil // Should never happen
362 }
363 return false, marshalledArg
364 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800365 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500366 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400367 var err error
368 if responseBody, err = decodeResponse(msg); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800369 logger.Errorw("decode-response-error", log.Fields{"error": err})
npujar467fe752020-01-16 20:17:45 +0530370 // FIXME we should return something
khenaidooabad44c2018-08-03 16:58:35 -0400371 }
372 return responseBody.Success, responseBody.Result
373 case <-ctx.Done():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800374 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
khenaidooabad44c2018-08-03 16:58:35 -0400375 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530376 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
377
khenaidooabad44c2018-08-03 16:58:35 -0400378 var marshalledArg *any.Any
379 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
380 return false, nil // Should never happen
381 }
382 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400383 case <-childCtx.Done():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800384 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
khenaidoob9203542018-09-17 22:56:37 -0400385 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530386 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
387
khenaidoob9203542018-09-17 22:56:37 -0400388 var marshalledArg *any.Any
389 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
390 return false, nil // Should never happen
391 }
392 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400393 case <-kp.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800394 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400395 return true, nil
396 }
397 }
398 return true, nil
399}
400
khenaidoo43c82122018-11-22 18:38:28 -0500401// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400402// when a message is received on a given topic
npujar467fe752020-01-16 20:17:45 +0530403func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400404
405 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500406 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400407 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500408 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500409 //if ch, err = kp.Subscribe(topic); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800410 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Abhilash S.L90cd9552019-07-18 17:30:29 +0530411 return err
khenaidooabad44c2018-08-03 16:58:35 -0400412 }
khenaidoo43c82122018-11-22 18:38:28 -0500413
414 kp.defaultRequestHandlerInterface = handler
415 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400416 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500417 go kp.waitForMessages(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400418
419 return nil
420}
421
khenaidoo43c82122018-11-22 18:38:28 -0500422// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
423// when a message is received on a given topic. So far there is only 1 target registered per microservice
npujar467fe752020-01-16 20:17:45 +0530424func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500425 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500426 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500427 var err error
khenaidoo54e0ddf2019-02-27 16:21:33 -0500428 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800429 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500430 return err
khenaidoo43c82122018-11-22 18:38:28 -0500431 }
432 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
433
434 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500435 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
khenaidoo43c82122018-11-22 18:38:28 -0500436
khenaidooabad44c2018-08-03 16:58:35 -0400437 return nil
438}
439
npujar467fe752020-01-16 20:17:45 +0530440func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500441 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
442}
443
npujar467fe752020-01-16 20:17:45 +0530444func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500445 kp.lockTopicResponseChannelMap.Lock()
446 defer kp.lockTopicResponseChannelMap.Unlock()
447 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
448 // Unsubscribe to this topic first - this will close the subscribed channel
449 var err error
450 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800451 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
khenaidoo43c82122018-11-22 18:38:28 -0500452 }
453 delete(kp.topicToResponseChannelMap, topic)
454 return err
455 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000456 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400457 }
458}
459
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000460// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530461func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
Scott Baker0e78ba22020-02-24 17:58:47 -0800462 logger.Debug("delete-all-topic-response-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500463 kp.lockTopicResponseChannelMap.Lock()
464 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800465 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000466 for topic := range kp.topicToResponseChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500467 // Unsubscribe to this topic first - this will close the subscribed channel
Scott Baker0e78ba22020-02-24 17:58:47 -0800468 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
469 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800470 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800471 // Do not return. Continue to try to unsubscribe to other topics.
472 } else {
473 // Only delete from channel map if successfully unsubscribed.
474 delete(kp.topicToResponseChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500475 }
khenaidooabad44c2018-08-03 16:58:35 -0400476 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800477 if len(unsubscribeFailTopics) > 0 {
478 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
479 }
480 return nil
khenaidooabad44c2018-08-03 16:58:35 -0400481}
482
npujar467fe752020-01-16 20:17:45 +0530483func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
khenaidoo43c82122018-11-22 18:38:28 -0500484 kp.lockTopicRequestHandlerChannelMap.Lock()
485 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
486 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
487 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400488 }
khenaidooabad44c2018-08-03 16:58:35 -0400489}
490
npujar467fe752020-01-16 20:17:45 +0530491func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500492 kp.lockTopicRequestHandlerChannelMap.Lock()
493 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
494 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
495 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000496 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
497 return err
498 }
khenaidoo43c82122018-11-22 18:38:28 -0500499 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400500 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500501 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000502 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400503 }
khenaidooabad44c2018-08-03 16:58:35 -0400504}
505
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000506// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530507func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
Scott Baker0e78ba22020-02-24 17:58:47 -0800508 logger.Debug("delete-all-topic-request-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500509 kp.lockTopicRequestHandlerChannelMap.Lock()
510 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800511 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000512 for topic := range kp.topicToRequestHandlerChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500513 // Close the kafka client client first by unsubscribing to this topic
Scott Baker0e78ba22020-02-24 17:58:47 -0800514 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
515 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800516 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800517 // Do not return. Continue to try to unsubscribe to other topics.
518 } else {
519 // Only delete from channel map if successfully unsubscribed.
520 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500521 }
khenaidoo43c82122018-11-22 18:38:28 -0500522 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800523 if len(unsubscribeFailTopics) > 0 {
524 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
525 }
526 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500527}
528
npujar467fe752020-01-16 20:17:45 +0530529func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400530 kp.lockTransactionIdToChannelMap.Lock()
531 defer kp.lockTransactionIdToChannelMap.Unlock()
532 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500533 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400534 }
535}
536
npujar467fe752020-01-16 20:17:45 +0530537func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400538 kp.lockTransactionIdToChannelMap.Lock()
539 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500540 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
541 // Close the channel first
542 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400543 delete(kp.transactionIdToChannelMap, id)
544 }
545}
546
npujar467fe752020-01-16 20:17:45 +0530547func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
khenaidoo43c82122018-11-22 18:38:28 -0500548 kp.lockTransactionIdToChannelMap.Lock()
549 defer kp.lockTransactionIdToChannelMap.Unlock()
550 for key, value := range kp.transactionIdToChannelMap {
551 if value.topic.Name == id {
552 close(value.ch)
553 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400554 }
555 }
khenaidooabad44c2018-08-03 16:58:35 -0400556}
557
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000558// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530559func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
Scott Baker0e78ba22020-02-24 17:58:47 -0800560 logger.Debug("delete-all-transaction-id-channel-map")
khenaidoo43c82122018-11-22 18:38:28 -0500561 kp.lockTransactionIdToChannelMap.Lock()
562 defer kp.lockTransactionIdToChannelMap.Unlock()
563 for key, value := range kp.transactionIdToChannelMap {
564 close(value.ch)
565 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400566 }
khenaidooabad44c2018-08-03 16:58:35 -0400567}
568
npujar467fe752020-01-16 20:17:45 +0530569func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500570 // If we have any consumers on that topic we need to close them
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500571 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800572 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500573 }
574 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800575 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500576 }
khenaidoo43c82122018-11-22 18:38:28 -0500577 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500578
khenaidoo43c82122018-11-22 18:38:28 -0500579 return kp.kafkaClient.DeleteTopic(&topic)
580}
581
582func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400583 // Encode the response argument - needs to be a proto message
584 if returnedVal == nil {
585 return nil, nil
586 }
587 protoValue, ok := returnedVal.(proto.Message)
588 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800589 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
khenaidooabad44c2018-08-03 16:58:35 -0400590 err := errors.New("response-value-not-proto-message")
591 return nil, err
592 }
593
594 // Marshal the returned value, if any
595 var marshalledReturnedVal *any.Any
596 var err error
597 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800598 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400599 return nil, err
600 }
601 return marshalledReturnedVal, nil
602}
603
khenaidoo79232702018-12-04 11:00:41 -0500604func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
605 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400606 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500607 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400608 FromTopic: request.Header.ToTopic,
609 ToTopic: request.Header.FromTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700610 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400611 }
khenaidoo79232702018-12-04 11:00:41 -0500612 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400613 Success: false,
614 Result: nil,
615 }
616 var marshalledResponseBody *any.Any
617 var err error
618 // Error should never happen here
619 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800620 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400621 }
622
khenaidoo79232702018-12-04 11:00:41 -0500623 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400624 Header: responseHeader,
625 Body: marshalledResponseBody,
626 }
627
628}
629
630//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
631//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500632func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800633 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500634 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400635 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500636 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400637 FromTopic: request.Header.ToTopic,
638 ToTopic: request.Header.FromTopic,
khenaidoo2c6a0992019-04-29 13:46:56 -0400639 KeyTopic: request.Header.KeyTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700640 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400641 }
642
643 // Go over all returned values
644 var marshalledReturnedVal *any.Any
645 var err error
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000646
647 // for now we support only 1 returned value - (excluding the error)
648 if len(returnedValues) > 0 {
649 if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800650 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400651 }
khenaidooabad44c2018-08-03 16:58:35 -0400652 }
653
khenaidoo79232702018-12-04 11:00:41 -0500654 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400655 Success: success,
656 Result: marshalledReturnedVal,
657 }
658
659 // Marshal the response body
660 var marshalledResponseBody *any.Any
661 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800662 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400663 return nil, err
664 }
665
khenaidoo79232702018-12-04 11:00:41 -0500666 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400667 Header: responseHeader,
668 Body: marshalledResponseBody,
669 }, nil
670}
671
672func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
673 myClassValue := reflect.ValueOf(myClass)
khenaidoo19374072018-12-11 11:05:15 -0500674 // Capitalize the first letter in the funcName to workaround the first capital letters required to
675 // invoke a function from a different package
676 funcName = strings.Title(funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400677 m := myClassValue.MethodByName(funcName)
678 if !m.IsValid() {
khenaidoo43c82122018-11-22 18:38:28 -0500679 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400680 }
681 in := make([]reflect.Value, len(params))
682 for i, param := range params {
683 in[i] = reflect.ValueOf(param)
684 }
685 out = m.Call(in)
686 return
687}
688
npujar467fe752020-01-16 20:17:45 +0530689func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo297cd252019-02-07 22:10:23 -0500690 arg := &KVArg{
691 Key: TransactionKey,
692 Value: &ic.StrType{Val: transactionId},
693 }
694
695 var marshalledArg *any.Any
696 var err error
697 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800698 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
khenaidoo297cd252019-02-07 22:10:23 -0500699 return currentArgs
700 }
701 protoArg := &ic.Argument{
702 Key: arg.Key,
703 Value: marshalledArg,
704 }
705 return append(currentArgs, protoArg)
706}
707
npujar467fe752020-01-16 20:17:45 +0530708func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo54e0ddf2019-02-27 16:21:33 -0500709 var marshalledArg *any.Any
710 var err error
711 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800712 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500713 return currentArgs
714 }
715 protoArg := &ic.Argument{
716 Key: FromTopic,
717 Value: marshalledArg,
718 }
719 return append(currentArgs, protoArg)
720}
721
npujar467fe752020-01-16 20:17:45 +0530722func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400723
khenaidoo43c82122018-11-22 18:38:28 -0500724 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500725 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400726 var out []reflect.Value
727 var err error
728
729 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500730 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400731 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800732 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400733 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800734 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400735 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500736 // Augment the requestBody with the message Id as it will be used in scenarios where cores
737 // are set in pairs and competing
738 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500739
740 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
741 // needs to send an unsollicited message to the currently requested container
742 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
743
khenaidooabad44c2018-08-03 16:58:35 -0400744 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
745 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800746 logger.Warn(err)
khenaidooabad44c2018-08-03 16:58:35 -0400747 }
748 }
749 // Response required?
750 if requestBody.ResponseRequired {
751 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500752 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400753 var returnedValues []interface{}
754 var success bool
755 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500756 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400757 returnedValues = make([]interface{}, 1)
758 returnedValues[0] = returnError
759 } else {
khenaidoob9203542018-09-17 22:56:37 -0400760 returnedValues = make([]interface{}, 0)
761 // Check for errors first
762 lastIndex := len(out) - 1
763 if out[lastIndex].Interface() != nil { // Error
khenaidoo09771ef2019-10-11 14:25:02 -0400764 if retError, ok := out[lastIndex].Interface().(error); ok {
765 if retError.Error() == ErrorTransactionNotAcquired.Error() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800766 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400767 return // Ignore - process is in competing mode and ignored transaction
768 }
769 returnError = &ic.Error{Reason: retError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400770 returnedValues = append(returnedValues, returnError)
771 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500772 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400773 returnedValues = append(returnedValues, returnError)
774 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500775 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800776 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400777 return // Ignore - should not happen
khenaidoob9203542018-09-17 22:56:37 -0400778 } else { // Non-error case
779 success = true
780 for idx, val := range out {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800781 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400782 if idx != lastIndex {
783 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400784 }
khenaidooabad44c2018-08-03 16:58:35 -0400785 }
786 }
787 }
788
khenaidoo79232702018-12-04 11:00:41 -0500789 var icm *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400790 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800791 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400792 icm = encodeDefaultFailedResponse(msg)
793 }
khenaidoo43c82122018-11-22 18:38:28 -0500794 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
795 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
796 // present then the key will be empty, hence all messages for a given topic will be sent to all
797 // partitions.
798 replyTopic := &Topic{Name: msg.Header.FromTopic}
khenaidoobdcb8e02019-03-06 16:28:56 -0500799 key := msg.Header.KeyTopic
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800800 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500801 // TODO: handle error response.
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000802 go func() {
803 if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
804 logger.Errorw("send-reply-failed", log.Fields{
805 "topic": replyTopic,
806 "key": key,
807 "error": err})
808 }
809 }()
khenaidooabad44c2018-08-03 16:58:35 -0400810 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500811 } else if msg.Header.Type == ic.MessageType_RESPONSE {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800812 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500813 go kp.dispatchResponse(msg)
814 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800815 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400816 }
817}
818
npujar467fe752020-01-16 20:17:45 +0530819func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400820 // Wait for messages
821 for msg := range ch {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800822 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500823 go kp.handleMessage(msg, targetInterface)
khenaidooabad44c2018-08-03 16:58:35 -0400824 }
825}
826
npujar467fe752020-01-16 20:17:45 +0530827func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400828 kp.lockTransactionIdToChannelMap.RLock()
829 defer kp.lockTransactionIdToChannelMap.RUnlock()
khenaidooabad44c2018-08-03 16:58:35 -0400830 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800831 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
khenaidooabad44c2018-08-03 16:58:35 -0400832 return
833 }
khenaidoo43c82122018-11-22 18:38:28 -0500834 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400835}
836
khenaidooabad44c2018-08-03 16:58:35 -0400837// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
838// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
839// API. There is one response channel waiting for kafka messages before dispatching the message to the
840// corresponding waiting channel
npujar467fe752020-01-16 20:17:45 +0530841func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800842 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400843
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500844 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500845 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerb9635992020-03-11 21:11:28 -0700846 // Set channel size to 1 to prevent deadlock, see VOL-2708
847 ch := make(chan *ic.InterContainerMessage, 1)
khenaidoo43c82122018-11-22 18:38:28 -0500848 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400849
850 return ch, nil
851}
852
npujar467fe752020-01-16 20:17:45 +0530853func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800854 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500855 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -0400856 return nil
857}
858
npujar467fe752020-01-16 20:17:45 +0530859func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
Scott Bakeree6a0872019-10-29 15:59:52 -0700860 return kp.kafkaClient.EnableLivenessChannel(enable)
861}
862
npujar467fe752020-01-16 20:17:45 +0530863func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800864 return kp.kafkaClient.EnableHealthinessChannel(enable)
865}
866
npujar467fe752020-01-16 20:17:45 +0530867func (kp *interContainerProxy) SendLiveness() error {
Scott Bakeree6a0872019-10-29 15:59:52 -0700868 return kp.kafkaClient.SendLiveness()
869}
870
khenaidooabad44c2018-08-03 16:58:35 -0400871//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
872//or an error on failure
khenaidoobdcb8e02019-03-06 16:28:56 -0500873func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
khenaidoo79232702018-12-04 11:00:41 -0500874 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400875 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500876 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400877 FromTopic: replyTopic.Name,
878 ToTopic: toTopic.Name,
khenaidoo2c6a0992019-04-29 13:46:56 -0400879 KeyTopic: key,
Scott Baker504b4802020-04-17 10:12:20 -0700880 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400881 }
khenaidoo79232702018-12-04 11:00:41 -0500882 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400883 Rpc: rpc,
884 ResponseRequired: true,
885 ReplyToTopic: replyTopic.Name,
886 }
887
888 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400889 if arg == nil {
890 // In case the caller sends an array with empty args
891 continue
892 }
khenaidooabad44c2018-08-03 16:58:35 -0400893 var marshalledArg *any.Any
894 var err error
895 // ascertain the value interface type is a proto.Message
896 protoValue, ok := arg.Value.(proto.Message)
897 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800898 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
khenaidooabad44c2018-08-03 16:58:35 -0400899 err := errors.New("argument-value-not-proto-message")
900 return nil, err
901 }
902 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800903 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400904 return nil, err
905 }
khenaidoo79232702018-12-04 11:00:41 -0500906 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400907 Key: arg.Key,
908 Value: marshalledArg,
909 }
910 requestBody.Args = append(requestBody.Args, protoArg)
911 }
912
913 var marshalledData *any.Any
914 var err error
915 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800916 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400917 return nil, err
918 }
khenaidoo79232702018-12-04 11:00:41 -0500919 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400920 Header: requestHeader,
921 Body: marshalledData,
922 }
923 return request, nil
924}
925
khenaidoo79232702018-12-04 11:00:41 -0500926func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400927 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500928 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400929 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800930 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400931 return nil, err
932 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800933 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400934
935 return &responseBody, nil
936
937}