blob: c14c54b84f7a21ea92a71090972cb6f5e00dc8c7 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
Matteo Scandoloed128822020-02-10 15:52:35 -080022 "google.golang.org/grpc/codes"
23 "google.golang.org/grpc/status"
Scott Baker2c1c4822019-10-16 11:02:41 -070024 "reflect"
25 "strings"
26 "sync"
27 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070028
29 "github.com/golang/protobuf/proto"
30 "github.com/golang/protobuf/ptypes"
31 "github.com/golang/protobuf/ptypes/any"
32 "github.com/google/uuid"
33 "github.com/opencord/voltha-lib-go/v3/pkg/log"
34 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070035)
36
// Defaults applied by the inter-container proxy.
const (
	// DefaultMaxRetries is the default retry count.
	DefaultMaxRetries = 3
	// DefaultRequestTimeout is expressed in milliseconds (60000 ms = 60 s); it is
	// multiplied by time.Millisecond where a synchronous request waits for its reply.
	DefaultRequestTimeout = 60000
)

// String keys used by the messaging layer.
const (
	TransactionKey = "transactionID"
	FromTopic      = "fromTopic"
)

// Sentinel errors related to transaction handling.
var (
	ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
	ErrorTransactionInvalidId   = errors.New("transaction-invalid-id")
)
49
50// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
51// obtained from that channel, this interface is invoked. This is used to handle
52// async requests into the Core via the kafka messaging bus
53type requestHandlerChannel struct {
54 requesthandlerInterface interface{}
55 ch <-chan *ic.InterContainerMessage
56}
57
58// transactionChannel represents a combination of a topic and a channel onto which a response received
59// on the kafka bus will be sent to
60type transactionChannel struct {
61 topic *Topic
62 ch chan *ic.InterContainerMessage
63}
64
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080065type InterContainerProxy interface {
Neha Sharma3c425fb2020-06-08 16:42:32 +000066 Start(ctx context.Context) error
67 Stop(ctx context.Context)
Matteo Scandolof346a2d2020-01-24 13:14:54 -080068 GetDefaultTopic() *Topic
Neha Sharma3c425fb2020-06-08 16:42:32 +000069 DeviceDiscovered(ctx context.Context, deviceId string, deviceType string, parentId string, publisher string) error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080070 InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
Matteo Scandoloed128822020-02-10 15:52:35 -080071 InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
Neha Sharma3c425fb2020-06-08 16:42:32 +000072 SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
73 SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
74 UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
75 DeleteTopic(ctx context.Context, topic Topic) error
76 EnableLivenessChannel(ctx context.Context, enable bool) chan bool
77 SendLiveness(ctx context.Context) error
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080078}
79
80// interContainerProxy represents the messaging proxy
81type interContainerProxy struct {
Neha Sharmadd9af392020-04-28 09:03:57 +000082 kafkaAddress string
Matteo Scandolof346a2d2020-01-24 13:14:54 -080083 defaultTopic *Topic
Scott Baker2c1c4822019-10-16 11:02:41 -070084 defaultRequestHandlerInterface interface{}
85 deviceDiscoveryTopic *Topic
86 kafkaClient Client
Kent Hagerman3a402302020-01-31 15:03:53 -050087 doneCh chan struct{}
88 doneOnce sync.Once
Scott Baker2c1c4822019-10-16 11:02:41 -070089
90 // This map is used to map a topic to an interface and channel. When a request is received
91 // on that channel (registered to the topic) then that interface is invoked.
92 topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
93 lockTopicRequestHandlerChannelMap sync.RWMutex
94
95 // This map is used to map a channel to a response topic. This channel handles all responses on that
96 // channel for that topic and forward them to the appropriate consumers channel, using the
97 // transactionIdToChannelMap.
98 topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
99 lockTopicResponseChannelMap sync.RWMutex
100
101 // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
102 // sent out and we are waiting for a response.
103 transactionIdToChannelMap map[string]*transactionChannel
104 lockTransactionIdToChannelMap sync.RWMutex
105}
106
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800107type InterContainerProxyOption func(*interContainerProxy)
Scott Baker2c1c4822019-10-16 11:02:41 -0700108
Neha Sharmadd9af392020-04-28 09:03:57 +0000109func InterContainerAddress(address string) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800110 return func(args *interContainerProxy) {
Neha Sharmadd9af392020-04-28 09:03:57 +0000111 args.kafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -0700112 }
113}
114
115func DefaultTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800116 return func(args *interContainerProxy) {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800117 args.defaultTopic = topic
Scott Baker2c1c4822019-10-16 11:02:41 -0700118 }
119}
120
121func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800122 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700123 args.deviceDiscoveryTopic = topic
124 }
125}
126
127func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800128 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700129 args.defaultRequestHandlerInterface = handler
130 }
131}
132
133func MsgClient(client Client) InterContainerProxyOption {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800134 return func(args *interContainerProxy) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700135 args.kafkaClient = client
136 }
137}
138
Kent Hagerman3a402302020-01-31 15:03:53 -0500139func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800140 proxy := &interContainerProxy{
Neha Sharmadd9af392020-04-28 09:03:57 +0000141 kafkaAddress: DefaultKafkaAddress,
142 doneCh: make(chan struct{}),
Scott Baker2c1c4822019-10-16 11:02:41 -0700143 }
144
145 for _, option := range opts {
146 option(proxy)
147 }
148
Kent Hagerman3a402302020-01-31 15:03:53 -0500149 return proxy
Scott Baker2c1c4822019-10-16 11:02:41 -0700150}
151
Kent Hagerman3a402302020-01-31 15:03:53 -0500152func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800153 return newInterContainerProxy(opts...)
154}
155
Neha Sharma3c425fb2020-06-08 16:42:32 +0000156func (kp *interContainerProxy) Start(ctx context.Context) error {
157 logger.Info(ctx, "Starting-Proxy")
Scott Baker2c1c4822019-10-16 11:02:41 -0700158
159 // Kafka MsgClient should already have been created. If not, output fatal error
160 if kp.kafkaClient == nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000161 logger.Fatal(ctx, "kafka-client-not-set")
Scott Baker2c1c4822019-10-16 11:02:41 -0700162 }
163
Scott Baker2c1c4822019-10-16 11:02:41 -0700164 // Start the kafka client
Neha Sharma3c425fb2020-06-08 16:42:32 +0000165 if err := kp.kafkaClient.Start(ctx); err != nil {
166 logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700167 return err
168 }
169
170 // Create the topic to response channel map
171 kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
172 //
173 // Create the transactionId to Channel Map
174 kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
175
176 // Create the topic to request channel map
177 kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
178
179 return nil
180}
181
Neha Sharma3c425fb2020-06-08 16:42:32 +0000182func (kp *interContainerProxy) Stop(ctx context.Context) {
183 logger.Info(ctx, "stopping-intercontainer-proxy")
Kent Hagerman3a402302020-01-31 15:03:53 -0500184 kp.doneOnce.Do(func() { close(kp.doneCh) })
Scott Baker2c1c4822019-10-16 11:02:41 -0700185 // TODO : Perform cleanup
Neha Sharma3c425fb2020-06-08 16:42:32 +0000186 kp.kafkaClient.Stop(ctx)
187 err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
Scott Bakera2da2f42020-02-20 16:27:34 -0800188 if err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000189 logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800190 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000191 err = kp.deleteAllTopicResponseChannelMap(ctx)
Scott Bakera2da2f42020-02-20 16:27:34 -0800192 if err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000193 logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800194 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000195 kp.deleteAllTransactionIdToChannelMap(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700196}
197
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800198func (kp *interContainerProxy) GetDefaultTopic() *Topic {
199 return kp.defaultTopic
200}
201
Scott Baker2c1c4822019-10-16 11:02:41 -0700202// DeviceDiscovered publish the discovered device onto the kafka messaging bus
Neha Sharma3c425fb2020-06-08 16:42:32 +0000203func (kp *interContainerProxy) DeviceDiscovered(ctx context.Context, deviceId string, deviceType string, parentId string, publisher string) error {
204 logger.Debugw(ctx, "sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700205 // Simple validation
206 if deviceId == "" || deviceType == "" {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000207 logger.Errorw(ctx, "invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700208 return errors.New("invalid-parameters")
209 }
210 // Create the device discovery message
211 header := &ic.Header{
212 Id: uuid.New().String(),
213 Type: ic.MessageType_DEVICE_DISCOVERED,
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800214 FromTopic: kp.defaultTopic.Name,
Scott Baker2c1c4822019-10-16 11:02:41 -0700215 ToTopic: kp.deviceDiscoveryTopic.Name,
Scott Baker84a55ce2020-04-17 10:11:30 -0700216 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700217 }
218 body := &ic.DeviceDiscovered{
219 Id: deviceId,
220 DeviceType: deviceType,
221 ParentId: parentId,
222 Publisher: publisher,
223 }
224
225 var marshalledData *any.Any
226 var err error
227 if marshalledData, err = ptypes.MarshalAny(body); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000228 logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700229 return err
230 }
231 msg := &ic.InterContainerMessage{
232 Header: header,
233 Body: marshalledData,
234 }
235
236 // Send the message
Neha Sharma3c425fb2020-06-08 16:42:32 +0000237 if err := kp.kafkaClient.Send(ctx, msg, kp.deviceDiscoveryTopic); err != nil {
238 logger.Errorw(ctx, "cannot-send-device-discovery-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700239 return err
240 }
241 return nil
242}
243
Matteo Scandoloed128822020-02-10 15:52:35 -0800244// InvokeAsyncRPC is used to make an RPC request asynchronously
245func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
246 waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
247
Neha Sharma3c425fb2020-06-08 16:42:32 +0000248 logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
Matteo Scandoloed128822020-02-10 15:52:35 -0800249 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
250 // typically the device ID.
251 responseTopic := replyToTopic
252 if responseTopic == nil {
253 responseTopic = kp.GetDefaultTopic()
254 }
255
256 chnl := make(chan *RpcResponse)
257
258 go func() {
259
260 // once we're done,
261 // close the response channel
262 defer close(chnl)
263
264 var err error
265 var protoRequest *ic.InterContainerMessage
266
267 // Encode the request
Neha Sharma3c425fb2020-06-08 16:42:32 +0000268 protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Matteo Scandoloed128822020-02-10 15:52:35 -0800269 if err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000270 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Matteo Scandoloed128822020-02-10 15:52:35 -0800271 chnl <- NewResponse(RpcFormattingError, err, nil)
272 return
273 }
274
275 // Subscribe for response, if needed, before sending request
276 var ch <-chan *ic.InterContainerMessage
Neha Sharma3c425fb2020-06-08 16:42:32 +0000277 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
278 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Matteo Scandoloed128822020-02-10 15:52:35 -0800279 chnl <- NewResponse(RpcTransportError, err, nil)
280 return
281 }
282
283 // Send request - if the topic is formatted with a device Id then we will send the request using a
284 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
285 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
Neha Sharma3c425fb2020-06-08 16:42:32 +0000286 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
Matteo Scandoloed128822020-02-10 15:52:35 -0800287
288 // if the message is not sent on kafka publish an event an close the channel
Neha Sharma3c425fb2020-06-08 16:42:32 +0000289 if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800290 chnl <- NewResponse(RpcTransportError, err, nil)
291 return
292 }
293
294 // if the client is not waiting for a response send the ack and close the channel
295 chnl <- NewResponse(RpcSent, nil, nil)
296 if !waitForResponse {
297 return
298 }
299
300 defer func() {
301 // Remove the subscription for a response on return
Neha Sharma3c425fb2020-06-08 16:42:32 +0000302 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
303 logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
Matteo Scandoloed128822020-02-10 15:52:35 -0800304 }
305 }()
306
307 // Wait for response as well as timeout or cancellation
308 select {
309 case msg, ok := <-ch:
310 if !ok {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000311 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Matteo Scandoloed128822020-02-10 15:52:35 -0800312 chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
313 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000314 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
315 if responseBody, err := decodeResponse(ctx, msg); err != nil {
Matteo Scandoloed128822020-02-10 15:52:35 -0800316 chnl <- NewResponse(RpcReply, err, nil)
317 } else {
318 if responseBody.Success {
319 chnl <- NewResponse(RpcReply, nil, responseBody.Result)
320 } else {
321 // response body contains an error
322 unpackErr := &ic.Error{}
323 if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
324 chnl <- NewResponse(RpcReply, err, nil)
325 } else {
326 chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
327 }
328 }
329 }
330 case <-ctx.Done():
Neha Sharma3c425fb2020-06-08 16:42:32 +0000331 logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Matteo Scandoloed128822020-02-10 15:52:35 -0800332 err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
333 chnl <- NewResponse(RpcTimeout, err, nil)
334 case <-kp.doneCh:
335 chnl <- NewResponse(RpcSystemClosing, nil, nil)
Neha Sharma3c425fb2020-06-08 16:42:32 +0000336 logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Matteo Scandoloed128822020-02-10 15:52:35 -0800337 }
338 }()
339 return chnl
340}
341
Scott Baker2c1c4822019-10-16 11:02:41 -0700342// InvokeRPC is used to send a request to a given topic
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800343func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
Scott Baker2c1c4822019-10-16 11:02:41 -0700344 waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
345
346 // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
347 // typically the device ID.
348 responseTopic := replyToTopic
349 if responseTopic == nil {
Matteo Scandolof346a2d2020-01-24 13:14:54 -0800350 responseTopic = kp.defaultTopic
Scott Baker2c1c4822019-10-16 11:02:41 -0700351 }
352
353 // Encode the request
Neha Sharma3c425fb2020-06-08 16:42:32 +0000354 protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
Scott Baker2c1c4822019-10-16 11:02:41 -0700355 if err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000356 logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700357 return false, nil
358 }
359
360 // Subscribe for response, if needed, before sending request
361 var ch <-chan *ic.InterContainerMessage
362 if waitForResponse {
363 var err error
Neha Sharma3c425fb2020-06-08 16:42:32 +0000364 if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
365 logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700366 }
367 }
368
369 // Send request - if the topic is formatted with a device Id then we will send the request using a
370 // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
371 // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
372 //key := GetDeviceIdFromTopic(*toTopic)
Neha Sharma3c425fb2020-06-08 16:42:32 +0000373 logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800374 go func() {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000375 if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
376 logger.Errorw(ctx, "send-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800377 "topic": toTopic,
378 "key": key,
379 "error": err})
380 }
381 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700382
383 if waitForResponse {
384 // Create a child context based on the parent context, if any
385 var cancel context.CancelFunc
386 childCtx := context.Background()
387 if ctx == nil {
388 ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
389 } else {
390 childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
391 }
392 defer cancel()
393
394 // Wait for response as well as timeout or cancellation
395 // Remove the subscription for a response on return
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800396 defer func() {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000397 if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
398 logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800399 "id": protoRequest.Header.Id,
400 "error": err})
401 }
402 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700403 select {
404 case msg, ok := <-ch:
405 if !ok {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000406 logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700407 protoError := &ic.Error{Reason: "channel-closed"}
408 var marshalledArg *any.Any
409 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
410 return false, nil // Should never happen
411 }
412 return false, marshalledArg
413 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000414 logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700415 var responseBody *ic.InterContainerResponseBody
416 var err error
Neha Sharma3c425fb2020-06-08 16:42:32 +0000417 if responseBody, err = decodeResponse(ctx, msg); err != nil {
418 logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800419 // FIXME we should return something
Scott Baker2c1c4822019-10-16 11:02:41 -0700420 }
421 return responseBody.Success, responseBody.Result
422 case <-ctx.Done():
Neha Sharma3c425fb2020-06-08 16:42:32 +0000423 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700424 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800425 protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800426
Scott Baker2c1c4822019-10-16 11:02:41 -0700427 var marshalledArg *any.Any
428 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
429 return false, nil // Should never happen
430 }
431 return false, marshalledArg
432 case <-childCtx.Done():
Neha Sharma3c425fb2020-06-08 16:42:32 +0000433 logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700434 // pack the error as proto any type
Matteo Scandolob45cf592020-01-21 16:10:56 -0800435 protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800436
Scott Baker2c1c4822019-10-16 11:02:41 -0700437 var marshalledArg *any.Any
438 if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
439 return false, nil // Should never happen
440 }
441 return false, marshalledArg
442 case <-kp.doneCh:
Neha Sharma3c425fb2020-06-08 16:42:32 +0000443 logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
Scott Baker2c1c4822019-10-16 11:02:41 -0700444 return true, nil
445 }
446 }
447 return true, nil
448}
449
450// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
451// when a message is received on a given topic
Neha Sharma3c425fb2020-06-08 16:42:32 +0000452func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700453
454 // Subscribe to receive messages for that topic
455 var ch <-chan *ic.InterContainerMessage
456 var err error
Neha Sharma3c425fb2020-06-08 16:42:32 +0000457 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
Scott Baker2c1c4822019-10-16 11:02:41 -0700458 //if ch, err = kp.Subscribe(topic); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000459 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700460 return err
461 }
462
463 kp.defaultRequestHandlerInterface = handler
464 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
465 // Launch a go routine to receive and process kafka messages
Neha Sharma3c425fb2020-06-08 16:42:32 +0000466 go kp.waitForMessages(ctx, ch, topic, handler)
Scott Baker2c1c4822019-10-16 11:02:41 -0700467
468 return nil
469}
470
471// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
472// when a message is received on a given topic. So far there is only 1 target registered per microservice
Neha Sharma3c425fb2020-06-08 16:42:32 +0000473func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700474 // Subscribe to receive messages for that topic
475 var ch <-chan *ic.InterContainerMessage
476 var err error
Neha Sharma3c425fb2020-06-08 16:42:32 +0000477 if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
478 logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700479 return err
480 }
481 kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
482
483 // Launch a go routine to receive and process kafka messages
Neha Sharma3c425fb2020-06-08 16:42:32 +0000484 go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700485
486 return nil
487}
488
Neha Sharma3c425fb2020-06-08 16:42:32 +0000489func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
490 return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
Scott Baker2c1c4822019-10-16 11:02:41 -0700491}
492
Neha Sharma3c425fb2020-06-08 16:42:32 +0000493func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700494 kp.lockTopicResponseChannelMap.Lock()
495 defer kp.lockTopicResponseChannelMap.Unlock()
496 if _, exist := kp.topicToResponseChannelMap[topic]; exist {
497 // Unsubscribe to this topic first - this will close the subscribed channel
498 var err error
Neha Sharma3c425fb2020-06-08 16:42:32 +0000499 if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
500 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700501 }
502 delete(kp.topicToResponseChannelMap, topic)
503 return err
504 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800505 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700506 }
507}
508
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800509// nolint: unused
Neha Sharma3c425fb2020-06-08 16:42:32 +0000510func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
511 logger.Debug(ctx, "delete-all-topic-response-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700512 kp.lockTopicResponseChannelMap.Lock()
513 defer kp.lockTopicResponseChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800514 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800515 for topic := range kp.topicToResponseChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700516 // Unsubscribe to this topic first - this will close the subscribed channel
Neha Sharma3c425fb2020-06-08 16:42:32 +0000517 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800518 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma3c425fb2020-06-08 16:42:32 +0000519 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800520 // Do not return. Continue to try to unsubscribe to other topics.
521 } else {
522 // Only delete from channel map if successfully unsubscribed.
523 delete(kp.topicToResponseChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700524 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700525 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800526 if len(unsubscribeFailTopics) > 0 {
527 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
528 }
529 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700530}
531
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800532func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700533 kp.lockTopicRequestHandlerChannelMap.Lock()
534 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
535 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
536 kp.topicToRequestHandlerChannelMap[topic] = arg
537 }
538}
539
Neha Sharma3c425fb2020-06-08 16:42:32 +0000540func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700541 kp.lockTopicRequestHandlerChannelMap.Lock()
542 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
543 if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
544 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma3c425fb2020-06-08 16:42:32 +0000545 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800546 return err
547 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700548 delete(kp.topicToRequestHandlerChannelMap, topic)
549 return nil
550 } else {
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800551 return fmt.Errorf("%s-Topic-not-found", topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700552 }
553}
554
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800555// nolint: unused
Neha Sharma3c425fb2020-06-08 16:42:32 +0000556func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
557 logger.Debug(ctx, "delete-all-topic-request-channel")
Scott Baker2c1c4822019-10-16 11:02:41 -0700558 kp.lockTopicRequestHandlerChannelMap.Lock()
559 defer kp.lockTopicRequestHandlerChannelMap.Unlock()
Scott Bakera2da2f42020-02-20 16:27:34 -0800560 var unsubscribeFailTopics []string
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800561 for topic := range kp.topicToRequestHandlerChannelMap {
Scott Baker2c1c4822019-10-16 11:02:41 -0700562 // Close the kafka client client first by unsubscribing to this topic
Neha Sharma3c425fb2020-06-08 16:42:32 +0000563 if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
Scott Bakera2da2f42020-02-20 16:27:34 -0800564 unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
Neha Sharma3c425fb2020-06-08 16:42:32 +0000565 logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
Scott Bakera2da2f42020-02-20 16:27:34 -0800566 // Do not return. Continue to try to unsubscribe to other topics.
567 } else {
568 // Only delete from channel map if successfully unsubscribed.
569 delete(kp.topicToRequestHandlerChannelMap, topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700570 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700571 }
Scott Bakera2da2f42020-02-20 16:27:34 -0800572 if len(unsubscribeFailTopics) > 0 {
573 return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
574 }
575 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700576}
577
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800578func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700579 kp.lockTransactionIdToChannelMap.Lock()
580 defer kp.lockTransactionIdToChannelMap.Unlock()
581 if _, exist := kp.transactionIdToChannelMap[id]; !exist {
582 kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
583 }
584}
585
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800586func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700587 kp.lockTransactionIdToChannelMap.Lock()
588 defer kp.lockTransactionIdToChannelMap.Unlock()
589 if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
590 // Close the channel first
591 close(transChannel.ch)
592 delete(kp.transactionIdToChannelMap, id)
593 }
594}
595
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800596func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700597 kp.lockTransactionIdToChannelMap.Lock()
598 defer kp.lockTransactionIdToChannelMap.Unlock()
599 for key, value := range kp.transactionIdToChannelMap {
600 if value.topic.Name == id {
601 close(value.ch)
602 delete(kp.transactionIdToChannelMap, key)
603 }
604 }
605}
606
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800607// nolint: unused
Neha Sharma3c425fb2020-06-08 16:42:32 +0000608func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
609 logger.Debug(ctx, "delete-all-transaction-id-channel-map")
Scott Baker2c1c4822019-10-16 11:02:41 -0700610 kp.lockTransactionIdToChannelMap.Lock()
611 defer kp.lockTransactionIdToChannelMap.Unlock()
612 for key, value := range kp.transactionIdToChannelMap {
613 close(value.ch)
614 delete(kp.transactionIdToChannelMap, key)
615 }
616}
617
Neha Sharma3c425fb2020-06-08 16:42:32 +0000618func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700619 // If we have any consumers on that topic we need to close them
Neha Sharma3c425fb2020-06-08 16:42:32 +0000620 if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
621 logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700622 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000623 if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
624 logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700625 }
626 kp.deleteTopicTransactionIdToChannelMap(topic.Name)
627
Neha Sharma3c425fb2020-06-08 16:42:32 +0000628 return kp.kafkaClient.DeleteTopic(ctx, &topic)
Scott Baker2c1c4822019-10-16 11:02:41 -0700629}
630
Neha Sharma3c425fb2020-06-08 16:42:32 +0000631func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700632 // Encode the response argument - needs to be a proto message
633 if returnedVal == nil {
634 return nil, nil
635 }
636 protoValue, ok := returnedVal.(proto.Message)
637 if !ok {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000638 logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
Scott Baker2c1c4822019-10-16 11:02:41 -0700639 err := errors.New("response-value-not-proto-message")
640 return nil, err
641 }
642
643 // Marshal the returned value, if any
644 var marshalledReturnedVal *any.Any
645 var err error
646 if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000647 logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700648 return nil, err
649 }
650 return marshalledReturnedVal, nil
651}
652
Neha Sharma3c425fb2020-06-08 16:42:32 +0000653func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
Scott Baker2c1c4822019-10-16 11:02:41 -0700654 responseHeader := &ic.Header{
655 Id: request.Header.Id,
656 Type: ic.MessageType_RESPONSE,
657 FromTopic: request.Header.ToTopic,
658 ToTopic: request.Header.FromTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700659 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700660 }
661 responseBody := &ic.InterContainerResponseBody{
662 Success: false,
663 Result: nil,
664 }
665 var marshalledResponseBody *any.Any
666 var err error
667 // Error should never happen here
668 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000669 logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700670 }
671
672 return &ic.InterContainerMessage{
673 Header: responseHeader,
674 Body: marshalledResponseBody,
675 }
676
677}
678
679//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
680//or an error on failure
Neha Sharma3c425fb2020-06-08 16:42:32 +0000681func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
682 //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
Scott Baker2c1c4822019-10-16 11:02:41 -0700683 responseHeader := &ic.Header{
684 Id: request.Header.Id,
685 Type: ic.MessageType_RESPONSE,
686 FromTopic: request.Header.ToTopic,
687 ToTopic: request.Header.FromTopic,
688 KeyTopic: request.Header.KeyTopic,
Scott Baker84a55ce2020-04-17 10:11:30 -0700689 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700690 }
691
692 // Go over all returned values
693 var marshalledReturnedVal *any.Any
694 var err error
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800695
696 // for now we support only 1 returned value - (excluding the error)
697 if len(returnedValues) > 0 {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000698 if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
699 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700700 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700701 }
702
703 responseBody := &ic.InterContainerResponseBody{
704 Success: success,
705 Result: marshalledReturnedVal,
706 }
707
708 // Marshal the response body
709 var marshalledResponseBody *any.Any
710 if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000711 logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700712 return nil, err
713 }
714
715 return &ic.InterContainerMessage{
716 Header: responseHeader,
717 Body: marshalledResponseBody,
718 }, nil
719}
720
// CallFuncByName invokes, through reflection, the method of myClass whose name is funcName
// (with its first letter capitalized so that exported methods can be addressed by a
// lower-case RPC name). ctx is always passed as the first argument, followed by params.
//
// It returns the method's results as reflect.Values. An error (and an empty slice) is
// returned instead of panicking when:
//   - myClass is nil or has no such method,
//   - the number of supplied arguments does not match the method signature,
//   - an argument is not assignable to the corresponding parameter type, or
//   - a nil argument is supplied for a parameter whose type cannot be nil.
//
// A nil argument for a nillable parameter (pointer, interface, slice, map, chan, func) is
// passed as that parameter type's zero value.
func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	// Capitalize the first letter in the funcName to workaround the first capital letters
	// required to invoke a function from a different package.
	// NOTE(review): strings.Title is deprecated since Go 1.18; kept to preserve behavior
	// without adding a dependency.
	funcName = strings.Title(funcName)

	myClassValue := reflect.ValueOf(myClass)
	if !myClassValue.IsValid() {
		// MethodByName would panic on the zero Value produced by a nil interface.
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}

	// Validate the argument count up front: reflect.Value.Call panics on a mismatch.
	mt := m.Type()
	variadic := mt.IsVariadic()
	numFixed := mt.NumIn()
	if variadic {
		numFixed--
	}
	numArgs := len(params) + 1
	if numArgs < numFixed || (!variadic && numArgs != numFixed) {
		return make([]reflect.Value, 0), fmt.Errorf("incorrect-argument-count \"%s\": expected %d, got %d", funcName, mt.NumIn(), numArgs)
	}

	args := make([]interface{}, 0, numArgs)
	args = append(args, ctx)
	args = append(args, params...)

	in := make([]reflect.Value, numArgs)
	for i, arg := range args {
		// Expected type of the i-th argument; trailing variadic arguments use the
		// element type of the final slice parameter.
		var want reflect.Type
		if variadic && i >= numFixed {
			want = mt.In(mt.NumIn() - 1).Elem()
		} else {
			want = mt.In(i)
		}

		v := reflect.ValueOf(arg)
		if !v.IsValid() {
			// nil argument: reflect.ValueOf(nil) is the zero Value, which Call rejects.
			switch want.Kind() {
			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
				v = reflect.Zero(want)
			default:
				return make([]reflect.Value, 0), fmt.Errorf("incorrect-argument-type \"%s\": argument %d is nil, expected %s", funcName, i, want)
			}
		} else if !v.Type().AssignableTo(want) {
			return make([]reflect.Value, 0), fmt.Errorf("incorrect-argument-type \"%s\": argument %d is %s, expected %s", funcName, i, v.Type(), want)
		}
		in[i] = v
	}

	return m.Call(in), nil
}
738
Neha Sharma3c425fb2020-06-08 16:42:32 +0000739func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700740 arg := &KVArg{
741 Key: TransactionKey,
742 Value: &ic.StrType{Val: transactionId},
743 }
744
745 var marshalledArg *any.Any
746 var err error
747 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000748 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700749 return currentArgs
750 }
751 protoArg := &ic.Argument{
752 Key: arg.Key,
753 Value: marshalledArg,
754 }
755 return append(currentArgs, protoArg)
756}
757
Neha Sharma3c425fb2020-06-08 16:42:32 +0000758func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
Scott Baker2c1c4822019-10-16 11:02:41 -0700759 var marshalledArg *any.Any
760 var err error
761 if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000762 logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700763 return currentArgs
764 }
765 protoArg := &ic.Argument{
766 Key: FromTopic,
767 Value: marshalledArg,
768 }
769 return append(currentArgs, protoArg)
770}
771
Neha Sharma3c425fb2020-06-08 16:42:32 +0000772func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700773
774 // First extract the header to know whether this is a request - responses are handled by a different handler
775 if msg.Header.Type == ic.MessageType_REQUEST {
776 var out []reflect.Value
777 var err error
778
779 // Get the request body
780 requestBody := &ic.InterContainerRequestBody{}
781 if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000782 logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700783 } else {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000784 logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700785 // let the callee unpack the arguments as its the only one that knows the real proto type
786 // Augment the requestBody with the message Id as it will be used in scenarios where cores
787 // are set in pairs and competing
Neha Sharma3c425fb2020-06-08 16:42:32 +0000788 requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700789
790 // Augment the requestBody with the From topic name as it will be used in scenarios where a container
791 // needs to send an unsollicited message to the currently requested container
Neha Sharma3c425fb2020-06-08 16:42:32 +0000792 requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700793
Neha Sharma3c425fb2020-06-08 16:42:32 +0000794 out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
Scott Baker2c1c4822019-10-16 11:02:41 -0700795 if err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000796 logger.Warn(ctx, err)
Scott Baker2c1c4822019-10-16 11:02:41 -0700797 }
798 }
799 // Response required?
800 if requestBody.ResponseRequired {
801 // If we already have an error before then just return that
802 var returnError *ic.Error
803 var returnedValues []interface{}
804 var success bool
805 if err != nil {
806 returnError = &ic.Error{Reason: err.Error()}
807 returnedValues = make([]interface{}, 1)
808 returnedValues[0] = returnError
809 } else {
810 returnedValues = make([]interface{}, 0)
811 // Check for errors first
812 lastIndex := len(out) - 1
813 if out[lastIndex].Interface() != nil { // Error
814 if retError, ok := out[lastIndex].Interface().(error); ok {
815 if retError.Error() == ErrorTransactionNotAcquired.Error() {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000816 logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700817 return // Ignore - process is in competing mode and ignored transaction
818 }
819 returnError = &ic.Error{Reason: retError.Error()}
820 returnedValues = append(returnedValues, returnError)
821 } else { // Should never happen
822 returnError = &ic.Error{Reason: "incorrect-error-returns"}
823 returnedValues = append(returnedValues, returnError)
824 }
825 } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000826 logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700827 return // Ignore - should not happen
828 } else { // Non-error case
829 success = true
830 for idx, val := range out {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000831 //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
Scott Baker2c1c4822019-10-16 11:02:41 -0700832 if idx != lastIndex {
833 returnedValues = append(returnedValues, val.Interface())
834 }
835 }
836 }
837 }
838
839 var icm *ic.InterContainerMessage
Neha Sharma3c425fb2020-06-08 16:42:32 +0000840 if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
841 logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
842 icm = encodeDefaultFailedResponse(ctx, msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700843 }
844 // To preserve ordering of messages, all messages to a given topic are sent to the same partition
845 // by providing a message key. The key is encoded in the topic name. If the deviceId is not
846 // present then the key will be empty, hence all messages for a given topic will be sent to all
847 // partitions.
848 replyTopic := &Topic{Name: msg.Header.FromTopic}
849 key := msg.Header.KeyTopic
Neha Sharma3c425fb2020-06-08 16:42:32 +0000850 logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700851 // TODO: handle error response.
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800852 go func() {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000853 if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
854 logger.Errorw(ctx, "send-reply-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800855 "topic": replyTopic,
856 "key": key,
857 "error": err})
858 }
859 }()
Scott Baker2c1c4822019-10-16 11:02:41 -0700860 }
861 } else if msg.Header.Type == ic.MessageType_RESPONSE {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000862 logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
863 go kp.dispatchResponse(ctx, msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700864 } else {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000865 logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
Scott Baker2c1c4822019-10-16 11:02:41 -0700866 }
867}
868
Neha Sharma3c425fb2020-06-08 16:42:32 +0000869func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700870 // Wait for messages
871 for msg := range ch {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000872 //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
873 go kp.handleMessage(ctx, msg, targetInterface)
Scott Baker2c1c4822019-10-16 11:02:41 -0700874 }
875}
876
Neha Sharma3c425fb2020-06-08 16:42:32 +0000877func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700878 kp.lockTransactionIdToChannelMap.RLock()
879 defer kp.lockTransactionIdToChannelMap.RUnlock()
880 if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000881 logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700882 return
883 }
884 kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
885}
886
887// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
888// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
889// API. There is one response channel waiting for kafka messages before dispatching the message to the
890// corresponding waiting channel
Neha Sharma3c425fb2020-06-08 16:42:32 +0000891func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
892 logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700893
894 // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
895 // broadcast any message for this topic to all channels waiting on it.
Scott Bakerae1d4702020-03-04 14:10:51 -0800896 // Set channel size to 1 to prevent deadlock, see VOL-2708
897 ch := make(chan *ic.InterContainerMessage, 1)
Scott Baker2c1c4822019-10-16 11:02:41 -0700898 kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
899
900 return ch, nil
901}
902
Neha Sharma3c425fb2020-06-08 16:42:32 +0000903func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
904 logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700905 kp.deleteFromTransactionIdToChannelMap(trnsId)
906 return nil
907}
908
Neha Sharma3c425fb2020-06-08 16:42:32 +0000909func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
910 return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
Scott Baker104b67d2019-10-29 15:56:27 -0700911}
912
Neha Sharma3c425fb2020-06-08 16:42:32 +0000913func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
914 return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
Scott Baker0fef6982019-12-12 09:49:42 -0800915}
916
Neha Sharma3c425fb2020-06-08 16:42:32 +0000917func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
918 return kp.kafkaClient.SendLiveness(ctx)
Scott Baker104b67d2019-10-29 15:56:27 -0700919}
920
Scott Baker2c1c4822019-10-16 11:02:41 -0700921//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
922//or an error on failure
Neha Sharma3c425fb2020-06-08 16:42:32 +0000923func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700924 requestHeader := &ic.Header{
925 Id: uuid.New().String(),
926 Type: ic.MessageType_REQUEST,
927 FromTopic: replyTopic.Name,
928 ToTopic: toTopic.Name,
929 KeyTopic: key,
Scott Baker84a55ce2020-04-17 10:11:30 -0700930 Timestamp: ptypes.TimestampNow(),
Scott Baker2c1c4822019-10-16 11:02:41 -0700931 }
932 requestBody := &ic.InterContainerRequestBody{
933 Rpc: rpc,
934 ResponseRequired: true,
935 ReplyToTopic: replyTopic.Name,
936 }
937
938 for _, arg := range kvArgs {
939 if arg == nil {
940 // In case the caller sends an array with empty args
941 continue
942 }
943 var marshalledArg *any.Any
944 var err error
945 // ascertain the value interface type is a proto.Message
946 protoValue, ok := arg.Value.(proto.Message)
947 if !ok {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000948 logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
Scott Baker2c1c4822019-10-16 11:02:41 -0700949 err := errors.New("argument-value-not-proto-message")
950 return nil, err
951 }
952 if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000953 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700954 return nil, err
955 }
956 protoArg := &ic.Argument{
957 Key: arg.Key,
958 Value: marshalledArg,
959 }
960 requestBody.Args = append(requestBody.Args, protoArg)
961 }
962
963 var marshalledData *any.Any
964 var err error
965 if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000966 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700967 return nil, err
968 }
969 request := &ic.InterContainerMessage{
970 Header: requestHeader,
971 Body: marshalledData,
972 }
973 return request, nil
974}
975
Neha Sharma3c425fb2020-06-08 16:42:32 +0000976func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700977 // Extract the message body
978 responseBody := ic.InterContainerResponseBody{}
979 if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000980 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700981 return nil, err
982 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000983 //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700984
985 return &responseBody, nil
986
987}