blob: d874fb6b60fe7ff2b2610ae22739671635de6fb3 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
Girish Kumar74240652020-07-10 11:54:28 +000020 "encoding/json"
Scott Baker2c1c4822019-10-16 11:02:41 -070021 "errors"
22 "fmt"
Girish Kumar74240652020-07-10 11:54:28 +000023 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/status"
Scott Baker2c1c4822019-10-16 11:02:41 -070025 "reflect"
26 "strings"
27 "sync"
28 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070029
30 "github.com/golang/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/any"
33 "github.com/google/uuid"
Girish Gowdra248971a2021-06-01 15:14:15 -070034 "github.com/opencord/voltha-lib-go/v5/pkg/log"
Girish Gowdra89c985b2020-10-14 15:02:09 -070035 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
Girish Kumar74240652020-07-10 11:54:28 +000036 "github.com/opentracing/opentracing-go"
Scott Baker2c1c4822019-10-16 11:02:41 -070037)
38
// Default settings for inter-container requests.
const (
	// DefaultMaxRetries is the default retry count.
	DefaultMaxRetries = 3
	// DefaultRequestTimeout is expressed in milliseconds; 60000 ms was chosen
	// to handle a wider latency range.
	DefaultRequestTimeout = 60000
)

// String keys exported by this package.
const (
	TransactionKey = "transactionID"
	FromTopic      = "fromTopic"
)

// Sentinel errors related to transactions.
var (
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
	ErrorTransactionInvalidId   = errors.New("transaction-invalid-id")
)
51
52// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
53// obtained from that channel, this interface is invoked. This is used to handle
54// async requests into the Core via the kafka messaging bus
55type requestHandlerChannel struct {
56 requesthandlerInterface interface{}
57 ch <-chan *ic.InterContainerMessage
58}
59
60// transactionChannel represents a combination of a topic and a channel onto which a response received
61// on the kafka bus will be sent to
62type transactionChannel struct {
63 topic *Topic
64 ch chan *ic.InterContainerMessage
65}
66
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080067type InterContainerProxy interface {
Neha Sharma94f16a92020-06-26 04:17:55 +000068 Start(ctx context.Context) error
69 Stop(ctx context.Context)
Matteo Scandolof346a2d2020-01-24 13:14:54 -080070 GetDefaultTopic() *Topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080071 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Matteo Scandoloed128822020-02-10 15:52:35 -080072 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Neha Sharma94f16a92020-06-26 04:17:55 +000073 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
74 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
75 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
76 DeleteTopic(ctx context.Context, topic Topic) error
77 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
78 SendLiveness(ctx context.Context) error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080079}
80
81// interContainerProxy represents the messaging proxy
82type interContainerProxy struct {
Neha Sharmadd9af392020-04-28 09:03:57 +000083 kafkaAddress string
Matteo Scandolof346a2d2020-01-24 13:14:54 -080084 defaultTopic *Topic
Scott Baker2c1c4822019-10-16 11:02:41 -070085 defaultRequestHandlerInterface interface{}
Scott Baker2c1c4822019-10-16 11:02:41 -070086 kafkaClient Client
Kent Hagerman3a402302020-01-31 15:03:53 -050087 doneCh chan struct{}
88 doneOnce sync.Once
Scott Baker2c1c4822019-10-16 11:02:41 -070089
90 // This map is used to map a topic to an interface and channel. When a request is received
91 // on that channel (registered to the topic) then that interface is invoked.
92 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
93 lockTopicRequestHandlerChannelMap sync.RWMutex
94
95 // This map is used to map a channel to a response topic. This channel handles all responses on that
96 // channel for that topic and forward them to the appropriate consumers channel, using the
97 // transactionIdToChannelMap.
98 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
99 lockTopicResponseChannelMap sync.RWMutex
100
101 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
102 // sent out and we are waiting for a response.
103 transactionIdToChannelMap map[string]*transactionChannel
104 lockTransactionIdToChannelMap sync.RWMutex
105}
106
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800107type InterContainerProxyOption func(*interContainerProxy)
Scott Baker2c1c4822019-10-16 11:02:41 -0700108
Neha Sharmadd9af392020-04-28 09:03:57 +0000109func InterContainerAddress(address string) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800110 return func(args *interContainerProxy) {
Neha Sharmadd9af392020-04-28 09:03:57 +0000111 args.kafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -0700112 }
113}
114
115func DefaultTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800116 return func(args *interContainerProxy) {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800117 args.defaultTopic = topic
Scott Baker2c1c4822019-10-16 11:02:41 -0700118 }
119}
120
Scott Baker2c1c4822019-10-16 11:02:41 -0700121func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800122 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700123 args.defaultRequestHandlerInterface = handler
124 }
125}
126
127func MsgClient(client Client) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800128 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700129 args.kafkaClient = client
130 }
131}
132
Kent Hagerman3a402302020-01-31 15:03:53 -0500133func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800134 proxy := &interContainerProxy{
Neha Sharmadd9af392020-04-28 09:03:57 +0000135 kafkaAddress: DefaultKafkaAddress,
136 doneCh: make(chan struct{}),
Scott Baker2c1c4822019-10-16 11:02:41 -0700137 }
138
139 for _, option := range opts {
140 option(proxy)
141 }
142
Kent Hagerman3a402302020-01-31 15:03:53 -0500143 return proxy
Scott Baker2c1c4822019-10-16 11:02:41 -0700144}
145
Kent Hagerman3a402302020-01-31 15:03:53 -0500146func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800147 return newInterContainerProxy(opts...)
148}
149
Neha Sharma94f16a92020-06-26 04:17:55 +0000150func (kp *interContainerProxy) Start(ctx context.Context) error {
151 logger.Info(ctx, "Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700152
153 // Kafka MsgClient should already have been created. If not, output fatal error
154 if kp.kafkaClient == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000155 logger.Fatal(ctx, "kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700156 }
157
Scott Baker2c1c4822019-10-16 11:02:41 -0700158 // Start the kafka client
Neha Sharma94f16a92020-06-26 04:17:55 +0000159 if err := kp.kafkaClient.Start(ctx); err != nil {
160 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700161 return err
162 }
163
164 // Create the topic to response channel map
165 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
166 //
167 // Create the transactionId to Channel Map
168 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
169
170 // Create the topic to request channel map
171 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
172
173 return nil
174}
175
Neha Sharma94f16a92020-06-26 04:17:55 +0000176func (kp *interContainerProxy) Stop(ctx context.Context) {
177 logger.Info(ctx, "stopping-intercontainer-proxy")
Kent Hagerman3a402302020-01-31 15:03:53 -0500178 kp.doneOnce.Do(func() { close(kp.doneCh) })
Scott Baker2c1c4822019-10-16 11:02:41 -0700179 // TODO : Perform cleanup
Neha Sharma94f16a92020-06-26 04:17:55 +0000180 kp.kafkaClient.Stop(ctx)
181 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Bakera2da2f42020-02-20 16:27:34 -0800182 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000183 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800184 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000185 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Bakera2da2f42020-02-20 16:27:34 -0800186 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000187 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800188 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000189 kp.deleteAllTransactionIdToChannelMap(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700190}
191
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800192func (kp *interContainerProxy) GetDefaultTopic() *Topic {
193 return kp.defaultTopic
194}
195
Matteo Scandoloed128822020-02-10 15:52:35 -0800196// InvokeAsyncRPC is used to make an RPC request asynchronously
197func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
198 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
199
Girish Kumar74240652020-07-10 11:54:28 +0000200 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
201 if spanArg != nil {
202 kvArgs = append(kvArgs, &spanArg[0])
203 }
Matteo Scandolodb824802021-06-29 16:23:52 +0200204
Girish Kumar74240652020-07-10 11:54:28 +0000205 defer span.Finish()
206
Matteo Scandolodb824802021-06-29 16:23:52 +0200207 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
208
Matteo Scandoloed128822020-02-10 15:52:35 -0800209 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
210 // typically the device ID.
211 responseTopic := replyToTopic
212 if responseTopic == nil {
213 responseTopic = kp.GetDefaultTopic()
214 }
215
216 chnl := make(chan *RpcResponse)
217
218 go func() {
219
220 // once we're done,
221 // close the response channel
222 defer close(chnl)
223
224 var err error
225 var protoRequest *ic.InterContainerMessage
226
227 // Encode the request
Neha Sharma94f16a92020-06-26 04:17:55 +0000228 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Matteo Scandoloed128822020-02-10 15:52:35 -0800229 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000230 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Girish Kumar74240652020-07-10 11:54:28 +0000231 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
Matteo Scandoloed128822020-02-10 15:52:35 -0800232 chnl <- NewResponse(RpcFormattingError, err, nil)
233 return
234 }
235
236 // Subscribe for response, if needed, before sending request
237 var ch <-chan *ic.InterContainerMessage
Neha Sharma94f16a92020-06-26 04:17:55 +0000238 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
239 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Girish Kumar74240652020-07-10 11:54:28 +0000240 log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
Matteo Scandoloed128822020-02-10 15:52:35 -0800241 chnl <- NewResponse(RpcTransportError, err, nil)
242 return
243 }
244
245 // Send request - if the topic is formatted with a device Id then we will send the request using a
246 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
247 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Neha Sharma94f16a92020-06-26 04:17:55 +0000248 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Matteo Scandoloed128822020-02-10 15:52:35 -0800249
250 // if the message is not sent on kafka publish an event an close the channel
Neha Sharma94f16a92020-06-26 04:17:55 +0000251 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800252 chnl <- NewResponse(RpcTransportError, err, nil)
253 return
254 }
255
256 // if the client is not waiting for a response send the ack and close the channel
257 chnl <- NewResponse(RpcSent, nil, nil)
258 if !waitForResponse {
259 return
260 }
261
262 defer func() {
263 // Remove the subscription for a response on return
Neha Sharma94f16a92020-06-26 04:17:55 +0000264 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
265 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
Matteo Scandoloed128822020-02-10 15:52:35 -0800266 }
267 }()
268
269 // Wait for response as well as timeout or cancellation
270 select {
271 case msg, ok := <-ch:
272 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000273 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Girish Kumar74240652020-07-10 11:54:28 +0000274 log.MarkSpanError(ctx, errors.New("channel-closed"))
Matteo Scandoloed128822020-02-10 15:52:35 -0800275 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
276 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000277 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
278 if responseBody, err := decodeResponse(ctx, msg); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800279 chnl <- NewResponse(RpcReply, err, nil)
280 } else {
281 if responseBody.Success {
282 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
283 } else {
284 // response body contains an error
285 unpackErr := &ic.Error{}
286 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
287 chnl <- NewResponse(RpcReply, err, nil)
288 } else {
289 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
290 }
291 }
292 }
293 case <-ctx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000294 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Girish Kumar74240652020-07-10 11:54:28 +0000295 log.MarkSpanError(ctx, errors.New("context-cancelled"))
Matteo Scandoloed128822020-02-10 15:52:35 -0800296 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
297 chnl <- NewResponse(RpcTimeout, err, nil)
298 case <-kp.doneCh:
299 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Neha Sharma94f16a92020-06-26 04:17:55 +0000300 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Matteo Scandoloed128822020-02-10 15:52:35 -0800301 }
302 }()
303 return chnl
304}
305
Girish Kumar74240652020-07-10 11:54:28 +0000306// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
307// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
308//
309// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
310// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
311// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
312// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
313// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
314func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
315 var err error
316 var newCtx context.Context
317 var spanToInject opentracing.Span
318
Matteo Scandolodb824802021-06-29 16:23:52 +0200319 if !log.GetGlobalLFM().GetLogCorrelationStatus() && !log.GetGlobalLFM().GetTracePublishingStatus() {
320 // if both log correlation and trace publishing is disable do not generate the span
321 logger.Debugw(ctx, "not-embedding-span-in-KVArg-", log.Fields{"rpc": rpc,
322 "log-correlation-status": log.GetGlobalLFM().GetLogCorrelationStatus(), "trace-publishing-status": log.GetGlobalLFM().GetTracePublishingStatus()})
323 return nil, opentracing.SpanFromContext(ctx), ctx
324 }
325
Girish Kumar74240652020-07-10 11:54:28 +0000326 var spanName strings.Builder
327 spanName.WriteString("kafka-")
328
329 // In case of inter adapter message, use Msg Type for constructing RPC name
330 if rpc == "process_inter_adapter_message" {
331 if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
332 spanName.WriteString("inter-adapter-")
333 rpc = msgType.String()
334 }
335 }
336
337 if isAsync {
338 spanName.WriteString("async-rpc-")
339 } else {
340 spanName.WriteString("rpc-")
341 }
342 spanName.WriteString(rpc)
343
344 if isAsync {
345 spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
346 } else {
347 spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
348 }
349
350 spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
351
352 textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
353 if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
354 logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
355 return nil, spanToInject, newCtx
356 }
357
358 var textMapJson []byte
359 if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
360 logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
361 return nil, spanToInject, newCtx
362 }
363
364 spanArg := make([]KVArg, 1)
365 spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
366 return spanArg, spanToInject, newCtx
367}
368
Scott Baker2c1c4822019-10-16 11:02:41 -0700369// InvokeRPC is used to send a request to a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800370func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
Scott Baker2c1c4822019-10-16 11:02:41 -0700371 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
372
Girish Kumar74240652020-07-10 11:54:28 +0000373 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
374 if spanArg != nil {
375 kvArgs = append(kvArgs, &spanArg[0])
376 }
Matteo Scandolodb824802021-06-29 16:23:52 +0200377
Girish Kumar74240652020-07-10 11:54:28 +0000378 defer span.Finish()
379
Matteo Scandolodb824802021-06-29 16:23:52 +0200380 logger.Debugw(ctx, "InvokeRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
381
Scott Baker2c1c4822019-10-16 11:02:41 -0700382 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
383 // typically the device ID.
384 responseTopic := replyToTopic
385 if responseTopic == nil {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800386 responseTopic = kp.defaultTopic
Scott Baker2c1c4822019-10-16 11:02:41 -0700387 }
388
389 // Encode the request
Neha Sharma94f16a92020-06-26 04:17:55 +0000390 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Scott Baker2c1c4822019-10-16 11:02:41 -0700391 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000392 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Girish Kumar74240652020-07-10 11:54:28 +0000393 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
Scott Baker2c1c4822019-10-16 11:02:41 -0700394 return false, nil
395 }
396
397 // Subscribe for response, if needed, before sending request
398 var ch <-chan *ic.InterContainerMessage
399 if waitForResponse {
400 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000401 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
402 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700403 }
404 }
405
406 // Send request - if the topic is formatted with a device Id then we will send the request using a
407 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
408 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
409 //key := GetDeviceIdFromTopic(*toTopic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000410 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800411 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000412 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Girish Kumar74240652020-07-10 11:54:28 +0000413 log.MarkSpanError(ctx, errors.New("send-failed"))
Neha Sharma94f16a92020-06-26 04:17:55 +0000414 logger.Errorw(ctx, "send-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800415 "topic": toTopic,
416 "key": key,
417 "error": err})
418 }
419 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700420
421 if waitForResponse {
422 // Create a child context based on the parent context, if any
423 var cancel context.CancelFunc
424 childCtx := context.Background()
425 if ctx == nil {
426 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
427 } else {
428 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
429 }
430 defer cancel()
431
432 // Wait for response as well as timeout or cancellation
433 // Remove the subscription for a response on return
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800434 defer func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000435 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
436 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800437 "id": protoRequest.Header.Id,
438 "error": err})
439 }
440 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700441 select {
442 case msg, ok := <-ch:
443 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000444 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Girish Kumar74240652020-07-10 11:54:28 +0000445 log.MarkSpanError(ctx, errors.New("channel-closed"))
Scott Baker2c1c4822019-10-16 11:02:41 -0700446 protoError := &ic.Error{Reason: "channel-closed"}
447 var marshalledArg *any.Any
448 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
449 return false, nil // Should never happen
450 }
451 return false, marshalledArg
452 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000453 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700454 var responseBody *ic.InterContainerResponseBody
455 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000456 if responseBody, err = decodeResponse(ctx, msg); err != nil {
457 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800458 // FIXME we should return something
Scott Baker2c1c4822019-10-16 11:02:41 -0700459 }
460 return responseBody.Success, responseBody.Result
461 case <-ctx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000462 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Girish Kumar74240652020-07-10 11:54:28 +0000463 log.MarkSpanError(ctx, errors.New("context-cancelled"))
Scott Baker2c1c4822019-10-16 11:02:41 -0700464 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800465 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800466
Scott Baker2c1c4822019-10-16 11:02:41 -0700467 var marshalledArg *any.Any
468 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
469 return false, nil // Should never happen
470 }
471 return false, marshalledArg
472 case <-childCtx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000473 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Girish Kumar74240652020-07-10 11:54:28 +0000474 log.MarkSpanError(ctx, errors.New("context-cancelled"))
Scott Baker2c1c4822019-10-16 11:02:41 -0700475 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800476 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800477
Scott Baker2c1c4822019-10-16 11:02:41 -0700478 var marshalledArg *any.Any
479 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
480 return false, nil // Should never happen
481 }
482 return false, marshalledArg
483 case <-kp.doneCh:
Neha Sharma94f16a92020-06-26 04:17:55 +0000484 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700485 return true, nil
486 }
487 }
488 return true, nil
489}
490
491// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
492// when a message is received on a given topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000493func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700494
495 // Subscribe to receive messages for that topic
496 var ch <-chan *ic.InterContainerMessage
497 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000498 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
Scott Baker2c1c4822019-10-16 11:02:41 -0700499 //if ch, err = kp.Subscribe(topic); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000500 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700501 return err
502 }
503
504 kp.defaultRequestHandlerInterface = handler
505 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
506 // Launch a go routine to receive and process kafka messages
Neha Sharma94f16a92020-06-26 04:17:55 +0000507 go kp.waitForMessages(ctx, ch, topic, handler)
Scott Baker2c1c4822019-10-16 11:02:41 -0700508
509 return nil
510}
511
512// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
513// when a message is received on a given topic. So far there is only 1 target registered per microservice
Neha Sharma94f16a92020-06-26 04:17:55 +0000514func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700515 // Subscribe to receive messages for that topic
516 var ch <-chan *ic.InterContainerMessage
517 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000518 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
519 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700520 return err
521 }
522 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
523
524 // Launch a go routine to receive and process kafka messages
Neha Sharma94f16a92020-06-26 04:17:55 +0000525 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700526
527 return nil
528}
529
Neha Sharma94f16a92020-06-26 04:17:55 +0000530func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
531 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
Scott Baker2c1c4822019-10-16 11:02:41 -0700532}
533
Neha Sharma94f16a92020-06-26 04:17:55 +0000534func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700535 kp.lockTopicResponseChannelMap.Lock()
536 defer kp.lockTopicResponseChannelMap.Unlock()
537 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
538 // Unsubscribe to this topic first - this will close the subscribed channel
539 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000540 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
541 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700542 }
543 delete(kp.topicToResponseChannelMap, topic)
544 return err
545 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800546 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700547 }
548}
549
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800550// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000551func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
552 logger.Debug(ctx, "delete-all-topic-response-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700553 kp.lockTopicResponseChannelMap.Lock()
554 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800555 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800556 for topic := range kp.topicToResponseChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700557 // Unsubscribe to this topic first - this will close the subscribed channel
Neha Sharma94f16a92020-06-26 04:17:55 +0000558 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800559 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000560 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800561 // Do not return. Continue to try to unsubscribe to other topics.
562 } else {
563 // Only delete from channel map if successfully unsubscribed.
564 delete(kp.topicToResponseChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700565 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700566 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800567 if len(unsubscribeFailTopics) > 0 {
568 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
569 }
570 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700571}
572
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800573func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700574 kp.lockTopicRequestHandlerChannelMap.Lock()
575 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
576 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
577 kp.topicToRequestHandlerChannelMap[topic] = arg
578 }
579}
580
Neha Sharma94f16a92020-06-26 04:17:55 +0000581func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700582 kp.lockTopicRequestHandlerChannelMap.Lock()
583 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
584 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
585 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000586 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800587 return err
588 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700589 delete(kp.topicToRequestHandlerChannelMap, topic)
590 return nil
591 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800592 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700593 }
594}
595
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800596// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000597func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
598 logger.Debug(ctx, "delete-all-topic-request-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700599 kp.lockTopicRequestHandlerChannelMap.Lock()
600 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800601 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800602 for topic := range kp.topicToRequestHandlerChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700603 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000604 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800605 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000606 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800607 // Do not return. Continue to try to unsubscribe to other topics.
608 } else {
609 // Only delete from channel map if successfully unsubscribed.
610 delete(kp.topicToRequestHandlerChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700611 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700612 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800613 if len(unsubscribeFailTopics) > 0 {
614 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
615 }
616 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700617}
618
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800619func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700620 kp.lockTransactionIdToChannelMap.Lock()
621 defer kp.lockTransactionIdToChannelMap.Unlock()
622 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
623 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
624 }
625}
626
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800627func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700628 kp.lockTransactionIdToChannelMap.Lock()
629 defer kp.lockTransactionIdToChannelMap.Unlock()
630 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
631 // Close the channel first
632 close(transChannel.ch)
633 delete(kp.transactionIdToChannelMap, id)
634 }
635}
636
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800637func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700638 kp.lockTransactionIdToChannelMap.Lock()
639 defer kp.lockTransactionIdToChannelMap.Unlock()
640 for key, value := range kp.transactionIdToChannelMap {
641 if value.topic.Name == id {
642 close(value.ch)
643 delete(kp.transactionIdToChannelMap, key)
644 }
645 }
646}
647
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800648// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000649func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
650 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
Scott Baker2c1c4822019-10-16 11:02:41 -0700651 kp.lockTransactionIdToChannelMap.Lock()
652 defer kp.lockTransactionIdToChannelMap.Unlock()
653 for key, value := range kp.transactionIdToChannelMap {
654 close(value.ch)
655 delete(kp.transactionIdToChannelMap, key)
656 }
657}
658
Neha Sharma94f16a92020-06-26 04:17:55 +0000659func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700660 // If we have any consumers on that topic we need to close them
Neha Sharma94f16a92020-06-26 04:17:55 +0000661 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
662 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700663 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000664 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
665 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700666 }
667 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
668
Neha Sharma94f16a92020-06-26 04:17:55 +0000669 return kp.kafkaClient.DeleteTopic(ctx, &topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700670}
671
Neha Sharma94f16a92020-06-26 04:17:55 +0000672func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700673 // Encode the response argument - needs to be a proto message
674 if returnedVal == nil {
675 return nil, nil
676 }
677 protoValue, ok := returnedVal.(proto.Message)
678 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000679 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700680 err := errors.New("response-value-not-proto-message")
681 return nil, err
682 }
683
684 // Marshal the returned value, if any
685 var marshalledReturnedVal *any.Any
686 var err error
687 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000688 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700689 return nil, err
690 }
691 return marshalledReturnedVal, nil
692}
693
Neha Sharma94f16a92020-06-26 04:17:55 +0000694func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
Scott Baker2c1c4822019-10-16 11:02:41 -0700695 responseHeader := &ic.Header{
696 Id: request.Header.Id,
697 Type: ic.MessageType_RESPONSE,
698 FromTopic: request.Header.ToTopic,
699 ToTopic: request.Header.FromTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700700 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700701 }
702 responseBody := &ic.InterContainerResponseBody{
703 Success: false,
704 Result: nil,
705 }
706 var marshalledResponseBody *any.Any
707 var err error
708 // Error should never happen here
709 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000710 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700711 }
712
713 return &ic.InterContainerMessage{
714 Header: responseHeader,
715 Body: marshalledResponseBody,
716 }
717
718}
719
720//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
721//or an error on failure
Neha Sharma94f16a92020-06-26 04:17:55 +0000722func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
723 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700724 responseHeader := &ic.Header{
725 Id: request.Header.Id,
726 Type: ic.MessageType_RESPONSE,
727 FromTopic: request.Header.ToTopic,
728 ToTopic: request.Header.FromTopic,
729 KeyTopic: request.Header.KeyTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700730 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700731 }
732
733 // Go over all returned values
734 var marshalledReturnedVal *any.Any
735 var err error
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800736
737 // for now we support only 1 returned value - (excluding the error)
738 if len(returnedValues) > 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000739 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
740 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700741 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700742 }
743
744 responseBody := &ic.InterContainerResponseBody{
745 Success: success,
746 Result: marshalledReturnedVal,
747 }
748
749 // Marshal the response body
750 var marshalledResponseBody *any.Any
751 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000752 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700753 return nil, err
754 }
755
756 return &ic.InterContainerMessage{
757 Header: responseHeader,
758 Body: marshalledResponseBody,
759 }, nil
760}
761
// CallFuncByName invokes, through reflection, the method of myClass whose
// name is funcName with its first letter capitalised (so that callers can
// pass a lower-case rpc name for an exported method). ctx is always passed as
// the first argument, followed by params in order.
//
// It returns the method's results as reflect values. An error, together with
// an empty slice, is returned when myClass is nil, when the method does not
// exist, or when the supplied arguments do not match the method's signature
// (wrong count, or a value not assignable to the parameter type). An untyped
// nil argument is passed as the zero value of a nil-able parameter type.
// Panics raised by the invoked method itself are not intercepted.
func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// A nil target has no methods; MethodByName would panic on it.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	in := make([]reflect.Value, len(params)+1)
	in[0] = reflect.ValueOf(ctx)
	for i, param := range params {
		in[i+1] = reflect.ValueOf(param)
	}

	// reflect.Value.Call panics on any arity or type mismatch. Validate the
	// arguments up front so a malformed call is reported as an error instead.
	mType := m.Type()
	fixed := mType.NumIn()
	if mType.IsVariadic() {
		fixed-- // the trailing variadic parameter accepts zero or more values
	}
	if len(in) < fixed || (len(in) > fixed && !mType.IsVariadic()) {
		return make([]reflect.Value, 0), fmt.Errorf("method-argument-count-mismatch \"%s\": expected %d, got %d", funcName, mType.NumIn(), len(in))
	}
	for i := range in {
		var want reflect.Type
		if i < fixed {
			want = mType.In(i)
		} else {
			want = mType.In(fixed).Elem()
		}
		if !in[i].IsValid() {
			// reflect.ValueOf(nil) is the zero Value; substitute a typed nil
			// where the parameter type allows one.
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
				in[i] = reflect.Zero(want)
				continue
			}
			return make([]reflect.Value, 0), fmt.Errorf("method-argument-type-mismatch \"%s\": argument %d is nil, want %s", funcName, i, want)
		}
		if !in[i].Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("method-argument-type-mismatch \"%s\": argument %d is %s, want %s", funcName, i, in[i].Type(), want)
		}
	}

	out = m.Call(in)
	return out, nil
}
779
Neha Sharma94f16a92020-06-26 04:17:55 +0000780func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700781 arg := &KVArg{
782 Key: TransactionKey,
783 Value: &ic.StrType{Val: transactionId},
784 }
785
786 var marshalledArg *any.Any
787 var err error
788 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000789 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700790 return currentArgs
791 }
792 protoArg := &ic.Argument{
793 Key: arg.Key,
794 Value: marshalledArg,
795 }
796 return append(currentArgs, protoArg)
797}
798
Neha Sharma94f16a92020-06-26 04:17:55 +0000799func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700800 var marshalledArg *any.Any
801 var err error
802 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000803 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700804 return currentArgs
805 }
806 protoArg := &ic.Argument{
807 Key: FromTopic,
808 Value: marshalledArg,
809 }
810 return append(currentArgs, protoArg)
811}
812
Girish Kumar74240652020-07-10 11:54:28 +0000813// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
814// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
815// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
816// from components currently not sending the span (e.g. openonu adapter)
817func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
818
819 for _, arg := range args {
820 if arg.Key == "span" {
821 var err error
822 var textMapString ic.StrType
823 if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
824 logger.Warnw(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
825 break
826 }
827
828 spanTextMap := make(map[string]string)
829 if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
830 logger.Warnw(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
831 break
832 }
833
834 var spanContext opentracing.SpanContext
835 if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
836 logger.Warnw(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
837 break
838 }
839
840 var receivedRpcName string
841 extractBaggage := func(k, v string) bool {
842 if k == "rpc-span-name" {
843 receivedRpcName = v
844 return false
845 }
846 return true
847 }
848
849 spanContext.ForeachBaggageItem(extractBaggage)
850
851 return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
852 }
853 }
854
855 // Create new Child Span with rpc as name if no span details were received in kafka arguments
856 var spanName strings.Builder
857 spanName.WriteString("kafka-")
858
859 // In case of inter adapter message, use Msg Type for constructing RPC name
860 if rpcName == "process_inter_adapter_message" {
861 for _, arg := range args {
862 if arg.Key == "msg" {
863 iamsg := ic.InterAdapterMessage{}
864 if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
865 spanName.WriteString("inter-adapter-")
866 rpcName = iamsg.Header.Type.String()
867 }
868 }
869 }
870 }
871
872 spanName.WriteString("rpc-")
873 spanName.WriteString(rpcName)
874
875 return opentracing.StartSpanFromContext(ctx, spanName.String())
876}
877
Neha Sharma94f16a92020-06-26 04:17:55 +0000878func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700879
880 // First extract the header to know whether this is a request - responses are handled by a different handler
881 if msg.Header.Type == ic.MessageType_REQUEST {
882 var out []reflect.Value
883 var err error
884
885 // Get the request body
886 requestBody := &ic.InterContainerRequestBody{}
887 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000888 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700889 } else {
Matteo Scandolodb824802021-06-29 16:23:52 +0200890 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "args": requestBody.Args})
Girish Kumar74240652020-07-10 11:54:28 +0000891 span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
892 defer span.Finish()
893
Scott Baker2c1c4822019-10-16 11:02:41 -0700894 // let the callee unpack the arguments as its the only one that knows the real proto type
895 // Augment the requestBody with the message Id as it will be used in scenarios where cores
896 // are set in pairs and competing
Neha Sharma94f16a92020-06-26 04:17:55 +0000897 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700898
899 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
900 // needs to send an unsollicited message to the currently requested container
Neha Sharma94f16a92020-06-26 04:17:55 +0000901 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700902
Neha Sharma94f16a92020-06-26 04:17:55 +0000903 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700904 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000905 logger.Warn(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700906 }
907 }
908 // Response required?
909 if requestBody.ResponseRequired {
910 // If we already have an error before then just return that
911 var returnError *ic.Error
912 var returnedValues []interface{}
913 var success bool
914 if err != nil {
915 returnError = &ic.Error{Reason: err.Error()}
916 returnedValues = make([]interface{}, 1)
917 returnedValues[0] = returnError
918 } else {
919 returnedValues = make([]interface{}, 0)
920 // Check for errors first
921 lastIndex := len(out) - 1
922 if out[lastIndex].Interface() != nil { // Error
923 if retError, ok := out[lastIndex].Interface().(error); ok {
924 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000925 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700926 return // Ignore - process is in competing mode and ignored transaction
927 }
928 returnError = &ic.Error{Reason: retError.Error()}
929 returnedValues = append(returnedValues, returnError)
930 } else { // Should never happen
931 returnError = &ic.Error{Reason: "incorrect-error-returns"}
932 returnedValues = append(returnedValues, returnError)
933 }
934 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000935 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700936 return // Ignore - should not happen
937 } else { // Non-error case
938 success = true
939 for idx, val := range out {
Neha Sharma94f16a92020-06-26 04:17:55 +0000940 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700941 if idx != lastIndex {
942 returnedValues = append(returnedValues, val.Interface())
943 }
944 }
945 }
946 }
947
948 var icm *ic.InterContainerMessage
Neha Sharma94f16a92020-06-26 04:17:55 +0000949 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
950 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
951 icm = encodeDefaultFailedResponse(ctx, msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700952 }
953 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
954 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
955 // present then the key will be empty, hence all messages for a given topic will be sent to all
956 // partitions.
957 replyTopic := &Topic{Name: msg.Header.FromTopic}
958 key := msg.Header.KeyTopic
Neha Sharma94f16a92020-06-26 04:17:55 +0000959 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700960 // TODO: handle error response.
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800961 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000962 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
963 logger.Errorw(ctx, "send-reply-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800964 "topic": replyTopic,
965 "key": key,
966 "error": err})
967 }
968 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700969 }
970 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Neha Sharma94f16a92020-06-26 04:17:55 +0000971 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
972 go kp.dispatchResponse(ctx, msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700973 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000974 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700975 }
976}
977
Neha Sharma94f16a92020-06-26 04:17:55 +0000978func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700979 // Wait for messages
980 for msg := range ch {
Neha Sharma94f16a92020-06-26 04:17:55 +0000981 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
982 go kp.handleMessage(context.Background(), msg, targetInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700983 }
984}
985
Neha Sharma94f16a92020-06-26 04:17:55 +0000986func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700987 kp.lockTransactionIdToChannelMap.RLock()
988 defer kp.lockTransactionIdToChannelMap.RUnlock()
989 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Neha Sharma94f16a92020-06-26 04:17:55 +0000990 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700991 return
992 }
993 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
994}
995
996// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
997// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
998// API. There is one response channel waiting for kafka messages before dispatching the message to the
999// corresponding waiting channel
Neha Sharma94f16a92020-06-26 04:17:55 +00001000func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
1001 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -07001002
1003 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
1004 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerae1d4702020-03-04 14:10:51 -08001005 // Set channel size to 1 to prevent deadlock, see VOL-2708
1006 ch := make(chan *ic.InterContainerMessage, 1)
Scott Baker2c1c4822019-10-16 11:02:41 -07001007 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
1008
1009 return ch, nil
1010}
1011
Neha Sharma94f16a92020-06-26 04:17:55 +00001012func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
1013 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -07001014 kp.deleteFromTransactionIdToChannelMap(trnsId)
1015 return nil
1016}
1017
Neha Sharma94f16a92020-06-26 04:17:55 +00001018func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
1019 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
Scott Baker104b67d2019-10-29 15:56:27 -07001020}
1021
Neha Sharma94f16a92020-06-26 04:17:55 +00001022func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
1023 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
Scott Baker0fef6982019-12-12 09:49:42 -08001024}
1025
Neha Sharma94f16a92020-06-26 04:17:55 +00001026func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
1027 return kp.kafkaClient.SendLiveness(ctx)
Scott Baker104b67d2019-10-29 15:56:27 -07001028}
1029
Scott Baker2c1c4822019-10-16 11:02:41 -07001030//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
1031//or an error on failure
Neha Sharma94f16a92020-06-26 04:17:55 +00001032func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001033 requestHeader := &ic.Header{
1034 Id: uuid.New().String(),
1035 Type: ic.MessageType_REQUEST,
1036 FromTopic: replyTopic.Name,
1037 ToTopic: toTopic.Name,
1038 KeyTopic: key,
Scott Baker84a55ce2020-04-17 10:11:30 -07001039 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -07001040 }
1041 requestBody := &ic.InterContainerRequestBody{
1042 Rpc: rpc,
1043 ResponseRequired: true,
1044 ReplyToTopic: replyTopic.Name,
1045 }
1046
1047 for _, arg := range kvArgs {
1048 if arg == nil {
1049 // In case the caller sends an array with empty args
1050 continue
1051 }
1052 var marshalledArg *any.Any
1053 var err error
1054 // ascertain the value interface type is a proto.Message
1055 protoValue, ok := arg.Value.(proto.Message)
1056 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001057 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -07001058 err := errors.New("argument-value-not-proto-message")
1059 return nil, err
1060 }
1061 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001062 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001063 return nil, err
1064 }
1065 protoArg := &ic.Argument{
1066 Key: arg.Key,
1067 Value: marshalledArg,
1068 }
1069 requestBody.Args = append(requestBody.Args, protoArg)
1070 }
1071
1072 var marshalledData *any.Any
1073 var err error
1074 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001075 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001076 return nil, err
1077 }
1078 request := &ic.InterContainerMessage{
1079 Header: requestHeader,
1080 Body: marshalledData,
1081 }
1082 return request, nil
1083}
1084
Neha Sharma94f16a92020-06-26 04:17:55 +00001085func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001086 // Extract the message body
1087 responseBody := ic.InterContainerResponseBody{}
1088 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001089 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001090 return nil, err
1091 }
Neha Sharma94f16a92020-06-26 04:17:55 +00001092 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -07001093
1094 return &responseBody, nil
1095
1096}