Fixed an issue where the InterContainerProxy would hang if Stop() was called before Start():
doneCh was only created in Start(), so Stop() blocked forever sending on a nil channel.

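Also fixed a bug where subsequent invocations of InvokeRPC would hang because the doneCh was empty:
Stop() sent only a single value on it, so at most one receiver could observe it. Stop() now closes the channel instead.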
Also made it safe to call Stop() on the InterContainerProxy multiple times (the close of doneCh is guarded by sync.Once).
Also changed newInterContainerProxy() to not return an error, as it cannot fail.

VOL-2530

Change-Id: I9ba5b85b720ac96b373bbdd6353f51336d44e2d7
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 898548e..d21fdd5 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -82,7 +82,8 @@
 	defaultRequestHandlerInterface interface{}
 	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
-	doneCh                         chan int
+	doneCh                         chan struct{}
+	doneOnce                       sync.Once
 
 	// This map is used to map a topic to an interface and channel.   When a request is received
 	// on that channel (registered to the topic) then that interface is invoked.
@@ -139,25 +140,21 @@
 	}
 }
 
-func newInterContainerProxy(opts ...InterContainerProxyOption) (*interContainerProxy, error) {
+func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
 	proxy := &interContainerProxy{
 		kafkaHost: DefaultKafkaHost,
 		kafkaPort: DefaultKafkaPort,
+		doneCh:    make(chan struct{}),
 	}
 
 	for _, option := range opts {
 		option(proxy)
 	}
 
-	// Create the locks for all the maps
-	proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
-	proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
-	proxy.lockTopicResponseChannelMap = sync.RWMutex{}
-
-	return proxy, nil
+	return proxy
 }
 
-func NewInterContainerProxy(opts ...InterContainerProxyOption) (InterContainerProxy, error) {
+func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
 	return newInterContainerProxy(opts...)
 }
 
@@ -169,9 +166,6 @@
 		logger.Fatal("kafka-client-not-set")
 	}
 
-	// Create the Done channel
-	kp.doneCh = make(chan int, 1)
-
 	// Start the kafka client
 	if err := kp.kafkaClient.Start(); err != nil {
 		logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
@@ -192,7 +186,7 @@
 
 func (kp *interContainerProxy) Stop() {
 	logger.Info("stopping-intercontainer-proxy")
-	kp.doneCh <- 1
+	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
 	//kp.deleteAllTopicRequestHandlerChannelMap()