blob: 368391e4814174f5c57bb9d863724a5e9d9f7d5e [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
Scott Baker2c1c4822019-10-16 11:02:41 -070022 "reflect"
23 "strings"
24 "sync"
25 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070026
David Bainbridgece5e11a2020-06-23 12:41:16 -070027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29
serkant.uluderyab38671c2019-11-01 09:35:38 -070030 "github.com/golang/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/golang/protobuf/ptypes/any"
33 "github.com/google/uuid"
34 "github.com/opencord/voltha-lib-go/v3/pkg/log"
35 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070036)
37
Scott Baker2c1c4822019-10-16 11:02:41 -070038const (
39 DefaultMaxRetries = 3
Matteo Scandoloed128822020-02-10 15:52:35 -080040 DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
Scott Baker2c1c4822019-10-16 11:02:41 -070041)
42
43const (
44 TransactionKey = "transactionID"
45 FromTopic = "fromTopic"
46)
47
48var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
49var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
50
51// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
52// obtained from that channel, this interface is invoked. This is used to handle
53// async requests into the Core via the kafka messaging bus
54type requestHandlerChannel struct {
55 requesthandlerInterface interface{}
56 ch <-chan *ic.InterContainerMessage
57}
58
59// transactionChannel represents a combination of a topic and a channel onto which a response received
60// on the kafka bus will be sent to
61type transactionChannel struct {
62 topic *Topic
63 ch chan *ic.InterContainerMessage
64}
65
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080066type InterContainerProxy interface {
Neha Sharma94f16a92020-06-26 04:17:55 +000067 Start(ctx context.Context) error
68 Stop(ctx context.Context)
Matteo Scandolof346a2d2020-01-24 13:14:54 -080069 GetDefaultTopic() *Topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080070 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Matteo Scandoloed128822020-02-10 15:52:35 -080071 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Neha Sharma94f16a92020-06-26 04:17:55 +000072 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
73 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
74 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
75 DeleteTopic(ctx context.Context, topic Topic) error
76 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
77 SendLiveness(ctx context.Context) error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080078}
79
80// interContainerProxy represents the messaging proxy
81type interContainerProxy struct {
Neha Sharmadd9af392020-04-28 09:03:57 +000082 kafkaAddress string
Matteo Scandolof346a2d2020-01-24 13:14:54 -080083 defaultTopic *Topic
Scott Baker2c1c4822019-10-16 11:02:41 -070084 defaultRequestHandlerInterface interface{}
Scott Baker2c1c4822019-10-16 11:02:41 -070085 kafkaClient Client
Kent Hagerman3a402302020-01-31 15:03:53 -050086 doneCh chan struct{}
87 doneOnce sync.Once
Scott Baker2c1c4822019-10-16 11:02:41 -070088
89 // This map is used to map a topic to an interface and channel. When a request is received
90 // on that channel (registered to the topic) then that interface is invoked.
91 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
92 lockTopicRequestHandlerChannelMap sync.RWMutex
93
94 // This map is used to map a channel to a response topic. This channel handles all responses on that
95 // channel for that topic and forward them to the appropriate consumers channel, using the
96 // transactionIdToChannelMap.
97 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
98 lockTopicResponseChannelMap sync.RWMutex
99
100 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
101 // sent out and we are waiting for a response.
102 transactionIdToChannelMap map[string]*transactionChannel
103 lockTransactionIdToChannelMap sync.RWMutex
104}
105
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800106type InterContainerProxyOption func(*interContainerProxy)
Scott Baker2c1c4822019-10-16 11:02:41 -0700107
Neha Sharmadd9af392020-04-28 09:03:57 +0000108func InterContainerAddress(address string) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800109 return func(args *interContainerProxy) {
Neha Sharmadd9af392020-04-28 09:03:57 +0000110 args.kafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -0700111 }
112}
113
114func DefaultTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800115 return func(args *interContainerProxy) {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800116 args.defaultTopic = topic
Scott Baker2c1c4822019-10-16 11:02:41 -0700117 }
118}
119
Scott Baker2c1c4822019-10-16 11:02:41 -0700120func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800121 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700122 args.defaultRequestHandlerInterface = handler
123 }
124}
125
126func MsgClient(client Client) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800127 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700128 args.kafkaClient = client
129 }
130}
131
Kent Hagerman3a402302020-01-31 15:03:53 -0500132func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800133 proxy := &interContainerProxy{
Neha Sharmadd9af392020-04-28 09:03:57 +0000134 kafkaAddress: DefaultKafkaAddress,
135 doneCh: make(chan struct{}),
Scott Baker2c1c4822019-10-16 11:02:41 -0700136 }
137
138 for _, option := range opts {
139 option(proxy)
140 }
141
Kent Hagerman3a402302020-01-31 15:03:53 -0500142 return proxy
Scott Baker2c1c4822019-10-16 11:02:41 -0700143}
144
Kent Hagerman3a402302020-01-31 15:03:53 -0500145func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800146 return newInterContainerProxy(opts...)
147}
148
Neha Sharma94f16a92020-06-26 04:17:55 +0000149func (kp *interContainerProxy) Start(ctx context.Context) error {
150 logger.Info(ctx, "Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700151
152 // Kafka MsgClient should already have been created. If not, output fatal error
153 if kp.kafkaClient == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000154 logger.Fatal(ctx, "kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700155 }
156
Scott Baker2c1c4822019-10-16 11:02:41 -0700157 // Start the kafka client
Neha Sharma94f16a92020-06-26 04:17:55 +0000158 if err := kp.kafkaClient.Start(ctx); err != nil {
159 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700160 return err
161 }
162
163 // Create the topic to response channel map
164 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
165 //
166 // Create the transactionId to Channel Map
167 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
168
169 // Create the topic to request channel map
170 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
171
172 return nil
173}
174
Neha Sharma94f16a92020-06-26 04:17:55 +0000175func (kp *interContainerProxy) Stop(ctx context.Context) {
176 logger.Info(ctx, "stopping-intercontainer-proxy")
Kent Hagerman3a402302020-01-31 15:03:53 -0500177 kp.doneOnce.Do(func() { close(kp.doneCh) })
Scott Baker2c1c4822019-10-16 11:02:41 -0700178 // TODO : Perform cleanup
Neha Sharma94f16a92020-06-26 04:17:55 +0000179 kp.kafkaClient.Stop(ctx)
180 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Bakera2da2f42020-02-20 16:27:34 -0800181 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000182 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800183 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000184 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Bakera2da2f42020-02-20 16:27:34 -0800185 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000186 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800187 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000188 kp.deleteAllTransactionIdToChannelMap(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700189}
190
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800191func (kp *interContainerProxy) GetDefaultTopic() *Topic {
192 return kp.defaultTopic
193}
194
Matteo Scandoloed128822020-02-10 15:52:35 -0800195// InvokeAsyncRPC is used to make an RPC request asynchronously
196func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
197 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
198
Neha Sharma94f16a92020-06-26 04:17:55 +0000199 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
Matteo Scandoloed128822020-02-10 15:52:35 -0800200 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
201 // typically the device ID.
202 responseTopic := replyToTopic
203 if responseTopic == nil {
204 responseTopic = kp.GetDefaultTopic()
205 }
206
207 chnl := make(chan *RpcResponse)
208
209 go func() {
210
211 // once we're done,
212 // close the response channel
213 defer close(chnl)
214
215 var err error
216 var protoRequest *ic.InterContainerMessage
217
218 // Encode the request
Neha Sharma94f16a92020-06-26 04:17:55 +0000219 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Matteo Scandoloed128822020-02-10 15:52:35 -0800220 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000221 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Matteo Scandoloed128822020-02-10 15:52:35 -0800222 chnl <- NewResponse(RpcFormattingError, err, nil)
223 return
224 }
225
226 // Subscribe for response, if needed, before sending request
227 var ch <-chan *ic.InterContainerMessage
Neha Sharma94f16a92020-06-26 04:17:55 +0000228 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
229 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Matteo Scandoloed128822020-02-10 15:52:35 -0800230 chnl <- NewResponse(RpcTransportError, err, nil)
231 return
232 }
233
234 // Send request - if the topic is formatted with a device Id then we will send the request using a
235 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
236 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Neha Sharma94f16a92020-06-26 04:17:55 +0000237 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Matteo Scandoloed128822020-02-10 15:52:35 -0800238
239 // if the message is not sent on kafka publish an event an close the channel
Neha Sharma94f16a92020-06-26 04:17:55 +0000240 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800241 chnl <- NewResponse(RpcTransportError, err, nil)
242 return
243 }
244
245 // if the client is not waiting for a response send the ack and close the channel
246 chnl <- NewResponse(RpcSent, nil, nil)
247 if !waitForResponse {
248 return
249 }
250
251 defer func() {
252 // Remove the subscription for a response on return
Neha Sharma94f16a92020-06-26 04:17:55 +0000253 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
254 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
Matteo Scandoloed128822020-02-10 15:52:35 -0800255 }
256 }()
257
258 // Wait for response as well as timeout or cancellation
259 select {
260 case msg, ok := <-ch:
261 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000262 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Matteo Scandoloed128822020-02-10 15:52:35 -0800263 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
264 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000265 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
266 if responseBody, err := decodeResponse(ctx, msg); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800267 chnl <- NewResponse(RpcReply, err, nil)
268 } else {
269 if responseBody.Success {
270 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
271 } else {
272 // response body contains an error
273 unpackErr := &ic.Error{}
274 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
275 chnl <- NewResponse(RpcReply, err, nil)
276 } else {
277 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
278 }
279 }
280 }
281 case <-ctx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000282 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Matteo Scandoloed128822020-02-10 15:52:35 -0800283 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
284 chnl <- NewResponse(RpcTimeout, err, nil)
285 case <-kp.doneCh:
286 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Neha Sharma94f16a92020-06-26 04:17:55 +0000287 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Matteo Scandoloed128822020-02-10 15:52:35 -0800288 }
289 }()
290 return chnl
291}
292
Scott Baker2c1c4822019-10-16 11:02:41 -0700293// InvokeRPC is used to send a request to a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800294func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
Scott Baker2c1c4822019-10-16 11:02:41 -0700295 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
296
297 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
298 // typically the device ID.
299 responseTopic := replyToTopic
300 if responseTopic == nil {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800301 responseTopic = kp.defaultTopic
Scott Baker2c1c4822019-10-16 11:02:41 -0700302 }
303
304 // Encode the request
Neha Sharma94f16a92020-06-26 04:17:55 +0000305 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Scott Baker2c1c4822019-10-16 11:02:41 -0700306 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000307 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700308 return false, nil
309 }
310
311 // Subscribe for response, if needed, before sending request
312 var ch <-chan *ic.InterContainerMessage
313 if waitForResponse {
314 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000315 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
316 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700317 }
318 }
319
320 // Send request - if the topic is formatted with a device Id then we will send the request using a
321 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
322 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
323 //key := GetDeviceIdFromTopic(*toTopic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000324 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800325 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000326 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
327 logger.Errorw(ctx, "send-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800328 "topic": toTopic,
329 "key": key,
330 "error": err})
331 }
332 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700333
334 if waitForResponse {
335 // Create a child context based on the parent context, if any
336 var cancel context.CancelFunc
337 childCtx := context.Background()
338 if ctx == nil {
339 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
340 } else {
341 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
342 }
343 defer cancel()
344
345 // Wait for response as well as timeout or cancellation
346 // Remove the subscription for a response on return
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800347 defer func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000348 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
349 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800350 "id": protoRequest.Header.Id,
351 "error": err})
352 }
353 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700354 select {
355 case msg, ok := <-ch:
356 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000357 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700358 protoError := &ic.Error{Reason: "channel-closed"}
359 var marshalledArg *any.Any
360 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
361 return false, nil // Should never happen
362 }
363 return false, marshalledArg
364 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000365 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700366 var responseBody *ic.InterContainerResponseBody
367 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000368 if responseBody, err = decodeResponse(ctx, msg); err != nil {
369 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800370 // FIXME we should return something
Scott Baker2c1c4822019-10-16 11:02:41 -0700371 }
372 return responseBody.Success, responseBody.Result
373 case <-ctx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000374 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700375 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800376 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800377
Scott Baker2c1c4822019-10-16 11:02:41 -0700378 var marshalledArg *any.Any
379 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
380 return false, nil // Should never happen
381 }
382 return false, marshalledArg
383 case <-childCtx.Done():
Neha Sharma94f16a92020-06-26 04:17:55 +0000384 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700385 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800386 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800387
Scott Baker2c1c4822019-10-16 11:02:41 -0700388 var marshalledArg *any.Any
389 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
390 return false, nil // Should never happen
391 }
392 return false, marshalledArg
393 case <-kp.doneCh:
Neha Sharma94f16a92020-06-26 04:17:55 +0000394 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700395 return true, nil
396 }
397 }
398 return true, nil
399}
400
401// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
402// when a message is received on a given topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000403func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700404
405 // Subscribe to receive messages for that topic
406 var ch <-chan *ic.InterContainerMessage
407 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000408 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
Scott Baker2c1c4822019-10-16 11:02:41 -0700409 //if ch, err = kp.Subscribe(topic); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000410 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700411 return err
412 }
413
414 kp.defaultRequestHandlerInterface = handler
415 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
416 // Launch a go routine to receive and process kafka messages
Neha Sharma94f16a92020-06-26 04:17:55 +0000417 go kp.waitForMessages(ctx, ch, topic, handler)
Scott Baker2c1c4822019-10-16 11:02:41 -0700418
419 return nil
420}
421
422// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
423// when a message is received on a given topic. So far there is only 1 target registered per microservice
Neha Sharma94f16a92020-06-26 04:17:55 +0000424func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700425 // Subscribe to receive messages for that topic
426 var ch <-chan *ic.InterContainerMessage
427 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000428 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
429 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700430 return err
431 }
432 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
433
434 // Launch a go routine to receive and process kafka messages
Neha Sharma94f16a92020-06-26 04:17:55 +0000435 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700436
437 return nil
438}
439
Neha Sharma94f16a92020-06-26 04:17:55 +0000440func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
441 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
Scott Baker2c1c4822019-10-16 11:02:41 -0700442}
443
Neha Sharma94f16a92020-06-26 04:17:55 +0000444func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700445 kp.lockTopicResponseChannelMap.Lock()
446 defer kp.lockTopicResponseChannelMap.Unlock()
447 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
448 // Unsubscribe to this topic first - this will close the subscribed channel
449 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000450 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
451 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700452 }
453 delete(kp.topicToResponseChannelMap, topic)
454 return err
455 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800456 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700457 }
458}
459
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800460// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000461func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
462 logger.Debug(ctx, "delete-all-topic-response-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700463 kp.lockTopicResponseChannelMap.Lock()
464 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800465 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800466 for topic := range kp.topicToResponseChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700467 // Unsubscribe to this topic first - this will close the subscribed channel
Neha Sharma94f16a92020-06-26 04:17:55 +0000468 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800469 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000470 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800471 // Do not return. Continue to try to unsubscribe to other topics.
472 } else {
473 // Only delete from channel map if successfully unsubscribed.
474 delete(kp.topicToResponseChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700475 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700476 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800477 if len(unsubscribeFailTopics) > 0 {
478 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
479 }
480 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700481}
482
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800483func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700484 kp.lockTopicRequestHandlerChannelMap.Lock()
485 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
486 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
487 kp.topicToRequestHandlerChannelMap[topic] = arg
488 }
489}
490
Neha Sharma94f16a92020-06-26 04:17:55 +0000491func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700492 kp.lockTopicRequestHandlerChannelMap.Lock()
493 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
494 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
495 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000496 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800497 return err
498 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700499 delete(kp.topicToRequestHandlerChannelMap, topic)
500 return nil
501 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800502 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700503 }
504}
505
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800506// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000507func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
508 logger.Debug(ctx, "delete-all-topic-request-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700509 kp.lockTopicRequestHandlerChannelMap.Lock()
510 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800511 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800512 for topic := range kp.topicToRequestHandlerChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700513 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000514 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800515 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma94f16a92020-06-26 04:17:55 +0000516 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800517 // Do not return. Continue to try to unsubscribe to other topics.
518 } else {
519 // Only delete from channel map if successfully unsubscribed.
520 delete(kp.topicToRequestHandlerChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700521 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700522 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800523 if len(unsubscribeFailTopics) > 0 {
524 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
525 }
526 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700527}
528
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800529func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700530 kp.lockTransactionIdToChannelMap.Lock()
531 defer kp.lockTransactionIdToChannelMap.Unlock()
532 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
533 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
534 }
535}
536
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800537func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700538 kp.lockTransactionIdToChannelMap.Lock()
539 defer kp.lockTransactionIdToChannelMap.Unlock()
540 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
541 // Close the channel first
542 close(transChannel.ch)
543 delete(kp.transactionIdToChannelMap, id)
544 }
545}
546
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800547func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700548 kp.lockTransactionIdToChannelMap.Lock()
549 defer kp.lockTransactionIdToChannelMap.Unlock()
550 for key, value := range kp.transactionIdToChannelMap {
551 if value.topic.Name == id {
552 close(value.ch)
553 delete(kp.transactionIdToChannelMap, key)
554 }
555 }
556}
557
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800558// nolint: unused
Neha Sharma94f16a92020-06-26 04:17:55 +0000559func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
560 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
Scott Baker2c1c4822019-10-16 11:02:41 -0700561 kp.lockTransactionIdToChannelMap.Lock()
562 defer kp.lockTransactionIdToChannelMap.Unlock()
563 for key, value := range kp.transactionIdToChannelMap {
564 close(value.ch)
565 delete(kp.transactionIdToChannelMap, key)
566 }
567}
568
Neha Sharma94f16a92020-06-26 04:17:55 +0000569func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700570 // If we have any consumers on that topic we need to close them
Neha Sharma94f16a92020-06-26 04:17:55 +0000571 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
572 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700573 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000574 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
575 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700576 }
577 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
578
Neha Sharma94f16a92020-06-26 04:17:55 +0000579 return kp.kafkaClient.DeleteTopic(ctx, &topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700580}
581
Neha Sharma94f16a92020-06-26 04:17:55 +0000582func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700583 // Encode the response argument - needs to be a proto message
584 if returnedVal == nil {
585 return nil, nil
586 }
587 protoValue, ok := returnedVal.(proto.Message)
588 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000589 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700590 err := errors.New("response-value-not-proto-message")
591 return nil, err
592 }
593
594 // Marshal the returned value, if any
595 var marshalledReturnedVal *any.Any
596 var err error
597 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000598 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700599 return nil, err
600 }
601 return marshalledReturnedVal, nil
602}
603
Neha Sharma94f16a92020-06-26 04:17:55 +0000604func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
Scott Baker2c1c4822019-10-16 11:02:41 -0700605 responseHeader := &ic.Header{
606 Id: request.Header.Id,
607 Type: ic.MessageType_RESPONSE,
608 FromTopic: request.Header.ToTopic,
609 ToTopic: request.Header.FromTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700610 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700611 }
612 responseBody := &ic.InterContainerResponseBody{
613 Success: false,
614 Result: nil,
615 }
616 var marshalledResponseBody *any.Any
617 var err error
618 // Error should never happen here
619 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000620 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700621 }
622
623 return &ic.InterContainerMessage{
624 Header: responseHeader,
625 Body: marshalledResponseBody,
626 }
627
628}
629
630//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
631//or an error on failure
Neha Sharma94f16a92020-06-26 04:17:55 +0000632func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
633 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700634 responseHeader := &ic.Header{
635 Id: request.Header.Id,
636 Type: ic.MessageType_RESPONSE,
637 FromTopic: request.Header.ToTopic,
638 ToTopic: request.Header.FromTopic,
639 KeyTopic: request.Header.KeyTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700640 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700641 }
642
643 // Go over all returned values
644 var marshalledReturnedVal *any.Any
645 var err error
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800646
647 // for now we support only 1 returned value - (excluding the error)
648 if len(returnedValues) > 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000649 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
650 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700651 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700652 }
653
654 responseBody := &ic.InterContainerResponseBody{
655 Success: success,
656 Result: marshalledReturnedVal,
657 }
658
659 // Marshal the response body
660 var marshalledResponseBody *any.Any
661 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000662 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700663 return nil, err
664 }
665
666 return &ic.InterContainerMessage{
667 Header: responseHeader,
668 Body: marshalledResponseBody,
669 }, nil
670}
671
Neha Sharma94f16a92020-06-26 04:17:55 +0000672func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700673 myClassValue := reflect.ValueOf(myClass)
674 // Capitalize the first letter in the funcName to workaround the first capital letters required to
675 // invoke a function from a different package
676 funcName = strings.Title(funcName)
677 m := myClassValue.MethodByName(funcName)
678 if !m.IsValid() {
679 return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
680 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000681 in := make([]reflect.Value, len(params)+1)
682 in[0] = reflect.ValueOf(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700683 for i, param := range params {
Neha Sharma94f16a92020-06-26 04:17:55 +0000684 in[i+1] = reflect.ValueOf(param)
Scott Baker2c1c4822019-10-16 11:02:41 -0700685 }
686 out = m.Call(in)
687 return
688}
689
Neha Sharma94f16a92020-06-26 04:17:55 +0000690func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700691 arg := &KVArg{
692 Key: TransactionKey,
693 Value: &ic.StrType{Val: transactionId},
694 }
695
696 var marshalledArg *any.Any
697 var err error
698 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000699 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700700 return currentArgs
701 }
702 protoArg := &ic.Argument{
703 Key: arg.Key,
704 Value: marshalledArg,
705 }
706 return append(currentArgs, protoArg)
707}
708
Neha Sharma94f16a92020-06-26 04:17:55 +0000709func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700710 var marshalledArg *any.Any
711 var err error
712 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000713 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700714 return currentArgs
715 }
716 protoArg := &ic.Argument{
717 Key: FromTopic,
718 Value: marshalledArg,
719 }
720 return append(currentArgs, protoArg)
721}
722
Neha Sharma94f16a92020-06-26 04:17:55 +0000723func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700724
725 // First extract the header to know whether this is a request - responses are handled by a different handler
726 if msg.Header.Type == ic.MessageType_REQUEST {
727 var out []reflect.Value
728 var err error
729
730 // Get the request body
731 requestBody := &ic.InterContainerRequestBody{}
732 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000733 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700734 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000735 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700736 // let the callee unpack the arguments as its the only one that knows the real proto type
737 // Augment the requestBody with the message Id as it will be used in scenarios where cores
738 // are set in pairs and competing
Neha Sharma94f16a92020-06-26 04:17:55 +0000739 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700740
741 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
742 // needs to send an unsollicited message to the currently requested container
Neha Sharma94f16a92020-06-26 04:17:55 +0000743 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700744
Neha Sharma94f16a92020-06-26 04:17:55 +0000745 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700746 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000747 logger.Warn(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700748 }
749 }
750 // Response required?
751 if requestBody.ResponseRequired {
752 // If we already have an error before then just return that
753 var returnError *ic.Error
754 var returnedValues []interface{}
755 var success bool
756 if err != nil {
757 returnError = &ic.Error{Reason: err.Error()}
758 returnedValues = make([]interface{}, 1)
759 returnedValues[0] = returnError
760 } else {
761 returnedValues = make([]interface{}, 0)
762 // Check for errors first
763 lastIndex := len(out) - 1
764 if out[lastIndex].Interface() != nil { // Error
765 if retError, ok := out[lastIndex].Interface().(error); ok {
766 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000767 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700768 return // Ignore - process is in competing mode and ignored transaction
769 }
770 returnError = &ic.Error{Reason: retError.Error()}
771 returnedValues = append(returnedValues, returnError)
772 } else { // Should never happen
773 returnError = &ic.Error{Reason: "incorrect-error-returns"}
774 returnedValues = append(returnedValues, returnError)
775 }
776 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000777 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700778 return // Ignore - should not happen
779 } else { // Non-error case
780 success = true
781 for idx, val := range out {
Neha Sharma94f16a92020-06-26 04:17:55 +0000782 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700783 if idx != lastIndex {
784 returnedValues = append(returnedValues, val.Interface())
785 }
786 }
787 }
788 }
789
790 var icm *ic.InterContainerMessage
Neha Sharma94f16a92020-06-26 04:17:55 +0000791 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
792 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
793 icm = encodeDefaultFailedResponse(ctx, msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700794 }
795 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
796 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
797 // present then the key will be empty, hence all messages for a given topic will be sent to all
798 // partitions.
799 replyTopic := &Topic{Name: msg.Header.FromTopic}
800 key := msg.Header.KeyTopic
Neha Sharma94f16a92020-06-26 04:17:55 +0000801 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700802 // TODO: handle error response.
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800803 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +0000804 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
805 logger.Errorw(ctx, "send-reply-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800806 "topic": replyTopic,
807 "key": key,
808 "error": err})
809 }
810 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700811 }
812 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Neha Sharma94f16a92020-06-26 04:17:55 +0000813 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
814 go kp.dispatchResponse(ctx, msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700815 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000816 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700817 }
818}
819
Neha Sharma94f16a92020-06-26 04:17:55 +0000820func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700821 // Wait for messages
822 for msg := range ch {
Neha Sharma94f16a92020-06-26 04:17:55 +0000823 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
824 go kp.handleMessage(context.Background(), msg, targetInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700825 }
826}
827
Neha Sharma94f16a92020-06-26 04:17:55 +0000828func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700829 kp.lockTransactionIdToChannelMap.RLock()
830 defer kp.lockTransactionIdToChannelMap.RUnlock()
831 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Neha Sharma94f16a92020-06-26 04:17:55 +0000832 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700833 return
834 }
835 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
836}
837
838// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
839// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
840// API. There is one response channel waiting for kafka messages before dispatching the message to the
841// corresponding waiting channel
Neha Sharma94f16a92020-06-26 04:17:55 +0000842func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
843 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700844
845 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
846 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerae1d4702020-03-04 14:10:51 -0800847 // Set channel size to 1 to prevent deadlock, see VOL-2708
848 ch := make(chan *ic.InterContainerMessage, 1)
Scott Baker2c1c4822019-10-16 11:02:41 -0700849 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
850
851 return ch, nil
852}
853
Neha Sharma94f16a92020-06-26 04:17:55 +0000854func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
855 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700856 kp.deleteFromTransactionIdToChannelMap(trnsId)
857 return nil
858}
859
Neha Sharma94f16a92020-06-26 04:17:55 +0000860func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
861 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
Scott Baker104b67d2019-10-29 15:56:27 -0700862}
863
Neha Sharma94f16a92020-06-26 04:17:55 +0000864func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
865 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
Scott Baker0fef6982019-12-12 09:49:42 -0800866}
867
Neha Sharma94f16a92020-06-26 04:17:55 +0000868func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
869 return kp.kafkaClient.SendLiveness(ctx)
Scott Baker104b67d2019-10-29 15:56:27 -0700870}
871
Scott Baker2c1c4822019-10-16 11:02:41 -0700872//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
873//or an error on failure
Neha Sharma94f16a92020-06-26 04:17:55 +0000874func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700875 requestHeader := &ic.Header{
876 Id: uuid.New().String(),
877 Type: ic.MessageType_REQUEST,
878 FromTopic: replyTopic.Name,
879 ToTopic: toTopic.Name,
880 KeyTopic: key,
Scott Baker84a55ce2020-04-17 10:11:30 -0700881 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700882 }
883 requestBody := &ic.InterContainerRequestBody{
884 Rpc: rpc,
885 ResponseRequired: true,
886 ReplyToTopic: replyTopic.Name,
887 }
888
889 for _, arg := range kvArgs {
890 if arg == nil {
891 // In case the caller sends an array with empty args
892 continue
893 }
894 var marshalledArg *any.Any
895 var err error
896 // ascertain the value interface type is a proto.Message
897 protoValue, ok := arg.Value.(proto.Message)
898 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000899 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700900 err := errors.New("argument-value-not-proto-message")
901 return nil, err
902 }
903 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000904 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700905 return nil, err
906 }
907 protoArg := &ic.Argument{
908 Key: arg.Key,
909 Value: marshalledArg,
910 }
911 requestBody.Args = append(requestBody.Args, protoArg)
912 }
913
914 var marshalledData *any.Any
915 var err error
916 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000917 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700918 return nil, err
919 }
920 request := &ic.InterContainerMessage{
921 Header: requestHeader,
922 Body: marshalledData,
923 }
924 return request, nil
925}
926
Neha Sharma94f16a92020-06-26 04:17:55 +0000927func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700928 // Extract the message body
929 responseBody := ic.InterContainerResponseBody{}
930 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000931 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700932 return nil, err
933 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000934 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700935
936 return &responseBody, nil
937
938}