blob: 120ed45c28671e9664391e830fa64bf0a9991aee [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
Girish Kumardef46fc2020-08-05 18:20:11 +000020 "encoding/json"
khenaidooabad44c2018-08-03 16:58:35 -040021 "errors"
22 "fmt"
khenaidooabad44c2018-08-03 16:58:35 -040023 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050024 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040025 "sync"
26 "time"
khenaidooabad44c2018-08-03 16:58:35 -040027
Girish Gowdra84156d22021-07-08 14:59:36 -070028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30
serkant.uluderya2ae470f2020-01-21 11:13:09 -080031 "github.com/golang/protobuf/proto"
32 "github.com/golang/protobuf/ptypes"
33 "github.com/golang/protobuf/ptypes/any"
34 "github.com/google/uuid"
yasin sapli5458a1c2021-06-14 22:24:38 +000035 "github.com/opencord/voltha-lib-go/v5/pkg/log"
Maninderdfadc982020-10-28 14:04:33 +053036 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
Girish Kumardef46fc2020-08-05 18:20:11 +000037 "github.com/opentracing/opentracing-go"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080038)
khenaidooabad44c2018-08-03 16:58:35 -040039
40const (
khenaidoo43c82122018-11-22 18:38:28 -050041 DefaultMaxRetries = 3
Scott Bakerb9635992020-03-11 21:11:28 -070042 DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
khenaidooabad44c2018-08-03 16:58:35 -040043)
44
khenaidoo297cd252019-02-07 22:10:23 -050045const (
46 TransactionKey = "transactionID"
khenaidoo54e0ddf2019-02-27 16:21:33 -050047 FromTopic = "fromTopic"
khenaidoo297cd252019-02-07 22:10:23 -050048)
49
khenaidoo09771ef2019-10-11 14:25:02 -040050var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
51var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
52
khenaidoo43c82122018-11-22 18:38:28 -050053// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
54// obtained from that channel, this interface is invoked. This is used to handle
55// async requests into the Core via the kafka messaging bus
56type requestHandlerChannel struct {
57 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050058 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040059}
60
khenaidoo43c82122018-11-22 18:38:28 -050061// transactionChannel represents a combination of a topic and a channel onto which a response received
62// on the kafka bus will be sent to
63type transactionChannel struct {
64 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050065 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050066}
67
npujar467fe752020-01-16 20:17:45 +053068type InterContainerProxy interface {
Rohan Agrawal31f21802020-06-12 05:38:46 +000069 Start(ctx context.Context) error
70 Stop(ctx context.Context)
npujar467fe752020-01-16 20:17:45 +053071 GetDefaultTopic() *Topic
npujar467fe752020-01-16 20:17:45 +053072 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Scott Bakerb9635992020-03-11 21:11:28 -070073 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Rohan Agrawal31f21802020-06-12 05:38:46 +000074 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
75 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
76 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
77 DeleteTopic(ctx context.Context, topic Topic) error
78 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
79 SendLiveness(ctx context.Context) error
npujar467fe752020-01-16 20:17:45 +053080}
81
82// interContainerProxy represents the messaging proxy
83type interContainerProxy struct {
Neha Sharmad1387da2020-05-07 20:07:28 +000084 kafkaAddress string
npujar467fe752020-01-16 20:17:45 +053085 defaultTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050086 defaultRequestHandlerInterface interface{}
87 kafkaClient Client
npujar467fe752020-01-16 20:17:45 +053088 doneCh chan struct{}
89 doneOnce sync.Once
khenaidoo43c82122018-11-22 18:38:28 -050090
91 // This map is used to map a topic to an interface and channel. When a request is received
92 // on that channel (registered to the topic) then that interface is invoked.
93 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
94 lockTopicRequestHandlerChannelMap sync.RWMutex
95
96 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050097 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050098 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050099 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500100 lockTopicResponseChannelMap sync.RWMutex
101
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500102 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -0500103 // sent out and we are waiting for a response.
104 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -0400105 lockTransactionIdToChannelMap sync.RWMutex
106}
107
npujar467fe752020-01-16 20:17:45 +0530108type InterContainerProxyOption func(*interContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -0400109
Neha Sharmad1387da2020-05-07 20:07:28 +0000110func InterContainerAddress(address string) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530111 return func(args *interContainerProxy) {
Neha Sharmad1387da2020-05-07 20:07:28 +0000112 args.kafkaAddress = address
khenaidooabad44c2018-08-03 16:58:35 -0400113 }
114}
115
khenaidoo43c82122018-11-22 18:38:28 -0500116func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530117 return func(args *interContainerProxy) {
118 args.defaultTopic = topic
khenaidooabad44c2018-08-03 16:58:35 -0400119 }
120}
121
khenaidoo43c82122018-11-22 18:38:28 -0500122func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530123 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500124 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400125 }
126}
127
khenaidoo43c82122018-11-22 18:38:28 -0500128func MsgClient(client Client) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530129 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500130 args.kafkaClient = client
131 }
132}
133
npujar467fe752020-01-16 20:17:45 +0530134func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
135 proxy := &interContainerProxy{
Neha Sharmad1387da2020-05-07 20:07:28 +0000136 kafkaAddress: DefaultKafkaAddress,
137 doneCh: make(chan struct{}),
khenaidooabad44c2018-08-03 16:58:35 -0400138 }
139
140 for _, option := range opts {
141 option(proxy)
142 }
143
npujar467fe752020-01-16 20:17:45 +0530144 return proxy
khenaidooabad44c2018-08-03 16:58:35 -0400145}
146
npujar467fe752020-01-16 20:17:45 +0530147func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
148 return newInterContainerProxy(opts...)
149}
150
Rohan Agrawal31f21802020-06-12 05:38:46 +0000151func (kp *interContainerProxy) Start(ctx context.Context) error {
152 logger.Info(ctx, "Starting-Proxy")
khenaidooabad44c2018-08-03 16:58:35 -0400153
khenaidoo43c82122018-11-22 18:38:28 -0500154 // Kafka MsgClient should already have been created. If not, output fatal error
155 if kp.kafkaClient == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000156 logger.Fatal(ctx, "kafka-client-not-set")
khenaidoo43c82122018-11-22 18:38:28 -0500157 }
158
khenaidoo43c82122018-11-22 18:38:28 -0500159 // Start the kafka client
Rohan Agrawal31f21802020-06-12 05:38:46 +0000160 if err := kp.kafkaClient.Start(ctx); err != nil {
161 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400162 return err
163 }
164
khenaidoo43c82122018-11-22 18:38:28 -0500165 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500166 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500167 //
khenaidooabad44c2018-08-03 16:58:35 -0400168 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500169 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
170
171 // Create the topic to request channel map
172 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400173
174 return nil
175}
176
Rohan Agrawal31f21802020-06-12 05:38:46 +0000177func (kp *interContainerProxy) Stop(ctx context.Context) {
178 logger.Info(ctx, "stopping-intercontainer-proxy")
npujar467fe752020-01-16 20:17:45 +0530179 kp.doneOnce.Do(func() { close(kp.doneCh) })
khenaidoo43c82122018-11-22 18:38:28 -0500180 // TODO : Perform cleanup
Rohan Agrawal31f21802020-06-12 05:38:46 +0000181 kp.kafkaClient.Stop(ctx)
182 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Baker0e78ba22020-02-24 17:58:47 -0800183 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000184 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800185 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Baker0e78ba22020-02-24 17:58:47 -0800187 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000188 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800189 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000190 kp.deleteAllTransactionIdToChannelMap(ctx)
khenaidooabad44c2018-08-03 16:58:35 -0400191}
192
npujar467fe752020-01-16 20:17:45 +0530193func (kp *interContainerProxy) GetDefaultTopic() *Topic {
194 return kp.defaultTopic
195}
196
Scott Bakerb9635992020-03-11 21:11:28 -0700197// InvokeAsyncRPC is used to make an RPC request asynchronously
198func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
199 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
200
Girish Kumardef46fc2020-08-05 18:20:11 +0000201 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
202 if spanArg != nil {
203 kvArgs = append(kvArgs, &spanArg[0])
204 }
Girish Gowdra84156d22021-07-08 14:59:36 -0700205
Girish Kumardef46fc2020-08-05 18:20:11 +0000206 defer span.Finish()
207
Girish Gowdra84156d22021-07-08 14:59:36 -0700208 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
209
Scott Bakerb9635992020-03-11 21:11:28 -0700210 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
211 // typically the device ID.
212 responseTopic := replyToTopic
213 if responseTopic == nil {
214 responseTopic = kp.GetDefaultTopic()
215 }
216
217 chnl := make(chan *RpcResponse)
218
219 go func() {
220
221 // once we're done,
222 // close the response channel
223 defer close(chnl)
224
225 var err error
226 var protoRequest *ic.InterContainerMessage
227
228 // Encode the request
Rohan Agrawal31f21802020-06-12 05:38:46 +0000229 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Scott Bakerb9635992020-03-11 21:11:28 -0700230 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000231 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Girish Kumardef46fc2020-08-05 18:20:11 +0000232 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
Scott Bakerb9635992020-03-11 21:11:28 -0700233 chnl <- NewResponse(RpcFormattingError, err, nil)
234 return
235 }
236
237 // Subscribe for response, if needed, before sending request
238 var ch <-chan *ic.InterContainerMessage
Rohan Agrawal31f21802020-06-12 05:38:46 +0000239 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
240 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Girish Kumardef46fc2020-08-05 18:20:11 +0000241 log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
Scott Bakerb9635992020-03-11 21:11:28 -0700242 chnl <- NewResponse(RpcTransportError, err, nil)
243 return
244 }
245
246 // Send request - if the topic is formatted with a device Id then we will send the request using a
247 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
248 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000249 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Scott Bakerb9635992020-03-11 21:11:28 -0700250
251 // if the message is not sent on kafka publish an event an close the channel
Rohan Agrawal31f21802020-06-12 05:38:46 +0000252 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Scott Bakerb9635992020-03-11 21:11:28 -0700253 chnl <- NewResponse(RpcTransportError, err, nil)
254 return
255 }
256
257 // if the client is not waiting for a response send the ack and close the channel
258 chnl <- NewResponse(RpcSent, nil, nil)
259 if !waitForResponse {
260 return
261 }
262
263 defer func() {
264 // Remove the subscription for a response on return
Rohan Agrawal31f21802020-06-12 05:38:46 +0000265 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
266 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
Scott Bakerb9635992020-03-11 21:11:28 -0700267 }
268 }()
269
270 // Wait for response as well as timeout or cancellation
271 select {
272 case msg, ok := <-ch:
273 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000274 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Girish Kumardef46fc2020-08-05 18:20:11 +0000275 log.MarkSpanError(ctx, errors.New("channel-closed"))
Scott Bakerb9635992020-03-11 21:11:28 -0700276 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
277 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000278 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
279 if responseBody, err := decodeResponse(ctx, msg); err != nil {
Scott Bakerb9635992020-03-11 21:11:28 -0700280 chnl <- NewResponse(RpcReply, err, nil)
281 } else {
282 if responseBody.Success {
283 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
284 } else {
285 // response body contains an error
286 unpackErr := &ic.Error{}
287 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
288 chnl <- NewResponse(RpcReply, err, nil)
289 } else {
290 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
291 }
292 }
293 }
294 case <-ctx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000295 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Girish Kumardef46fc2020-08-05 18:20:11 +0000296 log.MarkSpanError(ctx, errors.New("context-cancelled"))
Scott Bakerb9635992020-03-11 21:11:28 -0700297 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
298 chnl <- NewResponse(RpcTimeout, err, nil)
299 case <-kp.doneCh:
300 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000301 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Bakerb9635992020-03-11 21:11:28 -0700302 }
303 }()
304 return chnl
305}
306
Girish Kumardef46fc2020-08-05 18:20:11 +0000307// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
308// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
309//
310// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
311// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
312// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
313// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
314// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
315func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
316 var err error
317 var newCtx context.Context
318 var spanToInject opentracing.Span
319
Girish Gowdra84156d22021-07-08 14:59:36 -0700320 if !log.GetGlobalLFM().GetLogCorrelationStatus() && !log.GetGlobalLFM().GetTracePublishingStatus() {
321 // if both log correlation and trace publishing is disable do not generate the span
322 logger.Debugw(ctx, "not-embedding-span-in-KVArg-", log.Fields{"rpc": rpc,
323 "log-correlation-status": log.GetGlobalLFM().GetLogCorrelationStatus(), "trace-publishing-status": log.GetGlobalLFM().GetTracePublishingStatus()})
324 return nil, opentracing.GlobalTracer().StartSpan(rpc), ctx
325 }
326
Girish Kumardef46fc2020-08-05 18:20:11 +0000327 var spanName strings.Builder
328 spanName.WriteString("kafka-")
329
330 // In case of inter adapter message, use Msg Type for constructing RPC name
331 if rpc == "process_inter_adapter_message" {
332 if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
333 spanName.WriteString("inter-adapter-")
334 rpc = msgType.String()
335 }
336 }
337
338 if isAsync {
339 spanName.WriteString("async-rpc-")
340 } else {
341 spanName.WriteString("rpc-")
342 }
343 spanName.WriteString(rpc)
344
345 if isAsync {
346 spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
347 } else {
348 spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
349 }
350
351 spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
352
353 textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
354 if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
355 logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
356 return nil, spanToInject, newCtx
357 }
358
359 var textMapJson []byte
360 if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
361 logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
362 return nil, spanToInject, newCtx
363 }
364
365 spanArg := make([]KVArg, 1)
366 spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
367 return spanArg, spanToInject, newCtx
368}
369
khenaidoo43c82122018-11-22 18:38:28 -0500370// InvokeRPC is used to send a request to a given topic
npujar467fe752020-01-16 20:17:45 +0530371func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
khenaidoobdcb8e02019-03-06 16:28:56 -0500372 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
khenaidoo43c82122018-11-22 18:38:28 -0500373
Girish Kumardef46fc2020-08-05 18:20:11 +0000374 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
375 if spanArg != nil {
376 kvArgs = append(kvArgs, &spanArg[0])
377 }
Girish Gowdra84156d22021-07-08 14:59:36 -0700378
Girish Kumardef46fc2020-08-05 18:20:11 +0000379 defer span.Finish()
380
Girish Gowdra84156d22021-07-08 14:59:36 -0700381 logger.Debugw(ctx, "InvokeRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
382
khenaidoo43c82122018-11-22 18:38:28 -0500383 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
384 // typically the device ID.
385 responseTopic := replyToTopic
386 if responseTopic == nil {
npujar467fe752020-01-16 20:17:45 +0530387 responseTopic = kp.defaultTopic
khenaidoo43c82122018-11-22 18:38:28 -0500388 }
389
khenaidooabad44c2018-08-03 16:58:35 -0400390 // Encode the request
Rohan Agrawal31f21802020-06-12 05:38:46 +0000391 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400392 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000393 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Girish Kumardef46fc2020-08-05 18:20:11 +0000394 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
khenaidooabad44c2018-08-03 16:58:35 -0400395 return false, nil
396 }
397
398 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500399 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400400 if waitForResponse {
401 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000402 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
403 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400404 }
405 }
406
khenaidoo43c82122018-11-22 18:38:28 -0500407 // Send request - if the topic is formatted with a device Id then we will send the request using a
408 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
409 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
khenaidoobdcb8e02019-03-06 16:28:56 -0500410 //key := GetDeviceIdFromTopic(*toTopic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000411 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000412 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000413 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Girish Kumardef46fc2020-08-05 18:20:11 +0000414 log.MarkSpanError(ctx, errors.New("send-failed"))
Rohan Agrawal31f21802020-06-12 05:38:46 +0000415 logger.Errorw(ctx, "send-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000416 "topic": toTopic,
417 "key": key,
418 "error": err})
419 }
420 }()
khenaidooabad44c2018-08-03 16:58:35 -0400421
422 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400423 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400424 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400425 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400426 if ctx == nil {
427 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400428 } else {
429 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400430 }
khenaidoob9203542018-09-17 22:56:37 -0400431 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400432
433 // Wait for response as well as timeout or cancellation
434 // Remove the subscription for a response on return
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000435 defer func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000436 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
437 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000438 "id": protoRequest.Header.Id,
439 "error": err})
440 }
441 }()
khenaidooabad44c2018-08-03 16:58:35 -0400442 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500443 case msg, ok := <-ch:
444 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000445 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Girish Kumardef46fc2020-08-05 18:20:11 +0000446 log.MarkSpanError(ctx, errors.New("channel-closed"))
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500447 protoError := &ic.Error{Reason: "channel-closed"}
448 var marshalledArg *any.Any
449 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
450 return false, nil // Should never happen
451 }
452 return false, marshalledArg
453 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000454 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500455 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400456 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000457 if responseBody, err = decodeResponse(ctx, msg); err != nil {
458 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
npujar467fe752020-01-16 20:17:45 +0530459 // FIXME we should return something
khenaidooabad44c2018-08-03 16:58:35 -0400460 }
461 return responseBody.Success, responseBody.Result
462 case <-ctx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000463 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Girish Kumardef46fc2020-08-05 18:20:11 +0000464 log.MarkSpanError(ctx, errors.New("context-cancelled"))
khenaidooabad44c2018-08-03 16:58:35 -0400465 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530466 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
467
khenaidooabad44c2018-08-03 16:58:35 -0400468 var marshalledArg *any.Any
469 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
470 return false, nil // Should never happen
471 }
472 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400473 case <-childCtx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000474 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Girish Kumardef46fc2020-08-05 18:20:11 +0000475 log.MarkSpanError(ctx, errors.New("context-cancelled"))
khenaidoob9203542018-09-17 22:56:37 -0400476 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530477 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
478
khenaidoob9203542018-09-17 22:56:37 -0400479 var marshalledArg *any.Any
480 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
481 return false, nil // Should never happen
482 }
483 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400484 case <-kp.doneCh:
Rohan Agrawal31f21802020-06-12 05:38:46 +0000485 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400486 return true, nil
487 }
488 }
489 return true, nil
490}
491
khenaidoo43c82122018-11-22 18:38:28 -0500492// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400493// when a message is received on a given topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000494func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400495
496 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500497 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400498 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000499 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500500 //if ch, err = kp.Subscribe(topic); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000501 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Abhilash S.L90cd9552019-07-18 17:30:29 +0530502 return err
khenaidooabad44c2018-08-03 16:58:35 -0400503 }
khenaidoo43c82122018-11-22 18:38:28 -0500504
505 kp.defaultRequestHandlerInterface = handler
506 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400507 // Launch a go routine to receive and process kafka messages
Rohan Agrawal31f21802020-06-12 05:38:46 +0000508 go kp.waitForMessages(ctx, ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400509
510 return nil
511}
512
khenaidoo43c82122018-11-22 18:38:28 -0500513// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
514// when a message is received on a given topic. So far there is only 1 target registered per microservice
Rohan Agrawal31f21802020-06-12 05:38:46 +0000515func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500516 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500517 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500518 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000519 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
520 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500521 return err
khenaidoo43c82122018-11-22 18:38:28 -0500522 }
523 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
524
525 // Launch a go routine to receive and process kafka messages
Rohan Agrawal31f21802020-06-12 05:38:46 +0000526 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
khenaidoo43c82122018-11-22 18:38:28 -0500527
khenaidooabad44c2018-08-03 16:58:35 -0400528 return nil
529}
530
Rohan Agrawal31f21802020-06-12 05:38:46 +0000531func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
532 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
khenaidoo43c82122018-11-22 18:38:28 -0500533}
534
Rohan Agrawal31f21802020-06-12 05:38:46 +0000535func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500536 kp.lockTopicResponseChannelMap.Lock()
537 defer kp.lockTopicResponseChannelMap.Unlock()
538 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
539 // Unsubscribe to this topic first - this will close the subscribed channel
540 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000541 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
542 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
khenaidoo43c82122018-11-22 18:38:28 -0500543 }
544 delete(kp.topicToResponseChannelMap, topic)
545 return err
546 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000547 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400548 }
549}
550
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000551// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000552func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
553 logger.Debug(ctx, "delete-all-topic-response-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500554 kp.lockTopicResponseChannelMap.Lock()
555 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800556 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000557 for topic := range kp.topicToResponseChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500558 // Unsubscribe to this topic first - this will close the subscribed channel
Rohan Agrawal31f21802020-06-12 05:38:46 +0000559 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Baker0e78ba22020-02-24 17:58:47 -0800560 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000561 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800562 // Do not return. Continue to try to unsubscribe to other topics.
563 } else {
564 // Only delete from channel map if successfully unsubscribed.
565 delete(kp.topicToResponseChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500566 }
khenaidooabad44c2018-08-03 16:58:35 -0400567 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800568 if len(unsubscribeFailTopics) > 0 {
569 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
570 }
571 return nil
khenaidooabad44c2018-08-03 16:58:35 -0400572}
573
npujar467fe752020-01-16 20:17:45 +0530574func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
khenaidoo43c82122018-11-22 18:38:28 -0500575 kp.lockTopicRequestHandlerChannelMap.Lock()
576 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
577 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
578 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400579 }
khenaidooabad44c2018-08-03 16:58:35 -0400580}
581
Rohan Agrawal31f21802020-06-12 05:38:46 +0000582func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500583 kp.lockTopicRequestHandlerChannelMap.Lock()
584 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
585 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
586 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000587 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000588 return err
589 }
khenaidoo43c82122018-11-22 18:38:28 -0500590 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400591 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500592 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000593 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400594 }
khenaidooabad44c2018-08-03 16:58:35 -0400595}
596
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000597// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000598func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
599 logger.Debug(ctx, "delete-all-topic-request-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500600 kp.lockTopicRequestHandlerChannelMap.Lock()
601 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800602 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000603 for topic := range kp.topicToRequestHandlerChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500604 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000605 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Baker0e78ba22020-02-24 17:58:47 -0800606 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000607 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800608 // Do not return. Continue to try to unsubscribe to other topics.
609 } else {
610 // Only delete from channel map if successfully unsubscribed.
611 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500612 }
khenaidoo43c82122018-11-22 18:38:28 -0500613 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800614 if len(unsubscribeFailTopics) > 0 {
615 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
616 }
617 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500618}
619
npujar467fe752020-01-16 20:17:45 +0530620func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400621 kp.lockTransactionIdToChannelMap.Lock()
622 defer kp.lockTransactionIdToChannelMap.Unlock()
623 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500624 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400625 }
626}
627
npujar467fe752020-01-16 20:17:45 +0530628func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400629 kp.lockTransactionIdToChannelMap.Lock()
630 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500631 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
632 // Close the channel first
633 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400634 delete(kp.transactionIdToChannelMap, id)
635 }
636}
637
npujar467fe752020-01-16 20:17:45 +0530638func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
khenaidoo43c82122018-11-22 18:38:28 -0500639 kp.lockTransactionIdToChannelMap.Lock()
640 defer kp.lockTransactionIdToChannelMap.Unlock()
641 for key, value := range kp.transactionIdToChannelMap {
642 if value.topic.Name == id {
643 close(value.ch)
644 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400645 }
646 }
khenaidooabad44c2018-08-03 16:58:35 -0400647}
648
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000649// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000650func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
651 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
khenaidoo43c82122018-11-22 18:38:28 -0500652 kp.lockTransactionIdToChannelMap.Lock()
653 defer kp.lockTransactionIdToChannelMap.Unlock()
654 for key, value := range kp.transactionIdToChannelMap {
655 close(value.ch)
656 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400657 }
khenaidooabad44c2018-08-03 16:58:35 -0400658}
659
Rohan Agrawal31f21802020-06-12 05:38:46 +0000660func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500661 // If we have any consumers on that topic we need to close them
Rohan Agrawal31f21802020-06-12 05:38:46 +0000662 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
663 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500664 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000665 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
666 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500667 }
khenaidoo43c82122018-11-22 18:38:28 -0500668 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500669
Rohan Agrawal31f21802020-06-12 05:38:46 +0000670 return kp.kafkaClient.DeleteTopic(ctx, &topic)
khenaidoo43c82122018-11-22 18:38:28 -0500671}
672
Rohan Agrawal31f21802020-06-12 05:38:46 +0000673func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400674 // Encode the response argument - needs to be a proto message
675 if returnedVal == nil {
676 return nil, nil
677 }
678 protoValue, ok := returnedVal.(proto.Message)
679 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000680 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
khenaidooabad44c2018-08-03 16:58:35 -0400681 err := errors.New("response-value-not-proto-message")
682 return nil, err
683 }
684
685 // Marshal the returned value, if any
686 var marshalledReturnedVal *any.Any
687 var err error
688 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000689 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400690 return nil, err
691 }
692 return marshalledReturnedVal, nil
693}
694
Rohan Agrawal31f21802020-06-12 05:38:46 +0000695func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
khenaidoo79232702018-12-04 11:00:41 -0500696 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400697 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500698 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400699 FromTopic: request.Header.ToTopic,
700 ToTopic: request.Header.FromTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700701 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400702 }
khenaidoo79232702018-12-04 11:00:41 -0500703 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400704 Success: false,
705 Result: nil,
706 }
707 var marshalledResponseBody *any.Any
708 var err error
709 // Error should never happen here
710 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000711 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400712 }
713
khenaidoo79232702018-12-04 11:00:41 -0500714 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400715 Header: responseHeader,
716 Body: marshalledResponseBody,
717 }
718
719}
720
721//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
722//or an error on failure
Rohan Agrawal31f21802020-06-12 05:38:46 +0000723func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
724 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500725 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400726 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500727 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400728 FromTopic: request.Header.ToTopic,
729 ToTopic: request.Header.FromTopic,
khenaidoo2c6a0992019-04-29 13:46:56 -0400730 KeyTopic: request.Header.KeyTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700731 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400732 }
733
734 // Go over all returned values
735 var marshalledReturnedVal *any.Any
736 var err error
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000737
738 // for now we support only 1 returned value - (excluding the error)
739 if len(returnedValues) > 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000740 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
741 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400742 }
khenaidooabad44c2018-08-03 16:58:35 -0400743 }
744
khenaidoo79232702018-12-04 11:00:41 -0500745 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400746 Success: success,
747 Result: marshalledReturnedVal,
748 }
749
750 // Marshal the response body
751 var marshalledResponseBody *any.Any
752 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000753 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400754 return nil, err
755 }
756
khenaidoo79232702018-12-04 11:00:41 -0500757 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400758 Header: responseHeader,
759 Body: marshalledResponseBody,
760 }, nil
761}
762
Rohan Agrawal31f21802020-06-12 05:38:46 +0000763func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
khenaidooabad44c2018-08-03 16:58:35 -0400764 myClassValue := reflect.ValueOf(myClass)
khenaidoo19374072018-12-11 11:05:15 -0500765 // Capitalize the first letter in the funcName to workaround the first capital letters required to
766 // invoke a function from a different package
767 funcName = strings.Title(funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400768 m := myClassValue.MethodByName(funcName)
769 if !m.IsValid() {
khenaidoo43c82122018-11-22 18:38:28 -0500770 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
khenaidooabad44c2018-08-03 16:58:35 -0400771 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000772 in := make([]reflect.Value, len(params)+1)
773 in[0] = reflect.ValueOf(ctx)
khenaidooabad44c2018-08-03 16:58:35 -0400774 for i, param := range params {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000775 in[i+1] = reflect.ValueOf(param)
khenaidooabad44c2018-08-03 16:58:35 -0400776 }
777 out = m.Call(in)
778 return
779}
780
Rohan Agrawal31f21802020-06-12 05:38:46 +0000781func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo297cd252019-02-07 22:10:23 -0500782 arg := &KVArg{
783 Key: TransactionKey,
784 Value: &ic.StrType{Val: transactionId},
785 }
786
787 var marshalledArg *any.Any
788 var err error
789 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000790 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
khenaidoo297cd252019-02-07 22:10:23 -0500791 return currentArgs
792 }
793 protoArg := &ic.Argument{
794 Key: arg.Key,
795 Value: marshalledArg,
796 }
797 return append(currentArgs, protoArg)
798}
799
Rohan Agrawal31f21802020-06-12 05:38:46 +0000800func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo54e0ddf2019-02-27 16:21:33 -0500801 var marshalledArg *any.Any
802 var err error
803 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000804 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500805 return currentArgs
806 }
807 protoArg := &ic.Argument{
808 Key: FromTopic,
809 Value: marshalledArg,
810 }
811 return append(currentArgs, protoArg)
812}
813
Girish Kumardef46fc2020-08-05 18:20:11 +0000814// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
815// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
816// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
817// from components currently not sending the span (e.g. openonu adapter)
818func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
819
820 for _, arg := range args {
821 if arg.Key == "span" {
822 var err error
823 var textMapString ic.StrType
824 if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
Girish Gowdra84156d22021-07-08 14:59:36 -0700825 logger.Debug(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
Girish Kumardef46fc2020-08-05 18:20:11 +0000826 break
827 }
828
829 spanTextMap := make(map[string]string)
830 if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
Girish Gowdra84156d22021-07-08 14:59:36 -0700831 logger.Debug(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
Girish Kumardef46fc2020-08-05 18:20:11 +0000832 break
833 }
834
835 var spanContext opentracing.SpanContext
836 if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
Girish Gowdra84156d22021-07-08 14:59:36 -0700837 logger.Debug(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
Girish Kumardef46fc2020-08-05 18:20:11 +0000838 break
839 }
840
841 var receivedRpcName string
842 extractBaggage := func(k, v string) bool {
843 if k == "rpc-span-name" {
844 receivedRpcName = v
845 return false
846 }
847 return true
848 }
849
850 spanContext.ForeachBaggageItem(extractBaggage)
851
852 return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
853 }
854 }
855
856 // Create new Child Span with rpc as name if no span details were received in kafka arguments
857 var spanName strings.Builder
858 spanName.WriteString("kafka-")
859
860 // In case of inter adapter message, use Msg Type for constructing RPC name
861 if rpcName == "process_inter_adapter_message" {
862 for _, arg := range args {
863 if arg.Key == "msg" {
864 iamsg := ic.InterAdapterMessage{}
865 if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
866 spanName.WriteString("inter-adapter-")
867 rpcName = iamsg.Header.Type.String()
868 }
869 }
870 }
871 }
872
873 spanName.WriteString("rpc-")
874 spanName.WriteString(rpcName)
875
876 return opentracing.StartSpanFromContext(ctx, spanName.String())
877}
878
Rohan Agrawal31f21802020-06-12 05:38:46 +0000879func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400880
khenaidoo43c82122018-11-22 18:38:28 -0500881 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500882 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400883 var out []reflect.Value
884 var err error
885
886 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500887 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400888 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000889 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400890 } else {
Girish Gowdra84156d22021-07-08 14:59:36 -0700891 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "args": requestBody.Args})
Girish Kumardef46fc2020-08-05 18:20:11 +0000892 span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
893 defer span.Finish()
894
khenaidooabad44c2018-08-03 16:58:35 -0400895 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500896 // Augment the requestBody with the message Id as it will be used in scenarios where cores
897 // are set in pairs and competing
Rohan Agrawal31f21802020-06-12 05:38:46 +0000898 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500899
900 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
901 // needs to send an unsollicited message to the currently requested container
Rohan Agrawal31f21802020-06-12 05:38:46 +0000902 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500903
Rohan Agrawal31f21802020-06-12 05:38:46 +0000904 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
khenaidooabad44c2018-08-03 16:58:35 -0400905 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000906 logger.Warn(ctx, err)
khenaidooabad44c2018-08-03 16:58:35 -0400907 }
908 }
909 // Response required?
910 if requestBody.ResponseRequired {
911 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500912 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400913 var returnedValues []interface{}
914 var success bool
915 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500916 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400917 returnedValues = make([]interface{}, 1)
918 returnedValues[0] = returnError
919 } else {
khenaidoob9203542018-09-17 22:56:37 -0400920 returnedValues = make([]interface{}, 0)
921 // Check for errors first
922 lastIndex := len(out) - 1
923 if out[lastIndex].Interface() != nil { // Error
khenaidoo09771ef2019-10-11 14:25:02 -0400924 if retError, ok := out[lastIndex].Interface().(error); ok {
925 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000926 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400927 return // Ignore - process is in competing mode and ignored transaction
928 }
929 returnError = &ic.Error{Reason: retError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400930 returnedValues = append(returnedValues, returnError)
931 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500932 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400933 returnedValues = append(returnedValues, returnError)
934 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500935 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000936 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400937 return // Ignore - should not happen
khenaidoob9203542018-09-17 22:56:37 -0400938 } else { // Non-error case
939 success = true
940 for idx, val := range out {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000941 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400942 if idx != lastIndex {
943 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400944 }
khenaidooabad44c2018-08-03 16:58:35 -0400945 }
946 }
947 }
948
khenaidoo79232702018-12-04 11:00:41 -0500949 var icm *ic.InterContainerMessage
Rohan Agrawal31f21802020-06-12 05:38:46 +0000950 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
951 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
952 icm = encodeDefaultFailedResponse(ctx, msg)
khenaidooabad44c2018-08-03 16:58:35 -0400953 }
khenaidoo43c82122018-11-22 18:38:28 -0500954 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
955 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
956 // present then the key will be empty, hence all messages for a given topic will be sent to all
957 // partitions.
958 replyTopic := &Topic{Name: msg.Header.FromTopic}
khenaidoobdcb8e02019-03-06 16:28:56 -0500959 key := msg.Header.KeyTopic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000960 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500961 // TODO: handle error response.
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000962 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000963 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
964 logger.Errorw(ctx, "send-reply-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000965 "topic": replyTopic,
966 "key": key,
967 "error": err})
968 }
969 }()
khenaidooabad44c2018-08-03 16:58:35 -0400970 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500971 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000972 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
973 go kp.dispatchResponse(ctx, msg)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500974 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000975 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400976 }
977}
978
Rohan Agrawal31f21802020-06-12 05:38:46 +0000979func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400980 // Wait for messages
981 for msg := range ch {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000982 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
983 go kp.handleMessage(context.Background(), msg, targetInterface)
khenaidooabad44c2018-08-03 16:58:35 -0400984 }
985}
986
Rohan Agrawal31f21802020-06-12 05:38:46 +0000987func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400988 kp.lockTransactionIdToChannelMap.RLock()
989 defer kp.lockTransactionIdToChannelMap.RUnlock()
khenaidooabad44c2018-08-03 16:58:35 -0400990 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000991 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
khenaidooabad44c2018-08-03 16:58:35 -0400992 return
993 }
khenaidoo43c82122018-11-22 18:38:28 -0500994 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400995}
996
khenaidooabad44c2018-08-03 16:58:35 -0400997// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
998// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
999// API. There is one response channel waiting for kafka messages before dispatching the message to the
1000// corresponding waiting channel
Rohan Agrawal31f21802020-06-12 05:38:46 +00001001func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
1002 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -04001003
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001004 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -05001005 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerb9635992020-03-11 21:11:28 -07001006 // Set channel size to 1 to prevent deadlock, see VOL-2708
1007 ch := make(chan *ic.InterContainerMessage, 1)
khenaidoo43c82122018-11-22 18:38:28 -05001008 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -04001009
1010 return ch, nil
1011}
1012
Rohan Agrawal31f21802020-06-12 05:38:46 +00001013func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
1014 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -05001015 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -04001016 return nil
1017}
1018
Rohan Agrawal31f21802020-06-12 05:38:46 +00001019func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
1020 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
Scott Bakeree6a0872019-10-29 15:59:52 -07001021}
1022
Rohan Agrawal31f21802020-06-12 05:38:46 +00001023func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
1024 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001025}
1026
Rohan Agrawal31f21802020-06-12 05:38:46 +00001027func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
1028 return kp.kafkaClient.SendLiveness(ctx)
Scott Bakeree6a0872019-10-29 15:59:52 -07001029}
1030
khenaidooabad44c2018-08-03 16:58:35 -04001031//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
1032//or an error on failure
Rohan Agrawal31f21802020-06-12 05:38:46 +00001033func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
khenaidoo79232702018-12-04 11:00:41 -05001034 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -04001035 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -05001036 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -04001037 FromTopic: replyTopic.Name,
1038 ToTopic: toTopic.Name,
khenaidoo2c6a0992019-04-29 13:46:56 -04001039 KeyTopic: key,
Scott Baker504b4802020-04-17 10:12:20 -07001040 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -04001041 }
khenaidoo79232702018-12-04 11:00:41 -05001042 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -04001043 Rpc: rpc,
1044 ResponseRequired: true,
1045 ReplyToTopic: replyTopic.Name,
1046 }
1047
1048 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -04001049 if arg == nil {
1050 // In case the caller sends an array with empty args
1051 continue
1052 }
khenaidooabad44c2018-08-03 16:58:35 -04001053 var marshalledArg *any.Any
1054 var err error
1055 // ascertain the value interface type is a proto.Message
1056 protoValue, ok := arg.Value.(proto.Message)
1057 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001058 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
khenaidooabad44c2018-08-03 16:58:35 -04001059 err := errors.New("argument-value-not-proto-message")
1060 return nil, err
1061 }
1062 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001063 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -04001064 return nil, err
1065 }
khenaidoo79232702018-12-04 11:00:41 -05001066 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -04001067 Key: arg.Key,
1068 Value: marshalledArg,
1069 }
1070 requestBody.Args = append(requestBody.Args, protoArg)
1071 }
1072
1073 var marshalledData *any.Any
1074 var err error
1075 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001076 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -04001077 return nil, err
1078 }
khenaidoo79232702018-12-04 11:00:41 -05001079 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -04001080 Header: requestHeader,
1081 Body: marshalledData,
1082 }
1083 return request, nil
1084}
1085
Rohan Agrawal31f21802020-06-12 05:38:46 +00001086func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -04001087 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -05001088 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -04001089 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001090 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -04001091 return nil, err
1092 }
Rohan Agrawal31f21802020-06-12 05:38:46 +00001093 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -04001094
1095 return &responseBody, nil
1096
1097}