[VOL-1036] Initial implementation of device lifecycle management

Change-Id: I5aa58fdcbcd852f6f5eef35d48f25f76e20c0418
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index 99ba2c4..b31f2ce 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -42,12 +42,12 @@
 	log.SetAllLogLevel(log.ErrorLevel)
 
 	coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
-		kk.KafkaHost("192.168.0.20"),
+		kk.KafkaHost("192.168.0.17"),
 		kk.KafkaPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Core"}))
 
 	adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
-		kk.KafkaHost("192.168.0.20"),
+		kk.KafkaHost("192.168.0.17"),
 		kk.KafkaPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
 
@@ -186,7 +186,7 @@
 
 func TestGetDevice(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoMsg := &voltha.ID{Id:trnsId}
+	protoMsg := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -212,7 +212,7 @@
 
 func TestGetDeviceTimeout(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoMsg := &voltha.ID{Id:trnsId}
+	protoMsg := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -237,7 +237,7 @@
 
 func TestGetChildDevice(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoMsg := &voltha.ID{Id:trnsId}
+	protoMsg := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -260,7 +260,7 @@
 
 func TestGetChildDevices(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoMsg := &voltha.ID{Id:trnsId}
+	protoMsg := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -283,7 +283,7 @@
 
 func TestGetPorts(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &voltha.ID{Id:trnsId}
+	protoArg1 := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 2)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -311,7 +311,7 @@
 
 func TestGetPortsMissingArgs(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &voltha.ID{Id:trnsId}
+	protoArg1 := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -443,6 +443,36 @@
 	assert.NotNil(t, unpackResult)
 }
 
+// TestDeviceStateChange invokes the "DeviceStateUpdate" RPC on the "Core" topic
+// via adapterKafkaProxy and asserts a true status and a nil result.
+func TestDeviceStateChange(t *testing.T) {
+	log.SetAllLogLevel(log.DebugLevel)
+	trnsId := uuid.New().String()
+	// Sized to the three arguments set below so no nil entry is passed on.
+	args := make([]*kk.KVArg, 3)
+	args[0] = &kk.KVArg{
+		Key:   "device_id",
+		Value: &voltha.ID{Id: trnsId},
+	}
+	args[1] = &kk.KVArg{
+		Key:   "oper_status",
+		Value: &ca.IntType{Val: 1},
+	}
+	args[2] = &kk.KVArg{
+		Key:   "connect_status",
+		Value: &ca.IntType{Val: 1},
+	}
+
+	rpc := "DeviceStateUpdate"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, true, status)
+	assert.Nil(t, result)
+}
+
 func TestStopKafkaProxy(t *testing.T) {
 	adapterKafkaProxy.Stop()
 	coreKafkaProxy.Stop()