[VOL-2835] Use a different topic per ONU device

Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index afe84e8..18593d8 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -56,12 +56,15 @@
 }
 
 func newDeviceManager(core *Core) *DeviceManager {
+
+	endpointManager := kafka.NewEndpointManager(&core.backend)
+
 	var deviceMgr DeviceManager
 	deviceMgr.core = core
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.rootDevices = make(map[string]bool)
 	deviceMgr.kafkaICProxy = core.kmp
-	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic)
+	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic, endpointManager)
 	deviceMgr.coreInstanceID = core.instanceID
 	deviceMgr.clusterDataProxy = core.clusterDataProxy
 	deviceMgr.adapterMgr = core.adapterMgr
@@ -186,7 +189,6 @@
 	} else {
 		res = status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-
 	sendResponse(ctx, ch, res)
 }
 
@@ -601,7 +603,8 @@
 
 // adapterRestarted is invoked whenever an adapter is restarted
 func (dMgr *DeviceManager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
-	logger.Debugw("adapter-restarted", log.Fields{"adapter": adapter.Id})
+	logger.Debugw("adapter-restarted", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 
 	// Let's reconcile the device managed by this Core only
 	if len(dMgr.rootDevices) == 0 {