blob: 9f9fbfc1189141065f70f1238cd89d3287c5f160 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
divyadesaid26f6b12020-03-19 06:30:28 +000022 "google.golang.org/grpc/codes"
23 "google.golang.org/grpc/status"
William Kurkianea869482019-04-09 15:16:11 -040024 "reflect"
25 "strings"
26 "sync"
27 "time"
William Kurkianea869482019-04-09 15:16:11 -040028
Esin Karamanccb714b2019-11-29 15:02:06 +000029 "github.com/golang/protobuf/proto"
30 "github.com/golang/protobuf/ptypes"
31 "github.com/golang/protobuf/ptypes/any"
32 "github.com/google/uuid"
33 "github.com/opencord/voltha-lib-go/v3/pkg/log"
34 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
35)
William Kurkianea869482019-04-09 15:16:11 -040036
// Default tuning values for inter-container requests.
const (
	// DefaultMaxRetries is the default retry count. It is not referenced in
	// the visible part of this file.
	DefaultMaxRetries = 3
	// DefaultRequestTimeout is expressed in milliseconds (60000 ms); the large
	// value is meant to handle a wider latency range. InvokeRPC multiplies it
	// by time.Millisecond to bound how long it waits for a response.
	DefaultRequestTimeout = 60000
)

// String keys defined by this package.
// NOTE(review): presumably used as argument keys when encoding/decoding
// requests - their use is not visible here, confirm against callers.
const (
	TransactionKey = "transactionID"
	FromTopic      = "fromTopic"
)

// Sentinel errors exposed by this package.
var (
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
	ErrorTransactionInvalidId   = errors.New("transaction-invalid-id")
)
49
William Kurkianea869482019-04-09 15:16:11 -040050// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
51// obtained from that channel, this interface is invoked. This is used to handle
52// async requests into the Core via the kafka messaging bus
53type requestHandlerChannel struct {
54 requesthandlerInterface interface{}
55 ch <-chan *ic.InterContainerMessage
56}
57
58// transactionChannel represents a combination of a topic and a channel onto which a response received
59// on the kafka bus will be sent to
60type transactionChannel struct {
61 topic *Topic
62 ch chan *ic.InterContainerMessage
63}
64
npujarec5762e2020-01-01 14:08:48 +053065type InterContainerProxy interface {
66 Start() error
67 Stop()
68 GetDefaultTopic() *Topic
69 DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
70 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
divyadesaid26f6b12020-03-19 06:30:28 +000071 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
npujarec5762e2020-01-01 14:08:48 +053072 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
73 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
74 UnSubscribeFromRequestHandler(topic Topic) error
75 DeleteTopic(topic Topic) error
76 EnableLivenessChannel(enable bool) chan bool
77 SendLiveness() error
78}
79
80// interContainerProxy represents the messaging proxy
81type interContainerProxy struct {
Neha Sharma3f221ae2020-04-29 19:02:12 +000082 kafkaAddress string
npujarec5762e2020-01-01 14:08:48 +053083 defaultTopic *Topic
William Kurkianea869482019-04-09 15:16:11 -040084 defaultRequestHandlerInterface interface{}
85 deviceDiscoveryTopic *Topic
86 kafkaClient Client
npujarec5762e2020-01-01 14:08:48 +053087 doneCh chan struct{}
88 doneOnce sync.Once
William Kurkianea869482019-04-09 15:16:11 -040089
90 // This map is used to map a topic to an interface and channel. When a request is received
91 // on that channel (registered to the topic) then that interface is invoked.
92 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
93 lockTopicRequestHandlerChannelMap sync.RWMutex
94
95 // This map is used to map a channel to a response topic. This channel handles all responses on that
96 // channel for that topic and forward them to the appropriate consumers channel, using the
97 // transactionIdToChannelMap.
98 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
99 lockTopicResponseChannelMap sync.RWMutex
100
101 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
102 // sent out and we are waiting for a response.
103 transactionIdToChannelMap map[string]*transactionChannel
104 lockTransactionIdToChannelMap sync.RWMutex
105}
106
npujarec5762e2020-01-01 14:08:48 +0530107type InterContainerProxyOption func(*interContainerProxy)
William Kurkianea869482019-04-09 15:16:11 -0400108
Neha Sharma3f221ae2020-04-29 19:02:12 +0000109func InterContainerAddress(address string) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530110 return func(args *interContainerProxy) {
Neha Sharma3f221ae2020-04-29 19:02:12 +0000111 args.kafkaAddress = address
William Kurkianea869482019-04-09 15:16:11 -0400112 }
113}
114
115func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530116 return func(args *interContainerProxy) {
117 args.defaultTopic = topic
William Kurkianea869482019-04-09 15:16:11 -0400118 }
119}
120
121func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530122 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400123 args.deviceDiscoveryTopic = topic
124 }
125}
126
127func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530128 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400129 args.defaultRequestHandlerInterface = handler
130 }
131}
132
133func MsgClient(client Client) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530134 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400135 args.kafkaClient = client
136 }
137}
138
npujarec5762e2020-01-01 14:08:48 +0530139func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
140 proxy := &interContainerProxy{
Neha Sharma3f221ae2020-04-29 19:02:12 +0000141 kafkaAddress: DefaultKafkaAddress,
142 doneCh: make(chan struct{}),
William Kurkianea869482019-04-09 15:16:11 -0400143 }
144
145 for _, option := range opts {
146 option(proxy)
147 }
148
npujarec5762e2020-01-01 14:08:48 +0530149 return proxy
William Kurkianea869482019-04-09 15:16:11 -0400150}
151
npujarec5762e2020-01-01 14:08:48 +0530152func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
153 return newInterContainerProxy(opts...)
154}
155
156func (kp *interContainerProxy) Start() error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000157 logger.Info("Starting-Proxy")
William Kurkianea869482019-04-09 15:16:11 -0400158
159 // Kafka MsgClient should already have been created. If not, output fatal error
160 if kp.kafkaClient == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000161 logger.Fatal("kafka-client-not-set")
William Kurkianea869482019-04-09 15:16:11 -0400162 }
163
William Kurkianea869482019-04-09 15:16:11 -0400164 // Start the kafka client
165 if err := kp.kafkaClient.Start(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000166 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400167 return err
168 }
169
170 // Create the topic to response channel map
171 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
172 //
173 // Create the transactionId to Channel Map
174 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
175
176 // Create the topic to request channel map
177 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
178
179 return nil
180}
181
npujarec5762e2020-01-01 14:08:48 +0530182func (kp *interContainerProxy) Stop() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000183 logger.Info("stopping-intercontainer-proxy")
npujarec5762e2020-01-01 14:08:48 +0530184 kp.doneOnce.Do(func() { close(kp.doneCh) })
William Kurkianea869482019-04-09 15:16:11 -0400185 // TODO : Perform cleanup
186 kp.kafkaClient.Stop()
Scott Bakere701b862020-02-20 16:19:16 -0800187 err := kp.deleteAllTopicRequestHandlerChannelMap()
188 if err != nil {
Scott Baker24f83e22020-03-30 16:14:28 -0700189 logger.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800190 }
191 err = kp.deleteAllTopicResponseChannelMap()
192 if err != nil {
Scott Baker24f83e22020-03-30 16:14:28 -0700193 logger.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800194 }
195 kp.deleteAllTransactionIdToChannelMap()
William Kurkianea869482019-04-09 15:16:11 -0400196}
197
npujarec5762e2020-01-01 14:08:48 +0530198func (kp *interContainerProxy) GetDefaultTopic() *Topic {
199 return kp.defaultTopic
200}
201
William Kurkianea869482019-04-09 15:16:11 -0400202// DeviceDiscovered publish the discovered device onto the kafka messaging bus
npujarec5762e2020-01-01 14:08:48 +0530203func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000204 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
William Kurkianea869482019-04-09 15:16:11 -0400205 // Simple validation
206 if deviceId == "" || deviceType == "" {
Esin Karamanccb714b2019-11-29 15:02:06 +0000207 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
William Kurkianea869482019-04-09 15:16:11 -0400208 return errors.New("invalid-parameters")
209 }
210 // Create the device discovery message
211 header := &ic.Header{
212 Id: uuid.New().String(),
213 Type: ic.MessageType_DEVICE_DISCOVERED,
npujarec5762e2020-01-01 14:08:48 +0530214 FromTopic: kp.defaultTopic.Name,
William Kurkianea869482019-04-09 15:16:11 -0400215 ToTopic: kp.deviceDiscoveryTopic.Name,
Scott Bakered4a8e72020-04-17 11:10:20 -0700216 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400217 }
218 body := &ic.DeviceDiscovered{
219 Id: deviceId,
220 DeviceType: deviceType,
221 ParentId: parentId,
222 Publisher: publisher,
223 }
224
225 var marshalledData *any.Any
226 var err error
227 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000228 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400229 return err
230 }
231 msg := &ic.InterContainerMessage{
232 Header: header,
233 Body: marshalledData,
234 }
235
236 // Send the message
237 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000238 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400239 return err
240 }
241 return nil
242}
243
divyadesaid26f6b12020-03-19 06:30:28 +0000244// InvokeAsyncRPC is used to make an RPC request asynchronously
245func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
246 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
247
248 logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
249 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
250 // typically the device ID.
251 responseTopic := replyToTopic
252 if responseTopic == nil {
253 responseTopic = kp.GetDefaultTopic()
254 }
255
256 chnl := make(chan *RpcResponse)
257
258 go func() {
259
260 // once we're done,
261 // close the response channel
262 defer close(chnl)
263
264 var err error
265 var protoRequest *ic.InterContainerMessage
266
267 // Encode the request
268 protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
269 if err != nil {
270 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
271 chnl <- NewResponse(RpcFormattingError, err, nil)
272 return
273 }
274
275 // Subscribe for response, if needed, before sending request
276 var ch <-chan *ic.InterContainerMessage
277 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
278 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
279 chnl <- NewResponse(RpcTransportError, err, nil)
280 return
281 }
282
283 // Send request - if the topic is formatted with a device Id then we will send the request using a
284 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
285 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
286 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
287
288 // if the message is not sent on kafka publish an event an close the channel
289 if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
290 chnl <- NewResponse(RpcTransportError, err, nil)
291 return
292 }
293
294 // if the client is not waiting for a response send the ack and close the channel
295 chnl <- NewResponse(RpcSent, nil, nil)
296 if !waitForResponse {
297 return
298 }
299
300 defer func() {
301 // Remove the subscription for a response on return
302 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
303 logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
304 }
305 }()
306
307 // Wait for response as well as timeout or cancellation
308 select {
309 case msg, ok := <-ch:
310 if !ok {
311 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
312 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
313 }
314 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
315 if responseBody, err := decodeResponse(msg); err != nil {
316 chnl <- NewResponse(RpcReply, err, nil)
317 } else {
318 if responseBody.Success {
319 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
320 } else {
321 // response body contains an error
322 unpackErr := &ic.Error{}
323 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
324 chnl <- NewResponse(RpcReply, err, nil)
325 } else {
326 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
327 }
328 }
329 }
330 case <-ctx.Done():
331 logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
332 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
333 chnl <- NewResponse(RpcTimeout, err, nil)
334 case <-kp.doneCh:
335 chnl <- NewResponse(RpcSystemClosing, nil, nil)
336 logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
337 }
338 }()
339 return chnl
340}
341
William Kurkianea869482019-04-09 15:16:11 -0400342// InvokeRPC is used to send a request to a given topic
npujarec5762e2020-01-01 14:08:48 +0530343func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
William Kurkianea869482019-04-09 15:16:11 -0400344 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
345
346 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
347 // typically the device ID.
348 responseTopic := replyToTopic
349 if responseTopic == nil {
npujarec5762e2020-01-01 14:08:48 +0530350 responseTopic = kp.defaultTopic
William Kurkianea869482019-04-09 15:16:11 -0400351 }
352
353 // Encode the request
354 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
355 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000356 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400357 return false, nil
358 }
359
360 // Subscribe for response, if needed, before sending request
361 var ch <-chan *ic.InterContainerMessage
362 if waitForResponse {
363 var err error
364 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000365 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400366 }
367 }
368
369 // Send request - if the topic is formatted with a device Id then we will send the request using a
370 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
371 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
372 //key := GetDeviceIdFromTopic(*toTopic)
Esin Karamanccb714b2019-11-29 15:02:06 +0000373 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000374 go func() {
375 if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
376 logger.Errorw("send-failed", log.Fields{
377 "topic": toTopic,
378 "key": key,
379 "error": err})
380 }
381 }()
William Kurkianea869482019-04-09 15:16:11 -0400382
383 if waitForResponse {
384 // Create a child context based on the parent context, if any
385 var cancel context.CancelFunc
386 childCtx := context.Background()
387 if ctx == nil {
388 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
389 } else {
390 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
391 }
392 defer cancel()
393
394 // Wait for response as well as timeout or cancellation
395 // Remove the subscription for a response on return
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000396 defer func() {
397 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
398 logger.Errorw("response-unsubscribe-failed", log.Fields{
399 "id": protoRequest.Header.Id,
400 "error": err})
401 }
402 }()
William Kurkianea869482019-04-09 15:16:11 -0400403 select {
404 case msg, ok := <-ch:
405 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000406 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400407 protoError := &ic.Error{Reason: "channel-closed"}
408 var marshalledArg *any.Any
409 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
410 return false, nil // Should never happen
411 }
412 return false, marshalledArg
413 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000414 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400415 var responseBody *ic.InterContainerResponseBody
416 var err error
417 if responseBody, err = decodeResponse(msg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000418 logger.Errorw("decode-response-error", log.Fields{"error": err})
npujarec5762e2020-01-01 14:08:48 +0530419 // FIXME we should return something
William Kurkianea869482019-04-09 15:16:11 -0400420 }
421 return responseBody.Success, responseBody.Result
422 case <-ctx.Done():
Esin Karamanccb714b2019-11-29 15:02:06 +0000423 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
William Kurkianea869482019-04-09 15:16:11 -0400424 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530425 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
426
William Kurkianea869482019-04-09 15:16:11 -0400427 var marshalledArg *any.Any
428 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
429 return false, nil // Should never happen
430 }
431 return false, marshalledArg
432 case <-childCtx.Done():
Esin Karamanccb714b2019-11-29 15:02:06 +0000433 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
William Kurkianea869482019-04-09 15:16:11 -0400434 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530435 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
436
William Kurkianea869482019-04-09 15:16:11 -0400437 var marshalledArg *any.Any
438 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
439 return false, nil // Should never happen
440 }
441 return false, marshalledArg
442 case <-kp.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +0000443 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
William Kurkianea869482019-04-09 15:16:11 -0400444 return true, nil
445 }
446 }
447 return true, nil
448}
449
450// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
451// when a message is received on a given topic
npujarec5762e2020-01-01 14:08:48 +0530452func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
William Kurkianea869482019-04-09 15:16:11 -0400453
454 // Subscribe to receive messages for that topic
455 var ch <-chan *ic.InterContainerMessage
456 var err error
457 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
458 //if ch, err = kp.Subscribe(topic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000459 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Matt Jeannereteb5059f2019-07-19 06:11:00 -0400460 return err
William Kurkianea869482019-04-09 15:16:11 -0400461 }
462
463 kp.defaultRequestHandlerInterface = handler
464 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
465 // Launch a go routine to receive and process kafka messages
466 go kp.waitForMessages(ch, topic, handler)
467
468 return nil
469}
470
471// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
472// when a message is received on a given topic. So far there is only 1 target registered per microservice
npujarec5762e2020-01-01 14:08:48 +0530473func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
William Kurkianea869482019-04-09 15:16:11 -0400474 // Subscribe to receive messages for that topic
475 var ch <-chan *ic.InterContainerMessage
476 var err error
477 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000478 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400479 return err
480 }
481 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
482
483 // Launch a go routine to receive and process kafka messages
484 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
485
486 return nil
487}
488
npujarec5762e2020-01-01 14:08:48 +0530489func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400490 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
491}
492
npujarec5762e2020-01-01 14:08:48 +0530493func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400494 kp.lockTopicResponseChannelMap.Lock()
495 defer kp.lockTopicResponseChannelMap.Unlock()
496 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
497 // Unsubscribe to this topic first - this will close the subscribed channel
498 var err error
499 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000500 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400501 }
502 delete(kp.topicToResponseChannelMap, topic)
503 return err
504 } else {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000505 return fmt.Errorf("%s-Topic-not-found", topic)
William Kurkianea869482019-04-09 15:16:11 -0400506 }
507}
508
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000509// nolint: unused
npujarec5762e2020-01-01 14:08:48 +0530510func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
Scott Bakere701b862020-02-20 16:19:16 -0800511 logger.Debug("delete-all-topic-response-channel")
William Kurkianea869482019-04-09 15:16:11 -0400512 kp.lockTopicResponseChannelMap.Lock()
513 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakere701b862020-02-20 16:19:16 -0800514 var unsubscribeFailTopics []string
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000515 for topic := range kp.topicToResponseChannelMap {
William Kurkianea869482019-04-09 15:16:11 -0400516 // Unsubscribe to this topic first - this will close the subscribed channel
Scott Bakere701b862020-02-20 16:19:16 -0800517 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
518 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Esin Karamanccb714b2019-11-29 15:02:06 +0000519 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800520 // Do not return. Continue to try to unsubscribe to other topics.
521 } else {
522 // Only delete from channel map if successfully unsubscribed.
523 delete(kp.topicToResponseChannelMap, topic)
William Kurkianea869482019-04-09 15:16:11 -0400524 }
William Kurkianea869482019-04-09 15:16:11 -0400525 }
Scott Bakere701b862020-02-20 16:19:16 -0800526 if len(unsubscribeFailTopics) > 0 {
527 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
528 }
529 return nil
William Kurkianea869482019-04-09 15:16:11 -0400530}
531
npujarec5762e2020-01-01 14:08:48 +0530532func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
William Kurkianea869482019-04-09 15:16:11 -0400533 kp.lockTopicRequestHandlerChannelMap.Lock()
534 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
535 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
536 kp.topicToRequestHandlerChannelMap[topic] = arg
537 }
538}
539
npujarec5762e2020-01-01 14:08:48 +0530540func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400541 kp.lockTopicRequestHandlerChannelMap.Lock()
542 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
543 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
544 // Close the kafka client client first by unsubscribing to this topic
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000545 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
546 return err
547 }
William Kurkianea869482019-04-09 15:16:11 -0400548 delete(kp.topicToRequestHandlerChannelMap, topic)
549 return nil
550 } else {
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000551 return fmt.Errorf("%s-Topic-not-found", topic)
William Kurkianea869482019-04-09 15:16:11 -0400552 }
553}
554
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000555// nolint: unused
npujarec5762e2020-01-01 14:08:48 +0530556func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
Scott Bakere701b862020-02-20 16:19:16 -0800557 logger.Debug("delete-all-topic-request-channel")
William Kurkianea869482019-04-09 15:16:11 -0400558 kp.lockTopicRequestHandlerChannelMap.Lock()
559 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakere701b862020-02-20 16:19:16 -0800560 var unsubscribeFailTopics []string
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000561 for topic := range kp.topicToRequestHandlerChannelMap {
William Kurkianea869482019-04-09 15:16:11 -0400562 // Close the kafka client client first by unsubscribing to this topic
Scott Bakere701b862020-02-20 16:19:16 -0800563 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
564 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Esin Karamanccb714b2019-11-29 15:02:06 +0000565 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakere701b862020-02-20 16:19:16 -0800566 // Do not return. Continue to try to unsubscribe to other topics.
567 } else {
568 // Only delete from channel map if successfully unsubscribed.
569 delete(kp.topicToRequestHandlerChannelMap, topic)
William Kurkianea869482019-04-09 15:16:11 -0400570 }
William Kurkianea869482019-04-09 15:16:11 -0400571 }
Scott Bakere701b862020-02-20 16:19:16 -0800572 if len(unsubscribeFailTopics) > 0 {
573 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
574 }
575 return nil
William Kurkianea869482019-04-09 15:16:11 -0400576}
577
npujarec5762e2020-01-01 14:08:48 +0530578func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400579 kp.lockTransactionIdToChannelMap.Lock()
580 defer kp.lockTransactionIdToChannelMap.Unlock()
581 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
582 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
583 }
584}
585
npujarec5762e2020-01-01 14:08:48 +0530586func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400587 kp.lockTransactionIdToChannelMap.Lock()
588 defer kp.lockTransactionIdToChannelMap.Unlock()
589 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
590 // Close the channel first
591 close(transChannel.ch)
592 delete(kp.transactionIdToChannelMap, id)
593 }
594}
595
npujarec5762e2020-01-01 14:08:48 +0530596func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400597 kp.lockTransactionIdToChannelMap.Lock()
598 defer kp.lockTransactionIdToChannelMap.Unlock()
599 for key, value := range kp.transactionIdToChannelMap {
600 if value.topic.Name == id {
601 close(value.ch)
602 delete(kp.transactionIdToChannelMap, key)
603 }
604 }
605}
606
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000607// nolint: unused
npujarec5762e2020-01-01 14:08:48 +0530608func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
Scott Bakere701b862020-02-20 16:19:16 -0800609 logger.Debug("delete-all-transaction-id-channel-map")
William Kurkianea869482019-04-09 15:16:11 -0400610 kp.lockTransactionIdToChannelMap.Lock()
611 defer kp.lockTransactionIdToChannelMap.Unlock()
612 for key, value := range kp.transactionIdToChannelMap {
613 close(value.ch)
614 delete(kp.transactionIdToChannelMap, key)
615 }
616}
617
npujarec5762e2020-01-01 14:08:48 +0530618func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400619 // If we have any consumers on that topic we need to close them
620 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000621 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400622 }
623 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000624 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400625 }
626 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
627
628 return kp.kafkaClient.DeleteTopic(&topic)
629}
630
631func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
632 // Encode the response argument - needs to be a proto message
633 if returnedVal == nil {
634 return nil, nil
635 }
636 protoValue, ok := returnedVal.(proto.Message)
637 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000638 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
William Kurkianea869482019-04-09 15:16:11 -0400639 err := errors.New("response-value-not-proto-message")
640 return nil, err
641 }
642
643 // Marshal the returned value, if any
644 var marshalledReturnedVal *any.Any
645 var err error
646 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000647 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400648 return nil, err
649 }
650 return marshalledReturnedVal, nil
651}
652
653func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
654 responseHeader := &ic.Header{
655 Id: request.Header.Id,
656 Type: ic.MessageType_RESPONSE,
657 FromTopic: request.Header.ToTopic,
658 ToTopic: request.Header.FromTopic,
Scott Bakered4a8e72020-04-17 11:10:20 -0700659 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400660 }
661 responseBody := &ic.InterContainerResponseBody{
662 Success: false,
663 Result: nil,
664 }
665 var marshalledResponseBody *any.Any
666 var err error
667 // Error should never happen here
668 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000669 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400670 }
671
672 return &ic.InterContainerMessage{
673 Header: responseHeader,
674 Body: marshalledResponseBody,
675 }
676
677}
678
679//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
680//or an error on failure
681func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000682 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
William Kurkianea869482019-04-09 15:16:11 -0400683 responseHeader := &ic.Header{
684 Id: request.Header.Id,
685 Type: ic.MessageType_RESPONSE,
686 FromTopic: request.Header.ToTopic,
687 ToTopic: request.Header.FromTopic,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400688 KeyTopic: request.Header.KeyTopic,
Scott Bakered4a8e72020-04-17 11:10:20 -0700689 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400690 }
691
692 // Go over all returned values
693 var marshalledReturnedVal *any.Any
694 var err error
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000695
696 // for now we support only 1 returned value - (excluding the error)
697 if len(returnedValues) > 0 {
698 if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000699 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400700 }
William Kurkianea869482019-04-09 15:16:11 -0400701 }
702
703 responseBody := &ic.InterContainerResponseBody{
704 Success: success,
705 Result: marshalledReturnedVal,
706 }
707
708 // Marshal the response body
709 var marshalledResponseBody *any.Any
710 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000711 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400712 return nil, err
713 }
714
715 return &ic.InterContainerMessage{
716 Header: responseHeader,
717 Body: marshalledResponseBody,
718 }, nil
719}
720
// CallFuncByName invokes, through reflection, the method funcName of myClass with params as arguments and
// returns the method's results.
//
// The first letter of funcName is capitalized before the lookup, since only exported methods can be invoked
// through reflection from another package.
//
// An error (and an empty result slice) is returned, instead of letting the reflect package panic, when:
//   - myClass is nil or has no such method;
//   - the number of params does not fit the method's signature (variadic methods accept any number of
//     trailing arguments);
//   - a param is not assignable to the corresponding parameter type;
//   - a param is nil while the corresponding parameter type cannot hold nil.
//
// A nil param whose parameter type can hold nil (pointer, interface, map, slice, channel, function) is passed
// as that type's zero value. Panics raised by the invoked method itself are not intercepted.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	// reflect.ValueOf(nil) is the zero Value, on which MethodByName panics
	if !myClassValue.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Validate the argument count up front: Call panics on a mismatch
	mType := m.Type()
	numIn := mType.NumIn()
	variadic := mType.IsVariadic()
	if (variadic && len(params) < numIn-1) || (!variadic && len(params) != numIn) {
		return make([]reflect.Value, 0),
			fmt.Errorf("method-argument-count-mismatch \"%s\": expected %d, got %d", funcName, numIn, len(params))
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		// Determine the type the method expects at position i; trailing arguments of a variadic
		// method are matched against the element type of its last (slice) parameter
		var want reflect.Type
		if variadic && i >= numIn-1 {
			want = mType.In(numIn - 1).Elem()
		} else {
			want = mType.In(i)
		}

		if param == nil {
			// reflect.ValueOf(nil) is the zero Value, which Call rejects; substitute a typed nil
			// when the parameter type can hold one
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
				in[i] = reflect.Zero(want)
				continue
			}
			return make([]reflect.Value, 0),
				fmt.Errorf("method-argument-type-mismatch \"%s\": argument %d is nil, expected %s", funcName, i, want)
		}

		value := reflect.ValueOf(param)
		if !value.Type().AssignableTo(want) {
			return make([]reflect.Value, 0),
				fmt.Errorf("method-argument-type-mismatch \"%s\": argument %d is %s, expected %s", funcName, i, value.Type(), want)
		}
		in[i] = value
	}
	out = m.Call(in)
	return
}
737
npujarec5762e2020-01-01 14:08:48 +0530738func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400739 arg := &KVArg{
740 Key: TransactionKey,
741 Value: &ic.StrType{Val: transactionId},
742 }
743
744 var marshalledArg *any.Any
745 var err error
746 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000747 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400748 return currentArgs
749 }
750 protoArg := &ic.Argument{
751 Key: arg.Key,
752 Value: marshalledArg,
753 }
754 return append(currentArgs, protoArg)
755}
756
npujarec5762e2020-01-01 14:08:48 +0530757func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400758 var marshalledArg *any.Any
759 var err error
760 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000761 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400762 return currentArgs
763 }
764 protoArg := &ic.Argument{
765 Key: FromTopic,
766 Value: marshalledArg,
767 }
768 return append(currentArgs, protoArg)
769}
770
npujarec5762e2020-01-01 14:08:48 +0530771func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400772
773 // First extract the header to know whether this is a request - responses are handled by a different handler
774 if msg.Header.Type == ic.MessageType_REQUEST {
775 var out []reflect.Value
776 var err error
777
778 // Get the request body
779 requestBody := &ic.InterContainerRequestBody{}
780 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000781 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400782 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000783 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400784 // let the callee unpack the arguments as its the only one that knows the real proto type
785 // Augment the requestBody with the message Id as it will be used in scenarios where cores
786 // are set in pairs and competing
787 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
788
789 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
790 // needs to send an unsollicited message to the currently requested container
791 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
792
793 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
794 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000795 logger.Warn(err)
William Kurkianea869482019-04-09 15:16:11 -0400796 }
797 }
798 // Response required?
799 if requestBody.ResponseRequired {
800 // If we already have an error before then just return that
801 var returnError *ic.Error
802 var returnedValues []interface{}
803 var success bool
804 if err != nil {
805 returnError = &ic.Error{Reason: err.Error()}
806 returnedValues = make([]interface{}, 1)
807 returnedValues[0] = returnError
808 } else {
809 returnedValues = make([]interface{}, 0)
810 // Check for errors first
811 lastIndex := len(out) - 1
812 if out[lastIndex].Interface() != nil { // Error
kdarapub26b4502019-10-05 03:02:33 +0530813 if retError, ok := out[lastIndex].Interface().(error); ok {
814 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000815 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530816 return // Ignore - process is in competing mode and ignored transaction
817 }
818 returnError = &ic.Error{Reason: retError.Error()}
William Kurkianea869482019-04-09 15:16:11 -0400819 returnedValues = append(returnedValues, returnError)
820 } else { // Should never happen
821 returnError = &ic.Error{Reason: "incorrect-error-returns"}
822 returnedValues = append(returnedValues, returnError)
823 }
824 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000825 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530826 return // Ignore - should not happen
William Kurkianea869482019-04-09 15:16:11 -0400827 } else { // Non-error case
828 success = true
829 for idx, val := range out {
Esin Karamanccb714b2019-11-29 15:02:06 +0000830 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
William Kurkianea869482019-04-09 15:16:11 -0400831 if idx != lastIndex {
832 returnedValues = append(returnedValues, val.Interface())
833 }
834 }
835 }
836 }
837
838 var icm *ic.InterContainerMessage
839 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000840 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400841 icm = encodeDefaultFailedResponse(msg)
842 }
843 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
844 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
845 // present then the key will be empty, hence all messages for a given topic will be sent to all
846 // partitions.
847 replyTopic := &Topic{Name: msg.Header.FromTopic}
848 key := msg.Header.KeyTopic
Esin Karamanccb714b2019-11-29 15:02:06 +0000849 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
William Kurkianea869482019-04-09 15:16:11 -0400850 // TODO: handle error response.
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000851 go func() {
852 if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
853 logger.Errorw("send-reply-failed", log.Fields{
854 "topic": replyTopic,
855 "key": key,
856 "error": err})
857 }
858 }()
William Kurkianea869482019-04-09 15:16:11 -0400859 }
860 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Esin Karamanccb714b2019-11-29 15:02:06 +0000861 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400862 go kp.dispatchResponse(msg)
863 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000864 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400865 }
866}
867
npujarec5762e2020-01-01 14:08:48 +0530868func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400869 // Wait for messages
870 for msg := range ch {
Esin Karamanccb714b2019-11-29 15:02:06 +0000871 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
William Kurkianea869482019-04-09 15:16:11 -0400872 go kp.handleMessage(msg, targetInterface)
873 }
874}
875
npujarec5762e2020-01-01 14:08:48 +0530876func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400877 kp.lockTransactionIdToChannelMap.RLock()
878 defer kp.lockTransactionIdToChannelMap.RUnlock()
879 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Esin Karamanccb714b2019-11-29 15:02:06 +0000880 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
William Kurkianea869482019-04-09 15:16:11 -0400881 return
882 }
883 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
884}
885
886// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
887// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
888// API. There is one response channel waiting for kafka messages before dispatching the message to the
889// corresponding waiting channel
npujarec5762e2020-01-01 14:08:48 +0530890func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000891 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400892
893 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
894 // broadcast any message for this topic to all channels waiting on it.
divyadesaid26f6b12020-03-19 06:30:28 +0000895 // Set channel size to 1 to prevent deadlock, see VOL-2708
896 ch := make(chan *ic.InterContainerMessage, 1)
William Kurkianea869482019-04-09 15:16:11 -0400897 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
898
899 return ch, nil
900}
901
npujarec5762e2020-01-01 14:08:48 +0530902func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000903 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400904 kp.deleteFromTransactionIdToChannelMap(trnsId)
905 return nil
906}
907
npujarec5762e2020-01-01 14:08:48 +0530908func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
cbabu95f21522019-11-13 14:25:18 +0100909 return kp.kafkaClient.EnableLivenessChannel(enable)
910}
911
npujarec5762e2020-01-01 14:08:48 +0530912func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
Scott Baker86fce9a2019-12-12 09:47:17 -0800913 return kp.kafkaClient.EnableHealthinessChannel(enable)
914}
915
npujarec5762e2020-01-01 14:08:48 +0530916func (kp *interContainerProxy) SendLiveness() error {
cbabu95f21522019-11-13 14:25:18 +0100917 return kp.kafkaClient.SendLiveness()
918}
919
William Kurkianea869482019-04-09 15:16:11 -0400920//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
921//or an error on failure
922func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
923 requestHeader := &ic.Header{
924 Id: uuid.New().String(),
925 Type: ic.MessageType_REQUEST,
926 FromTopic: replyTopic.Name,
927 ToTopic: toTopic.Name,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400928 KeyTopic: key,
Scott Bakered4a8e72020-04-17 11:10:20 -0700929 Timestamp: ptypes.TimestampNow(),
William Kurkianea869482019-04-09 15:16:11 -0400930 }
931 requestBody := &ic.InterContainerRequestBody{
932 Rpc: rpc,
933 ResponseRequired: true,
934 ReplyToTopic: replyTopic.Name,
935 }
936
937 for _, arg := range kvArgs {
938 if arg == nil {
939 // In case the caller sends an array with empty args
940 continue
941 }
942 var marshalledArg *any.Any
943 var err error
944 // ascertain the value interface type is a proto.Message
945 protoValue, ok := arg.Value.(proto.Message)
946 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000947 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
William Kurkianea869482019-04-09 15:16:11 -0400948 err := errors.New("argument-value-not-proto-message")
949 return nil, err
950 }
951 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000952 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400953 return nil, err
954 }
955 protoArg := &ic.Argument{
956 Key: arg.Key,
957 Value: marshalledArg,
958 }
959 requestBody.Args = append(requestBody.Args, protoArg)
960 }
961
962 var marshalledData *any.Any
963 var err error
964 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000965 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400966 return nil, err
967 }
968 request := &ic.InterContainerMessage{
969 Header: requestHeader,
970 Body: marshalledData,
971 }
972 return request, nil
973}
974
975func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
976 // Extract the message body
977 responseBody := ic.InterContainerResponseBody{}
978 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000979 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400980 return nil, err
981 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000982 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
William Kurkianea869482019-04-09 15:16:11 -0400983
984 return &responseBody, nil
985
986}