Fixed an issue where the InterContainerProxy would hang if it was closed before it was started.
Also fixed a bug where subsequent invocations of InvokeRPC would hang because the doneCh was empty.
Also made it safe to close the InterContainerProxy multiple times.
Also changed newInterContainerProxy() to not return an error, as it cannot fail.
VOL-2530
Change-Id: I9ba5b85b720ac96b373bbdd6353f51336d44e2d7
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index abec85f..56c90ca 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -21,32 +21,28 @@
)
func TestDefaultKafkaProxy(t *testing.T) {
- actualResult, error := newInterContainerProxy()
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy()
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
}
func TestKafkaProxyOptionHost(t *testing.T) {
- actualResult, error := newInterContainerProxy(InterContainerHost("10.20.30.40"))
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy(InterContainerHost("10.20.30.40"))
assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
}
func TestKafkaProxyOptionPort(t *testing.T) {
- actualResult, error := newInterContainerProxy(InterContainerPort(1020))
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy(InterContainerPort(1020))
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, 1020)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
}
func TestKafkaProxyOptionTopic(t *testing.T) {
- actualResult, error := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
@@ -61,8 +57,7 @@
func TestKafkaProxyOptionTargetInterface(t *testing.T) {
var m *myInterface
- actualResult, error := newInterContainerProxy(RequestHandlerInterface(m))
- assert.Equal(t, error, nil)
+ actualResult := newInterContainerProxy(RequestHandlerInterface(m))
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
@@ -70,12 +65,11 @@
func TestKafkaProxyChangeAllOptions(t *testing.T) {
var m *myInterface
- actualResult, error := newInterContainerProxy(
+ actualResult := newInterContainerProxy(
InterContainerHost("10.20.30.40"),
InterContainerPort(1020),
DefaultTopic(&Topic{Name: "Adapter"}),
RequestHandlerInterface(m))
- assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
assert.Equal(t, actualResult.kafkaPort, 1020)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
@@ -88,7 +82,7 @@
// Note: This doesn't actually start the client
client := NewSaramaClient()
- probe, err := newInterContainerProxy(
+ probe := newInterContainerProxy(
InterContainerHost("10.20.30.40"),
InterContainerPort(1020),
DefaultTopic(&Topic{Name: "Adapter"}),
@@ -96,8 +90,6 @@
MsgClient(client),
)
- assert.Nil(t, err)
-
ch := probe.EnableLivenessChannel(true)
// The channel should have one "true" message on it