[VOL-2835] Using different topic per ONU device

Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 1ed5b23..faece55 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -32,12 +32,6 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
-// sentinel constants
-const (
-	SentinelAdapterID    = "adapter_sentinel"
-	SentinelDevicetypeID = "device_type_sentinel"
-)
-
 // AdapterAgent represents adapter agent
 type AdapterAgent struct {
 	adapter     *voltha.Adapter
@@ -58,15 +52,6 @@
 	return &adapterAgent
 }
 
-func (aa *AdapterAgent) getDeviceType(deviceType string) *voltha.DeviceType {
-	aa.lock.RLock()
-	defer aa.lock.RUnlock()
-	if _, exist := aa.deviceTypes[deviceType]; exist {
-		return aa.deviceTypes[deviceType]
-	}
-	return nil
-}
-
 func (aa *AdapterAgent) getAdapter() *voltha.Adapter {
 	aa.lock.RLock()
 	defer aa.lock.RUnlock()
@@ -74,12 +59,6 @@
 	return aa.adapter
 }
 
-func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
-	aa.lock.Lock()
-	defer aa.lock.Unlock()
-	aa.deviceTypes[deviceType.Id] = deviceType
-}
-
 // updateCommunicationTime updates the message to the specified time.
 // No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
 func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
@@ -99,7 +78,7 @@
 // AdapterManager represents adapter manager attributes
 type AdapterManager struct {
 	adapterAgents               map[string]*AdapterAgent
-	deviceTypeToAdapterMap      map[string]string
+	deviceTypes                 map[string]*voltha.DeviceType
 	clusterDataProxy            *model.Proxy
 	deviceMgr                   *DeviceManager
 	coreInstanceID              string
@@ -110,12 +89,12 @@
 
 func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
 	aMgr := &AdapterManager{
-		exitChannel:            make(chan int, 1),
-		coreInstanceID:         coreInstanceID,
-		clusterDataProxy:       cdProxy,
-		adapterAgents:          make(map[string]*AdapterAgent),
-		deviceTypeToAdapterMap: make(map[string]string),
-		deviceMgr:              deviceMgr,
+		exitChannel:      make(chan int, 1),
+		coreInstanceID:   coreInstanceID,
+		clusterDataProxy: cdProxy,
+		deviceTypes:      make(map[string]*voltha.DeviceType),
+		adapterAgents:    make(map[string]*AdapterAgent),
+		deviceMgr: deviceMgr,
 	}
 	kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
 	return aMgr
@@ -153,10 +132,6 @@
 				logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
 			}
 		}
-	} else {
-		logger.Debug("no-existing-adapter-found")
-		//	No adapter data.   In order to have a proxy setup for that path let's create a fake adapter
-		return aMgr.addAdapter(&voltha.Adapter{Id: SentinelAdapterID}, true)
 	}
 
 	// Load the device types
@@ -175,8 +150,8 @@
 	}
 
 	logger.Debug("no-existing-device-type-found")
-	//	No device types data.   In order to have a proxy setup for that path let's create a fake device type
-	return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
+
+	return nil
 }
 
 func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
@@ -192,7 +167,8 @@
 func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
 	aMgr.lockAdaptersMap.Lock()
 	defer aMgr.lockAdaptersMap.Unlock()
-	logger.Debugw("adding-adapter", log.Fields{"adapter": adapter})
+	logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 	if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
 		if saveToDb {
 			// Save the adapter to the KV store - first check if it already exist
@@ -201,10 +177,17 @@
 				return err
 			} else if !have {
 				if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "adapters", adapter.Id, adapter); err != nil {
-					logger.Errorw("failed-to-save-adapter-to-cluster-proxy", log.Fields{"error": err})
+					logger.Errorw("failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+						"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
 					return err
 				}
-				logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapter": adapter})
+				logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+					"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
+			} else {
+				log.Warnw("adding-adapter-already-in-KV-store", log.Fields{
+					"adapterName":    adapter.Id,
+					"adapterReplica": adapter.CurrentReplica,
+				})
 			}
 		}
 		clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
@@ -223,6 +206,11 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
+	// create an in memory map to fetch the entire voltha.DeviceType from a device.Type string
+	for _, deviceType := range deviceTypes.Items {
+		aMgr.deviceTypes[deviceType.Id] = deviceType
+	}
+
 	if saveToDb {
 		// Save the device types to the KV store
 		for _, deviceType := range deviceTypes.Items {
@@ -240,17 +228,7 @@
 			}
 		}
 	}
-	// and save locally
-	for _, deviceType := range deviceTypes.Items {
-		clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
-		if adapterAgent, exist := aMgr.adapterAgents[clonedDType.Adapter]; exist {
-			adapterAgent.updateDeviceType(clonedDType)
-		} else {
-			logger.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
-			aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: clonedDType.Adapter}, deviceTypes)
-		}
-		aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
-	}
+
 	return nil
 }
 
@@ -260,9 +238,7 @@
 	defer aMgr.lockAdaptersMap.RUnlock()
 	for _, adapterAgent := range aMgr.adapterAgents {
 		if a := adapterAgent.getAdapter(); a != nil {
-			if a.Id != SentinelAdapterID { // don't report the sentinel
-				result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
-			}
+			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
 		}
 	}
 	return result, nil
@@ -278,7 +254,8 @@
 }
 
 func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
-	logger.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
+	logger.Debugw("registerAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
 
 	if aMgr.getAdapter(adapter.Id) != nil {
 		//	Already registered - Adapter may have restarted.  Trigger the reconcile process for that adapter
@@ -300,17 +277,20 @@
 		return nil, err
 	}
 
-	logger.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
+	logger.Debugw("adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 
 	return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
 }
 
-//getAdapterName returns the name of the device adapter that service this device type
-func (aMgr *AdapterManager) getAdapterName(deviceType string) (string, error) {
+// getAdapterType returns the name of the device adapter that service this device type
+func (aMgr *AdapterManager) getAdapterType(deviceType string) (string, error) {
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
-	if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
-		return adapterID, nil
+	for _, adapterAgent := range aMgr.adapterAgents {
+		if deviceType == adapterAgent.adapter.Type {
+			return adapterAgent.adapter.Type, nil
+		}
 	}
 	return "", fmt.Errorf("Adapter-not-registered-for-device-type %s", deviceType)
 }
@@ -319,16 +299,12 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
-	deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypeToAdapterMap))
-	for deviceTypeID, adapterID := range aMgr.deviceTypeToAdapterMap {
-		if adapterAgent, have := aMgr.adapterAgents[adapterID]; have {
-			if deviceType := adapterAgent.getDeviceType(deviceTypeID); deviceType != nil {
-				if deviceType.Id != SentinelDevicetypeID { // don't report the sentinel
-					deviceTypes = append(deviceTypes, deviceType)
-				}
-			}
-		}
+	deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
+
+	for _, deviceType := range aMgr.deviceTypes {
+		deviceTypes = append(deviceTypes, deviceType)
 	}
+
 	return deviceTypes
 }
 
@@ -337,10 +313,9 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
-	if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
-		if adapterAgent := aMgr.adapterAgents[adapterID]; adapterAgent != nil {
-			return adapterAgent.getDeviceType(deviceType)
-		}
+	if deviceType, exist := aMgr.deviceTypes[deviceType]; exist {
+		return deviceType
 	}
+
 	return nil
 }