blob: d21fdd5756f1589e4003b80d01604d9edf2f86ba [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
William Kurkianea869482019-04-09 15:16:11 -040022 "reflect"
23 "strings"
24 "sync"
25 "time"
William Kurkianea869482019-04-09 15:16:11 -040026
Esin Karamanccb714b2019-11-29 15:02:06 +000027 "github.com/golang/protobuf/proto"
28 "github.com/golang/protobuf/ptypes"
29 "github.com/golang/protobuf/ptypes/any"
30 "github.com/google/uuid"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
33)
William Kurkianea869482019-04-09 15:16:11 -040034
// Default settings for inter-container requests.
const (
	// DefaultMaxRetries is the default retry count.
	DefaultMaxRetries = 3

	// DefaultRequestTimeout is expressed in milliseconds; it is multiplied by
	// time.Millisecond where a request deadline is built.
	// 10000 milliseconds - to handle a wider latency range.
	DefaultRequestTimeout = 10000
)
39
// Keys of the extra arguments appended to a request body before it is handed
// to the request handler.
const (
	// TransactionKey is the key of the argument carrying the transaction id.
	TransactionKey = "transactionID"

	// FromTopic is the key of the argument carrying the sender's topic name.
	FromTopic = "fromTopic"
)
44
// Sentinel errors that a request handler may return.
var (
	// ErrorTransactionNotAcquired reports that the transaction was not acquired.
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")

	// ErrorTransactionInvalidId reports an invalid transaction id.
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
47
William Kurkianea869482019-04-09 15:16:11 -040048// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
49// obtained from that channel, this interface is invoked. This is used to handle
50// async requests into the Core via the kafka messaging bus
51type requestHandlerChannel struct {
52 requesthandlerInterface interface{}
53 ch <-chan *ic.InterContainerMessage
54}
55
56// transactionChannel represents a combination of a topic and a channel onto which a response received
57// on the kafka bus will be sent to
58type transactionChannel struct {
59 topic *Topic
60 ch chan *ic.InterContainerMessage
61}
62
npujarec5762e2020-01-01 14:08:48 +053063type InterContainerProxy interface {
64 Start() error
65 Stop()
66 GetDefaultTopic() *Topic
67 DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
68 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
69 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
70 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
71 UnSubscribeFromRequestHandler(topic Topic) error
72 DeleteTopic(topic Topic) error
73 EnableLivenessChannel(enable bool) chan bool
74 SendLiveness() error
75}
76
77// interContainerProxy represents the messaging proxy
78type interContainerProxy struct {
William Kurkianea869482019-04-09 15:16:11 -040079 kafkaHost string
80 kafkaPort int
npujarec5762e2020-01-01 14:08:48 +053081 defaultTopic *Topic
William Kurkianea869482019-04-09 15:16:11 -040082 defaultRequestHandlerInterface interface{}
83 deviceDiscoveryTopic *Topic
84 kafkaClient Client
npujarec5762e2020-01-01 14:08:48 +053085 doneCh chan struct{}
86 doneOnce sync.Once
William Kurkianea869482019-04-09 15:16:11 -040087
88 // This map is used to map a topic to an interface and channel. When a request is received
89 // on that channel (registered to the topic) then that interface is invoked.
90 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
91 lockTopicRequestHandlerChannelMap sync.RWMutex
92
93 // This map is used to map a channel to a response topic. This channel handles all responses on that
94 // channel for that topic and forward them to the appropriate consumers channel, using the
95 // transactionIdToChannelMap.
96 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
97 lockTopicResponseChannelMap sync.RWMutex
98
99 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
100 // sent out and we are waiting for a response.
101 transactionIdToChannelMap map[string]*transactionChannel
102 lockTransactionIdToChannelMap sync.RWMutex
103}
104
npujarec5762e2020-01-01 14:08:48 +0530105type InterContainerProxyOption func(*interContainerProxy)
William Kurkianea869482019-04-09 15:16:11 -0400106
107func InterContainerHost(host string) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530108 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400109 args.kafkaHost = host
110 }
111}
112
113func InterContainerPort(port int) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530114 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400115 args.kafkaPort = port
116 }
117}
118
119func DefaultTopic(topic *Topic) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530120 return func(args *interContainerProxy) {
121 args.defaultTopic = topic
William Kurkianea869482019-04-09 15:16:11 -0400122 }
123}
124
125func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530126 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400127 args.deviceDiscoveryTopic = topic
128 }
129}
130
131func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530132 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400133 args.defaultRequestHandlerInterface = handler
134 }
135}
136
137func MsgClient(client Client) InterContainerProxyOption {
npujarec5762e2020-01-01 14:08:48 +0530138 return func(args *interContainerProxy) {
William Kurkianea869482019-04-09 15:16:11 -0400139 args.kafkaClient = client
140 }
141}
142
npujarec5762e2020-01-01 14:08:48 +0530143func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
144 proxy := &interContainerProxy{
William Kurkianea869482019-04-09 15:16:11 -0400145 kafkaHost: DefaultKafkaHost,
146 kafkaPort: DefaultKafkaPort,
npujarec5762e2020-01-01 14:08:48 +0530147 doneCh: make(chan struct{}),
William Kurkianea869482019-04-09 15:16:11 -0400148 }
149
150 for _, option := range opts {
151 option(proxy)
152 }
153
npujarec5762e2020-01-01 14:08:48 +0530154 return proxy
William Kurkianea869482019-04-09 15:16:11 -0400155}
156
npujarec5762e2020-01-01 14:08:48 +0530157func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
158 return newInterContainerProxy(opts...)
159}
160
161func (kp *interContainerProxy) Start() error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000162 logger.Info("Starting-Proxy")
William Kurkianea869482019-04-09 15:16:11 -0400163
164 // Kafka MsgClient should already have been created. If not, output fatal error
165 if kp.kafkaClient == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000166 logger.Fatal("kafka-client-not-set")
William Kurkianea869482019-04-09 15:16:11 -0400167 }
168
William Kurkianea869482019-04-09 15:16:11 -0400169 // Start the kafka client
170 if err := kp.kafkaClient.Start(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000171 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400172 return err
173 }
174
175 // Create the topic to response channel map
176 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
177 //
178 // Create the transactionId to Channel Map
179 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
180
181 // Create the topic to request channel map
182 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
183
184 return nil
185}
186
npujarec5762e2020-01-01 14:08:48 +0530187func (kp *interContainerProxy) Stop() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000188 logger.Info("stopping-intercontainer-proxy")
npujarec5762e2020-01-01 14:08:48 +0530189 kp.doneOnce.Do(func() { close(kp.doneCh) })
William Kurkianea869482019-04-09 15:16:11 -0400190 // TODO : Perform cleanup
191 kp.kafkaClient.Stop()
192 //kp.deleteAllTopicRequestHandlerChannelMap()
193 //kp.deleteAllTopicResponseChannelMap()
194 //kp.deleteAllTransactionIdToChannelMap()
195}
196
npujarec5762e2020-01-01 14:08:48 +0530197func (kp *interContainerProxy) GetDefaultTopic() *Topic {
198 return kp.defaultTopic
199}
200
William Kurkianea869482019-04-09 15:16:11 -0400201// DeviceDiscovered publish the discovered device onto the kafka messaging bus
npujarec5762e2020-01-01 14:08:48 +0530202func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000203 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
William Kurkianea869482019-04-09 15:16:11 -0400204 // Simple validation
205 if deviceId == "" || deviceType == "" {
Esin Karamanccb714b2019-11-29 15:02:06 +0000206 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
William Kurkianea869482019-04-09 15:16:11 -0400207 return errors.New("invalid-parameters")
208 }
209 // Create the device discovery message
210 header := &ic.Header{
211 Id: uuid.New().String(),
212 Type: ic.MessageType_DEVICE_DISCOVERED,
npujarec5762e2020-01-01 14:08:48 +0530213 FromTopic: kp.defaultTopic.Name,
William Kurkianea869482019-04-09 15:16:11 -0400214 ToTopic: kp.deviceDiscoveryTopic.Name,
215 Timestamp: time.Now().UnixNano(),
216 }
217 body := &ic.DeviceDiscovered{
218 Id: deviceId,
219 DeviceType: deviceType,
220 ParentId: parentId,
221 Publisher: publisher,
222 }
223
224 var marshalledData *any.Any
225 var err error
226 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000227 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400228 return err
229 }
230 msg := &ic.InterContainerMessage{
231 Header: header,
232 Body: marshalledData,
233 }
234
235 // Send the message
236 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000237 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400238 return err
239 }
240 return nil
241}
242
243// InvokeRPC is used to send a request to a given topic
npujarec5762e2020-01-01 14:08:48 +0530244func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
William Kurkianea869482019-04-09 15:16:11 -0400245 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
246
247 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
248 // typically the device ID.
249 responseTopic := replyToTopic
250 if responseTopic == nil {
npujarec5762e2020-01-01 14:08:48 +0530251 responseTopic = kp.defaultTopic
William Kurkianea869482019-04-09 15:16:11 -0400252 }
253
254 // Encode the request
255 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
256 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000257 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400258 return false, nil
259 }
260
261 // Subscribe for response, if needed, before sending request
262 var ch <-chan *ic.InterContainerMessage
263 if waitForResponse {
264 var err error
265 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000266 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400267 }
268 }
269
270 // Send request - if the topic is formatted with a device Id then we will send the request using a
271 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
272 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
273 //key := GetDeviceIdFromTopic(*toTopic)
Esin Karamanccb714b2019-11-29 15:02:06 +0000274 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
William Kurkianea869482019-04-09 15:16:11 -0400275 go kp.kafkaClient.Send(protoRequest, toTopic, key)
276
277 if waitForResponse {
278 // Create a child context based on the parent context, if any
279 var cancel context.CancelFunc
280 childCtx := context.Background()
281 if ctx == nil {
282 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
283 } else {
284 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
285 }
286 defer cancel()
287
288 // Wait for response as well as timeout or cancellation
289 // Remove the subscription for a response on return
290 defer kp.unSubscribeForResponse(protoRequest.Header.Id)
291 select {
292 case msg, ok := <-ch:
293 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000294 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400295 protoError := &ic.Error{Reason: "channel-closed"}
296 var marshalledArg *any.Any
297 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
298 return false, nil // Should never happen
299 }
300 return false, marshalledArg
301 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000302 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400303 var responseBody *ic.InterContainerResponseBody
304 var err error
305 if responseBody, err = decodeResponse(msg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000306 logger.Errorw("decode-response-error", log.Fields{"error": err})
npujarec5762e2020-01-01 14:08:48 +0530307 // FIXME we should return something
William Kurkianea869482019-04-09 15:16:11 -0400308 }
309 return responseBody.Success, responseBody.Result
310 case <-ctx.Done():
Esin Karamanccb714b2019-11-29 15:02:06 +0000311 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
William Kurkianea869482019-04-09 15:16:11 -0400312 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530313 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
314
William Kurkianea869482019-04-09 15:16:11 -0400315 var marshalledArg *any.Any
316 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
317 return false, nil // Should never happen
318 }
319 return false, marshalledArg
320 case <-childCtx.Done():
Esin Karamanccb714b2019-11-29 15:02:06 +0000321 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
William Kurkianea869482019-04-09 15:16:11 -0400322 // pack the error as proto any type
npujarec5762e2020-01-01 14:08:48 +0530323 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
324
William Kurkianea869482019-04-09 15:16:11 -0400325 var marshalledArg *any.Any
326 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
327 return false, nil // Should never happen
328 }
329 return false, marshalledArg
330 case <-kp.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +0000331 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
William Kurkianea869482019-04-09 15:16:11 -0400332 return true, nil
333 }
334 }
335 return true, nil
336}
337
338// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
339// when a message is received on a given topic
npujarec5762e2020-01-01 14:08:48 +0530340func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
William Kurkianea869482019-04-09 15:16:11 -0400341
342 // Subscribe to receive messages for that topic
343 var ch <-chan *ic.InterContainerMessage
344 var err error
345 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
346 //if ch, err = kp.Subscribe(topic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000347 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Matt Jeannereteb5059f2019-07-19 06:11:00 -0400348 return err
William Kurkianea869482019-04-09 15:16:11 -0400349 }
350
351 kp.defaultRequestHandlerInterface = handler
352 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
353 // Launch a go routine to receive and process kafka messages
354 go kp.waitForMessages(ch, topic, handler)
355
356 return nil
357}
358
359// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
360// when a message is received on a given topic. So far there is only 1 target registered per microservice
npujarec5762e2020-01-01 14:08:48 +0530361func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
William Kurkianea869482019-04-09 15:16:11 -0400362 // Subscribe to receive messages for that topic
363 var ch <-chan *ic.InterContainerMessage
364 var err error
365 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000366 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400367 return err
368 }
369 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
370
371 // Launch a go routine to receive and process kafka messages
372 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
373
374 return nil
375}
376
npujarec5762e2020-01-01 14:08:48 +0530377func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400378 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
379}
380
381// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
382// responses from that topic.
npujarec5762e2020-01-01 14:08:48 +0530383func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400384 kp.lockTopicResponseChannelMap.Lock()
385 defer kp.lockTopicResponseChannelMap.Unlock()
386 if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
387 kp.topicToResponseChannelMap[topic] = arg
388 }
389}
390
npujarec5762e2020-01-01 14:08:48 +0530391func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
William Kurkianea869482019-04-09 15:16:11 -0400392 kp.lockTopicResponseChannelMap.RLock()
393 defer kp.lockTopicResponseChannelMap.RUnlock()
394 _, exist := kp.topicToResponseChannelMap[topic]
395 return exist
396}
397
npujarec5762e2020-01-01 14:08:48 +0530398func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400399 kp.lockTopicResponseChannelMap.Lock()
400 defer kp.lockTopicResponseChannelMap.Unlock()
401 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
402 // Unsubscribe to this topic first - this will close the subscribed channel
403 var err error
404 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000405 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400406 }
407 delete(kp.topicToResponseChannelMap, topic)
408 return err
409 } else {
410 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
411 }
412}
413
npujarec5762e2020-01-01 14:08:48 +0530414func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
William Kurkianea869482019-04-09 15:16:11 -0400415 kp.lockTopicResponseChannelMap.Lock()
416 defer kp.lockTopicResponseChannelMap.Unlock()
417 var err error
418 for topic, _ := range kp.topicToResponseChannelMap {
419 // Unsubscribe to this topic first - this will close the subscribed channel
420 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000421 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400422 }
423 delete(kp.topicToResponseChannelMap, topic)
424 }
425 return err
426}
427
npujarec5762e2020-01-01 14:08:48 +0530428func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
William Kurkianea869482019-04-09 15:16:11 -0400429 kp.lockTopicRequestHandlerChannelMap.Lock()
430 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
431 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
432 kp.topicToRequestHandlerChannelMap[topic] = arg
433 }
434}
435
npujarec5762e2020-01-01 14:08:48 +0530436func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400437 kp.lockTopicRequestHandlerChannelMap.Lock()
438 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
439 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
440 // Close the kafka client client first by unsubscribing to this topic
441 kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
442 delete(kp.topicToRequestHandlerChannelMap, topic)
443 return nil
444 } else {
445 return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
446 }
447}
448
npujarec5762e2020-01-01 14:08:48 +0530449func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
William Kurkianea869482019-04-09 15:16:11 -0400450 kp.lockTopicRequestHandlerChannelMap.Lock()
451 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
452 var err error
453 for topic, _ := range kp.topicToRequestHandlerChannelMap {
454 // Close the kafka client client first by unsubscribing to this topic
455 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000456 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400457 }
458 delete(kp.topicToRequestHandlerChannelMap, topic)
459 }
460 return err
461}
462
npujarec5762e2020-01-01 14:08:48 +0530463func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400464 kp.lockTransactionIdToChannelMap.Lock()
465 defer kp.lockTransactionIdToChannelMap.Unlock()
466 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
467 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
468 }
469}
470
npujarec5762e2020-01-01 14:08:48 +0530471func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400472 kp.lockTransactionIdToChannelMap.Lock()
473 defer kp.lockTransactionIdToChannelMap.Unlock()
474 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
475 // Close the channel first
476 close(transChannel.ch)
477 delete(kp.transactionIdToChannelMap, id)
478 }
479}
480
npujarec5762e2020-01-01 14:08:48 +0530481func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
William Kurkianea869482019-04-09 15:16:11 -0400482 kp.lockTransactionIdToChannelMap.Lock()
483 defer kp.lockTransactionIdToChannelMap.Unlock()
484 for key, value := range kp.transactionIdToChannelMap {
485 if value.topic.Name == id {
486 close(value.ch)
487 delete(kp.transactionIdToChannelMap, key)
488 }
489 }
490}
491
npujarec5762e2020-01-01 14:08:48 +0530492func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
William Kurkianea869482019-04-09 15:16:11 -0400493 kp.lockTransactionIdToChannelMap.Lock()
494 defer kp.lockTransactionIdToChannelMap.Unlock()
495 for key, value := range kp.transactionIdToChannelMap {
496 close(value.ch)
497 delete(kp.transactionIdToChannelMap, key)
498 }
499}
500
npujarec5762e2020-01-01 14:08:48 +0530501func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400502 // If we have any consumers on that topic we need to close them
503 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000504 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400505 }
506 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000507 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400508 }
509 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
510
511 return kp.kafkaClient.DeleteTopic(&topic)
512}
513
514func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
515 // Encode the response argument - needs to be a proto message
516 if returnedVal == nil {
517 return nil, nil
518 }
519 protoValue, ok := returnedVal.(proto.Message)
520 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000521 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
William Kurkianea869482019-04-09 15:16:11 -0400522 err := errors.New("response-value-not-proto-message")
523 return nil, err
524 }
525
526 // Marshal the returned value, if any
527 var marshalledReturnedVal *any.Any
528 var err error
529 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000530 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400531 return nil, err
532 }
533 return marshalledReturnedVal, nil
534}
535
536func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
537 responseHeader := &ic.Header{
538 Id: request.Header.Id,
539 Type: ic.MessageType_RESPONSE,
540 FromTopic: request.Header.ToTopic,
541 ToTopic: request.Header.FromTopic,
npujarec5762e2020-01-01 14:08:48 +0530542 Timestamp: time.Now().UnixNano(),
William Kurkianea869482019-04-09 15:16:11 -0400543 }
544 responseBody := &ic.InterContainerResponseBody{
545 Success: false,
546 Result: nil,
547 }
548 var marshalledResponseBody *any.Any
549 var err error
550 // Error should never happen here
551 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000552 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400553 }
554
555 return &ic.InterContainerMessage{
556 Header: responseHeader,
557 Body: marshalledResponseBody,
558 }
559
560}
561
562//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
563//or an error on failure
564func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000565 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
William Kurkianea869482019-04-09 15:16:11 -0400566 responseHeader := &ic.Header{
567 Id: request.Header.Id,
568 Type: ic.MessageType_RESPONSE,
569 FromTopic: request.Header.ToTopic,
570 ToTopic: request.Header.FromTopic,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400571 KeyTopic: request.Header.KeyTopic,
William Kurkianea869482019-04-09 15:16:11 -0400572 Timestamp: time.Now().UnixNano(),
573 }
574
575 // Go over all returned values
576 var marshalledReturnedVal *any.Any
577 var err error
578 for _, returnVal := range returnedValues {
579 if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000580 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400581 }
582 break // for now we support only 1 returned value - (excluding the error)
583 }
584
585 responseBody := &ic.InterContainerResponseBody{
586 Success: success,
587 Result: marshalledReturnedVal,
588 }
589
590 // Marshal the response body
591 var marshalledResponseBody *any.Any
592 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000593 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400594 return nil, err
595 }
596
597 return &ic.InterContainerMessage{
598 Header: responseHeader,
599 Body: marshalledResponseBody,
600 }, nil
601}
602
// CallFuncByName invokes, through reflection, the method of myClass whose
// name is funcName and returns the values that method returned.
//
// The first letter of funcName is capitalized before the lookup, to work
// around the leading capital letter required to invoke a method from a
// different package. params are passed to the method in order; a nil entry is
// passed as the zero value of the corresponding parameter type.
//
// An error (and no call) results when myClass is nil, the method does not
// exist, the number of params does not match the method signature, or a param
// is not assignable to its parameter type. These cases used to panic inside
// the reflect package.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// A nil target has no methods; MethodByName would panic on the zero Value.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	mType := m.Type()
	numIn := mType.NumIn()
	variadic := mType.IsVariadic()
	if (!variadic && len(params) != numIn) || (variadic && len(params) < numIn-1) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-count \"%s\": expected %d, got %d", funcName, numIn, len(params))
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		// Declared type of the i-th argument; trailing arguments of a variadic
		// method take the element type of the final slice parameter.
		want := mType.In(numIn - 1)
		if variadic && i >= numIn-1 {
			want = want.Elem()
		} else {
			want = mType.In(i)
		}

		if param == nil {
			// reflect.ValueOf(nil) is the zero Value, which Call rejects; pass
			// a typed nil when the parameter type can hold one.
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
				in[i] = reflect.Zero(want)
				continue
			}
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": nil is not assignable to parameter %d (%s)", funcName, i, want)
		}

		arg := reflect.ValueOf(param)
		if !arg.Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": %s is not assignable to parameter %d (%s)", funcName, arg.Type(), i, want)
		}
		in[i] = arg
	}

	return m.Call(in), nil
}
619
npujarec5762e2020-01-01 14:08:48 +0530620func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400621 arg := &KVArg{
622 Key: TransactionKey,
623 Value: &ic.StrType{Val: transactionId},
624 }
625
626 var marshalledArg *any.Any
627 var err error
628 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000629 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400630 return currentArgs
631 }
632 protoArg := &ic.Argument{
633 Key: arg.Key,
634 Value: marshalledArg,
635 }
636 return append(currentArgs, protoArg)
637}
638
npujarec5762e2020-01-01 14:08:48 +0530639func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
William Kurkianea869482019-04-09 15:16:11 -0400640 var marshalledArg *any.Any
641 var err error
642 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000643 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400644 return currentArgs
645 }
646 protoArg := &ic.Argument{
647 Key: FromTopic,
648 Value: marshalledArg,
649 }
650 return append(currentArgs, protoArg)
651}
652
npujarec5762e2020-01-01 14:08:48 +0530653func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400654
655 // First extract the header to know whether this is a request - responses are handled by a different handler
656 if msg.Header.Type == ic.MessageType_REQUEST {
657 var out []reflect.Value
658 var err error
659
660 // Get the request body
661 requestBody := &ic.InterContainerRequestBody{}
662 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000663 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400664 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000665 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400666 // let the callee unpack the arguments as its the only one that knows the real proto type
667 // Augment the requestBody with the message Id as it will be used in scenarios where cores
668 // are set in pairs and competing
669 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
670
671 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
672 // needs to send an unsollicited message to the currently requested container
673 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
674
675 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
676 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000677 logger.Warn(err)
William Kurkianea869482019-04-09 15:16:11 -0400678 }
679 }
680 // Response required?
681 if requestBody.ResponseRequired {
682 // If we already have an error before then just return that
683 var returnError *ic.Error
684 var returnedValues []interface{}
685 var success bool
686 if err != nil {
687 returnError = &ic.Error{Reason: err.Error()}
688 returnedValues = make([]interface{}, 1)
689 returnedValues[0] = returnError
690 } else {
691 returnedValues = make([]interface{}, 0)
692 // Check for errors first
693 lastIndex := len(out) - 1
694 if out[lastIndex].Interface() != nil { // Error
kdarapub26b4502019-10-05 03:02:33 +0530695 if retError, ok := out[lastIndex].Interface().(error); ok {
696 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000697 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530698 return // Ignore - process is in competing mode and ignored transaction
699 }
700 returnError = &ic.Error{Reason: retError.Error()}
William Kurkianea869482019-04-09 15:16:11 -0400701 returnedValues = append(returnedValues, returnError)
702 } else { // Should never happen
703 returnError = &ic.Error{Reason: "incorrect-error-returns"}
704 returnedValues = append(returnedValues, returnError)
705 }
706 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000707 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
kdarapub26b4502019-10-05 03:02:33 +0530708 return // Ignore - should not happen
William Kurkianea869482019-04-09 15:16:11 -0400709 } else { // Non-error case
710 success = true
711 for idx, val := range out {
Esin Karamanccb714b2019-11-29 15:02:06 +0000712 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
William Kurkianea869482019-04-09 15:16:11 -0400713 if idx != lastIndex {
714 returnedValues = append(returnedValues, val.Interface())
715 }
716 }
717 }
718 }
719
720 var icm *ic.InterContainerMessage
721 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000722 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400723 icm = encodeDefaultFailedResponse(msg)
724 }
725 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
726 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
727 // present then the key will be empty, hence all messages for a given topic will be sent to all
728 // partitions.
729 replyTopic := &Topic{Name: msg.Header.FromTopic}
730 key := msg.Header.KeyTopic
Esin Karamanccb714b2019-11-29 15:02:06 +0000731 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
William Kurkianea869482019-04-09 15:16:11 -0400732 // TODO: handle error response.
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400733 go kp.kafkaClient.Send(icm, replyTopic, key)
William Kurkianea869482019-04-09 15:16:11 -0400734 }
735 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Esin Karamanccb714b2019-11-29 15:02:06 +0000736 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400737 go kp.dispatchResponse(msg)
738 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000739 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
William Kurkianea869482019-04-09 15:16:11 -0400740 }
741}
742
npujarec5762e2020-01-01 14:08:48 +0530743func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
William Kurkianea869482019-04-09 15:16:11 -0400744 // Wait for messages
745 for msg := range ch {
Esin Karamanccb714b2019-11-29 15:02:06 +0000746 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
William Kurkianea869482019-04-09 15:16:11 -0400747 go kp.handleMessage(msg, targetInterface)
748 }
749}
750
npujarec5762e2020-01-01 14:08:48 +0530751func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400752 kp.lockTransactionIdToChannelMap.RLock()
753 defer kp.lockTransactionIdToChannelMap.RUnlock()
754 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Esin Karamanccb714b2019-11-29 15:02:06 +0000755 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
William Kurkianea869482019-04-09 15:16:11 -0400756 return
757 }
758 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
759}
760
761// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
762// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
763// API. There is one response channel waiting for kafka messages before dispatching the message to the
764// corresponding waiting channel
npujarec5762e2020-01-01 14:08:48 +0530765func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000766 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400767
768 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
769 // broadcast any message for this topic to all channels waiting on it.
770 ch := make(chan *ic.InterContainerMessage)
771 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
772
773 return ch, nil
774}
775
npujarec5762e2020-01-01 14:08:48 +0530776func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000777 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
William Kurkianea869482019-04-09 15:16:11 -0400778 kp.deleteFromTransactionIdToChannelMap(trnsId)
779 return nil
780}
781
npujarec5762e2020-01-01 14:08:48 +0530782func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
cbabu95f21522019-11-13 14:25:18 +0100783 return kp.kafkaClient.EnableLivenessChannel(enable)
784}
785
npujarec5762e2020-01-01 14:08:48 +0530786func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
Scott Baker86fce9a2019-12-12 09:47:17 -0800787 return kp.kafkaClient.EnableHealthinessChannel(enable)
788}
789
npujarec5762e2020-01-01 14:08:48 +0530790func (kp *interContainerProxy) SendLiveness() error {
cbabu95f21522019-11-13 14:25:18 +0100791 return kp.kafkaClient.SendLiveness()
792}
793
William Kurkianea869482019-04-09 15:16:11 -0400794//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
795//or an error on failure
796func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
797 requestHeader := &ic.Header{
798 Id: uuid.New().String(),
799 Type: ic.MessageType_REQUEST,
800 FromTopic: replyTopic.Name,
801 ToTopic: toTopic.Name,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400802 KeyTopic: key,
William Kurkianea869482019-04-09 15:16:11 -0400803 Timestamp: time.Now().UnixNano(),
804 }
805 requestBody := &ic.InterContainerRequestBody{
806 Rpc: rpc,
807 ResponseRequired: true,
808 ReplyToTopic: replyTopic.Name,
809 }
810
811 for _, arg := range kvArgs {
812 if arg == nil {
813 // In case the caller sends an array with empty args
814 continue
815 }
816 var marshalledArg *any.Any
817 var err error
818 // ascertain the value interface type is a proto.Message
819 protoValue, ok := arg.Value.(proto.Message)
820 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000821 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
William Kurkianea869482019-04-09 15:16:11 -0400822 err := errors.New("argument-value-not-proto-message")
823 return nil, err
824 }
825 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000826 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400827 return nil, err
828 }
829 protoArg := &ic.Argument{
830 Key: arg.Key,
831 Value: marshalledArg,
832 }
833 requestBody.Args = append(requestBody.Args, protoArg)
834 }
835
836 var marshalledData *any.Any
837 var err error
838 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000839 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400840 return nil, err
841 }
842 request := &ic.InterContainerMessage{
843 Header: requestHeader,
844 Body: marshalledData,
845 }
846 return request, nil
847}
848
849func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
850 // Extract the message body
851 responseBody := ic.InterContainerResponseBody{}
852 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000853 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400854 return nil, err
855 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000856 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
William Kurkianea869482019-04-09 15:16:11 -0400857
858 return &responseBody, nil
859
860}