[VOL-2364] Adding required methods to InterContainerProxy interface
Change-Id: Idaeb3f40c4979c456f66209ba0a073242c8545d8
diff --git a/VERSION b/VERSION
index 75a22a2..b0f2dcb 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.3
+3.0.4
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index de22dda..898548e 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -63,19 +63,22 @@
type InterContainerProxy interface {
Start() error
Stop()
+ GetDefaultTopic() *Topic
DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
UnSubscribeFromRequestHandler(topic Topic) error
DeleteTopic(topic Topic) error
+ EnableLivenessChannel(enable bool) chan bool
+ SendLiveness() error
}
// interContainerProxy represents the messaging proxy
type interContainerProxy struct {
kafkaHost string
kafkaPort int
- DefaultTopic *Topic
+ defaultTopic *Topic
defaultRequestHandlerInterface interface{}
deviceDiscoveryTopic *Topic
kafkaClient Client
@@ -114,7 +117,7 @@
func DefaultTopic(topic *Topic) InterContainerProxyOption {
return func(args *interContainerProxy) {
- args.DefaultTopic = topic
+ args.defaultTopic = topic
}
}
@@ -197,6 +200,10 @@
//kp.deleteAllTransactionIdToChannelMap()
}
+func (kp *interContainerProxy) GetDefaultTopic() *Topic {
+ return kp.defaultTopic
+}
+
// DeviceDiscovered publish the discovered device onto the kafka messaging bus
func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
@@ -209,7 +216,7 @@
header := &ic.Header{
Id: uuid.New().String(),
Type: ic.MessageType_DEVICE_DISCOVERED,
- FromTopic: kp.DefaultTopic.Name,
+ FromTopic: kp.defaultTopic.Name,
ToTopic: kp.deviceDiscoveryTopic.Name,
Timestamp: time.Now().UnixNano(),
}
@@ -247,7 +254,7 @@
// typically the device ID.
responseTopic := replyToTopic
if responseTopic == nil {
- responseTopic = kp.DefaultTopic
+ responseTopic = kp.defaultTopic
}
// Encode the request
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index f9888c0..abec85f 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -50,7 +50,7 @@
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
- assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
+ assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
}
type myInterface struct {
@@ -79,7 +79,7 @@
assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
assert.Equal(t, actualResult.kafkaPort, 1020)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
- assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
+ assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
}
func TestKafkaProxyEnableLivenessChannel(t *testing.T) {
diff --git a/pkg/mocks/kafka_inter_container_proxy.go b/pkg/mocks/kafka_inter_container_proxy.go
index c53abb4..405fbe7 100644
--- a/pkg/mocks/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka_inter_container_proxy.go
@@ -46,7 +46,13 @@
InvokeRpcSpy InvokeRpcSpy
}
-func (s *MockKafkaICProxy) Start() error { return nil }
+func (s *MockKafkaICProxy) Start() error { return nil }
+func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
+ t := kafka.Topic{
+ Name: "test-topic",
+ }
+ return &t
+}
func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
return nil
@@ -93,3 +99,5 @@
return nil
}
func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(topic kafka.Topic) error { return nil }
+func (s *MockKafkaICProxy) EnableLivenessChannel(enable bool) chan bool { return nil }
+func (s *MockKafkaICProxy) SendLiveness() error { return nil }