VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
reconciled on adapter restart
- For data that needs storage, use lesser footprint if possible
- Use write-through-cache for all data stored on etcd via
resource manager module
- Use ResourceManager module per interface to localize lock
contention per PON port
Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 7f2b53e..4243a26 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -35,12 +35,12 @@
"github.com/golang/protobuf/ptypes"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v4/pkg/config"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- flow_utils "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- "github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/config"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ flow_utils "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/pmmetrics"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
@@ -82,7 +82,9 @@
flowMgr []*OpenOltFlowMgr
groupMgr *OpenOltGroupMgr
eventMgr *OpenOltEventMgr
- resourceMgr *rsrcMgr.OpenOltResourceMgr
+ resourceMgr []*rsrcMgr.OpenOltResourceMgr
+
+ deviceInfo *oop.DeviceInfo
discOnus sync.Map
onus sync.Map
@@ -528,9 +530,6 @@
_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-nni", "device-id": dh.device.Id}, err).Log()
}
}()
- if err := dh.resourceMgr.AddNNIToKVStore(ctx, intfOperInd.GetIntfId()); err != nil {
- logger.Warn(ctx, err)
- }
} else if intfOperInd.GetType() == "pon" {
// TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
// Handle pon port update
@@ -826,29 +825,35 @@
}
func (dh *DeviceHandler) initializeDeviceHandlerModules(ctx context.Context) error {
- deviceInfo, err := dh.populateDeviceInfo(ctx)
+ var err error
+ dh.deviceInfo, err = dh.populateDeviceInfo(ctx)
if err != nil {
return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
}
- dh.totalPonPorts = deviceInfo.GetPonPorts()
- dh.agentPreviouslyConnected = deviceInfo.PreviouslyConnected
+ dh.totalPonPorts = dh.deviceInfo.GetPonPorts()
+ dh.agentPreviouslyConnected = dh.deviceInfo.PreviouslyConnected
- // Instantiate resource manager
- if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr == nil {
- return olterrors.ErrResourceManagerInstantiating
- }
-
- dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
-
+ dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts)
dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
- for i := range dh.flowMgr {
- // Instantiate flow manager
- if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
+ var i uint32
+ for i = 0; i < dh.totalPonPorts; i++ {
+ // Instantiate resource manager
+ if dh.resourceMgr[i] = rsrcMgr.NewResourceMgr(ctx, i, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, dh.deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr[i] == nil {
return olterrors.ErrResourceManagerInstantiating
}
}
-
+ // GroupManager instance is per OLT. But it needs a reference to any instance of resourceMgr to interface with
+ // the KV store to manage mcast group data. Provide the first instance (0th index)
+ if dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0]); dh.groupMgr == nil {
+ return olterrors.ErrGroupManagerInstantiating
+ }
+ for i = 0; i < dh.totalPonPorts; i++ {
+ // Instantiate flow manager
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr[i], dh.groupMgr, i); dh.flowMgr[i] == nil {
+ return olterrors.ErrFlowManagerInstantiating
+ }
+ }
/* TODO: Instantiate Alarm , stats , BW managers */
/* Instantiating Event Manager to handle Alarms and KPIs */
dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
@@ -913,7 +918,7 @@
ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.Background(), ctx), dh.device.Id)
if err != nil {
- logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "error": err})
+ logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "err": err})
continue
}
for _, port := range ports {
@@ -935,10 +940,9 @@
}
logger.Debugw(ctx, "publish-pon-metrics", log.Fields{"pon-port": port.Label})
- //ONU & Gem Stats
- onuGemInfo := dh.flowMgr[intfID].onuGemInfo
- if len(onuGemInfo) != 0 {
- go dh.portStats.collectOnuAndGemStats(ctx, onuGemInfo)
+ onuGemInfoLst := dh.flowMgr[intfID].getOnuGemInfoList()
+ if len(onuGemInfoLst) > 0 {
+ go dh.portStats.collectOnuAndGemStats(ctx, onuGemInfoLst)
}
}
}
@@ -978,6 +982,15 @@
}, nil
}
+// GetInterAdapterTechProfileDownloadMessage fetches the TechProfileDownloadMessage for the caller.
+func (dh *DeviceHandler) GetInterAdapterTechProfileDownloadMessage(ctx context.Context, tpPath string, ponPortNum uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+ ifID, err := IntfIDFromPonPortNum(ctx, ponPortNum)
+ if err != nil {
+ return nil
+ }
+ return dh.flowMgr[ifID].getTechProfileDownloadMessage(ctx, tpPath, ifID, onuID, uniID)
+}
+
func (dh *DeviceHandler) omciIndication(ctx context.Context, omciInd *oop.OmciIndication) error {
logger.Debugw(ctx, "omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
var deviceType string
@@ -1038,44 +1051,47 @@
func (dh *DeviceHandler) ProcessInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
logger.Debugw(ctx, "process-inter-adapter-message", log.Fields{"msgID": msg.Header.Id})
if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
- msgID := msg.Header.Id
- fromTopic := msg.Header.FromTopic
- toTopic := msg.Header.ToTopic
- toDeviceID := msg.Header.ToDeviceId
- proxyDeviceID := msg.Header.ProxyDeviceId
+ return dh.handleInterAdapterOmciMsg(ctx, msg)
+ }
+ return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
+}
- logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+func (dh *DeviceHandler) handleInterAdapterOmciMsg(ctx context.Context, msg *ic.InterAdapterMessage) error {
+ msgID := msg.Header.Id
+ fromTopic := msg.Header.FromTopic
+ toTopic := msg.Header.ToTopic
+ toDeviceID := msg.Header.ToDeviceId
+ proxyDeviceID := msg.Header.ProxyDeviceId
- msgBody := msg.GetBody()
+ logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- omciMsg := &ic.InterAdapterOmciMessage{}
- if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
- return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
+ msgBody := msg.GetBody()
+
+ omciMsg := &ic.InterAdapterOmciMessage{}
+ if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
+ return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
+ }
+
+ if omciMsg.GetProxyAddress() == nil {
+ onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
+ if err != nil {
+ return olterrors.NewErrNotFound("onu", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
}
-
- if omciMsg.GetProxyAddress() == nil {
- onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
- if err != nil {
- return olterrors.NewErrNotFound("onu", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
- logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- if err := dh.sendProxiedMessage(ctx, onuDevice, omciMsg); err != nil {
- return olterrors.NewErrCommunication("send-failed", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
- } else {
- logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- if err := dh.sendProxiedMessage(ctx, nil, omciMsg); err != nil {
- return olterrors.NewErrCommunication("send-failed", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
+ logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+ if err := dh.sendProxiedMessage(ctx, onuDevice, omciMsg); err != nil {
+ return olterrors.NewErrCommunication("send-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
}
} else {
- return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
+ logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+ if err := dh.sendProxiedMessage(ctx, nil, omciMsg); err != nil {
+ return olterrors.NewErrCommunication("send-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
+ }
}
return nil
}
@@ -1129,7 +1145,6 @@
if err := dh.flowMgr[intfID].UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
}
- // TODO: need resource manager
var pir uint32 = 1000000
Onu := oop.Onu{IntfId: intfID, OnuId: uint32(onuID), SerialNumber: serialNum, Pir: pir, OmccEncryption: dh.openOLT.config.OmccEncryption}
if _, err := dh.Client.ActivateOnu(ctx, &Onu); err != nil {
@@ -1182,7 +1197,7 @@
alarmInd.LosStatus = statusCheckOff
go func() {
if err := dh.eventMgr.onuAlarmIndication(ctx, &alarmInd, onuInCache.(*OnuDevice).deviceID, raisedTs); err != nil {
- logger.Debugw(ctx, "indication-failed", log.Fields{"error": err})
+ logger.Debugw(ctx, "indication-failed", log.Fields{"err": err})
}
}()
}
@@ -1220,7 +1235,7 @@
logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
// we need to create a new ChildDevice
ponintfid := onuDiscInd.GetIntfId()
- onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
+ onuID, err = dh.resourceMgr[ponintfid].GetONUID(ctx, ponintfid)
logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
@@ -1236,13 +1251,13 @@
if onuDevice, err = dh.coreProxy.ChildDeviceDetected(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, int(parentPortNo),
"", int(channelID), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); err != nil {
dh.discOnus.Delete(sn)
- dh.resourceMgr.FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
+ dh.resourceMgr[ponintfid].FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
return olterrors.NewErrAdapter("core-proxy-child-device-detected-failed", log.Fields{
"pon-intf-id": ponintfid,
"serial-number": sn}, err)
}
if err := dh.eventMgr.OnuDiscoveryIndication(ctx, onuDiscInd, dh.device.Id, onuDevice.Id, onuID, sn, time.Now().Unix()); err != nil {
- logger.Warnw(ctx, "discovery-indication-failed", log.Fields{"error": err})
+ logger.Warnw(ctx, "discovery-indication-failed", log.Fields{"err": err})
}
logger.Infow(ctx, "onu-child-device-added",
log.Fields{"onuDevice": onuDevice,
@@ -1341,7 +1356,7 @@
}
if onuInd.OperState == "down" && onuInd.FailReason != oop.OnuIndication_ONU_ACTIVATION_FAIL_REASON_NONE {
if err := dh.eventMgr.onuActivationIndication(ctx, onuActivationFailEvent, onuInd, dh.device.Id, time.Now().Unix()); err != nil {
- logger.Warnw(ctx, "onu-activation-indication-reporting-failed", log.Fields{"error": err})
+ logger.Warnw(ctx, "onu-activation-indication-reporting-failed", log.Fields{"err": err})
}
}
if err := dh.updateOnuStates(ctx, onuDevice, onuInd); err != nil {
@@ -1614,7 +1629,7 @@
//get the child device for the parent device
onuDevices, err := dh.coreProxy.GetChildDevices(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
if err != nil {
- logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "error": err})
+ logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "err": err})
}
if onuDevices != nil {
for _, onuDevice := range onuDevices.Items {
@@ -1679,57 +1694,29 @@
logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
- tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
+ tpIDList := dh.resourceMgr[onu.IntfID].GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
for _, tpID := range tpIDList {
- if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].RemoveMeterInfoForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].RemoveMeterInfoForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
- dh.resourceMgr.FreePONResourcesForONU(ctx, onu.IntfID, onu.OnuID, uniID)
- if err = dh.resourceMgr.RemoveTechProfileIDsForOnu(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
+ dh.resourceMgr[onu.IntfID].FreePONResourcesForONU(ctx, onu.IntfID, onu.OnuID, uniID)
+ if err = dh.resourceMgr[onu.IntfID].RemoveTechProfileIDsForOnu(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
logger.Debugw(ctx, "failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.DeletePacketInGemPortForOnu(ctx, onu.IntfID, onu.OnuID, port); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].DeletePacketInGemPortForOnu(ctx, onu.IntfID, onu.OnuID, port); err != nil {
logger.Debugw(ctx, "failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
}
- if err = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID)); err != nil {
- logger.Debugw(ctx, "failed-to-remove-flow-for", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
- }
}
return nil
}
-func (dh *DeviceHandler) clearNNIData(ctx context.Context) error {
- nniUniID := -1
- nniOnuID := -1
-
- if dh.resourceMgr == nil {
- return olterrors.NewErrNotFound("resource-manager", log.Fields{"device-id": dh.device.Id}, nil)
- }
- //Free the flow-ids for the NNI port
- nni, err := dh.resourceMgr.GetNNIFromKVStore(ctx)
- if err != nil {
- return olterrors.NewErrPersistence("get", "nni", 0, nil, err)
- }
- logger.Debugw(ctx, "nni-", log.Fields{"nni": nni})
- for _, nniIntfID := range nni {
- dh.resourceMgr.RemoveResourceMap(ctx, nniIntfID, int32(nniOnuID), int32(nniUniID))
- _ = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, nniIntfID, -1, -1)
-
- }
- if err = dh.resourceMgr.DelNNiFromKVStore(ctx); err != nil {
- return olterrors.NewErrPersistence("clear", "nni", 0, nil, err)
- }
-
- return nil
-}
-
// DeleteDevice deletes the device instance from openolt handler array. Also clears allocated resource manager resources. Also reboots the OLT hardware!
func (dh *DeviceHandler) DeleteDevice(ctx context.Context, device *voltha.Device) error {
logger.Debug(ctx, "function-entry-delete-device")
@@ -1767,13 +1754,8 @@
if dh.resourceMgr != nil {
var ponPort uint32
for ponPort = 0; ponPort < dh.totalPonPorts; ponPort++ {
- var onuGemData []rsrcMgr.OnuGemInfo
- err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
- if err != nil {
- _ = olterrors.NewErrNotFound("onu", log.Fields{
- "device-id": dh.device.Id,
- "pon-port": ponPort}, err).Log()
- }
+ var err error
+ onuGemData := dh.flowMgr[ponPort].getOnuGemInfoList()
for i, onu := range onuGemData {
onuID := make([]uint32, 1)
logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
@@ -1782,31 +1764,22 @@
}
// Clear flowids for gem cache.
for _, gem := range onu.GemPorts {
- dh.resourceMgr.DeleteFlowIDsForGem(ctx, ponPort, gem)
+ dh.resourceMgr[ponPort].DeleteFlowIDsForGem(ctx, ponPort, gem)
}
onuID[0] = onu.OnuID
- dh.resourceMgr.FreeonuID(ctx, ponPort, onuID)
+ dh.resourceMgr[ponPort].FreeonuID(ctx, ponPort, onuID)
+ err = dh.resourceMgr[ponPort].DelOnuGemInfo(ctx, ponPort, onu.OnuID)
+ if err != nil {
+ logger.Errorw(ctx, "failed-to-update-onugem-info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
+ }
}
- dh.resourceMgr.DeleteIntfIDGempMapPath(ctx, ponPort)
- onuGemData = nil
- err = dh.resourceMgr.DelOnuGemInfoForIntf(ctx, ponPort)
- if err != nil {
- logger.Errorw(ctx, "failed-to-update-onugem-info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
- }
+ /* Clear the resource pool for each PON port in the background */
+ go func(ponPort uint32) {
+ if err := dh.resourceMgr[ponPort].Delete(ctx, ponPort); err != nil {
+ logger.Debug(ctx, err)
+ }
+ }(ponPort)
}
- /* Clear the flows from KV store associated with NNI port.
- There are mostly trap rules from NNI port (like LLDP)
- */
- if err := dh.clearNNIData(ctx); err != nil {
- logger.Errorw(ctx, "failed-to-clear-data-for-NNI-port", log.Fields{"device-id": dh.device.Id})
- }
-
- /* Clear the resource pool for each PON port in the background */
- go func() {
- if err := dh.resourceMgr.Delete(ctx); err != nil {
- logger.Debug(ctx, err)
- }
- }()
}
/*Delete ONU map for the device*/
@@ -2190,56 +2163,48 @@
}
onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
+ //clear PON resources associated with ONU
+ onuGem, err := dh.resourceMgr[intfID].GetOnuGemInfo(ctx, intfID, onuID)
+ if err != nil || onuGem == nil || onuGem.OnuID != onuID {
+ logger.Warnw(ctx, "failed-to-get-onu-info-for-pon-port", log.Fields{
+ "device-id": dh.device.Id,
+ "intf-id": intfID,
+ "onuID": onuID,
+ "err": err})
+ } else {
+ logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
+ if err := dh.clearUNIData(ctx, onuGem); err != nil {
+ logger.Warnw(ctx, "failed-to-clear-uni-data-for-onu", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device": onu,
+ "err": err})
+ }
+ // Clear flowids for gem cache.
+ for _, gem := range onuGem.GemPorts {
+ dh.resourceMgr[intfID].DeleteFlowIDsForGem(ctx, intfID, gem)
+ }
+ err := dh.resourceMgr[intfID].DelOnuGemInfo(ctx, intfID, onuID)
+ if err != nil {
+ logger.Warnw(ctx, "persistence-update-onu-gem-info-failed", log.Fields{
+ "intf-id": intfID,
+ "onu-device": onu,
+ "onu-gem": onuGem,
+ "err": err})
+ //Not returning error on cleanup.
+ }
+ logger.Debugw(ctx, "removed-onu-gem-info", log.Fields{"intf": intfID, "onu-device": onu, "onugem": onuGem})
+ dh.resourceMgr[intfID].FreeonuID(ctx, intfID, []uint32{onuGem.OnuID})
+ }
+ dh.onus.Delete(onuKey)
+ dh.discOnus.Delete(onuSn)
+
+ // Now clear the ONU on the OLT
if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
"device-id": dh.device.Id,
"onu-id": onuID}, err).Log()
}
- //clear PON resources associated with ONU
- var onuGemData []rsrcMgr.OnuGemInfo
- if onuMgr, ok := dh.resourceMgr.ResourceMgrs[intfID]; !ok {
- logger.Warnw(ctx, "failed-to-get-resource-manager-for-interface-Id", log.Fields{
- "device-id": dh.device.Id,
- "intf-id": intfID})
- } else {
- if err := onuMgr.GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
- logger.Warnw(ctx, "failed-to-get-onu-info-for-pon-port", log.Fields{
- "device-id": dh.device.Id,
- "intf-id": intfID,
- "error": err})
- } else {
- for i, onu := range onuGemData {
- if onu.OnuID == onuID && onu.SerialNumber == onuSn {
- logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
- if err := dh.clearUNIData(ctx, &onuGemData[i]); err != nil {
- logger.Warnw(ctx, "failed-to-clear-uni-data-for-onu", log.Fields{
- "device-id": dh.device.Id,
- "onu-device": onu,
- "error": err})
- }
- // Clear flowids for gem cache.
- for _, gem := range onu.GemPorts {
- dh.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gem)
- }
- onuGemData = append(onuGemData[:i], onuGemData[i+1:]...)
- err := onuMgr.AddOnuGemInfo(ctx, intfID, onuGemData)
- if err != nil {
- logger.Warnw(ctx, "persistence-update-onu-gem-info-failed", log.Fields{
- "intf-id": intfID,
- "onu-device": onu,
- "onu-gem": onuGemData,
- "error": err})
- //Not returning error on cleanup.
- }
- logger.Debugw(ctx, "removed-onu-gem-info", log.Fields{"intf": intfID, "onu-device": onu, "onugem": onuGemData})
- dh.resourceMgr.FreeonuID(ctx, intfID, []uint32{onu.OnuID})
- break
- }
- }
- }
- }
- dh.onus.Delete(onuKey)
- dh.discOnus.Delete(onuSn)
+
return nil
}
@@ -2334,7 +2299,7 @@
/*
resp, err = dh.Client.GetValue(ctx, valueparam)
if err != nil {
- logger.Errorw("error-while-getValue", log.Fields{"DeviceID": dh.device, "onu-id": onuid, "error": err})
+ logger.Errorw("error-while-getValue", log.Fields{"DeviceID": dh.device, "onu-id": onuid, "err": err})
return nil, err
}
*/
@@ -2429,6 +2394,7 @@
// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
// Step3 : Wait on response channel for response
// Step4 : Return error value
+ startTime := time.Now()
logger.Debugw(ctx, "process-flow-or-group", log.Fields{"flow": flow, "group": group, "action": action})
errChan := make(chan error)
var groupID uint32
@@ -2451,7 +2417,7 @@
dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
// Wait for handler to return error value
err := <-errChan
- logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"flow": flow, "group": group, "action": action, "err": err})
+ logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
return err
}