blob: b149e7d876ad90f2be57dfecbbd3a9bab0145455 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
Girish Kumardef46fc2020-08-05 18:20:11 +000020 "encoding/json"
khenaidooabad44c2018-08-03 16:58:35 -040021 "errors"
22 "fmt"
Girish Kumardef46fc2020-08-05 18:20:11 +000023 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/status"
khenaidooabad44c2018-08-03 16:58:35 -040025 "reflect"
khenaidoo19374072018-12-11 11:05:15 -050026 "strings"
khenaidooabad44c2018-08-03 16:58:35 -040027 "sync"
28 "time"
khenaidooabad44c2018-08-03 16:58:35 -040029
serkant.uluderya2ae470f2020-01-21 11:13:09 -080030 "github.com/golang/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/any"
33 "github.com/google/uuid"
yasin sapli5458a1c2021-06-14 22:24:38 +000034 "github.com/opencord/voltha-lib-go/v5/pkg/log"
Maninderdfadc982020-10-28 14:04:33 +053035 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
Girish Kumardef46fc2020-08-05 18:20:11 +000036 "github.com/opentracing/opentracing-go"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080037)
khenaidooabad44c2018-08-03 16:58:35 -040038
// Default tuning values for the inter-container proxy.
const (
	// DefaultMaxRetries is the default retry count (3).
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is expressed in milliseconds. It is set to
	// 60000 to handle a wider latency range.
	DefaultRequestTimeout = 60000
)
43
// Well-known string keys. Their consumers live outside this section of the
// file; judging by the names they identify the transaction ID and the
// originating topic of a request.
const (
	TransactionKey = "transactionID"
	FromTopic      = "fromTopic"
)
48
// Sentinel errors related to transaction handling.
var (
	// ErrorTransactionNotAcquired carries the message "transaction-not-acquired".
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
	// ErrorTransactionInvalidId carries the message "transaction-invalid-id".
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
51
khenaidoo43c82122018-11-22 18:38:28 -050052// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
53// obtained from that channel, this interface is invoked. This is used to handle
54// async requests into the Core via the kafka messaging bus
55type requestHandlerChannel struct {
56 requesthandlerInterface interface{}
khenaidoo79232702018-12-04 11:00:41 -050057 ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -040058}
59
khenaidoo43c82122018-11-22 18:38:28 -050060// transactionChannel represents a combination of a topic and a channel onto which a response received
61// on the kafka bus will be sent to
62type transactionChannel struct {
63 topic *Topic
khenaidoo79232702018-12-04 11:00:41 -050064 ch chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050065}
66
npujar467fe752020-01-16 20:17:45 +053067type InterContainerProxy interface {
Rohan Agrawal31f21802020-06-12 05:38:46 +000068 Start(ctx context.Context) error
69 Stop(ctx context.Context)
npujar467fe752020-01-16 20:17:45 +053070 GetDefaultTopic() *Topic
npujar467fe752020-01-16 20:17:45 +053071 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Scott Bakerb9635992020-03-11 21:11:28 -070072 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Rohan Agrawal31f21802020-06-12 05:38:46 +000073 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
74 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
75 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
76 DeleteTopic(ctx context.Context, topic Topic) error
77 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
78 SendLiveness(ctx context.Context) error
npujar467fe752020-01-16 20:17:45 +053079}
80
81// interContainerProxy represents the messaging proxy
82type interContainerProxy struct {
Neha Sharmad1387da2020-05-07 20:07:28 +000083 kafkaAddress string
npujar467fe752020-01-16 20:17:45 +053084 defaultTopic *Topic
khenaidoo43c82122018-11-22 18:38:28 -050085 defaultRequestHandlerInterface interface{}
86 kafkaClient Client
npujar467fe752020-01-16 20:17:45 +053087 doneCh chan struct{}
88 doneOnce sync.Once
khenaidoo43c82122018-11-22 18:38:28 -050089
90 // This map is used to map a topic to an interface and channel. When a request is received
91 // on that channel (registered to the topic) then that interface is invoked.
92 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
93 lockTopicRequestHandlerChannelMap sync.RWMutex
94
95 // This map is used to map a channel to a response topic. This channel handles all responses on that
khenaidoo4c1a5bf2018-11-29 15:53:42 -050096 // channel for that topic and forward them to the appropriate consumers channel, using the
khenaidoo43c82122018-11-22 18:38:28 -050097 // transactionIdToChannelMap.
khenaidoo79232702018-12-04 11:00:41 -050098 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050099 lockTopicResponseChannelMap sync.RWMutex
100
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500101 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
khenaidoo43c82122018-11-22 18:38:28 -0500102 // sent out and we are waiting for a response.
103 transactionIdToChannelMap map[string]*transactionChannel
khenaidooabad44c2018-08-03 16:58:35 -0400104 lockTransactionIdToChannelMap sync.RWMutex
105}
106
npujar467fe752020-01-16 20:17:45 +0530107type InterContainerProxyOption func(*interContainerProxy)
khenaidooabad44c2018-08-03 16:58:35 -0400108
Neha Sharmad1387da2020-05-07 20:07:28 +0000109func InterContainerAddress(address string) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530110 return func(args *interContainerProxy) {
Neha Sharmad1387da2020-05-07 20:07:28 +0000111 args.kafkaAddress = address
khenaidooabad44c2018-08-03 16:58:35 -0400112 }
113}
114
khenaidoo43c82122018-11-22 18:38:28 -0500115func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530116 return func(args *interContainerProxy) {
117 args.defaultTopic = topic
khenaidooabad44c2018-08-03 16:58:35 -0400118 }
119}
120
khenaidoo43c82122018-11-22 18:38:28 -0500121func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530122 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500123 args.defaultRequestHandlerInterface = handler
khenaidooabad44c2018-08-03 16:58:35 -0400124 }
125}
126
khenaidoo43c82122018-11-22 18:38:28 -0500127func MsgClient(client Client) InterContainerProxyOption {
npujar467fe752020-01-16 20:17:45 +0530128 return func(args *interContainerProxy) {
khenaidoo43c82122018-11-22 18:38:28 -0500129 args.kafkaClient = client
130 }
131}
132
npujar467fe752020-01-16 20:17:45 +0530133func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
134 proxy := &interContainerProxy{
Neha Sharmad1387da2020-05-07 20:07:28 +0000135 kafkaAddress: DefaultKafkaAddress,
136 doneCh: make(chan struct{}),
khenaidooabad44c2018-08-03 16:58:35 -0400137 }
138
139 for _, option := range opts {
140 option(proxy)
141 }
142
npujar467fe752020-01-16 20:17:45 +0530143 return proxy
khenaidooabad44c2018-08-03 16:58:35 -0400144}
145
npujar467fe752020-01-16 20:17:45 +0530146func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
147 return newInterContainerProxy(opts...)
148}
149
Rohan Agrawal31f21802020-06-12 05:38:46 +0000150func (kp *interContainerProxy) Start(ctx context.Context) error {
151 logger.Info(ctx, "Starting-Proxy")
khenaidooabad44c2018-08-03 16:58:35 -0400152
khenaidoo43c82122018-11-22 18:38:28 -0500153 // Kafka MsgClient should already have been created. If not, output fatal error
154 if kp.kafkaClient == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000155 logger.Fatal(ctx, "kafka-client-not-set")
khenaidoo43c82122018-11-22 18:38:28 -0500156 }
157
khenaidoo43c82122018-11-22 18:38:28 -0500158 // Start the kafka client
Rohan Agrawal31f21802020-06-12 05:38:46 +0000159 if err := kp.kafkaClient.Start(ctx); err != nil {
160 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400161 return err
162 }
163
khenaidoo43c82122018-11-22 18:38:28 -0500164 // Create the topic to response channel map
khenaidoo79232702018-12-04 11:00:41 -0500165 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500166 //
khenaidooabad44c2018-08-03 16:58:35 -0400167 // Create the transactionId to Channel Map
khenaidoo43c82122018-11-22 18:38:28 -0500168 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
169
170 // Create the topic to request channel map
171 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
khenaidooabad44c2018-08-03 16:58:35 -0400172
173 return nil
174}
175
Rohan Agrawal31f21802020-06-12 05:38:46 +0000176func (kp *interContainerProxy) Stop(ctx context.Context) {
177 logger.Info(ctx, "stopping-intercontainer-proxy")
npujar467fe752020-01-16 20:17:45 +0530178 kp.doneOnce.Do(func() { close(kp.doneCh) })
khenaidoo43c82122018-11-22 18:38:28 -0500179 // TODO : Perform cleanup
Rohan Agrawal31f21802020-06-12 05:38:46 +0000180 kp.kafkaClient.Stop(ctx)
181 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Baker0e78ba22020-02-24 17:58:47 -0800182 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000183 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800184 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000185 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Baker0e78ba22020-02-24 17:58:47 -0800186 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000187 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800188 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000189 kp.deleteAllTransactionIdToChannelMap(ctx)
khenaidooabad44c2018-08-03 16:58:35 -0400190}
191
npujar467fe752020-01-16 20:17:45 +0530192func (kp *interContainerProxy) GetDefaultTopic() *Topic {
193 return kp.defaultTopic
194}
195
Scott Bakerb9635992020-03-11 21:11:28 -0700196// InvokeAsyncRPC is used to make an RPC request asynchronously
197func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
198 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
199
Rohan Agrawal31f21802020-06-12 05:38:46 +0000200 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
Girish Kumardef46fc2020-08-05 18:20:11 +0000201
202 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
203 if spanArg != nil {
204 kvArgs = append(kvArgs, &spanArg[0])
205 }
206 defer span.Finish()
207
Scott Bakerb9635992020-03-11 21:11:28 -0700208 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
209 // typically the device ID.
210 responseTopic := replyToTopic
211 if responseTopic == nil {
212 responseTopic = kp.GetDefaultTopic()
213 }
214
215 chnl := make(chan *RpcResponse)
216
217 go func() {
218
219 // once we're done,
220 // close the response channel
221 defer close(chnl)
222
223 var err error
224 var protoRequest *ic.InterContainerMessage
225
226 // Encode the request
Rohan Agrawal31f21802020-06-12 05:38:46 +0000227 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Scott Bakerb9635992020-03-11 21:11:28 -0700228 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000229 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Girish Kumardef46fc2020-08-05 18:20:11 +0000230 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
Scott Bakerb9635992020-03-11 21:11:28 -0700231 chnl <- NewResponse(RpcFormattingError, err, nil)
232 return
233 }
234
235 // Subscribe for response, if needed, before sending request
236 var ch <-chan *ic.InterContainerMessage
Rohan Agrawal31f21802020-06-12 05:38:46 +0000237 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
238 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Girish Kumardef46fc2020-08-05 18:20:11 +0000239 log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
Scott Bakerb9635992020-03-11 21:11:28 -0700240 chnl <- NewResponse(RpcTransportError, err, nil)
241 return
242 }
243
244 // Send request - if the topic is formatted with a device Id then we will send the request using a
245 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
246 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000247 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Scott Bakerb9635992020-03-11 21:11:28 -0700248
249 // if the message is not sent on kafka publish an event an close the channel
Rohan Agrawal31f21802020-06-12 05:38:46 +0000250 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Scott Bakerb9635992020-03-11 21:11:28 -0700251 chnl <- NewResponse(RpcTransportError, err, nil)
252 return
253 }
254
255 // if the client is not waiting for a response send the ack and close the channel
256 chnl <- NewResponse(RpcSent, nil, nil)
257 if !waitForResponse {
258 return
259 }
260
261 defer func() {
262 // Remove the subscription for a response on return
Rohan Agrawal31f21802020-06-12 05:38:46 +0000263 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
264 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
Scott Bakerb9635992020-03-11 21:11:28 -0700265 }
266 }()
267
268 // Wait for response as well as timeout or cancellation
269 select {
270 case msg, ok := <-ch:
271 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000272 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Girish Kumardef46fc2020-08-05 18:20:11 +0000273 log.MarkSpanError(ctx, errors.New("channel-closed"))
Scott Bakerb9635992020-03-11 21:11:28 -0700274 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
275 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000276 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
277 if responseBody, err := decodeResponse(ctx, msg); err != nil {
Scott Bakerb9635992020-03-11 21:11:28 -0700278 chnl <- NewResponse(RpcReply, err, nil)
279 } else {
280 if responseBody.Success {
281 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
282 } else {
283 // response body contains an error
284 unpackErr := &ic.Error{}
285 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
286 chnl <- NewResponse(RpcReply, err, nil)
287 } else {
288 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
289 }
290 }
291 }
292 case <-ctx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000293 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Girish Kumardef46fc2020-08-05 18:20:11 +0000294 log.MarkSpanError(ctx, errors.New("context-cancelled"))
Scott Bakerb9635992020-03-11 21:11:28 -0700295 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
296 chnl <- NewResponse(RpcTimeout, err, nil)
297 case <-kp.doneCh:
298 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000299 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Bakerb9635992020-03-11 21:11:28 -0700300 }
301 }()
302 return chnl
303}
304
Girish Kumardef46fc2020-08-05 18:20:11 +0000305// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
306// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
307//
308// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
309// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
310// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
311// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
312// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
313func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
314 var err error
315 var newCtx context.Context
316 var spanToInject opentracing.Span
317
318 var spanName strings.Builder
319 spanName.WriteString("kafka-")
320
321 // In case of inter adapter message, use Msg Type for constructing RPC name
322 if rpc == "process_inter_adapter_message" {
323 if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
324 spanName.WriteString("inter-adapter-")
325 rpc = msgType.String()
326 }
327 }
328
329 if isAsync {
330 spanName.WriteString("async-rpc-")
331 } else {
332 spanName.WriteString("rpc-")
333 }
334 spanName.WriteString(rpc)
335
336 if isAsync {
337 spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
338 } else {
339 spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
340 }
341
342 spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
343
344 textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
345 if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
346 logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
347 return nil, spanToInject, newCtx
348 }
349
350 var textMapJson []byte
351 if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
352 logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
353 return nil, spanToInject, newCtx
354 }
355
356 spanArg := make([]KVArg, 1)
357 spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
358 return spanArg, spanToInject, newCtx
359}
360
khenaidoo43c82122018-11-22 18:38:28 -0500361// InvokeRPC is used to send a request to a given topic
npujar467fe752020-01-16 20:17:45 +0530362func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
khenaidoobdcb8e02019-03-06 16:28:56 -0500363 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
khenaidoo43c82122018-11-22 18:38:28 -0500364
Girish Kumardef46fc2020-08-05 18:20:11 +0000365 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
366 if spanArg != nil {
367 kvArgs = append(kvArgs, &spanArg[0])
368 }
369 defer span.Finish()
370
khenaidoo43c82122018-11-22 18:38:28 -0500371 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
372 // typically the device ID.
373 responseTopic := replyToTopic
374 if responseTopic == nil {
npujar467fe752020-01-16 20:17:45 +0530375 responseTopic = kp.defaultTopic
khenaidoo43c82122018-11-22 18:38:28 -0500376 }
377
khenaidooabad44c2018-08-03 16:58:35 -0400378 // Encode the request
Rohan Agrawal31f21802020-06-12 05:38:46 +0000379 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
khenaidooabad44c2018-08-03 16:58:35 -0400380 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000381 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Girish Kumardef46fc2020-08-05 18:20:11 +0000382 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
khenaidooabad44c2018-08-03 16:58:35 -0400383 return false, nil
384 }
385
386 // Subscribe for response, if needed, before sending request
khenaidoo79232702018-12-04 11:00:41 -0500387 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400388 if waitForResponse {
389 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000390 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
391 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
khenaidooabad44c2018-08-03 16:58:35 -0400392 }
393 }
394
khenaidoo43c82122018-11-22 18:38:28 -0500395 // Send request - if the topic is formatted with a device Id then we will send the request using a
396 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
397 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
khenaidoobdcb8e02019-03-06 16:28:56 -0500398 //key := GetDeviceIdFromTopic(*toTopic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000399 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000400 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000401 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Girish Kumardef46fc2020-08-05 18:20:11 +0000402 log.MarkSpanError(ctx, errors.New("send-failed"))
Rohan Agrawal31f21802020-06-12 05:38:46 +0000403 logger.Errorw(ctx, "send-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000404 "topic": toTopic,
405 "key": key,
406 "error": err})
407 }
408 }()
khenaidooabad44c2018-08-03 16:58:35 -0400409
410 if waitForResponse {
khenaidoob9203542018-09-17 22:56:37 -0400411 // Create a child context based on the parent context, if any
khenaidooabad44c2018-08-03 16:58:35 -0400412 var cancel context.CancelFunc
khenaidoob9203542018-09-17 22:56:37 -0400413 childCtx := context.Background()
khenaidooabad44c2018-08-03 16:58:35 -0400414 if ctx == nil {
415 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
khenaidoob9203542018-09-17 22:56:37 -0400416 } else {
417 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
khenaidooabad44c2018-08-03 16:58:35 -0400418 }
khenaidoob9203542018-09-17 22:56:37 -0400419 defer cancel()
khenaidooabad44c2018-08-03 16:58:35 -0400420
421 // Wait for response as well as timeout or cancellation
422 // Remove the subscription for a response on return
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000423 defer func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000424 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
425 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000426 "id": protoRequest.Header.Id,
427 "error": err})
428 }
429 }()
khenaidooabad44c2018-08-03 16:58:35 -0400430 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500431 case msg, ok := <-ch:
432 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000433 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Girish Kumardef46fc2020-08-05 18:20:11 +0000434 log.MarkSpanError(ctx, errors.New("channel-closed"))
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500435 protoError := &ic.Error{Reason: "channel-closed"}
436 var marshalledArg *any.Any
437 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
438 return false, nil // Should never happen
439 }
440 return false, marshalledArg
441 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000442 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
khenaidoo79232702018-12-04 11:00:41 -0500443 var responseBody *ic.InterContainerResponseBody
khenaidooabad44c2018-08-03 16:58:35 -0400444 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000445 if responseBody, err = decodeResponse(ctx, msg); err != nil {
446 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
npujar467fe752020-01-16 20:17:45 +0530447 // FIXME we should return something
khenaidooabad44c2018-08-03 16:58:35 -0400448 }
449 return responseBody.Success, responseBody.Result
450 case <-ctx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000451 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Girish Kumardef46fc2020-08-05 18:20:11 +0000452 log.MarkSpanError(ctx, errors.New("context-cancelled"))
khenaidooabad44c2018-08-03 16:58:35 -0400453 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530454 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
455
khenaidooabad44c2018-08-03 16:58:35 -0400456 var marshalledArg *any.Any
457 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
458 return false, nil // Should never happen
459 }
460 return false, marshalledArg
khenaidoob9203542018-09-17 22:56:37 -0400461 case <-childCtx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000462 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Girish Kumardef46fc2020-08-05 18:20:11 +0000463 log.MarkSpanError(ctx, errors.New("context-cancelled"))
khenaidoob9203542018-09-17 22:56:37 -0400464 // pack the error as proto any type
npujar467fe752020-01-16 20:17:45 +0530465 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
466
khenaidoob9203542018-09-17 22:56:37 -0400467 var marshalledArg *any.Any
468 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
469 return false, nil // Should never happen
470 }
471 return false, marshalledArg
khenaidooabad44c2018-08-03 16:58:35 -0400472 case <-kp.doneCh:
Rohan Agrawal31f21802020-06-12 05:38:46 +0000473 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
khenaidooabad44c2018-08-03 16:58:35 -0400474 return true, nil
475 }
476 }
477 return true, nil
478}
479
khenaidoo43c82122018-11-22 18:38:28 -0500480// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
khenaidooabad44c2018-08-03 16:58:35 -0400481// when a message is received on a given topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000482func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
khenaidooabad44c2018-08-03 16:58:35 -0400483
484 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500485 var ch <-chan *ic.InterContainerMessage
khenaidooabad44c2018-08-03 16:58:35 -0400486 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000487 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500488 //if ch, err = kp.Subscribe(topic); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000489 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Abhilash S.L90cd9552019-07-18 17:30:29 +0530490 return err
khenaidooabad44c2018-08-03 16:58:35 -0400491 }
khenaidoo43c82122018-11-22 18:38:28 -0500492
493 kp.defaultRequestHandlerInterface = handler
494 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
khenaidooabad44c2018-08-03 16:58:35 -0400495 // Launch a go routine to receive and process kafka messages
Rohan Agrawal31f21802020-06-12 05:38:46 +0000496 go kp.waitForMessages(ctx, ch, topic, handler)
khenaidooabad44c2018-08-03 16:58:35 -0400497
498 return nil
499}
500
khenaidoo43c82122018-11-22 18:38:28 -0500501// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
502// when a message is received on a given topic. So far there is only 1 target registered per microservice
Rohan Agrawal31f21802020-06-12 05:38:46 +0000503func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
khenaidoo43c82122018-11-22 18:38:28 -0500504 // Subscribe to receive messages for that topic
khenaidoo79232702018-12-04 11:00:41 -0500505 var ch <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500506 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000507 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
508 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500509 return err
khenaidoo43c82122018-11-22 18:38:28 -0500510 }
511 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
512
513 // Launch a go routine to receive and process kafka messages
Rohan Agrawal31f21802020-06-12 05:38:46 +0000514 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
khenaidoo43c82122018-11-22 18:38:28 -0500515
khenaidooabad44c2018-08-03 16:58:35 -0400516 return nil
517}
518
Rohan Agrawal31f21802020-06-12 05:38:46 +0000519func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
520 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
khenaidoo43c82122018-11-22 18:38:28 -0500521}
522
Rohan Agrawal31f21802020-06-12 05:38:46 +0000523func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500524 kp.lockTopicResponseChannelMap.Lock()
525 defer kp.lockTopicResponseChannelMap.Unlock()
526 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
527 // Unsubscribe to this topic first - this will close the subscribed channel
528 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000529 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
530 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
khenaidoo43c82122018-11-22 18:38:28 -0500531 }
532 delete(kp.topicToResponseChannelMap, topic)
533 return err
534 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000535 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400536 }
537}
538
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000539// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000540func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
541 logger.Debug(ctx, "delete-all-topic-response-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500542 kp.lockTopicResponseChannelMap.Lock()
543 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800544 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000545 for topic := range kp.topicToResponseChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500546 // Unsubscribe to this topic first - this will close the subscribed channel
Rohan Agrawal31f21802020-06-12 05:38:46 +0000547 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Baker0e78ba22020-02-24 17:58:47 -0800548 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000549 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800550 // Do not return. Continue to try to unsubscribe to other topics.
551 } else {
552 // Only delete from channel map if successfully unsubscribed.
553 delete(kp.topicToResponseChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500554 }
khenaidooabad44c2018-08-03 16:58:35 -0400555 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800556 if len(unsubscribeFailTopics) > 0 {
557 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
558 }
559 return nil
khenaidooabad44c2018-08-03 16:58:35 -0400560}
561
npujar467fe752020-01-16 20:17:45 +0530562func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
khenaidoo43c82122018-11-22 18:38:28 -0500563 kp.lockTopicRequestHandlerChannelMap.Lock()
564 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
565 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
566 kp.topicToRequestHandlerChannelMap[topic] = arg
khenaidooabad44c2018-08-03 16:58:35 -0400567 }
khenaidooabad44c2018-08-03 16:58:35 -0400568}
569
Rohan Agrawal31f21802020-06-12 05:38:46 +0000570func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
khenaidoo43c82122018-11-22 18:38:28 -0500571 kp.lockTopicRequestHandlerChannelMap.Lock()
572 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
573 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
574 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000575 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000576 return err
577 }
khenaidoo43c82122018-11-22 18:38:28 -0500578 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidooabad44c2018-08-03 16:58:35 -0400579 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500580 } else {
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000581 return fmt.Errorf("%s-Topic-not-found", topic)
khenaidooabad44c2018-08-03 16:58:35 -0400582 }
khenaidooabad44c2018-08-03 16:58:35 -0400583}
584
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000585// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000586func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
587 logger.Debug(ctx, "delete-all-topic-request-channel")
khenaidoo43c82122018-11-22 18:38:28 -0500588 kp.lockTopicRequestHandlerChannelMap.Lock()
589 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Baker0e78ba22020-02-24 17:58:47 -0800590 var unsubscribeFailTopics []string
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000591 for topic := range kp.topicToRequestHandlerChannelMap {
khenaidoo43c82122018-11-22 18:38:28 -0500592 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000593 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Baker0e78ba22020-02-24 17:58:47 -0800594 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000595 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800596 // Do not return. Continue to try to unsubscribe to other topics.
597 } else {
598 // Only delete from channel map if successfully unsubscribed.
599 delete(kp.topicToRequestHandlerChannelMap, topic)
khenaidoo43c82122018-11-22 18:38:28 -0500600 }
khenaidoo43c82122018-11-22 18:38:28 -0500601 }
Scott Baker0e78ba22020-02-24 17:58:47 -0800602 if len(unsubscribeFailTopics) > 0 {
603 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
604 }
605 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500606}
607
npujar467fe752020-01-16 20:17:45 +0530608func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
khenaidooabad44c2018-08-03 16:58:35 -0400609 kp.lockTransactionIdToChannelMap.Lock()
610 defer kp.lockTransactionIdToChannelMap.Unlock()
611 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
khenaidoo43c82122018-11-22 18:38:28 -0500612 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
khenaidooabad44c2018-08-03 16:58:35 -0400613 }
614}
615
npujar467fe752020-01-16 20:17:45 +0530616func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
khenaidooabad44c2018-08-03 16:58:35 -0400617 kp.lockTransactionIdToChannelMap.Lock()
618 defer kp.lockTransactionIdToChannelMap.Unlock()
khenaidoo43c82122018-11-22 18:38:28 -0500619 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
620 // Close the channel first
621 close(transChannel.ch)
khenaidooabad44c2018-08-03 16:58:35 -0400622 delete(kp.transactionIdToChannelMap, id)
623 }
624}
625
npujar467fe752020-01-16 20:17:45 +0530626func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
khenaidoo43c82122018-11-22 18:38:28 -0500627 kp.lockTransactionIdToChannelMap.Lock()
628 defer kp.lockTransactionIdToChannelMap.Unlock()
629 for key, value := range kp.transactionIdToChannelMap {
630 if value.topic.Name == id {
631 close(value.ch)
632 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400633 }
634 }
khenaidooabad44c2018-08-03 16:58:35 -0400635}
636
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000637// nolint: unused
Rohan Agrawal31f21802020-06-12 05:38:46 +0000638func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
639 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
khenaidoo43c82122018-11-22 18:38:28 -0500640 kp.lockTransactionIdToChannelMap.Lock()
641 defer kp.lockTransactionIdToChannelMap.Unlock()
642 for key, value := range kp.transactionIdToChannelMap {
643 close(value.ch)
644 delete(kp.transactionIdToChannelMap, key)
khenaidooabad44c2018-08-03 16:58:35 -0400645 }
khenaidooabad44c2018-08-03 16:58:35 -0400646}
647
Rohan Agrawal31f21802020-06-12 05:38:46 +0000648func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500649 // If we have any consumers on that topic we need to close them
Rohan Agrawal31f21802020-06-12 05:38:46 +0000650 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
651 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500652 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000653 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
654 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500655 }
khenaidoo43c82122018-11-22 18:38:28 -0500656 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500657
Rohan Agrawal31f21802020-06-12 05:38:46 +0000658 return kp.kafkaClient.DeleteTopic(ctx, &topic)
khenaidoo43c82122018-11-22 18:38:28 -0500659}
660
Rohan Agrawal31f21802020-06-12 05:38:46 +0000661func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
khenaidooabad44c2018-08-03 16:58:35 -0400662 // Encode the response argument - needs to be a proto message
663 if returnedVal == nil {
664 return nil, nil
665 }
666 protoValue, ok := returnedVal.(proto.Message)
667 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000668 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
khenaidooabad44c2018-08-03 16:58:35 -0400669 err := errors.New("response-value-not-proto-message")
670 return nil, err
671 }
672
673 // Marshal the returned value, if any
674 var marshalledReturnedVal *any.Any
675 var err error
676 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000677 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400678 return nil, err
679 }
680 return marshalledReturnedVal, nil
681}
682
Rohan Agrawal31f21802020-06-12 05:38:46 +0000683func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
khenaidoo79232702018-12-04 11:00:41 -0500684 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400685 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500686 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400687 FromTopic: request.Header.ToTopic,
688 ToTopic: request.Header.FromTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700689 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400690 }
khenaidoo79232702018-12-04 11:00:41 -0500691 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400692 Success: false,
693 Result: nil,
694 }
695 var marshalledResponseBody *any.Any
696 var err error
697 // Error should never happen here
698 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000699 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400700 }
701
khenaidoo79232702018-12-04 11:00:41 -0500702 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400703 Header: responseHeader,
704 Body: marshalledResponseBody,
705 }
706
707}
708
709//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
710//or an error on failure
Rohan Agrawal31f21802020-06-12 05:38:46 +0000711func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
712 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
khenaidoo79232702018-12-04 11:00:41 -0500713 responseHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -0400714 Id: request.Header.Id,
khenaidoo79232702018-12-04 11:00:41 -0500715 Type: ic.MessageType_RESPONSE,
khenaidooabad44c2018-08-03 16:58:35 -0400716 FromTopic: request.Header.ToTopic,
717 ToTopic: request.Header.FromTopic,
khenaidoo2c6a0992019-04-29 13:46:56 -0400718 KeyTopic: request.Header.KeyTopic,
Scott Baker504b4802020-04-17 10:12:20 -0700719 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -0400720 }
721
722 // Go over all returned values
723 var marshalledReturnedVal *any.Any
724 var err error
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000725
726 // for now we support only 1 returned value - (excluding the error)
727 if len(returnedValues) > 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000728 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
729 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400730 }
khenaidooabad44c2018-08-03 16:58:35 -0400731 }
732
khenaidoo79232702018-12-04 11:00:41 -0500733 responseBody := &ic.InterContainerResponseBody{
khenaidooabad44c2018-08-03 16:58:35 -0400734 Success: success,
735 Result: marshalledReturnedVal,
736 }
737
738 // Marshal the response body
739 var marshalledResponseBody *any.Any
740 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000741 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400742 return nil, err
743 }
744
khenaidoo79232702018-12-04 11:00:41 -0500745 return &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -0400746 Header: responseHeader,
747 Body: marshalledResponseBody,
748 }, nil
749}
750
// CallFuncByName invokes a method of myClass through reflection.
//
// funcName is passed through strings.Title first, so callers may give the name
// with a lower-case first letter even though only exported (capitalized) methods
// can be looked up. The method is called with ctx as its first argument followed
// by params, in order.
//
// It returns the method's results as reflect Values. An error (with an empty,
// non-nil result slice) is returned when myClass is nil, when no such method
// exists, or when the number of supplied arguments does not match the method's
// signature; these cases are reported instead of letting reflect panic.
func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// reflect.ValueOf(nil) is the zero Value; MethodByName would panic on it.
		return make([]reflect.Value, 0), fmt.Errorf("invalid-target-for-method \"%s\"", funcName)
	}

	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	in := make([]reflect.Value, len(params)+1)
	in[0] = reflect.ValueOf(ctx)
	for i, param := range params {
		in[i+1] = reflect.ValueOf(param)
	}

	// Check the argument count up front: Call panics on a mismatch. A variadic
	// method accepts any number of arguments from NumIn()-1 upwards.
	methodType := m.Type()
	expected := methodType.NumIn()
	mismatch := len(in) != expected
	if methodType.IsVariadic() {
		mismatch = len(in) < expected-1
	}
	if mismatch {
		return make([]reflect.Value, 0), fmt.Errorf("method-argument-count-mismatch \"%s\": expected %d, got %d", funcName, expected, len(in))
	}

	out = m.Call(in)
	return out, nil
}
768
Rohan Agrawal31f21802020-06-12 05:38:46 +0000769func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo297cd252019-02-07 22:10:23 -0500770 arg := &KVArg{
771 Key: TransactionKey,
772 Value: &ic.StrType{Val: transactionId},
773 }
774
775 var marshalledArg *any.Any
776 var err error
777 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000778 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
khenaidoo297cd252019-02-07 22:10:23 -0500779 return currentArgs
780 }
781 protoArg := &ic.Argument{
782 Key: arg.Key,
783 Value: marshalledArg,
784 }
785 return append(currentArgs, protoArg)
786}
787
Rohan Agrawal31f21802020-06-12 05:38:46 +0000788func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
khenaidoo54e0ddf2019-02-27 16:21:33 -0500789 var marshalledArg *any.Any
790 var err error
791 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000792 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500793 return currentArgs
794 }
795 protoArg := &ic.Argument{
796 Key: FromTopic,
797 Value: marshalledArg,
798 }
799 return append(currentArgs, protoArg)
800}
801
Girish Kumardef46fc2020-08-05 18:20:11 +0000802// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
803// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
804// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
805// from components currently not sending the span (e.g. openonu adapter)
806func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
807
808 for _, arg := range args {
809 if arg.Key == "span" {
810 var err error
811 var textMapString ic.StrType
812 if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
813 logger.Warnw(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
814 break
815 }
816
817 spanTextMap := make(map[string]string)
818 if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
819 logger.Warnw(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
820 break
821 }
822
823 var spanContext opentracing.SpanContext
824 if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
825 logger.Warnw(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
826 break
827 }
828
829 var receivedRpcName string
830 extractBaggage := func(k, v string) bool {
831 if k == "rpc-span-name" {
832 receivedRpcName = v
833 return false
834 }
835 return true
836 }
837
838 spanContext.ForeachBaggageItem(extractBaggage)
839
840 return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
841 }
842 }
843
844 // Create new Child Span with rpc as name if no span details were received in kafka arguments
845 var spanName strings.Builder
846 spanName.WriteString("kafka-")
847
848 // In case of inter adapter message, use Msg Type for constructing RPC name
849 if rpcName == "process_inter_adapter_message" {
850 for _, arg := range args {
851 if arg.Key == "msg" {
852 iamsg := ic.InterAdapterMessage{}
853 if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
854 spanName.WriteString("inter-adapter-")
855 rpcName = iamsg.Header.Type.String()
856 }
857 }
858 }
859 }
860
861 spanName.WriteString("rpc-")
862 spanName.WriteString(rpcName)
863
864 return opentracing.StartSpanFromContext(ctx, spanName.String())
865}
866
Rohan Agrawal31f21802020-06-12 05:38:46 +0000867func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400868
khenaidoo43c82122018-11-22 18:38:28 -0500869 // First extract the header to know whether this is a request - responses are handled by a different handler
khenaidoo79232702018-12-04 11:00:41 -0500870 if msg.Header.Type == ic.MessageType_REQUEST {
khenaidooabad44c2018-08-03 16:58:35 -0400871 var out []reflect.Value
872 var err error
873
874 // Get the request body
khenaidoo79232702018-12-04 11:00:41 -0500875 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -0400876 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000877 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400878 } else {
Girish Kumardef46fc2020-08-05 18:20:11 +0000879 span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
880 defer span.Finish()
881
Rohan Agrawal31f21802020-06-12 05:38:46 +0000882 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400883 // let the callee unpack the arguments as its the only one that knows the real proto type
khenaidoo297cd252019-02-07 22:10:23 -0500884 // Augment the requestBody with the message Id as it will be used in scenarios where cores
885 // are set in pairs and competing
Rohan Agrawal31f21802020-06-12 05:38:46 +0000886 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500887
888 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
889 // needs to send an unsollicited message to the currently requested container
Rohan Agrawal31f21802020-06-12 05:38:46 +0000890 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500891
Rohan Agrawal31f21802020-06-12 05:38:46 +0000892 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
khenaidooabad44c2018-08-03 16:58:35 -0400893 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000894 logger.Warn(ctx, err)
khenaidooabad44c2018-08-03 16:58:35 -0400895 }
896 }
897 // Response required?
898 if requestBody.ResponseRequired {
899 // If we already have an error before then just return that
khenaidoo79232702018-12-04 11:00:41 -0500900 var returnError *ic.Error
khenaidooabad44c2018-08-03 16:58:35 -0400901 var returnedValues []interface{}
902 var success bool
903 if err != nil {
khenaidoo79232702018-12-04 11:00:41 -0500904 returnError = &ic.Error{Reason: err.Error()}
khenaidooabad44c2018-08-03 16:58:35 -0400905 returnedValues = make([]interface{}, 1)
906 returnedValues[0] = returnError
907 } else {
khenaidoob9203542018-09-17 22:56:37 -0400908 returnedValues = make([]interface{}, 0)
909 // Check for errors first
910 lastIndex := len(out) - 1
911 if out[lastIndex].Interface() != nil { // Error
khenaidoo09771ef2019-10-11 14:25:02 -0400912 if retError, ok := out[lastIndex].Interface().(error); ok {
913 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000914 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400915 return // Ignore - process is in competing mode and ignored transaction
916 }
917 returnError = &ic.Error{Reason: retError.Error()}
khenaidoob9203542018-09-17 22:56:37 -0400918 returnedValues = append(returnedValues, returnError)
919 } else { // Should never happen
khenaidoo79232702018-12-04 11:00:41 -0500920 returnError = &ic.Error{Reason: "incorrect-error-returns"}
khenaidoob9203542018-09-17 22:56:37 -0400921 returnedValues = append(returnedValues, returnError)
922 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500923 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000924 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
khenaidoo09771ef2019-10-11 14:25:02 -0400925 return // Ignore - should not happen
khenaidoob9203542018-09-17 22:56:37 -0400926 } else { // Non-error case
927 success = true
928 for idx, val := range out {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000929 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
khenaidoob9203542018-09-17 22:56:37 -0400930 if idx != lastIndex {
931 returnedValues = append(returnedValues, val.Interface())
khenaidooabad44c2018-08-03 16:58:35 -0400932 }
khenaidooabad44c2018-08-03 16:58:35 -0400933 }
934 }
935 }
936
khenaidoo79232702018-12-04 11:00:41 -0500937 var icm *ic.InterContainerMessage
Rohan Agrawal31f21802020-06-12 05:38:46 +0000938 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
939 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
940 icm = encodeDefaultFailedResponse(ctx, msg)
khenaidooabad44c2018-08-03 16:58:35 -0400941 }
khenaidoo43c82122018-11-22 18:38:28 -0500942 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
943 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
944 // present then the key will be empty, hence all messages for a given topic will be sent to all
945 // partitions.
946 replyTopic := &Topic{Name: msg.Header.FromTopic}
khenaidoobdcb8e02019-03-06 16:28:56 -0500947 key := msg.Header.KeyTopic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000948 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
khenaidoo43c82122018-11-22 18:38:28 -0500949 // TODO: handle error response.
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000950 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000951 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
952 logger.Errorw(ctx, "send-reply-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000953 "topic": replyTopic,
954 "key": key,
955 "error": err})
956 }
957 }()
khenaidooabad44c2018-08-03 16:58:35 -0400958 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500959 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000960 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
961 go kp.dispatchResponse(ctx, msg)
khenaidoo54e0ddf2019-02-27 16:21:33 -0500962 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000963 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
khenaidooabad44c2018-08-03 16:58:35 -0400964 }
965}
966
Rohan Agrawal31f21802020-06-12 05:38:46 +0000967func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
khenaidooabad44c2018-08-03 16:58:35 -0400968 // Wait for messages
969 for msg := range ch {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000970 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
971 go kp.handleMessage(context.Background(), msg, targetInterface)
khenaidooabad44c2018-08-03 16:58:35 -0400972 }
973}
974
Rohan Agrawal31f21802020-06-12 05:38:46 +0000975func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400976 kp.lockTransactionIdToChannelMap.RLock()
977 defer kp.lockTransactionIdToChannelMap.RUnlock()
khenaidooabad44c2018-08-03 16:58:35 -0400978 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000979 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
khenaidooabad44c2018-08-03 16:58:35 -0400980 return
981 }
khenaidoo43c82122018-11-22 18:38:28 -0500982 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
khenaidooabad44c2018-08-03 16:58:35 -0400983}
984
khenaidooabad44c2018-08-03 16:58:35 -0400985// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
986// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
987// API. There is one response channel waiting for kafka messages before dispatching the message to the
988// corresponding waiting channel
Rohan Agrawal31f21802020-06-12 05:38:46 +0000989func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
990 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
khenaidooabad44c2018-08-03 16:58:35 -0400991
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500992 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
khenaidoo43c82122018-11-22 18:38:28 -0500993 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerb9635992020-03-11 21:11:28 -0700994 // Set channel size to 1 to prevent deadlock, see VOL-2708
995 ch := make(chan *ic.InterContainerMessage, 1)
khenaidoo43c82122018-11-22 18:38:28 -0500996 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
khenaidooabad44c2018-08-03 16:58:35 -0400997
998 return ch, nil
999}
1000
Rohan Agrawal31f21802020-06-12 05:38:46 +00001001func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
1002 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
khenaidoo7ff26c72019-01-16 14:55:48 -05001003 kp.deleteFromTransactionIdToChannelMap(trnsId)
khenaidooabad44c2018-08-03 16:58:35 -04001004 return nil
1005}
1006
Rohan Agrawal31f21802020-06-12 05:38:46 +00001007func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
1008 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
Scott Bakeree6a0872019-10-29 15:59:52 -07001009}
1010
Rohan Agrawal31f21802020-06-12 05:38:46 +00001011func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
1012 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001013}
1014
Rohan Agrawal31f21802020-06-12 05:38:46 +00001015func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
1016 return kp.kafkaClient.SendLiveness(ctx)
Scott Bakeree6a0872019-10-29 15:59:52 -07001017}
1018
khenaidooabad44c2018-08-03 16:58:35 -04001019//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
1020//or an error on failure
Rohan Agrawal31f21802020-06-12 05:38:46 +00001021func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
khenaidoo79232702018-12-04 11:00:41 -05001022 requestHeader := &ic.Header{
khenaidooabad44c2018-08-03 16:58:35 -04001023 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -05001024 Type: ic.MessageType_REQUEST,
khenaidooabad44c2018-08-03 16:58:35 -04001025 FromTopic: replyTopic.Name,
1026 ToTopic: toTopic.Name,
khenaidoo2c6a0992019-04-29 13:46:56 -04001027 KeyTopic: key,
Scott Baker504b4802020-04-17 10:12:20 -07001028 Timestamp: ptypes.TimestampNow(),
khenaidooabad44c2018-08-03 16:58:35 -04001029 }
khenaidoo79232702018-12-04 11:00:41 -05001030 requestBody := &ic.InterContainerRequestBody{
khenaidooabad44c2018-08-03 16:58:35 -04001031 Rpc: rpc,
1032 ResponseRequired: true,
1033 ReplyToTopic: replyTopic.Name,
1034 }
1035
1036 for _, arg := range kvArgs {
khenaidoo2c6f1672018-09-20 23:14:41 -04001037 if arg == nil {
1038 // In case the caller sends an array with empty args
1039 continue
1040 }
khenaidooabad44c2018-08-03 16:58:35 -04001041 var marshalledArg *any.Any
1042 var err error
1043 // ascertain the value interface type is a proto.Message
1044 protoValue, ok := arg.Value.(proto.Message)
1045 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001046 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
khenaidooabad44c2018-08-03 16:58:35 -04001047 err := errors.New("argument-value-not-proto-message")
1048 return nil, err
1049 }
1050 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001051 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -04001052 return nil, err
1053 }
khenaidoo79232702018-12-04 11:00:41 -05001054 protoArg := &ic.Argument{
khenaidooabad44c2018-08-03 16:58:35 -04001055 Key: arg.Key,
1056 Value: marshalledArg,
1057 }
1058 requestBody.Args = append(requestBody.Args, protoArg)
1059 }
1060
1061 var marshalledData *any.Any
1062 var err error
1063 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001064 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -04001065 return nil, err
1066 }
khenaidoo79232702018-12-04 11:00:41 -05001067 request := &ic.InterContainerMessage{
khenaidooabad44c2018-08-03 16:58:35 -04001068 Header: requestHeader,
1069 Body: marshalledData,
1070 }
1071 return request, nil
1072}
1073
Rohan Agrawal31f21802020-06-12 05:38:46 +00001074func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
khenaidooabad44c2018-08-03 16:58:35 -04001075 // Extract the message body
khenaidoo79232702018-12-04 11:00:41 -05001076 responseBody := ic.InterContainerResponseBody{}
khenaidooabad44c2018-08-03 16:58:35 -04001077 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001078 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -04001079 return nil, err
1080 }
Rohan Agrawal31f21802020-06-12 05:38:46 +00001081 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
khenaidooabad44c2018-08-03 16:58:35 -04001082
1083 return &responseBody, nil
1084
1085}