VOL-4077: Improve storage usage on etcd
- the onu adapter will now receive the tp instance on the
inter-container kafka message and need not reach the etcd store
to fetch it.
- on reconcile, the onu adapter need not go to the kv store to
fetch the tp instance, but instead requests the tp instance via a
new API towards the openolt adapter, which in turn will fetch it
from the cache.
- reorganize the code in onu-metrics-manager to restore pm-data
on reconcile, to avoid panics caused by accessing uninitialized
data if the ani-fsm tries to add a gem port for monitoring
before pm-data is properly initialized.
Change-Id: I82a6de2772155f6e08390b671fe26d692dd02c99
diff --git a/internal/pkg/onuadaptercore/onu_uni_tp.go b/internal/pkg/onuadaptercore/onu_uni_tp.go
index dff13d4..246bd94 100644
--- a/internal/pkg/onuadaptercore/onu_uni_tp.go
+++ b/internal/pkg/onuadaptercore/onu_uni_tp.go
@@ -19,21 +19,15 @@
import (
"context"
- "encoding/json"
- "errors"
"fmt"
+ "github.com/opencord/voltha-protos/v4/go/tech_profile"
"strconv"
"strings"
"sync"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
-const cBasePathTechProfileKVStore = "%s/technology_profiles"
-
//definitions for TechProfileProcessing - copied from OltAdapter:openolt_flowmgr.go
// could perhaps be defined more globally
const (
@@ -98,7 +92,6 @@
baseDeviceHandler *deviceHandler
deviceID string
tpProcMutex sync.RWMutex
- techProfileKVStore *db.Backend
chTpConfigProcessingStep chan uint8
mapUniTpIndication map[uniTP]*tTechProfileIndication //use pointer values to ease assignments to the map
mapPonAniConfig map[uniTP]*tcontGemList //per UNI: use pointer values to ease assignments to the map
@@ -136,12 +129,6 @@
onuTP.tpProfileExists = make(map[uniTP]bool)
onuTP.tpProfileResetting = make(map[uniTP]bool)
onuTP.mapRemoveGemEntry = make(map[uniTP]*gemPortParamStruct)
- baseKvStorePath := fmt.Sprintf(cBasePathTechProfileKVStore, aDeviceHandler.pOpenOnuAc.cm.Backend.PathPrefix)
- onuTP.techProfileKVStore = aDeviceHandler.setBackend(ctx, baseKvStorePath)
- if onuTP.techProfileKVStore == nil {
- logger.Errorw(ctx, "Can't access techProfileKVStore - no backend connection to service",
- log.Fields{"device-id": aDeviceHandler.deviceID, "service": baseKvStorePath})
- }
return &onuTP
}
@@ -175,7 +162,7 @@
// but take care on sequential background processing when needed (logical dependencies)
// use waitForTimeoutOrCompletion(ctx, chTpConfigProcessingStep, processingStep) for internal synchronization
func (onuTP *onuUniTechProf) configureUniTp(ctx context.Context,
- aUniID uint8, aPathString string, wg *sync.WaitGroup) {
+ aUniID uint8, aPathString string, tpInst tech_profile.TechProfileInstance, wg *sync.WaitGroup) {
defer wg.Done() //always decrement the waitGroup on return
logger.Debugw(ctx, "configure the Uni according to TpPath", log.Fields{
"device-id": onuTP.deviceID, "uni-id": aUniID, "path": aPathString})
@@ -185,14 +172,6 @@
logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"device-id": onuTP.deviceID, "uni-id": aUniID, "path": aPathString})
return
}
- if onuTP.techProfileKVStore == nil {
- logger.Errorw(ctx, "techProfileKVStore not set - abort",
- log.Fields{"device-id": onuTP.deviceID})
- onuTP.mutexTPState.Lock()
- defer onuTP.mutexTPState.Unlock()
- onuTP.procResult[uniTpKey] = errors.New("techProfile config aborted: techProfileKVStore not set")
- return
- }
//ensure that the given uniID is available (configured) in the UniPort class (used for OMCI entities)
var pCurrentUniPort *onuUniPort
@@ -243,7 +222,7 @@
processingStep++
*/
- go onuTP.readAniSideConfigFromTechProfile(ctx, aUniID, tpID, aPathString, processingStep)
+ go onuTP.readAniSideConfigFromTechProfile(ctx, aUniID, tpID, aPathString, tpInst, processingStep)
if !onuTP.waitForTimeoutOrCompletion(ctx, onuTP.chTpConfigProcessingStep, processingStep) {
//timeout or error detected
onuTP.mutexTPState.RLock()
@@ -330,9 +309,8 @@
/* internal methods *********************/
// nolint: gocyclo
func (onuTP *onuUniTechProf) readAniSideConfigFromTechProfile(
- ctx context.Context, aUniID uint8, aTpID uint8, aPathString string, aProcessingStep uint8) {
- var tpInst tp.TechProfile
-
+ ctx context.Context, aUniID uint8, aTpID uint8, aPathString string, tpInst tech_profile.TechProfileInstance, aProcessingStep uint8) {
+ var err error
//store profile type and identifier for later usage within the OMCI identifier and possibly ME setup
//pathstring is defined to be in the form of <ProfType>/<profID>/<Interface/../Identifier>
subStringSlice := strings.Split(aPathString, "/")
@@ -387,47 +365,16 @@
"profType": onuTP.mapUniTpIndication[uniTPKey].techProfileType,
"profID": onuTP.mapUniTpIndication[uniTPKey].techProfileID})
- Value, err := onuTP.techProfileKVStore.Get(ctx, aPathString)
- if err == nil {
- if Value != nil {
- logger.Debugw(ctx, "tech-profile read",
- log.Fields{"Key": Value.Key, "device-id": onuTP.deviceID})
- tpTmpBytes, _ := kvstore.ToByte(Value.Value)
-
- if err = json.Unmarshal(tpTmpBytes, &tpInst); err != nil {
- logger.Errorw(ctx, "TechProf - Failed to unmarshal tech-profile into tpInst",
- log.Fields{"error": err, "device-id": onuTP.deviceID})
- onuTP.chTpConfigProcessingStep <- 0 //error indication
- return
- }
- logger.Debugw(ctx, "TechProf - tpInst", log.Fields{"tpInst": tpInst})
- // access examples
- logger.Debugw(ctx, "TechProf content", log.Fields{"Name": tpInst.Name,
- "MaxGemPayloadSize": tpInst.InstanceCtrl.MaxGemPayloadSize,
- "DownstreamGemDiscardmaxThreshold": tpInst.DownstreamGemPortAttributeList[0].DiscardConfig.MaxThreshold})
- } else {
- logger.Errorw(ctx, "No tech-profile found",
- log.Fields{"path": aPathString, "device-id": onuTP.deviceID})
- onuTP.chTpConfigProcessingStep <- 0 //error indication
- return
- }
- } else {
- logger.Errorw(ctx, "kvstore-get failed for path",
- log.Fields{"path": aPathString, "device-id": onuTP.deviceID})
- onuTP.chTpConfigProcessingStep <- 0 //error indication
- return
- }
-
//default start with 1Tcont profile, later perhaps extend to MultiTcontMultiGem
localMapGemPortParams := make(map[uint16]*gemPortParamStruct)
onuTP.mapPonAniConfig[uniTPKey] = &tcontGemList{tcontParamStruct{}, localMapGemPortParams}
//note: the code is currently restricted to one TCcont per Onu (index [0])
//get the relevant values from the profile and store to mapPonAniConfig
- onuTP.mapPonAniConfig[uniTPKey].tcontParams.allocID = uint16(tpInst.UsScheduler.AllocID)
+ onuTP.mapPonAniConfig[uniTPKey].tcontParams.allocID = uint16(tpInst.UsScheduler.AllocId)
//maybe tCont scheduling not (yet) needed - just to basically have it for future
// (would only be relevant in case of ONU-2G QOS configuration flexibility)
- if tpInst.UsScheduler.QSchedPolicy == "StrictPrio" {
+ if tpInst.UsScheduler.QSchedPolicy == tech_profile.SchedulingPolicy_StrictPriority {
onuTP.mapPonAniConfig[uniTPKey].tcontParams.schedPolicy = 1 //for the moment fixed value acc. G.988 //TODO: defines!
} else {
//default profile defines "Hybrid" - which probably comes down to WRR with some weigthts for SP
@@ -446,37 +393,37 @@
loGemPortRead = true
}
//for all GemPorts we need to extend the mapGemPortParams
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)] = &gemPortParamStruct{}
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)] = &gemPortParamStruct{}
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].gemPortID =
- uint16(content.GemportID)
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].gemPortID =
+ uint16(content.GemportId)
//direction can be correlated later with Downstream list,
// for now just assume bidirectional (upstream never exists alone)
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].direction = 3 //as defined in G.988
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].direction = 3 //as defined in G.988
// expected Prio-Queue values 0..7 with 7 for highest PrioQueue, QueueIndex=Prio = 0..7
- if content.PriorityQueue > 7 {
+ if content.PriorityQ > 7 {
logger.Errorw(ctx, "PonAniConfig reject on GemPortList - PrioQueue value invalid",
- log.Fields{"device-id": onuTP.deviceID, "index": pos, "PrioQueue": content.PriorityQueue})
+ log.Fields{"device-id": onuTP.deviceID, "index": pos, "PrioQueue": content.PriorityQ})
//remove PonAniConfig as done so far, delete map should be safe, even if not existing
delete(onuTP.mapPonAniConfig, uniTPKey)
onuTP.chTpConfigProcessingStep <- 0 //error indication
return
}
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].prioQueueIndex =
- uint8(content.PriorityQueue)
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].pbitString =
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].prioQueueIndex =
+ uint8(content.PriorityQ)
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].pbitString =
strings.TrimPrefix(content.PbitMap, binaryStringPrefix)
if content.AesEncryption == "True" {
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].gemPortEncState = 1
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].gemPortEncState = 1
} else {
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].gemPortEncState = 0
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].gemPortEncState = 0
}
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].discardPolicy =
- content.DiscardPolicy
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].queueSchedPolicy =
- content.SchedulingPolicy
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].discardPolicy =
+ content.DiscardPolicy.String()
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].queueSchedPolicy =
+ content.SchedulingPolicy.String()
//'GemWeight' looks strange in default profile, for now we just copy the weight to first queue
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportID)].queueWeight =
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[uint16(content.GemportId)].queueWeight =
uint8(content.Weight)
}
@@ -500,7 +447,7 @@
}
logger.Infow(ctx, "Gem Port is multicast", log.Fields{"isMulticast": isMulticast})
if isMulticast {
- mcastGemID := uint16(downstreamContent.McastGemID)
+ mcastGemID := uint16(downstreamContent.MulticastGemId)
_, existing := onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID]
if existing {
//GEM port was previously configured, avoid setting multicast attributes
@@ -526,32 +473,32 @@
}
// expected Prio-Queue values 0..7 with 7 for highest PrioQueue, QueueIndex=Prio = 0..7
- if downstreamContent.PriorityQueue > 7 {
+ if downstreamContent.PriorityQ > 7 {
logger.Errorw(ctx, "PonAniConfig reject on GemPortList - PrioQueue value invalid",
- log.Fields{"device-id": onuTP.deviceID, "index": mcastGemID, "PrioQueue": downstreamContent.PriorityQueue})
+ log.Fields{"device-id": onuTP.deviceID, "index": mcastGemID, "PrioQueue": downstreamContent.PriorityQ})
//remove PonAniConfig as done so far, delete map should be safe, even if not existing
delete(onuTP.mapPonAniConfig, uniTPKey)
onuTP.chTpConfigProcessingStep <- 0 //error indication
return
}
onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].prioQueueIndex =
- uint8(downstreamContent.PriorityQueue)
+ uint8(downstreamContent.PriorityQ)
onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].pbitString =
strings.TrimPrefix(downstreamContent.PbitMap, binaryStringPrefix)
onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].discardPolicy =
- downstreamContent.DiscardPolicy
+ downstreamContent.DiscardPolicy.String()
onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].queueSchedPolicy =
- downstreamContent.SchedulingPolicy
+ downstreamContent.SchedulingPolicy.String()
//'GemWeight' looks strange in default profile, for now we just copy the weight to first queue
onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].queueWeight =
uint8(downstreamContent.Weight)
onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].isMulticast = isMulticast
onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].multicastGemPortID =
- uint16(downstreamContent.McastGemID)
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].staticACL = downstreamContent.SControlList
- onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].dynamicACL = downstreamContent.DControlList
+ uint16(downstreamContent.MulticastGemId)
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].staticACL = downstreamContent.StaticAccessControlList
+ onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams[mcastGemID].dynamicACL = downstreamContent.DynamicAccessControlList
}
}
}