[VOL-3442] reconcile treatment for existing flow configuration

Signed-off-by: Holger Hildebrandt <holger.hildebrandt@adtran.com>
Change-Id: I9f132f565d262cf8660efae3473aa2f7983c0464
diff --git a/internal/pkg/onuadaptercore/onu_device_entry.go b/internal/pkg/onuadaptercore/onu_device_entry.go
index 382984c..8c1d097 100644
--- a/internal/pkg/onuadaptercore/onu_device_entry.go
+++ b/internal/pkg/onuadaptercore/onu_device_entry.go
@@ -19,7 +19,10 @@
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
+	"fmt"
+	"sync"
 	"time"
 
 	"github.com/opencord/omci-lib-go"
@@ -31,6 +34,7 @@
 	"github.com/looplab/fsm"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
 
 	//"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -104,6 +108,7 @@
 const (
 	cBasePathMibTemplateKvStore = "service/voltha/omci_mibs/go_templates"
 	cSuffixMibTemplateKvStore   = "%s/%s/%s"
+	cBasePathOnuKVStore         = "service/voltha/openonu"
 )
 
 // OnuDeviceEvent - event of interest to Device Adapters and OpenOMCI State Machines
@@ -188,22 +193,52 @@
 	isActive uint8
 }
 
+type uniVlanFlowParams struct {
+	TpID         uint16 `json:"tp_id"`
+	MatchVid     uint32 `json:"match_vid"` //use uint32 types for allowing immediate bitshifting
+	MatchPcp     uint32 `json:"match_pcp"`
+	TagsToRemove uint32 `json:"tags_to_revome"`
+	SetVid       uint32 `json:"set_vid"`
+	SetPcp       uint32 `json:"set_pcp"`
+}
+
+type uniPersConfig struct {
+	PersUniID      uint8               `json:"uni_id"`
+	PersTpPath     string              `json:"tp_path"`
+	PersFlowParams []uniVlanFlowParams `json:"flow_params"`
+}
+
+type onuPersistentData struct {
+	PersOnuID      uint32          `json:"onu_id"`
+	PersIntfID     uint32          `json:"intf_id"`
+	PersSnr        string          `json:"serial_number"`
+	PersAdminState string          `json:"admin_state"`
+	PersOperState  string          `json:"oper_state"`
+	PersUniConfig  []uniPersConfig `json:"uni_config"`
+}
+
 // OnuDeviceEntry - ONU device info and FSM events.
 type OnuDeviceEntry struct {
-	deviceID           string
-	baseDeviceHandler  *deviceHandler
-	coreProxy          adapterif.CoreProxy
-	adapterProxy       adapterif.AdapterProxy
-	started            bool
-	PDevOmciCC         *omciCC
-	pOnuDB             *onuDeviceDB
-	mibTemplateKVStore *db.Backend
-	vendorID           string
-	serialNumber       string
-	equipmentID        string
-	swImages           [secondSwImageMeID + 1]swImages
-	activeSwVersion    string
-	macAddress         string
+	deviceID              string
+	baseDeviceHandler     *deviceHandler
+	coreProxy             adapterif.CoreProxy
+	adapterProxy          adapterif.AdapterProxy
+	started               bool
+	PDevOmciCC            *omciCC
+	pOnuDB                *onuDeviceDB
+	mibTemplateKVStore    *db.Backend
+	sOnuPersistentData    onuPersistentData
+	onuKVStoreMutex       sync.RWMutex
+	onuKVStore            *db.Backend
+	onuKVStorePath        string
+	onuKVStoreprocResult  error //error indication of processing
+	chOnuKvProcessingStep chan uint8
+	vendorID              string
+	serialNumber          string
+	equipmentID           string
+	swImages              [secondSwImageMeID + 1]swImages
+	activeSwVersion       string
+	macAddress            string
 	//lockDeviceEntries           sync.RWMutex
 	mibDbClass    func() error
 	supportedFsms OmciDeviceFsms
@@ -235,6 +270,8 @@
 	onuDeviceEntry.coreProxy = coreProxy
 	onuDeviceEntry.adapterProxy = adapterProxy
 	onuDeviceEntry.devState = DeviceStatusInit
+	onuDeviceEntry.sOnuPersistentData.PersUniConfig = make([]uniPersConfig, 0)
+	onuDeviceEntry.chOnuKvProcessingStep = make(chan uint8)
 	onuDeviceEntry.omciRebootMessageReceivedChannel = make(chan Message, 2048)
 	//openomciagent.lockDeviceHandlersMap = sync.RWMutex{}
 	//OMCI related databases are on a per-agent basis. State machines and tasks
@@ -379,7 +416,15 @@
 
 	onuDeviceEntry.mibTemplateKVStore = onuDeviceEntry.baseDeviceHandler.setBackend(cBasePathMibTemplateKvStore)
 	if onuDeviceEntry.mibTemplateKVStore == nil {
-		logger.Errorw("Failed to setup mibTemplateKVStore", log.Fields{"device-id": deviceID})
+		logger.Errorw("Can't access mibTemplateKVStore - no backend connection to service",
+			log.Fields{"device-id": deviceID, "service": cBasePathMibTemplateKvStore})
+	}
+
+	onuDeviceEntry.onuKVStorePath = onuDeviceEntry.deviceID
+	onuDeviceEntry.onuKVStore = onuDeviceEntry.baseDeviceHandler.setBackend(cBasePathOnuKVStore)
+	if onuDeviceEntry.onuKVStore == nil {
+		logger.Errorw("Can't access onuKVStore - no backend connection to service",
+			log.Fields{"device-id": deviceID, "service": cBasePathOnuKVStore})
 	}
 
 	// Alarm Synchronization Database
@@ -479,3 +524,217 @@
 		logger.Warnw("device-event not yet handled", log.Fields{"state": devEvent})
 	}
 }
+
+func (oo *OnuDeviceEntry) restoreDataFromOnuKvStore(ctx context.Context) error {
+	if oo.onuKVStore == nil {
+		logger.Debugw("onuKVStore not set - abort", log.Fields{"device-id": oo.deviceID})
+		return fmt.Errorf(fmt.Sprintf("onuKVStore-not-set-abort-%s", oo.deviceID))
+	}
+	oo.sOnuPersistentData = onuPersistentData{0, 0, "", "", "", make([]uniPersConfig, 0)}
+	Value, err := oo.onuKVStore.Get(ctx, oo.onuKVStorePath)
+	if err == nil {
+		if Value != nil {
+			logger.Debugw("ONU-data read",
+				log.Fields{"Key": Value.Key, "device-id": oo.deviceID})
+			tmpBytes, _ := kvstore.ToByte(Value.Value)
+
+			if err = json.Unmarshal(tmpBytes, &oo.sOnuPersistentData); err != nil {
+				logger.Errorw("unable to unmarshal ONU-data", log.Fields{"error": err, "device-id": oo.deviceID})
+				return fmt.Errorf(fmt.Sprintf("unable-to-unmarshal-ONU-data-%s", oo.deviceID))
+			}
+			logger.Debugw("ONU-data", log.Fields{"sOnuPersistentData": oo.sOnuPersistentData,
+				"device-id": oo.deviceID})
+		} else {
+			logger.Errorw("no ONU-data found", log.Fields{"path": oo.onuKVStorePath, "device-id": oo.deviceID})
+			return fmt.Errorf(fmt.Sprintf("no-ONU-data-found-%s", oo.deviceID))
+		}
+	} else {
+		logger.Errorw("unable to read from KVstore", log.Fields{"device-id": oo.deviceID})
+		return fmt.Errorf(fmt.Sprintf("unable-to-read-from-KVstore-%s", oo.deviceID))
+	}
+	return nil
+}
+
+func (oo *OnuDeviceEntry) deleteDataFromOnuKvStore(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	if oo.onuKVStore == nil {
+		logger.Debugw("onuKVStore not set - abort", log.Fields{"device-id": oo.deviceID})
+		oo.onuKVStoreprocResult = errors.New("onu-data delete aborted: onuKVStore not set")
+		return
+	}
+	var processingStep uint8 = 1 // used to synchronize the different processing steps with chOnuKvProcessingStep
+	go oo.deletePersistentData(ctx, processingStep)
+	if !oo.waitForTimeoutOrCompletion(ctx, oo.chOnuKvProcessingStep, processingStep) {
+		//timeout or error detected
+		logger.Debugw("ONU-data not deleted - abort", log.Fields{"device-id": oo.deviceID})
+		oo.onuKVStoreprocResult = errors.New("onu-data delete aborted: during kv-access")
+		return
+	}
+}
+
+func (oo *OnuDeviceEntry) deletePersistentData(ctx context.Context, aProcessingStep uint8) {
+
+	logger.Debugw("delete ONU-data from KVStore", log.Fields{"device-id": oo.deviceID})
+	err := oo.onuKVStore.Delete(ctx, oo.onuKVStorePath)
+	if err != nil {
+		logger.Errorw("unable to delete in KVstore", log.Fields{"device-id": oo.deviceID, "err": err})
+		oo.chOnuKvProcessingStep <- 0 //error indication
+		return
+	}
+	oo.chOnuKvProcessingStep <- aProcessingStep //done
+}
+
+func (oo *OnuDeviceEntry) updateOnuKvStore(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	if oo.onuKVStore == nil {
+		logger.Debugw("onuKVStore not set - abort", log.Fields{"device-id": oo.deviceID})
+		oo.onuKVStoreprocResult = errors.New("onu-data update aborted: onuKVStore not set")
+		return
+	}
+	var processingStep uint8 = 1 // used to synchronize the different processing steps with chOnuKvProcessingStep
+	go oo.storeDataInOnuKvStore(ctx, processingStep)
+	if !oo.waitForTimeoutOrCompletion(ctx, oo.chOnuKvProcessingStep, processingStep) {
+		//timeout or error detected
+		logger.Debugw("ONU-data not written - abort", log.Fields{"device-id": oo.deviceID})
+		oo.onuKVStoreprocResult = errors.New("onu-data update aborted: during writing process")
+		return
+	}
+}
+
+func (oo *OnuDeviceEntry) storeDataInOnuKvStore(ctx context.Context, aProcessingStep uint8) {
+
+	//assign values which are not already present when newOnuDeviceEntry() is called
+	oo.sOnuPersistentData.PersOnuID = oo.baseDeviceHandler.pOnuIndication.OnuId
+	oo.sOnuPersistentData.PersIntfID = oo.baseDeviceHandler.pOnuIndication.IntfId
+	oo.sOnuPersistentData.PersSnr = oo.baseDeviceHandler.pOnuOmciDevice.serialNumber
+	//TODO: verify usage of these values during restart UC
+	oo.sOnuPersistentData.PersAdminState = "up"
+	oo.sOnuPersistentData.PersOperState = "active"
+
+	logger.Debugw("Update ONU-data in KVStore", log.Fields{"device-id": oo.deviceID, "sOnuPersistentData": oo.sOnuPersistentData})
+
+	Value, err := json.Marshal(oo.sOnuPersistentData)
+	if err != nil {
+		logger.Errorw("unable to marshal ONU-data", log.Fields{"sOnuPersistentData": oo.sOnuPersistentData,
+			"device-id": oo.deviceID, "err": err})
+		oo.chOnuKvProcessingStep <- 0 //error indication
+		return
+	}
+	err = oo.onuKVStore.Put(ctx, oo.onuKVStorePath, Value)
+	if err != nil {
+		logger.Errorw("unable to write ONU-data into KVstore", log.Fields{"device-id": oo.deviceID, "err": err})
+		oo.chOnuKvProcessingStep <- 0 //error indication
+		return
+	}
+	oo.chOnuKvProcessingStep <- aProcessingStep //done
+}
+
+func (oo *OnuDeviceEntry) updateOnuUniTpPath(aUniID uint8, aPathString string) bool {
+	/* within some specific InterAdapter processing request write/read access to data is ensured to be sequentially,
+	   as also the complete sequence is ensured to 'run to completion' before some new request is accepted
+	   no specific concurrency protection to sOnuPersistentData is required here
+	*/
+	for k, v := range oo.sOnuPersistentData.PersUniConfig {
+		if v.PersUniID == aUniID {
+			logger.Debugw("PersUniConfig-entry already exists", log.Fields{"device-id": oo.deviceID, "uniID": aUniID})
+			existingPath := oo.sOnuPersistentData.PersUniConfig[k].PersTpPath
+			if existingPath != aPathString {
+				if aPathString == "" {
+					//existing entry to be deleted
+					logger.Debugw("UniTp delete path value", log.Fields{"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString})
+					oo.sOnuPersistentData.PersUniConfig[k].PersTpPath = ""
+				} else {
+					//existing entry to be modified
+					logger.Debugw("UniTp modify path value", log.Fields{"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString})
+					oo.sOnuPersistentData.PersUniConfig[k].PersTpPath = aPathString
+				}
+				return true
+			}
+			//entry already exists
+			logger.Debugw("UniTp path already exists", log.Fields{"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString})
+			return false
+		}
+	}
+	//no entry exists for uniId
+
+	if aPathString == "" {
+		//delete request in non-existing state , accept as no change
+		logger.Debugw("UniTp path already removed", log.Fields{"device-id": oo.deviceID, "uniID": aUniID})
+		return false
+	}
+	//new entry to be created
+	logger.Debugw("New UniTp path set", log.Fields{"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString})
+	oo.sOnuPersistentData.PersUniConfig =
+		append(oo.sOnuPersistentData.PersUniConfig, uniPersConfig{PersUniID: aUniID, PersTpPath: aPathString, PersFlowParams: make([]uniVlanFlowParams, 0)})
+	return true
+}
+
+// deleteTpResource removes Resources from the ONU's specified Uni
+func (oo *OnuDeviceEntry) deleteTpResource(ctx context.Context,
+	aUniID uint8, aPathString string, aResource resourceEntry, aEntryID uint32,
+	wg *sync.WaitGroup) {
+	defer wg.Done()
+	logger.Debugw("this would remove TP resources from ONU's UNI", log.Fields{
+		"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString, "Resource": aResource})
+	//TODO!!!
+	//delete the given resource from ONU OMCI config and data base - as background routine
+	/*
+		var processingStep uint8 = 1 // used to synchronize the different processing steps with chTpConfigProcessingStep
+		go onuTp.deleteAniResource(ctx, processingStep)
+		if !onuTP.waitForTimeoutOrCompletion(ctx, chTpConfigProcessingStep, processingStep) {
+			//timeout or error detected
+			return
+		}
+	*/
+}
+
+func (oo *OnuDeviceEntry) updateOnuUniFlowConfig(aUniID uint8, aUniVlanFlowParams *[]uniVlanFlowParams) {
+
+	for k, v := range oo.sOnuPersistentData.PersUniConfig {
+		if v.PersUniID == aUniID {
+			oo.sOnuPersistentData.PersUniConfig[k].PersFlowParams = make([]uniVlanFlowParams, len(*aUniVlanFlowParams))
+			copy(oo.sOnuPersistentData.PersUniConfig[k].PersFlowParams, *aUniVlanFlowParams)
+			return
+		}
+	}
+	//flow update was faster than tp-config - create PersUniConfig-entry
+	tmpConfig := uniPersConfig{PersUniID: aUniID, PersTpPath: "", PersFlowParams: make([]uniVlanFlowParams, len(*aUniVlanFlowParams))}
+	copy(tmpConfig.PersFlowParams, *aUniVlanFlowParams)
+	oo.sOnuPersistentData.PersUniConfig = append(oo.sOnuPersistentData.PersUniConfig, tmpConfig)
+}
+
+func (oo *OnuDeviceEntry) waitForTimeoutOrCompletion(
+	ctx context.Context, aChOnuProcessingStep <-chan uint8, aProcessingStep uint8) bool {
+	select {
+	case <-ctx.Done():
+		logger.Warnw("processing not completed in-time!",
+			log.Fields{"device-id": oo.deviceID, "error": ctx.Err()})
+		return false
+	case rxStep := <-aChOnuProcessingStep:
+		if rxStep == aProcessingStep {
+			return true
+		}
+		//all other values are not accepted - including 0 for error indication
+		logger.Warnw("Invalid processing step received: abort!",
+			log.Fields{"device-id": oo.deviceID,
+				"wantedStep": aProcessingStep, "haveStep": rxStep})
+		return false
+	}
+}
+
+func (oo *OnuDeviceEntry) resetKvProcessingErrorIndication() {
+	oo.onuKVStoreprocResult = nil
+}
+func (oo *OnuDeviceEntry) getKvProcessingErrorIndication() error {
+	return oo.onuKVStoreprocResult
+}
+
+func (oo *OnuDeviceEntry) lockOnuKVStoreMutex() {
+	oo.onuKVStoreMutex.Lock()
+}
+
+func (oo *OnuDeviceEntry) unlockOnuKVStoreMutex() {
+	oo.onuKVStoreMutex.Unlock()
+}