[VOL-2835] Use a different topic per ONU device
Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 1ed5b23..faece55 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -32,12 +32,6 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
)
-// sentinel constants
-const (
- SentinelAdapterID = "adapter_sentinel"
- SentinelDevicetypeID = "device_type_sentinel"
-)
-
// AdapterAgent represents adapter agent
type AdapterAgent struct {
adapter *voltha.Adapter
@@ -58,15 +52,6 @@
return &adapterAgent
}
-func (aa *AdapterAgent) getDeviceType(deviceType string) *voltha.DeviceType {
- aa.lock.RLock()
- defer aa.lock.RUnlock()
- if _, exist := aa.deviceTypes[deviceType]; exist {
- return aa.deviceTypes[deviceType]
- }
- return nil
-}
-
func (aa *AdapterAgent) getAdapter() *voltha.Adapter {
aa.lock.RLock()
defer aa.lock.RUnlock()
@@ -74,12 +59,6 @@
return aa.adapter
}
-func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
- aa.lock.Lock()
- defer aa.lock.Unlock()
- aa.deviceTypes[deviceType.Id] = deviceType
-}
-
// updateCommunicationTime updates the message to the specified time.
// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
@@ -99,7 +78,7 @@
// AdapterManager represents adapter manager attributes
type AdapterManager struct {
adapterAgents map[string]*AdapterAgent
- deviceTypeToAdapterMap map[string]string
+ deviceTypes map[string]*voltha.DeviceType
clusterDataProxy *model.Proxy
deviceMgr *DeviceManager
coreInstanceID string
@@ -110,12 +89,12 @@
func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
aMgr := &AdapterManager{
- exitChannel: make(chan int, 1),
- coreInstanceID: coreInstanceID,
- clusterDataProxy: cdProxy,
- adapterAgents: make(map[string]*AdapterAgent),
- deviceTypeToAdapterMap: make(map[string]string),
- deviceMgr: deviceMgr,
+ exitChannel: make(chan int, 1),
+ coreInstanceID: coreInstanceID,
+ clusterDataProxy: cdProxy,
+ deviceTypes: make(map[string]*voltha.DeviceType),
+ adapterAgents: make(map[string]*AdapterAgent),
+ deviceMgr: deviceMgr,
}
kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
return aMgr
@@ -153,10 +132,6 @@
logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
}
}
- } else {
- logger.Debug("no-existing-adapter-found")
- // No adapter data. In order to have a proxy setup for that path let's create a fake adapter
- return aMgr.addAdapter(&voltha.Adapter{Id: SentinelAdapterID}, true)
}
// Load the device types
@@ -175,8 +150,8 @@
}
logger.Debug("no-existing-device-type-found")
- // No device types data. In order to have a proxy setup for that path let's create a fake device type
- return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
+
+ return nil
}
func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
@@ -192,7 +167,8 @@
func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
- logger.Debugw("adding-adapter", log.Fields{"adapter": adapter})
+ logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
@@ -201,10 +177,17 @@
return err
} else if !have {
if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "adapters", adapter.Id, adapter); err != nil {
- logger.Errorw("failed-to-save-adapter-to-cluster-proxy", log.Fields{"error": err})
+ logger.Errorw("failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
return err
}
- logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapter": adapter})
+ logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
+ } else {
+ log.Warnw("adding-adapter-already-in-KV-store", log.Fields{
+ "adapterName": adapter.Id,
+ "adapterReplica": adapter.CurrentReplica,
+ })
}
}
clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
@@ -223,6 +206,11 @@
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+ // create an in-memory map to fetch the entire voltha.DeviceType from a device.Type string
+ for _, deviceType := range deviceTypes.Items {
+ aMgr.deviceTypes[deviceType.Id] = deviceType
+ }
+
if saveToDb {
// Save the device types to the KV store
for _, deviceType := range deviceTypes.Items {
@@ -240,17 +228,7 @@
}
}
}
- // and save locally
- for _, deviceType := range deviceTypes.Items {
- clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- if adapterAgent, exist := aMgr.adapterAgents[clonedDType.Adapter]; exist {
- adapterAgent.updateDeviceType(clonedDType)
- } else {
- logger.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
- aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: clonedDType.Adapter}, deviceTypes)
- }
- aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
- }
+
return nil
}
@@ -260,9 +238,7 @@
defer aMgr.lockAdaptersMap.RUnlock()
for _, adapterAgent := range aMgr.adapterAgents {
if a := adapterAgent.getAdapter(); a != nil {
- if a.Id != SentinelAdapterID { // don't report the sentinel
- result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
- }
+ result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
}
}
return result, nil
@@ -278,7 +254,8 @@
}
func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
- logger.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
+ logger.Debugw("registerAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
if aMgr.getAdapter(adapter.Id) != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
@@ -300,17 +277,20 @@
return nil, err
}
- logger.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
+ logger.Debugw("adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
}
-//getAdapterName returns the name of the device adapter that service this device type
-func (aMgr *AdapterManager) getAdapterName(deviceType string) (string, error) {
+// getAdapterType returns the type of the device adapter that services this device type
+func (aMgr *AdapterManager) getAdapterType(deviceType string) (string, error) {
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
- return adapterID, nil
+ for _, adapterAgent := range aMgr.adapterAgents {
+ if deviceType == adapterAgent.adapter.Type {
+ return adapterAgent.adapter.Type, nil
+ }
}
return "", fmt.Errorf("Adapter-not-registered-for-device-type %s", deviceType)
}
@@ -319,16 +299,12 @@
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypeToAdapterMap))
- for deviceTypeID, adapterID := range aMgr.deviceTypeToAdapterMap {
- if adapterAgent, have := aMgr.adapterAgents[adapterID]; have {
- if deviceType := adapterAgent.getDeviceType(deviceTypeID); deviceType != nil {
- if deviceType.Id != SentinelDevicetypeID { // don't report the sentinel
- deviceTypes = append(deviceTypes, deviceType)
- }
- }
- }
+ deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
+
+ for _, deviceType := range aMgr.deviceTypes {
+ deviceTypes = append(deviceTypes, deviceType)
}
+
return deviceTypes
}
@@ -337,10 +313,9 @@
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
- if adapterAgent := aMgr.adapterAgents[adapterID]; adapterAgent != nil {
- return adapterAgent.getDeviceType(deviceType)
- }
+ if deviceType, exist := aMgr.deviceTypes[deviceType]; exist {
+ return deviceType
}
+
return nil
}