VOL-4446: Fix etcd stale data issue post device delete

Change-Id: Ia4183438adb24ca7443627c9ab5968df6ad55dc8
diff --git a/VERSION b/VERSION
index 1454f6e..4d54dad 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.0.1
+4.0.2
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 359d715..02bc895 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -114,6 +114,8 @@
 
 	adapterPreviouslyConnected bool
 	agentPreviouslyConnected   bool
+
+	isDeviceDeletionInProgress bool
 }
 
 //OnuDevice represents ONU related info
@@ -1688,6 +1690,13 @@
 func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
 
 	var errorsList []error
+
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": device.Id})
+		return nil
+	}
+
 	logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
 	errorsList = append(errorsList, dh.handleFlows(ctx, device, flows, flowMetadata)...)
 	errorsList = append(errorsList, dh.handleGroups(ctx, groups)...)
@@ -1855,6 +1864,9 @@
 	   This clears up flow data and also resource map data for various
 	   other pon resources like alloc_id and gemport_id
 	*/
+
+	dh.setDeviceDeletionInProgressFlag(true)
+
 	dh.cleanupDeviceResources(ctx)
 	logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
 	// Stop the Stats collector
@@ -1899,15 +1911,9 @@
 				if err = dh.clearUNIData(ctx, &onuGemData[i]); err != nil {
 					logger.Errorw(ctx, "failed-to-clear-data-for-onu", log.Fields{"onu-device": onu})
 				}
-				// Clear flowids for gem cache.
-				for _, gem := range onu.GemPorts {
-					dh.resourceMgr[ponPort].DeleteFlowIDsForGem(ctx, ponPort, gem)
-				}
-				err = dh.resourceMgr[ponPort].DelOnuGemInfo(ctx, ponPort, onu.OnuID)
-				if err != nil {
-					logger.Errorw(ctx, "failed-to-update-onugem-info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
-				}
 			}
+			_ = dh.resourceMgr[ponPort].DeleteAllFlowIDsForGemForIntf(ctx, ponPort)
+			_ = dh.resourceMgr[ponPort].DeleteAllOnuGemInfoForIntf(ctx, ponPort)
 			if err := dh.resourceMgr[ponPort].Delete(ctx, ponPort); err != nil {
 				logger.Debug(ctx, err)
 			}
@@ -3114,3 +3120,15 @@
 	}
 	return svc
 }
+
+func (dh *DeviceHandler) setDeviceDeletionInProgressFlag(flag bool) { // setDeviceDeletionInProgressFlag stores flag in dh.isDeviceDeletionInProgress
+	dh.lockDevice.Lock() // write lock: the flag is read via getDeviceDeletionInProgressFlag under the read lock of the same mutex
+	defer dh.lockDevice.Unlock()
+	dh.isDeviceDeletionInProgress = flag
+}
+
+func (dh *DeviceHandler) getDeviceDeletionInProgressFlag() bool { // getDeviceDeletionInProgressFlag returns the current value of dh.isDeviceDeletionInProgress
+	dh.lockDevice.RLock() // read lock only: this accessor never modifies the flag
+	defer dh.lockDevice.RUnlock()
+	return dh.isDeviceDeletionInProgress
+}
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 63e299b..f47ccf1 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -45,14 +45,19 @@
 	tpIDPathSuffix = "{%d,%d,%d}/tp_id"
 	//MeterIDPathSuffix - <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
 	MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
-	// OnuPacketINPathPrefix - path prefix where ONU packet-in vlanID/PCP is stored
+
+	// OnuPacketInPathPrefix - path prefix where ONU packet-in vlanID/PCP is stored
 	//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}
-	OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d}"
-	// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, packetout
+	OnuPacketInPathPrefix = "onu_packetin/{%d,%d,%d}"
+	// OnuPacketInPath path on the kvstore to store packetin gemport, which will be used for packetin, packetout
 	//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}/{<vlanId>,<priority>}
-	OnuPacketINPath = OnuPacketINPathPrefix + "/{%d,%d}"
-	//FlowIDsForGem flowids_per_gem/<intfid>/<gemport-id>
-	FlowIDsForGem = "flowids_per_gem/{%d}/{%d}"
+	OnuPacketInPath = OnuPacketInPathPrefix + "/{%d,%d}"
+
+	//FlowIDsForGemPathPrefix format: flowids_per_gem/<intfid>
+	FlowIDsForGemPathPrefix = "flowids_per_gem/{%d}"
+	//FlowIDsForGem format: flowids_per_gem/<intfid>/<gemport-id>
+	FlowIDsForGem = FlowIDsForGemPathPrefix + "/{%d}"
+
 	//McastQueuesForIntf multicast queues for pon interfaces
 	McastQueuesForIntf = "mcast_qs_for_int"
 	//FlowGroup flow_groups/<flow_group_id>
@@ -72,9 +77,11 @@
 	//Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_ids
 	FlowIDPath = "{%s}/flow_ids"
 
+	//OnuGemInfoPathPathPrefix format: onu_gem_info/<intfid>
+	OnuGemInfoPathPathPrefix = "onu_gem_info/{%d}"
 	//OnuGemInfoPath is path on the kvstore to store onugem info map
-	//format: <device-id>/onu_gem_info/<intfid>
-	OnuGemInfoPath = "onu_gem_info/{%d}/{%d}" // onu_gem/<intfid>/<onuID>
+	//format: onu_gem_info/<intfid>/<onu_id>
+	OnuGemInfoPath = OnuGemInfoPathPathPrefix + "/{%d}"
 )
 
 // FlowInfo holds the flow information
@@ -328,7 +335,7 @@
 		return onuID[0], err
 	}
 
-	return 0, err // return onuID 0 on error
+	return 0, fmt.Errorf("no-onu-id-allocated")
 }
 
 // GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
@@ -975,6 +982,24 @@
 	return nil
 }
 
+//DeleteAllOnuGemInfoForIntf deletes every onu gem info entry stored under the given pon interface from the kvstore, then resets the in-memory onuGemInfo cache; it returns the kvstore error, if any
+func (rsrcMgr *OpenOltResourceMgr) DeleteAllOnuGemInfoForIntf(ctx context.Context, intfID uint32) error {
+
+	path := fmt.Sprintf(OnuGemInfoPathPathPrefix, intfID) // per-interface prefix; OnuGemInfoPath keys are built as this prefix + "/{<onu_id>}"
+
+	logger.Debugw(ctx, "delete-all-onu-gem-info-for-pon-intf", log.Fields{"intfID": intfID})
+	if err := rsrcMgr.KVStore.DeleteWithPrefix(ctx, path); err != nil {
+		logger.Errorf(ctx, "failed-to-remove-resource-%s", path)
+		return err // the cache below is left untouched when the kvstore delete fails
+	}
+
+	// Reset cache. Normally not necessary as the entire device is getting deleted when this API is invoked.
+	rsrcMgr.onuGemInfoLock.Lock()
+	rsrcMgr.onuGemInfo = make(map[string]*OnuGemInfo) // replaces the whole map, not only the entries belonging to intfID
+	rsrcMgr.onuGemInfoLock.Unlock()
+	return nil
+}
+
 // AddUniPortToOnuInfo adds uni port to the onuinfo kvstore. check if the uni is already present if not update the kv store.
 func (rsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNo uint32) {
 
@@ -1006,7 +1031,7 @@
 //UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno, vlan id, priority bit
 func (rsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
 
-	path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
+	path := fmt.Sprintf(OnuPacketInPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
 	// update cache
 	rsrcMgr.gemPortForPacketInInfoLock.Lock()
 	rsrcMgr.gemPortForPacketInInfo[path] = gemPort
@@ -1029,7 +1054,7 @@
 
 	var Val []byte
 
-	path := fmt.Sprintf(OnuPacketINPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
+	path := fmt.Sprintf(OnuPacketInPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
 		packetInInfoKey.VlanID, packetInInfoKey.Priority)
 	// get from cache
 	rsrcMgr.gemPortForPacketInInfoLock.RLock()
@@ -1068,7 +1093,7 @@
 
 //DeletePacketInGemPortForOnu deletes the packet-in gemport for ONU
 func (rsrcMgr *OpenOltResourceMgr) DeletePacketInGemPortForOnu(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
-	path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
+	path := fmt.Sprintf(OnuPacketInPathPrefix, intfID, onuID, logicalPort)
 	value, err := rsrcMgr.KVStore.List(ctx, path)
 	if err != nil {
 		logger.Errorf(ctx, "failed-to-read-value-from-path-%s", path)
@@ -1078,7 +1103,7 @@
 	//remove them one by one
 	for key := range value {
 		// Remove the PathPrefix from the path on KV key.
-		// gemPortForPacketInInfo cache uses OnuPacketINPath as the key
+		// gemPortForPacketInInfo cache uses OnuPacketInPath as the key
 		stringToBeReplaced := rsrcMgr.KVStore.PathPrefix + "/"
 		replacedWith := ""
 		key = strings.Replace(key, stringToBeReplaced, replacedWith, 1)
@@ -1149,6 +1174,9 @@
 	rsrcMgr.flowIDsForGemLock.Unlock()
 
 	if flowIDs == nil {
+		if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
+			logger.Errorw(ctx, "Failed to delete from kvstore", log.Fields{"err": err, "path": path})
+		}
 		return nil
 	}
 	val, err := json.Marshal(flowIDs)
@@ -1177,6 +1205,24 @@
 	}
 }
 
+//DeleteAllFlowIDsForGemForIntf deletes the flow ids stored for every gem on the given pon interface from the kvstore, then resets the in-memory flowIDsForGem cache; it returns the kvstore error, if any
+func (rsrcMgr *OpenOltResourceMgr) DeleteAllFlowIDsForGemForIntf(ctx context.Context, intfID uint32) error {
+
+	path := fmt.Sprintf(FlowIDsForGemPathPrefix, intfID) // per-interface prefix; FlowIDsForGem keys are built as this prefix + "/{<gemport-id>}"
+
+	logger.Debugw(ctx, "delete-flow-ids-for-gem-for-pon-intf", log.Fields{"intfID": intfID})
+	if err := rsrcMgr.KVStore.DeleteWithPrefix(ctx, path); err != nil {
+		logger.Errorf(ctx, "failed-to-remove-resource-%s", path)
+		return err // the cache below is left untouched when the kvstore delete fails
+	}
+
+	// Reset cache. Normally not necessary as the entire device is getting deleted when this API is invoked.
+	rsrcMgr.flowIDsForGemLock.Lock()
+	rsrcMgr.flowIDsForGem = make(map[uint32][]uint64) // replaces the whole map, not only the entries belonging to intfID
+	rsrcMgr.flowIDsForGemLock.Unlock()
+	return nil
+}
+
 //GetMcastQueuePerInterfaceMap gets multicast queue info per pon interface
 func (rsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap(ctx context.Context) (map[uint32][]uint32, error) {
 	path := McastQueuesForIntf
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index 4244977..c4a6751 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -441,6 +441,58 @@
 	}
 }
 
+func TestOpenOltResourceMgr_DeleteAllFlowIDsForGemForIntf(t *testing.T) { // verifies DeleteAllFlowIDsForGemForIntf returns a nil error for pon interface 0
+
+	type args struct {
+		PONIntfID uint32
+	}
+	tests := []struct {
+		name   string
+		fields *fields
+		args   args
+		want   error // NOTE(review): declared but never compared in the loop below, which only checks err != nil
+	}{
+		{"DeleteAllFlowIDsForGemForIntf-1", getResMgr(), args{0}, nil},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			RsrcMgr := testResMgrObject(tt.fields)
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // bound each subtest to 5 seconds
+			defer cancel()
+			err := RsrcMgr.DeleteAllFlowIDsForGemForIntf(ctx, tt.args.PONIntfID)
+			if err != nil {
+				t.Errorf("DeleteAllFlowIDsForGemForIntf() returned error")
+			}
+		})
+	}
+}
+
+func TestOpenOltResourceMgr_DeleteAllOnuGemInfoForIntf(t *testing.T) { // verifies DeleteAllOnuGemInfoForIntf returns a nil error for pon interface 0
+
+	type args struct {
+		PONIntfID uint32
+	}
+	tests := []struct {
+		name   string
+		fields *fields
+		args   args
+		want   error // NOTE(review): declared but never compared in the loop below, which only checks err != nil
+	}{
+		{"DeleteAllOnuGemInfoForIntf-1", getResMgr(), args{0}, nil},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			RsrcMgr := testResMgrObject(tt.fields)
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // bound each subtest to 5 seconds
+			defer cancel()
+			err := RsrcMgr.DeleteAllOnuGemInfoForIntf(ctx, tt.args.PONIntfID)
+			if err != nil {
+				t.Errorf("DeleteAllOnuGemInfoForIntf() returned error")
+			}
+		})
+	}
+}
+
 func TestOpenOltResourceMgr_GetCurrentGEMPortIDsForOnu(t *testing.T) {
 	type args struct {
 		intfID uint32