blob: 368391e4814174f5c57bb9d863724a5e9d9f7d5e [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
William Kurkianea869482019-04-09 15:16:11 -040022 "reflect"
23 "strings"
24 "sync"
25 "time"
William Kurkianea869482019-04-09 15:16:11 -040026
Andrea Campanella974d7452020-06-26 19:32:30 +020027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29
Esin Karamanccb714b2019-11-29 15:02:06 +000030 "github.com/golang/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/any"
33 "github.com/google/uuid"
34 "github.com/opencord/voltha-lib-go/v3/pkg/log"
35 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
36)
William Kurkianea869482019-04-09 15:16:11 -040037
38const (
39 DefaultMaxRetries = 3
divyadesaid26f6b12020-03-19 06:30:28 +000040 DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
William Kurkianea869482019-04-09 15:16:11 -040041)
42
43const (
44 TransactionKey = "transactionID"
45 FromTopic = "fromTopic"
46)
47
kdarapub26b4502019-10-05 03:02:33 +053048var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
49var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
50
William Kurkianea869482019-04-09 15:16:11 -040051// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
52// obtained from that channel, this interface is invoked. This is used to handle
53// async requests into the Core via the kafka messaging bus
54type requestHandlerChannel struct {
55 requesthandlerInterface interface{}
56 ch <-chan *ic.InterContainerMessage
57}
58
59// transactionChannel represents a combination of a topic and a channel onto which a response received
60// on the kafka bus will be sent to
61type transactionChannel struct {
62 topic *Topic
63 ch chan *ic.InterContainerMessage
64}
65
npujarec5762e2020-01-01 14:08:48 +053066type InterContainerProxy interface {
Neha Sharma96b7bf22020-06-15 10:37:32 +000067 Start(ctx context.Context) error
68 Stop(ctx context.Context)
npujarec5762e2020-01-01 14:08:48 +053069 GetDefaultTopic() *Topic
npujarec5762e2020-01-01 14:08:48 +053070 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
divyadesaid26f6b12020-03-19 06:30:28 +000071 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Neha Sharma96b7bf22020-06-15 10:37:32 +000072 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
73 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
74 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
75 DeleteTopic(ctx context.Context, topic Topic) error
76 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
77 SendLiveness(ctx context.Context) error
npujarec5762e2020-01-01 14:08:48 +053078}
79
80// interContainerProxy represents the messaging proxy
81type interContainerProxy struct {
Neha Sharma3f221ae2020-04-29 19:02:12 +000082 kafkaAddress string
npujarec5762e2020-01-01 14:08:48 +053083 defaultTopic *Topic
William Kurkianea869482019-04-09 15:16:11 -040084 defaultRequestHandlerInterface interface{}
William Kurkianea869482019-04-09 15:16:11 -040085 kafkaClient Client
npujarec5762e2020-01-01 14:08:48 +053086 doneCh chan struct{}
87 doneOnce sync.Once
William Kurkianea869482019-04-09 15:16:11 -040088
89 // This map is used to map a topic to an interface and channel. When a request is received
90 // on that channel (registered to the topic) then that interface is invoked.
91 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
92 lockTopicRequestHandlerChannelMap sync.RWMutex
93
94 // This map is used to map a channel to a response topic. This channel handles all responses on that
95 // channel for that topic and forward them to the appropriate consumers channel, using the
96 // transactionIdToChannelMap.
97 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
98 lockTopicResponseChannelMap sync.RWMutex
99
100 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
101 // sent out and we are waiting for a response.
102 transactionIdToChannelMap map[string]*transactionChannel
103 lockTransactionIdToChannelMap sync.RWMutex
104}
105
npujarec5762e2020-01-01 14:08:48 +0530106type InterContainerProxyOption func(*interContainerProxy)
William Kurkianea869482019-04-09 15:16:11 -0400107
Neha Sharma3f221ae2020-04-29 19:02:12 +0000108func InterContainerAddress(address string) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530109 return func(args *interContainerProxy) {
Neha Sharma3f221ae2020-04-29 19:02:12 +0000110 args.kafkaAddress = address
William Kurkianea869482019-04-09 15:16:11 -0400111 }
112}
113
114func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530115 return func(args *interContainerProxy) {
116 args.defaultTopic = topic
William Kurkianea869482019-04-09 15:16:11 -0400117 }
118}
119
William Kurkianea869482019-04-09 15:16:11 -0400120func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530121 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400122 args.defaultRequestHandlerInterface = handler
123 }
124}
125
126func MsgClient(client Client) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530127 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400128 args.kafkaClient = client
129 }
130}
131
npujarec5762e2020-01-01 14:08:48 +0530132func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
133 proxy := &interContainerProxy{
Neha Sharma3f221ae2020-04-29 19:02:12 +0000134 kafkaAddress: DefaultKafkaAddress,
135 doneCh: make(chan struct{}),
William Kurkianea869482019-04-09 15:16:11 -0400136 }
137
138 for _, option := range opts {
139 option(proxy)
140 }
141
npujarec5762e2020-01-01 14:08:48 +0530142 return proxy
William Kurkianea869482019-04-09 15:16:11 -0400143}
144
npujarec5762e2020-01-01 14:08:48 +0530145func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
146 return newInterContainerProxy(opts...)
147}
148
Neha Sharma96b7bf22020-06-15 10:37:32 +0000149func (kp *interContainerProxy) Start(ctx context.Context) error {
150 logger.Info(ctx, "Starting-Proxy")
William Kurkianea869482019-04-09 15:16:11 -0400151
152 // Kafka MsgClient should already have been created. If not, output fatal error
153 if kp.kafkaClient == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000154 logger.Fatal(ctx, "kafka-client-not-set")
William Kurkianea869482019-04-09 15:16:11 -0400155 }
156
William Kurkianea869482019-04-09 15:16:11 -0400157 // Start the kafka client
Neha Sharma96b7bf22020-06-15 10:37:32 +0000158 if err := kp.kafkaClient.Start(ctx); err != nil {
159 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400160 return err
161 }
162
163 // Create the topic to response channel map
164 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
165 //
166 // Create the transactionId to Channel Map
167 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
168
169 // Create the topic to request channel map
170 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
171
172 return nil
173}
174
Neha Sharma96b7bf22020-06-15 10:37:32 +0000175func (kp *interContainerProxy) Stop(ctx context.Context) {
176 logger.Info(ctx, "stopping-intercontainer-proxy")
npujarec5762e2020-01-01 14:08:48 +0530177 kp.doneOnce.Do(func() { close(kp.doneCh) })
William Kurkianea869482019-04-09 15:16:11 -0400178 // TODO : Perform cleanup
Neha Sharma96b7bf22020-06-15 10:37:32 +0000179 kp.kafkaClient.Stop(ctx)
180 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Bakere701b862020-02-20 16:19:16 -0800181 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000182 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800183 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000184 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Bakere701b862020-02-20 16:19:16 -0800185 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000186 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800187 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000188 kp.deleteAllTransactionIdToChannelMap(ctx)
William Kurkianea869482019-04-09 15:16:11 -0400189}
190
npujarec5762e2020-01-01 14:08:48 +0530191func (kp *interContainerProxy) GetDefaultTopic() *Topic {
192 return kp.defaultTopic
193}
194
divyadesaid26f6b12020-03-19 06:30:28 +0000195// InvokeAsyncRPC is used to make an RPC request asynchronously
196func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
197 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
198
Neha Sharma96b7bf22020-06-15 10:37:32 +0000199 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
divyadesaid26f6b12020-03-19 06:30:28 +0000200 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
201 // typically the device ID.
202 responseTopic := replyToTopic
203 if responseTopic == nil {
204 responseTopic = kp.GetDefaultTopic()
205 }
206
207 chnl := make(chan *RpcResponse)
208
209 go func() {
210
211 // once we're done,
212 // close the response channel
213 defer close(chnl)
214
215 var err error
216 var protoRequest *ic.InterContainerMessage
217
218 // Encode the request
Neha Sharma96b7bf22020-06-15 10:37:32 +0000219 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
divyadesaid26f6b12020-03-19 06:30:28 +0000220 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000221 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
divyadesaid26f6b12020-03-19 06:30:28 +0000222 chnl <- NewResponse(RpcFormattingError, err, nil)
223 return
224 }
225
226 // Subscribe for response, if needed, before sending request
227 var ch <-chan *ic.InterContainerMessage
Neha Sharma96b7bf22020-06-15 10:37:32 +0000228 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
229 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
divyadesaid26f6b12020-03-19 06:30:28 +0000230 chnl <- NewResponse(RpcTransportError, err, nil)
231 return
232 }
233
234 // Send request - if the topic is formatted with a device Id then we will send the request using a
235 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
236 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000237 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
divyadesaid26f6b12020-03-19 06:30:28 +0000238
239 // if the message is not sent on kafka publish an event an close the channel
Neha Sharma96b7bf22020-06-15 10:37:32 +0000240 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
divyadesaid26f6b12020-03-19 06:30:28 +0000241 chnl <- NewResponse(RpcTransportError, err, nil)
242 return
243 }
244
245 // if the client is not waiting for a response send the ack and close the channel
246 chnl <- NewResponse(RpcSent, nil, nil)
247 if !waitForResponse {
248 return
249 }
250
251 defer func() {
252 // Remove the subscription for a response on return
Neha Sharma96b7bf22020-06-15 10:37:32 +0000253 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
254 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
divyadesaid26f6b12020-03-19 06:30:28 +0000255 }
256 }()
257
258 // Wait for response as well as timeout or cancellation
259 select {
260 case msg, ok := <-ch:
261 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000262 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
divyadesaid26f6b12020-03-19 06:30:28 +0000263 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
264 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000265 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
266 if responseBody, err := decodeResponse(ctx, msg); err != nil {
divyadesaid26f6b12020-03-19 06:30:28 +0000267 chnl <- NewResponse(RpcReply, err, nil)
268 } else {
269 if responseBody.Success {
270 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
271 } else {
272 // response body contains an error
273 unpackErr := &ic.Error{}
274 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
275 chnl <- NewResponse(RpcReply, err, nil)
276 } else {
277 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
278 }
279 }
280 }
281 case <-ctx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000282 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
divyadesaid26f6b12020-03-19 06:30:28 +0000283 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
284 chnl <- NewResponse(RpcTimeout, err, nil)
285 case <-kp.doneCh:
286 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000287 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
divyadesaid26f6b12020-03-19 06:30:28 +0000288 }
289 }()
290 return chnl
291}
292
William Kurkianea869482019-04-09 15:16:11 -0400293// InvokeRPC is used to send a request to a given topic
npujarec5762e2020-01-01 14:08:48 +0530294func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
William Kurkianea869482019-04-09 15:16:11 -0400295 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
296
297 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
298 // typically the device ID.
299 responseTopic := replyToTopic
300 if responseTopic == nil {
npujarec5762e2020-01-01 14:08:48 +0530301 responseTopic = kp.defaultTopic
William Kurkianea869482019-04-09 15:16:11 -0400302 }
303
304 // Encode the request
Neha Sharma96b7bf22020-06-15 10:37:32 +0000305 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
William Kurkianea869482019-04-09 15:16:11 -0400306 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000307 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400308 return false, nil
309 }
310
311 // Subscribe for response, if needed, before sending request
312 var ch <-chan *ic.InterContainerMessage
313 if waitForResponse {
314 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000315 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
316 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400317 }
318 }
319
320 // Send request - if the topic is formatted with a device Id then we will send the request using a
321 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
322 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
323 //key := GetDeviceIdFromTopic(*toTopic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000324 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000325 go func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000326 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
327 logger.Errorw(ctx, "send-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000328 "topic": toTopic,
329 "key": key,
330 "error": err})
331 }
332 }()
William Kurkianea869482019-04-09 15:16:11 -0400333
334 if waitForResponse {
335 // Create a child context based on the parent context, if any
336 var cancel context.CancelFunc
337 childCtx := context.Background()
338 if ctx == nil {
339 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
340 } else {
341 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
342 }
343 defer cancel()
344
345 // Wait for response as well as timeout or cancellation
346 // Remove the subscription for a response on return
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000347 defer func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000348 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
349 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000350 "id": protoRequest.Header.Id,
351 "error": err})
352 }
353 }()
William Kurkianea869482019-04-09 15:16:11 -0400354 select {
355 case msg, ok := <-ch:
356 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000357 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400358 protoError := &ic.Error{Reason: "channel-closed"}
359 var marshalledArg *any.Any
360 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
361 return false, nil // Should never happen
362 }
363 return false, marshalledArg
364 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000365 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400366 var responseBody *ic.InterContainerResponseBody
367 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000368 if responseBody, err = decodeResponse(ctx, msg); err != nil {
369 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
npujarec5762e2020-01-01 14:08:48 +0530370 // FIXME we should return something
William Kurkianea869482019-04-09 15:16:11 -0400371 }
372 return responseBody.Success, responseBody.Result
373 case <-ctx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000374 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
William Kurkianea869482019-04-09 15:16:11 -0400375 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530376 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
377
William Kurkianea869482019-04-09 15:16:11 -0400378 var marshalledArg *any.Any
379 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
380 return false, nil // Should never happen
381 }
382 return false, marshalledArg
383 case <-childCtx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000384 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
William Kurkianea869482019-04-09 15:16:11 -0400385 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530386 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
387
William Kurkianea869482019-04-09 15:16:11 -0400388 var marshalledArg *any.Any
389 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
390 return false, nil // Should never happen
391 }
392 return false, marshalledArg
393 case <-kp.doneCh:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000394 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
William Kurkianea869482019-04-09 15:16:11 -0400395 return true, nil
396 }
397 }
398 return true, nil
399}
400
401// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
402// when a message is received on a given topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000403func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
William Kurkianea869482019-04-09 15:16:11 -0400404
405 // Subscribe to receive messages for that topic
406 var ch <-chan *ic.InterContainerMessage
407 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000408 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400409 //if ch, err = kp.Subscribe(topic); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000410 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Matt Jeannereteb5059f2019-07-19 06:11:00 -0400411 return err
William Kurkianea869482019-04-09 15:16:11 -0400412 }
413
414 kp.defaultRequestHandlerInterface = handler
415 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
416 // Launch a go routine to receive and process kafka messages
Neha Sharma96b7bf22020-06-15 10:37:32 +0000417 go kp.waitForMessages(ctx, ch, topic, handler)
William Kurkianea869482019-04-09 15:16:11 -0400418
419 return nil
420}
421
422// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
423// when a message is received on a given topic. So far there is only 1 target registered per microservice
Neha Sharma96b7bf22020-06-15 10:37:32 +0000424func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
William Kurkianea869482019-04-09 15:16:11 -0400425 // Subscribe to receive messages for that topic
426 var ch <-chan *ic.InterContainerMessage
427 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000428 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
429 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400430 return err
431 }
432 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
433
434 // Launch a go routine to receive and process kafka messages
Neha Sharma96b7bf22020-06-15 10:37:32 +0000435 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
William Kurkianea869482019-04-09 15:16:11 -0400436
437 return nil
438}
439
Neha Sharma96b7bf22020-06-15 10:37:32 +0000440func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
441 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
William Kurkianea869482019-04-09 15:16:11 -0400442}
443
Neha Sharma96b7bf22020-06-15 10:37:32 +0000444func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400445 kp.lockTopicResponseChannelMap.Lock()
446 defer kp.lockTopicResponseChannelMap.Unlock()
447 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
448 // Unsubscribe to this topic first - this will close the subscribed channel
449 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000450 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
451 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400452 }
453 delete(kp.topicToResponseChannelMap, topic)
454 return err
455 } else {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000456 return fmt.Errorf("%s-Topic-not-found", topic)
William Kurkianea869482019-04-09 15:16:11 -0400457 }
458}
459
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000460// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000461func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
462 logger.Debug(ctx, "delete-all-topic-response-channel")
William Kurkianea869482019-04-09 15:16:11 -0400463 kp.lockTopicResponseChannelMap.Lock()
464 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakere701b862020-02-20 16:19:16 -0800465 var unsubscribeFailTopics []string
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000466 for topic := range kp.topicToResponseChannelMap {
William Kurkianea869482019-04-09 15:16:11 -0400467 // Unsubscribe to this topic first - this will close the subscribed channel
Neha Sharma96b7bf22020-06-15 10:37:32 +0000468 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Bakere701b862020-02-20 16:19:16 -0800469 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000470 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800471 // Do not return. Continue to try to unsubscribe to other topics.
472 } else {
473 // Only delete from channel map if successfully unsubscribed.
474 delete(kp.topicToResponseChannelMap, topic)
William Kurkianea869482019-04-09 15:16:11 -0400475 }
William Kurkianea869482019-04-09 15:16:11 -0400476 }
Scott Bakere701b862020-02-20 16:19:16 -0800477 if len(unsubscribeFailTopics) > 0 {
478 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
479 }
480 return nil
William Kurkianea869482019-04-09 15:16:11 -0400481}
482
npujarec5762e2020-01-01 14:08:48 +0530483func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
William Kurkianea869482019-04-09 15:16:11 -0400484 kp.lockTopicRequestHandlerChannelMap.Lock()
485 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
486 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
487 kp.topicToRequestHandlerChannelMap[topic] = arg
488 }
489}
490
Neha Sharma96b7bf22020-06-15 10:37:32 +0000491func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400492 kp.lockTopicRequestHandlerChannelMap.Lock()
493 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
494 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
495 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000496 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000497 return err
498 }
William Kurkianea869482019-04-09 15:16:11 -0400499 delete(kp.topicToRequestHandlerChannelMap, topic)
500 return nil
501 } else {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000502 return fmt.Errorf("%s-Topic-not-found", topic)
William Kurkianea869482019-04-09 15:16:11 -0400503 }
504}
505
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000506// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000507func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
508 logger.Debug(ctx, "delete-all-topic-request-channel")
William Kurkianea869482019-04-09 15:16:11 -0400509 kp.lockTopicRequestHandlerChannelMap.Lock()
510 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakere701b862020-02-20 16:19:16 -0800511 var unsubscribeFailTopics []string
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000512 for topic := range kp.topicToRequestHandlerChannelMap {
William Kurkianea869482019-04-09 15:16:11 -0400513 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000514 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Bakere701b862020-02-20 16:19:16 -0800515 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000516 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800517 // Do not return. Continue to try to unsubscribe to other topics.
518 } else {
519 // Only delete from channel map if successfully unsubscribed.
520 delete(kp.topicToRequestHandlerChannelMap, topic)
William Kurkianea869482019-04-09 15:16:11 -0400521 }
William Kurkianea869482019-04-09 15:16:11 -0400522 }
Scott Bakere701b862020-02-20 16:19:16 -0800523 if len(unsubscribeFailTopics) > 0 {
524 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
525 }
526 return nil
William Kurkianea869482019-04-09 15:16:11 -0400527}
528
npujarec5762e2020-01-01 14:08:48 +0530529func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400530 kp.lockTransactionIdToChannelMap.Lock()
531 defer kp.lockTransactionIdToChannelMap.Unlock()
532 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
533 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
534 }
535}
536
npujarec5762e2020-01-01 14:08:48 +0530537func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400538 kp.lockTransactionIdToChannelMap.Lock()
539 defer kp.lockTransactionIdToChannelMap.Unlock()
540 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
541 // Close the channel first
542 close(transChannel.ch)
543 delete(kp.transactionIdToChannelMap, id)
544 }
545}
546
npujarec5762e2020-01-01 14:08:48 +0530547func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400548 kp.lockTransactionIdToChannelMap.Lock()
549 defer kp.lockTransactionIdToChannelMap.Unlock()
550 for key, value := range kp.transactionIdToChannelMap {
551 if value.topic.Name == id {
552 close(value.ch)
553 delete(kp.transactionIdToChannelMap, key)
554 }
555 }
556}
557
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000558// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000559func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
560 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
William Kurkianea869482019-04-09 15:16:11 -0400561 kp.lockTransactionIdToChannelMap.Lock()
562 defer kp.lockTransactionIdToChannelMap.Unlock()
563 for key, value := range kp.transactionIdToChannelMap {
564 close(value.ch)
565 delete(kp.transactionIdToChannelMap, key)
566 }
567}
568
Neha Sharma96b7bf22020-06-15 10:37:32 +0000569func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400570 // If we have any consumers on that topic we need to close them
Neha Sharma96b7bf22020-06-15 10:37:32 +0000571 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
572 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400573 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000574 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
575 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400576 }
577 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
578
Neha Sharma96b7bf22020-06-15 10:37:32 +0000579 return kp.kafkaClient.DeleteTopic(ctx, &topic)
William Kurkianea869482019-04-09 15:16:11 -0400580}
581
Neha Sharma96b7bf22020-06-15 10:37:32 +0000582func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
William Kurkianea869482019-04-09 15:16:11 -0400583 // Encode the response argument - needs to be a proto message
584 if returnedVal == nil {
585 return nil, nil
586 }
587 protoValue, ok := returnedVal.(proto.Message)
588 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000589 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
William Kurkianea869482019-04-09 15:16:11 -0400590 err := errors.New("response-value-not-proto-message")
591 return nil, err
592 }
593
594 // Marshal the returned value, if any
595 var marshalledReturnedVal *any.Any
596 var err error
597 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000598 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400599 return nil, err
600 }
601 return marshalledReturnedVal, nil
602}
603
Neha Sharma96b7bf22020-06-15 10:37:32 +0000604func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
William Kurkianea869482019-04-09 15:16:11 -0400605 responseHeader := &ic.Header{
606 Id: request.Header.Id,
607 Type: ic.MessageType_RESPONSE,
608 FromTopic: request.Header.ToTopic,
609 ToTopic: request.Header.FromTopic,
Scott Bakered4a8e72020-04-17 11:10:20 -0700610 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400611 }
612 responseBody := &ic.InterContainerResponseBody{
613 Success: false,
614 Result: nil,
615 }
616 var marshalledResponseBody *any.Any
617 var err error
618 // Error should never happen here
619 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000620 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400621 }
622
623 return &ic.InterContainerMessage{
624 Header: responseHeader,
625 Body: marshalledResponseBody,
626 }
627
628}
629
630//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
631//or an error on failure
Neha Sharma96b7bf22020-06-15 10:37:32 +0000632func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
633 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
William Kurkianea869482019-04-09 15:16:11 -0400634 responseHeader := &ic.Header{
635 Id: request.Header.Id,
636 Type: ic.MessageType_RESPONSE,
637 FromTopic: request.Header.ToTopic,
638 ToTopic: request.Header.FromTopic,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400639 KeyTopic: request.Header.KeyTopic,
Scott Bakered4a8e72020-04-17 11:10:20 -0700640 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400641 }
642
643 // Go over all returned values
644 var marshalledReturnedVal *any.Any
645 var err error
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000646
647 // for now we support only 1 returned value - (excluding the error)
648 if len(returnedValues) > 0 {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000649 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
650 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400651 }
William Kurkianea869482019-04-09 15:16:11 -0400652 }
653
654 responseBody := &ic.InterContainerResponseBody{
655 Success: success,
656 Result: marshalledReturnedVal,
657 }
658
659 // Marshal the response body
660 var marshalledResponseBody *any.Any
661 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000662 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400663 return nil, err
664 }
665
666 return &ic.InterContainerMessage{
667 Header: responseHeader,
668 Body: marshalledResponseBody,
669 }, nil
670}
671
Neha Sharma96b7bf22020-06-15 10:37:32 +0000672func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
William Kurkianea869482019-04-09 15:16:11 -0400673 myClassValue := reflect.ValueOf(myClass)
674 // Capitalize the first letter in the funcName to workaround the first capital letters required to
675 // invoke a function from a different package
676 funcName = strings.Title(funcName)
677 m := myClassValue.MethodByName(funcName)
678 if !m.IsValid() {
679 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
680 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000681 in := make([]reflect.Value, len(params)+1)
682 in[0] = reflect.ValueOf(ctx)
William Kurkianea869482019-04-09 15:16:11 -0400683 for i, param := range params {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000684 in[i+1] = reflect.ValueOf(param)
William Kurkianea869482019-04-09 15:16:11 -0400685 }
686 out = m.Call(in)
687 return
688}
689
Neha Sharma96b7bf22020-06-15 10:37:32 +0000690func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400691 arg := &KVArg{
692 Key: TransactionKey,
693 Value: &ic.StrType{Val: transactionId},
694 }
695
696 var marshalledArg *any.Any
697 var err error
698 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000699 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400700 return currentArgs
701 }
702 protoArg := &ic.Argument{
703 Key: arg.Key,
704 Value: marshalledArg,
705 }
706 return append(currentArgs, protoArg)
707}
708
Neha Sharma96b7bf22020-06-15 10:37:32 +0000709func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400710 var marshalledArg *any.Any
711 var err error
712 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000713 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400714 return currentArgs
715 }
716 protoArg := &ic.Argument{
717 Key: FromTopic,
718 Value: marshalledArg,
719 }
720 return append(currentArgs, protoArg)
721}
722
Neha Sharma96b7bf22020-06-15 10:37:32 +0000723func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400724
725 // First extract the header to know whether this is a request - responses are handled by a different handler
726 if msg.Header.Type == ic.MessageType_REQUEST {
727 var out []reflect.Value
728 var err error
729
730 // Get the request body
731 requestBody := &ic.InterContainerRequestBody{}
732 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000733 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400734 } else {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000735 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400736 // let the callee unpack the arguments as its the only one that knows the real proto type
737 // Augment the requestBody with the message Id as it will be used in scenarios where cores
738 // are set in pairs and competing
Neha Sharma96b7bf22020-06-15 10:37:32 +0000739 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400740
741 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
742 // needs to send an unsollicited message to the currently requested container
Neha Sharma96b7bf22020-06-15 10:37:32 +0000743 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400744
Neha Sharma96b7bf22020-06-15 10:37:32 +0000745 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400746 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000747 logger.Warn(ctx, err)
William Kurkianea869482019-04-09 15:16:11 -0400748 }
749 }
750 // Response required?
751 if requestBody.ResponseRequired {
752 // If we already have an error before then just return that
753 var returnError *ic.Error
754 var returnedValues []interface{}
755 var success bool
756 if err != nil {
757 returnError = &ic.Error{Reason: err.Error()}
758 returnedValues = make([]interface{}, 1)
759 returnedValues[0] = returnError
760 } else {
761 returnedValues = make([]interface{}, 0)
762 // Check for errors first
763 lastIndex := len(out) - 1
764 if out[lastIndex].Interface() != nil { // Error
kdarapub26b4502019-10-05 03:02:33 +0530765 if retError, ok := out[lastIndex].Interface().(error); ok {
766 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000767 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530768 return // Ignore - process is in competing mode and ignored transaction
769 }
770 returnError = &ic.Error{Reason: retError.Error()}
William Kurkianea869482019-04-09 15:16:11 -0400771 returnedValues = append(returnedValues, returnError)
772 } else { // Should never happen
773 returnError = &ic.Error{Reason: "incorrect-error-returns"}
774 returnedValues = append(returnedValues, returnError)
775 }
776 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000777 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530778 return // Ignore - should not happen
William Kurkianea869482019-04-09 15:16:11 -0400779 } else { // Non-error case
780 success = true
781 for idx, val := range out {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000782 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
William Kurkianea869482019-04-09 15:16:11 -0400783 if idx != lastIndex {
784 returnedValues = append(returnedValues, val.Interface())
785 }
786 }
787 }
788 }
789
790 var icm *ic.InterContainerMessage
Neha Sharma96b7bf22020-06-15 10:37:32 +0000791 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
792 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
793 icm = encodeDefaultFailedResponse(ctx, msg)
William Kurkianea869482019-04-09 15:16:11 -0400794 }
795 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
796 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
797 // present then the key will be empty, hence all messages for a given topic will be sent to all
798 // partitions.
799 replyTopic := &Topic{Name: msg.Header.FromTopic}
800 key := msg.Header.KeyTopic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000801 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
William Kurkianea869482019-04-09 15:16:11 -0400802 // TODO: handle error response.
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000803 go func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000804 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
805 logger.Errorw(ctx, "send-reply-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000806 "topic": replyTopic,
807 "key": key,
808 "error": err})
809 }
810 }()
William Kurkianea869482019-04-09 15:16:11 -0400811 }
812 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000813 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
814 go kp.dispatchResponse(ctx, msg)
William Kurkianea869482019-04-09 15:16:11 -0400815 } else {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000816 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400817 }
818}
819
Neha Sharma96b7bf22020-06-15 10:37:32 +0000820func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400821 // Wait for messages
822 for msg := range ch {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000823 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
824 go kp.handleMessage(context.Background(), msg, targetInterface)
William Kurkianea869482019-04-09 15:16:11 -0400825 }
826}
827
Neha Sharma96b7bf22020-06-15 10:37:32 +0000828func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400829 kp.lockTransactionIdToChannelMap.RLock()
830 defer kp.lockTransactionIdToChannelMap.RUnlock()
831 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000832 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
William Kurkianea869482019-04-09 15:16:11 -0400833 return
834 }
835 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
836}
837
838// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
839// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
840// API. There is one response channel waiting for kafka messages before dispatching the message to the
841// corresponding waiting channel
Neha Sharma96b7bf22020-06-15 10:37:32 +0000842func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
843 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400844
845 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
846 // broadcast any message for this topic to all channels waiting on it.
divyadesaid26f6b12020-03-19 06:30:28 +0000847 // Set channel size to 1 to prevent deadlock, see VOL-2708
848 ch := make(chan *ic.InterContainerMessage, 1)
William Kurkianea869482019-04-09 15:16:11 -0400849 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
850
851 return ch, nil
852}
853
Neha Sharma96b7bf22020-06-15 10:37:32 +0000854func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
855 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400856 kp.deleteFromTransactionIdToChannelMap(trnsId)
857 return nil
858}
859
Neha Sharma96b7bf22020-06-15 10:37:32 +0000860func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
861 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
cbabu95f21522019-11-13 14:25:18 +0100862}
863
Neha Sharma96b7bf22020-06-15 10:37:32 +0000864func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
865 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
Scott Baker86fce9a2019-12-12 09:47:17 -0800866}
867
Neha Sharma96b7bf22020-06-15 10:37:32 +0000868func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
869 return kp.kafkaClient.SendLiveness(ctx)
cbabu95f21522019-11-13 14:25:18 +0100870}
871
William Kurkianea869482019-04-09 15:16:11 -0400872//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
873//or an error on failure
Neha Sharma96b7bf22020-06-15 10:37:32 +0000874func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
William Kurkianea869482019-04-09 15:16:11 -0400875 requestHeader := &ic.Header{
876 Id: uuid.New().String(),
877 Type: ic.MessageType_REQUEST,
878 FromTopic: replyTopic.Name,
879 ToTopic: toTopic.Name,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400880 KeyTopic: key,
Scott Bakered4a8e72020-04-17 11:10:20 -0700881 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400882 }
883 requestBody := &ic.InterContainerRequestBody{
884 Rpc: rpc,
885 ResponseRequired: true,
886 ReplyToTopic: replyTopic.Name,
887 }
888
889 for _, arg := range kvArgs {
890 if arg == nil {
891 // In case the caller sends an array with empty args
892 continue
893 }
894 var marshalledArg *any.Any
895 var err error
896 // ascertain the value interface type is a proto.Message
897 protoValue, ok := arg.Value.(proto.Message)
898 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000899 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
William Kurkianea869482019-04-09 15:16:11 -0400900 err := errors.New("argument-value-not-proto-message")
901 return nil, err
902 }
903 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000904 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400905 return nil, err
906 }
907 protoArg := &ic.Argument{
908 Key: arg.Key,
909 Value: marshalledArg,
910 }
911 requestBody.Args = append(requestBody.Args, protoArg)
912 }
913
914 var marshalledData *any.Any
915 var err error
916 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000917 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400918 return nil, err
919 }
920 request := &ic.InterContainerMessage{
921 Header: requestHeader,
922 Body: marshalledData,
923 }
924 return request, nil
925}
926
Neha Sharma96b7bf22020-06-15 10:37:32 +0000927func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
William Kurkianea869482019-04-09 15:16:11 -0400928 // Extract the message body
929 responseBody := ic.InterContainerResponseBody{}
930 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000931 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400932 return nil, err
933 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000934 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
William Kurkianea869482019-04-09 15:16:11 -0400935
936 return &responseBody, nil
937
938}