[VOL-5417]:During scale testing, openolt adapter consumes extremely high memory and high CPU utilization while reconcilation.
Change-Id: I7db1cef727cffa01adc0fb08a8243154cbdd7fda
Signed-off-by: pnalmas <praneeth.nalmas@radisys.com>
diff --git a/VERSION b/VERSION
index cc19e06..bf43f75 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.6.4
+7.6.5
diff --git a/pkg/techprofile/tech_profile.go b/pkg/techprofile/tech_profile.go
index 01f7db6..ed284b9 100644
--- a/pkg/techprofile/tech_profile.go
+++ b/pkg/techprofile/tech_profile.go
@@ -23,6 +23,7 @@
"fmt"
"regexp"
"strconv"
+ "strings"
"sync"
"time"
@@ -193,9 +194,9 @@
}
}
-func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
+func NewTechProfile(ctx context.Context, IntfId uint32, deviceId string, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
var techprofileObj TechProfileMgr
- logger.Debug(ctx, "initializing-techprofile-mananger")
+ logger.Debug(ctx, "initializing-techprofile-mananger ", log.Fields{"IntId": IntfId, "device-id": deviceId})
techprofileObj.config = NewTechProfileFlags(kvStoreType, kvStoreAddress, basePathKvStore)
techprofileObj.config.KVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.TPKVPathPrefix)
techprofileObj.config.DefaultTpKVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.defaultTpKvPathPrefix)
@@ -214,7 +215,7 @@
techprofileObj.tpMap = make(map[uint32]*tp_pb.TechProfile)
techprofileObj.eponTpMap = make(map[uint32]*tp_pb.EponTechProfile)
logger.Debug(ctx, "reconcile-tp-instance-cache-start")
- if err := techprofileObj.reconcileTpInstancesToCache(ctx); err != nil {
+ if err := techprofileObj.reconcileTpInstancesToCache(ctx, IntfId, deviceId); err != nil {
logger.Errorw(ctx, "failed-to-reconcile-tp-instances", log.Fields{"err": err})
return nil, err
}
@@ -1253,78 +1254,23 @@
// buildTpInstanceFromResourceInstance for GPON, XGPON and XGS-PON technology - build TpInstance from TechProfile template and ResourceInstance
func (t *TechProfileMgr) buildTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.TechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
- var usGemPortAttributeList []*tp_pb.GemPortAttributes
- var dsGemPortAttributeList []*tp_pb.GemPortAttributes
- var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
- var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
-
if len(resInst.GemportIds) != int(tp.NumGemPorts) {
logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-template-and-resource-instance",
log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
return nil
}
- for index := 0; index < int(tp.NumGemPorts); index++ {
- usGemPortAttributeList = append(usGemPortAttributeList,
- &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
- MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
- PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
- AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
- SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
- PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
- Weight: tp.UpstreamGemPortAttributeList[index].Weight,
- DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
- DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
- }
- //put multicast and unicast downstream GEM port attributes in different lists first
- for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
- if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
- dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
- &tp_pb.GemPortAttributes{
- MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
- MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
- PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
- AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
- SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
- PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
- Weight: tp.DownstreamGemPortAttributeList[index].Weight,
- DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
- DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
- IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
- DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
- StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
- } else {
- dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
- &tp_pb.GemPortAttributes{
- MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
- PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
- AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
- SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
- PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
- Weight: tp.DownstreamGemPortAttributeList[index].Weight,
- DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
- DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
- }
- }
- //add unicast downstream GEM ports to dsGemPortAttributeList
- if dsUnicastGemAttributeList != nil {
- for index := 0; index < int(tp.NumGemPorts); index++ {
- dsGemPortAttributeList = append(dsGemPortAttributeList,
- &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
- MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
- PbitMap: dsUnicastGemAttributeList[index].PbitMap,
- AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
- SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
- PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
- Weight: dsUnicastGemAttributeList[index].Weight,
- DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
- DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
- }
- }
- //add multicast GEM ports to dsGemPortAttributeList afterwards
- for k := range dsMulticastGemAttributeList {
- dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
- }
+ usGemPortAttributeList := make([]*tp_pb.GemPortAttributes, 0, tp.NumGemPorts)
+ dsGemPortAttributeList := make([]*tp_pb.GemPortAttributes, 0, tp.NumGemPorts)
+ dsMulticastGemAttributeList := make([]*tp_pb.GemPortAttributes, 0)
+ dsUnicastGemAttributeList := make([]*tp_pb.GemPortAttributes, 0)
+
+ logger.Debugw(ctx, "Building TP Instance",
+ log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
+
+ usGemPortAttributeList = t.buildUpstreamGemPortAttributes(ctx, tp, resInst, usGemPortAttributeList)
+ dsUnicastGemAttributeList, dsMulticastGemAttributeList = t.separateDownstreamGemPortAttributes(ctx, tp, dsUnicastGemAttributeList, dsMulticastGemAttributeList)
+ dsGemPortAttributeList = t.buildDownstreamGemPortAttributes(ctx, tp, resInst, dsUnicastGemAttributeList, dsMulticastGemAttributeList, dsGemPortAttributeList)
return &tp_pb.TechProfileInstance{
SubscriberIdentifier: resInst.SubscriberIdentifier,
@@ -1435,83 +1381,174 @@
return t.buildEponTpInstanceFromResourceInstance(ctx, eponTp, resInst)
}
-func (t *TechProfileMgr) reconcileTpInstancesToCache(ctx context.Context) error {
+func (t *TechProfileMgr) reconcileTpInstancesToCache(ctx context.Context, IntfId uint32, deviceId string) error {
tech := t.resourceMgr.GetTechnology()
newCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
- kvPairs, _ := t.config.ResourceInstanceKVBacked.List(newCtx, tech)
- if tech == xgspon || tech == xgpon || tech == gpon {
- for keyPath, kvPair := range kvPairs {
- logger.Debugw(ctx, "attempting-to-reconcile-tp-instance-from-resource-instance", log.Fields{"resourceInstPath": keyPath})
- if value, err := kvstore.ToByte(kvPair.Value); err == nil {
- var resInst tp_pb.ResourceInstance
- if err = proto.Unmarshal(value, &resInst); err != nil {
- logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"err": err, "keyPath": keyPath, "value": value})
- continue
- } else {
- if tpInst := t.getTpInstanceFromResourceInstance(ctx, &resInst); tpInst != nil {
- // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
- keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
- if len(keySuffixSlice) == 2 {
- keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
- // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
- if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
- logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
- continue
- }
- } else {
- logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
- continue
- }
- t.tpInstanceMapLock.Lock()
- t.tpInstanceMap[keySuffixSlice[1]] = tpInst
- t.tpInstanceMapLock.Unlock()
- logger.Debugw(ctx, "reconciled-tp-success", log.Fields{"keyPath": keyPath})
- }
- }
- } else {
- logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
- }
+ //VOL-5417:Only reconcile the tech profiles for the subscribers associated with this PON port on this device.
+ //Getting the list of supported tech profile IDs and then reconciling the tech profiles for these IDs associated
+ //with the subscribers on this device for this PON port.
+
+ //Fetching the techprofile Keys from the KV Store.
+ tpkeys, _ := t.config.DefaultTpKVBackend.GetWithPrefixKeysOnly(ctx, tech)
+
+ // Extract the techprofile Ids from the keys
+ // The tpkeys will be of the format "service/voltha/technology_profiles/GPON/65"
+ // where 65 is the techprofile Id
+ tpIds := make([]uint32, 0)
+
+ for _, key := range tpkeys {
+ parts := strings.Split(key, "/")
+ // Ensure the key has the expected format
+ if len(parts) < 5 {
+ logger.Errorw(ctx, "Key does not match expected format", log.Fields{"key": key})
+ continue
}
- } else if tech == epon {
- for keyPath, kvPair := range kvPairs {
- logger.Debugw(ctx, "attempting-to-reconcile-epon-tp-instance", log.Fields{"keyPath": keyPath})
- if value, err := kvstore.ToByte(kvPair.Value); err == nil {
- var resInst tp_pb.ResourceInstance
- if err = proto.Unmarshal(value, &resInst); err != nil {
- logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"keyPath": keyPath, "value": value})
- continue
- } else {
- if eponTpInst := t.getEponTpInstanceFromResourceInstance(ctx, &resInst); eponTpInst != nil {
- // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
- keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
- if len(keySuffixSlice) == 2 {
- keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
- // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
- if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
- logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
- continue
- }
- } else {
- logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
- continue
- }
- t.epontpInstanceMapLock.Lock()
- t.eponTpInstanceMap[keySuffixSlice[1]] = eponTpInst
- t.epontpInstanceMapLock.Unlock()
- logger.Debugw(ctx, "reconciled-epon-tp-success", log.Fields{"keyPath": keyPath})
- }
- }
- } else {
- logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
- }
+ // Convert the last part of the key to uint32 (techprofile Id)
+ tpId, err := strconv.Atoi(parts[len(parts)-1])
+ if err != nil {
+ logger.Errorw(ctx, "Error converting techprofile Id to int", log.Fields{"key": key, "error": err})
+ continue
}
- } else {
- logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
- return fmt.Errorf("unknown-tech-%v", tech)
+ tpIds = append(tpIds, uint32(tpId))
}
+ //for each tpid form a prefix and get the resource instance
+ for _, tpId := range tpIds {
+ prefix := fmt.Sprintf("%s/%d/olt-{%s}/pon-{%d}", tech, tpId, deviceId, IntfId)
+ kvPairs, _ := t.config.ResourceInstanceKVBacked.GetWithPrefix(newCtx, prefix)
+ //check if KvPairs is empty and if not then reconcile the techprofile instance
+ if len(kvPairs) > 0 {
+ for keyPath, kvPair := range kvPairs {
+ if value, err := kvstore.ToByte(kvPair.Value); err == nil {
+ var resInst tp_pb.ResourceInstance
+ if err = proto.Unmarshal(value, &resInst); err != nil {
+ logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"err": err, "keyPath": keyPath, "value": value})
+ continue
+ } else {
+ if tech == xgspon || tech == xgpon || tech == gpon {
+ if tpInst := t.getTpInstanceFromResourceInstance(ctx, &resInst); tpInst != nil {
+ keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
+ if len(keySuffixSlice) == 2 {
+ keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
+ if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
+ logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
+ continue
+ }
+ } else {
+ logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
+ continue
+ }
+ t.tpInstanceMapLock.Lock()
+ t.tpInstanceMap[keySuffixSlice[1]] = tpInst
+ t.tpInstanceMapLock.Unlock()
+ logger.Infow(ctx, "reconciled-tp-success", log.Fields{"keyPath": keyPath})
+ }
+ } else if tech == epon {
+ if eponTpInst := t.getEponTpInstanceFromResourceInstance(ctx, &resInst); eponTpInst != nil {
+ keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
+ if len(keySuffixSlice) == 2 {
+ keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
+ if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
+ logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
+ continue
+ }
+ } else {
+ logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
+ continue
+ }
+ t.epontpInstanceMapLock.Lock()
+ t.eponTpInstanceMap[keySuffixSlice[1]] = eponTpInst
+ t.epontpInstanceMapLock.Unlock()
+ logger.Debugw(ctx, "reconciled-epon-tp-success", log.Fields{"keyPath": keyPath})
+ }
+ } else {
+ logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
+ return fmt.Errorf("unknown-tech-%v", tech)
+ }
+ }
+ } else {
+ logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
+ }
+
+ }
+
+ return nil
+ }
+ }
return nil
}
+
+func (t *TechProfileMgr) buildUpstreamGemPortAttributes(ctx context.Context, tp *tp_pb.TechProfile, resInst *tp_pb.ResourceInstance, usGemPortAttributeList []*tp_pb.GemPortAttributes) []*tp_pb.GemPortAttributes {
+ for index := 0; index < int(tp.NumGemPorts); index++ {
+ usGemPortAttributeList = append(usGemPortAttributeList, &tp_pb.GemPortAttributes{
+ GemportId: resInst.GemportIds[index],
+ MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
+ PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
+ AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
+ SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
+ PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
+ Weight: tp.UpstreamGemPortAttributeList[index].Weight,
+ DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
+ DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig,
+ })
+ }
+ logger.Debugw(ctx, "Processed upstream GEM port attributes", log.Fields{"count": len(usGemPortAttributeList)})
+ return usGemPortAttributeList
+}
+
+func (t *TechProfileMgr) separateDownstreamGemPortAttributes(ctx context.Context, tp *tp_pb.TechProfile, dsUnicastGemAttributeList, dsMulticastGemAttributeList []*tp_pb.GemPortAttributes) ([]*tp_pb.GemPortAttributes, []*tp_pb.GemPortAttributes) {
+ for _, attr := range tp.DownstreamGemPortAttributeList {
+ if isMulticastGem(attr.IsMulticast) {
+ dsMulticastGemAttributeList = append(dsMulticastGemAttributeList, &tp_pb.GemPortAttributes{
+ MulticastGemId: attr.MulticastGemId,
+ MaxQSize: attr.MaxQSize,
+ PbitMap: attr.PbitMap,
+ AesEncryption: attr.AesEncryption,
+ SchedulingPolicy: attr.SchedulingPolicy,
+ PriorityQ: attr.PriorityQ,
+ Weight: attr.Weight,
+ DiscardPolicy: attr.DiscardPolicy,
+ DiscardConfig: attr.DiscardConfig,
+ IsMulticast: attr.IsMulticast,
+ DynamicAccessControlList: attr.DynamicAccessControlList,
+ StaticAccessControlList: attr.StaticAccessControlList,
+ })
+ } else {
+ dsUnicastGemAttributeList = append(dsUnicastGemAttributeList, &tp_pb.GemPortAttributes{
+ MaxQSize: attr.MaxQSize,
+ PbitMap: attr.PbitMap,
+ AesEncryption: attr.AesEncryption,
+ SchedulingPolicy: attr.SchedulingPolicy,
+ PriorityQ: attr.PriorityQ,
+ Weight: attr.Weight,
+ DiscardPolicy: attr.DiscardPolicy,
+ DiscardConfig: attr.DiscardConfig,
+ })
+ }
+ }
+ logger.Debugw(ctx, "Processed downstream GEM port attributes", log.Fields{
+ "unicastCount": len(dsUnicastGemAttributeList), "multicastCount": len(dsMulticastGemAttributeList)})
+ return dsUnicastGemAttributeList, dsMulticastGemAttributeList
+}
+
+func (t *TechProfileMgr) buildDownstreamGemPortAttributes(ctx context.Context, tp *tp_pb.TechProfile, resInst *tp_pb.ResourceInstance, dsUnicastGemAttributeList, dsMulticastGemAttributeList, dsGemPortAttributeList []*tp_pb.GemPortAttributes) []*tp_pb.GemPortAttributes {
+ for index := 0; index < int(tp.NumGemPorts) && index < len(dsUnicastGemAttributeList); index++ {
+ dsGemPortAttributeList = append(dsGemPortAttributeList, &tp_pb.GemPortAttributes{
+ GemportId: resInst.GemportIds[index],
+ MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
+ PbitMap: dsUnicastGemAttributeList[index].PbitMap,
+ AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
+ SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
+ PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
+ Weight: dsUnicastGemAttributeList[index].Weight,
+ DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
+ DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig,
+ })
+ }
+ dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList...)
+ logger.Debugw(ctx, "Processed downstream GEM port attributes for final list", log.Fields{"count": len(dsGemPortAttributeList)})
+ return dsGemPortAttributeList
+}