/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
Matteo Scandoloed128822020-02-10 15:52:35 -080022 "google.golang.org/grpc/codes"
23 "google.golang.org/grpc/status"
Scott Baker2c1c4822019-10-16 11:02:41 -070024 "reflect"
25 "strings"
26 "sync"
27 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070028
29 "github.com/golang/protobuf/proto"
30 "github.com/golang/protobuf/ptypes"
31 "github.com/golang/protobuf/ptypes/any"
32 "github.com/google/uuid"
33 "github.com/opencord/voltha-lib-go/v3/pkg/log"
34 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070035)
36
// Defaults applied to inter-container requests.
const (
	// DefaultMaxRetries is the default number of retries.
	DefaultMaxRetries = 3
	// DefaultRequestTimeout is expressed in milliseconds (60 seconds); it is
	// deliberately generous in order to handle a wider latency range.
	DefaultRequestTimeout = 60000
)
41
// Well-known key names used by the inter-container proxy.
const (
	// TransactionKey is the key name for a transaction identifier.
	TransactionKey = "transactionID"
	// FromTopic is the key name for the originating topic.
	FromTopic = "fromTopic"
)
46
// Sentinel errors related to transaction handling.
var (
	// ErrorTransactionNotAcquired carries the message "transaction-not-acquired".
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
	// ErrorTransactionInvalidId carries the message "transaction-invalid-id".
	ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
)
49
50// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
51// obtained from that channel, this interface is invoked. This is used to handle
52// async requests into the Core via the kafka messaging bus
53type requestHandlerChannel struct {
54 requesthandlerInterface interface{}
55 ch <-chan *ic.InterContainerMessage
56}
57
58// transactionChannel represents a combination of a topic and a channel onto which a response received
59// on the kafka bus will be sent to
60type transactionChannel struct {
61 topic *Topic
62 ch chan *ic.InterContainerMessage
63}
64
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080065type InterContainerProxy interface {
Scott Bakere6685952020-06-23 04:05:39 +000066 Start() error
67 Stop()
Matteo Scandolof346a2d2020-01-24 13:14:54 -080068 GetDefaultTopic() *Topic
Scott Bakere6685952020-06-23 04:05:39 +000069 DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080070 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Matteo Scandoloed128822020-02-10 15:52:35 -080071 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Scott Bakere6685952020-06-23 04:05:39 +000072 SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
73 SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
74 UnSubscribeFromRequestHandler(topic Topic) error
75 DeleteTopic(topic Topic) error
76 EnableLivenessChannel(enable bool) chan bool
77 SendLiveness() error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080078}
79
80// interContainerProxy represents the messaging proxy
81type interContainerProxy struct {
Neha Sharmadd9af392020-04-28 09:03:57 +000082 kafkaAddress string
Matteo Scandolof346a2d2020-01-24 13:14:54 -080083 defaultTopic *Topic
Scott Baker2c1c4822019-10-16 11:02:41 -070084 defaultRequestHandlerInterface interface{}
85 deviceDiscoveryTopic *Topic
86 kafkaClient Client
Kent Hagerman3a402302020-01-31 15:03:53 -050087 doneCh chan struct{}
88 doneOnce sync.Once
Scott Baker2c1c4822019-10-16 11:02:41 -070089
90 // This map is used to map a topic to an interface and channel. When a request is received
91 // on that channel (registered to the topic) then that interface is invoked.
92 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
93 lockTopicRequestHandlerChannelMap sync.RWMutex
94
95 // This map is used to map a channel to a response topic. This channel handles all responses on that
96 // channel for that topic and forward them to the appropriate consumers channel, using the
97 // transactionIdToChannelMap.
98 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
99 lockTopicResponseChannelMap sync.RWMutex
100
101 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
102 // sent out and we are waiting for a response.
103 transactionIdToChannelMap map[string]*transactionChannel
104 lockTransactionIdToChannelMap sync.RWMutex
105}
106
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800107type InterContainerProxyOption func(*interContainerProxy)
Scott Baker2c1c4822019-10-16 11:02:41 -0700108
Neha Sharmadd9af392020-04-28 09:03:57 +0000109func InterContainerAddress(address string) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800110 return func(args *interContainerProxy) {
Neha Sharmadd9af392020-04-28 09:03:57 +0000111 args.kafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -0700112 }
113}
114
115func DefaultTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800116 return func(args *interContainerProxy) {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800117 args.defaultTopic = topic
Scott Baker2c1c4822019-10-16 11:02:41 -0700118 }
119}
120
121func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800122 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700123 args.deviceDiscoveryTopic = topic
124 }
125}
126
127func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800128 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700129 args.defaultRequestHandlerInterface = handler
130 }
131}
132
133func MsgClient(client Client) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800134 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700135 args.kafkaClient = client
136 }
137}
138
Kent Hagerman3a402302020-01-31 15:03:53 -0500139func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800140 proxy := &interContainerProxy{
Neha Sharmadd9af392020-04-28 09:03:57 +0000141 kafkaAddress: DefaultKafkaAddress,
142 doneCh: make(chan struct{}),
Scott Baker2c1c4822019-10-16 11:02:41 -0700143 }
144
145 for _, option := range opts {
146 option(proxy)
147 }
148
Kent Hagerman3a402302020-01-31 15:03:53 -0500149 return proxy
Scott Baker2c1c4822019-10-16 11:02:41 -0700150}
151
Kent Hagerman3a402302020-01-31 15:03:53 -0500152func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800153 return newInterContainerProxy(opts...)
154}
155
Scott Bakere6685952020-06-23 04:05:39 +0000156func (kp *interContainerProxy) Start() error {
157 logger.Info("Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700158
159 // Kafka MsgClient should already have been created. If not, output fatal error
160 if kp.kafkaClient == nil {
Scott Bakere6685952020-06-23 04:05:39 +0000161 logger.Fatal("kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700162 }
163
Scott Baker2c1c4822019-10-16 11:02:41 -0700164 // Start the kafka client
Scott Bakere6685952020-06-23 04:05:39 +0000165 if err := kp.kafkaClient.Start(); err != nil {
166 logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700167 return err
168 }
169
170 // Create the topic to response channel map
171 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
172 //
173 // Create the transactionId to Channel Map
174 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
175
176 // Create the topic to request channel map
177 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
178
179 return nil
180}
181
Scott Bakere6685952020-06-23 04:05:39 +0000182func (kp *interContainerProxy) Stop() {
183 logger.Info("stopping-intercontainer-proxy")
Kent Hagerman3a402302020-01-31 15:03:53 -0500184 kp.doneOnce.Do(func() { close(kp.doneCh) })
Scott Baker2c1c4822019-10-16 11:02:41 -0700185 // TODO : Perform cleanup
Scott Bakere6685952020-06-23 04:05:39 +0000186 kp.kafkaClient.Stop()
187 err := kp.deleteAllTopicRequestHandlerChannelMap()
Scott Bakera2da2f42020-02-20 16:27:34 -0800188 if err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000189 logger.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800190 }
Scott Bakere6685952020-06-23 04:05:39 +0000191 err = kp.deleteAllTopicResponseChannelMap()
Scott Bakera2da2f42020-02-20 16:27:34 -0800192 if err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000193 logger.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800194 }
Scott Bakere6685952020-06-23 04:05:39 +0000195 kp.deleteAllTransactionIdToChannelMap()
Scott Baker2c1c4822019-10-16 11:02:41 -0700196}
197
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800198func (kp *interContainerProxy) GetDefaultTopic() *Topic {
199 return kp.defaultTopic
200}
201
Scott Baker2c1c4822019-10-16 11:02:41 -0700202// DeviceDiscovered publish the discovered device onto the kafka messaging bus
Scott Bakere6685952020-06-23 04:05:39 +0000203func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
204 logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700205 // Simple validation
206 if deviceId == "" || deviceType == "" {
Scott Bakere6685952020-06-23 04:05:39 +0000207 logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700208 return errors.New("invalid-parameters")
209 }
210 // Create the device discovery message
211 header := &ic.Header{
212 Id: uuid.New().String(),
213 Type: ic.MessageType_DEVICE_DISCOVERED,
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800214 FromTopic: kp.defaultTopic.Name,
Scott Baker2c1c4822019-10-16 11:02:41 -0700215 ToTopic: kp.deviceDiscoveryTopic.Name,
Scott Baker84a55ce2020-04-17 10:11:30 -0700216 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700217 }
218 body := &ic.DeviceDiscovered{
219 Id: deviceId,
220 DeviceType: deviceType,
221 ParentId: parentId,
222 Publisher: publisher,
223 }
224
225 var marshalledData *any.Any
226 var err error
227 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000228 logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700229 return err
230 }
231 msg := &ic.InterContainerMessage{
232 Header: header,
233 Body: marshalledData,
234 }
235
236 // Send the message
Scott Bakere6685952020-06-23 04:05:39 +0000237 if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
238 logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700239 return err
240 }
241 return nil
242}
243
Matteo Scandoloed128822020-02-10 15:52:35 -0800244// InvokeAsyncRPC is used to make an RPC request asynchronously
245func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
246 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
247
Scott Bakere6685952020-06-23 04:05:39 +0000248 logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
Matteo Scandoloed128822020-02-10 15:52:35 -0800249 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
250 // typically the device ID.
251 responseTopic := replyToTopic
252 if responseTopic == nil {
253 responseTopic = kp.GetDefaultTopic()
254 }
255
256 chnl := make(chan *RpcResponse)
257
258 go func() {
259
260 // once we're done,
261 // close the response channel
262 defer close(chnl)
263
264 var err error
265 var protoRequest *ic.InterContainerMessage
266
267 // Encode the request
Scott Bakere6685952020-06-23 04:05:39 +0000268 protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
Matteo Scandoloed128822020-02-10 15:52:35 -0800269 if err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000270 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Matteo Scandoloed128822020-02-10 15:52:35 -0800271 chnl <- NewResponse(RpcFormattingError, err, nil)
272 return
273 }
274
275 // Subscribe for response, if needed, before sending request
276 var ch <-chan *ic.InterContainerMessage
Scott Bakere6685952020-06-23 04:05:39 +0000277 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
278 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Matteo Scandoloed128822020-02-10 15:52:35 -0800279 chnl <- NewResponse(RpcTransportError, err, nil)
280 return
281 }
282
283 // Send request - if the topic is formatted with a device Id then we will send the request using a
284 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
285 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Scott Bakere6685952020-06-23 04:05:39 +0000286 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Matteo Scandoloed128822020-02-10 15:52:35 -0800287
288 // if the message is not sent on kafka publish an event an close the channel
Scott Bakere6685952020-06-23 04:05:39 +0000289 if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800290 chnl <- NewResponse(RpcTransportError, err, nil)
291 return
292 }
293
294 // if the client is not waiting for a response send the ack and close the channel
295 chnl <- NewResponse(RpcSent, nil, nil)
296 if !waitForResponse {
297 return
298 }
299
300 defer func() {
301 // Remove the subscription for a response on return
Scott Bakere6685952020-06-23 04:05:39 +0000302 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
303 logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
Matteo Scandoloed128822020-02-10 15:52:35 -0800304 }
305 }()
306
307 // Wait for response as well as timeout or cancellation
308 select {
309 case msg, ok := <-ch:
310 if !ok {
Scott Bakere6685952020-06-23 04:05:39 +0000311 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Matteo Scandoloed128822020-02-10 15:52:35 -0800312 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
313 }
Scott Bakere6685952020-06-23 04:05:39 +0000314 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
315 if responseBody, err := decodeResponse(msg); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800316 chnl <- NewResponse(RpcReply, err, nil)
317 } else {
318 if responseBody.Success {
319 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
320 } else {
321 // response body contains an error
322 unpackErr := &ic.Error{}
323 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
324 chnl <- NewResponse(RpcReply, err, nil)
325 } else {
326 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
327 }
328 }
329 }
330 case <-ctx.Done():
Scott Bakere6685952020-06-23 04:05:39 +0000331 logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Matteo Scandoloed128822020-02-10 15:52:35 -0800332 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
333 chnl <- NewResponse(RpcTimeout, err, nil)
334 case <-kp.doneCh:
335 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Scott Bakere6685952020-06-23 04:05:39 +0000336 logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Matteo Scandoloed128822020-02-10 15:52:35 -0800337 }
338 }()
339 return chnl
340}
341
Scott Baker2c1c4822019-10-16 11:02:41 -0700342// InvokeRPC is used to send a request to a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800343func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
Scott Baker2c1c4822019-10-16 11:02:41 -0700344 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
345
346 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
347 // typically the device ID.
348 responseTopic := replyToTopic
349 if responseTopic == nil {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800350 responseTopic = kp.defaultTopic
Scott Baker2c1c4822019-10-16 11:02:41 -0700351 }
352
353 // Encode the request
Scott Bakere6685952020-06-23 04:05:39 +0000354 protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
Scott Baker2c1c4822019-10-16 11:02:41 -0700355 if err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000356 logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700357 return false, nil
358 }
359
360 // Subscribe for response, if needed, before sending request
361 var ch <-chan *ic.InterContainerMessage
362 if waitForResponse {
363 var err error
Scott Bakere6685952020-06-23 04:05:39 +0000364 if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
365 logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700366 }
367 }
368
369 // Send request - if the topic is formatted with a device Id then we will send the request using a
370 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
371 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
372 //key := GetDeviceIdFromTopic(*toTopic)
Scott Bakere6685952020-06-23 04:05:39 +0000373 logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800374 go func() {
Scott Bakere6685952020-06-23 04:05:39 +0000375 if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
376 logger.Errorw("send-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800377 "topic": toTopic,
378 "key": key,
379 "error": err})
380 }
381 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700382
383 if waitForResponse {
384 // Create a child context based on the parent context, if any
385 var cancel context.CancelFunc
386 childCtx := context.Background()
387 if ctx == nil {
388 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
389 } else {
390 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
391 }
392 defer cancel()
393
394 // Wait for response as well as timeout or cancellation
395 // Remove the subscription for a response on return
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800396 defer func() {
Scott Bakere6685952020-06-23 04:05:39 +0000397 if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
398 logger.Errorw("response-unsubscribe-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800399 "id": protoRequest.Header.Id,
400 "error": err})
401 }
402 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700403 select {
404 case msg, ok := <-ch:
405 if !ok {
Scott Bakere6685952020-06-23 04:05:39 +0000406 logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700407 protoError := &ic.Error{Reason: "channel-closed"}
408 var marshalledArg *any.Any
409 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
410 return false, nil // Should never happen
411 }
412 return false, marshalledArg
413 }
Scott Bakere6685952020-06-23 04:05:39 +0000414 logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700415 var responseBody *ic.InterContainerResponseBody
416 var err error
Scott Bakere6685952020-06-23 04:05:39 +0000417 if responseBody, err = decodeResponse(msg); err != nil {
418 logger.Errorw("decode-response-error", log.Fields{"error": err})
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800419 // FIXME we should return something
Scott Baker2c1c4822019-10-16 11:02:41 -0700420 }
421 return responseBody.Success, responseBody.Result
422 case <-ctx.Done():
Scott Bakere6685952020-06-23 04:05:39 +0000423 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700424 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800425 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800426
Scott Baker2c1c4822019-10-16 11:02:41 -0700427 var marshalledArg *any.Any
428 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
429 return false, nil // Should never happen
430 }
431 return false, marshalledArg
432 case <-childCtx.Done():
Scott Bakere6685952020-06-23 04:05:39 +0000433 logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700434 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800435 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800436
Scott Baker2c1c4822019-10-16 11:02:41 -0700437 var marshalledArg *any.Any
438 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
439 return false, nil // Should never happen
440 }
441 return false, marshalledArg
442 case <-kp.doneCh:
Scott Bakere6685952020-06-23 04:05:39 +0000443 logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700444 return true, nil
445 }
446 }
447 return true, nil
448}
449
450// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
451// when a message is received on a given topic
Scott Bakere6685952020-06-23 04:05:39 +0000452func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700453
454 // Subscribe to receive messages for that topic
455 var ch <-chan *ic.InterContainerMessage
456 var err error
Scott Bakere6685952020-06-23 04:05:39 +0000457 if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
Scott Baker2c1c4822019-10-16 11:02:41 -0700458 //if ch, err = kp.Subscribe(topic); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000459 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700460 return err
461 }
462
463 kp.defaultRequestHandlerInterface = handler
464 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
465 // Launch a go routine to receive and process kafka messages
Scott Bakere6685952020-06-23 04:05:39 +0000466 go kp.waitForMessages(ch, topic, handler)
Scott Baker2c1c4822019-10-16 11:02:41 -0700467
468 return nil
469}
470
471// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
472// when a message is received on a given topic. So far there is only 1 target registered per microservice
Scott Bakere6685952020-06-23 04:05:39 +0000473func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700474 // Subscribe to receive messages for that topic
475 var ch <-chan *ic.InterContainerMessage
476 var err error
Scott Bakere6685952020-06-23 04:05:39 +0000477 if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
478 logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700479 return err
480 }
481 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
482
483 // Launch a go routine to receive and process kafka messages
Scott Bakere6685952020-06-23 04:05:39 +0000484 go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700485
486 return nil
487}
488
Scott Bakere6685952020-06-23 04:05:39 +0000489func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
490 return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
Scott Baker2c1c4822019-10-16 11:02:41 -0700491}
492
Scott Bakere6685952020-06-23 04:05:39 +0000493func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700494 kp.lockTopicResponseChannelMap.Lock()
495 defer kp.lockTopicResponseChannelMap.Unlock()
496 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
497 // Unsubscribe to this topic first - this will close the subscribed channel
498 var err error
Scott Bakere6685952020-06-23 04:05:39 +0000499 if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
500 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700501 }
502 delete(kp.topicToResponseChannelMap, topic)
503 return err
504 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800505 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700506 }
507}
508
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800509// nolint: unused
Scott Bakere6685952020-06-23 04:05:39 +0000510func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
511 logger.Debug("delete-all-topic-response-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700512 kp.lockTopicResponseChannelMap.Lock()
513 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800514 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800515 for topic := range kp.topicToResponseChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700516 // Unsubscribe to this topic first - this will close the subscribed channel
Scott Bakere6685952020-06-23 04:05:39 +0000517 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800518 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Scott Bakere6685952020-06-23 04:05:39 +0000519 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800520 // Do not return. Continue to try to unsubscribe to other topics.
521 } else {
522 // Only delete from channel map if successfully unsubscribed.
523 delete(kp.topicToResponseChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700524 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700525 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800526 if len(unsubscribeFailTopics) > 0 {
527 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
528 }
529 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700530}
531
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800532func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700533 kp.lockTopicRequestHandlerChannelMap.Lock()
534 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
535 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
536 kp.topicToRequestHandlerChannelMap[topic] = arg
537 }
538}
539
Scott Bakere6685952020-06-23 04:05:39 +0000540func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700541 kp.lockTopicRequestHandlerChannelMap.Lock()
542 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
543 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
544 // Close the kafka client client first by unsubscribing to this topic
Scott Bakere6685952020-06-23 04:05:39 +0000545 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800546 return err
547 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700548 delete(kp.topicToRequestHandlerChannelMap, topic)
549 return nil
550 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800551 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700552 }
553}
554
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800555// nolint: unused
Scott Bakere6685952020-06-23 04:05:39 +0000556func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
557 logger.Debug("delete-all-topic-request-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700558 kp.lockTopicRequestHandlerChannelMap.Lock()
559 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800560 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800561 for topic := range kp.topicToRequestHandlerChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700562 // Close the kafka client client first by unsubscribing to this topic
Scott Bakere6685952020-06-23 04:05:39 +0000563 if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800564 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Scott Bakere6685952020-06-23 04:05:39 +0000565 logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800566 // Do not return. Continue to try to unsubscribe to other topics.
567 } else {
568 // Only delete from channel map if successfully unsubscribed.
569 delete(kp.topicToRequestHandlerChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700570 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700571 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800572 if len(unsubscribeFailTopics) > 0 {
573 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
574 }
575 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700576}
577
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800578func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700579 kp.lockTransactionIdToChannelMap.Lock()
580 defer kp.lockTransactionIdToChannelMap.Unlock()
581 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
582 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
583 }
584}
585
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800586func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700587 kp.lockTransactionIdToChannelMap.Lock()
588 defer kp.lockTransactionIdToChannelMap.Unlock()
589 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
590 // Close the channel first
591 close(transChannel.ch)
592 delete(kp.transactionIdToChannelMap, id)
593 }
594}
595
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800596func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700597 kp.lockTransactionIdToChannelMap.Lock()
598 defer kp.lockTransactionIdToChannelMap.Unlock()
599 for key, value := range kp.transactionIdToChannelMap {
600 if value.topic.Name == id {
601 close(value.ch)
602 delete(kp.transactionIdToChannelMap, key)
603 }
604 }
605}
606
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800607// nolint: unused
Scott Bakere6685952020-06-23 04:05:39 +0000608func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
609 logger.Debug("delete-all-transaction-id-channel-map")
Scott Baker2c1c4822019-10-16 11:02:41 -0700610 kp.lockTransactionIdToChannelMap.Lock()
611 defer kp.lockTransactionIdToChannelMap.Unlock()
612 for key, value := range kp.transactionIdToChannelMap {
613 close(value.ch)
614 delete(kp.transactionIdToChannelMap, key)
615 }
616}
617
Scott Bakere6685952020-06-23 04:05:39 +0000618func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700619 // If we have any consumers on that topic we need to close them
Scott Bakere6685952020-06-23 04:05:39 +0000620 if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
621 logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700622 }
Scott Bakere6685952020-06-23 04:05:39 +0000623 if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
624 logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700625 }
626 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
627
Scott Bakere6685952020-06-23 04:05:39 +0000628 return kp.kafkaClient.DeleteTopic(&topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700629}
630
Scott Bakere6685952020-06-23 04:05:39 +0000631func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700632 // Encode the response argument - needs to be a proto message
633 if returnedVal == nil {
634 return nil, nil
635 }
636 protoValue, ok := returnedVal.(proto.Message)
637 if !ok {
Scott Bakere6685952020-06-23 04:05:39 +0000638 logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700639 err := errors.New("response-value-not-proto-message")
640 return nil, err
641 }
642
643 // Marshal the returned value, if any
644 var marshalledReturnedVal *any.Any
645 var err error
646 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000647 logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700648 return nil, err
649 }
650 return marshalledReturnedVal, nil
651}
652
Scott Bakere6685952020-06-23 04:05:39 +0000653func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
Scott Baker2c1c4822019-10-16 11:02:41 -0700654 responseHeader := &ic.Header{
655 Id: request.Header.Id,
656 Type: ic.MessageType_RESPONSE,
657 FromTopic: request.Header.ToTopic,
658 ToTopic: request.Header.FromTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700659 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700660 }
661 responseBody := &ic.InterContainerResponseBody{
662 Success: false,
663 Result: nil,
664 }
665 var marshalledResponseBody *any.Any
666 var err error
667 // Error should never happen here
668 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000669 logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700670 }
671
672 return &ic.InterContainerMessage{
673 Header: responseHeader,
674 Body: marshalledResponseBody,
675 }
676
677}
678
679//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
680//or an error on failure
Scott Bakere6685952020-06-23 04:05:39 +0000681func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
682 //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700683 responseHeader := &ic.Header{
684 Id: request.Header.Id,
685 Type: ic.MessageType_RESPONSE,
686 FromTopic: request.Header.ToTopic,
687 ToTopic: request.Header.FromTopic,
688 KeyTopic: request.Header.KeyTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700689 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700690 }
691
692 // Go over all returned values
693 var marshalledReturnedVal *any.Any
694 var err error
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800695
696 // for now we support only 1 returned value - (excluding the error)
697 if len(returnedValues) > 0 {
Scott Bakere6685952020-06-23 04:05:39 +0000698 if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
699 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700700 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700701 }
702
703 responseBody := &ic.InterContainerResponseBody{
704 Success: success,
705 Result: marshalledReturnedVal,
706 }
707
708 // Marshal the response body
709 var marshalledResponseBody *any.Any
710 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000711 logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700712 return nil, err
713 }
714
715 return &ic.InterContainerMessage{
716 Header: responseHeader,
717 Body: marshalledResponseBody,
718 }, nil
719}
720
// CallFuncByName invokes, through reflection, the method called funcName on
// myClass with the supplied params and returns the method's results.
//
// The first letter of funcName is capitalized before the lookup because only
// exported methods can be invoked from a different package.
//
// Instead of letting reflect panic, an error (and an empty result slice) is
// returned when:
//   - myClass is nil or has no such method,
//   - the number of params does not match the method signature,
//   - a param is not assignable to the corresponding parameter type,
//   - a nil param is supplied for a parameter type that cannot be nil.
//
// A nil param matching a nil-able parameter type (pointer, interface, slice,
// map, chan, func) is passed as that type's zero value. Panics raised by the
// invoked method itself are not recovered.
func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters required to
	// invoke a function from a different package
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// MethodByName panics on the zero Value produced by a nil target.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Validate the arity up front; reflect.Value.Call panics on a mismatch.
	methodType := m.Type()
	numIn := methodType.NumIn()
	variadic := methodType.IsVariadic()
	if (variadic && len(params) < numIn-1) || (!variadic && len(params) != numIn) {
		return make([]reflect.Value, 0), fmt.Errorf("invalid-argument-count \"%s\": expected %d, got %d", funcName, numIn, len(params))
	}

	in := make([]reflect.Value, len(params))
	for i, param := range params {
		// For a variadic method every trailing param must match the element
		// type of the final slice parameter.
		want := methodType.In(numIn - 1)
		if variadic && i >= numIn-1 {
			want = want.Elem()
		} else {
			want = methodType.In(i)
		}

		if param == nil {
			// reflect.ValueOf(nil) is the zero Value, which Call rejects.
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
				in[i] = reflect.Zero(want)
				continue
			}
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": nil passed for parameter %d of type %s", funcName, i, want)
		}

		value := reflect.ValueOf(param)
		if !value.Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("invalid-argument \"%s\": parameter %d is %s, expected %s", funcName, i, value.Type(), want)
		}
		in[i] = value
	}

	return m.Call(in), nil
}
737
Scott Bakere6685952020-06-23 04:05:39 +0000738func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700739 arg := &KVArg{
740 Key: TransactionKey,
741 Value: &ic.StrType{Val: transactionId},
742 }
743
744 var marshalledArg *any.Any
745 var err error
746 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000747 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700748 return currentArgs
749 }
750 protoArg := &ic.Argument{
751 Key: arg.Key,
752 Value: marshalledArg,
753 }
754 return append(currentArgs, protoArg)
755}
756
Scott Bakere6685952020-06-23 04:05:39 +0000757func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700758 var marshalledArg *any.Any
759 var err error
760 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000761 logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700762 return currentArgs
763 }
764 protoArg := &ic.Argument{
765 Key: FromTopic,
766 Value: marshalledArg,
767 }
768 return append(currentArgs, protoArg)
769}
770
Scott Bakere6685952020-06-23 04:05:39 +0000771func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700772
773 // First extract the header to know whether this is a request - responses are handled by a different handler
774 if msg.Header.Type == ic.MessageType_REQUEST {
775 var out []reflect.Value
776 var err error
777
778 // Get the request body
779 requestBody := &ic.InterContainerRequestBody{}
780 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000781 logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700782 } else {
Scott Bakere6685952020-06-23 04:05:39 +0000783 logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700784 // let the callee unpack the arguments as its the only one that knows the real proto type
785 // Augment the requestBody with the message Id as it will be used in scenarios where cores
786 // are set in pairs and competing
Scott Bakere6685952020-06-23 04:05:39 +0000787 requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700788
789 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
790 // needs to send an unsollicited message to the currently requested container
Scott Bakere6685952020-06-23 04:05:39 +0000791 requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700792
Scott Bakere6685952020-06-23 04:05:39 +0000793 out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700794 if err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000795 logger.Warn(err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700796 }
797 }
798 // Response required?
799 if requestBody.ResponseRequired {
800 // If we already have an error before then just return that
801 var returnError *ic.Error
802 var returnedValues []interface{}
803 var success bool
804 if err != nil {
805 returnError = &ic.Error{Reason: err.Error()}
806 returnedValues = make([]interface{}, 1)
807 returnedValues[0] = returnError
808 } else {
809 returnedValues = make([]interface{}, 0)
810 // Check for errors first
811 lastIndex := len(out) - 1
812 if out[lastIndex].Interface() != nil { // Error
813 if retError, ok := out[lastIndex].Interface().(error); ok {
814 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Scott Bakere6685952020-06-23 04:05:39 +0000815 logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700816 return // Ignore - process is in competing mode and ignored transaction
817 }
818 returnError = &ic.Error{Reason: retError.Error()}
819 returnedValues = append(returnedValues, returnError)
820 } else { // Should never happen
821 returnError = &ic.Error{Reason: "incorrect-error-returns"}
822 returnedValues = append(returnedValues, returnError)
823 }
824 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Scott Bakere6685952020-06-23 04:05:39 +0000825 logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700826 return // Ignore - should not happen
827 } else { // Non-error case
828 success = true
829 for idx, val := range out {
Scott Bakere6685952020-06-23 04:05:39 +0000830 //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700831 if idx != lastIndex {
832 returnedValues = append(returnedValues, val.Interface())
833 }
834 }
835 }
836 }
837
838 var icm *ic.InterContainerMessage
Scott Bakere6685952020-06-23 04:05:39 +0000839 if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
840 logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
841 icm = encodeDefaultFailedResponse(msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700842 }
843 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
844 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
845 // present then the key will be empty, hence all messages for a given topic will be sent to all
846 // partitions.
847 replyTopic := &Topic{Name: msg.Header.FromTopic}
848 key := msg.Header.KeyTopic
Scott Bakere6685952020-06-23 04:05:39 +0000849 logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700850 // TODO: handle error response.
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800851 go func() {
Scott Bakere6685952020-06-23 04:05:39 +0000852 if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
853 logger.Errorw("send-reply-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800854 "topic": replyTopic,
855 "key": key,
856 "error": err})
857 }
858 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700859 }
860 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Scott Bakere6685952020-06-23 04:05:39 +0000861 logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
862 go kp.dispatchResponse(msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700863 } else {
Scott Bakere6685952020-06-23 04:05:39 +0000864 logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700865 }
866}
867
Scott Bakere6685952020-06-23 04:05:39 +0000868func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700869 // Wait for messages
870 for msg := range ch {
Scott Bakere6685952020-06-23 04:05:39 +0000871 //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
872 go kp.handleMessage(msg, targetInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700873 }
874}
875
Scott Bakere6685952020-06-23 04:05:39 +0000876func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700877 kp.lockTransactionIdToChannelMap.RLock()
878 defer kp.lockTransactionIdToChannelMap.RUnlock()
879 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Scott Bakere6685952020-06-23 04:05:39 +0000880 logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700881 return
882 }
883 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
884}
885
886// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
887// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
888// API. There is one response channel waiting for kafka messages before dispatching the message to the
889// corresponding waiting channel
Scott Bakere6685952020-06-23 04:05:39 +0000890func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
891 logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700892
893 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
894 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerae1d4702020-03-04 14:10:51 -0800895 // Set channel size to 1 to prevent deadlock, see VOL-2708
896 ch := make(chan *ic.InterContainerMessage, 1)
Scott Baker2c1c4822019-10-16 11:02:41 -0700897 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
898
899 return ch, nil
900}
901
Scott Bakere6685952020-06-23 04:05:39 +0000902func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
903 logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700904 kp.deleteFromTransactionIdToChannelMap(trnsId)
905 return nil
906}
907
Scott Bakere6685952020-06-23 04:05:39 +0000908func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
909 return kp.kafkaClient.EnableLivenessChannel(enable)
Scott Baker104b67d2019-10-29 15:56:27 -0700910}
911
Scott Bakere6685952020-06-23 04:05:39 +0000912func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
913 return kp.kafkaClient.EnableHealthinessChannel(enable)
Scott Baker0fef6982019-12-12 09:49:42 -0800914}
915
Scott Bakere6685952020-06-23 04:05:39 +0000916func (kp *interContainerProxy) SendLiveness() error {
917 return kp.kafkaClient.SendLiveness()
Scott Baker104b67d2019-10-29 15:56:27 -0700918}
919
Scott Baker2c1c4822019-10-16 11:02:41 -0700920//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
921//or an error on failure
Scott Bakere6685952020-06-23 04:05:39 +0000922func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700923 requestHeader := &ic.Header{
924 Id: uuid.New().String(),
925 Type: ic.MessageType_REQUEST,
926 FromTopic: replyTopic.Name,
927 ToTopic: toTopic.Name,
928 KeyTopic: key,
Scott Baker84a55ce2020-04-17 10:11:30 -0700929 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700930 }
931 requestBody := &ic.InterContainerRequestBody{
932 Rpc: rpc,
933 ResponseRequired: true,
934 ReplyToTopic: replyTopic.Name,
935 }
936
937 for _, arg := range kvArgs {
938 if arg == nil {
939 // In case the caller sends an array with empty args
940 continue
941 }
942 var marshalledArg *any.Any
943 var err error
944 // ascertain the value interface type is a proto.Message
945 protoValue, ok := arg.Value.(proto.Message)
946 if !ok {
Scott Bakere6685952020-06-23 04:05:39 +0000947 logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700948 err := errors.New("argument-value-not-proto-message")
949 return nil, err
950 }
951 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000952 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700953 return nil, err
954 }
955 protoArg := &ic.Argument{
956 Key: arg.Key,
957 Value: marshalledArg,
958 }
959 requestBody.Args = append(requestBody.Args, protoArg)
960 }
961
962 var marshalledData *any.Any
963 var err error
964 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000965 logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700966 return nil, err
967 }
968 request := &ic.InterContainerMessage{
969 Header: requestHeader,
970 Body: marshalledData,
971 }
972 return request, nil
973}
974
Scott Bakere6685952020-06-23 04:05:39 +0000975func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700976 // Extract the message body
977 responseBody := ic.InterContainerResponseBody{}
978 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Scott Bakere6685952020-06-23 04:05:39 +0000979 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700980 return nil, err
981 }
Scott Bakere6685952020-06-23 04:05:39 +0000982 //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700983
984 return &responseBody, nil
985
986}