VOL-4251: Delete device immediately after (without any delay)
volt-remove-subscriber access leaves stale resources on the OLT device

The delete device cleans up everything for the device including the
TpInstance even before volt-remove-subscriber-access has completed
the processing. The volt-remove-subscriber cleans up scheduler
and queues at the end of removing all the flows. But it needs the
tp-instance reference to clean up the schedulers and queues, but
it has already been deleted. So, it aborts the scheduler/queue
cleanup on the OLT and this causes issues when configuring the
scheduler again on the OLT during a fresh setup as there are some
stale entries on the ONU.

The fix here is to force cleanup the resources on the OLT when the
ONU device is being deleted on the OLT.

Change-Id: I54cd3ef0d5bd41cd901f3bb8917927336b84ea27
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 60bd3bd..9c45250 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1143,7 +1143,7 @@
 
 func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) error {
 	logger.Debugw(ctx, "activate-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "serialNum": serialNum, "serialNumber": serialNumber, "device-id": dh.device.Id, "OmccEncryption": dh.openOLT.config.OmccEncryption})
-	if err := dh.flowMgr[intfID].UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
+	if err := dh.flowMgr[intfID].AddOnuInfoToFlowMgrCacheAndKvStore(ctx, intfID, uint32(onuID), serialNumber); err != nil {
 		return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
 	}
 	var pir uint32 = 1000000
@@ -2182,8 +2182,7 @@
 		for _, gem := range onuGem.GemPorts {
 			dh.resourceMgr[intfID].DeleteFlowIDsForGem(ctx, intfID, gem)
 		}
-		err := dh.resourceMgr[intfID].DelOnuGemInfo(ctx, intfID, onuID)
-		if err != nil {
+		if err := dh.flowMgr[intfID].RemoveOnuInfoFromFlowMgrCacheAndKvStore(ctx, intfID, onuID); err != nil {
 			logger.Warnw(ctx, "persistence-update-onu-gem-info-failed", log.Fields{
 				"intf-id":    intfID,
 				"onu-device": onu,
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index d6bf38e..6136798 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -690,6 +690,95 @@
 	return err
 }
 
+// We are trying to force remove the schedulers and queues here if one exists for the given key.
+// We ignore any errors encountered in the process. The errors most likely are encountered when
+// the schedulers and queues are already cleared for the given key.
+func (f *OpenOltFlowMgr) forceRemoveSchedulerQueues(ctx context.Context, sq schedQueue) {
+
+	var schedCfg *tp_pb.SchedulerConfig
+	var err error
+	logger.Infow(ctx, "removing-schedulers-and-queues-in-olt",
+		log.Fields{
+			"direction": sq.direction,
+			"intf-id":   sq.intfID,
+			"onu-id":    sq.onuID,
+			"uni-id":    sq.uniID,
+			"uni-port":  sq.uniPort,
+			"tp-id":     sq.tpID,
+			"device-id": f.deviceHandler.device.Id})
+	if sq.direction == tp_pb.Direction_UPSTREAM {
+		schedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
+	} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
+		schedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
+	}
+
+	TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
+	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), schedCfg, TrafficShaping)}
+	TrafficSched[0].TechProfileId = sq.tpID
+
+	// Remove traffic queues. Ignore any errors, just log them.
+	if TrafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction); err != nil {
+		logger.Errorw(ctx, "error retrieving traffic queue", log.Fields{
+			"direction": sq.direction,
+			"intf-id":   sq.intfID,
+			"onu-id":    sq.onuID,
+			"uni-id":    sq.uniID,
+			"uni-port":  sq.uniPort,
+			"tp-id":     sq.tpID,
+			"device-id": f.deviceHandler.device.Id,
+			"err":       err})
+	} else {
+		if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
+			&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
+				UniId: sq.uniID, PortNo: sq.uniPort,
+				TrafficQueues: TrafficQueues,
+				TechProfileId: TrafficSched[0].TechProfileId}); err != nil {
+			logger.Warnw(ctx, "error removing traffic queue", log.Fields{
+				"direction": sq.direction,
+				"intf-id":   sq.intfID,
+				"onu-id":    sq.onuID,
+				"uni-id":    sq.uniID,
+				"uni-port":  sq.uniPort,
+				"tp-id":     sq.tpID,
+				"device-id": f.deviceHandler.device.Id,
+				"err":       err})
+
+		} else {
+			logger.Infow(ctx, "removed-traffic-queues-successfully", log.Fields{"device-id": f.deviceHandler.device.Id,
+				"direction": sq.direction,
+				"intf-id":   sq.intfID,
+				"onu-id":    sq.onuID,
+				"uni-id":    sq.uniID,
+				"uni-port":  sq.uniPort,
+				"tp-id":     sq.tpID})
+		}
+	}
+
+	// Remove traffic schedulers. Ignore any errors, just log them.
+	if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+		IntfId: sq.intfID, OnuId: sq.onuID,
+		UniId: sq.uniID, PortNo: sq.uniPort,
+		TrafficScheds: TrafficSched}); err != nil {
+		logger.Warnw(ctx, "error removing traffic scheduler", log.Fields{
+			"direction": sq.direction,
+			"intf-id":   sq.intfID,
+			"onu-id":    sq.onuID,
+			"uni-id":    sq.uniID,
+			"uni-port":  sq.uniPort,
+			"tp-id":     sq.tpID,
+			"device-id": f.deviceHandler.device.Id,
+			"err":       err})
+	} else {
+		logger.Infow(ctx, "removed-traffic-schedulers-successfully", log.Fields{"device-id": f.deviceHandler.device.Id,
+			"direction": sq.direction,
+			"intf-id":   sq.intfID,
+			"onu-id":    sq.onuID,
+			"uni-id":    sq.uniID,
+			"uni-port":  sq.uniPort,
+			"tp-id":     sq.tpID})
+	}
+}
+
 // This function allocates tconts and GEM ports for an ONU
 func (f *OpenOltFlowMgr) createTcontGemports(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) (uint32, []uint32, interface{}) {
 	var allocIDs []uint32
@@ -1388,11 +1477,40 @@
 }
 
 // DeleteTechProfileInstances removes the tech profile instances from persistent storage
+// We also force release scheduler and queues associated with the tp instance. Theoretically there could be
+// an issue if the upstream scheduler (DBA) is shared across multiple UNI and we force release it, given that
+// this function is only meant to clean up TP instances of a given UNI. But in practicality this  routine
+// is only meant to be called when the clean up of resource for the whole ONU is taking place.
+// The reason for introducing the force cleanup of scheduler and queues (on the OLT) was introduced here
+// because it was observed that if the ONU device was deleted too soon after the flows were
+// unprovisioned on that ONU, the scheduler and queue removal pertinent to that ONU would remain
+// uncleaned on the OLT. So we force clean up here and ignore any error that OLT returns during the
+// force cleanup (possible if the OLT has already cleared those resources).
 func (f *OpenOltFlowMgr) DeleteTechProfileInstances(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) error {
 	tpIDList := f.resourceMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
 	uniPortName := getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
 
 	for _, tpID := range tpIDList {
+
+		// Force cleanup scheduler/queues -- start
+		uniPortNum := MkUniPortNum(ctx, intfID, onuID, uniID)
+		uni := getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
+		tpPath := f.getTPpath(ctx, intfID, uni, tpID)
+		tpInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
+		if err != nil || tpInst == nil { // This should not happen, something wrong in KV backend transaction
+			logger.Warnw(ctx, "tech-profile-not-in-kv-store",
+				log.Fields{
+					"tp-id": tpID,
+					"path":  tpPath})
+		}
+		switch tpInstance := tpInst.(type) {
+		case *tp_pb.TechProfileInstance:
+			f.forceRemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: tpID, uniPort: uniPortNum, tpInst: tpInstance})
+			f.forceRemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: tpID, uniPort: uniPortNum, tpInst: tpInstance})
+		}
+		// Force cleanup scheduler/queues -- end
+
+		// Now remove the tp instance
 		if err := f.DeleteTechProfileInstance(ctx, intfID, onuID, uniID, uniPortName, tpID); err != nil {
 			logger.Errorw(ctx, "delete-tech-profile-failed", log.Fields{"err": err, "device-id": f.deviceHandler.device.Id})
 			// return err
@@ -2313,9 +2431,8 @@
 	return nil
 }
 
-//UpdateOnuInfo function adds onu info to cache and kvstore
-//UpdateOnuInfo function adds onu info to cache and kvstore
-func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
+//AddOnuInfoToFlowMgrCacheAndKvStore function adds onu info to cache and kvstore
+func (f *OpenOltFlowMgr) AddOnuInfoToFlowMgrCacheAndKvStore(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
 
 	f.onuGemInfoLock.RLock()
 	_, ok := f.onuGemInfoMap[onuID]
@@ -2335,7 +2452,7 @@
 	if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onuID, onuGemInfo); err != nil {
 		return err
 	}
-	logger.Infow(ctx, "updated-onuinfo",
+	logger.Infow(ctx, "added-onuinfo",
 		log.Fields{
 			"intf-id":    intfID,
 			"onu-id":     onuID,
@@ -2345,6 +2462,24 @@
 	return nil
 }
 
+//RemoveOnuInfoFromFlowMgrCacheAndKvStore function adds onu info to cache and kvstore
+func (f *OpenOltFlowMgr) RemoveOnuInfoFromFlowMgrCacheAndKvStore(ctx context.Context, intfID uint32, onuID uint32) error {
+
+	f.onuGemInfoLock.Lock()
+	delete(f.onuGemInfoMap, onuID)
+	f.onuGemInfoLock.Unlock()
+
+	if err := f.resourceMgr.DelOnuGemInfo(ctx, intfID, onuID); err != nil {
+		return err
+	}
+	logger.Infow(ctx, "deleted-onuinfo",
+		log.Fields{
+			"intf-id":   intfID,
+			"onu-id":    onuID,
+			"device-id": f.deviceHandler.device.Id})
+	return nil
+}
+
 //addGemPortToOnuInfoMap function adds GEMport to ONU map
 func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
 
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 0d966d3..b985b4e 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -629,7 +629,7 @@
 			wg.Add(1)
 			go func(i uint32, j uint32) {
 				// TODO: actually verify success
-				_ = flowMgr[i].UpdateOnuInfo(ctx, i, i, fmt.Sprintf("onu-%d", i))
+				_ = flowMgr[i].AddOnuInfoToFlowMgrCacheAndKvStore(ctx, i, i, fmt.Sprintf("onu-%d", i))
 				wg.Done()
 			}(uint32(i), uint32(j))
 		}
@@ -655,7 +655,7 @@
 	for i := 0; i < intfNum; i++ {
 		for o := 1; o <= onuNum; o++ {
 			// TODO: actually verify success
-			_ = flowMgr[i].UpdateOnuInfo(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o-1))
+			_ = flowMgr[i].AddOnuInfoToFlowMgrCacheAndKvStore(ctx, uint32(i), uint32(o), fmt.Sprintf("i%do%d", i, o-1))
 		}
 	}
 
@@ -720,7 +720,7 @@
 		// Delete all gemports
 		{"DeleteGemPortFromLocalCache2", args{0, 1, []uint32{1, 2, 3, 4}, []uint32{1, 2, 3, 4}, []uint32{}, "onu1", 0}},
 		// Try to delete when there is no gem port
-		{"DeleteGemPortFromLocalCache3", args{0, 1, []uint32{}, []uint32{1, 2}, []uint32{}, "onu1", 0}},
+		{"DeleteGemPortFromLocalCache3", args{0, 1, []uint32{}, []uint32{1, 2}, nil, "onu1", 0}},
 		// Try to delete non-existent gem port
 		{"DeleteGemPortFromLocalCache4", args{0, 1, []uint32{1}, []uint32{2}, []uint32{1}, "onu1", 1}},
 		// Try to delete two of the gem ports
@@ -730,19 +730,31 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			// TODO: should check returned errors are as expected?
-			_ = flowMgr[tt.args.intfID].UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
+			if err := flowMgr[tt.args.intfID].RemoveOnuInfoFromFlowMgrCacheAndKvStore(ctx, tt.args.intfID, tt.args.onuID); err != nil {
+				t.Errorf("failed to remove onu")
+			}
+			if err := flowMgr[tt.args.intfID].AddOnuInfoToFlowMgrCacheAndKvStore(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum); err != nil {
+				t.Errorf("failed to add onu")
+			}
 			for _, gemPort := range tt.args.gemPortIDs {
 				flowMgr[tt.args.intfID].addGemPortToOnuInfoMap(ctx, tt.args.intfID, tt.args.onuID, gemPort)
 			}
 			for _, gemPortDeleted := range tt.args.gemPortIDsToBeDeleted {
 				flowMgr[tt.args.intfID].deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
 			}
-			lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfoMap[1].GemPorts)
+			lenofGemPorts := 0
+			gP, ok := flowMgr[tt.args.intfID].onuGemInfoMap[tt.args.onuID]
+			if ok {
+				lenofGemPorts = len(gP.GemPorts)
+			}
 			if lenofGemPorts != tt.args.finalLength {
 				t.Errorf("GemPorts length is not as expected len = %d, want %d", lenofGemPorts, tt.args.finalLength)
 			}
-			gemPorts := flowMgr[tt.args.intfID].onuGemInfoMap[1].GemPorts
+			gP, ok = flowMgr[tt.args.intfID].onuGemInfoMap[tt.args.onuID]
+			var gemPorts []uint32
+			if ok {
+				gemPorts = gP.GemPorts
+			}
 			if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gemPorts) {
 				t.Errorf("GemPorts are not as expected = %v, want %v", gemPorts, tt.args.gemPortIDsRemaining)
 			}