Fixed an issue where the InterContainerProxy would hang if Stop() was called before it was started.

Also fixed a bug where secondary invocations of InvokeRPC would hang because Stop() sent only a single value on doneCh, leaving the channel empty for later receivers; doneCh is now closed instead.
Also made it safe to call Stop() on the InterContainerProxy multiple times (the close of doneCh is guarded by a sync.Once).
Also changed newInterContainerProxy() and the exported NewInterContainerProxy() to not return an error, as they cannot fail.

VOL-2530

Change-Id: I9ba5b85b720ac96b373bbdd6353f51336d44e2d7
diff --git a/VERSION b/VERSION
index 07e1a32..eca690e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.5-dev
+3.0.5
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 898548e..d21fdd5 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -82,7 +82,8 @@
 	defaultRequestHandlerInterface interface{}
 	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
-	doneCh                         chan int
+	doneCh                         chan struct{}
+	doneOnce                       sync.Once
 
 	// This map is used to map a topic to an interface and channel.   When a request is received
 	// on that channel (registered to the topic) then that interface is invoked.
@@ -139,25 +140,21 @@
 	}
 }
 
-func newInterContainerProxy(opts ...InterContainerProxyOption) (*interContainerProxy, error) {
+func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
 	proxy := &interContainerProxy{
 		kafkaHost: DefaultKafkaHost,
 		kafkaPort: DefaultKafkaPort,
+		doneCh:    make(chan struct{}),
 	}
 
 	for _, option := range opts {
 		option(proxy)
 	}
 
-	// Create the locks for all the maps
-	proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
-	proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
-	proxy.lockTopicResponseChannelMap = sync.RWMutex{}
-
-	return proxy, nil
+	return proxy
 }
 
-func NewInterContainerProxy(opts ...InterContainerProxyOption) (InterContainerProxy, error) {
+func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
 	return newInterContainerProxy(opts...)
 }
 
@@ -169,9 +166,6 @@
 		logger.Fatal("kafka-client-not-set")
 	}
 
-	// Create the Done channel
-	kp.doneCh = make(chan int, 1)
-
 	// Start the kafka client
 	if err := kp.kafkaClient.Start(); err != nil {
 		logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
@@ -192,7 +186,7 @@
 
 func (kp *interContainerProxy) Stop() {
 	logger.Info("stopping-intercontainer-proxy")
-	kp.doneCh <- 1
+	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
 	//kp.deleteAllTopicRequestHandlerChannelMap()
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index abec85f..56c90ca 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -21,32 +21,28 @@
 )
 
 func TestDefaultKafkaProxy(t *testing.T) {
-	actualResult, error := newInterContainerProxy()
-	assert.Equal(t, error, nil)
+	actualResult := newInterContainerProxy()
 	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
 	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionHost(t *testing.T) {
-	actualResult, error := newInterContainerProxy(InterContainerHost("10.20.30.40"))
-	assert.Equal(t, error, nil)
+	actualResult := newInterContainerProxy(InterContainerHost("10.20.30.40"))
 	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
 	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionPort(t *testing.T) {
-	actualResult, error := newInterContainerProxy(InterContainerPort(1020))
-	assert.Equal(t, error, nil)
+	actualResult := newInterContainerProxy(InterContainerPort(1020))
 	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
 	assert.Equal(t, actualResult.kafkaPort, 1020)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionTopic(t *testing.T) {
-	actualResult, error := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
-	assert.Equal(t, error, nil)
+	actualResult := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
 	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
 	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
@@ -61,8 +57,7 @@
 
 func TestKafkaProxyOptionTargetInterface(t *testing.T) {
 	var m *myInterface
-	actualResult, error := newInterContainerProxy(RequestHandlerInterface(m))
-	assert.Equal(t, error, nil)
+	actualResult := newInterContainerProxy(RequestHandlerInterface(m))
 	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
 	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
@@ -70,12 +65,11 @@
 
 func TestKafkaProxyChangeAllOptions(t *testing.T) {
 	var m *myInterface
-	actualResult, error := newInterContainerProxy(
+	actualResult := newInterContainerProxy(
 		InterContainerHost("10.20.30.40"),
 		InterContainerPort(1020),
 		DefaultTopic(&Topic{Name: "Adapter"}),
 		RequestHandlerInterface(m))
-	assert.Equal(t, error, nil)
 	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
 	assert.Equal(t, actualResult.kafkaPort, 1020)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
@@ -88,7 +82,7 @@
 	// Note: This doesn't actually start the client
 	client := NewSaramaClient()
 
-	probe, err := newInterContainerProxy(
+	probe := newInterContainerProxy(
 		InterContainerHost("10.20.30.40"),
 		InterContainerPort(1020),
 		DefaultTopic(&Topic{Name: "Adapter"}),
@@ -96,8 +90,6 @@
 		MsgClient(client),
 	)
 
-	assert.Nil(t, err)
-
 	ch := probe.EnableLivenessChannel(true)
 
 	// The channel should have one "true" message on it