VOL-4446: Fix etcd stale data issue post device delete
Change-Id: I7022a632342bc2868ca52024a15d502313852266
diff --git a/VERSION b/VERSION
index fa8da20..6284111 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.5.8
+3.5.9
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index af93713..3f23fb4 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -106,6 +106,8 @@
adapterPreviouslyConnected bool
agentPreviouslyConnected bool
+
+ isDeviceDeletionInProgress bool
}
//OnuDevice represents ONU related info
@@ -1526,10 +1528,15 @@
//UpdateFlowsIncrementally updates the device flow
func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
-
var err error
var errorsList []error
+ if dh.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": device.Id})
+ return nil
+ }
+
if flows != nil {
for _, flow := range flows.ToRemove.Items {
ponIf := dh.getPonIfFromFlow(flow)
@@ -1735,6 +1742,9 @@
This clears up flow data and also resource map data for various
other pon resources like alloc_id and gemport_id
*/
+
+ dh.setDeviceDeletionInProgressFlag(true)
+
dh.cleanupDeviceResources(ctx)
logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
// Stop the Stats collector
@@ -1769,22 +1779,14 @@
var err error
onuGemData := dh.flowMgr[ponPort].getOnuGemInfoList(ctx)
for i, onu := range onuGemData {
- onuID := make([]uint32, 1)
logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
if err = dh.clearUNIData(ctx, &onuGemData[i]); err != nil {
logger.Errorw(ctx, "failed-to-clear-data-for-onu", log.Fields{"onu-device": onu})
}
- // Clear flowids for gem cache.
- for _, gem := range onu.GemPorts {
- dh.resourceMgr[ponPort].DeleteFlowIDsForGem(ctx, ponPort, gem)
- }
- onuID[0] = onu.OnuID
- dh.resourceMgr[ponPort].FreeonuID(ctx, ponPort, onuID)
- err = dh.resourceMgr[ponPort].DelOnuGemInfo(ctx, ponPort, onu.OnuID)
- if err != nil {
- logger.Errorw(ctx, "failed-to-update-onugem-info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
- }
}
+ _ = dh.resourceMgr[ponPort].DeleteAllFlowIDsForGemForIntf(ctx, ponPort)
+ _ = dh.resourceMgr[ponPort].DeleteAllOnuGemInfoForIntf(ctx, ponPort)
+
if err := dh.resourceMgr[ponPort].Delete(ctx, ponPort); err != nil {
logger.Debug(ctx, err)
}
@@ -2645,3 +2647,15 @@
return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_REASON_UNDEFINED)
}
+
+func (dh *DeviceHandler) setDeviceDeletionInProgressFlag(flag bool) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ dh.isDeviceDeletionInProgress = flag
+}
+
+func (dh *DeviceHandler) getDeviceDeletionInProgressFlag() bool {
+ dh.lockDevice.RLock()
+ defer dh.lockDevice.RUnlock()
+ return dh.isDeviceDeletionInProgress
+}
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 72b7d54..15ef901 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -43,14 +43,19 @@
tpIDPathSuffix = "{%d,%d,%d}/tp_id"
//MeterIDPathSuffix - <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
- // OnuPacketINPathPrefix - path prefix where ONU packet-in vlanID/PCP is stored
+
+ // OnuPacketInPathPrefix - path prefix where ONU packet-in vlanID/PCP is stored
//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}
- OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d}"
- // OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, packetout
+ OnuPacketInPathPrefix = "onu_packetin/{%d,%d,%d}"
+ // OnuPacketInPath path on the kvstore to store packetin gemport,which will be used for packetin, packetout
//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}/{<vlanId>,<priority>}
- OnuPacketINPath = OnuPacketINPathPrefix + "/{%d,%d}"
- //FlowIDsForGem flowids_per_gem/<intfid>/<gemport-id>
- FlowIDsForGem = "flowids_per_gem/{%d}/{%d}"
+ OnuPacketInPath = OnuPacketInPathPrefix + "/{%d,%d}"
+
+ //FlowIDsForGemPathPrefix format: flowids_for_gem/<intfid>
+ FlowIDsForGemPathPrefix = "flowids_per_gem/{%d}"
+ //FlowIDsForGem flowids_for_gem/<intfid>/<gemport-id>
+ FlowIDsForGem = FlowIDsForGemPathPrefix + "/{%d}"
+
//McastQueuesForIntf multicast queues for pon interfaces
McastQueuesForIntf = "mcast_qs_for_int"
//FlowGroup flow_groups/<flow_group_id>
@@ -70,9 +75,11 @@
//Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_ids
FlowIDPath = "{%s}/flow_ids"
+ //OnuGemInfoPathPathPrefix format: onu_gem_info/<intfid>
+ OnuGemInfoPathPathPrefix = "onu_gem_info/{%d}"
//OnuGemInfoPath is path on the kvstore to store onugem info map
- //format: <device-id>/onu_gem_info/<intfid>
- OnuGemInfoPath = "onu_gem_info/{%d}/{%d}" // onu_gem/<intfid>/<onuID>
+ //format: onu_gem_info/<intfid>/<onu_id>
+ OnuGemInfoPath = OnuGemInfoPathPathPrefix + "/{%d}"
)
// FlowInfo holds the flow information
@@ -320,12 +327,11 @@
PonIntfID, ponrmgr.ONU_ID)
return 0, err
}
- if len(onuID) > 0 {
- rsrcMgr.PonRsrMgr.InitResourceMap(ctx, fmt.Sprintf("%d,%d", PonIntfID, onuID[0]))
- return onuID[0], err
- }
- return 0, err // return onuID 0 on error
+ if len(onuID) > 0 {
+ return onuID[0], nil
+ }
+ return 0, fmt.Errorf("no-onu-id-allocated")
}
// GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
@@ -487,13 +493,6 @@
} else {
logger.Infow(ctx, "freed onu id", log.Fields{"intfID": intfID, "onuID": onuID})
}
-
- /* Free onu id for a particular interface.*/
- var IntfonuID string
- for _, onu := range onuID {
- IntfonuID = fmt.Sprintf("%d,%d", intfID, onu)
- rsrcMgr.PonRsrMgr.RemoveResourceMap(ctx, IntfonuID)
- }
}
// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
@@ -979,6 +978,24 @@
return nil
}
+//DeleteAllOnuGemInfoForIntf deletes all the all onu gem info on the given pon interface
+func (rsrcMgr *OpenOltResourceMgr) DeleteAllOnuGemInfoForIntf(ctx context.Context, intfID uint32) error {
+
+ path := fmt.Sprintf(OnuGemInfoPathPathPrefix, intfID)
+
+ logger.Debugw(ctx, "delete-all-onu-gem-info-for-pon-intf", log.Fields{"intfID": intfID})
+ if err := rsrcMgr.KVStore.DeleteWithPrefix(ctx, path); err != nil {
+ logger.Errorf(ctx, "failed-to-remove-resource-%s", path)
+ return err
+ }
+
+ // Reset cache. Normally not necessary as the entire device is getting deleted when this API is invoked.
+ rsrcMgr.onuGemInfoLock.Lock()
+ rsrcMgr.onuGemInfo = make(map[string]*OnuGemInfo)
+ rsrcMgr.onuGemInfoLock.Unlock()
+ return nil
+}
+
// AddUniPortToOnuInfo adds uni port to the onuinfo kvstore. check if the uni is already present if not update the kv store.
func (rsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNo uint32) {
@@ -1010,7 +1027,7 @@
//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno, vlan id, priority bit
func (rsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
- path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
+ path := fmt.Sprintf(OnuPacketInPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
// update cache
rsrcMgr.gemPortForPacketInInfoLock.Lock()
rsrcMgr.gemPortForPacketInInfo[path] = gemPort
@@ -1033,7 +1050,7 @@
var Val []byte
- path := fmt.Sprintf(OnuPacketINPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
+ path := fmt.Sprintf(OnuPacketInPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
packetInInfoKey.VlanID, packetInInfoKey.Priority)
// get from cache
rsrcMgr.gemPortForPacketInInfoLock.RLock()
@@ -1072,7 +1089,7 @@
//DeletePacketInGemPortForOnu deletes the packet-in gemport for ONU
func (rsrcMgr *OpenOltResourceMgr) DeletePacketInGemPortForOnu(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
- path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
+ path := fmt.Sprintf(OnuPacketInPathPrefix, intfID, onuID, logicalPort)
value, err := rsrcMgr.KVStore.List(ctx, path)
if err != nil {
logger.Errorf(ctx, "failed-to-read-value-from-path-%s", path)
@@ -1082,7 +1099,7 @@
//remove them one by one
for key := range value {
// Remove the PathPrefix from the path on KV key.
- // gemPortForPacketInInfo cache uses OnuPacketINPath as the key
+ // gemPortForPacketInInfo cache uses OnuPacketInPath as the key
stringToBeReplaced := rsrcMgr.KVStore.PathPrefix + "/"
replacedWith := ""
key = strings.Replace(key, stringToBeReplaced, replacedWith, 1)
@@ -1152,7 +1169,11 @@
rsrcMgr.flowIDsForGem[gem] = flowIDs
rsrcMgr.flowIDsForGemLock.Unlock()
+ // If the flowIDs slice is empty delete the key entry from the DB.
if flowIDs == nil {
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorw(ctx, "Failed to delete from kvstore", log.Fields{"err": err, "path": path})
+ }
return nil
}
val, err := json.Marshal(flowIDs)
@@ -1181,6 +1202,24 @@
}
}
+//DeleteAllFlowIDsForGemForIntf deletes all the flow ids associated for all the gems on the given pon interface
+func (rsrcMgr *OpenOltResourceMgr) DeleteAllFlowIDsForGemForIntf(ctx context.Context, intfID uint32) error {
+
+ path := fmt.Sprintf(FlowIDsForGemPathPrefix, intfID)
+
+ logger.Debugw(ctx, "delete-flow-ids-for-gem-for-pon-intf", log.Fields{"intfID": intfID})
+ if err := rsrcMgr.KVStore.DeleteWithPrefix(ctx, path); err != nil {
+ logger.Errorf(ctx, "failed-to-remove-resource-%s", path)
+ return err
+ }
+
+ // Reset cache. Normally not necessary as the entire device is getting deleted when this API is invoked.
+ rsrcMgr.flowIDsForGemLock.Lock()
+ rsrcMgr.flowIDsForGem = make(map[uint32][]uint64)
+ rsrcMgr.flowIDsForGemLock.Unlock()
+ return nil
+}
+
//GetMcastQueuePerInterfaceMap gets multicast queue info per pon interface
func (rsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap(ctx context.Context) (map[uint32][]uint32, error) {
path := McastQueuesForIntf