blob: beda53785c266977bcc9b92c929ae33d24ac39c2 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
Scott Bakerb9635992020-03-11 21:11:28 -070022 "google.golang.org/grpc/codes"
23 "google.golang.org/grpc/status"
khenaidooabad44c2018-08-03 16:58:35 -040024 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050025 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040026 "sync"
27 "time"
khenaidooabad44c2018-08-03 16:58:35 -040028
serkant.uluderya2ae470f2020-01-21 11:13:09 -080029 "github.com/golang/protobuf/proto"
30 "github.com/golang/protobuf/ptypes"
31 "github.com/golang/protobuf/ptypes/any"
32 "github.com/google/uuid"
33 "github.com/opencord/voltha-lib-go/v3/pkg/log"
34 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
35)
khenaidooabad44c2018-08-03 16:58:35 -040036
// Default tuning values used by the inter-container proxy.
const (
	// DefaultMaxRetries is the default retry limit.
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is expressed in milliseconds (60000 ms); InvokeRPC
	// multiplies it by time.Millisecond. The large value is meant to handle a
	// wider latency range.
	DefaultRequestTimeout = 60000
)
41
// Well-known string keys used by this package.
const (
	// TransactionKey is the key name under which a transaction ID is carried.
	TransactionKey = "transactionID"

	// FromTopic is the key name under which the originating topic is carried.
	FromTopic = "fromTopic"
)
46
// Sentinel errors related to transaction handling.
var (
	// ErrorTransactionNotAcquired carries the message "transaction-not-acquired".
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")

	// ErrorTransactionInvalidId carries the message "transaction-invalid-id".
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
49
khenaidoo43c82122018-11-22 18:38:28 -050050// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
51// obtained from that channel, this interface is invoked. This is used to handle
52// async requests into the Core via the kafka messaging bus
53type requestHandlerChannel struct {
54 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050055 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040056}
57
khenaidoo43c82122018-11-22 18:38:28 -050058// transactionChannel represents a combination of a topic and a channel onto which a response received
59// on the kafka bus will be sent to
60type transactionChannel struct {
61 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050062 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050063}
64
npujar467fe752020-01-16 20:17:45 +053065type InterContainerProxy interface {
66 Start() error
67 Stop()
68 GetDefaultTopic() *Topic
69 DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
70 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Scott Bakerb9635992020-03-11 21:11:28 -070071 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
npujar467fe752020-01-16 20:17:45 +053072 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
73 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
74 UnSubscribeFromRequestHandler(topic Topic) error
75 DeleteTopic(topic Topic) error
76 EnableLivenessChannel(enable bool) chan bool
77 SendLiveness() error
78}
79
80// interContainerProxy represents the messaging proxy
81type interContainerProxy struct {
khenaidoo43c82122018-11-22 18:38:28 -050082 kafkaHost string
83 kafkaPort int
npujar467fe752020-01-16 20:17:45 +053084 defaultTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050085 defaultRequestHandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050086 deviceDiscoveryTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050087 kafkaClient Client
npujar467fe752020-01-16 20:17:45 +053088 doneCh chan struct{}
89 doneOnce sync.Once
khenaidoo43c82122018-11-22 18:38:28 -050090
91 // This map is used to map a topic to an interface and channel. When a request is received
92 // on that channel (registered to the topic) then that interface is invoked.
93 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
94 lockTopicRequestHandlerChannelMap sync.RWMutex
95
96 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050097 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050098 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050099 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500100 lockTopicResponseChannelMap sync.RWMutex
101
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500102 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -0500103 // sent out and we are waiting for a response.
104 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -0400105 lockTransactionIdToChannelMap sync.RWMutex
106}
107
npujar467fe752020-01-16 20:17:45 +0530108type InterContainerProxyOption func(*interContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -0400109
khenaidoo43c82122018-11-22 18:38:28 -0500110func InterContainerHost(host string) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530111 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500112 args.kafkaHost = host
khenaidooabad44c2018-08-03 16:58:35 -0400113 }
114}
115
khenaidoo43c82122018-11-22 18:38:28 -0500116func InterContainerPort(port int) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530117 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500118 args.kafkaPort = port
khenaidooabad44c2018-08-03 16:58:35 -0400119 }
120}
121
khenaidoo43c82122018-11-22 18:38:28 -0500122func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530123 return func(args *interContainerProxy) {
124 args.defaultTopic = topic
khenaidooabad44c2018-08-03 16:58:35 -0400125 }
126}
127
khenaidoo79232702018-12-04 11:00:41 -0500128func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530129 return func(args *interContainerProxy) {
khenaidoo79232702018-12-04 11:00:41 -0500130 args.deviceDiscoveryTopic = topic
131 }
132}
133
khenaidoo43c82122018-11-22 18:38:28 -0500134func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530135 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500136 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400137 }
138}
139
khenaidoo43c82122018-11-22 18:38:28 -0500140func MsgClient(client Client) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530141 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500142 args.kafkaClient = client
143 }
144}
145
npujar467fe752020-01-16 20:17:45 +0530146func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
147 proxy := &interContainerProxy{
khenaidoo43c82122018-11-22 18:38:28 -0500148 kafkaHost: DefaultKafkaHost,
149 kafkaPort: DefaultKafkaPort,
npujar467fe752020-01-16 20:17:45 +0530150 doneCh: make(chan struct{}),
khenaidooabad44c2018-08-03 16:58:35 -0400151 }
152
153 for _, option := range opts {
154 option(proxy)
155 }
156
npujar467fe752020-01-16 20:17:45 +0530157 return proxy
khenaidooabad44c2018-08-03 16:58:35 -0400158}
159
npujar467fe752020-01-16 20:17:45 +0530160func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
161 return newInterContainerProxy(opts...)
162}
163
164func (kp *interContainerProxy) Start() error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800165 logger.Info("Starting-Proxy")
khenaidooabad44c2018-08-03 16:58:35 -0400166
khenaidoo43c82122018-11-22 18:38:28 -0500167 // Kafka MsgClient should already have been created. If not, output fatal error
168 if kp.kafkaClient == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800169 logger.Fatal("kafka-client-not-set")
khenaidoo43c82122018-11-22 18:38:28 -0500170 }
171
khenaidoo43c82122018-11-22 18:38:28 -0500172 // Start the kafka client
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500173 if err := kp.kafkaClient.Start(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800174 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400175 return err
176 }
177
khenaidoo43c82122018-11-22 18:38:28 -0500178 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500179 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500180 //
khenaidooabad44c2018-08-03 16:58:35 -0400181 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500182 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
183
184 // Create the topic to request channel map
185 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400186
187 return nil
188}
189
npujar467fe752020-01-16 20:17:45 +0530190func (kp *interContainerProxy) Stop() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800191 logger.Info("stopping-intercontainer-proxy")
npujar467fe752020-01-16 20:17:45 +0530192 kp.doneOnce.Do(func() { close(kp.doneCh) })
khenaidoo43c82122018-11-22 18:38:28 -0500193 // TODO : Perform cleanup
khenaidooca301322019-01-09 23:06:32 -0500194 kp.kafkaClient.Stop()
Scott Baker0e78ba22020-02-24 17:58:47 -0800195 err := kp.deleteAllTopicRequestHandlerChannelMap()
196 if err != nil {
Scott Baker432f9be2020-03-26 11:56:30 -0700197 logger.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800198 }
199 err = kp.deleteAllTopicResponseChannelMap()
200 if err != nil {
Scott Baker432f9be2020-03-26 11:56:30 -0700201 logger.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800202 }
203 kp.deleteAllTransactionIdToChannelMap()
khenaidooabad44c2018-08-03 16:58:35 -0400204}
205
npujar467fe752020-01-16 20:17:45 +0530206func (kp *interContainerProxy) GetDefaultTopic() *Topic {
207 return kp.defaultTopic
208}
209
khenaidoo79232702018-12-04 11:00:41 -0500210// DeviceDiscovered publish the discovered device onto the kafka messaging bus
npujar467fe752020-01-16 20:17:45 +0530211func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800212 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
khenaidoo79232702018-12-04 11:00:41 -0500213 // Simple validation
214 if deviceId == "" || deviceType == "" {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800215 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
khenaidoo79232702018-12-04 11:00:41 -0500216 return errors.New("invalid-parameters")
217 }
218 // Create the device discovery message
219 header := &ic.Header{
220 Id: uuid.New().String(),
221 Type: ic.MessageType_DEVICE_DISCOVERED,
npujar467fe752020-01-16 20:17:45 +0530222 FromTopic: kp.defaultTopic.Name,
khenaidoo79232702018-12-04 11:00:41 -0500223 ToTopic: kp.deviceDiscoveryTopic.Name,
Scott Baker504b4802020-04-17 10:12:20 -0700224 Timestamp: ptypes.TimestampNow(),
khenaidoo79232702018-12-04 11:00:41 -0500225 }
226 body := &ic.DeviceDiscovered{
227 Id: deviceId,
228 DeviceType: deviceType,
229 ParentId: parentId,
khenaidood2b6df92018-12-13 16:37:20 -0500230 Publisher: publisher,
khenaidoo79232702018-12-04 11:00:41 -0500231 }
232
233 var marshalledData *any.Any
234 var err error
235 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800236 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
khenaidoo79232702018-12-04 11:00:41 -0500237 return err
238 }
239 msg := &ic.InterContainerMessage{
240 Header: header,
241 Body: marshalledData,
242 }
243
244 // Send the message
245 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800246 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
khenaidoo79232702018-12-04 11:00:41 -0500247 return err
248 }
249 return nil
250}
251
Scott Bakerb9635992020-03-11 21:11:28 -0700252// InvokeAsyncRPC is used to make an RPC request asynchronously
253func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
254 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
255
256 logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
257 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
258 // typically the device ID.
259 responseTopic := replyToTopic
260 if responseTopic == nil {
261 responseTopic = kp.GetDefaultTopic()
262 }
263
264 chnl := make(chan *RpcResponse)
265
266 go func() {
267
268 // once we're done,
269 // close the response channel
270 defer close(chnl)
271
272 var err error
273 var protoRequest *ic.InterContainerMessage
274
275 // Encode the request
276 protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
277 if err != nil {
278 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
279 chnl <- NewResponse(RpcFormattingError, err, nil)
280 return
281 }
282
283 // Subscribe for response, if needed, before sending request
284 var ch <-chan *ic.InterContainerMessage
285 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
286 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
287 chnl <- NewResponse(RpcTransportError, err, nil)
288 return
289 }
290
291 // Send request - if the topic is formatted with a device Id then we will send the request using a
292 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
293 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
294 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
295
296 // if the message is not sent on kafka publish an event an close the channel
297 if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
298 chnl <- NewResponse(RpcTransportError, err, nil)
299 return
300 }
301
302 // if the client is not waiting for a response send the ack and close the channel
303 chnl <- NewResponse(RpcSent, nil, nil)
304 if !waitForResponse {
305 return
306 }
307
308 defer func() {
309 // Remove the subscription for a response on return
310 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
311 logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
312 }
313 }()
314
315 // Wait for response as well as timeout or cancellation
316 select {
317 case msg, ok := <-ch:
318 if !ok {
319 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
320 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
321 }
322 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
323 if responseBody, err := decodeResponse(msg); err != nil {
324 chnl <- NewResponse(RpcReply, err, nil)
325 } else {
326 if responseBody.Success {
327 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
328 } else {
329 // response body contains an error
330 unpackErr := &ic.Error{}
331 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
332 chnl <- NewResponse(RpcReply, err, nil)
333 } else {
334 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
335 }
336 }
337 }
338 case <-ctx.Done():
339 logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
340 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
341 chnl <- NewResponse(RpcTimeout, err, nil)
342 case <-kp.doneCh:
343 chnl <- NewResponse(RpcSystemClosing, nil, nil)
344 logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
345 }
346 }()
347 return chnl
348}
349
khenaidoo43c82122018-11-22 18:38:28 -0500350// InvokeRPC is used to send a request to a given topic
npujar467fe752020-01-16 20:17:45 +0530351func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
khenaidoobdcb8e02019-03-06 16:28:56 -0500352 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
khenaidoo43c82122018-11-22 18:38:28 -0500353
354 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
355 // typically the device ID.
356 responseTopic := replyToTopic
357 if responseTopic == nil {
npujar467fe752020-01-16 20:17:45 +0530358 responseTopic = kp.defaultTopic
khenaidoo43c82122018-11-22 18:38:28 -0500359 }
360
khenaidooabad44c2018-08-03 16:58:35 -0400361 // Encode the request
khenaidoobdcb8e02019-03-06 16:28:56 -0500362 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400363 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800364 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400365 return false, nil
366 }
367
368 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500369 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400370 if waitForResponse {
371 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500372 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800373 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400374 }
375 }
376
khenaidoo43c82122018-11-22 18:38:28 -0500377 // Send request - if the topic is formatted with a device Id then we will send the request using a
378 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
379 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
khenaidoobdcb8e02019-03-06 16:28:56 -0500380 //key := GetDeviceIdFromTopic(*toTopic)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800381 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000382 go func() {
383 if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
384 logger.Errorw("send-failed", log.Fields{
385 "topic": toTopic,
386 "key": key,
387 "error": err})
388 }
389 }()
khenaidooabad44c2018-08-03 16:58:35 -0400390
391 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400392 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400393 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400394 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400395 if ctx == nil {
396 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400397 } else {
398 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400399 }
khenaidoob9203542018-09-17 22:56:37 -0400400 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400401
402 // Wait for response as well as timeout or cancellation
403 // Remove the subscription for a response on return
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000404 defer func() {
405 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
406 logger.Errorw("response-unsubscribe-failed", log.Fields{
407 "id": protoRequest.Header.Id,
408 "error": err})
409 }
410 }()
khenaidooabad44c2018-08-03 16:58:35 -0400411 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500412 case msg, ok := <-ch:
413 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800414 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500415 protoError := &ic.Error{Reason: "channel-closed"}
416 var marshalledArg *any.Any
417 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
418 return false, nil // Should never happen
419 }
420 return false, marshalledArg
421 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800422 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500423 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400424 var err error
425 if responseBody, err = decodeResponse(msg); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800426 logger.Errorw("decode-response-error", log.Fields{"error": err})
npujar467fe752020-01-16 20:17:45 +0530427 // FIXME we should return something
khenaidooabad44c2018-08-03 16:58:35 -0400428 }
429 return responseBody.Success, responseBody.Result
430 case <-ctx.Done():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800431 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
khenaidooabad44c2018-08-03 16:58:35 -0400432 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530433 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
434
khenaidooabad44c2018-08-03 16:58:35 -0400435 var marshalledArg *any.Any
436 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
437 return false, nil // Should never happen
438 }
439 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400440 case <-childCtx.Done():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800441 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
khenaidoob9203542018-09-17 22:56:37 -0400442 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530443 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
444
khenaidoob9203542018-09-17 22:56:37 -0400445 var marshalledArg *any.Any
446 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
447 return false, nil // Should never happen
448 }
449 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400450 case <-kp.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800451 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400452 return true, nil
453 }
454 }
455 return true, nil
456}
457
khenaidoo43c82122018-11-22 18:38:28 -0500458// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400459// when a message is received on a given topic
npujar467fe752020-01-16 20:17:45 +0530460func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400461
462 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500463 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400464 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500465 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500466 //if ch, err = kp.Subscribe(topic); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800467 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Abhilash S.L90cd9552019-07-18 17:30:29 +0530468 return err
khenaidooabad44c2018-08-03 16:58:35 -0400469 }
khenaidoo43c82122018-11-22 18:38:28 -0500470
471 kp.defaultRequestHandlerInterface = handler
472 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400473 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500474 go kp.waitForMessages(ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400475
476 return nil
477}
478
khenaidoo43c82122018-11-22 18:38:28 -0500479// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
480// when a message is received on a given topic. So far there is only 1 target registered per microservice
npujar467fe752020-01-16 20:17:45 +0530481func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500482 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500483 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500484 var err error
khenaidoo54e0ddf2019-02-27 16:21:33 -0500485 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800486 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500487 return err
khenaidoo43c82122018-11-22 18:38:28 -0500488 }
489 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
490
491 // Launch a go routine to receive and process kafka messages
khenaidoo54e0ddf2019-02-27 16:21:33 -0500492 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
khenaidoo43c82122018-11-22 18:38:28 -0500493
khenaidooabad44c2018-08-03 16:58:35 -0400494 return nil
495}
496
npujar467fe752020-01-16 20:17:45 +0530497func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500498 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
499}
500
npujar467fe752020-01-16 20:17:45 +0530501func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500502 kp.lockTopicResponseChannelMap.Lock()
503 defer kp.lockTopicResponseChannelMap.Unlock()
504 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
505 // Unsubscribe to this topic first - this will close the subscribed channel
506 var err error
507 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800508 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
khenaidoo43c82122018-11-22 18:38:28 -0500509 }
510 delete(kp.topicToResponseChannelMap, topic)
511 return err
512 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000513 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400514 }
515}
516
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000517// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530518func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
Scott Baker0e78ba22020-02-24 17:58:47 -0800519 logger.Debug("delete-all-topic-response-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500520 kp.lockTopicResponseChannelMap.Lock()
521 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800522 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000523 for topic := range kp.topicToResponseChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500524 // Unsubscribe to this topic first - this will close the subscribed channel
Scott Baker0e78ba22020-02-24 17:58:47 -0800525 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
526 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800527 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800528 // Do not return. Continue to try to unsubscribe to other topics.
529 } else {
530 // Only delete from channel map if successfully unsubscribed.
531 delete(kp.topicToResponseChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500532 }
khenaidooabad44c2018-08-03 16:58:35 -0400533 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800534 if len(unsubscribeFailTopics) > 0 {
535 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
536 }
537 return nil
khenaidooabad44c2018-08-03 16:58:35 -0400538}
539
npujar467fe752020-01-16 20:17:45 +0530540func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
khenaidoo43c82122018-11-22 18:38:28 -0500541 kp.lockTopicRequestHandlerChannelMap.Lock()
542 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
543 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
544 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400545 }
khenaidooabad44c2018-08-03 16:58:35 -0400546}
547
npujar467fe752020-01-16 20:17:45 +0530548func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500549 kp.lockTopicRequestHandlerChannelMap.Lock()
550 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
551 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
552 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000553 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
554 return err
555 }
khenaidoo43c82122018-11-22 18:38:28 -0500556 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400557 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500558 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000559 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400560 }
khenaidooabad44c2018-08-03 16:58:35 -0400561}
562
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000563// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530564func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
Scott Baker0e78ba22020-02-24 17:58:47 -0800565 logger.Debug("delete-all-topic-request-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500566 kp.lockTopicRequestHandlerChannelMap.Lock()
567 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800568 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000569 for topic := range kp.topicToRequestHandlerChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500570 // Close the kafka client client first by unsubscribing to this topic
Scott Baker0e78ba22020-02-24 17:58:47 -0800571 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
572 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800573 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800574 // Do not return. Continue to try to unsubscribe to other topics.
575 } else {
576 // Only delete from channel map if successfully unsubscribed.
577 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500578 }
khenaidoo43c82122018-11-22 18:38:28 -0500579 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800580 if len(unsubscribeFailTopics) > 0 {
581 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
582 }
583 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500584}
585
npujar467fe752020-01-16 20:17:45 +0530586func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400587 kp.lockTransactionIdToChannelMap.Lock()
588 defer kp.lockTransactionIdToChannelMap.Unlock()
589 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500590 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400591 }
592}
593
npujar467fe752020-01-16 20:17:45 +0530594func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400595 kp.lockTransactionIdToChannelMap.Lock()
596 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500597 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
598 // Close the channel first
599 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400600 delete(kp.transactionIdToChannelMap, id)
601 }
602}
603
npujar467fe752020-01-16 20:17:45 +0530604func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
khenaidoo43c82122018-11-22 18:38:28 -0500605 kp.lockTransactionIdToChannelMap.Lock()
606 defer kp.lockTransactionIdToChannelMap.Unlock()
607 for key, value := range kp.transactionIdToChannelMap {
608 if value.topic.Name == id {
609 close(value.ch)
610 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400611 }
612 }
khenaidooabad44c2018-08-03 16:58:35 -0400613}
614
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000615// nolint: unused
npujar467fe752020-01-16 20:17:45 +0530616func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
Scott Baker0e78ba22020-02-24 17:58:47 -0800617 logger.Debug("delete-all-transaction-id-channel-map")
khenaidoo43c82122018-11-22 18:38:28 -0500618 kp.lockTransactionIdToChannelMap.Lock()
619 defer kp.lockTransactionIdToChannelMap.Unlock()
620 for key, value := range kp.transactionIdToChannelMap {
621 close(value.ch)
622 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400623 }
khenaidooabad44c2018-08-03 16:58:35 -0400624}
625
npujar467fe752020-01-16 20:17:45 +0530626func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500627 // If we have any consumers on that topic we need to close them
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500628 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800629 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500630 }
631 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800632 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500633 }
khenaidoo43c82122018-11-22 18:38:28 -0500634 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500635
khenaidoo43c82122018-11-22 18:38:28 -0500636 return kp.kafkaClient.DeleteTopic(&topic)
637}
638
639func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400640 // Encode the response argument - needs to be a proto message
641 if returnedVal == nil {
642 return nil, nil
643 }
644 protoValue, ok := returnedVal.(proto.Message)
645 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800646 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
khenaidooabad44c2018-08-03 16:58:35 -0400647 err := errors.New("response-value-not-proto-message")
648 return nil, err
649 }
650
651 // Marshal the returned value, if any
652 var marshalledReturnedVal *any.Any
653 var err error
654 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800655 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400656 return nil, err
657 }
658 return marshalledReturnedVal, nil
659}
660
khenaidoo79232702018-12-04 11:00:41 -0500661func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
662 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400663 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500664 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400665 FromTopic: request.Header.ToTopic,
666 ToTopic: request.Header.FromTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700667 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400668 }
khenaidoo79232702018-12-04 11:00:41 -0500669 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400670 Success: false,
671 Result: nil,
672 }
673 var marshalledResponseBody *any.Any
674 var err error
675 // Error should never happen here
676 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800677 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400678 }
679
khenaidoo79232702018-12-04 11:00:41 -0500680 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400681 Header: responseHeader,
682 Body: marshalledResponseBody,
683 }
684
685}
686
687//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
688//or an error on failure
khenaidoo79232702018-12-04 11:00:41 -0500689func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800690 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500691 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400692 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500693 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400694 FromTopic: request.Header.ToTopic,
695 ToTopic: request.Header.FromTopic,
khenaidoo2c6a0992019-04-29 13:46:56 -0400696 KeyTopic: request.Header.KeyTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700697 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400698 }
699
700 // Go over all returned values
701 var marshalledReturnedVal *any.Any
702 var err error
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000703
704 // for now we support only 1 returned value - (excluding the error)
705 if len(returnedValues) > 0 {
706 if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800707 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400708 }
khenaidooabad44c2018-08-03 16:58:35 -0400709 }
710
khenaidoo79232702018-12-04 11:00:41 -0500711 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400712 Success: success,
713 Result: marshalledReturnedVal,
714 }
715
716 // Marshal the response body
717 var marshalledResponseBody *any.Any
718 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800719 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400720 return nil, err
721 }
722
khenaidoo79232702018-12-04 11:00:41 -0500723 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400724 Header: responseHeader,
725 Body: marshalledResponseBody,
726 }, nil
727}
728
// CallFuncByName invokes, through reflection, the method of myClass named funcName and returns the
// values the method produced.
//
// The first letter of funcName is upper-cased before the lookup, because only exported methods can
// be invoked by reflection from another package. params are passed to the method in order.
//
// Instead of panicking inside the reflect package, an error and an empty (non-nil) slice are
// returned when myClass is nil, when the method does not exist, or when the number of params does
// not fit the method's signature. Argument types are not validated here: a nil or wrongly typed
// param still panics inside reflect.Value.Call.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	receiver := reflect.ValueOf(myClass)
	if !receiver.IsValid() {
		// reflect.ValueOf(nil) is the zero Value, on which MethodByName panics.
		return make([]reflect.Value, 0), fmt.Errorf("nil-target-for-method \"%s\"", funcName)
	}

	// Capitalize the first letter in the funcName to work around the capital first letter required
	// to invoke a function from a different package.
	funcName = strings.Title(funcName)
	m := receiver.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Reject a wrong argument count up front; reflect.Value.Call would panic on it.
	methodType := m.Type()
	expected := methodType.NumIn()
	if methodType.IsVariadic() {
		// The variadic parameter itself may receive zero arguments.
		if len(params) < expected-1 {
			return make([]reflect.Value, 0), fmt.Errorf("too-few-arguments \"%s\": got %d, want at least %d", funcName, len(params), expected-1)
		}
	} else if len(params) != expected {
		return make([]reflect.Value, 0), fmt.Errorf("argument-count-mismatch \"%s\": got %d, want %d", funcName, len(params), expected)
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		in[i] = reflect.ValueOf(param)
	}
	return m.Call(in), nil
}
745
npujar467fe752020-01-16 20:17:45 +0530746func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo297cd252019-02-07 22:10:23 -0500747 arg := &KVArg{
748 Key: TransactionKey,
749 Value: &ic.StrType{Val: transactionId},
750 }
751
752 var marshalledArg *any.Any
753 var err error
754 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800755 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
khenaidoo297cd252019-02-07 22:10:23 -0500756 return currentArgs
757 }
758 protoArg := &ic.Argument{
759 Key: arg.Key,
760 Value: marshalledArg,
761 }
762 return append(currentArgs, protoArg)
763}
764
npujar467fe752020-01-16 20:17:45 +0530765func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo54e0ddf2019-02-27 16:21:33 -0500766 var marshalledArg *any.Any
767 var err error
768 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800769 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500770 return currentArgs
771 }
772 protoArg := &ic.Argument{
773 Key: FromTopic,
774 Value: marshalledArg,
775 }
776 return append(currentArgs, protoArg)
777}
778
npujar467fe752020-01-16 20:17:45 +0530779func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400780
khenaidoo43c82122018-11-22 18:38:28 -0500781 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500782 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400783 var out []reflect.Value
784 var err error
785
786 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500787 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400788 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800789 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400790 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800791 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400792 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500793 // Augment the requestBody with the message Id as it will be used in scenarios where cores
794 // are set in pairs and competing
795 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500796
797 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
798 // needs to send an unsollicited message to the currently requested container
799 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
800
khenaidooabad44c2018-08-03 16:58:35 -0400801 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
802 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800803 logger.Warn(err)
khenaidooabad44c2018-08-03 16:58:35 -0400804 }
805 }
806 // Response required?
807 if requestBody.ResponseRequired {
808 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500809 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400810 var returnedValues []interface{}
811 var success bool
812 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500813 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400814 returnedValues = make([]interface{}, 1)
815 returnedValues[0] = returnError
816 } else {
khenaidoob9203542018-09-17 22:56:37 -0400817 returnedValues = make([]interface{}, 0)
818 // Check for errors first
819 lastIndex := len(out) - 1
820 if out[lastIndex].Interface() != nil { // Error
khenaidoo09771ef2019-10-11 14:25:02 -0400821 if retError, ok := out[lastIndex].Interface().(error); ok {
822 if retError.Error() == ErrorTransactionNotAcquired.Error() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800823 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400824 return // Ignore - process is in competing mode and ignored transaction
825 }
826 returnError = &ic.Error{Reason: retError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400827 returnedValues = append(returnedValues, returnError)
828 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500829 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400830 returnedValues = append(returnedValues, returnError)
831 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500832 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800833 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400834 return // Ignore - should not happen
khenaidoob9203542018-09-17 22:56:37 -0400835 } else { // Non-error case
836 success = true
837 for idx, val := range out {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800838 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400839 if idx != lastIndex {
840 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400841 }
khenaidooabad44c2018-08-03 16:58:35 -0400842 }
843 }
844 }
845
khenaidoo79232702018-12-04 11:00:41 -0500846 var icm *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400847 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800848 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400849 icm = encodeDefaultFailedResponse(msg)
850 }
khenaidoo43c82122018-11-22 18:38:28 -0500851 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
852 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
853 // present then the key will be empty, hence all messages for a given topic will be sent to all
854 // partitions.
855 replyTopic := &Topic{Name: msg.Header.FromTopic}
khenaidoobdcb8e02019-03-06 16:28:56 -0500856 key := msg.Header.KeyTopic
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800857 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500858 // TODO: handle error response.
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000859 go func() {
860 if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
861 logger.Errorw("send-reply-failed", log.Fields{
862 "topic": replyTopic,
863 "key": key,
864 "error": err})
865 }
866 }()
khenaidooabad44c2018-08-03 16:58:35 -0400867 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500868 } else if msg.Header.Type == ic.MessageType_RESPONSE {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800869 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500870 go kp.dispatchResponse(msg)
871 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800872 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400873 }
874}
875
npujar467fe752020-01-16 20:17:45 +0530876func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400877 // Wait for messages
878 for msg := range ch {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800879 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500880 go kp.handleMessage(msg, targetInterface)
khenaidooabad44c2018-08-03 16:58:35 -0400881 }
882}
883
npujar467fe752020-01-16 20:17:45 +0530884func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400885 kp.lockTransactionIdToChannelMap.RLock()
886 defer kp.lockTransactionIdToChannelMap.RUnlock()
khenaidooabad44c2018-08-03 16:58:35 -0400887 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800888 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
khenaidooabad44c2018-08-03 16:58:35 -0400889 return
890 }
khenaidoo43c82122018-11-22 18:38:28 -0500891 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400892}
893
khenaidooabad44c2018-08-03 16:58:35 -0400894// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
895// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
896// API. There is one response channel waiting for kafka messages before dispatching the message to the
897// corresponding waiting channel
npujar467fe752020-01-16 20:17:45 +0530898func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800899 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400900
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500901 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500902 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerb9635992020-03-11 21:11:28 -0700903 // Set channel size to 1 to prevent deadlock, see VOL-2708
904 ch := make(chan *ic.InterContainerMessage, 1)
khenaidoo43c82122018-11-22 18:38:28 -0500905 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400906
907 return ch, nil
908}
909
npujar467fe752020-01-16 20:17:45 +0530910func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800911 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500912 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -0400913 return nil
914}
915
npujar467fe752020-01-16 20:17:45 +0530916func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
Scott Bakeree6a0872019-10-29 15:59:52 -0700917 return kp.kafkaClient.EnableLivenessChannel(enable)
918}
919
npujar467fe752020-01-16 20:17:45 +0530920func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800921 return kp.kafkaClient.EnableHealthinessChannel(enable)
922}
923
npujar467fe752020-01-16 20:17:45 +0530924func (kp *interContainerProxy) SendLiveness() error {
Scott Bakeree6a0872019-10-29 15:59:52 -0700925 return kp.kafkaClient.SendLiveness()
926}
927
khenaidooabad44c2018-08-03 16:58:35 -0400928//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
929//or an error on failure
khenaidoobdcb8e02019-03-06 16:28:56 -0500930func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
khenaidoo79232702018-12-04 11:00:41 -0500931 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400932 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500933 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -0400934 FromTopic: replyTopic.Name,
935 ToTopic: toTopic.Name,
khenaidoo2c6a0992019-04-29 13:46:56 -0400936 KeyTopic: key,
Scott Baker504b4802020-04-17 10:12:20 -0700937 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400938 }
khenaidoo79232702018-12-04 11:00:41 -0500939 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -0400940 Rpc: rpc,
941 ResponseRequired: true,
942 ReplyToTopic: replyTopic.Name,
943 }
944
945 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -0400946 if arg == nil {
947 // In case the caller sends an array with empty args
948 continue
949 }
khenaidooabad44c2018-08-03 16:58:35 -0400950 var marshalledArg *any.Any
951 var err error
952 // ascertain the value interface type is a proto.Message
953 protoValue, ok := arg.Value.(proto.Message)
954 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800955 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
khenaidooabad44c2018-08-03 16:58:35 -0400956 err := errors.New("argument-value-not-proto-message")
957 return nil, err
958 }
959 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800960 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400961 return nil, err
962 }
khenaidoo79232702018-12-04 11:00:41 -0500963 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -0400964 Key: arg.Key,
965 Value: marshalledArg,
966 }
967 requestBody.Args = append(requestBody.Args, protoArg)
968 }
969
970 var marshalledData *any.Any
971 var err error
972 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800973 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400974 return nil, err
975 }
khenaidoo79232702018-12-04 11:00:41 -0500976 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400977 Header: requestHeader,
978 Body: marshalledData,
979 }
980 return request, nil
981}
982
khenaidoo79232702018-12-04 11:00:41 -0500983func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400984 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -0500985 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400986 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800987 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400988 return nil, err
989 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800990 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -0400991
992 return &responseBody, nil
993
994}