This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure only cores and adapters handling these
devices will listens to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.   This will be dealt in a separate
update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ab35037..4af4fb0 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -29,13 +29,13 @@
 )
 
 type AdapterProxy struct {
-	TestMode   bool
-	kafkaProxy *kafka.KafkaMessagingProxy
+	TestMode     bool
+	kafkaICProxy *kafka.InterContainerProxy
 }
 
-func NewAdapterProxy(kafkaProxy *kafka.KafkaMessagingProxy) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
 	var proxy AdapterProxy
-	proxy.kafkaProxy = kafkaProxy
+	proxy.kafkaICProxy = kafkaProxy
 	return &proxy
 }
 
@@ -54,6 +54,18 @@
 	}
 }
 
+//func kafka.CreateSubTopic(args ...string) kafka.Topic{
+//	topic := ""
+//	for index , arg := range args {
+//		if index == 0 {
+//			topic = arg
+//		} else {
+//			topic = fmt.Sprintf("%s_%s",  topic, arg)
+//		}
+//	}
+//	return kafka.Topic{Name:topic}
+//}
+
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
@@ -63,21 +75,35 @@
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
-	log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+	// Use a device topic for the response as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
+	if success {
+		// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
+		// We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
+		if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+			log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+			return err
+		}
+	}
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "disable_device"
-	topic := kafka.Topic{Name: device.Type}
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -85,13 +111,15 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -99,13 +127,15 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reboot_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -113,26 +143,40 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
 	rpc := "delete_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+
+	// We no longer need to have a target against that topic as we won't receive any unsolicited messages on that
+	// topic
+	if err := ap.kafkaICProxy.UnSubscribeFromRequestHandler(replyToTopic); err != nil {
+		log.Errorw("Unable-to-subscribe-from-target", log.Fields{"topic": replyToTopic, "error": err})
+		return err
+	}
+	// Now delete the topic altogether
+	ap.kafkaICProxy.DeleteTopic(replyToTopic)
+
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
 		unpackResult := &ca.SwitchCapability{}
@@ -155,7 +199,7 @@
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -166,8 +210,9 @@
 		Key:   "port_no",
 		Value: pNo,
 	}
-
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
 		unpackResult := &ca.PortCapability{}
@@ -252,15 +297,15 @@
 
 func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
 	log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
-	topic := kafka.Topic{Name: deviceType}
+	toTopic := kafka.CreateSubTopic(deviceType, deviceId)
 	rpc := "receive_packet_out"
-	dId := &ca.StrType{Val:deviceId}
+	dId := &ca.StrType{Val: deviceId}
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
 		Key:   "deviceId",
 		Value: dId,
 	}
-	op := &ca.IntType{Val:int64(outPort)}
+	op := &ca.IntType{Val: int64(outPort)}
 	args[1] = &kafka.KVArg{
 		Key:   "outPort",
 		Value: op,
@@ -271,15 +316,16 @@
 	}
 
 	// TODO:  Do we need to wait for an ACK on a packet Out?
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
 	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
-
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -295,14 +341,16 @@
 		Value: groups,
 	}
 
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
 	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -318,7 +366,9 @@
 		Value: groupChanges,
 	}
 
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }