blob: c7dc5af34f84bf4aa3b67966833a79e1b6afeb98 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
Scott Baker2c1c4822019-10-16 11:02:41 -070022 "reflect"
23 "strings"
24 "sync"
25 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070026
27 "github.com/golang/protobuf/proto"
28 "github.com/golang/protobuf/ptypes"
29 "github.com/golang/protobuf/ptypes/any"
30 "github.com/google/uuid"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070033)
34
// Default values used by the proxy. DefaultRequestTimeout is expressed in milliseconds.
const (
	DefaultMaxRetries     = 3
	DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
)

// Keys under which request metadata is carried as message arguments.
const (
	TransactionKey = "transactionID"
	FromTopic      = "fromTopic"
)

// Sentinel errors related to transactions.
var (
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
	ErrorTransactionInvalidId   = errors.New("transaction-invalid-id")
)
47
48// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
49// obtained from that channel, this interface is invoked. This is used to handle
50// async requests into the Core via the kafka messaging bus
51type requestHandlerChannel struct {
52 requesthandlerInterface interface{}
53 ch <-chan *ic.InterContainerMessage
54}
55
56// transactionChannel represents a combination of a topic and a channel onto which a response received
57// on the kafka bus will be sent to
58type transactionChannel struct {
59 topic *Topic
60 ch chan *ic.InterContainerMessage
61}
62
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080063type InterContainerProxy interface {
64 Start() error
65 Stop()
Matteo Scandolof346a2d2020-01-24 13:14:54 -080066 GetDefaultTopic() *Topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080067 DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
68 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
69 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
70 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
71 UnSubscribeFromRequestHandler(topic Topic) error
72 DeleteTopic(topic Topic) error
Matteo Scandolof346a2d2020-01-24 13:14:54 -080073 EnableLivenessChannel(enable bool) chan bool
74 SendLiveness() error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080075}
76
77// interContainerProxy represents the messaging proxy
78type interContainerProxy struct {
Scott Baker2c1c4822019-10-16 11:02:41 -070079 kafkaHost string
80 kafkaPort int
Matteo Scandolof346a2d2020-01-24 13:14:54 -080081 defaultTopic *Topic
Scott Baker2c1c4822019-10-16 11:02:41 -070082 defaultRequestHandlerInterface interface{}
83 deviceDiscoveryTopic *Topic
84 kafkaClient Client
Kent Hagerman3a402302020-01-31 15:03:53 -050085 doneCh chan struct{}
86 doneOnce sync.Once
Scott Baker2c1c4822019-10-16 11:02:41 -070087
88 // This map is used to map a topic to an interface and channel. When a request is received
89 // on that channel (registered to the topic) then that interface is invoked.
90 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
91 lockTopicRequestHandlerChannelMap sync.RWMutex
92
93 // This map is used to map a channel to a response topic. This channel handles all responses on that
94 // channel for that topic and forward them to the appropriate consumers channel, using the
95 // transactionIdToChannelMap.
96 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
97 lockTopicResponseChannelMap sync.RWMutex
98
99 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
100 // sent out and we are waiting for a response.
101 transactionIdToChannelMap map[string]*transactionChannel
102 lockTransactionIdToChannelMap sync.RWMutex
103}
104
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800105type InterContainerProxyOption func(*interContainerProxy)
Scott Baker2c1c4822019-10-16 11:02:41 -0700106
107func InterContainerHost(host string) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800108 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700109 args.kafkaHost = host
110 }
111}
112
113func InterContainerPort(port int) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800114 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700115 args.kafkaPort = port
116 }
117}
118
119func DefaultTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800120 return func(args *interContainerProxy) {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800121 args.defaultTopic = topic
Scott Baker2c1c4822019-10-16 11:02:41 -0700122 }
123}
124
125func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800126 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700127 args.deviceDiscoveryTopic = topic
128 }
129}
130
131func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800132 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700133 args.defaultRequestHandlerInterface = handler
134 }
135}
136
137func MsgClient(client Client) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800138 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700139 args.kafkaClient = client
140 }
141}
142
Kent Hagerman3a402302020-01-31 15:03:53 -0500143func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800144 proxy := &interContainerProxy{
Scott Baker2c1c4822019-10-16 11:02:41 -0700145 kafkaHost: DefaultKafkaHost,
146 kafkaPort: DefaultKafkaPort,
Kent Hagerman3a402302020-01-31 15:03:53 -0500147 doneCh: make(chan struct{}),
Scott Baker2c1c4822019-10-16 11:02:41 -0700148 }
149
150 for _, option := range opts {
151 option(proxy)
152 }
153
Kent Hagerman3a402302020-01-31 15:03:53 -0500154 return proxy
Scott Baker2c1c4822019-10-16 11:02:41 -0700155}
156
Kent Hagerman3a402302020-01-31 15:03:53 -0500157func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800158 return newInterContainerProxy(opts...)
159}
160
161func (kp *interContainerProxy) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -0500162 logger.Info("Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700163
164 // Kafka MsgClient should already have been created. If not, output fatal error
165 if kp.kafkaClient == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500166 logger.Fatal("kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700167 }
168
Scott Baker2c1c4822019-10-16 11:02:41 -0700169 // Start the kafka client
170 if err := kp.kafkaClient.Start(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500171 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700172 return err
173 }
174
175 // Create the topic to response channel map
176 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
177 //
178 // Create the transactionId to Channel Map
179 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
180
181 // Create the topic to request channel map
182 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
183
184 return nil
185}
186
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800187func (kp *interContainerProxy) Stop() {
khenaidoob332f9b2020-01-16 16:25:26 -0500188 logger.Info("stopping-intercontainer-proxy")
Kent Hagerman3a402302020-01-31 15:03:53 -0500189 kp.doneOnce.Do(func() { close(kp.doneCh) })
Scott Baker2c1c4822019-10-16 11:02:41 -0700190 // TODO : Perform cleanup
191 kp.kafkaClient.Stop()
Scott Bakera2da2f42020-02-20 16:27:34 -0800192 err := kp.deleteAllTopicRequestHandlerChannelMap()
193 if err != nil {
194 log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
195 }
196 err = kp.deleteAllTopicResponseChannelMap()
197 if err != nil {
198 log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
199 }
200 kp.deleteAllTransactionIdToChannelMap()
Scott Baker2c1c4822019-10-16 11:02:41 -0700201}
202
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800203func (kp *interContainerProxy) GetDefaultTopic() *Topic {
204 return kp.defaultTopic
205}
206
Scott Baker2c1c4822019-10-16 11:02:41 -0700207// DeviceDiscovered publish the discovered device onto the kafka messaging bus
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800208func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
khenaidoob332f9b2020-01-16 16:25:26 -0500209 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700210 // Simple validation
211 if deviceId == "" || deviceType == "" {
khenaidoob332f9b2020-01-16 16:25:26 -0500212 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700213 return errors.New("invalid-parameters")
214 }
215 // Create the device discovery message
216 header := &ic.Header{
217 Id: uuid.New().String(),
218 Type: ic.MessageType_DEVICE_DISCOVERED,
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800219 FromTopic: kp.defaultTopic.Name,
Scott Baker2c1c4822019-10-16 11:02:41 -0700220 ToTopic: kp.deviceDiscoveryTopic.Name,
221 Timestamp: time.Now().UnixNano(),
222 }
223 body := &ic.DeviceDiscovered{
224 Id: deviceId,
225 DeviceType: deviceType,
226 ParentId: parentId,
227 Publisher: publisher,
228 }
229
230 var marshalledData *any.Any
231 var err error
232 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500233 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700234 return err
235 }
236 msg := &ic.InterContainerMessage{
237 Header: header,
238 Body: marshalledData,
239 }
240
241 // Send the message
242 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500243 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700244 return err
245 }
246 return nil
247}
248
249// InvokeRPC is used to send a request to a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800250func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
Scott Baker2c1c4822019-10-16 11:02:41 -0700251 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
252
253 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
254 // typically the device ID.
255 responseTopic := replyToTopic
256 if responseTopic == nil {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800257 responseTopic = kp.defaultTopic
Scott Baker2c1c4822019-10-16 11:02:41 -0700258 }
259
260 // Encode the request
261 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
262 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500263 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700264 return false, nil
265 }
266
267 // Subscribe for response, if needed, before sending request
268 var ch <-chan *ic.InterContainerMessage
269 if waitForResponse {
270 var err error
271 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500272 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700273 }
274 }
275
276 // Send request - if the topic is formatted with a device Id then we will send the request using a
277 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
278 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
279 //key := GetDeviceIdFromTopic(*toTopic)
khenaidoob332f9b2020-01-16 16:25:26 -0500280 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800281 go func() {
282 if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
283 logger.Errorw("send-failed", log.Fields{
284 "topic": toTopic,
285 "key": key,
286 "error": err})
287 }
288 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700289
290 if waitForResponse {
291 // Create a child context based on the parent context, if any
292 var cancel context.CancelFunc
293 childCtx := context.Background()
294 if ctx == nil {
295 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
296 } else {
297 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
298 }
299 defer cancel()
300
301 // Wait for response as well as timeout or cancellation
302 // Remove the subscription for a response on return
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800303 defer func() {
304 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
305 logger.Errorw("response-unsubscribe-failed", log.Fields{
306 "id": protoRequest.Header.Id,
307 "error": err})
308 }
309 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700310 select {
311 case msg, ok := <-ch:
312 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500313 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700314 protoError := &ic.Error{Reason: "channel-closed"}
315 var marshalledArg *any.Any
316 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
317 return false, nil // Should never happen
318 }
319 return false, marshalledArg
320 }
khenaidoob332f9b2020-01-16 16:25:26 -0500321 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700322 var responseBody *ic.InterContainerResponseBody
323 var err error
324 if responseBody, err = decodeResponse(msg); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500325 logger.Errorw("decode-response-error", log.Fields{"error": err})
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800326 // FIXME we should return something
Scott Baker2c1c4822019-10-16 11:02:41 -0700327 }
328 return responseBody.Success, responseBody.Result
329 case <-ctx.Done():
khenaidoob332f9b2020-01-16 16:25:26 -0500330 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700331 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800332 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800333
Scott Baker2c1c4822019-10-16 11:02:41 -0700334 var marshalledArg *any.Any
335 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
336 return false, nil // Should never happen
337 }
338 return false, marshalledArg
339 case <-childCtx.Done():
khenaidoob332f9b2020-01-16 16:25:26 -0500340 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700341 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800342 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800343
Scott Baker2c1c4822019-10-16 11:02:41 -0700344 var marshalledArg *any.Any
345 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
346 return false, nil // Should never happen
347 }
348 return false, marshalledArg
349 case <-kp.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -0500350 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700351 return true, nil
352 }
353 }
354 return true, nil
355}
356
357// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
358// when a message is received on a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800359func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700360
361 // Subscribe to receive messages for that topic
362 var ch <-chan *ic.InterContainerMessage
363 var err error
364 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
365 //if ch, err = kp.Subscribe(topic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500366 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700367 return err
368 }
369
370 kp.defaultRequestHandlerInterface = handler
371 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
372 // Launch a go routine to receive and process kafka messages
373 go kp.waitForMessages(ch, topic, handler)
374
375 return nil
376}
377
378// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
379// when a message is received on a given topic. So far there is only 1 target registered per microservice
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800380func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700381 // Subscribe to receive messages for that topic
382 var ch <-chan *ic.InterContainerMessage
383 var err error
384 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500385 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700386 return err
387 }
388 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
389
390 // Launch a go routine to receive and process kafka messages
391 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
392
393 return nil
394}
395
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800396func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700397 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
398}
399
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800400func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700401 kp.lockTopicResponseChannelMap.Lock()
402 defer kp.lockTopicResponseChannelMap.Unlock()
403 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
404 // Unsubscribe to this topic first - this will close the subscribed channel
405 var err error
406 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500407 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700408 }
409 delete(kp.topicToResponseChannelMap, topic)
410 return err
411 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800412 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700413 }
414}
415
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800416// nolint: unused
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800417func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
Scott Bakera2da2f42020-02-20 16:27:34 -0800418 logger.Debug("delete-all-topic-response-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700419 kp.lockTopicResponseChannelMap.Lock()
420 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800421 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800422 for topic := range kp.topicToResponseChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700423 // Unsubscribe to this topic first - this will close the subscribed channel
Scott Bakera2da2f42020-02-20 16:27:34 -0800424 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
425 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
khenaidoob332f9b2020-01-16 16:25:26 -0500426 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800427 // Do not return. Continue to try to unsubscribe to other topics.
428 } else {
429 // Only delete from channel map if successfully unsubscribed.
430 delete(kp.topicToResponseChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700431 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700432 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800433 if len(unsubscribeFailTopics) > 0 {
434 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
435 }
436 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700437}
438
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800439func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700440 kp.lockTopicRequestHandlerChannelMap.Lock()
441 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
442 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
443 kp.topicToRequestHandlerChannelMap[topic] = arg
444 }
445}
446
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800447func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700448 kp.lockTopicRequestHandlerChannelMap.Lock()
449 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
450 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
451 // Close the kafka client client first by unsubscribing to this topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800452 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
453 return err
454 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700455 delete(kp.topicToRequestHandlerChannelMap, topic)
456 return nil
457 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800458 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700459 }
460}
461
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800462// nolint: unused
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800463func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
Scott Bakera2da2f42020-02-20 16:27:34 -0800464 logger.Debug("delete-all-topic-request-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700465 kp.lockTopicRequestHandlerChannelMap.Lock()
466 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800467 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800468 for topic := range kp.topicToRequestHandlerChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700469 // Close the kafka client client first by unsubscribing to this topic
Scott Bakera2da2f42020-02-20 16:27:34 -0800470 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
471 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
khenaidoob332f9b2020-01-16 16:25:26 -0500472 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800473 // Do not return. Continue to try to unsubscribe to other topics.
474 } else {
475 // Only delete from channel map if successfully unsubscribed.
476 delete(kp.topicToRequestHandlerChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700477 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700478 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800479 if len(unsubscribeFailTopics) > 0 {
480 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
481 }
482 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700483}
484
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800485func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700486 kp.lockTransactionIdToChannelMap.Lock()
487 defer kp.lockTransactionIdToChannelMap.Unlock()
488 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
489 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
490 }
491}
492
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800493func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700494 kp.lockTransactionIdToChannelMap.Lock()
495 defer kp.lockTransactionIdToChannelMap.Unlock()
496 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
497 // Close the channel first
498 close(transChannel.ch)
499 delete(kp.transactionIdToChannelMap, id)
500 }
501}
502
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800503func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700504 kp.lockTransactionIdToChannelMap.Lock()
505 defer kp.lockTransactionIdToChannelMap.Unlock()
506 for key, value := range kp.transactionIdToChannelMap {
507 if value.topic.Name == id {
508 close(value.ch)
509 delete(kp.transactionIdToChannelMap, key)
510 }
511 }
512}
513
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800514// nolint: unused
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800515func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
Scott Bakera2da2f42020-02-20 16:27:34 -0800516 logger.Debug("delete-all-transaction-id-channel-map")
Scott Baker2c1c4822019-10-16 11:02:41 -0700517 kp.lockTransactionIdToChannelMap.Lock()
518 defer kp.lockTransactionIdToChannelMap.Unlock()
519 for key, value := range kp.transactionIdToChannelMap {
520 close(value.ch)
521 delete(kp.transactionIdToChannelMap, key)
522 }
523}
524
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800525func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700526 // If we have any consumers on that topic we need to close them
527 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500528 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700529 }
530 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500531 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700532 }
533 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
534
535 return kp.kafkaClient.DeleteTopic(&topic)
536}
537
538func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
539 // Encode the response argument - needs to be a proto message
540 if returnedVal == nil {
541 return nil, nil
542 }
543 protoValue, ok := returnedVal.(proto.Message)
544 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500545 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700546 err := errors.New("response-value-not-proto-message")
547 return nil, err
548 }
549
550 // Marshal the returned value, if any
551 var marshalledReturnedVal *any.Any
552 var err error
553 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500554 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700555 return nil, err
556 }
557 return marshalledReturnedVal, nil
558}
559
560func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
561 responseHeader := &ic.Header{
562 Id: request.Header.Id,
563 Type: ic.MessageType_RESPONSE,
564 FromTopic: request.Header.ToTopic,
565 ToTopic: request.Header.FromTopic,
Kent Hagermanccfa2132019-12-17 13:29:34 -0500566 Timestamp: time.Now().UnixNano(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700567 }
568 responseBody := &ic.InterContainerResponseBody{
569 Success: false,
570 Result: nil,
571 }
572 var marshalledResponseBody *any.Any
573 var err error
574 // Error should never happen here
575 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500576 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700577 }
578
579 return &ic.InterContainerMessage{
580 Header: responseHeader,
581 Body: marshalledResponseBody,
582 }
583
584}
585
586//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
587//or an error on failure
588func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -0500589 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700590 responseHeader := &ic.Header{
591 Id: request.Header.Id,
592 Type: ic.MessageType_RESPONSE,
593 FromTopic: request.Header.ToTopic,
594 ToTopic: request.Header.FromTopic,
595 KeyTopic: request.Header.KeyTopic,
596 Timestamp: time.Now().UnixNano(),
597 }
598
599 // Go over all returned values
600 var marshalledReturnedVal *any.Any
601 var err error
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800602
603 // for now we support only 1 returned value - (excluding the error)
604 if len(returnedValues) > 0 {
605 if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500606 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700607 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700608 }
609
610 responseBody := &ic.InterContainerResponseBody{
611 Success: success,
612 Result: marshalledReturnedVal,
613 }
614
615 // Marshal the response body
616 var marshalledResponseBody *any.Any
617 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500618 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700619 return nil, err
620 }
621
622 return &ic.InterContainerMessage{
623 Header: responseHeader,
624 Body: marshalledResponseBody,
625 }, nil
626}
627
// CallFuncByName invokes, through reflection, the method of myClass named funcName with
// params as its arguments and returns the method's results.
//
// The first letter of funcName is capitalized before the lookup, since only exported
// methods can be invoked from a different package.
//
// An error (and an empty result slice) is returned, instead of panicking, when:
//   - myClass is nil;
//   - no such method exists;
//   - the number of params does not match the method's parameters;
//   - a param is not assignable to the corresponding parameter type.
//
// An untyped nil param is passed as the zero value of the parameter type, which must be
// a type that can hold nil.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	target := reflect.ValueOf(myClass)
	if !target.IsValid() {
		// reflect.Value.MethodByName panics on the zero Value.
		return make([]reflect.Value, 0), fmt.Errorf("invalid-target-for-method \"%s\"", funcName)
	}

	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)
	method := target.MethodByName(funcName)
	if !method.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Validate the argument count up front; reflect.Value.Call panics on a mismatch.
	methodType := method.Type()
	variadic := methodType.IsVariadic()
	fixed := methodType.NumIn()
	if variadic {
		fixed--
	}
	if len(params) < fixed || (!variadic && len(params) > fixed) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-number-of-arguments \"%s\": expected %d, got %d",
			funcName, fixed, len(params))
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		// Expected type of this argument; trailing arguments of a variadic method all
		// have the element type of the final slice parameter.
		want := methodType.In(fixed - 1 + boolToInt(variadic))
		if i < fixed {
			want = methodType.In(i)
		} else {
			want = want.Elem()
		}

		arg := reflect.ValueOf(param)
		switch {
		case !arg.IsValid():
			// Untyped nil: only acceptable for types that can hold nil.
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
				arg = reflect.Zero(want)
			default:
				return make([]reflect.Value, 0), fmt.Errorf("invalid-nil-argument \"%s\": argument %d must be %s",
					funcName, i, want)
			}
		case !arg.Type().AssignableTo(want):
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-type \"%s\": argument %d is %s, expected %s",
				funcName, i, arg.Type(), want)
		}
		in[i] = arg
	}

	return method.Call(in), nil
}

// boolToInt returns 1 when b is true and 0 otherwise.
func boolToInt(b bool) int {
	if b {
		return 1
	}
	return 0
}
644
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800645func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700646 arg := &KVArg{
647 Key: TransactionKey,
648 Value: &ic.StrType{Val: transactionId},
649 }
650
651 var marshalledArg *any.Any
652 var err error
653 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500654 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700655 return currentArgs
656 }
657 protoArg := &ic.Argument{
658 Key: arg.Key,
659 Value: marshalledArg,
660 }
661 return append(currentArgs, protoArg)
662}
663
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800664func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700665 var marshalledArg *any.Any
666 var err error
667 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500668 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700669 return currentArgs
670 }
671 protoArg := &ic.Argument{
672 Key: FromTopic,
673 Value: marshalledArg,
674 }
675 return append(currentArgs, protoArg)
676}
677
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800678func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700679
680 // First extract the header to know whether this is a request - responses are handled by a different handler
681 if msg.Header.Type == ic.MessageType_REQUEST {
682 var out []reflect.Value
683 var err error
684
685 // Get the request body
686 requestBody := &ic.InterContainerRequestBody{}
687 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500688 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700689 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500690 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700691 // let the callee unpack the arguments as its the only one that knows the real proto type
692 // Augment the requestBody with the message Id as it will be used in scenarios where cores
693 // are set in pairs and competing
694 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
695
696 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
697 // needs to send an unsollicited message to the currently requested container
698 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
699
700 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
701 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500702 logger.Warn(err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700703 }
704 }
705 // Response required?
706 if requestBody.ResponseRequired {
707 // If we already have an error before then just return that
708 var returnError *ic.Error
709 var returnedValues []interface{}
710 var success bool
711 if err != nil {
712 returnError = &ic.Error{Reason: err.Error()}
713 returnedValues = make([]interface{}, 1)
714 returnedValues[0] = returnError
715 } else {
716 returnedValues = make([]interface{}, 0)
717 // Check for errors first
718 lastIndex := len(out) - 1
719 if out[lastIndex].Interface() != nil { // Error
720 if retError, ok := out[lastIndex].Interface().(error); ok {
721 if retError.Error() == ErrorTransactionNotAcquired.Error() {
khenaidoob332f9b2020-01-16 16:25:26 -0500722 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700723 return // Ignore - process is in competing mode and ignored transaction
724 }
725 returnError = &ic.Error{Reason: retError.Error()}
726 returnedValues = append(returnedValues, returnError)
727 } else { // Should never happen
728 returnError = &ic.Error{Reason: "incorrect-error-returns"}
729 returnedValues = append(returnedValues, returnError)
730 }
731 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
khenaidoob332f9b2020-01-16 16:25:26 -0500732 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700733 return // Ignore - should not happen
734 } else { // Non-error case
735 success = true
736 for idx, val := range out {
khenaidoob332f9b2020-01-16 16:25:26 -0500737 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700738 if idx != lastIndex {
739 returnedValues = append(returnedValues, val.Interface())
740 }
741 }
742 }
743 }
744
745 var icm *ic.InterContainerMessage
746 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500747 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700748 icm = encodeDefaultFailedResponse(msg)
749 }
750 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
751 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
752 // present then the key will be empty, hence all messages for a given topic will be sent to all
753 // partitions.
754 replyTopic := &Topic{Name: msg.Header.FromTopic}
755 key := msg.Header.KeyTopic
khenaidoob332f9b2020-01-16 16:25:26 -0500756 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700757 // TODO: handle error response.
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800758 go func() {
759 if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
760 logger.Errorw("send-reply-failed", log.Fields{
761 "topic": replyTopic,
762 "key": key,
763 "error": err})
764 }
765 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700766 }
767 } else if msg.Header.Type == ic.MessageType_RESPONSE {
khenaidoob332f9b2020-01-16 16:25:26 -0500768 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700769 go kp.dispatchResponse(msg)
770 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500771 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700772 }
773}
774
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800775func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700776 // Wait for messages
777 for msg := range ch {
khenaidoob332f9b2020-01-16 16:25:26 -0500778 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
Scott Baker2c1c4822019-10-16 11:02:41 -0700779 go kp.handleMessage(msg, targetInterface)
780 }
781}
782
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800783func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700784 kp.lockTransactionIdToChannelMap.RLock()
785 defer kp.lockTransactionIdToChannelMap.RUnlock()
786 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
khenaidoob332f9b2020-01-16 16:25:26 -0500787 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700788 return
789 }
790 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
791}
792
793// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
794// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
795// API. There is one response channel waiting for kafka messages before dispatching the message to the
796// corresponding waiting channel
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800797func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -0500798 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700799
800 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
801 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerae1d4702020-03-04 14:10:51 -0800802 // Set channel size to 1 to prevent deadlock, see VOL-2708
803 ch := make(chan *ic.InterContainerMessage, 1)
Scott Baker2c1c4822019-10-16 11:02:41 -0700804 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
805
806 return ch, nil
807}
808
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800809func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
khenaidoob332f9b2020-01-16 16:25:26 -0500810 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700811 kp.deleteFromTransactionIdToChannelMap(trnsId)
812 return nil
813}
814
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800815func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
Scott Baker104b67d2019-10-29 15:56:27 -0700816 return kp.kafkaClient.EnableLivenessChannel(enable)
817}
818
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800819func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
Scott Baker0fef6982019-12-12 09:49:42 -0800820 return kp.kafkaClient.EnableHealthinessChannel(enable)
821}
822
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800823func (kp *interContainerProxy) SendLiveness() error {
Scott Baker104b67d2019-10-29 15:56:27 -0700824 return kp.kafkaClient.SendLiveness()
825}
826
Scott Baker2c1c4822019-10-16 11:02:41 -0700827//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
828//or an error on failure
829func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
830 requestHeader := &ic.Header{
831 Id: uuid.New().String(),
832 Type: ic.MessageType_REQUEST,
833 FromTopic: replyTopic.Name,
834 ToTopic: toTopic.Name,
835 KeyTopic: key,
836 Timestamp: time.Now().UnixNano(),
837 }
838 requestBody := &ic.InterContainerRequestBody{
839 Rpc: rpc,
840 ResponseRequired: true,
841 ReplyToTopic: replyTopic.Name,
842 }
843
844 for _, arg := range kvArgs {
845 if arg == nil {
846 // In case the caller sends an array with empty args
847 continue
848 }
849 var marshalledArg *any.Any
850 var err error
851 // ascertain the value interface type is a proto.Message
852 protoValue, ok := arg.Value.(proto.Message)
853 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500854 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700855 err := errors.New("argument-value-not-proto-message")
856 return nil, err
857 }
858 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500859 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700860 return nil, err
861 }
862 protoArg := &ic.Argument{
863 Key: arg.Key,
864 Value: marshalledArg,
865 }
866 requestBody.Args = append(requestBody.Args, protoArg)
867 }
868
869 var marshalledData *any.Any
870 var err error
871 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500872 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700873 return nil, err
874 }
875 request := &ic.InterContainerMessage{
876 Header: requestHeader,
877 Body: marshalledData,
878 }
879 return request, nil
880}
881
882func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
883 // Extract the message body
884 responseBody := ic.InterContainerResponseBody{}
885 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500886 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700887 return nil, err
888 }
khenaidoob332f9b2020-01-16 16:25:26 -0500889 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700890
891 return &responseBody, nil
892
893}