blob: 368391e4814174f5c57bb9d863724a5e9d9f7d5e [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040022 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050023 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040024 "sync"
25 "time"
khenaidooabad44c2018-08-03 16:58:35 -040026
David Bainbridge9ae13132020-06-22 17:28:01 -070027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29
serkant.uluderya2ae470f2020-01-21 11:13:09 -080030 "github.com/golang/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/any"
33 "github.com/google/uuid"
34 "github.com/opencord/voltha-lib-go/v3/pkg/log"
35 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
36)
khenaidooabad44c2018-08-03 16:58:35 -040037
38const (
khenaidoo43c82122018-11-22 18:38:28 -050039 DefaultMaxRetries = 3
Scott Bakerb9635992020-03-11 21:11:28 -070040 DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
khenaidooabad44c2018-08-03 16:58:35 -040041)
42
khenaidoo297cd252019-02-07 22:10:23 -050043const (
44 TransactionKey = "transactionID"
khenaidoo54e0ddf2019-02-27 16:21:33 -050045 FromTopic = "fromTopic"
khenaidoo297cd252019-02-07 22:10:23 -050046)
47
khenaidoo09771ef2019-10-11 14:25:02 -040048var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
49var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
50
khenaidoo43c82122018-11-22 18:38:28 -050051// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
52// obtained from that channel, this interface is invoked. This is used to handle
53// async requests into the Core via the kafka messaging bus
54type requestHandlerChannel struct {
55 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050056 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040057}
58
khenaidoo43c82122018-11-22 18:38:28 -050059// transactionChannel represents a combination of a topic and a channel onto which a response received
60// on the kafka bus will be sent to
61type transactionChannel struct {
62 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050063 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050064}
65
npujar467fe752020-01-16 20:17:45 +053066type InterContainerProxy interface {
Rohan Agrawal31f21802020-06-12 05:38:46 +000067 Start(ctx context.Context) error
68 Stop(ctx context.Context)
npujar467fe752020-01-16 20:17:45 +053069 GetDefaultTopic() *Topic
npujar467fe752020-01-16 20:17:45 +053070 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Scott Bakerb9635992020-03-11 21:11:28 -070071 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Rohan Agrawal31f21802020-06-12 05:38:46 +000072 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
73 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
74 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
75 DeleteTopic(ctx context.Context, topic Topic) error
76 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
77 SendLiveness(ctx context.Context) error
npujar467fe752020-01-16 20:17:45 +053078}
79
80// interContainerProxy represents the messaging proxy
81type interContainerProxy struct {
Neha Sharmad1387da2020-05-07 20:07:28 +000082 kafkaAddress string
npujar467fe752020-01-16 20:17:45 +053083 defaultTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050084 defaultRequestHandlerInterface interface{}
85 kafkaClient Client
npujar467fe752020-01-16 20:17:45 +053086 doneCh chan struct{}
87 doneOnce sync.Once
khenaidoo43c82122018-11-22 18:38:28 -050088
89 // This map is used to map a topic to an interface and channel. When a request is received
90 // on that channel (registered to the topic) then that interface is invoked.
91 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
92 lockTopicRequestHandlerChannelMap sync.RWMutex
93
94 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050095 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050096 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050097 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050098 lockTopicResponseChannelMap sync.RWMutex
99
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500100 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -0500101 // sent out and we are waiting for a response.
102 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -0400103 lockTransactionIdToChannelMap sync.RWMutex
104}
105
npujar467fe752020-01-16 20:17:45 +0530106type InterContainerProxyOption func(*interContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -0400107
Neha Sharmad1387da2020-05-07 20:07:28 +0000108func InterContainerAddress(address string) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530109 return func(args *interContainerProxy) {
Neha Sharmad1387da2020-05-07 20:07:28 +0000110 args.kafkaAddress = address
khenaidooabad44c2018-08-03 16:58:35 -0400111 }
112}
113
khenaidoo43c82122018-11-22 18:38:28 -0500114func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530115 return func(args *interContainerProxy) {
116 args.defaultTopic = topic
khenaidooabad44c2018-08-03 16:58:35 -0400117 }
118}
119
khenaidoo43c82122018-11-22 18:38:28 -0500120func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530121 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500122 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400123 }
124}
125
khenaidoo43c82122018-11-22 18:38:28 -0500126func MsgClient(client Client) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530127 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500128 args.kafkaClient = client
129 }
130}
131
npujar467fe752020-01-16 20:17:45 +0530132func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
133 proxy := &interContainerProxy{
Neha Sharmad1387da2020-05-07 20:07:28 +0000134 kafkaAddress: DefaultKafkaAddress,
135 doneCh: make(chan struct{}),
khenaidooabad44c2018-08-03 16:58:35 -0400136 }
137
138 for _, option := range opts {
139 option(proxy)
140 }
141
npujar467fe752020-01-16 20:17:45 +0530142 return proxy
khenaidooabad44c2018-08-03 16:58:35 -0400143}
144
npujar467fe752020-01-16 20:17:45 +0530145func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
146 return newInterContainerProxy(opts...)
147}
148
Rohan Agrawal31f21802020-06-12 05:38:46 +0000149func (kp *interContainerProxy) Start(ctx context.Context) error {
150 logger.Info(ctx, "Starting-Proxy")
khenaidooabad44c2018-08-03 16:58:35 -0400151
khenaidoo43c82122018-11-22 18:38:28 -0500152 // Kafka MsgClient should already have been created. If not, output fatal error
153 if kp.kafkaClient == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000154 logger.Fatal(ctx, "kafka-client-not-set")
khenaidoo43c82122018-11-22 18:38:28 -0500155 }
156
khenaidoo43c82122018-11-22 18:38:28 -0500157 // Start the kafka client
Rohan Agrawal31f21802020-06-12 05:38:46 +0000158 if err := kp.kafkaClient.Start(ctx); err != nil {
159 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400160 return err
161 }
162
khenaidoo43c82122018-11-22 18:38:28 -0500163 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500164 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500165 //
khenaidooabad44c2018-08-03 16:58:35 -0400166 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500167 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
168
169 // Create the topic to request channel map
170 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400171
172 return nil
173}
174
Rohan Agrawal31f21802020-06-12 05:38:46 +0000175func (kp *interContainerProxy) Stop(ctx context.Context) {
176 logger.Info(ctx, "stopping-intercontainer-proxy")
npujar467fe752020-01-16 20:17:45 +0530177 kp.doneOnce.Do(func() { close(kp.doneCh) })
khenaidoo43c82122018-11-22 18:38:28 -0500178 // TODO : Perform cleanup
Rohan Agrawal31f21802020-06-12 05:38:46 +0000179 kp.kafkaClient.Stop(ctx)
180 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Baker0e78ba22020-02-24 17:58:47 -0800181 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000182 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800183 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000184 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Baker0e78ba22020-02-24 17:58:47 -0800185 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800187 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000188 kp.deleteAllTransactionIdToChannelMap(ctx)
khenaidooabad44c2018-08-03 16:58:35 -0400189}
190
npujar467fe752020-01-16 20:17:45 +0530191func (kp *interContainerProxy) GetDefaultTopic() *Topic {
192 return kp.defaultTopic
193}
194
Scott Bakerb9635992020-03-11 21:11:28 -0700195// InvokeAsyncRPC is used to make an RPC request asynchronously
196func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
197 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
198
Rohan Agrawal31f21802020-06-12 05:38:46 +0000199 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
Scott Bakerb9635992020-03-11 21:11:28 -0700200 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
201 // typically the device ID.
202 responseTopic := replyToTopic
203 if responseTopic == nil {
204 responseTopic = kp.GetDefaultTopic()
205 }
206
207 chnl := make(chan *RpcResponse)
208
209 go func() {
210
211 // once we're done,
212 // close the response channel
213 defer close(chnl)
214
215 var err error
216 var protoRequest *ic.InterContainerMessage
217
218 // Encode the request
Rohan Agrawal31f21802020-06-12 05:38:46 +0000219 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Scott Bakerb9635992020-03-11 21:11:28 -0700220 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000221 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Scott Bakerb9635992020-03-11 21:11:28 -0700222 chnl <- NewResponse(RpcFormattingError, err, nil)
223 return
224 }
225
226 // Subscribe for response, if needed, before sending request
227 var ch <-chan *ic.InterContainerMessage
Rohan Agrawal31f21802020-06-12 05:38:46 +0000228 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
229 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Bakerb9635992020-03-11 21:11:28 -0700230 chnl <- NewResponse(RpcTransportError, err, nil)
231 return
232 }
233
234 // Send request - if the topic is formatted with a device Id then we will send the request using a
235 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
236 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000237 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Scott Bakerb9635992020-03-11 21:11:28 -0700238
239 // if the message is not sent on kafka publish an event an close the channel
Rohan Agrawal31f21802020-06-12 05:38:46 +0000240 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Scott Bakerb9635992020-03-11 21:11:28 -0700241 chnl <- NewResponse(RpcTransportError, err, nil)
242 return
243 }
244
245 // if the client is not waiting for a response send the ack and close the channel
246 chnl <- NewResponse(RpcSent, nil, nil)
247 if !waitForResponse {
248 return
249 }
250
251 defer func() {
252 // Remove the subscription for a response on return
Rohan Agrawal31f21802020-06-12 05:38:46 +0000253 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
254 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
Scott Bakerb9635992020-03-11 21:11:28 -0700255 }
256 }()
257
258 // Wait for response as well as timeout or cancellation
259 select {
260 case msg, ok := <-ch:
261 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000262 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Scott Bakerb9635992020-03-11 21:11:28 -0700263 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
264 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000265 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
266 if responseBody, err := decodeResponse(ctx, msg); err != nil {
Scott Bakerb9635992020-03-11 21:11:28 -0700267 chnl <- NewResponse(RpcReply, err, nil)
268 } else {
269 if responseBody.Success {
270 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
271 } else {
272 // response body contains an error
273 unpackErr := &ic.Error{}
274 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
275 chnl <- NewResponse(RpcReply, err, nil)
276 } else {
277 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
278 }
279 }
280 }
281 case <-ctx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000282 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Scott Bakerb9635992020-03-11 21:11:28 -0700283 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
284 chnl <- NewResponse(RpcTimeout, err, nil)
285 case <-kp.doneCh:
286 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000287 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Bakerb9635992020-03-11 21:11:28 -0700288 }
289 }()
290 return chnl
291}
292
khenaidoo43c82122018-11-22 18:38:28 -0500293// InvokeRPC is used to send a request to a given topic
npujar467fe752020-01-16 20:17:45 +0530294func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
khenaidoobdcb8e02019-03-06 16:28:56 -0500295 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
khenaidoo43c82122018-11-22 18:38:28 -0500296
297 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
298 // typically the device ID.
299 responseTopic := replyToTopic
300 if responseTopic == nil {
npujar467fe752020-01-16 20:17:45 +0530301 responseTopic = kp.defaultTopic
khenaidoo43c82122018-11-22 18:38:28 -0500302 }
303
khenaidooabad44c2018-08-03 16:58:35 -0400304 // Encode the request
Rohan Agrawal31f21802020-06-12 05:38:46 +0000305 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400306 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000307 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400308 return false, nil
309 }
310
311 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500312 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400313 if waitForResponse {
314 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000315 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
316 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400317 }
318 }
319
khenaidoo43c82122018-11-22 18:38:28 -0500320 // Send request - if the topic is formatted with a device Id then we will send the request using a
321 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
322 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
khenaidoobdcb8e02019-03-06 16:28:56 -0500323 //key := GetDeviceIdFromTopic(*toTopic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000324 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000325 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000326 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
327 logger.Errorw(ctx, "send-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000328 "topic": toTopic,
329 "key": key,
330 "error": err})
331 }
332 }()
khenaidooabad44c2018-08-03 16:58:35 -0400333
334 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400335 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400336 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400337 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400338 if ctx == nil {
339 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400340 } else {
341 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400342 }
khenaidoob9203542018-09-17 22:56:37 -0400343 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400344
345 // Wait for response as well as timeout or cancellation
346 // Remove the subscription for a response on return
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000347 defer func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000348 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
349 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000350 "id": protoRequest.Header.Id,
351 "error": err})
352 }
353 }()
khenaidooabad44c2018-08-03 16:58:35 -0400354 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500355 case msg, ok := <-ch:
356 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000357 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500358 protoError := &ic.Error{Reason: "channel-closed"}
359 var marshalledArg *any.Any
360 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
361 return false, nil // Should never happen
362 }
363 return false, marshalledArg
364 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000365 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500366 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400367 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000368 if responseBody, err = decodeResponse(ctx, msg); err != nil {
369 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
npujar467fe752020-01-16 20:17:45 +0530370 // FIXME we should return something
khenaidooabad44c2018-08-03 16:58:35 -0400371 }
372 return responseBody.Success, responseBody.Result
373 case <-ctx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000374 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
khenaidooabad44c2018-08-03 16:58:35 -0400375 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530376 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
377
khenaidooabad44c2018-08-03 16:58:35 -0400378 var marshalledArg *any.Any
379 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
380 return false, nil // Should never happen
381 }
382 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400383 case <-childCtx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000384 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
khenaidoob9203542018-09-17 22:56:37 -0400385 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530386 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
387
khenaidoob9203542018-09-17 22:56:37 -0400388 var marshalledArg *any.Any
389 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
390 return false, nil // Should never happen
391 }
392 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400393 case <-kp.doneCh:
Rohan Agrawal31f21802020-06-12 05:38:46 +0000394 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400395 return true, nil
396 }
397 }
398 return true, nil
399}
400
khenaidoo43c82122018-11-22 18:38:28 -0500401// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400402// when a message is received on a given topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000403func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400404
405 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500406 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400407 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000408 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500409 //if ch, err = kp.Subscribe(topic); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000410 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Abhilash S.L90cd9552019-07-18 17:30:29 +0530411 return err
khenaidooabad44c2018-08-03 16:58:35 -0400412 }
khenaidoo43c82122018-11-22 18:38:28 -0500413
414 kp.defaultRequestHandlerInterface = handler
415 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400416 // Launch a go routine to receive and process kafka messages
Rohan Agrawal31f21802020-06-12 05:38:46 +0000417 go kp.waitForMessages(ctx, ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400418
419 return nil
420}
421
khenaidoo43c82122018-11-22 18:38:28 -0500422// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
423// when a message is received on a given topic. So far there is only 1 target registered per microservice
Rohan Agrawal31f21802020-06-12 05:38:46 +0000424func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500425 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500426 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500427 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000428 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
429 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500430 return err
khenaidoo43c82122018-11-22 18:38:28 -0500431 }
432 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
433
434 // Launch a go routine to receive and process kafka messages
Rohan Agrawal31f21802020-06-12 05:38:46 +0000435 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
khenaidoo43c82122018-11-22 18:38:28 -0500436
khenaidooabad44c2018-08-03 16:58:35 -0400437 return nil
438}
439
Rohan Agrawal31f21802020-06-12 05:38:46 +0000440func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
441 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
khenaidoo43c82122018-11-22 18:38:28 -0500442}
443
Rohan Agrawal31f21802020-06-12 05:38:46 +0000444func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500445 kp.lockTopicResponseChannelMap.Lock()
446 defer kp.lockTopicResponseChannelMap.Unlock()
447 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
448 // Unsubscribe to this topic first - this will close the subscribed channel
449 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000450 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
451 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
khenaidoo43c82122018-11-22 18:38:28 -0500452 }
453 delete(kp.topicToResponseChannelMap, topic)
454 return err
455 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000456 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400457 }
458}
459
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000460// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000461func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
462 logger.Debug(ctx, "delete-all-topic-response-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500463 kp.lockTopicResponseChannelMap.Lock()
464 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800465 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000466 for topic := range kp.topicToResponseChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500467 // Unsubscribe to this topic first - this will close the subscribed channel
Rohan Agrawal31f21802020-06-12 05:38:46 +0000468 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Baker0e78ba22020-02-24 17:58:47 -0800469 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000470 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800471 // Do not return. Continue to try to unsubscribe to other topics.
472 } else {
473 // Only delete from channel map if successfully unsubscribed.
474 delete(kp.topicToResponseChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500475 }
khenaidooabad44c2018-08-03 16:58:35 -0400476 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800477 if len(unsubscribeFailTopics) > 0 {
478 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
479 }
480 return nil
khenaidooabad44c2018-08-03 16:58:35 -0400481}
482
npujar467fe752020-01-16 20:17:45 +0530483func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
khenaidoo43c82122018-11-22 18:38:28 -0500484 kp.lockTopicRequestHandlerChannelMap.Lock()
485 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
486 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
487 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400488 }
khenaidooabad44c2018-08-03 16:58:35 -0400489}
490
Rohan Agrawal31f21802020-06-12 05:38:46 +0000491func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500492 kp.lockTopicRequestHandlerChannelMap.Lock()
493 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
494 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
495 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000496 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000497 return err
498 }
khenaidoo43c82122018-11-22 18:38:28 -0500499 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400500 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500501 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000502 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400503 }
khenaidooabad44c2018-08-03 16:58:35 -0400504}
505
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000506// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000507func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
508 logger.Debug(ctx, "delete-all-topic-request-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500509 kp.lockTopicRequestHandlerChannelMap.Lock()
510 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800511 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000512 for topic := range kp.topicToRequestHandlerChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500513 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000514 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Baker0e78ba22020-02-24 17:58:47 -0800515 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000516 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800517 // Do not return. Continue to try to unsubscribe to other topics.
518 } else {
519 // Only delete from channel map if successfully unsubscribed.
520 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500521 }
khenaidoo43c82122018-11-22 18:38:28 -0500522 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800523 if len(unsubscribeFailTopics) > 0 {
524 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
525 }
526 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500527}
528
npujar467fe752020-01-16 20:17:45 +0530529func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400530 kp.lockTransactionIdToChannelMap.Lock()
531 defer kp.lockTransactionIdToChannelMap.Unlock()
532 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500533 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400534 }
535}
536
npujar467fe752020-01-16 20:17:45 +0530537func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400538 kp.lockTransactionIdToChannelMap.Lock()
539 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500540 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
541 // Close the channel first
542 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400543 delete(kp.transactionIdToChannelMap, id)
544 }
545}
546
npujar467fe752020-01-16 20:17:45 +0530547func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
khenaidoo43c82122018-11-22 18:38:28 -0500548 kp.lockTransactionIdToChannelMap.Lock()
549 defer kp.lockTransactionIdToChannelMap.Unlock()
550 for key, value := range kp.transactionIdToChannelMap {
551 if value.topic.Name == id {
552 close(value.ch)
553 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400554 }
555 }
khenaidooabad44c2018-08-03 16:58:35 -0400556}
557
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000558// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000559func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
560 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
khenaidoo43c82122018-11-22 18:38:28 -0500561 kp.lockTransactionIdToChannelMap.Lock()
562 defer kp.lockTransactionIdToChannelMap.Unlock()
563 for key, value := range kp.transactionIdToChannelMap {
564 close(value.ch)
565 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400566 }
khenaidooabad44c2018-08-03 16:58:35 -0400567}
568
Rohan Agrawal31f21802020-06-12 05:38:46 +0000569func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500570 // If we have any consumers on that topic we need to close them
Rohan Agrawal31f21802020-06-12 05:38:46 +0000571 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
572 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500573 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000574 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
575 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500576 }
khenaidoo43c82122018-11-22 18:38:28 -0500577 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500578
Rohan Agrawal31f21802020-06-12 05:38:46 +0000579 return kp.kafkaClient.DeleteTopic(ctx, &topic)
khenaidoo43c82122018-11-22 18:38:28 -0500580}
581
Rohan Agrawal31f21802020-06-12 05:38:46 +0000582func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400583 // Encode the response argument - needs to be a proto message
584 if returnedVal == nil {
585 return nil, nil
586 }
587 protoValue, ok := returnedVal.(proto.Message)
588 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000589 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
khenaidooabad44c2018-08-03 16:58:35 -0400590 err := errors.New("response-value-not-proto-message")
591 return nil, err
592 }
593
594 // Marshal the returned value, if any
595 var marshalledReturnedVal *any.Any
596 var err error
597 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000598 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400599 return nil, err
600 }
601 return marshalledReturnedVal, nil
602}
603
Rohan Agrawal31f21802020-06-12 05:38:46 +0000604func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
khenaidoo79232702018-12-04 11:00:41 -0500605 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400606 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500607 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400608 FromTopic: request.Header.ToTopic,
609 ToTopic: request.Header.FromTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700610 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400611 }
khenaidoo79232702018-12-04 11:00:41 -0500612 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400613 Success: false,
614 Result: nil,
615 }
616 var marshalledResponseBody *any.Any
617 var err error
618 // Error should never happen here
619 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000620 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400621 }
622
khenaidoo79232702018-12-04 11:00:41 -0500623 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400624 Header: responseHeader,
625 Body: marshalledResponseBody,
626 }
627
628}
629
630//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
631//or an error on failure
Rohan Agrawal31f21802020-06-12 05:38:46 +0000632func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
633 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500634 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400635 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500636 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400637 FromTopic: request.Header.ToTopic,
638 ToTopic: request.Header.FromTopic,
khenaidoo2c6a0992019-04-29 13:46:56 -0400639 KeyTopic: request.Header.KeyTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700640 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400641 }
642
643 // Go over all returned values
644 var marshalledReturnedVal *any.Any
645 var err error
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000646
647 // for now we support only 1 returned value - (excluding the error)
648 if len(returnedValues) > 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000649 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
650 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400651 }
khenaidooabad44c2018-08-03 16:58:35 -0400652 }
653
khenaidoo79232702018-12-04 11:00:41 -0500654 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400655 Success: success,
656 Result: marshalledReturnedVal,
657 }
658
659 // Marshal the response body
660 var marshalledResponseBody *any.Any
661 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000662 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400663 return nil, err
664 }
665
khenaidoo79232702018-12-04 11:00:41 -0500666 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400667 Header: responseHeader,
668 Body: marshalledResponseBody,
669 }, nil
670}
671
Rohan Agrawal31f21802020-06-12 05:38:46 +0000672func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
khenaidooabad44c2018-08-03 16:58:35 -0400673 myClassValue := reflect.ValueOf(myClass)
khenaidoo19374072018-12-11 11:05:15 -0500674 // Capitalize the first letter in the funcName to workaround the first capital letters required to
675 // invoke a function from a different package
676 funcName = strings.Title(funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400677 m := myClassValue.MethodByName(funcName)
678 if !m.IsValid() {
khenaidoo43c82122018-11-22 18:38:28 -0500679 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400680 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000681 in := make([]reflect.Value, len(params)+1)
682 in[0] = reflect.ValueOf(ctx)
khenaidooabad44c2018-08-03 16:58:35 -0400683 for i, param := range params {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000684 in[i+1] = reflect.ValueOf(param)
khenaidooabad44c2018-08-03 16:58:35 -0400685 }
686 out = m.Call(in)
687 return
688}
689
Rohan Agrawal31f21802020-06-12 05:38:46 +0000690func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo297cd252019-02-07 22:10:23 -0500691 arg := &KVArg{
692 Key: TransactionKey,
693 Value: &ic.StrType{Val: transactionId},
694 }
695
696 var marshalledArg *any.Any
697 var err error
698 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000699 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
khenaidoo297cd252019-02-07 22:10:23 -0500700 return currentArgs
701 }
702 protoArg := &ic.Argument{
703 Key: arg.Key,
704 Value: marshalledArg,
705 }
706 return append(currentArgs, protoArg)
707}
708
Rohan Agrawal31f21802020-06-12 05:38:46 +0000709func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo54e0ddf2019-02-27 16:21:33 -0500710 var marshalledArg *any.Any
711 var err error
712 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000713 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500714 return currentArgs
715 }
716 protoArg := &ic.Argument{
717 Key: FromTopic,
718 Value: marshalledArg,
719 }
720 return append(currentArgs, protoArg)
721}
722
Rohan Agrawal31f21802020-06-12 05:38:46 +0000723func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400724
khenaidoo43c82122018-11-22 18:38:28 -0500725 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500726 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400727 var out []reflect.Value
728 var err error
729
730 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500731 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400732 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000733 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400734 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000735 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400736 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500737 // Augment the requestBody with the message Id as it will be used in scenarios where cores
738 // are set in pairs and competing
Rohan Agrawal31f21802020-06-12 05:38:46 +0000739 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500740
741 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
742 // needs to send an unsollicited message to the currently requested container
Rohan Agrawal31f21802020-06-12 05:38:46 +0000743 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500744
Rohan Agrawal31f21802020-06-12 05:38:46 +0000745 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
khenaidooabad44c2018-08-03 16:58:35 -0400746 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000747 logger.Warn(ctx, err)
khenaidooabad44c2018-08-03 16:58:35 -0400748 }
749 }
750 // Response required?
751 if requestBody.ResponseRequired {
752 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500753 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400754 var returnedValues []interface{}
755 var success bool
756 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500757 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400758 returnedValues = make([]interface{}, 1)
759 returnedValues[0] = returnError
760 } else {
khenaidoob9203542018-09-17 22:56:37 -0400761 returnedValues = make([]interface{}, 0)
762 // Check for errors first
763 lastIndex := len(out) - 1
764 if out[lastIndex].Interface() != nil { // Error
khenaidoo09771ef2019-10-11 14:25:02 -0400765 if retError, ok := out[lastIndex].Interface().(error); ok {
766 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000767 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400768 return // Ignore - process is in competing mode and ignored transaction
769 }
770 returnError = &ic.Error{Reason: retError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400771 returnedValues = append(returnedValues, returnError)
772 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500773 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400774 returnedValues = append(returnedValues, returnError)
775 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500776 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000777 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400778 return // Ignore - should not happen
khenaidoob9203542018-09-17 22:56:37 -0400779 } else { // Non-error case
780 success = true
781 for idx, val := range out {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000782 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400783 if idx != lastIndex {
784 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400785 }
khenaidooabad44c2018-08-03 16:58:35 -0400786 }
787 }
788 }
789
khenaidoo79232702018-12-04 11:00:41 -0500790 var icm *ic.InterContainerMessage
Rohan Agrawal31f21802020-06-12 05:38:46 +0000791 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
792 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
793 icm = encodeDefaultFailedResponse(ctx, msg)
khenaidooabad44c2018-08-03 16:58:35 -0400794 }
khenaidoo43c82122018-11-22 18:38:28 -0500795 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
796 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
797 // present then the key will be empty, hence all messages for a given topic will be sent to all
798 // partitions.
799 replyTopic := &Topic{Name: msg.Header.FromTopic}
khenaidoobdcb8e02019-03-06 16:28:56 -0500800 key := msg.Header.KeyTopic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000801 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500802 // TODO: handle error response.
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000803 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000804 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
805 logger.Errorw(ctx, "send-reply-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000806 "topic": replyTopic,
807 "key": key,
808 "error": err})
809 }
810 }()
khenaidooabad44c2018-08-03 16:58:35 -0400811 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500812 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000813 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
814 go kp.dispatchResponse(ctx, msg)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500815 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000816 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400817 }
818}
819
Rohan Agrawal31f21802020-06-12 05:38:46 +0000820func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400821 // Wait for messages
822 for msg := range ch {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000823 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
824 go kp.handleMessage(context.Background(), msg, targetInterface)
khenaidooabad44c2018-08-03 16:58:35 -0400825 }
826}
827
Rohan Agrawal31f21802020-06-12 05:38:46 +0000828func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400829 kp.lockTransactionIdToChannelMap.RLock()
830 defer kp.lockTransactionIdToChannelMap.RUnlock()
khenaidooabad44c2018-08-03 16:58:35 -0400831 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000832 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
khenaidooabad44c2018-08-03 16:58:35 -0400833 return
834 }
khenaidoo43c82122018-11-22 18:38:28 -0500835 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400836}
837
khenaidooabad44c2018-08-03 16:58:35 -0400838// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
839// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
840// API. There is one response channel waiting for kafka messages before dispatching the message to the
841// corresponding waiting channel
Rohan Agrawal31f21802020-06-12 05:38:46 +0000842func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
843 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400844
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500845 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500846 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerb9635992020-03-11 21:11:28 -0700847 // Set channel size to 1 to prevent deadlock, see VOL-2708
848 ch := make(chan *ic.InterContainerMessage, 1)
khenaidoo43c82122018-11-22 18:38:28 -0500849 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400850
851 return ch, nil
852}
853
Rohan Agrawal31f21802020-06-12 05:38:46 +0000854func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
855 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500856 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -0400857 return nil
858}
859
Rohan Agrawal31f21802020-06-12 05:38:46 +0000860func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
861 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
Scott Bakeree6a0872019-10-29 15:59:52 -0700862}
863
Rohan Agrawal31f21802020-06-12 05:38:46 +0000864func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
865 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800866}
867
Rohan Agrawal31f21802020-06-12 05:38:46 +0000868func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
869 return kp.kafkaClient.SendLiveness(ctx)
Scott Bakeree6a0872019-10-29 15:59:52 -0700870}
871
khenaidooabad44c2018-08-03 16:58:35 -0400872//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
873//or an error on failure
Rohan Agrawal31f21802020-06-12 05:38:46 +0000874func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
khenaidoo79232702018-12-04 11:00:41 -0500875 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400876 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500877 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400878 FromTopic: replyTopic.Name,
879 ToTopic: toTopic.Name,
khenaidoo2c6a0992019-04-29 13:46:56 -0400880 KeyTopic: key,
Scott Baker504b4802020-04-17 10:12:20 -0700881 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400882 }
khenaidoo79232702018-12-04 11:00:41 -0500883 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400884 Rpc: rpc,
885 ResponseRequired: true,
886 ReplyToTopic: replyTopic.Name,
887 }
888
889 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400890 if arg == nil {
891 // In case the caller sends an array with empty args
892 continue
893 }
khenaidooabad44c2018-08-03 16:58:35 -0400894 var marshalledArg *any.Any
895 var err error
896 // ascertain the value interface type is a proto.Message
897 protoValue, ok := arg.Value.(proto.Message)
898 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000899 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
khenaidooabad44c2018-08-03 16:58:35 -0400900 err := errors.New("argument-value-not-proto-message")
901 return nil, err
902 }
903 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000904 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400905 return nil, err
906 }
khenaidoo79232702018-12-04 11:00:41 -0500907 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400908 Key: arg.Key,
909 Value: marshalledArg,
910 }
911 requestBody.Args = append(requestBody.Args, protoArg)
912 }
913
914 var marshalledData *any.Any
915 var err error
916 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000917 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400918 return nil, err
919 }
khenaidoo79232702018-12-04 11:00:41 -0500920 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400921 Header: requestHeader,
922 Body: marshalledData,
923 }
924 return request, nil
925}
926
Rohan Agrawal31f21802020-06-12 05:38:46 +0000927func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400928 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500929 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400930 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000931 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400932 return nil, err
933 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000934 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400935
936 return &responseBody, nil
937
938}