Fixed an issue where the InterContainerProxy was hanging if it was closed before it was started.
Also fixed a bug where secondary invocations of InvokeRPC would hang because the doneCh was empty.
Also made it safe to close the InterContainerProxy multiple times.
Also changed newInterContainerProxy() to not return an error, as it cannot fail.
VOL-2530
Change-Id: I9ba5b85b720ac96b373bbdd6353f51336d44e2d7
diff --git a/VERSION b/VERSION
index 07e1a32..eca690e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.5-dev
+3.0.5
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 898548e..d21fdd5 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -82,7 +82,8 @@
defaultRequestHandlerInterface interface{}
deviceDiscoveryTopic *Topic
kafkaClient Client
- doneCh chan int
+ doneCh chan struct{}
+ doneOnce sync.Once
// This map is used to map a topic to an interface and channel. When a request is received
// on that channel (registered to the topic) then that interface is invoked.
@@ -139,25 +140,21 @@
}
}
-func newInterContainerProxy(opts ...InterContainerProxyOption) (*interContainerProxy, error) {
+func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
proxy := &interContainerProxy{
kafkaHost: DefaultKafkaHost,
kafkaPort: DefaultKafkaPort,
+ doneCh: make(chan struct{}),
}
for _, option := range opts {
option(proxy)
}
- // Create the locks for all the maps
- proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
- proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
- proxy.lockTopicResponseChannelMap = sync.RWMutex{}
-
- return proxy, nil
+ return proxy
}
-func NewInterContainerProxy(opts ...InterContainerProxyOption) (InterContainerProxy, error) {
+func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
return newInterContainerProxy(opts...)
}
@@ -169,9 +166,6 @@
logger.Fatal("kafka-client-not-set")
}
- // Create the Done channel
- kp.doneCh = make(chan int, 1)
-
// Start the kafka client
if err := kp.kafkaClient.Start(); err != nil {
logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
@@ -192,7 +186,7 @@
func (kp *interContainerProxy) Stop() {
logger.Info("stopping-intercontainer-proxy")
- kp.doneCh <- 1
+ kp.doneOnce.Do(func() { close(kp.doneCh) })
// TODO : Perform cleanup
kp.kafkaClient.Stop()
//kp.deleteAllTopicRequestHandlerChannelMap()
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index abec85f..56c90ca 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -21,32 +21,28 @@
)
func TestDefaultKafkaProxy(t *testing.T) {
- actualResult, error := newInterContainerProxy()
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy()
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
}
func TestKafkaProxyOptionHost(t *testing.T) {
- actualResult, error := newInterContainerProxy(InterContainerHost("10.20.30.40"))
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy(InterContainerHost("10.20.30.40"))
assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
}
func TestKafkaProxyOptionPort(t *testing.T) {
- actualResult, error := newInterContainerProxy(InterContainerPort(1020))
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy(InterContainerPort(1020))
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, 1020)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
}
func TestKafkaProxyOptionTopic(t *testing.T) {
- actualResult, error := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
@@ -61,8 +57,7 @@
func TestKafkaProxyOptionTargetInterface(t *testing.T) {
var m *myInterface
- actualResult, error := newInterContainerProxy(RequestHandlerInterface(m))
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy(RequestHandlerInterface(m))
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
@@ -70,12 +65,11 @@
func TestKafkaProxyChangeAllOptions(t *testing.T) {
var m *myInterface
- actualResult, error := newInterContainerProxy(
+ actualResult := newInterContainerProxy(
InterContainerHost("10.20.30.40"),
InterContainerPort(1020),
DefaultTopic(&Topic{Name: "Adapter"}),
RequestHandlerInterface(m))
- assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
assert.Equal(t, actualResult.kafkaPort, 1020)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
@@ -88,7 +82,7 @@
// Note: This doesn't actually start the client
client := NewSaramaClient()
- probe, err := newInterContainerProxy(
+ probe := newInterContainerProxy(
InterContainerHost("10.20.30.40"),
InterContainerPort(1020),
DefaultTopic(&Topic{Name: "Adapter"}),
@@ -96,8 +90,6 @@
MsgClient(client),
)
- assert.Nil(t, err)
-
ch := probe.EnableLivenessChannel(true)
// The channel should have one "true" message on it