blob: 92d25290b6fc2ac8adc2a5afc491912240d702de [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
Girish Kumar74240652020-07-10 11:54:28 +000020 "encoding/json"
Scott Baker2c1c4822019-10-16 11:02:41 -070021 "errors"
22 "fmt"
Girish Kumar74240652020-07-10 11:54:28 +000023 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/status"
Scott Baker2c1c4822019-10-16 11:02:41 -070025 "reflect"
26 "strings"
27 "sync"
28 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070029
30 "github.com/golang/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/any"
33 "github.com/google/uuid"
34 "github.com/opencord/voltha-lib-go/v3/pkg/log"
35 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Girish Kumar74240652020-07-10 11:54:28 +000036 "github.com/opentracing/opentracing-go"
Scott Baker2c1c4822019-10-16 11:02:41 -070037)
38
Scott Baker2c1c4822019-10-16 11:02:41 -070039const (
40 DefaultMaxRetries = 3
Matteo Scandoloed128822020-02-10 15:52:35 -080041 DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
Scott Baker2c1c4822019-10-16 11:02:41 -070042)
43
44const (
45 TransactionKey = "transactionID"
46 FromTopic = "fromTopic"
47)
48
49var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
50var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
51
52// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
53// obtained from that channel, this interface is invoked. This is used to handle
54// async requests into the Core via the kafka messaging bus
55type requestHandlerChannel struct {
56 requesthandlerInterface interface{}
57 ch <-chan *ic.InterContainerMessage
58}
59
60// transactionChannel represents a combination of a topic and a channel onto which a response received
61// on the kafka bus will be sent to
62type transactionChannel struct {
63 topic *Topic
64 ch chan *ic.InterContainerMessage
65}
66
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080067type InterContainerProxy interface {
Neha Sharma94f16a92020-06-26 04:17:55 +000068 Start(ctx context.Context) error
69 Stop(ctx context.Context)
Matteo Scandolof346a2d2020-01-24 13:14:54 -080070 GetDefaultTopic() *Topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080071 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Matteo Scandoloed128822020-02-10 15:52:35 -080072 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Neha Sharma94f16a92020-06-26 04:17:55 +000073 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
74 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
75 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
76 DeleteTopic(ctx context.Context, topic Topic) error
77 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
78 SendLiveness(ctx context.Context) error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080079}
80
81// interContainerProxy represents the messaging proxy
82type interContainerProxy struct {
Neha Sharmadd9af392020-04-28 09:03:57 +000083 kafkaAddress string
Matteo Scandolof346a2d2020-01-24 13:14:54 -080084 defaultTopic *Topic
Scott Baker2c1c4822019-10-16 11:02:41 -070085 defaultRequestHandlerInterface interface{}
Scott Baker2c1c4822019-10-16 11:02:41 -070086 kafkaClient Client
Kent Hagerman3a402302020-01-31 15:03:53 -050087 doneCh chan struct{}
88 doneOnce sync.Once
Scott Baker2c1c4822019-10-16 11:02:41 -070089
90 // This map is used to map a topic to an interface and channel. When a request is received
91 // on that channel (registered to the topic) then that interface is invoked.
92 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
93 lockTopicRequestHandlerChannelMap sync.RWMutex
94
95 // This map is used to map a channel to a response topic. This channel handles all responses on that
96 // channel for that topic and forward them to the appropriate consumers channel, using the
97 // transactionIdToChannelMap.
98 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
99 lockTopicResponseChannelMap sync.RWMutex
100
101 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
102 // sent out and we are waiting for a response.
103 transactionIdToChannelMap map[string]*transactionChannel
104 lockTransactionIdToChannelMap sync.RWMutex
105}
106
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800107type InterContainerProxyOption func(*interContainerProxy)
Scott Baker2c1c4822019-10-16 11:02:41 -0700108
Neha Sharmadd9af392020-04-28 09:03:57 +0000109func InterContainerAddress(address string) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800110 return func(args *interContainerProxy) {
Neha Sharmadd9af392020-04-28 09:03:57 +0000111 args.kafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -0700112 }
113}
114
115func DefaultTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800116 return func(args *interContainerProxy) {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800117 args.defaultTopic = topic
Scott Baker2c1c4822019-10-16 11:02:41 -0700118 }
119}
120
Scott Baker2c1c4822019-10-16 11:02:41 -0700121func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800122 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700123 args.defaultRequestHandlerInterface = handler
124 }
125}
126
127func MsgClient(client Client) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800128 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700129 args.kafkaClient = client
130 }
131}
132
Kent Hagerman3a402302020-01-31 15:03:53 -0500133func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800134 proxy := &interContainerProxy{
Neha Sharmadd9af392020-04-28 09:03:57 +0000135 kafkaAddress: DefaultKafkaAddress,
136 doneCh: make(chan struct{}),
Scott Baker2c1c4822019-10-16 11:02:41 -0700137 }
138
139 for _, option := range opts {
140 option(proxy)
141 }
142
Kent Hagerman3a402302020-01-31 15:03:53 -0500143 return proxy
Scott Baker2c1c4822019-10-16 11:02:41 -0700144}
145
Kent Hagerman3a402302020-01-31 15:03:53 -0500146func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800147 return newInterContainerProxy(opts...)
148}
149
Neha Sharma94f16a92020-06-26 04:17:55 +0000150func (kp *interContainerProxy) Start(ctx context.Context) error {
151 logger.Info(ctx, "Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700152
153 // Kafka MsgClient should already have been created. If not, output fatal error
154 if kp.kafkaClient == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000155 logger.Fatal(ctx, "kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700156 }
157
Scott Baker2c1c4822019-10-16 11:02:41 -0700158 // Start the kafka client
Neha Sharma94f16a92020-06-26 04:17:55 +0000159 if err := kp.kafkaClient.Start(ctx); err != nil {
160 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700161 return err
162 }
163
164 // Create the topic to response channel map
165 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
166 //
167 // Create the transactionId to Channel Map
168 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
169
170 // Create the topic to request channel map
171 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
172
173 return nil
174}
175
Neha Sharma94f16a92020-06-26 04:17:55 +0000176func (kp *interContainerProxy) Stop(ctx context.Context) {
177 logger.Info(ctx, "stopping-intercontainer-proxy")
Kent Hagerman3a402302020-01-31 15:03:53 -0500178 kp.doneOnce.Do(func() { close(kp.doneCh) })
Scott Baker2c1c4822019-10-16 11:02:41 -0700179 // TODO : Perform cleanup
Neha Sharma94f16a92020-06-26 04:17:55 +0000180 kp.kafkaClient.Stop(ctx)
181 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Bakera2da2f42020-02-20 16:27:34 -0800182 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000183 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800184 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000185 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Bakera2da2f42020-02-20 16:27:34 -0800186 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000187 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800188 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000189 kp.deleteAllTransactionIdToChannelMap(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700190}
191
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800192func (kp *interContainerProxy) GetDefaultTopic() *Topic {
193 return kp.defaultTopic
194}
195
Matteo Scandoloed128822020-02-10 15:52:35 -0800196// InvokeAsyncRPC is used to make an RPC request asynchronously
197func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
198 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
199
Neha Sharma94f16a92020-06-26 04:17:55 +0000200 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
Girish Kumar74240652020-07-10 11:54:28 +0000201
202 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
203 if spanArg != nil {
204 kvArgs = append(kvArgs, &spanArg[0])
205 }
206 defer span.Finish()
207
Matteo Scandoloed128822020-02-10 15:52:35 -0800208 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
209 // typically the device ID.
210 responseTopic := replyToTopic
211 if responseTopic == nil {
212 responseTopic = kp.GetDefaultTopic()
213 }
214
215 chnl := make(chan *RpcResponse)
216
217 go func() {
218
219 // once we're done,
220 // close the response channel
221 defer close(chnl)
222
223 var err error
224 var protoRequest *ic.InterContainerMessage
225
226 // Encode the request
Neha Sharma94f16a92020-06-26 04:17:55 +0000227 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Matteo Scandoloed128822020-02-10 15:52:35 -0800228 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000229 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Girish Kumar74240652020-07-10 11:54:28 +0000230 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
Matteo Scandoloed128822020-02-10 15:52:35 -0800231 chnl <- NewResponse(RpcFormattingError, err, nil)
232 return
233 }
234
235 // Subscribe for response, if needed, before sending request
236 var ch <-chan *ic.InterContainerMessage
Neha Sharma94f16a92020-06-26 04:17:55 +0000237 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
238 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Girish Kumar74240652020-07-10 11:54:28 +0000239 log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
Matteo Scandoloed128822020-02-10 15:52:35 -0800240 chnl <- NewResponse(RpcTransportError, err, nil)
241 return
242 }
243
244 // Send request - if the topic is formatted with a device Id then we will send the request using a
245 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
246 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Neha Sharma94f16a92020-06-26 04:17:55 +0000247 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Matteo Scandoloed128822020-02-10 15:52:35 -0800248
249 // if the message is not sent on kafka publish an event an close the channel
Neha Sharma94f16a92020-06-26 04:17:55 +0000250 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800251 chnl <- NewResponse(RpcTransportError, err, nil)
252 return
253 }
254
255 // if the client is not waiting for a response send the ack and close the channel
256 chnl <- NewResponse(RpcSent, nil, nil)
257 if !waitForResponse {
258 return
259 }
260
261 defer func() {
262 // Remove the subscription for a response on return
Neha Sharma94f16a92020-06-26 04:17:55 +0000263 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
264 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
Matteo Scandoloed128822020-02-10 15:52:35 -0800265 }
266 }()
267
268 // Wait for response as well as timeout or cancellation
269 select {
270 case msg, ok := <-ch:
271 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000272 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Girish Kumar74240652020-07-10 11:54:28 +0000273 log.MarkSpanError(ctx, errors.New("channel-closed"))
Matteo Scandoloed128822020-02-10 15:52:35 -0800274 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
275 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000276 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
277 if responseBody, err := decodeResponse(ctx, msg); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800278 chnl <- NewResponse(RpcReply, err, nil)
279 } else {
280 if responseBody.Success {
281 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
282 } else {
283 // response body contains an error
284 unpackErr := &ic.Error{}
285 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
286 chnl <- NewResponse(RpcReply, err, nil)
287 } else {
288 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
289 }
290 }
291 }
292 case <-ctx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000293 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Girish Kumar74240652020-07-10 11:54:28 +0000294 log.MarkSpanError(ctx, errors.New("context-cancelled"))
Matteo Scandoloed128822020-02-10 15:52:35 -0800295 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
296 chnl <- NewResponse(RpcTimeout, err, nil)
297 case <-kp.doneCh:
298 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Neha Sharma94f16a92020-06-26 04:17:55 +0000299 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Matteo Scandoloed128822020-02-10 15:52:35 -0800300 }
301 }()
302 return chnl
303}
304
Girish Kumar74240652020-07-10 11:54:28 +0000305// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
306// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
307//
308// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
309// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
310// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
311// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
312// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
313func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
314 var err error
315 var newCtx context.Context
316 var spanToInject opentracing.Span
317
318 var spanName strings.Builder
319 spanName.WriteString("kafka-")
320
321 // In case of inter adapter message, use Msg Type for constructing RPC name
322 if rpc == "process_inter_adapter_message" {
323 if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
324 spanName.WriteString("inter-adapter-")
325 rpc = msgType.String()
326 }
327 }
328
329 if isAsync {
330 spanName.WriteString("async-rpc-")
331 } else {
332 spanName.WriteString("rpc-")
333 }
334 spanName.WriteString(rpc)
335
336 if isAsync {
337 spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
338 } else {
339 spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
340 }
341
342 spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
343
344 textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
345 if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
346 logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
347 return nil, spanToInject, newCtx
348 }
349
350 var textMapJson []byte
351 if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
352 logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
353 return nil, spanToInject, newCtx
354 }
355
356 spanArg := make([]KVArg, 1)
357 spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
358 return spanArg, spanToInject, newCtx
359}
360
Scott Baker2c1c4822019-10-16 11:02:41 -0700361// InvokeRPC is used to send a request to a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800362func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
Scott Baker2c1c4822019-10-16 11:02:41 -0700363 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
364
Girish Kumar74240652020-07-10 11:54:28 +0000365 spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
366 if spanArg != nil {
367 kvArgs = append(kvArgs, &spanArg[0])
368 }
369 defer span.Finish()
370
Scott Baker2c1c4822019-10-16 11:02:41 -0700371 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
372 // typically the device ID.
373 responseTopic := replyToTopic
374 if responseTopic == nil {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800375 responseTopic = kp.defaultTopic
Scott Baker2c1c4822019-10-16 11:02:41 -0700376 }
377
378 // Encode the request
Neha Sharma94f16a92020-06-26 04:17:55 +0000379 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Scott Baker2c1c4822019-10-16 11:02:41 -0700380 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000381 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Girish Kumar74240652020-07-10 11:54:28 +0000382 log.MarkSpanError(ctx, errors.New("cannot-format-request"))
Scott Baker2c1c4822019-10-16 11:02:41 -0700383 return false, nil
384 }
385
386 // Subscribe for response, if needed, before sending request
387 var ch <-chan *ic.InterContainerMessage
388 if waitForResponse {
389 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000390 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
391 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700392 }
393 }
394
395 // Send request - if the topic is formatted with a device Id then we will send the request using a
396 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
397 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
398 //key := GetDeviceIdFromTopic(*toTopic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000399 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800400 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000401 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Girish Kumar74240652020-07-10 11:54:28 +0000402 log.MarkSpanError(ctx, errors.New("send-failed"))
Neha Sharma94f16a92020-06-26 04:17:55 +0000403 logger.Errorw(ctx, "send-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800404 "topic": toTopic,
405 "key": key,
406 "error": err})
407 }
408 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700409
410 if waitForResponse {
411 // Create a child context based on the parent context, if any
412 var cancel context.CancelFunc
413 childCtx := context.Background()
414 if ctx == nil {
415 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
416 } else {
417 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
418 }
419 defer cancel()
420
421 // Wait for response as well as timeout or cancellation
422 // Remove the subscription for a response on return
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800423 defer func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000424 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
425 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800426 "id": protoRequest.Header.Id,
427 "error": err})
428 }
429 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700430 select {
431 case msg, ok := <-ch:
432 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000433 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Girish Kumar74240652020-07-10 11:54:28 +0000434 log.MarkSpanError(ctx, errors.New("channel-closed"))
Scott Baker2c1c4822019-10-16 11:02:41 -0700435 protoError := &ic.Error{Reason: "channel-closed"}
436 var marshalledArg *any.Any
437 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
438 return false, nil // Should never happen
439 }
440 return false, marshalledArg
441 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000442 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700443 var responseBody *ic.InterContainerResponseBody
444 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000445 if responseBody, err = decodeResponse(ctx, msg); err != nil {
446 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800447 // FIXME we should return something
Scott Baker2c1c4822019-10-16 11:02:41 -0700448 }
449 return responseBody.Success, responseBody.Result
450 case <-ctx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000451 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Girish Kumar74240652020-07-10 11:54:28 +0000452 log.MarkSpanError(ctx, errors.New("context-cancelled"))
Scott Baker2c1c4822019-10-16 11:02:41 -0700453 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800454 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800455
Scott Baker2c1c4822019-10-16 11:02:41 -0700456 var marshalledArg *any.Any
457 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
458 return false, nil // Should never happen
459 }
460 return false, marshalledArg
461 case <-childCtx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000462 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Girish Kumar74240652020-07-10 11:54:28 +0000463 log.MarkSpanError(ctx, errors.New("context-cancelled"))
Scott Baker2c1c4822019-10-16 11:02:41 -0700464 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800465 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800466
Scott Baker2c1c4822019-10-16 11:02:41 -0700467 var marshalledArg *any.Any
468 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
469 return false, nil // Should never happen
470 }
471 return false, marshalledArg
472 case <-kp.doneCh:
Neha Sharma94f16a92020-06-26 04:17:55 +0000473 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700474 return true, nil
475 }
476 }
477 return true, nil
478}
479
480// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
481// when a message is received on a given topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000482func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700483
484 // Subscribe to receive messages for that topic
485 var ch <-chan *ic.InterContainerMessage
486 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000487 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
Scott Baker2c1c4822019-10-16 11:02:41 -0700488 //if ch, err = kp.Subscribe(topic); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000489 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700490 return err
491 }
492
493 kp.defaultRequestHandlerInterface = handler
494 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
495 // Launch a go routine to receive and process kafka messages
Neha Sharma94f16a92020-06-26 04:17:55 +0000496 go kp.waitForMessages(ctx, ch, topic, handler)
Scott Baker2c1c4822019-10-16 11:02:41 -0700497
498 return nil
499}
500
501// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
502// when a message is received on a given topic. So far there is only 1 target registered per microservice
Neha Sharma94f16a92020-06-26 04:17:55 +0000503func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700504 // Subscribe to receive messages for that topic
505 var ch <-chan *ic.InterContainerMessage
506 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000507 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
508 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700509 return err
510 }
511 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
512
513 // Launch a go routine to receive and process kafka messages
Neha Sharma94f16a92020-06-26 04:17:55 +0000514 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700515
516 return nil
517}
518
Neha Sharma94f16a92020-06-26 04:17:55 +0000519func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
520 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
Scott Baker2c1c4822019-10-16 11:02:41 -0700521}
522
Neha Sharma94f16a92020-06-26 04:17:55 +0000523func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700524 kp.lockTopicResponseChannelMap.Lock()
525 defer kp.lockTopicResponseChannelMap.Unlock()
526 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
527 // Unsubscribe to this topic first - this will close the subscribed channel
528 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000529 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
530 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700531 }
532 delete(kp.topicToResponseChannelMap, topic)
533 return err
534 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800535 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700536 }
537}
538
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800539// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000540func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
541 logger.Debug(ctx, "delete-all-topic-response-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700542 kp.lockTopicResponseChannelMap.Lock()
543 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800544 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800545 for topic := range kp.topicToResponseChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700546 // Unsubscribe to this topic first - this will close the subscribed channel
Neha Sharma94f16a92020-06-26 04:17:55 +0000547 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800548 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000549 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800550 // Do not return. Continue to try to unsubscribe to other topics.
551 } else {
552 // Only delete from channel map if successfully unsubscribed.
553 delete(kp.topicToResponseChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700554 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700555 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800556 if len(unsubscribeFailTopics) > 0 {
557 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
558 }
559 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700560}
561
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800562func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700563 kp.lockTopicRequestHandlerChannelMap.Lock()
564 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
565 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
566 kp.topicToRequestHandlerChannelMap[topic] = arg
567 }
568}
569
Neha Sharma94f16a92020-06-26 04:17:55 +0000570func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700571 kp.lockTopicRequestHandlerChannelMap.Lock()
572 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
573 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
574 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000575 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800576 return err
577 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700578 delete(kp.topicToRequestHandlerChannelMap, topic)
579 return nil
580 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800581 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700582 }
583}
584
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800585// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000586func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
587 logger.Debug(ctx, "delete-all-topic-request-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700588 kp.lockTopicRequestHandlerChannelMap.Lock()
589 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800590 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800591 for topic := range kp.topicToRequestHandlerChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700592 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000593 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800594 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000595 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800596 // Do not return. Continue to try to unsubscribe to other topics.
597 } else {
598 // Only delete from channel map if successfully unsubscribed.
599 delete(kp.topicToRequestHandlerChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700600 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700601 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800602 if len(unsubscribeFailTopics) > 0 {
603 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
604 }
605 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700606}
607
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800608func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700609 kp.lockTransactionIdToChannelMap.Lock()
610 defer kp.lockTransactionIdToChannelMap.Unlock()
611 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
612 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
613 }
614}
615
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800616func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700617 kp.lockTransactionIdToChannelMap.Lock()
618 defer kp.lockTransactionIdToChannelMap.Unlock()
619 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
620 // Close the channel first
621 close(transChannel.ch)
622 delete(kp.transactionIdToChannelMap, id)
623 }
624}
625
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800626func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700627 kp.lockTransactionIdToChannelMap.Lock()
628 defer kp.lockTransactionIdToChannelMap.Unlock()
629 for key, value := range kp.transactionIdToChannelMap {
630 if value.topic.Name == id {
631 close(value.ch)
632 delete(kp.transactionIdToChannelMap, key)
633 }
634 }
635}
636
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800637// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000638func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
639 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
Scott Baker2c1c4822019-10-16 11:02:41 -0700640 kp.lockTransactionIdToChannelMap.Lock()
641 defer kp.lockTransactionIdToChannelMap.Unlock()
642 for key, value := range kp.transactionIdToChannelMap {
643 close(value.ch)
644 delete(kp.transactionIdToChannelMap, key)
645 }
646}
647
Neha Sharma94f16a92020-06-26 04:17:55 +0000648func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700649 // If we have any consumers on that topic we need to close them
Neha Sharma94f16a92020-06-26 04:17:55 +0000650 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
651 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700652 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000653 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
654 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700655 }
656 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
657
Neha Sharma94f16a92020-06-26 04:17:55 +0000658 return kp.kafkaClient.DeleteTopic(ctx, &topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700659}
660
Neha Sharma94f16a92020-06-26 04:17:55 +0000661func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700662 // Encode the response argument - needs to be a proto message
663 if returnedVal == nil {
664 return nil, nil
665 }
666 protoValue, ok := returnedVal.(proto.Message)
667 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000668 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700669 err := errors.New("response-value-not-proto-message")
670 return nil, err
671 }
672
673 // Marshal the returned value, if any
674 var marshalledReturnedVal *any.Any
675 var err error
676 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000677 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700678 return nil, err
679 }
680 return marshalledReturnedVal, nil
681}
682
Neha Sharma94f16a92020-06-26 04:17:55 +0000683func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
Scott Baker2c1c4822019-10-16 11:02:41 -0700684 responseHeader := &ic.Header{
685 Id: request.Header.Id,
686 Type: ic.MessageType_RESPONSE,
687 FromTopic: request.Header.ToTopic,
688 ToTopic: request.Header.FromTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700689 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700690 }
691 responseBody := &ic.InterContainerResponseBody{
692 Success: false,
693 Result: nil,
694 }
695 var marshalledResponseBody *any.Any
696 var err error
697 // Error should never happen here
698 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000699 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700700 }
701
702 return &ic.InterContainerMessage{
703 Header: responseHeader,
704 Body: marshalledResponseBody,
705 }
706
707}
708
709//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
710//or an error on failure
Neha Sharma94f16a92020-06-26 04:17:55 +0000711func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
712 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700713 responseHeader := &ic.Header{
714 Id: request.Header.Id,
715 Type: ic.MessageType_RESPONSE,
716 FromTopic: request.Header.ToTopic,
717 ToTopic: request.Header.FromTopic,
718 KeyTopic: request.Header.KeyTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700719 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700720 }
721
722 // Go over all returned values
723 var marshalledReturnedVal *any.Any
724 var err error
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800725
726 // for now we support only 1 returned value - (excluding the error)
727 if len(returnedValues) > 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000728 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
729 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700730 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700731 }
732
733 responseBody := &ic.InterContainerResponseBody{
734 Success: success,
735 Result: marshalledReturnedVal,
736 }
737
738 // Marshal the response body
739 var marshalledResponseBody *any.Any
740 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000741 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700742 return nil, err
743 }
744
745 return &ic.InterContainerMessage{
746 Header: responseHeader,
747 Body: marshalledResponseBody,
748 }, nil
749}
750
Neha Sharma94f16a92020-06-26 04:17:55 +0000751func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700752 myClassValue := reflect.ValueOf(myClass)
753 // Capitalize the first letter in the funcName to workaround the first capital letters required to
754 // invoke a function from a different package
755 funcName = strings.Title(funcName)
756 m := myClassValue.MethodByName(funcName)
757 if !m.IsValid() {
758 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
759 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000760 in := make([]reflect.Value, len(params)+1)
761 in[0] = reflect.ValueOf(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700762 for i, param := range params {
Neha Sharma94f16a92020-06-26 04:17:55 +0000763 in[i+1] = reflect.ValueOf(param)
Scott Baker2c1c4822019-10-16 11:02:41 -0700764 }
765 out = m.Call(in)
766 return
767}
768
Neha Sharma94f16a92020-06-26 04:17:55 +0000769func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700770 arg := &KVArg{
771 Key: TransactionKey,
772 Value: &ic.StrType{Val: transactionId},
773 }
774
775 var marshalledArg *any.Any
776 var err error
777 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000778 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700779 return currentArgs
780 }
781 protoArg := &ic.Argument{
782 Key: arg.Key,
783 Value: marshalledArg,
784 }
785 return append(currentArgs, protoArg)
786}
787
Neha Sharma94f16a92020-06-26 04:17:55 +0000788func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700789 var marshalledArg *any.Any
790 var err error
791 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000792 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700793 return currentArgs
794 }
795 protoArg := &ic.Argument{
796 Key: FromTopic,
797 Value: marshalledArg,
798 }
799 return append(currentArgs, protoArg)
800}
801
Girish Kumar74240652020-07-10 11:54:28 +0000802// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
803// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
804// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
805// from components currently not sending the span (e.g. openonu adapter)
806func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
807
808 for _, arg := range args {
809 if arg.Key == "span" {
810 var err error
811 var textMapString ic.StrType
812 if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
813 logger.Warnw(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
814 break
815 }
816
817 spanTextMap := make(map[string]string)
818 if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
819 logger.Warnw(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
820 break
821 }
822
823 var spanContext opentracing.SpanContext
824 if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
825 logger.Warnw(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
826 break
827 }
828
829 var receivedRpcName string
830 extractBaggage := func(k, v string) bool {
831 if k == "rpc-span-name" {
832 receivedRpcName = v
833 return false
834 }
835 return true
836 }
837
838 spanContext.ForeachBaggageItem(extractBaggage)
839
840 return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
841 }
842 }
843
844 // Create new Child Span with rpc as name if no span details were received in kafka arguments
845 var spanName strings.Builder
846 spanName.WriteString("kafka-")
847
848 // In case of inter adapter message, use Msg Type for constructing RPC name
849 if rpcName == "process_inter_adapter_message" {
850 for _, arg := range args {
851 if arg.Key == "msg" {
852 iamsg := ic.InterAdapterMessage{}
853 if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
854 spanName.WriteString("inter-adapter-")
855 rpcName = iamsg.Header.Type.String()
856 }
857 }
858 }
859 }
860
861 spanName.WriteString("rpc-")
862 spanName.WriteString(rpcName)
863
864 return opentracing.StartSpanFromContext(ctx, spanName.String())
865}
866
Neha Sharma94f16a92020-06-26 04:17:55 +0000867func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700868
869 // First extract the header to know whether this is a request - responses are handled by a different handler
870 if msg.Header.Type == ic.MessageType_REQUEST {
871 var out []reflect.Value
872 var err error
873
874 // Get the request body
875 requestBody := &ic.InterContainerRequestBody{}
876 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000877 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700878 } else {
Girish Kumar74240652020-07-10 11:54:28 +0000879 span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
880 defer span.Finish()
881
Neha Sharma94f16a92020-06-26 04:17:55 +0000882 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700883 // let the callee unpack the arguments as its the only one that knows the real proto type
884 // Augment the requestBody with the message Id as it will be used in scenarios where cores
885 // are set in pairs and competing
Neha Sharma94f16a92020-06-26 04:17:55 +0000886 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700887
888 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
889 // needs to send an unsollicited message to the currently requested container
Neha Sharma94f16a92020-06-26 04:17:55 +0000890 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700891
Neha Sharma94f16a92020-06-26 04:17:55 +0000892 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700893 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000894 logger.Warn(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700895 }
896 }
897 // Response required?
898 if requestBody.ResponseRequired {
899 // If we already have an error before then just return that
900 var returnError *ic.Error
901 var returnedValues []interface{}
902 var success bool
903 if err != nil {
904 returnError = &ic.Error{Reason: err.Error()}
905 returnedValues = make([]interface{}, 1)
906 returnedValues[0] = returnError
907 } else {
908 returnedValues = make([]interface{}, 0)
909 // Check for errors first
910 lastIndex := len(out) - 1
911 if out[lastIndex].Interface() != nil { // Error
912 if retError, ok := out[lastIndex].Interface().(error); ok {
913 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000914 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700915 return // Ignore - process is in competing mode and ignored transaction
916 }
917 returnError = &ic.Error{Reason: retError.Error()}
918 returnedValues = append(returnedValues, returnError)
919 } else { // Should never happen
920 returnError = &ic.Error{Reason: "incorrect-error-returns"}
921 returnedValues = append(returnedValues, returnError)
922 }
923 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000924 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700925 return // Ignore - should not happen
926 } else { // Non-error case
927 success = true
928 for idx, val := range out {
Neha Sharma94f16a92020-06-26 04:17:55 +0000929 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700930 if idx != lastIndex {
931 returnedValues = append(returnedValues, val.Interface())
932 }
933 }
934 }
935 }
936
937 var icm *ic.InterContainerMessage
Neha Sharma94f16a92020-06-26 04:17:55 +0000938 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
939 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
940 icm = encodeDefaultFailedResponse(ctx, msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700941 }
942 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
943 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
944 // present then the key will be empty, hence all messages for a given topic will be sent to all
945 // partitions.
946 replyTopic := &Topic{Name: msg.Header.FromTopic}
947 key := msg.Header.KeyTopic
Neha Sharma94f16a92020-06-26 04:17:55 +0000948 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700949 // TODO: handle error response.
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800950 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000951 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
952 logger.Errorw(ctx, "send-reply-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800953 "topic": replyTopic,
954 "key": key,
955 "error": err})
956 }
957 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700958 }
959 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Neha Sharma94f16a92020-06-26 04:17:55 +0000960 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
961 go kp.dispatchResponse(ctx, msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700962 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000963 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700964 }
965}
966
Neha Sharma94f16a92020-06-26 04:17:55 +0000967func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700968 // Wait for messages
969 for msg := range ch {
Neha Sharma94f16a92020-06-26 04:17:55 +0000970 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
971 go kp.handleMessage(context.Background(), msg, targetInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700972 }
973}
974
Neha Sharma94f16a92020-06-26 04:17:55 +0000975func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700976 kp.lockTransactionIdToChannelMap.RLock()
977 defer kp.lockTransactionIdToChannelMap.RUnlock()
978 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Neha Sharma94f16a92020-06-26 04:17:55 +0000979 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700980 return
981 }
982 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
983}
984
985// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
986// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
987// API. There is one response channel waiting for kafka messages before dispatching the message to the
988// corresponding waiting channel
Neha Sharma94f16a92020-06-26 04:17:55 +0000989func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
990 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700991
992 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
993 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerae1d4702020-03-04 14:10:51 -0800994 // Set channel size to 1 to prevent deadlock, see VOL-2708
995 ch := make(chan *ic.InterContainerMessage, 1)
Scott Baker2c1c4822019-10-16 11:02:41 -0700996 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
997
998 return ch, nil
999}
1000
Neha Sharma94f16a92020-06-26 04:17:55 +00001001func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
1002 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -07001003 kp.deleteFromTransactionIdToChannelMap(trnsId)
1004 return nil
1005}
1006
Neha Sharma94f16a92020-06-26 04:17:55 +00001007func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
1008 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
Scott Baker104b67d2019-10-29 15:56:27 -07001009}
1010
Neha Sharma94f16a92020-06-26 04:17:55 +00001011func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
1012 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
Scott Baker0fef6982019-12-12 09:49:42 -08001013}
1014
Neha Sharma94f16a92020-06-26 04:17:55 +00001015func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
1016 return kp.kafkaClient.SendLiveness(ctx)
Scott Baker104b67d2019-10-29 15:56:27 -07001017}
1018
Scott Baker2c1c4822019-10-16 11:02:41 -07001019//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
1020//or an error on failure
Neha Sharma94f16a92020-06-26 04:17:55 +00001021func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001022 requestHeader := &ic.Header{
1023 Id: uuid.New().String(),
1024 Type: ic.MessageType_REQUEST,
1025 FromTopic: replyTopic.Name,
1026 ToTopic: toTopic.Name,
1027 KeyTopic: key,
Scott Baker84a55ce2020-04-17 10:11:30 -07001028 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -07001029 }
1030 requestBody := &ic.InterContainerRequestBody{
1031 Rpc: rpc,
1032 ResponseRequired: true,
1033 ReplyToTopic: replyTopic.Name,
1034 }
1035
1036 for _, arg := range kvArgs {
1037 if arg == nil {
1038 // In case the caller sends an array with empty args
1039 continue
1040 }
1041 var marshalledArg *any.Any
1042 var err error
1043 // ascertain the value interface type is a proto.Message
1044 protoValue, ok := arg.Value.(proto.Message)
1045 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001046 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -07001047 err := errors.New("argument-value-not-proto-message")
1048 return nil, err
1049 }
1050 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001051 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001052 return nil, err
1053 }
1054 protoArg := &ic.Argument{
1055 Key: arg.Key,
1056 Value: marshalledArg,
1057 }
1058 requestBody.Args = append(requestBody.Args, protoArg)
1059 }
1060
1061 var marshalledData *any.Any
1062 var err error
1063 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001064 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001065 return nil, err
1066 }
1067 request := &ic.InterContainerMessage{
1068 Header: requestHeader,
1069 Body: marshalledData,
1070 }
1071 return request, nil
1072}
1073
Neha Sharma94f16a92020-06-26 04:17:55 +00001074func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001075 // Extract the message body
1076 responseBody := ic.InterContainerResponseBody{}
1077 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001078 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001079 return nil, err
1080 }
Neha Sharma94f16a92020-06-26 04:17:55 +00001081 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -07001082
1083 return &responseBody, nil
1084
1085}