blob: 120ed45c28671e9664391e830fa64bf0a9991aee [file] [log] [blame]
dbainbri4d3a0dc2020-12-02 00:33:42 +00001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
22 "fmt"
dbainbri4d3a0dc2020-12-02 00:33:42 +000023 "reflect"
24 "strings"
25 "sync"
26 "time"
27
Girish Gowdra097aa822021-07-08 15:48:35 -070028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30
dbainbri4d3a0dc2020-12-02 00:33:42 +000031 "github.com/golang/protobuf/proto"
32 "github.com/golang/protobuf/ptypes"
33 "github.com/golang/protobuf/ptypes/any"
34 "github.com/google/uuid"
Girish Gowdra50e56422021-06-01 16:46:04 -070035 "github.com/opencord/voltha-lib-go/v5/pkg/log"
dbainbri4d3a0dc2020-12-02 00:33:42 +000036 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
37 "github.com/opentracing/opentracing-go"
38)
39
// DefaultMaxRetries is the default retry count exposed by this package.
const DefaultMaxRetries = 3

// DefaultRequestTimeout is the default time to wait for a response, expressed in
// milliseconds (60000 ms = 60 s) so that a wide latency range is tolerated. It is
// multiplied by time.Millisecond where it is used.
const DefaultRequestTimeout = 60000
44
// TransactionKey is the argument key under which a transaction ID is attached to a request.
const TransactionKey = "transactionID"

// FromTopic is the argument key under which the originating topic name is attached to a request.
const FromTopic = "fromTopic"
49
// Sentinel errors related to transaction handling.
var (
	// ErrorTransactionNotAcquired signals that a transaction could not be acquired.
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
	// ErrorTransactionInvalidId signals that a transaction ID is not valid.
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
52
53// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
54// obtained from that channel, this interface is invoked. This is used to handle
55// async requests into the Core via the kafka messaging bus
56type requestHandlerChannel struct {
57 requesthandlerInterface interface{}
58 ch <-chan *ic.InterContainerMessage
59}
60
61// transactionChannel represents a combination of a topic and a channel onto which a response received
62// on the kafka bus will be sent to
63type transactionChannel struct {
64 topic *Topic
65 ch chan *ic.InterContainerMessage
66}
67
68type InterContainerProxy interface {
69 Start(ctx context.Context) error
70 Stop(ctx context.Context)
71 GetDefaultTopic() *Topic
72 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
73 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
74 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
75 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
76 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
77 DeleteTopic(ctx context.Context, topic Topic) error
78 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
79 SendLiveness(ctx context.Context) error
80}
81
82// interContainerProxy represents the messaging proxy
83type interContainerProxy struct {
84 kafkaAddress string
85 defaultTopic *Topic
86 defaultRequestHandlerInterface interface{}
87 kafkaClient Client
88 doneCh chan struct{}
89 doneOnce sync.Once
90
91 // This map is used to map a topic to an interface and channel. When a request is received
92 // on that channel (registered to the topic) then that interface is invoked.
93 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
94 lockTopicRequestHandlerChannelMap sync.RWMutex
95
96 // This map is used to map a channel to a response topic. This channel handles all responses on that
97 // channel for that topic and forward them to the appropriate consumers channel, using the
98 // transactionIdToChannelMap.
99 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
100 lockTopicResponseChannelMap sync.RWMutex
101
102 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
103 // sent out and we are waiting for a response.
104 transactionIdToChannelMap map[string]*transactionChannel
105 lockTransactionIdToChannelMap sync.RWMutex
106}
107
108type InterContainerProxyOption func(*interContainerProxy)
109
110func InterContainerAddress(address string) InterContainerProxyOption {
111 return func(args *interContainerProxy) {
112 args.kafkaAddress = address
113 }
114}
115
116func DefaultTopic(topic *Topic) InterContainerProxyOption {
117 return func(args *interContainerProxy) {
118 args.defaultTopic = topic
119 }
120}
121
122func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
123 return func(args *interContainerProxy) {
124 args.defaultRequestHandlerInterface = handler
125 }
126}
127
128func MsgClient(client Client) InterContainerProxyOption {
129 return func(args *interContainerProxy) {
130 args.kafkaClient = client
131 }
132}
133
134func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
135 proxy := &interContainerProxy{
136 kafkaAddress: DefaultKafkaAddress,
137 doneCh: make(chan struct{}),
138 }
139
140 for _, option := range opts {
141 option(proxy)
142 }
143
144 return proxy
145}
146
147func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
148 return newInterContainerProxy(opts...)
149}
150
151func (kp *interContainerProxy) Start(ctx context.Context) error {
152 logger.Info(ctx, "Starting-Proxy")
153
154 // Kafka MsgClient should already have been created. If not, output fatal error
155 if kp.kafkaClient == nil {
156 logger.Fatal(ctx, "kafka-client-not-set")
157 }
158
159 // Start the kafka client
160 if err := kp.kafkaClient.Start(ctx); err != nil {
161 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
162 return err
163 }
164
165 // Create the topic to response channel map
166 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
167 //
168 // Create the transactionId to Channel Map
169 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
170
171 // Create the topic to request channel map
172 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
173
174 return nil
175}
176
177func (kp *interContainerProxy) Stop(ctx context.Context) {
178 logger.Info(ctx, "stopping-intercontainer-proxy")
179 kp.doneOnce.Do(func() { close(kp.doneCh) })
180 // TODO : Perform cleanup
181 kp.kafkaClient.Stop(ctx)
182 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
183 if err != nil {
184 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
185 }
186 err = kp.deleteAllTopicResponseChannelMap(ctx)
187 if err != nil {
188 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
189 }
190 kp.deleteAllTransactionIdToChannelMap(ctx)
191}
192
193func (kp *interContainerProxy) GetDefaultTopic() *Topic {
194 return kp.defaultTopic
195}
196
197// InvokeAsyncRPC is used to make an RPC request asynchronously
198func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
199 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
200
dbainbri4d3a0dc2020-12-02 00:33:42 +0000201 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
202 if spanArg != nil {
203 kvArgs = append(kvArgs, &spanArg[0])
204 }
Girish Gowdra097aa822021-07-08 15:48:35 -0700205
dbainbri4d3a0dc2020-12-02 00:33:42 +0000206 defer span.Finish()
207
Girish Gowdra097aa822021-07-08 15:48:35 -0700208 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
209
dbainbri4d3a0dc2020-12-02 00:33:42 +0000210 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
211 // typically the device ID.
212 responseTopic := replyToTopic
213 if responseTopic == nil {
214 responseTopic = kp.GetDefaultTopic()
215 }
216
217 chnl := make(chan *RpcResponse)
218
219 go func() {
220
221 // once we're done,
222 // close the response channel
223 defer close(chnl)
224
225 var err error
226 var protoRequest *ic.InterContainerMessage
227
228 // Encode the request
229 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
230 if err != nil {
231 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
232 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
233 chnl <- NewResponse(RpcFormattingError, err, nil)
234 return
235 }
236
237 // Subscribe for response, if needed, before sending request
238 var ch <-chan *ic.InterContainerMessage
239 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
240 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
241 log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
242 chnl <- NewResponse(RpcTransportError, err, nil)
243 return
244 }
245
246 // Send request - if the topic is formatted with a device Id then we will send the request using a
247 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
248 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
249 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
250
251 // if the message is not sent on kafka publish an event an close the channel
252 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
253 chnl <- NewResponse(RpcTransportError, err, nil)
254 return
255 }
256
257 // if the client is not waiting for a response send the ack and close the channel
258 chnl <- NewResponse(RpcSent, nil, nil)
259 if !waitForResponse {
260 return
261 }
262
263 defer func() {
264 // Remove the subscription for a response on return
265 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
266 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
267 }
268 }()
269
270 // Wait for response as well as timeout or cancellation
271 select {
272 case msg, ok := <-ch:
273 if !ok {
274 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
275 log.MarkSpanError(ctx, errors.New("channel-closed"))
276 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
277 }
278 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
279 if responseBody, err := decodeResponse(ctx, msg); err != nil {
280 chnl <- NewResponse(RpcReply, err, nil)
281 } else {
282 if responseBody.Success {
283 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
284 } else {
285 // response body contains an error
286 unpackErr := &ic.Error{}
287 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
288 chnl <- NewResponse(RpcReply, err, nil)
289 } else {
290 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
291 }
292 }
293 }
294 case <-ctx.Done():
295 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
296 log.MarkSpanError(ctx, errors.New("context-cancelled"))
297 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
298 chnl <- NewResponse(RpcTimeout, err, nil)
299 case <-kp.doneCh:
300 chnl <- NewResponse(RpcSystemClosing, nil, nil)
301 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
302 }
303 }()
304 return chnl
305}
306
307// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
308// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
309//
310// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
311// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
312// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
313// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
314// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
315func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
316 var err error
317 var newCtx context.Context
318 var spanToInject opentracing.Span
319
Girish Gowdra097aa822021-07-08 15:48:35 -0700320 if !log.GetGlobalLFM().GetLogCorrelationStatus() && !log.GetGlobalLFM().GetTracePublishingStatus() {
321 // if both log correlation and trace publishing is disable do not generate the span
322 logger.Debugw(ctx, "not-embedding-span-in-KVArg-", log.Fields{"rpc": rpc,
323 "log-correlation-status": log.GetGlobalLFM().GetLogCorrelationStatus(), "trace-publishing-status": log.GetGlobalLFM().GetTracePublishingStatus()})
324 return nil, opentracing.GlobalTracer().StartSpan(rpc), ctx
325 }
326
dbainbri4d3a0dc2020-12-02 00:33:42 +0000327 var spanName strings.Builder
328 spanName.WriteString("kafka-")
329
330 // In case of inter adapter message, use Msg Type for constructing RPC name
331 if rpc == "process_inter_adapter_message" {
332 if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
333 spanName.WriteString("inter-adapter-")
334 rpc = msgType.String()
335 }
336 }
337
338 if isAsync {
339 spanName.WriteString("async-rpc-")
340 } else {
341 spanName.WriteString("rpc-")
342 }
343 spanName.WriteString(rpc)
344
345 if isAsync {
346 spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
347 } else {
348 spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
349 }
350
351 spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
352
353 textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
354 if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
355 logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
356 return nil, spanToInject, newCtx
357 }
358
359 var textMapJson []byte
360 if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
361 logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
362 return nil, spanToInject, newCtx
363 }
364
365 spanArg := make([]KVArg, 1)
366 spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
367 return spanArg, spanToInject, newCtx
368}
369
370// InvokeRPC is used to send a request to a given topic
371func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
372 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
373
374 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
375 if spanArg != nil {
376 kvArgs = append(kvArgs, &spanArg[0])
377 }
Girish Gowdra097aa822021-07-08 15:48:35 -0700378
dbainbri4d3a0dc2020-12-02 00:33:42 +0000379 defer span.Finish()
380
Girish Gowdra097aa822021-07-08 15:48:35 -0700381 logger.Debugw(ctx, "InvokeRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
382
dbainbri4d3a0dc2020-12-02 00:33:42 +0000383 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
384 // typically the device ID.
385 responseTopic := replyToTopic
386 if responseTopic == nil {
387 responseTopic = kp.defaultTopic
388 }
389
390 // Encode the request
391 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
392 if err != nil {
393 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
394 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
395 return false, nil
396 }
397
398 // Subscribe for response, if needed, before sending request
399 var ch <-chan *ic.InterContainerMessage
400 if waitForResponse {
401 var err error
402 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
403 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
404 }
405 }
406
407 // Send request - if the topic is formatted with a device Id then we will send the request using a
408 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
409 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
410 //key := GetDeviceIdFromTopic(*toTopic)
411 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
412 go func() {
413 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
414 log.MarkSpanError(ctx, errors.New("send-failed"))
415 logger.Errorw(ctx, "send-failed", log.Fields{
416 "topic": toTopic,
417 "key": key,
418 "error": err})
419 }
420 }()
421
422 if waitForResponse {
423 // Create a child context based on the parent context, if any
424 var cancel context.CancelFunc
425 childCtx := context.Background()
426 if ctx == nil {
427 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
428 } else {
429 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
430 }
431 defer cancel()
432
433 // Wait for response as well as timeout or cancellation
434 // Remove the subscription for a response on return
435 defer func() {
436 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
437 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
438 "id": protoRequest.Header.Id,
439 "error": err})
440 }
441 }()
442 select {
443 case msg, ok := <-ch:
444 if !ok {
445 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
446 log.MarkSpanError(ctx, errors.New("channel-closed"))
447 protoError := &ic.Error{Reason: "channel-closed"}
448 var marshalledArg *any.Any
449 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
450 return false, nil // Should never happen
451 }
452 return false, marshalledArg
453 }
454 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
455 var responseBody *ic.InterContainerResponseBody
456 var err error
457 if responseBody, err = decodeResponse(ctx, msg); err != nil {
458 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
459 // FIXME we should return something
460 }
461 return responseBody.Success, responseBody.Result
462 case <-ctx.Done():
463 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
464 log.MarkSpanError(ctx, errors.New("context-cancelled"))
465 // pack the error as proto any type
466 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
467
468 var marshalledArg *any.Any
469 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
470 return false, nil // Should never happen
471 }
472 return false, marshalledArg
473 case <-childCtx.Done():
474 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
475 log.MarkSpanError(ctx, errors.New("context-cancelled"))
476 // pack the error as proto any type
477 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
478
479 var marshalledArg *any.Any
480 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
481 return false, nil // Should never happen
482 }
483 return false, marshalledArg
484 case <-kp.doneCh:
485 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
486 return true, nil
487 }
488 }
489 return true, nil
490}
491
492// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
493// when a message is received on a given topic
494func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
495
496 // Subscribe to receive messages for that topic
497 var ch <-chan *ic.InterContainerMessage
498 var err error
499 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
500 //if ch, err = kp.Subscribe(topic); err != nil {
501 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
502 return err
503 }
504
505 kp.defaultRequestHandlerInterface = handler
506 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
507 // Launch a go routine to receive and process kafka messages
508 go kp.waitForMessages(ctx, ch, topic, handler)
509
510 return nil
511}
512
513// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
514// when a message is received on a given topic. So far there is only 1 target registered per microservice
515func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
516 // Subscribe to receive messages for that topic
517 var ch <-chan *ic.InterContainerMessage
518 var err error
519 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
520 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
521 return err
522 }
523 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
524
525 // Launch a go routine to receive and process kafka messages
526 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
527
528 return nil
529}
530
531func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
532 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
533}
534
535func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
536 kp.lockTopicResponseChannelMap.Lock()
537 defer kp.lockTopicResponseChannelMap.Unlock()
538 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
539 // Unsubscribe to this topic first - this will close the subscribed channel
540 var err error
541 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
542 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
543 }
544 delete(kp.topicToResponseChannelMap, topic)
545 return err
546 } else {
547 return fmt.Errorf("%s-Topic-not-found", topic)
548 }
549}
550
551// nolint: unused
552func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
553 logger.Debug(ctx, "delete-all-topic-response-channel")
554 kp.lockTopicResponseChannelMap.Lock()
555 defer kp.lockTopicResponseChannelMap.Unlock()
556 var unsubscribeFailTopics []string
557 for topic := range kp.topicToResponseChannelMap {
558 // Unsubscribe to this topic first - this will close the subscribed channel
559 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
560 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
561 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
562 // Do not return. Continue to try to unsubscribe to other topics.
563 } else {
564 // Only delete from channel map if successfully unsubscribed.
565 delete(kp.topicToResponseChannelMap, topic)
566 }
567 }
568 if len(unsubscribeFailTopics) > 0 {
569 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
570 }
571 return nil
572}
573
574func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
575 kp.lockTopicRequestHandlerChannelMap.Lock()
576 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
577 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
578 kp.topicToRequestHandlerChannelMap[topic] = arg
579 }
580}
581
582func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
583 kp.lockTopicRequestHandlerChannelMap.Lock()
584 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
585 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
586 // Close the kafka client client first by unsubscribing to this topic
587 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
588 return err
589 }
590 delete(kp.topicToRequestHandlerChannelMap, topic)
591 return nil
592 } else {
593 return fmt.Errorf("%s-Topic-not-found", topic)
594 }
595}
596
597// nolint: unused
598func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
599 logger.Debug(ctx, "delete-all-topic-request-channel")
600 kp.lockTopicRequestHandlerChannelMap.Lock()
601 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
602 var unsubscribeFailTopics []string
603 for topic := range kp.topicToRequestHandlerChannelMap {
604 // Close the kafka client client first by unsubscribing to this topic
605 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
606 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
607 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
608 // Do not return. Continue to try to unsubscribe to other topics.
609 } else {
610 // Only delete from channel map if successfully unsubscribed.
611 delete(kp.topicToRequestHandlerChannelMap, topic)
612 }
613 }
614 if len(unsubscribeFailTopics) > 0 {
615 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
616 }
617 return nil
618}
619
620func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
621 kp.lockTransactionIdToChannelMap.Lock()
622 defer kp.lockTransactionIdToChannelMap.Unlock()
623 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
624 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
625 }
626}
627
628func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
629 kp.lockTransactionIdToChannelMap.Lock()
630 defer kp.lockTransactionIdToChannelMap.Unlock()
631 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
632 // Close the channel first
633 close(transChannel.ch)
634 delete(kp.transactionIdToChannelMap, id)
635 }
636}
637
638func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
639 kp.lockTransactionIdToChannelMap.Lock()
640 defer kp.lockTransactionIdToChannelMap.Unlock()
641 for key, value := range kp.transactionIdToChannelMap {
642 if value.topic.Name == id {
643 close(value.ch)
644 delete(kp.transactionIdToChannelMap, key)
645 }
646 }
647}
648
649// nolint: unused
650func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
651 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
652 kp.lockTransactionIdToChannelMap.Lock()
653 defer kp.lockTransactionIdToChannelMap.Unlock()
654 for key, value := range kp.transactionIdToChannelMap {
655 close(value.ch)
656 delete(kp.transactionIdToChannelMap, key)
657 }
658}
659
660func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
661 // If we have any consumers on that topic we need to close them
662 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
663 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
664 }
665 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
666 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
667 }
668 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
669
670 return kp.kafkaClient.DeleteTopic(ctx, &topic)
671}
672
673func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
674 // Encode the response argument - needs to be a proto message
675 if returnedVal == nil {
676 return nil, nil
677 }
678 protoValue, ok := returnedVal.(proto.Message)
679 if !ok {
680 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
681 err := errors.New("response-value-not-proto-message")
682 return nil, err
683 }
684
685 // Marshal the returned value, if any
686 var marshalledReturnedVal *any.Any
687 var err error
688 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
689 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
690 return nil, err
691 }
692 return marshalledReturnedVal, nil
693}
694
695func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
696 responseHeader := &ic.Header{
697 Id: request.Header.Id,
698 Type: ic.MessageType_RESPONSE,
699 FromTopic: request.Header.ToTopic,
700 ToTopic: request.Header.FromTopic,
701 Timestamp: ptypes.TimestampNow(),
702 }
703 responseBody := &ic.InterContainerResponseBody{
704 Success: false,
705 Result: nil,
706 }
707 var marshalledResponseBody *any.Any
708 var err error
709 // Error should never happen here
710 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
711 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
712 }
713
714 return &ic.InterContainerMessage{
715 Header: responseHeader,
716 Body: marshalledResponseBody,
717 }
718
719}
720
721//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
722//or an error on failure
723func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
724 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
725 responseHeader := &ic.Header{
726 Id: request.Header.Id,
727 Type: ic.MessageType_RESPONSE,
728 FromTopic: request.Header.ToTopic,
729 ToTopic: request.Header.FromTopic,
730 KeyTopic: request.Header.KeyTopic,
731 Timestamp: ptypes.TimestampNow(),
732 }
733
734 // Go over all returned values
735 var marshalledReturnedVal *any.Any
736 var err error
737
738 // for now we support only 1 returned value - (excluding the error)
739 if len(returnedValues) > 0 {
740 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
741 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
742 }
743 }
744
745 responseBody := &ic.InterContainerResponseBody{
746 Success: success,
747 Result: marshalledReturnedVal,
748 }
749
750 // Marshal the response body
751 var marshalledResponseBody *any.Any
752 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
753 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
754 return nil, err
755 }
756
757 return &ic.InterContainerMessage{
758 Header: responseHeader,
759 Body: marshalledResponseBody,
760 }, nil
761}
762
// CallFuncByName invokes, through reflection, the method of myClass whose name
// is funcName with its first letter capitalized. ctx is always passed as the
// first argument, followed by params in order.
//
// It returns the method's results, or an error (with an empty result slice)
// when:
//   - myClass is nil or has no such method;
//   - the number of arguments does not match the method's signature;
//   - an argument is not assignable to the corresponding parameter type.
//
// An untyped nil argument is passed as the zero value of the parameter type
// when that type can hold nil (pointer, interface, map, slice, channel,
// function); for any other parameter type it is reported as an error.
//
// A panic raised by the invoked method itself is not recovered.
func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// A nil target has no methods; MethodByName on the zero Value would panic.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	args := make([]interface{}, 0, len(params)+1)
	args = append(args, ctx)
	args = append(args, params...)

	// Validate the argument count up front: reflect's Call panics on a mismatch.
	methodType := m.Type()
	numIn := methodType.NumIn()
	variadic := methodType.IsVariadic()
	if variadic {
		if len(args) < numIn-1 {
			return make([]reflect.Value, 0), fmt.Errorf("argument-count-mismatch \"%s\": expected at least %d, got %d", funcName, numIn-1, len(args))
		}
	} else if len(args) != numIn {
		return make([]reflect.Value, 0), fmt.Errorf("argument-count-mismatch \"%s\": expected %d, got %d", funcName, numIn, len(args))
	}

	in := make([]reflect.Value, len(args))
	for i, arg := range args {
		// Arguments at or past the variadic position match the slice's element type.
		var want reflect.Type
		if variadic && i >= numIn-1 {
			want = methodType.In(numIn - 1).Elem()
		} else {
			want = methodType.In(i)
		}

		value := reflect.ValueOf(arg)
		if !value.IsValid() {
			// reflect.ValueOf(nil) is the zero Value, which Call rejects with a
			// panic; substitute a typed nil when the parameter can hold one.
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
				value = reflect.Zero(want)
			default:
				return make([]reflect.Value, 0), fmt.Errorf("argument-type-mismatch \"%s\": argument %d is nil, expected %s", funcName, i, want)
			}
		} else if !value.Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("argument-type-mismatch \"%s\": argument %d is %s, expected %s", funcName, i, value.Type(), want)
		}
		in[i] = value
	}

	out = m.Call(in)
	return out, nil
}
780
781func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
782 arg := &KVArg{
783 Key: TransactionKey,
784 Value: &ic.StrType{Val: transactionId},
785 }
786
787 var marshalledArg *any.Any
788 var err error
789 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
790 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
791 return currentArgs
792 }
793 protoArg := &ic.Argument{
794 Key: arg.Key,
795 Value: marshalledArg,
796 }
797 return append(currentArgs, protoArg)
798}
799
800func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
801 var marshalledArg *any.Any
802 var err error
803 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
804 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
805 return currentArgs
806 }
807 protoArg := &ic.Argument{
808 Key: FromTopic,
809 Value: marshalledArg,
810 }
811 return append(currentArgs, protoArg)
812}
813
814// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
815// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
816// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
817// from components currently not sending the span (e.g. openonu adapter)
818func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
819
820 for _, arg := range args {
821 if arg.Key == "span" {
822 var err error
823 var textMapString ic.StrType
824 if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
Girish Gowdra097aa822021-07-08 15:48:35 -0700825 logger.Debug(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
dbainbri4d3a0dc2020-12-02 00:33:42 +0000826 break
827 }
828
829 spanTextMap := make(map[string]string)
830 if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
Girish Gowdra097aa822021-07-08 15:48:35 -0700831 logger.Debug(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
dbainbri4d3a0dc2020-12-02 00:33:42 +0000832 break
833 }
834
835 var spanContext opentracing.SpanContext
836 if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
Girish Gowdra097aa822021-07-08 15:48:35 -0700837 logger.Debug(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
dbainbri4d3a0dc2020-12-02 00:33:42 +0000838 break
839 }
840
841 var receivedRpcName string
842 extractBaggage := func(k, v string) bool {
843 if k == "rpc-span-name" {
844 receivedRpcName = v
845 return false
846 }
847 return true
848 }
849
850 spanContext.ForeachBaggageItem(extractBaggage)
851
852 return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
853 }
854 }
855
856 // Create new Child Span with rpc as name if no span details were received in kafka arguments
857 var spanName strings.Builder
858 spanName.WriteString("kafka-")
859
860 // In case of inter adapter message, use Msg Type for constructing RPC name
861 if rpcName == "process_inter_adapter_message" {
862 for _, arg := range args {
863 if arg.Key == "msg" {
864 iamsg := ic.InterAdapterMessage{}
865 if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
866 spanName.WriteString("inter-adapter-")
867 rpcName = iamsg.Header.Type.String()
868 }
869 }
870 }
871 }
872
873 spanName.WriteString("rpc-")
874 spanName.WriteString(rpcName)
875
876 return opentracing.StartSpanFromContext(ctx, spanName.String())
877}
878
879func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
880
881 // First extract the header to know whether this is a request - responses are handled by a different handler
882 if msg.Header.Type == ic.MessageType_REQUEST {
883 var out []reflect.Value
884 var err error
885
886 // Get the request body
887 requestBody := &ic.InterContainerRequestBody{}
888 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
889 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
890 } else {
Girish Gowdra097aa822021-07-08 15:48:35 -0700891 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "args": requestBody.Args})
dbainbri4d3a0dc2020-12-02 00:33:42 +0000892 span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
893 defer span.Finish()
894
dbainbri4d3a0dc2020-12-02 00:33:42 +0000895 // let the callee unpack the arguments as its the only one that knows the real proto type
896 // Augment the requestBody with the message Id as it will be used in scenarios where cores
897 // are set in pairs and competing
898 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
899
900 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
901 // needs to send an unsollicited message to the currently requested container
902 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
903
904 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
905 if err != nil {
906 logger.Warn(ctx, err)
907 }
908 }
909 // Response required?
910 if requestBody.ResponseRequired {
911 // If we already have an error before then just return that
912 var returnError *ic.Error
913 var returnedValues []interface{}
914 var success bool
915 if err != nil {
916 returnError = &ic.Error{Reason: err.Error()}
917 returnedValues = make([]interface{}, 1)
918 returnedValues[0] = returnError
919 } else {
920 returnedValues = make([]interface{}, 0)
921 // Check for errors first
922 lastIndex := len(out) - 1
923 if out[lastIndex].Interface() != nil { // Error
924 if retError, ok := out[lastIndex].Interface().(error); ok {
925 if retError.Error() == ErrorTransactionNotAcquired.Error() {
926 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
927 return // Ignore - process is in competing mode and ignored transaction
928 }
929 returnError = &ic.Error{Reason: retError.Error()}
930 returnedValues = append(returnedValues, returnError)
931 } else { // Should never happen
932 returnError = &ic.Error{Reason: "incorrect-error-returns"}
933 returnedValues = append(returnedValues, returnError)
934 }
935 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
936 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
937 return // Ignore - should not happen
938 } else { // Non-error case
939 success = true
940 for idx, val := range out {
941 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
942 if idx != lastIndex {
943 returnedValues = append(returnedValues, val.Interface())
944 }
945 }
946 }
947 }
948
949 var icm *ic.InterContainerMessage
950 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
951 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
952 icm = encodeDefaultFailedResponse(ctx, msg)
953 }
954 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
955 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
956 // present then the key will be empty, hence all messages for a given topic will be sent to all
957 // partitions.
958 replyTopic := &Topic{Name: msg.Header.FromTopic}
959 key := msg.Header.KeyTopic
960 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
961 // TODO: handle error response.
962 go func() {
963 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
964 logger.Errorw(ctx, "send-reply-failed", log.Fields{
965 "topic": replyTopic,
966 "key": key,
967 "error": err})
968 }
969 }()
970 }
971 } else if msg.Header.Type == ic.MessageType_RESPONSE {
972 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
973 go kp.dispatchResponse(ctx, msg)
974 } else {
975 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
976 }
977}
978
979func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
980 // Wait for messages
981 for msg := range ch {
982 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
983 go kp.handleMessage(context.Background(), msg, targetInterface)
984 }
985}
986
987func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
988 kp.lockTransactionIdToChannelMap.RLock()
989 defer kp.lockTransactionIdToChannelMap.RUnlock()
990 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
991 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
992 return
993 }
994 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
995}
996
997// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
998// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
999// API. There is one response channel waiting for kafka messages before dispatching the message to the
1000// corresponding waiting channel
1001func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
1002 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
1003
1004 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
1005 // broadcast any message for this topic to all channels waiting on it.
1006 // Set channel size to 1 to prevent deadlock, see VOL-2708
1007 ch := make(chan *ic.InterContainerMessage, 1)
1008 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
1009
1010 return ch, nil
1011}
1012
1013func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
1014 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
1015 kp.deleteFromTransactionIdToChannelMap(trnsId)
1016 return nil
1017}
1018
1019func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
1020 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
1021}
1022
1023func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
1024 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
1025}
1026
1027func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
1028 return kp.kafkaClient.SendLiveness(ctx)
1029}
1030
1031//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
1032//or an error on failure
1033func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
1034 requestHeader := &ic.Header{
1035 Id: uuid.New().String(),
1036 Type: ic.MessageType_REQUEST,
1037 FromTopic: replyTopic.Name,
1038 ToTopic: toTopic.Name,
1039 KeyTopic: key,
1040 Timestamp: ptypes.TimestampNow(),
1041 }
1042 requestBody := &ic.InterContainerRequestBody{
1043 Rpc: rpc,
1044 ResponseRequired: true,
1045 ReplyToTopic: replyTopic.Name,
1046 }
1047
1048 for _, arg := range kvArgs {
1049 if arg == nil {
1050 // In case the caller sends an array with empty args
1051 continue
1052 }
1053 var marshalledArg *any.Any
1054 var err error
1055 // ascertain the value interface type is a proto.Message
1056 protoValue, ok := arg.Value.(proto.Message)
1057 if !ok {
1058 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
1059 err := errors.New("argument-value-not-proto-message")
1060 return nil, err
1061 }
1062 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
1063 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
1064 return nil, err
1065 }
1066 protoArg := &ic.Argument{
1067 Key: arg.Key,
1068 Value: marshalledArg,
1069 }
1070 requestBody.Args = append(requestBody.Args, protoArg)
1071 }
1072
1073 var marshalledData *any.Any
1074 var err error
1075 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
1076 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
1077 return nil, err
1078 }
1079 request := &ic.InterContainerMessage{
1080 Header: requestHeader,
1081 Body: marshalledData,
1082 }
1083 return request, nil
1084}
1085
1086func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
1087 // Extract the message body
1088 responseBody := ic.InterContainerResponseBody{}
1089 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
1090 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
1091 return nil, err
1092 }
1093 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
1094
1095 return &responseBody, nil
1096
1097}