[VOL-2364] Adding required methods to InterContainerProxy interface
Change-Id: Idaeb3f40c4979c456f66209ba0a073242c8545d8
diff --git a/VERSION b/VERSION
index 75a22a2..b0f2dcb 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.3
+3.0.4
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index de22dda..898548e 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -63,19 +63,22 @@
 type InterContainerProxy interface {
 	Start() error
 	Stop()
+	GetDefaultTopic() *Topic
 	DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
 	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
 	SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
 	SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
 	UnSubscribeFromRequestHandler(topic Topic) error
 	DeleteTopic(topic Topic) error
+	EnableLivenessChannel(enable bool) chan bool
+	SendLiveness() error
 }
 
 // interContainerProxy represents the messaging proxy
 type interContainerProxy struct {
 	kafkaHost                      string
 	kafkaPort                      int
-	DefaultTopic                   *Topic
+	defaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
 	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
@@ -114,7 +117,7 @@
 
 func DefaultTopic(topic *Topic) InterContainerProxyOption {
 	return func(args *interContainerProxy) {
-		args.DefaultTopic = topic
+		args.defaultTopic = topic
 	}
 }
 
@@ -197,6 +200,10 @@
 	//kp.deleteAllTransactionIdToChannelMap()
 }
 
+func (kp *interContainerProxy) GetDefaultTopic() *Topic {
+	return kp.defaultTopic
+}
+
 // DeviceDiscovered publish the discovered device onto the kafka messaging bus
 func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
 	logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
@@ -209,7 +216,7 @@
 	header := &ic.Header{
 		Id:        uuid.New().String(),
 		Type:      ic.MessageType_DEVICE_DISCOVERED,
-		FromTopic: kp.DefaultTopic.Name,
+		FromTopic: kp.defaultTopic.Name,
 		ToTopic:   kp.deviceDiscoveryTopic.Name,
 		Timestamp: time.Now().UnixNano(),
 	}
@@ -247,7 +254,7 @@
 	// typically the device ID.
 	responseTopic := replyToTopic
 	if responseTopic == nil {
-		responseTopic = kp.DefaultTopic
+		responseTopic = kp.defaultTopic
 	}
 
 	// Encode the request
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index f9888c0..abec85f 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -50,7 +50,7 @@
 	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
 	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
+	assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
 }
 
 type myInterface struct {
@@ -79,7 +79,7 @@
 	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
 	assert.Equal(t, actualResult.kafkaPort, 1020)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
+	assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
 }
 
 func TestKafkaProxyEnableLivenessChannel(t *testing.T) {
diff --git a/pkg/mocks/kafka_inter_container_proxy.go b/pkg/mocks/kafka_inter_container_proxy.go
index c53abb4..405fbe7 100644
--- a/pkg/mocks/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka_inter_container_proxy.go
@@ -46,7 +46,13 @@
 	InvokeRpcSpy InvokeRpcSpy
 }
 
-func (s *MockKafkaICProxy) Start() error                        { return nil }
+func (s *MockKafkaICProxy) Start() error { return nil }
+func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
+	t := kafka.Topic{
+		Name: "test-topic",
+	}
+	return &t
+}
 func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
 func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
 	return nil
@@ -93,3 +99,5 @@
 	return nil
 }
 func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(topic kafka.Topic) error { return nil }
+func (s *MockKafkaICProxy) EnableLivenessChannel(enable bool) chan bool           { return nil }
+func (s *MockKafkaICProxy) SendLiveness() error                                   { return nil }