blob: 120ed45c28671e9664391e830fa64bf0a9991aee [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
19 "context"
Matteo Scandolo46654682020-08-05 11:46:37 -070020 "encoding/json"
William Kurkianea869482019-04-09 15:16:11 -040021 "errors"
22 "fmt"
William Kurkianea869482019-04-09 15:16:11 -040023 "reflect"
24 "strings"
25 "sync"
26 "time"
William Kurkianea869482019-04-09 15:16:11 -040027
Girish Gowdra833343a2021-07-08 15:45:02 -070028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30
Esin Karamanccb714b2019-11-29 15:02:06 +000031 "github.com/golang/protobuf/proto"
32 "github.com/golang/protobuf/ptypes"
33 "github.com/golang/protobuf/ptypes/any"
34 "github.com/google/uuid"
Girish Gowdra8a0bdcd2021-05-13 12:31:04 -070035 "github.com/opencord/voltha-lib-go/v5/pkg/log"
Girish Gowdraa09aeab2020-09-14 16:30:52 -070036 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
Matteo Scandolo46654682020-08-05 11:46:37 -070037 "github.com/opentracing/opentracing-go"
Esin Karamanccb714b2019-11-29 15:02:06 +000038)
William Kurkianea869482019-04-09 15:16:11 -040039
// Default tuning values exported by the kafka package.
const (
	// DefaultMaxRetries is the default retry count.
	// NOTE(review): not referenced in the visible part of this file - confirm where it is consumed.
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is the default time to wait for an RPC response, expressed in
	// milliseconds (it is multiplied by time.Millisecond where it is used). 60000 ms was chosen
	// to handle a wider latency range.
	DefaultRequestTimeout = 60000
)
44
// Well-known argument keys.
// NOTE(review): their consumers are outside the visible part of this file; the descriptions
// below are inferred from the names and values - confirm against the request decoding code.
const (
	// TransactionKey presumably identifies the argument carrying the transaction ID.
	TransactionKey = "transactionID"

	// FromTopic presumably identifies the argument carrying the originating topic.
	FromTopic = "fromTopic"
)
49
// Sentinel errors related to transaction handling.
var (
	// ErrorTransactionNotAcquired reports that a transaction was not acquired.
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")

	// ErrorTransactionInvalidId reports that a transaction ID is invalid.
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
52
William Kurkianea869482019-04-09 15:16:11 -040053// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
54// obtained from that channel, this interface is invoked. This is used to handle
55// async requests into the Core via the kafka messaging bus
56type requestHandlerChannel struct {
57 requesthandlerInterface interface{}
58 ch <-chan *ic.InterContainerMessage
59}
60
61// transactionChannel represents a combination of a topic and a channel onto which a response received
62// on the kafka bus will be sent to
63type transactionChannel struct {
64 topic *Topic
65 ch chan *ic.InterContainerMessage
66}
67
npujarec5762e2020-01-01 14:08:48 +053068type InterContainerProxy interface {
Neha Sharma96b7bf22020-06-15 10:37:32 +000069 Start(ctx context.Context) error
70 Stop(ctx context.Context)
npujarec5762e2020-01-01 14:08:48 +053071 GetDefaultTopic() *Topic
npujarec5762e2020-01-01 14:08:48 +053072 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
divyadesaid26f6b12020-03-19 06:30:28 +000073 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Neha Sharma96b7bf22020-06-15 10:37:32 +000074 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
75 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
76 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
77 DeleteTopic(ctx context.Context, topic Topic) error
78 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
79 SendLiveness(ctx context.Context) error
npujarec5762e2020-01-01 14:08:48 +053080}
81
82// interContainerProxy represents the messaging proxy
83type interContainerProxy struct {
Neha Sharma3f221ae2020-04-29 19:02:12 +000084 kafkaAddress string
npujarec5762e2020-01-01 14:08:48 +053085 defaultTopic *Topic
William Kurkianea869482019-04-09 15:16:11 -040086 defaultRequestHandlerInterface interface{}
William Kurkianea869482019-04-09 15:16:11 -040087 kafkaClient Client
npujarec5762e2020-01-01 14:08:48 +053088 doneCh chan struct{}
89 doneOnce sync.Once
William Kurkianea869482019-04-09 15:16:11 -040090
91 // This map is used to map a topic to an interface and channel. When a request is received
92 // on that channel (registered to the topic) then that interface is invoked.
93 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
94 lockTopicRequestHandlerChannelMap sync.RWMutex
95
96 // This map is used to map a channel to a response topic. This channel handles all responses on that
97 // channel for that topic and forward them to the appropriate consumers channel, using the
98 // transactionIdToChannelMap.
99 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
100 lockTopicResponseChannelMap sync.RWMutex
101
102 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
103 // sent out and we are waiting for a response.
104 transactionIdToChannelMap map[string]*transactionChannel
105 lockTransactionIdToChannelMap sync.RWMutex
106}
107
npujarec5762e2020-01-01 14:08:48 +0530108type InterContainerProxyOption func(*interContainerProxy)
William Kurkianea869482019-04-09 15:16:11 -0400109
Neha Sharma3f221ae2020-04-29 19:02:12 +0000110func InterContainerAddress(address string) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530111 return func(args *interContainerProxy) {
Neha Sharma3f221ae2020-04-29 19:02:12 +0000112 args.kafkaAddress = address
William Kurkianea869482019-04-09 15:16:11 -0400113 }
114}
115
116func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530117 return func(args *interContainerProxy) {
118 args.defaultTopic = topic
William Kurkianea869482019-04-09 15:16:11 -0400119 }
120}
121
William Kurkianea869482019-04-09 15:16:11 -0400122func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530123 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400124 args.defaultRequestHandlerInterface = handler
125 }
126}
127
128func MsgClient(client Client) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530129 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400130 args.kafkaClient = client
131 }
132}
133
npujarec5762e2020-01-01 14:08:48 +0530134func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
135 proxy := &interContainerProxy{
Neha Sharma3f221ae2020-04-29 19:02:12 +0000136 kafkaAddress: DefaultKafkaAddress,
137 doneCh: make(chan struct{}),
William Kurkianea869482019-04-09 15:16:11 -0400138 }
139
140 for _, option := range opts {
141 option(proxy)
142 }
143
npujarec5762e2020-01-01 14:08:48 +0530144 return proxy
William Kurkianea869482019-04-09 15:16:11 -0400145}
146
npujarec5762e2020-01-01 14:08:48 +0530147func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
148 return newInterContainerProxy(opts...)
149}
150
Neha Sharma96b7bf22020-06-15 10:37:32 +0000151func (kp *interContainerProxy) Start(ctx context.Context) error {
152 logger.Info(ctx, "Starting-Proxy")
William Kurkianea869482019-04-09 15:16:11 -0400153
154 // Kafka MsgClient should already have been created. If not, output fatal error
155 if kp.kafkaClient == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000156 logger.Fatal(ctx, "kafka-client-not-set")
William Kurkianea869482019-04-09 15:16:11 -0400157 }
158
William Kurkianea869482019-04-09 15:16:11 -0400159 // Start the kafka client
Neha Sharma96b7bf22020-06-15 10:37:32 +0000160 if err := kp.kafkaClient.Start(ctx); err != nil {
161 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400162 return err
163 }
164
165 // Create the topic to response channel map
166 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
167 //
168 // Create the transactionId to Channel Map
169 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
170
171 // Create the topic to request channel map
172 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
173
174 return nil
175}
176
Neha Sharma96b7bf22020-06-15 10:37:32 +0000177func (kp *interContainerProxy) Stop(ctx context.Context) {
178 logger.Info(ctx, "stopping-intercontainer-proxy")
npujarec5762e2020-01-01 14:08:48 +0530179 kp.doneOnce.Do(func() { close(kp.doneCh) })
William Kurkianea869482019-04-09 15:16:11 -0400180 // TODO : Perform cleanup
Neha Sharma96b7bf22020-06-15 10:37:32 +0000181 kp.kafkaClient.Stop(ctx)
182 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Bakere701b862020-02-20 16:19:16 -0800183 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000184 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800185 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000186 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Bakere701b862020-02-20 16:19:16 -0800187 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000188 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800189 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000190 kp.deleteAllTransactionIdToChannelMap(ctx)
William Kurkianea869482019-04-09 15:16:11 -0400191}
192
npujarec5762e2020-01-01 14:08:48 +0530193func (kp *interContainerProxy) GetDefaultTopic() *Topic {
194 return kp.defaultTopic
195}
196
divyadesaid26f6b12020-03-19 06:30:28 +0000197// InvokeAsyncRPC is used to make an RPC request asynchronously
198func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
199 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
200
Matteo Scandolo46654682020-08-05 11:46:37 -0700201 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
202 if spanArg != nil {
203 kvArgs = append(kvArgs, &spanArg[0])
204 }
Girish Gowdra833343a2021-07-08 15:45:02 -0700205
Matteo Scandolo46654682020-08-05 11:46:37 -0700206 defer span.Finish()
207
Girish Gowdra833343a2021-07-08 15:45:02 -0700208 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
209
divyadesaid26f6b12020-03-19 06:30:28 +0000210 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
211 // typically the device ID.
212 responseTopic := replyToTopic
213 if responseTopic == nil {
214 responseTopic = kp.GetDefaultTopic()
215 }
216
217 chnl := make(chan *RpcResponse)
218
219 go func() {
220
221 // once we're done,
222 // close the response channel
223 defer close(chnl)
224
225 var err error
226 var protoRequest *ic.InterContainerMessage
227
228 // Encode the request
Neha Sharma96b7bf22020-06-15 10:37:32 +0000229 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
divyadesaid26f6b12020-03-19 06:30:28 +0000230 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000231 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Matteo Scandolo46654682020-08-05 11:46:37 -0700232 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
divyadesaid26f6b12020-03-19 06:30:28 +0000233 chnl <- NewResponse(RpcFormattingError, err, nil)
234 return
235 }
236
237 // Subscribe for response, if needed, before sending request
238 var ch <-chan *ic.InterContainerMessage
Neha Sharma96b7bf22020-06-15 10:37:32 +0000239 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
240 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Matteo Scandolo46654682020-08-05 11:46:37 -0700241 log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
divyadesaid26f6b12020-03-19 06:30:28 +0000242 chnl <- NewResponse(RpcTransportError, err, nil)
243 return
244 }
245
246 // Send request - if the topic is formatted with a device Id then we will send the request using a
247 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
248 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000249 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
divyadesaid26f6b12020-03-19 06:30:28 +0000250
251 // if the message is not sent on kafka publish an event an close the channel
Neha Sharma96b7bf22020-06-15 10:37:32 +0000252 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
divyadesaid26f6b12020-03-19 06:30:28 +0000253 chnl <- NewResponse(RpcTransportError, err, nil)
254 return
255 }
256
257 // if the client is not waiting for a response send the ack and close the channel
258 chnl <- NewResponse(RpcSent, nil, nil)
259 if !waitForResponse {
260 return
261 }
262
263 defer func() {
264 // Remove the subscription for a response on return
Neha Sharma96b7bf22020-06-15 10:37:32 +0000265 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
266 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
divyadesaid26f6b12020-03-19 06:30:28 +0000267 }
268 }()
269
270 // Wait for response as well as timeout or cancellation
271 select {
272 case msg, ok := <-ch:
273 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000274 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Matteo Scandolo46654682020-08-05 11:46:37 -0700275 log.MarkSpanError(ctx, errors.New("channel-closed"))
divyadesaid26f6b12020-03-19 06:30:28 +0000276 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
277 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000278 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
279 if responseBody, err := decodeResponse(ctx, msg); err != nil {
divyadesaid26f6b12020-03-19 06:30:28 +0000280 chnl <- NewResponse(RpcReply, err, nil)
281 } else {
282 if responseBody.Success {
283 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
284 } else {
285 // response body contains an error
286 unpackErr := &ic.Error{}
287 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
288 chnl <- NewResponse(RpcReply, err, nil)
289 } else {
290 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
291 }
292 }
293 }
294 case <-ctx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000295 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Matteo Scandolo46654682020-08-05 11:46:37 -0700296 log.MarkSpanError(ctx, errors.New("context-cancelled"))
divyadesaid26f6b12020-03-19 06:30:28 +0000297 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
298 chnl <- NewResponse(RpcTimeout, err, nil)
299 case <-kp.doneCh:
300 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000301 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
divyadesaid26f6b12020-03-19 06:30:28 +0000302 }
303 }()
304 return chnl
305}
306
Matteo Scandolo46654682020-08-05 11:46:37 -0700307// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
308// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
309//
310// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
311// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
312// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
313// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
314// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
315func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
316 var err error
317 var newCtx context.Context
318 var spanToInject opentracing.Span
319
Girish Gowdra833343a2021-07-08 15:45:02 -0700320 if !log.GetGlobalLFM().GetLogCorrelationStatus() && !log.GetGlobalLFM().GetTracePublishingStatus() {
321 // if both log correlation and trace publishing is disable do not generate the span
322 logger.Debugw(ctx, "not-embedding-span-in-KVArg-", log.Fields{"rpc": rpc,
323 "log-correlation-status": log.GetGlobalLFM().GetLogCorrelationStatus(), "trace-publishing-status": log.GetGlobalLFM().GetTracePublishingStatus()})
324 return nil, opentracing.GlobalTracer().StartSpan(rpc), ctx
325 }
326
Matteo Scandolo46654682020-08-05 11:46:37 -0700327 var spanName strings.Builder
328 spanName.WriteString("kafka-")
329
330 // In case of inter adapter message, use Msg Type for constructing RPC name
331 if rpc == "process_inter_adapter_message" {
332 if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
333 spanName.WriteString("inter-adapter-")
334 rpc = msgType.String()
335 }
336 }
337
338 if isAsync {
339 spanName.WriteString("async-rpc-")
340 } else {
341 spanName.WriteString("rpc-")
342 }
343 spanName.WriteString(rpc)
344
345 if isAsync {
346 spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
347 } else {
348 spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
349 }
350
351 spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
352
353 textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
354 if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
355 logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
356 return nil, spanToInject, newCtx
357 }
358
359 var textMapJson []byte
360 if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
361 logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
362 return nil, spanToInject, newCtx
363 }
364
365 spanArg := make([]KVArg, 1)
366 spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
367 return spanArg, spanToInject, newCtx
368}
369
William Kurkianea869482019-04-09 15:16:11 -0400370// InvokeRPC is used to send a request to a given topic
npujarec5762e2020-01-01 14:08:48 +0530371func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
William Kurkianea869482019-04-09 15:16:11 -0400372 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
373
Matteo Scandolo46654682020-08-05 11:46:37 -0700374 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
375 if spanArg != nil {
376 kvArgs = append(kvArgs, &spanArg[0])
377 }
Girish Gowdra833343a2021-07-08 15:45:02 -0700378
Matteo Scandolo46654682020-08-05 11:46:37 -0700379 defer span.Finish()
380
Girish Gowdra833343a2021-07-08 15:45:02 -0700381 logger.Debugw(ctx, "InvokeRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
382
William Kurkianea869482019-04-09 15:16:11 -0400383 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
384 // typically the device ID.
385 responseTopic := replyToTopic
386 if responseTopic == nil {
npujarec5762e2020-01-01 14:08:48 +0530387 responseTopic = kp.defaultTopic
William Kurkianea869482019-04-09 15:16:11 -0400388 }
389
390 // Encode the request
Neha Sharma96b7bf22020-06-15 10:37:32 +0000391 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
William Kurkianea869482019-04-09 15:16:11 -0400392 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000393 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Matteo Scandolo46654682020-08-05 11:46:37 -0700394 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
William Kurkianea869482019-04-09 15:16:11 -0400395 return false, nil
396 }
397
398 // Subscribe for response, if needed, before sending request
399 var ch <-chan *ic.InterContainerMessage
400 if waitForResponse {
401 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000402 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
403 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400404 }
405 }
406
407 // Send request - if the topic is formatted with a device Id then we will send the request using a
408 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
409 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
410 //key := GetDeviceIdFromTopic(*toTopic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000411 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000412 go func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000413 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Matteo Scandolo46654682020-08-05 11:46:37 -0700414 log.MarkSpanError(ctx, errors.New("send-failed"))
Neha Sharma96b7bf22020-06-15 10:37:32 +0000415 logger.Errorw(ctx, "send-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000416 "topic": toTopic,
417 "key": key,
418 "error": err})
419 }
420 }()
William Kurkianea869482019-04-09 15:16:11 -0400421
422 if waitForResponse {
423 // Create a child context based on the parent context, if any
424 var cancel context.CancelFunc
425 childCtx := context.Background()
426 if ctx == nil {
427 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
428 } else {
429 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
430 }
431 defer cancel()
432
433 // Wait for response as well as timeout or cancellation
434 // Remove the subscription for a response on return
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000435 defer func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000436 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
437 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000438 "id": protoRequest.Header.Id,
439 "error": err})
440 }
441 }()
William Kurkianea869482019-04-09 15:16:11 -0400442 select {
443 case msg, ok := <-ch:
444 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000445 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Matteo Scandolo46654682020-08-05 11:46:37 -0700446 log.MarkSpanError(ctx, errors.New("channel-closed"))
William Kurkianea869482019-04-09 15:16:11 -0400447 protoError := &ic.Error{Reason: "channel-closed"}
448 var marshalledArg *any.Any
449 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
450 return false, nil // Should never happen
451 }
452 return false, marshalledArg
453 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000454 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400455 var responseBody *ic.InterContainerResponseBody
456 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000457 if responseBody, err = decodeResponse(ctx, msg); err != nil {
458 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
npujarec5762e2020-01-01 14:08:48 +0530459 // FIXME we should return something
William Kurkianea869482019-04-09 15:16:11 -0400460 }
461 return responseBody.Success, responseBody.Result
462 case <-ctx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000463 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Matteo Scandolo46654682020-08-05 11:46:37 -0700464 log.MarkSpanError(ctx, errors.New("context-cancelled"))
William Kurkianea869482019-04-09 15:16:11 -0400465 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530466 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
467
William Kurkianea869482019-04-09 15:16:11 -0400468 var marshalledArg *any.Any
469 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
470 return false, nil // Should never happen
471 }
472 return false, marshalledArg
473 case <-childCtx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000474 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Matteo Scandolo46654682020-08-05 11:46:37 -0700475 log.MarkSpanError(ctx, errors.New("context-cancelled"))
William Kurkianea869482019-04-09 15:16:11 -0400476 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530477 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
478
William Kurkianea869482019-04-09 15:16:11 -0400479 var marshalledArg *any.Any
480 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
481 return false, nil // Should never happen
482 }
483 return false, marshalledArg
484 case <-kp.doneCh:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000485 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
William Kurkianea869482019-04-09 15:16:11 -0400486 return true, nil
487 }
488 }
489 return true, nil
490}
491
492// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
493// when a message is received on a given topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000494func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
William Kurkianea869482019-04-09 15:16:11 -0400495
496 // Subscribe to receive messages for that topic
497 var ch <-chan *ic.InterContainerMessage
498 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000499 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400500 //if ch, err = kp.Subscribe(topic); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000501 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Matt Jeannereteb5059f2019-07-19 06:11:00 -0400502 return err
William Kurkianea869482019-04-09 15:16:11 -0400503 }
504
505 kp.defaultRequestHandlerInterface = handler
506 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
507 // Launch a go routine to receive and process kafka messages
Neha Sharma96b7bf22020-06-15 10:37:32 +0000508 go kp.waitForMessages(ctx, ch, topic, handler)
William Kurkianea869482019-04-09 15:16:11 -0400509
510 return nil
511}
512
513// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
514// when a message is received on a given topic. So far there is only 1 target registered per microservice
Neha Sharma96b7bf22020-06-15 10:37:32 +0000515func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
William Kurkianea869482019-04-09 15:16:11 -0400516 // Subscribe to receive messages for that topic
517 var ch <-chan *ic.InterContainerMessage
518 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000519 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
520 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400521 return err
522 }
523 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
524
525 // Launch a go routine to receive and process kafka messages
Neha Sharma96b7bf22020-06-15 10:37:32 +0000526 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
William Kurkianea869482019-04-09 15:16:11 -0400527
528 return nil
529}
530
Neha Sharma96b7bf22020-06-15 10:37:32 +0000531func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
532 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
William Kurkianea869482019-04-09 15:16:11 -0400533}
534
Neha Sharma96b7bf22020-06-15 10:37:32 +0000535func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400536 kp.lockTopicResponseChannelMap.Lock()
537 defer kp.lockTopicResponseChannelMap.Unlock()
538 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
539 // Unsubscribe to this topic first - this will close the subscribed channel
540 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000541 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
542 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400543 }
544 delete(kp.topicToResponseChannelMap, topic)
545 return err
546 } else {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000547 return fmt.Errorf("%s-Topic-not-found", topic)
William Kurkianea869482019-04-09 15:16:11 -0400548 }
549}
550
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000551// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000552func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
553 logger.Debug(ctx, "delete-all-topic-response-channel")
William Kurkianea869482019-04-09 15:16:11 -0400554 kp.lockTopicResponseChannelMap.Lock()
555 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakere701b862020-02-20 16:19:16 -0800556 var unsubscribeFailTopics []string
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000557 for topic := range kp.topicToResponseChannelMap {
William Kurkianea869482019-04-09 15:16:11 -0400558 // Unsubscribe to this topic first - this will close the subscribed channel
Neha Sharma96b7bf22020-06-15 10:37:32 +0000559 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Bakere701b862020-02-20 16:19:16 -0800560 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000561 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800562 // Do not return. Continue to try to unsubscribe to other topics.
563 } else {
564 // Only delete from channel map if successfully unsubscribed.
565 delete(kp.topicToResponseChannelMap, topic)
William Kurkianea869482019-04-09 15:16:11 -0400566 }
William Kurkianea869482019-04-09 15:16:11 -0400567 }
Scott Bakere701b862020-02-20 16:19:16 -0800568 if len(unsubscribeFailTopics) > 0 {
569 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
570 }
571 return nil
William Kurkianea869482019-04-09 15:16:11 -0400572}
573
npujarec5762e2020-01-01 14:08:48 +0530574func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
William Kurkianea869482019-04-09 15:16:11 -0400575 kp.lockTopicRequestHandlerChannelMap.Lock()
576 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
577 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
578 kp.topicToRequestHandlerChannelMap[topic] = arg
579 }
580}
581
Neha Sharma96b7bf22020-06-15 10:37:32 +0000582func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400583 kp.lockTopicRequestHandlerChannelMap.Lock()
584 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
585 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
586 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000587 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000588 return err
589 }
William Kurkianea869482019-04-09 15:16:11 -0400590 delete(kp.topicToRequestHandlerChannelMap, topic)
591 return nil
592 } else {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000593 return fmt.Errorf("%s-Topic-not-found", topic)
William Kurkianea869482019-04-09 15:16:11 -0400594 }
595}
596
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000597// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000598func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
599 logger.Debug(ctx, "delete-all-topic-request-channel")
William Kurkianea869482019-04-09 15:16:11 -0400600 kp.lockTopicRequestHandlerChannelMap.Lock()
601 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakere701b862020-02-20 16:19:16 -0800602 var unsubscribeFailTopics []string
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000603 for topic := range kp.topicToRequestHandlerChannelMap {
William Kurkianea869482019-04-09 15:16:11 -0400604 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000605 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Bakere701b862020-02-20 16:19:16 -0800606 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000607 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800608 // Do not return. Continue to try to unsubscribe to other topics.
609 } else {
610 // Only delete from channel map if successfully unsubscribed.
611 delete(kp.topicToRequestHandlerChannelMap, topic)
William Kurkianea869482019-04-09 15:16:11 -0400612 }
William Kurkianea869482019-04-09 15:16:11 -0400613 }
Scott Bakere701b862020-02-20 16:19:16 -0800614 if len(unsubscribeFailTopics) > 0 {
615 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
616 }
617 return nil
William Kurkianea869482019-04-09 15:16:11 -0400618}
619
npujarec5762e2020-01-01 14:08:48 +0530620func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400621 kp.lockTransactionIdToChannelMap.Lock()
622 defer kp.lockTransactionIdToChannelMap.Unlock()
623 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
624 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
625 }
626}
627
npujarec5762e2020-01-01 14:08:48 +0530628func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400629 kp.lockTransactionIdToChannelMap.Lock()
630 defer kp.lockTransactionIdToChannelMap.Unlock()
631 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
632 // Close the channel first
633 close(transChannel.ch)
634 delete(kp.transactionIdToChannelMap, id)
635 }
636}
637
npujarec5762e2020-01-01 14:08:48 +0530638func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400639 kp.lockTransactionIdToChannelMap.Lock()
640 defer kp.lockTransactionIdToChannelMap.Unlock()
641 for key, value := range kp.transactionIdToChannelMap {
642 if value.topic.Name == id {
643 close(value.ch)
644 delete(kp.transactionIdToChannelMap, key)
645 }
646 }
647}
648
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000649// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000650func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
651 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
William Kurkianea869482019-04-09 15:16:11 -0400652 kp.lockTransactionIdToChannelMap.Lock()
653 defer kp.lockTransactionIdToChannelMap.Unlock()
654 for key, value := range kp.transactionIdToChannelMap {
655 close(value.ch)
656 delete(kp.transactionIdToChannelMap, key)
657 }
658}
659
Neha Sharma96b7bf22020-06-15 10:37:32 +0000660func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400661 // If we have any consumers on that topic we need to close them
Neha Sharma96b7bf22020-06-15 10:37:32 +0000662 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
663 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400664 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000665 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
666 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400667 }
668 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
669
Neha Sharma96b7bf22020-06-15 10:37:32 +0000670 return kp.kafkaClient.DeleteTopic(ctx, &topic)
William Kurkianea869482019-04-09 15:16:11 -0400671}
672
Neha Sharma96b7bf22020-06-15 10:37:32 +0000673func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
William Kurkianea869482019-04-09 15:16:11 -0400674 // Encode the response argument - needs to be a proto message
675 if returnedVal == nil {
676 return nil, nil
677 }
678 protoValue, ok := returnedVal.(proto.Message)
679 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000680 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
William Kurkianea869482019-04-09 15:16:11 -0400681 err := errors.New("response-value-not-proto-message")
682 return nil, err
683 }
684
685 // Marshal the returned value, if any
686 var marshalledReturnedVal *any.Any
687 var err error
688 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000689 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400690 return nil, err
691 }
692 return marshalledReturnedVal, nil
693}
694
Neha Sharma96b7bf22020-06-15 10:37:32 +0000695func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
William Kurkianea869482019-04-09 15:16:11 -0400696 responseHeader := &ic.Header{
697 Id: request.Header.Id,
698 Type: ic.MessageType_RESPONSE,
699 FromTopic: request.Header.ToTopic,
700 ToTopic: request.Header.FromTopic,
Scott Bakered4a8e72020-04-17 11:10:20 -0700701 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400702 }
703 responseBody := &ic.InterContainerResponseBody{
704 Success: false,
705 Result: nil,
706 }
707 var marshalledResponseBody *any.Any
708 var err error
709 // Error should never happen here
710 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000711 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400712 }
713
714 return &ic.InterContainerMessage{
715 Header: responseHeader,
716 Body: marshalledResponseBody,
717 }
718
719}
720
721//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
722//or an error on failure
Neha Sharma96b7bf22020-06-15 10:37:32 +0000723func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
724 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
William Kurkianea869482019-04-09 15:16:11 -0400725 responseHeader := &ic.Header{
726 Id: request.Header.Id,
727 Type: ic.MessageType_RESPONSE,
728 FromTopic: request.Header.ToTopic,
729 ToTopic: request.Header.FromTopic,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400730 KeyTopic: request.Header.KeyTopic,
Scott Bakered4a8e72020-04-17 11:10:20 -0700731 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400732 }
733
734 // Go over all returned values
735 var marshalledReturnedVal *any.Any
736 var err error
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000737
738 // for now we support only 1 returned value - (excluding the error)
739 if len(returnedValues) > 0 {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000740 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
741 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400742 }
William Kurkianea869482019-04-09 15:16:11 -0400743 }
744
745 responseBody := &ic.InterContainerResponseBody{
746 Success: success,
747 Result: marshalledReturnedVal,
748 }
749
750 // Marshal the response body
751 var marshalledResponseBody *any.Any
752 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000753 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400754 return nil, err
755 }
756
757 return &ic.InterContainerMessage{
758 Header: responseHeader,
759 Body: marshalledResponseBody,
760 }, nil
761}
762
// CallFuncByName invokes, via reflection, the method of myClass whose name is
// funcName with its first letter capitalised (exported methods are the only
// ones reachable through reflection). ctx is always passed as the first
// argument, followed by params in order.
//
// It returns the values produced by the method. Instead of letting reflect
// panic, it returns an empty slice and an error when:
//   - myClass is nil or has no such method;
//   - the number of arguments (ctx + params) does not fit the method signature;
//   - an argument is not assignable to the corresponding parameter type.
//
// A nil argument (including a nil ctx) is passed as the zero value of the
// corresponding parameter type.
func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// A nil receiver has no methods; MethodByName would panic on it.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	mt := m.Type()
	numIn := mt.NumIn()
	numArgs := len(params) + 1
	variadic := mt.IsVariadic()
	if (variadic && numArgs < numIn-1) || (!variadic && numArgs != numIn) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-count \"%s\": got %d, method takes %d", funcName, numArgs, numIn)
	}

	// expectedType returns the type the i-th call argument must be assignable
	// to; trailing arguments of a variadic method map to the element type.
	expectedType := func(i int) reflect.Type {
		if variadic && i >= numIn-1 {
			return mt.In(numIn - 1).Elem()
		}
		return mt.In(i)
	}

	in := make([]reflect.Value, numArgs)
	for i := range in {
		var arg interface{} = ctx
		if i > 0 {
			arg = params[i-1]
		}
		want := expectedType(i)
		v := reflect.ValueOf(arg)
		if !v.IsValid() {
			// reflect.ValueOf(nil) is the invalid Value, which Call rejects.
			in[i] = reflect.Zero(want)
			continue
		}
		if !v.Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-type \"%s\": argument %d is %s, expected %s", funcName, i, v.Type(), want)
		}
		in[i] = v
	}

	return m.Call(in), nil
}
780
Neha Sharma96b7bf22020-06-15 10:37:32 +0000781func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400782 arg := &KVArg{
783 Key: TransactionKey,
784 Value: &ic.StrType{Val: transactionId},
785 }
786
787 var marshalledArg *any.Any
788 var err error
789 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000790 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400791 return currentArgs
792 }
793 protoArg := &ic.Argument{
794 Key: arg.Key,
795 Value: marshalledArg,
796 }
797 return append(currentArgs, protoArg)
798}
799
Neha Sharma96b7bf22020-06-15 10:37:32 +0000800func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400801 var marshalledArg *any.Any
802 var err error
803 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000804 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400805 return currentArgs
806 }
807 protoArg := &ic.Argument{
808 Key: FromTopic,
809 Value: marshalledArg,
810 }
811 return append(currentArgs, protoArg)
812}
813
Matteo Scandolo46654682020-08-05 11:46:37 -0700814// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
815// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
816// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
817// from components currently not sending the span (e.g. openonu adapter)
818func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
819
820 for _, arg := range args {
821 if arg.Key == "span" {
822 var err error
823 var textMapString ic.StrType
824 if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
Girish Gowdra833343a2021-07-08 15:45:02 -0700825 logger.Debug(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
Matteo Scandolo46654682020-08-05 11:46:37 -0700826 break
827 }
828
829 spanTextMap := make(map[string]string)
830 if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
Girish Gowdra833343a2021-07-08 15:45:02 -0700831 logger.Debug(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
Matteo Scandolo46654682020-08-05 11:46:37 -0700832 break
833 }
834
835 var spanContext opentracing.SpanContext
836 if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
Girish Gowdra833343a2021-07-08 15:45:02 -0700837 logger.Debug(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
Matteo Scandolo46654682020-08-05 11:46:37 -0700838 break
839 }
840
841 var receivedRpcName string
842 extractBaggage := func(k, v string) bool {
843 if k == "rpc-span-name" {
844 receivedRpcName = v
845 return false
846 }
847 return true
848 }
849
850 spanContext.ForeachBaggageItem(extractBaggage)
851
852 return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
853 }
854 }
855
856 // Create new Child Span with rpc as name if no span details were received in kafka arguments
857 var spanName strings.Builder
858 spanName.WriteString("kafka-")
859
860 // In case of inter adapter message, use Msg Type for constructing RPC name
861 if rpcName == "process_inter_adapter_message" {
862 for _, arg := range args {
863 if arg.Key == "msg" {
864 iamsg := ic.InterAdapterMessage{}
865 if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
866 spanName.WriteString("inter-adapter-")
867 rpcName = iamsg.Header.Type.String()
868 }
869 }
870 }
871 }
872
873 spanName.WriteString("rpc-")
874 spanName.WriteString(rpcName)
875
876 return opentracing.StartSpanFromContext(ctx, spanName.String())
877}
878
Neha Sharma96b7bf22020-06-15 10:37:32 +0000879func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400880
881 // First extract the header to know whether this is a request - responses are handled by a different handler
882 if msg.Header.Type == ic.MessageType_REQUEST {
883 var out []reflect.Value
884 var err error
885
886 // Get the request body
887 requestBody := &ic.InterContainerRequestBody{}
888 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000889 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400890 } else {
Girish Gowdra833343a2021-07-08 15:45:02 -0700891 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "args": requestBody.Args})
Matteo Scandolo46654682020-08-05 11:46:37 -0700892 span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
893 defer span.Finish()
894
William Kurkianea869482019-04-09 15:16:11 -0400895 // let the callee unpack the arguments as its the only one that knows the real proto type
896 // Augment the requestBody with the message Id as it will be used in scenarios where cores
897 // are set in pairs and competing
Neha Sharma96b7bf22020-06-15 10:37:32 +0000898 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400899
900 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
901 // needs to send an unsollicited message to the currently requested container
Neha Sharma96b7bf22020-06-15 10:37:32 +0000902 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400903
Neha Sharma96b7bf22020-06-15 10:37:32 +0000904 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400905 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000906 logger.Warn(ctx, err)
William Kurkianea869482019-04-09 15:16:11 -0400907 }
908 }
909 // Response required?
910 if requestBody.ResponseRequired {
911 // If we already have an error before then just return that
912 var returnError *ic.Error
913 var returnedValues []interface{}
914 var success bool
915 if err != nil {
916 returnError = &ic.Error{Reason: err.Error()}
917 returnedValues = make([]interface{}, 1)
918 returnedValues[0] = returnError
919 } else {
920 returnedValues = make([]interface{}, 0)
921 // Check for errors first
922 lastIndex := len(out) - 1
923 if out[lastIndex].Interface() != nil { // Error
kdarapub26b4502019-10-05 03:02:33 +0530924 if retError, ok := out[lastIndex].Interface().(error); ok {
925 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000926 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530927 return // Ignore - process is in competing mode and ignored transaction
928 }
929 returnError = &ic.Error{Reason: retError.Error()}
William Kurkianea869482019-04-09 15:16:11 -0400930 returnedValues = append(returnedValues, returnError)
931 } else { // Should never happen
932 returnError = &ic.Error{Reason: "incorrect-error-returns"}
933 returnedValues = append(returnedValues, returnError)
934 }
935 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000936 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530937 return // Ignore - should not happen
William Kurkianea869482019-04-09 15:16:11 -0400938 } else { // Non-error case
939 success = true
940 for idx, val := range out {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000941 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
William Kurkianea869482019-04-09 15:16:11 -0400942 if idx != lastIndex {
943 returnedValues = append(returnedValues, val.Interface())
944 }
945 }
946 }
947 }
948
949 var icm *ic.InterContainerMessage
Neha Sharma96b7bf22020-06-15 10:37:32 +0000950 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
951 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
952 icm = encodeDefaultFailedResponse(ctx, msg)
William Kurkianea869482019-04-09 15:16:11 -0400953 }
954 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
955 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
956 // present then the key will be empty, hence all messages for a given topic will be sent to all
957 // partitions.
958 replyTopic := &Topic{Name: msg.Header.FromTopic}
959 key := msg.Header.KeyTopic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000960 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
William Kurkianea869482019-04-09 15:16:11 -0400961 // TODO: handle error response.
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000962 go func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000963 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
964 logger.Errorw(ctx, "send-reply-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000965 "topic": replyTopic,
966 "key": key,
967 "error": err})
968 }
969 }()
William Kurkianea869482019-04-09 15:16:11 -0400970 }
971 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000972 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
973 go kp.dispatchResponse(ctx, msg)
William Kurkianea869482019-04-09 15:16:11 -0400974 } else {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000975 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400976 }
977}
978
Neha Sharma96b7bf22020-06-15 10:37:32 +0000979func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400980 // Wait for messages
981 for msg := range ch {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000982 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
983 go kp.handleMessage(context.Background(), msg, targetInterface)
William Kurkianea869482019-04-09 15:16:11 -0400984 }
985}
986
Neha Sharma96b7bf22020-06-15 10:37:32 +0000987func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400988 kp.lockTransactionIdToChannelMap.RLock()
989 defer kp.lockTransactionIdToChannelMap.RUnlock()
990 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000991 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
William Kurkianea869482019-04-09 15:16:11 -0400992 return
993 }
994 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
995}
996
997// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
998// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
999// API. There is one response channel waiting for kafka messages before dispatching the message to the
1000// corresponding waiting channel
Neha Sharma96b7bf22020-06-15 10:37:32 +00001001func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
1002 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
William Kurkianea869482019-04-09 15:16:11 -04001003
1004 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
1005 // broadcast any message for this topic to all channels waiting on it.
divyadesaid26f6b12020-03-19 06:30:28 +00001006 // Set channel size to 1 to prevent deadlock, see VOL-2708
1007 ch := make(chan *ic.InterContainerMessage, 1)
William Kurkianea869482019-04-09 15:16:11 -04001008 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
1009
1010 return ch, nil
1011}
1012
Neha Sharma96b7bf22020-06-15 10:37:32 +00001013func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
1014 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
William Kurkianea869482019-04-09 15:16:11 -04001015 kp.deleteFromTransactionIdToChannelMap(trnsId)
1016 return nil
1017}
1018
Neha Sharma96b7bf22020-06-15 10:37:32 +00001019func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
1020 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
cbabu95f21522019-11-13 14:25:18 +01001021}
1022
Neha Sharma96b7bf22020-06-15 10:37:32 +00001023func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
1024 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
Scott Baker86fce9a2019-12-12 09:47:17 -08001025}
1026
Neha Sharma96b7bf22020-06-15 10:37:32 +00001027func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
1028 return kp.kafkaClient.SendLiveness(ctx)
cbabu95f21522019-11-13 14:25:18 +01001029}
1030
William Kurkianea869482019-04-09 15:16:11 -04001031//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
1032//or an error on failure
Neha Sharma96b7bf22020-06-15 10:37:32 +00001033func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
William Kurkianea869482019-04-09 15:16:11 -04001034 requestHeader := &ic.Header{
1035 Id: uuid.New().String(),
1036 Type: ic.MessageType_REQUEST,
1037 FromTopic: replyTopic.Name,
1038 ToTopic: toTopic.Name,
Matt Jeanneret384d8c92019-05-06 14:27:31 -04001039 KeyTopic: key,
Scott Bakered4a8e72020-04-17 11:10:20 -07001040 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -04001041 }
1042 requestBody := &ic.InterContainerRequestBody{
1043 Rpc: rpc,
1044 ResponseRequired: true,
1045 ReplyToTopic: replyTopic.Name,
1046 }
1047
1048 for _, arg := range kvArgs {
1049 if arg == nil {
1050 // In case the caller sends an array with empty args
1051 continue
1052 }
1053 var marshalledArg *any.Any
1054 var err error
1055 // ascertain the value interface type is a proto.Message
1056 protoValue, ok := arg.Value.(proto.Message)
1057 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001058 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
William Kurkianea869482019-04-09 15:16:11 -04001059 err := errors.New("argument-value-not-proto-message")
1060 return nil, err
1061 }
1062 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001063 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001064 return nil, err
1065 }
1066 protoArg := &ic.Argument{
1067 Key: arg.Key,
1068 Value: marshalledArg,
1069 }
1070 requestBody.Args = append(requestBody.Args, protoArg)
1071 }
1072
1073 var marshalledData *any.Any
1074 var err error
1075 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001076 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001077 return nil, err
1078 }
1079 request := &ic.InterContainerMessage{
1080 Header: requestHeader,
1081 Body: marshalledData,
1082 }
1083 return request, nil
1084}
1085
Neha Sharma96b7bf22020-06-15 10:37:32 +00001086func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
William Kurkianea869482019-04-09 15:16:11 -04001087 // Extract the message body
1088 responseBody := ic.InterContainerResponseBody{}
1089 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001090 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001091 return nil, err
1092 }
Neha Sharma96b7bf22020-06-15 10:37:32 +00001093 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
William Kurkianea869482019-04-09 15:16:11 -04001094
1095 return &responseBody, nil
1096
1097}