blob: 3af35d74a7a3584ca8ff667f8dcfc94a392bec4e [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
19 "context"
Matteo Scandolo46654682020-08-05 11:46:37 -070020 "encoding/json"
William Kurkianea869482019-04-09 15:16:11 -040021 "errors"
22 "fmt"
Matteo Scandolo46654682020-08-05 11:46:37 -070023 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/status"
William Kurkianea869482019-04-09 15:16:11 -040025 "reflect"
26 "strings"
27 "sync"
28 "time"
William Kurkianea869482019-04-09 15:16:11 -040029
Esin Karamanccb714b2019-11-29 15:02:06 +000030 "github.com/golang/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/any"
33 "github.com/google/uuid"
Girish Gowdraa09aeab2020-09-14 16:30:52 -070034 "github.com/opencord/voltha-lib-go/v4/pkg/log"
35 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
Matteo Scandolo46654682020-08-05 11:46:37 -070036 "github.com/opentracing/opentracing-go"
Esin Karamanccb714b2019-11-29 15:02:06 +000037)
William Kurkianea869482019-04-09 15:16:11 -040038
const (
	// DefaultMaxRetries is a default retry count. It is not referenced in
	// this part of the file.
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is the default request timeout expressed in
	// milliseconds (60 seconds), chosen to handle a wider latency range.
	// InvokeRPC multiplies it by time.Millisecond.
	DefaultRequestTimeout = 60 * 1000
)
43
// Well-known key names. Their consumers are not in this part of the file.
const (
	// TransactionKey is the key name "transactionID".
	TransactionKey = "transactionID"

	// FromTopic is the key name "fromTopic".
	FromTopic = "fromTopic"
)
48
// Sentinel errors related to transactions.
var (
	// ErrorTransactionNotAcquired carries the message "transaction-not-acquired".
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")

	// ErrorTransactionInvalidId carries the message "transaction-invalid-id".
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
51
William Kurkianea869482019-04-09 15:16:11 -040052// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
53// obtained from that channel, this interface is invoked. This is used to handle
54// async requests into the Core via the kafka messaging bus
55type requestHandlerChannel struct {
56 requesthandlerInterface interface{}
57 ch <-chan *ic.InterContainerMessage
58}
59
60// transactionChannel represents a combination of a topic and a channel onto which a response received
61// on the kafka bus will be sent to
62type transactionChannel struct {
63 topic *Topic
64 ch chan *ic.InterContainerMessage
65}
66
npujarec5762e2020-01-01 14:08:48 +053067type InterContainerProxy interface {
Neha Sharma96b7bf22020-06-15 10:37:32 +000068 Start(ctx context.Context) error
69 Stop(ctx context.Context)
npujarec5762e2020-01-01 14:08:48 +053070 GetDefaultTopic() *Topic
npujarec5762e2020-01-01 14:08:48 +053071 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
divyadesaid26f6b12020-03-19 06:30:28 +000072 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Neha Sharma96b7bf22020-06-15 10:37:32 +000073 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
74 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
75 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
76 DeleteTopic(ctx context.Context, topic Topic) error
77 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
78 SendLiveness(ctx context.Context) error
npujarec5762e2020-01-01 14:08:48 +053079}
80
81// interContainerProxy represents the messaging proxy
82type interContainerProxy struct {
Neha Sharma3f221ae2020-04-29 19:02:12 +000083 kafkaAddress string
npujarec5762e2020-01-01 14:08:48 +053084 defaultTopic *Topic
William Kurkianea869482019-04-09 15:16:11 -040085 defaultRequestHandlerInterface interface{}
William Kurkianea869482019-04-09 15:16:11 -040086 kafkaClient Client
npujarec5762e2020-01-01 14:08:48 +053087 doneCh chan struct{}
88 doneOnce sync.Once
William Kurkianea869482019-04-09 15:16:11 -040089
90 // This map is used to map a topic to an interface and channel. When a request is received
91 // on that channel (registered to the topic) then that interface is invoked.
92 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
93 lockTopicRequestHandlerChannelMap sync.RWMutex
94
95 // This map is used to map a channel to a response topic. This channel handles all responses on that
96 // channel for that topic and forward them to the appropriate consumers channel, using the
97 // transactionIdToChannelMap.
98 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
99 lockTopicResponseChannelMap sync.RWMutex
100
101 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
102 // sent out and we are waiting for a response.
103 transactionIdToChannelMap map[string]*transactionChannel
104 lockTransactionIdToChannelMap sync.RWMutex
105}
106
npujarec5762e2020-01-01 14:08:48 +0530107type InterContainerProxyOption func(*interContainerProxy)
William Kurkianea869482019-04-09 15:16:11 -0400108
Neha Sharma3f221ae2020-04-29 19:02:12 +0000109func InterContainerAddress(address string) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530110 return func(args *interContainerProxy) {
Neha Sharma3f221ae2020-04-29 19:02:12 +0000111 args.kafkaAddress = address
William Kurkianea869482019-04-09 15:16:11 -0400112 }
113}
114
115func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530116 return func(args *interContainerProxy) {
117 args.defaultTopic = topic
William Kurkianea869482019-04-09 15:16:11 -0400118 }
119}
120
William Kurkianea869482019-04-09 15:16:11 -0400121func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530122 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400123 args.defaultRequestHandlerInterface = handler
124 }
125}
126
127func MsgClient(client Client) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530128 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400129 args.kafkaClient = client
130 }
131}
132
npujarec5762e2020-01-01 14:08:48 +0530133func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
134 proxy := &interContainerProxy{
Neha Sharma3f221ae2020-04-29 19:02:12 +0000135 kafkaAddress: DefaultKafkaAddress,
136 doneCh: make(chan struct{}),
William Kurkianea869482019-04-09 15:16:11 -0400137 }
138
139 for _, option := range opts {
140 option(proxy)
141 }
142
npujarec5762e2020-01-01 14:08:48 +0530143 return proxy
William Kurkianea869482019-04-09 15:16:11 -0400144}
145
npujarec5762e2020-01-01 14:08:48 +0530146func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
147 return newInterContainerProxy(opts...)
148}
149
Neha Sharma96b7bf22020-06-15 10:37:32 +0000150func (kp *interContainerProxy) Start(ctx context.Context) error {
151 logger.Info(ctx, "Starting-Proxy")
William Kurkianea869482019-04-09 15:16:11 -0400152
153 // Kafka MsgClient should already have been created. If not, output fatal error
154 if kp.kafkaClient == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000155 logger.Fatal(ctx, "kafka-client-not-set")
William Kurkianea869482019-04-09 15:16:11 -0400156 }
157
William Kurkianea869482019-04-09 15:16:11 -0400158 // Start the kafka client
Neha Sharma96b7bf22020-06-15 10:37:32 +0000159 if err := kp.kafkaClient.Start(ctx); err != nil {
160 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400161 return err
162 }
163
164 // Create the topic to response channel map
165 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
166 //
167 // Create the transactionId to Channel Map
168 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
169
170 // Create the topic to request channel map
171 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
172
173 return nil
174}
175
Neha Sharma96b7bf22020-06-15 10:37:32 +0000176func (kp *interContainerProxy) Stop(ctx context.Context) {
177 logger.Info(ctx, "stopping-intercontainer-proxy")
npujarec5762e2020-01-01 14:08:48 +0530178 kp.doneOnce.Do(func() { close(kp.doneCh) })
William Kurkianea869482019-04-09 15:16:11 -0400179 // TODO : Perform cleanup
Neha Sharma96b7bf22020-06-15 10:37:32 +0000180 kp.kafkaClient.Stop(ctx)
181 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Bakere701b862020-02-20 16:19:16 -0800182 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000183 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800184 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000185 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Bakere701b862020-02-20 16:19:16 -0800186 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000187 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800188 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000189 kp.deleteAllTransactionIdToChannelMap(ctx)
William Kurkianea869482019-04-09 15:16:11 -0400190}
191
npujarec5762e2020-01-01 14:08:48 +0530192func (kp *interContainerProxy) GetDefaultTopic() *Topic {
193 return kp.defaultTopic
194}
195
divyadesaid26f6b12020-03-19 06:30:28 +0000196// InvokeAsyncRPC is used to make an RPC request asynchronously
197func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
198 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
199
Neha Sharma96b7bf22020-06-15 10:37:32 +0000200 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
Matteo Scandolo46654682020-08-05 11:46:37 -0700201
202 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
203 if spanArg != nil {
204 kvArgs = append(kvArgs, &spanArg[0])
205 }
206 defer span.Finish()
207
divyadesaid26f6b12020-03-19 06:30:28 +0000208 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
209 // typically the device ID.
210 responseTopic := replyToTopic
211 if responseTopic == nil {
212 responseTopic = kp.GetDefaultTopic()
213 }
214
215 chnl := make(chan *RpcResponse)
216
217 go func() {
218
219 // once we're done,
220 // close the response channel
221 defer close(chnl)
222
223 var err error
224 var protoRequest *ic.InterContainerMessage
225
226 // Encode the request
Neha Sharma96b7bf22020-06-15 10:37:32 +0000227 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
divyadesaid26f6b12020-03-19 06:30:28 +0000228 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000229 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Matteo Scandolo46654682020-08-05 11:46:37 -0700230 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
divyadesaid26f6b12020-03-19 06:30:28 +0000231 chnl <- NewResponse(RpcFormattingError, err, nil)
232 return
233 }
234
235 // Subscribe for response, if needed, before sending request
236 var ch <-chan *ic.InterContainerMessage
Neha Sharma96b7bf22020-06-15 10:37:32 +0000237 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
238 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Matteo Scandolo46654682020-08-05 11:46:37 -0700239 log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
divyadesaid26f6b12020-03-19 06:30:28 +0000240 chnl <- NewResponse(RpcTransportError, err, nil)
241 return
242 }
243
244 // Send request - if the topic is formatted with a device Id then we will send the request using a
245 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
246 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000247 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
divyadesaid26f6b12020-03-19 06:30:28 +0000248
249 // if the message is not sent on kafka publish an event an close the channel
Neha Sharma96b7bf22020-06-15 10:37:32 +0000250 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
divyadesaid26f6b12020-03-19 06:30:28 +0000251 chnl <- NewResponse(RpcTransportError, err, nil)
252 return
253 }
254
255 // if the client is not waiting for a response send the ack and close the channel
256 chnl <- NewResponse(RpcSent, nil, nil)
257 if !waitForResponse {
258 return
259 }
260
261 defer func() {
262 // Remove the subscription for a response on return
Neha Sharma96b7bf22020-06-15 10:37:32 +0000263 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
264 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
divyadesaid26f6b12020-03-19 06:30:28 +0000265 }
266 }()
267
268 // Wait for response as well as timeout or cancellation
269 select {
270 case msg, ok := <-ch:
271 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000272 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Matteo Scandolo46654682020-08-05 11:46:37 -0700273 log.MarkSpanError(ctx, errors.New("channel-closed"))
divyadesaid26f6b12020-03-19 06:30:28 +0000274 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
275 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000276 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
277 if responseBody, err := decodeResponse(ctx, msg); err != nil {
divyadesaid26f6b12020-03-19 06:30:28 +0000278 chnl <- NewResponse(RpcReply, err, nil)
279 } else {
280 if responseBody.Success {
281 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
282 } else {
283 // response body contains an error
284 unpackErr := &ic.Error{}
285 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
286 chnl <- NewResponse(RpcReply, err, nil)
287 } else {
288 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
289 }
290 }
291 }
292 case <-ctx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000293 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Matteo Scandolo46654682020-08-05 11:46:37 -0700294 log.MarkSpanError(ctx, errors.New("context-cancelled"))
divyadesaid26f6b12020-03-19 06:30:28 +0000295 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
296 chnl <- NewResponse(RpcTimeout, err, nil)
297 case <-kp.doneCh:
298 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000299 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
divyadesaid26f6b12020-03-19 06:30:28 +0000300 }
301 }()
302 return chnl
303}
304
Matteo Scandolo46654682020-08-05 11:46:37 -0700305// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
306// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
307//
308// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
309// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
310// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
311// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
312// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
313func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
314 var err error
315 var newCtx context.Context
316 var spanToInject opentracing.Span
317
318 var spanName strings.Builder
319 spanName.WriteString("kafka-")
320
321 // In case of inter adapter message, use Msg Type for constructing RPC name
322 if rpc == "process_inter_adapter_message" {
323 if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
324 spanName.WriteString("inter-adapter-")
325 rpc = msgType.String()
326 }
327 }
328
329 if isAsync {
330 spanName.WriteString("async-rpc-")
331 } else {
332 spanName.WriteString("rpc-")
333 }
334 spanName.WriteString(rpc)
335
336 if isAsync {
337 spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
338 } else {
339 spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
340 }
341
342 spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
343
344 textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
345 if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
346 logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
347 return nil, spanToInject, newCtx
348 }
349
350 var textMapJson []byte
351 if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
352 logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
353 return nil, spanToInject, newCtx
354 }
355
356 spanArg := make([]KVArg, 1)
357 spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
358 return spanArg, spanToInject, newCtx
359}
360
William Kurkianea869482019-04-09 15:16:11 -0400361// InvokeRPC is used to send a request to a given topic
npujarec5762e2020-01-01 14:08:48 +0530362func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
William Kurkianea869482019-04-09 15:16:11 -0400363 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
364
Matteo Scandolo46654682020-08-05 11:46:37 -0700365 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
366 if spanArg != nil {
367 kvArgs = append(kvArgs, &spanArg[0])
368 }
369 defer span.Finish()
370
William Kurkianea869482019-04-09 15:16:11 -0400371 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
372 // typically the device ID.
373 responseTopic := replyToTopic
374 if responseTopic == nil {
npujarec5762e2020-01-01 14:08:48 +0530375 responseTopic = kp.defaultTopic
William Kurkianea869482019-04-09 15:16:11 -0400376 }
377
378 // Encode the request
Neha Sharma96b7bf22020-06-15 10:37:32 +0000379 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
William Kurkianea869482019-04-09 15:16:11 -0400380 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000381 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Matteo Scandolo46654682020-08-05 11:46:37 -0700382 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
William Kurkianea869482019-04-09 15:16:11 -0400383 return false, nil
384 }
385
386 // Subscribe for response, if needed, before sending request
387 var ch <-chan *ic.InterContainerMessage
388 if waitForResponse {
389 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000390 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
391 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400392 }
393 }
394
395 // Send request - if the topic is formatted with a device Id then we will send the request using a
396 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
397 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
398 //key := GetDeviceIdFromTopic(*toTopic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000399 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000400 go func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000401 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Matteo Scandolo46654682020-08-05 11:46:37 -0700402 log.MarkSpanError(ctx, errors.New("send-failed"))
Neha Sharma96b7bf22020-06-15 10:37:32 +0000403 logger.Errorw(ctx, "send-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000404 "topic": toTopic,
405 "key": key,
406 "error": err})
407 }
408 }()
William Kurkianea869482019-04-09 15:16:11 -0400409
410 if waitForResponse {
411 // Create a child context based on the parent context, if any
412 var cancel context.CancelFunc
413 childCtx := context.Background()
414 if ctx == nil {
415 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
416 } else {
417 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
418 }
419 defer cancel()
420
421 // Wait for response as well as timeout or cancellation
422 // Remove the subscription for a response on return
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000423 defer func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000424 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
425 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000426 "id": protoRequest.Header.Id,
427 "error": err})
428 }
429 }()
William Kurkianea869482019-04-09 15:16:11 -0400430 select {
431 case msg, ok := <-ch:
432 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000433 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Matteo Scandolo46654682020-08-05 11:46:37 -0700434 log.MarkSpanError(ctx, errors.New("channel-closed"))
William Kurkianea869482019-04-09 15:16:11 -0400435 protoError := &ic.Error{Reason: "channel-closed"}
436 var marshalledArg *any.Any
437 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
438 return false, nil // Should never happen
439 }
440 return false, marshalledArg
441 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000442 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400443 var responseBody *ic.InterContainerResponseBody
444 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000445 if responseBody, err = decodeResponse(ctx, msg); err != nil {
446 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
npujarec5762e2020-01-01 14:08:48 +0530447 // FIXME we should return something
William Kurkianea869482019-04-09 15:16:11 -0400448 }
449 return responseBody.Success, responseBody.Result
450 case <-ctx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000451 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Matteo Scandolo46654682020-08-05 11:46:37 -0700452 log.MarkSpanError(ctx, errors.New("context-cancelled"))
William Kurkianea869482019-04-09 15:16:11 -0400453 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530454 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
455
William Kurkianea869482019-04-09 15:16:11 -0400456 var marshalledArg *any.Any
457 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
458 return false, nil // Should never happen
459 }
460 return false, marshalledArg
461 case <-childCtx.Done():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000462 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Matteo Scandolo46654682020-08-05 11:46:37 -0700463 log.MarkSpanError(ctx, errors.New("context-cancelled"))
William Kurkianea869482019-04-09 15:16:11 -0400464 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530465 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
466
William Kurkianea869482019-04-09 15:16:11 -0400467 var marshalledArg *any.Any
468 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
469 return false, nil // Should never happen
470 }
471 return false, marshalledArg
472 case <-kp.doneCh:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000473 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
William Kurkianea869482019-04-09 15:16:11 -0400474 return true, nil
475 }
476 }
477 return true, nil
478}
479
480// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
481// when a message is received on a given topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000482func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
William Kurkianea869482019-04-09 15:16:11 -0400483
484 // Subscribe to receive messages for that topic
485 var ch <-chan *ic.InterContainerMessage
486 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000487 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400488 //if ch, err = kp.Subscribe(topic); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000489 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Matt Jeannereteb5059f2019-07-19 06:11:00 -0400490 return err
William Kurkianea869482019-04-09 15:16:11 -0400491 }
492
493 kp.defaultRequestHandlerInterface = handler
494 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
495 // Launch a go routine to receive and process kafka messages
Neha Sharma96b7bf22020-06-15 10:37:32 +0000496 go kp.waitForMessages(ctx, ch, topic, handler)
William Kurkianea869482019-04-09 15:16:11 -0400497
498 return nil
499}
500
501// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
502// when a message is received on a given topic. So far there is only 1 target registered per microservice
Neha Sharma96b7bf22020-06-15 10:37:32 +0000503func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
William Kurkianea869482019-04-09 15:16:11 -0400504 // Subscribe to receive messages for that topic
505 var ch <-chan *ic.InterContainerMessage
506 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000507 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
508 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400509 return err
510 }
511 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
512
513 // Launch a go routine to receive and process kafka messages
Neha Sharma96b7bf22020-06-15 10:37:32 +0000514 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
William Kurkianea869482019-04-09 15:16:11 -0400515
516 return nil
517}
518
Neha Sharma96b7bf22020-06-15 10:37:32 +0000519func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
520 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
William Kurkianea869482019-04-09 15:16:11 -0400521}
522
Neha Sharma96b7bf22020-06-15 10:37:32 +0000523func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400524 kp.lockTopicResponseChannelMap.Lock()
525 defer kp.lockTopicResponseChannelMap.Unlock()
526 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
527 // Unsubscribe to this topic first - this will close the subscribed channel
528 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000529 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
530 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400531 }
532 delete(kp.topicToResponseChannelMap, topic)
533 return err
534 } else {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000535 return fmt.Errorf("%s-Topic-not-found", topic)
William Kurkianea869482019-04-09 15:16:11 -0400536 }
537}
538
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000539// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000540func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
541 logger.Debug(ctx, "delete-all-topic-response-channel")
William Kurkianea869482019-04-09 15:16:11 -0400542 kp.lockTopicResponseChannelMap.Lock()
543 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakere701b862020-02-20 16:19:16 -0800544 var unsubscribeFailTopics []string
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000545 for topic := range kp.topicToResponseChannelMap {
William Kurkianea869482019-04-09 15:16:11 -0400546 // Unsubscribe to this topic first - this will close the subscribed channel
Neha Sharma96b7bf22020-06-15 10:37:32 +0000547 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Bakere701b862020-02-20 16:19:16 -0800548 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000549 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800550 // Do not return. Continue to try to unsubscribe to other topics.
551 } else {
552 // Only delete from channel map if successfully unsubscribed.
553 delete(kp.topicToResponseChannelMap, topic)
William Kurkianea869482019-04-09 15:16:11 -0400554 }
William Kurkianea869482019-04-09 15:16:11 -0400555 }
Scott Bakere701b862020-02-20 16:19:16 -0800556 if len(unsubscribeFailTopics) > 0 {
557 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
558 }
559 return nil
William Kurkianea869482019-04-09 15:16:11 -0400560}
561
npujarec5762e2020-01-01 14:08:48 +0530562func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
William Kurkianea869482019-04-09 15:16:11 -0400563 kp.lockTopicRequestHandlerChannelMap.Lock()
564 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
565 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
566 kp.topicToRequestHandlerChannelMap[topic] = arg
567 }
568}
569
Neha Sharma96b7bf22020-06-15 10:37:32 +0000570func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400571 kp.lockTopicRequestHandlerChannelMap.Lock()
572 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
573 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
574 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000575 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000576 return err
577 }
William Kurkianea869482019-04-09 15:16:11 -0400578 delete(kp.topicToRequestHandlerChannelMap, topic)
579 return nil
580 } else {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000581 return fmt.Errorf("%s-Topic-not-found", topic)
William Kurkianea869482019-04-09 15:16:11 -0400582 }
583}
584
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000585// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000586func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
587 logger.Debug(ctx, "delete-all-topic-request-channel")
William Kurkianea869482019-04-09 15:16:11 -0400588 kp.lockTopicRequestHandlerChannelMap.Lock()
589 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakere701b862020-02-20 16:19:16 -0800590 var unsubscribeFailTopics []string
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000591 for topic := range kp.topicToRequestHandlerChannelMap {
William Kurkianea869482019-04-09 15:16:11 -0400592 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000593 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Bakere701b862020-02-20 16:19:16 -0800594 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000595 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800596 // Do not return. Continue to try to unsubscribe to other topics.
597 } else {
598 // Only delete from channel map if successfully unsubscribed.
599 delete(kp.topicToRequestHandlerChannelMap, topic)
William Kurkianea869482019-04-09 15:16:11 -0400600 }
William Kurkianea869482019-04-09 15:16:11 -0400601 }
Scott Bakere701b862020-02-20 16:19:16 -0800602 if len(unsubscribeFailTopics) > 0 {
603 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
604 }
605 return nil
William Kurkianea869482019-04-09 15:16:11 -0400606}
607
npujarec5762e2020-01-01 14:08:48 +0530608func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400609 kp.lockTransactionIdToChannelMap.Lock()
610 defer kp.lockTransactionIdToChannelMap.Unlock()
611 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
612 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
613 }
614}
615
npujarec5762e2020-01-01 14:08:48 +0530616func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400617 kp.lockTransactionIdToChannelMap.Lock()
618 defer kp.lockTransactionIdToChannelMap.Unlock()
619 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
620 // Close the channel first
621 close(transChannel.ch)
622 delete(kp.transactionIdToChannelMap, id)
623 }
624}
625
npujarec5762e2020-01-01 14:08:48 +0530626func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400627 kp.lockTransactionIdToChannelMap.Lock()
628 defer kp.lockTransactionIdToChannelMap.Unlock()
629 for key, value := range kp.transactionIdToChannelMap {
630 if value.topic.Name == id {
631 close(value.ch)
632 delete(kp.transactionIdToChannelMap, key)
633 }
634 }
635}
636
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000637// nolint: unused
Neha Sharma96b7bf22020-06-15 10:37:32 +0000638func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
639 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
William Kurkianea869482019-04-09 15:16:11 -0400640 kp.lockTransactionIdToChannelMap.Lock()
641 defer kp.lockTransactionIdToChannelMap.Unlock()
642 for key, value := range kp.transactionIdToChannelMap {
643 close(value.ch)
644 delete(kp.transactionIdToChannelMap, key)
645 }
646}
647
Neha Sharma96b7bf22020-06-15 10:37:32 +0000648func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400649 // If we have any consumers on that topic we need to close them
Neha Sharma96b7bf22020-06-15 10:37:32 +0000650 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
651 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400652 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000653 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
654 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400655 }
656 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
657
Neha Sharma96b7bf22020-06-15 10:37:32 +0000658 return kp.kafkaClient.DeleteTopic(ctx, &topic)
William Kurkianea869482019-04-09 15:16:11 -0400659}
660
Neha Sharma96b7bf22020-06-15 10:37:32 +0000661func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
William Kurkianea869482019-04-09 15:16:11 -0400662 // Encode the response argument - needs to be a proto message
663 if returnedVal == nil {
664 return nil, nil
665 }
666 protoValue, ok := returnedVal.(proto.Message)
667 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000668 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
William Kurkianea869482019-04-09 15:16:11 -0400669 err := errors.New("response-value-not-proto-message")
670 return nil, err
671 }
672
673 // Marshal the returned value, if any
674 var marshalledReturnedVal *any.Any
675 var err error
676 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000677 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400678 return nil, err
679 }
680 return marshalledReturnedVal, nil
681}
682
Neha Sharma96b7bf22020-06-15 10:37:32 +0000683func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
William Kurkianea869482019-04-09 15:16:11 -0400684 responseHeader := &ic.Header{
685 Id: request.Header.Id,
686 Type: ic.MessageType_RESPONSE,
687 FromTopic: request.Header.ToTopic,
688 ToTopic: request.Header.FromTopic,
Scott Bakered4a8e72020-04-17 11:10:20 -0700689 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400690 }
691 responseBody := &ic.InterContainerResponseBody{
692 Success: false,
693 Result: nil,
694 }
695 var marshalledResponseBody *any.Any
696 var err error
697 // Error should never happen here
698 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000699 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400700 }
701
702 return &ic.InterContainerMessage{
703 Header: responseHeader,
704 Body: marshalledResponseBody,
705 }
706
707}
708
709//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
710//or an error on failure
Neha Sharma96b7bf22020-06-15 10:37:32 +0000711func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
712 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
William Kurkianea869482019-04-09 15:16:11 -0400713 responseHeader := &ic.Header{
714 Id: request.Header.Id,
715 Type: ic.MessageType_RESPONSE,
716 FromTopic: request.Header.ToTopic,
717 ToTopic: request.Header.FromTopic,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400718 KeyTopic: request.Header.KeyTopic,
Scott Bakered4a8e72020-04-17 11:10:20 -0700719 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400720 }
721
722 // Go over all returned values
723 var marshalledReturnedVal *any.Any
724 var err error
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000725
726 // for now we support only 1 returned value - (excluding the error)
727 if len(returnedValues) > 0 {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000728 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
729 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400730 }
William Kurkianea869482019-04-09 15:16:11 -0400731 }
732
733 responseBody := &ic.InterContainerResponseBody{
734 Success: success,
735 Result: marshalledReturnedVal,
736 }
737
738 // Marshal the response body
739 var marshalledResponseBody *any.Any
740 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000741 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400742 return nil, err
743 }
744
745 return &ic.InterContainerMessage{
746 Header: responseHeader,
747 Body: marshalledResponseBody,
748 }, nil
749}
750
// CallFuncByName invokes, through reflection, the method of myClass named funcName
// (with its first letter capitalized), passing ctx as the first argument followed
// by params.
//
// It returns the values produced by the method. A non-nil error is returned,
// without invoking anything, when myClass is nil, when the method does not exist,
// or when the number or types of the arguments do not match the method signature.
// An untyped nil argument is passed as the zero value of the corresponding
// parameter when that parameter can hold nil.
func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// A nil interface yields the zero Value, on which MethodByName panics.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Validate the call up front: reflect.Value.Call panics on any mismatch.
	methodType := m.Type()
	numIn := methodType.NumIn()
	variadic := methodType.IsVariadic()
	argCount := len(params) + 1
	if (!variadic && argCount != numIn) || (variadic && argCount < numIn-1) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-count \"%s\": method takes %d, got %d", funcName, numIn, argCount)
	}

	args := make([]interface{}, 0, argCount)
	args = append(args, ctx)
	args = append(args, params...)

	in := make([]reflect.Value, 0, argCount)
	for i, arg := range args {
		var paramType reflect.Type
		if variadic && i >= numIn-1 {
			// Trailing arguments of a variadic method match the slice element type.
			paramType = methodType.In(numIn - 1).Elem()
		} else {
			paramType = methodType.In(i)
		}

		value := reflect.ValueOf(arg)
		switch {
		case !value.IsValid():
			// Untyped nil: only acceptable where the parameter type can hold nil.
			switch paramType.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
				value = reflect.Zero(paramType)
			default:
				return make([]reflect.Value, 0), fmt.Errorf("invalid-nil-argument \"%s\": argument %d must be %s", funcName, i, paramType)
			}
		case !value.Type().AssignableTo(paramType):
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-type \"%s\": argument %d is %s, expected %s", funcName, i, value.Type(), paramType)
		}
		in = append(in, value)
	}

	out = m.Call(in)
	return out, nil
}
768
Neha Sharma96b7bf22020-06-15 10:37:32 +0000769func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400770 arg := &KVArg{
771 Key: TransactionKey,
772 Value: &ic.StrType{Val: transactionId},
773 }
774
775 var marshalledArg *any.Any
776 var err error
777 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000778 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400779 return currentArgs
780 }
781 protoArg := &ic.Argument{
782 Key: arg.Key,
783 Value: marshalledArg,
784 }
785 return append(currentArgs, protoArg)
786}
787
Neha Sharma96b7bf22020-06-15 10:37:32 +0000788func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400789 var marshalledArg *any.Any
790 var err error
791 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000792 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400793 return currentArgs
794 }
795 protoArg := &ic.Argument{
796 Key: FromTopic,
797 Value: marshalledArg,
798 }
799 return append(currentArgs, protoArg)
800}
801
Matteo Scandolo46654682020-08-05 11:46:37 -0700802// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
803// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
804// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
805// from components currently not sending the span (e.g. openonu adapter)
806func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
807
808 for _, arg := range args {
809 if arg.Key == "span" {
810 var err error
811 var textMapString ic.StrType
812 if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
813 logger.Warnw(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
814 break
815 }
816
817 spanTextMap := make(map[string]string)
818 if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
819 logger.Warnw(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
820 break
821 }
822
823 var spanContext opentracing.SpanContext
824 if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
825 logger.Warnw(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
826 break
827 }
828
829 var receivedRpcName string
830 extractBaggage := func(k, v string) bool {
831 if k == "rpc-span-name" {
832 receivedRpcName = v
833 return false
834 }
835 return true
836 }
837
838 spanContext.ForeachBaggageItem(extractBaggage)
839
840 return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
841 }
842 }
843
844 // Create new Child Span with rpc as name if no span details were received in kafka arguments
845 var spanName strings.Builder
846 spanName.WriteString("kafka-")
847
848 // In case of inter adapter message, use Msg Type for constructing RPC name
849 if rpcName == "process_inter_adapter_message" {
850 for _, arg := range args {
851 if arg.Key == "msg" {
852 iamsg := ic.InterAdapterMessage{}
853 if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
854 spanName.WriteString("inter-adapter-")
855 rpcName = iamsg.Header.Type.String()
856 }
857 }
858 }
859 }
860
861 spanName.WriteString("rpc-")
862 spanName.WriteString(rpcName)
863
864 return opentracing.StartSpanFromContext(ctx, spanName.String())
865}
866
Neha Sharma96b7bf22020-06-15 10:37:32 +0000867func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400868
869 // First extract the header to know whether this is a request - responses are handled by a different handler
870 if msg.Header.Type == ic.MessageType_REQUEST {
871 var out []reflect.Value
872 var err error
873
874 // Get the request body
875 requestBody := &ic.InterContainerRequestBody{}
876 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000877 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400878 } else {
Matteo Scandolo46654682020-08-05 11:46:37 -0700879 span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
880 defer span.Finish()
881
Neha Sharma96b7bf22020-06-15 10:37:32 +0000882 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400883 // let the callee unpack the arguments as its the only one that knows the real proto type
884 // Augment the requestBody with the message Id as it will be used in scenarios where cores
885 // are set in pairs and competing
Neha Sharma96b7bf22020-06-15 10:37:32 +0000886 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400887
888 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
889 // needs to send an unsollicited message to the currently requested container
Neha Sharma96b7bf22020-06-15 10:37:32 +0000890 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400891
Neha Sharma96b7bf22020-06-15 10:37:32 +0000892 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
William Kurkianea869482019-04-09 15:16:11 -0400893 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000894 logger.Warn(ctx, err)
William Kurkianea869482019-04-09 15:16:11 -0400895 }
896 }
897 // Response required?
898 if requestBody.ResponseRequired {
899 // If we already have an error before then just return that
900 var returnError *ic.Error
901 var returnedValues []interface{}
902 var success bool
903 if err != nil {
904 returnError = &ic.Error{Reason: err.Error()}
905 returnedValues = make([]interface{}, 1)
906 returnedValues[0] = returnError
907 } else {
908 returnedValues = make([]interface{}, 0)
909 // Check for errors first
910 lastIndex := len(out) - 1
911 if out[lastIndex].Interface() != nil { // Error
kdarapub26b4502019-10-05 03:02:33 +0530912 if retError, ok := out[lastIndex].Interface().(error); ok {
913 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000914 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530915 return // Ignore - process is in competing mode and ignored transaction
916 }
917 returnError = &ic.Error{Reason: retError.Error()}
William Kurkianea869482019-04-09 15:16:11 -0400918 returnedValues = append(returnedValues, returnError)
919 } else { // Should never happen
920 returnError = &ic.Error{Reason: "incorrect-error-returns"}
921 returnedValues = append(returnedValues, returnError)
922 }
923 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000924 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530925 return // Ignore - should not happen
William Kurkianea869482019-04-09 15:16:11 -0400926 } else { // Non-error case
927 success = true
928 for idx, val := range out {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000929 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
William Kurkianea869482019-04-09 15:16:11 -0400930 if idx != lastIndex {
931 returnedValues = append(returnedValues, val.Interface())
932 }
933 }
934 }
935 }
936
937 var icm *ic.InterContainerMessage
Neha Sharma96b7bf22020-06-15 10:37:32 +0000938 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
939 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
940 icm = encodeDefaultFailedResponse(ctx, msg)
William Kurkianea869482019-04-09 15:16:11 -0400941 }
942 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
943 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
944 // present then the key will be empty, hence all messages for a given topic will be sent to all
945 // partitions.
946 replyTopic := &Topic{Name: msg.Header.FromTopic}
947 key := msg.Header.KeyTopic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000948 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
William Kurkianea869482019-04-09 15:16:11 -0400949 // TODO: handle error response.
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000950 go func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000951 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
952 logger.Errorw(ctx, "send-reply-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000953 "topic": replyTopic,
954 "key": key,
955 "error": err})
956 }
957 }()
William Kurkianea869482019-04-09 15:16:11 -0400958 }
959 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000960 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
961 go kp.dispatchResponse(ctx, msg)
William Kurkianea869482019-04-09 15:16:11 -0400962 } else {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000963 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400964 }
965}
966
Neha Sharma96b7bf22020-06-15 10:37:32 +0000967func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400968 // Wait for messages
969 for msg := range ch {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000970 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
971 go kp.handleMessage(context.Background(), msg, targetInterface)
William Kurkianea869482019-04-09 15:16:11 -0400972 }
973}
974
Neha Sharma96b7bf22020-06-15 10:37:32 +0000975func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400976 kp.lockTransactionIdToChannelMap.RLock()
977 defer kp.lockTransactionIdToChannelMap.RUnlock()
978 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000979 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
William Kurkianea869482019-04-09 15:16:11 -0400980 return
981 }
982 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
983}
984
985// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
986// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
987// API. There is one response channel waiting for kafka messages before dispatching the message to the
988// corresponding waiting channel
Neha Sharma96b7bf22020-06-15 10:37:32 +0000989func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
990 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400991
992 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
993 // broadcast any message for this topic to all channels waiting on it.
divyadesaid26f6b12020-03-19 06:30:28 +0000994 // Set channel size to 1 to prevent deadlock, see VOL-2708
995 ch := make(chan *ic.InterContainerMessage, 1)
William Kurkianea869482019-04-09 15:16:11 -0400996 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
997
998 return ch, nil
999}
1000
Neha Sharma96b7bf22020-06-15 10:37:32 +00001001func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
1002 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
William Kurkianea869482019-04-09 15:16:11 -04001003 kp.deleteFromTransactionIdToChannelMap(trnsId)
1004 return nil
1005}
1006
Neha Sharma96b7bf22020-06-15 10:37:32 +00001007func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
1008 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
cbabu95f21522019-11-13 14:25:18 +01001009}
1010
Neha Sharma96b7bf22020-06-15 10:37:32 +00001011func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
1012 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
Scott Baker86fce9a2019-12-12 09:47:17 -08001013}
1014
Neha Sharma96b7bf22020-06-15 10:37:32 +00001015func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
1016 return kp.kafkaClient.SendLiveness(ctx)
cbabu95f21522019-11-13 14:25:18 +01001017}
1018
William Kurkianea869482019-04-09 15:16:11 -04001019//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
1020//or an error on failure
Neha Sharma96b7bf22020-06-15 10:37:32 +00001021func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
William Kurkianea869482019-04-09 15:16:11 -04001022 requestHeader := &ic.Header{
1023 Id: uuid.New().String(),
1024 Type: ic.MessageType_REQUEST,
1025 FromTopic: replyTopic.Name,
1026 ToTopic: toTopic.Name,
Matt Jeanneret384d8c92019-05-06 14:27:31 -04001027 KeyTopic: key,
Scott Bakered4a8e72020-04-17 11:10:20 -07001028 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -04001029 }
1030 requestBody := &ic.InterContainerRequestBody{
1031 Rpc: rpc,
1032 ResponseRequired: true,
1033 ReplyToTopic: replyTopic.Name,
1034 }
1035
1036 for _, arg := range kvArgs {
1037 if arg == nil {
1038 // In case the caller sends an array with empty args
1039 continue
1040 }
1041 var marshalledArg *any.Any
1042 var err error
1043 // ascertain the value interface type is a proto.Message
1044 protoValue, ok := arg.Value.(proto.Message)
1045 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001046 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
William Kurkianea869482019-04-09 15:16:11 -04001047 err := errors.New("argument-value-not-proto-message")
1048 return nil, err
1049 }
1050 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001051 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001052 return nil, err
1053 }
1054 protoArg := &ic.Argument{
1055 Key: arg.Key,
1056 Value: marshalledArg,
1057 }
1058 requestBody.Args = append(requestBody.Args, protoArg)
1059 }
1060
1061 var marshalledData *any.Any
1062 var err error
1063 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001064 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001065 return nil, err
1066 }
1067 request := &ic.InterContainerMessage{
1068 Header: requestHeader,
1069 Body: marshalledData,
1070 }
1071 return request, nil
1072}
1073
Neha Sharma96b7bf22020-06-15 10:37:32 +00001074func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
William Kurkianea869482019-04-09 15:16:11 -04001075 // Extract the message body
1076 responseBody := ic.InterContainerResponseBody{}
1077 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001078 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001079 return nil, err
1080 }
Neha Sharma96b7bf22020-06-15 10:37:32 +00001081 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
William Kurkianea869482019-04-09 15:16:11 -04001082
1083 return &responseBody, nil
1084
1085}