[VOL-2835] Using different topic per ONU device

Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index d3907bb..1e18ba4 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,14 +30,16 @@
 	deviceTopicRegistered bool
 	corePairTopic         string
 	kafkaICProxy          kafka.InterContainerProxy
+	endpointManager       kafka.EndpointManager
 }
 
 // NewAdapterProxy will return adapter proxy instance
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
 	return &AdapterProxy{
 		kafkaICProxy:          kafkaProxy,
 		corePairTopic:         corePairTopic,
 		deviceTopicRegistered: false,
+		endpointManager:       endpointManager,
 	}
 }
 
@@ -45,8 +47,14 @@
 	return kafka.Topic{Name: ap.corePairTopic}
 }
 
-func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
-	return kafka.Topic{Name: adapterName}
+func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
+
+	endpoint, err := ap.endpointManager.GetEndpoint(deviceID, adapterType)
+	if err != nil {
+		return nil, err
+	}
+
+	return &kafka.Topic{Name: string(endpoint)}, nil
 }
 
 func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
@@ -69,167 +77,210 @@
 func (ap *AdapterProxy) adoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("adoptDevice", log.Fields{"device-id": device.Id})
 	rpc := "adopt_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // disableDevice invokes disable device rpc
 func (ap *AdapterProxy) disableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("disableDevice", log.Fields{"device-id": device.Id})
 	rpc := "disable_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // reEnableDevice invokes reenable device rpc
 func (ap *AdapterProxy) reEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("reEnableDevice", log.Fields{"device-id": device.Id})
 	rpc := "reenable_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // rebootDevice invokes reboot device rpc
 func (ap *AdapterProxy) rebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("rebootDevice", log.Fields{"device-id": device.Id})
 	rpc := "reboot_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // deleteDevice invokes delete device rpc
 func (ap *AdapterProxy) deleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("deleteDevice", log.Fields{"device-id": device.Id})
 	rpc := "delete_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getOfpDeviceInfo invokes get ofp device info rpc
 func (ap *AdapterProxy) getOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getOfpDeviceInfo", log.Fields{"device-id": device.Id})
 	rpc := "get_ofp_device_info"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getOfpPortInfo invokes get ofp port info rpc
 func (ap *AdapterProxy) getOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // reconcileDevice invokes reconcile device rpc
 func (ap *AdapterProxy) reconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("reconcileDevice", log.Fields{"device-id": device.Id})
 	rpc := "reconcile_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // downloadImage invokes download image rpc
 func (ap *AdapterProxy) downloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("downloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getImageDownloadStatus invokes get image download status rpc
 func (ap *AdapterProxy) getImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // cancelImageDownload invokes cancel image download rpc
 func (ap *AdapterProxy) cancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("cancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // activateImageUpdate invokes activate image update rpc
 func (ap *AdapterProxy) activateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("activateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // revertImageUpdate invokes revert image update rpc
 func (ap *AdapterProxy) revertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("revertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("packetOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
-	toTopic := ap.getAdapterTopic(deviceType)
+	toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "receive_packet_out"
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
@@ -237,13 +288,16 @@
 		{Key: "packet", Value: packet},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
 }
 
 // updateFlowsBulk invokes update flows bulk rpc
 func (ap *AdapterProxy) updateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("updateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "update_flows_bulk"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
@@ -252,7 +306,7 @@
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // updateFlowsIncremental invokes update flows incremental rpc
@@ -266,7 +320,10 @@
 			"group-to-delete-count": len(groupChanges.ToRemove.Items),
 			"group-to-update-count": len(groupChanges.ToUpdate.Items),
 		})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "update_flows_incrementally"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
@@ -275,83 +332,101 @@
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // updatePmConfigs invokes update pm configs rpc
 func (ap *AdapterProxy) updatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("updatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "Update_pm_config"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "pm_configs", Value: pmConfigs},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // simulateAlarm invokes simulate alarm rpc
 func (ap *AdapterProxy) simulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("simulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
 	rpc := "simulate_alarm"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: simulateReq},
 	}
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "disable_port"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
 		{Key: "port", Value: port},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "enable_port"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
 		{Key: "port", Value: port},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // childDeviceLost invokes child device_lost rpc
-func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
-	logger.Debugw("childDeviceLost", log.Fields{"parent-device-id": pDeviceID, "parent-port-no": pPortNo, "onu-id": onuID})
+func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
+	logger.Debugw("childDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
 	rpc := "child_device_lost"
-	toTopic := ap.getAdapterTopic(deviceType)
+	toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
-		{Key: "pDeviceId", Value: &ic.StrType{Val: pDeviceID}},
+		{Key: "pDeviceId", Value: &ic.StrType{Val: deviceID}},
 		{Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
 		{Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
 }
 
 func (ap *AdapterProxy) startOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
 	rpc := "start_omci_test"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
 	// TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
 	//   than including the whole request, which is (deviceid, uuid)
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id,
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
 		&kafka.KVArg{Key: "device", Value: device},
 		&kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
 }