[VOL-3442] reconcile treatment for existing flow configuration
Signed-off-by: Holger Hildebrandt <holger.hildebrandt@adtran.com>
Change-Id: I9f132f565d262cf8660efae3473aa2f7983c0464
diff --git a/internal/pkg/onuadaptercore/onu_device_entry.go b/internal/pkg/onuadaptercore/onu_device_entry.go
index 382984c..8c1d097 100644
--- a/internal/pkg/onuadaptercore/onu_device_entry.go
+++ b/internal/pkg/onuadaptercore/onu_device_entry.go
@@ -19,7 +19,10 @@
import (
"context"
+ "encoding/json"
"errors"
+ "fmt"
+ "sync"
"time"
"github.com/opencord/omci-lib-go"
@@ -31,6 +34,7 @@
"github.com/looplab/fsm"
"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
//"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -104,6 +108,7 @@
const (
cBasePathMibTemplateKvStore = "service/voltha/omci_mibs/go_templates"
cSuffixMibTemplateKvStore = "%s/%s/%s"
+ // cBasePathOnuKVStore is the KV store base path passed to setBackend() to create onuKVStore,
+ // the store used for the persistent per-ONU data
+ cBasePathOnuKVStore = "service/voltha/openonu"
)
// OnuDeviceEvent - event of interest to Device Adapters and OpenOMCI State Machines
@@ -188,22 +193,52 @@
isActive uint8
}
+// uniVlanFlowParams holds the VLAN related parameters of one flow as they are persisted
+// per UNI (see uniPersConfig.PersFlowParams). The JSON tags define the key names of the
+// serialized representation.
+type uniVlanFlowParams struct {
+	TpID         uint16 `json:"tp_id"`
+	MatchVid     uint32 `json:"match_vid"` //use uint32 types for allowing immediate bitshifting
+	MatchPcp     uint32 `json:"match_pcp"`
+	TagsToRemove uint32 `json:"tags_to_remove"` // key spelling corrected (was "tags_to_revome")
+	SetVid       uint32 `json:"set_vid"`
+	SetPcp       uint32 `json:"set_pcp"`
+}
+
+// uniPersConfig is the persisted configuration of a single UNI; entries are collected in
+// onuPersistentData.PersUniConfig and identified by PersUniID.
+type uniPersConfig struct {
+ PersUniID uint8 `json:"uni_id"`
+ PersTpPath string `json:"tp_path"` // an empty string denotes "no path set" (see updateOnuUniTpPath)
+ PersFlowParams []uniVlanFlowParams `json:"flow_params"` // replaced as a whole by updateOnuUniFlowConfig
+}
+
+// onuPersistentData is the per-ONU data set that is JSON-encoded and written to the ONU KV store
+// (storeDataInOnuKvStore) and decoded again from it (restoreDataFromOnuKvStore).
+type onuPersistentData struct {
+ PersOnuID uint32 `json:"onu_id"`
+ PersIntfID uint32 `json:"intf_id"`
+ PersSnr string `json:"serial_number"`
+ PersAdminState string `json:"admin_state"`
+ PersOperState string `json:"oper_state"`
+ PersUniConfig []uniPersConfig `json:"uni_config"` // one entry per UNI that has a TP path and/or flow parameters
+}
+
// OnuDeviceEntry - ONU device info and FSM events.
type OnuDeviceEntry struct {
- deviceID string
- baseDeviceHandler *deviceHandler
- coreProxy adapterif.CoreProxy
- adapterProxy adapterif.AdapterProxy
- started bool
- PDevOmciCC *omciCC
- pOnuDB *onuDeviceDB
- mibTemplateKVStore *db.Backend
- vendorID string
- serialNumber string
- equipmentID string
- swImages [secondSwImageMeID + 1]swImages
- activeSwVersion string
- macAddress string
+ deviceID string
+ baseDeviceHandler *deviceHandler
+ coreProxy adapterif.CoreProxy
+ adapterProxy adapterif.AdapterProxy
+ started bool
+ PDevOmciCC *omciCC
+ pOnuDB *onuDeviceDB
+ mibTemplateKVStore *db.Backend
+ sOnuPersistentData onuPersistentData
+ onuKVStoreMutex sync.RWMutex
+ onuKVStore *db.Backend
+ onuKVStorePath string
+ onuKVStoreprocResult error //error indication of processing
+ chOnuKvProcessingStep chan uint8
+ vendorID string
+ serialNumber string
+ equipmentID string
+ swImages [secondSwImageMeID + 1]swImages
+ activeSwVersion string
+ macAddress string
//lockDeviceEntries sync.RWMutex
mibDbClass func() error
supportedFsms OmciDeviceFsms
@@ -235,6 +270,8 @@
onuDeviceEntry.coreProxy = coreProxy
onuDeviceEntry.adapterProxy = adapterProxy
onuDeviceEntry.devState = DeviceStatusInit
+ onuDeviceEntry.sOnuPersistentData.PersUniConfig = make([]uniPersConfig, 0)
+ onuDeviceEntry.chOnuKvProcessingStep = make(chan uint8)
onuDeviceEntry.omciRebootMessageReceivedChannel = make(chan Message, 2048)
//openomciagent.lockDeviceHandlersMap = sync.RWMutex{}
//OMCI related databases are on a per-agent basis. State machines and tasks
@@ -379,7 +416,15 @@
onuDeviceEntry.mibTemplateKVStore = onuDeviceEntry.baseDeviceHandler.setBackend(cBasePathMibTemplateKvStore)
if onuDeviceEntry.mibTemplateKVStore == nil {
- logger.Errorw("Failed to setup mibTemplateKVStore", log.Fields{"device-id": deviceID})
+ logger.Errorw("Can't access mibTemplateKVStore - no backend connection to service",
+ log.Fields{"device-id": deviceID, "service": cBasePathMibTemplateKvStore})
+ }
+
+ onuDeviceEntry.onuKVStorePath = onuDeviceEntry.deviceID
+ onuDeviceEntry.onuKVStore = onuDeviceEntry.baseDeviceHandler.setBackend(cBasePathOnuKVStore)
+ if onuDeviceEntry.onuKVStore == nil {
+ logger.Errorw("Can't access onuKVStore - no backend connection to service",
+ log.Fields{"device-id": deviceID, "service": cBasePathOnuKVStore})
}
// Alarm Synchronization Database
@@ -479,3 +524,217 @@
logger.Warnw("device-event not yet handled", log.Fields{"state": devEvent})
}
}
+
+// restoreDataFromOnuKvStore loads the ONU data persisted under onuKVStorePath from the ONU
+// KV store and, on success, stores the decoded result in sOnuPersistentData.
+//
+// sOnuPersistentData is reset to an empty data set before the KV store is read and is only
+// overwritten after the stored value has been decoded completely, so a failed restore never
+// leaves partially decoded data behind.
+//
+// An error is returned if the KV store backend is not set, the read fails, no entry exists
+// for the path, or the stored value cannot be converted to bytes or unmarshalled.
+func (oo *OnuDeviceEntry) restoreDataFromOnuKvStore(ctx context.Context) error {
+	if oo.onuKVStore == nil {
+		logger.Debugw("onuKVStore not set - abort", log.Fields{"device-id": oo.deviceID})
+		return fmt.Errorf("onuKVStore-not-set-abort-%s", oo.deviceID)
+	}
+	oo.sOnuPersistentData = onuPersistentData{PersUniConfig: make([]uniPersConfig, 0)}
+
+	kvPair, err := oo.onuKVStore.Get(ctx, oo.onuKVStorePath)
+	if err != nil {
+		logger.Errorw("unable to read from KVstore", log.Fields{"error": err, "device-id": oo.deviceID})
+		return fmt.Errorf("unable-to-read-from-KVstore-%s", oo.deviceID)
+	}
+	if kvPair == nil {
+		logger.Errorw("no ONU-data found", log.Fields{"path": oo.onuKVStorePath, "device-id": oo.deviceID})
+		return fmt.Errorf("no-ONU-data-found-%s", oo.deviceID)
+	}
+	logger.Debugw("ONU-data read",
+		log.Fields{"Key": kvPair.Key, "device-id": oo.deviceID})
+
+	rawData, err := kvstore.ToByte(kvPair.Value)
+	if err != nil {
+		logger.Errorw("unable to convert ONU-data to bytes", log.Fields{"error": err, "device-id": oo.deviceID})
+		return fmt.Errorf("unable-to-convert-ONU-data-%s", oo.deviceID)
+	}
+
+	// decode into a local copy first: json.Unmarshal may fill some fields before it fails
+	restored := onuPersistentData{PersUniConfig: make([]uniPersConfig, 0)}
+	if err = json.Unmarshal(rawData, &restored); err != nil {
+		logger.Errorw("unable to unmarshal ONU-data", log.Fields{"error": err, "device-id": oo.deviceID})
+		return fmt.Errorf("unable-to-unmarshal-ONU-data-%s", oo.deviceID)
+	}
+	oo.sOnuPersistentData = restored
+	logger.Debugw("ONU-data", log.Fields{"sOnuPersistentData": oo.sOnuPersistentData,
+		"device-id": oo.deviceID})
+	return nil
+}
+
+// deleteDataFromOnuKvStore removes the persisted ONU data from the ONU KV store.
+// The deletion itself runs in a background routine (deletePersistentData); this method waits
+// for its result or for ctx to be done. A failure is recorded in onuKVStoreprocResult.
+// wg.Done() is called on every return path.
+func (oo *OnuDeviceEntry) deleteDataFromOnuKvStore(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	if oo.onuKVStore == nil {
+		logger.Debugw("onuKVStore not set - abort", log.Fields{"device-id": oo.deviceID})
+		oo.onuKVStoreprocResult = errors.New("onu-data delete aborted: onuKVStore not set")
+		return
+	}
+
+	// value the background routine reports on chOnuKvProcessingStep once it is done
+	const stepDone uint8 = 1
+	go oo.deletePersistentData(ctx, stepDone)
+	if oo.waitForTimeoutOrCompletion(ctx, oo.chOnuKvProcessingStep, stepDone) {
+		return
+	}
+	//timeout or error detected
+	logger.Debugw("ONU-data not deleted - abort", log.Fields{"device-id": oo.deviceID})
+	oo.onuKVStoreprocResult = errors.New("onu-data delete aborted: during kv-access")
+}
+
+// deletePersistentData deletes the entry stored under onuKVStorePath from the ONU KV store and
+// reports the outcome on chOnuKvProcessingStep: aProcessingStep on success, 0 on error.
+//
+// The channel is unbuffered, so the result can only be delivered while a receiver is waiting.
+// If ctx is done first (the waiting side returns on ctx.Done() as well), the result is dropped
+// instead of blocking this routine forever and leaving a stale value for the next waiter.
+func (oo *OnuDeviceEntry) deletePersistentData(ctx context.Context, aProcessingStep uint8) {
+
+	logger.Debugw("delete ONU-data from KVStore", log.Fields{"device-id": oo.deviceID})
+	result := aProcessingStep //done
+	if err := oo.onuKVStore.Delete(ctx, oo.onuKVStorePath); err != nil {
+		logger.Errorw("unable to delete in KVstore", log.Fields{"device-id": oo.deviceID, "err": err})
+		result = 0 //error indication
+	}
+	select {
+	case oo.chOnuKvProcessingStep <- result:
+	case <-ctx.Done():
+		logger.Warnw("ONU-data delete result not delivered - context done",
+			log.Fields{"device-id": oo.deviceID, "error": ctx.Err()})
+	}
+}
+
+// updateOnuKvStore writes the current persistent ONU data to the ONU KV store.
+// The write itself runs in a background routine (storeDataInOnuKvStore); this method waits
+// for its result or for ctx to be done. A failure is recorded in onuKVStoreprocResult.
+// wg.Done() is called on every return path.
+func (oo *OnuDeviceEntry) updateOnuKvStore(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	if oo.onuKVStore == nil {
+		logger.Debugw("onuKVStore not set - abort", log.Fields{"device-id": oo.deviceID})
+		oo.onuKVStoreprocResult = errors.New("onu-data update aborted: onuKVStore not set")
+		return
+	}
+
+	// value the background routine reports on chOnuKvProcessingStep once it is done
+	const stepDone uint8 = 1
+	go oo.storeDataInOnuKvStore(ctx, stepDone)
+	if oo.waitForTimeoutOrCompletion(ctx, oo.chOnuKvProcessingStep, stepDone) {
+		return
+	}
+	//timeout or error detected
+	logger.Debugw("ONU-data not written - abort", log.Fields{"device-id": oo.deviceID})
+	oo.onuKVStoreprocResult = errors.New("onu-data update aborted: during writing process")
+}
+
+// storeDataInOnuKvStore completes sOnuPersistentData with the values taken from the device
+// handler, JSON-encodes it and writes it to the ONU KV store under onuKVStorePath.
+// The outcome is reported on chOnuKvProcessingStep: aProcessingStep on success, 0 on error.
+//
+// The channel is unbuffered, so the result can only be delivered while a receiver is waiting.
+// If ctx is done first (the waiting side returns on ctx.Done() as well), the result is dropped
+// instead of blocking this routine forever and leaving a stale value for the next waiter.
+func (oo *OnuDeviceEntry) storeDataInOnuKvStore(ctx context.Context, aProcessingStep uint8) {
+	// reportStep hands the processing result over to the waiting routine without blocking past ctx
+	reportStep := func(aStep uint8) {
+		select {
+		case oo.chOnuKvProcessingStep <- aStep:
+		case <-ctx.Done():
+			logger.Warnw("ONU-data update result not delivered - context done",
+				log.Fields{"device-id": oo.deviceID, "error": ctx.Err()})
+		}
+	}
+
+	//assign values which are not already present when newOnuDeviceEntry() is called
+	oo.sOnuPersistentData.PersOnuID = oo.baseDeviceHandler.pOnuIndication.OnuId
+	oo.sOnuPersistentData.PersIntfID = oo.baseDeviceHandler.pOnuIndication.IntfId
+	oo.sOnuPersistentData.PersSnr = oo.baseDeviceHandler.pOnuOmciDevice.serialNumber
+	//TODO: verify usage of these values during restart UC
+	oo.sOnuPersistentData.PersAdminState = "up"
+	oo.sOnuPersistentData.PersOperState = "active"
+
+	logger.Debugw("Update ONU-data in KVStore", log.Fields{"device-id": oo.deviceID, "sOnuPersistentData": oo.sOnuPersistentData})
+
+	Value, err := json.Marshal(oo.sOnuPersistentData)
+	if err != nil {
+		logger.Errorw("unable to marshal ONU-data", log.Fields{"sOnuPersistentData": oo.sOnuPersistentData,
+			"device-id": oo.deviceID, "err": err})
+		reportStep(0) //error indication
+		return
+	}
+	if err = oo.onuKVStore.Put(ctx, oo.onuKVStorePath, Value); err != nil {
+		logger.Errorw("unable to write ONU-data into KVstore", log.Fields{"device-id": oo.deviceID, "err": err})
+		reportStep(0) //error indication
+		return
+	}
+	reportStep(aProcessingStep) //done
+}
+
+// updateOnuUniTpPath sets, modifies or clears (aPathString == "") the TP path stored for the
+// UNI aUniID in sOnuPersistentData and returns true if the stored data was changed.
+//
+// No locking is done here: per the original author's note, requests within an InterAdapter
+// processing sequence access this data sequentially and run to completion before a new
+// request is accepted.
+func (oo *OnuDeviceEntry) updateOnuUniTpPath(aUniID uint8, aPathString string) bool {
+	uniConfigs := oo.sOnuPersistentData.PersUniConfig
+	for idx := range uniConfigs {
+		entry := &uniConfigs[idx]
+		if entry.PersUniID != aUniID {
+			continue
+		}
+		logger.Debugw("PersUniConfig-entry already exists", log.Fields{"device-id": oo.deviceID, "uniID": aUniID})
+		if entry.PersTpPath == aPathString {
+			//entry already exists
+			logger.Debugw("UniTp path already exists", log.Fields{"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString})
+			return false
+		}
+		if aPathString == "" {
+			//existing entry to be deleted
+			logger.Debugw("UniTp delete path value", log.Fields{"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString})
+		} else {
+			//existing entry to be modified
+			logger.Debugw("UniTp modify path value", log.Fields{"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString})
+		}
+		// in the delete case aPathString is the empty string, so one assignment covers both cases
+		entry.PersTpPath = aPathString
+		return true
+	}
+
+	//no entry exists for uniId
+	if aPathString == "" {
+		//delete request in non-existing state , accept as no change
+		logger.Debugw("UniTp path already removed", log.Fields{"device-id": oo.deviceID, "uniID": aUniID})
+		return false
+	}
+	//new entry to be created
+	logger.Debugw("New UniTp path set", log.Fields{"device-id": oo.deviceID, "uniID": aUniID, "path": aPathString})
+	newEntry := uniPersConfig{
+		PersUniID:      aUniID,
+		PersTpPath:     aPathString,
+		PersFlowParams: make([]uniVlanFlowParams, 0),
+	}
+	oo.sOnuPersistentData.PersUniConfig = append(oo.sOnuPersistentData.PersUniConfig, newEntry)
+	return true
+}
+
+// deleteTpResource is intended to remove Resources from the ONU's specified Uni.
+// NOTE: the removal is not implemented yet - at present the method only writes a debug log
+// and signals completion on wg; ctx, aEntryID and the commented-out sketch below are unused.
+func (oo *OnuDeviceEntry) deleteTpResource(ctx context.Context,
+ aUniID uint8, aPathString string, aResource resourceEntry, aEntryID uint32,
+ wg *sync.WaitGroup) {
+ defer wg.Done()
+ logger.Debugw("this would remove TP resources from ONU's UNI", log.Fields{
+ "device-id": oo.deviceID, "uniID": aUniID, "path": aPathString, "Resource": aResource})
+ //TODO!!!
+ //delete the given resource from ONU OMCI config and data base - as background routine
+ // (sketch of the planned processing, kept for reference; it refers to names not defined in this method)
+ /*
+ var processingStep uint8 = 1 // used to synchronize the different processing steps with chTpConfigProcessingStep
+ go onuTp.deleteAniResource(ctx, processingStep)
+ if !onuTP.waitForTimeoutOrCompletion(ctx, chTpConfigProcessingStep, processingStep) {
+ //timeout or error detected
+ return
+ }
+ */
+}
+
+// updateOnuUniFlowConfig replaces the flow parameters stored for the UNI aUniID in
+// sOnuPersistentData with a copy of *aUniVlanFlowParams. If no entry exists for the UNI yet,
+// a new one with an empty TP path is appended.
+func (oo *OnuDeviceEntry) updateOnuUniFlowConfig(aUniID uint8, aUniVlanFlowParams *[]uniVlanFlowParams) {
+	// store a private copy so that later changes of the caller's slice do not affect the persisted data
+	flowParams := make([]uniVlanFlowParams, len(*aUniVlanFlowParams))
+	copy(flowParams, *aUniVlanFlowParams)
+
+	uniConfigs := oo.sOnuPersistentData.PersUniConfig
+	for idx := range uniConfigs {
+		if uniConfigs[idx].PersUniID == aUniID {
+			uniConfigs[idx].PersFlowParams = flowParams
+			return
+		}
+	}
+	//flow update was faster than tp-config - create PersUniConfig-entry
+	oo.sOnuPersistentData.PersUniConfig = append(oo.sOnuPersistentData.PersUniConfig,
+		uniPersConfig{PersUniID: aUniID, PersTpPath: "", PersFlowParams: flowParams})
+}
+
+// waitForTimeoutOrCompletion blocks until either ctx is done or a value is received on
+// aChOnuProcessingStep. It returns true only if the received value equals aProcessingStep;
+// a done context or any other received value (including 0, the error indication) yields false.
+func (oo *OnuDeviceEntry) waitForTimeoutOrCompletion(
+	ctx context.Context, aChOnuProcessingStep <-chan uint8, aProcessingStep uint8) bool {
+	select {
+	case receivedStep := <-aChOnuProcessingStep:
+		if receivedStep != aProcessingStep {
+			//all other values are not accepted - including 0 for error indication
+			logger.Warnw("Invalid processing step received: abort!",
+				log.Fields{"device-id": oo.deviceID,
+					"wantedStep": aProcessingStep, "haveStep": receivedStep})
+			return false
+		}
+		return true
+	case <-ctx.Done():
+		logger.Warnw("processing not completed in-time!",
+			log.Fields{"device-id": oo.deviceID, "error": ctx.Err()})
+		return false
+	}
+}
+
+// resetKvProcessingErrorIndication clears the stored KV processing result (onuKVStoreprocResult)
+// by setting it to nil.
+func (oo *OnuDeviceEntry) resetKvProcessingErrorIndication() {
+ oo.onuKVStoreprocResult = nil
+}
+// getKvProcessingErrorIndication returns the stored KV processing result (onuKVStoreprocResult);
+// it is nil unless updateOnuKvStore or deleteDataFromOnuKvStore recorded an error since the last reset.
+func (oo *OnuDeviceEntry) getKvProcessingErrorIndication() error {
+ return oo.onuKVStoreprocResult
+}
+
+// lockOnuKVStoreMutex acquires the write lock of onuKVStoreMutex; it must be paired with
+// unlockOnuKVStoreMutex.
+func (oo *OnuDeviceEntry) lockOnuKVStoreMutex() {
+ oo.onuKVStoreMutex.Lock()
+}
+
+// unlockOnuKVStoreMutex releases the write lock of onuKVStoreMutex taken by lockOnuKVStoreMutex.
+func (oo *OnuDeviceEntry) unlockOnuKVStoreMutex() {
+ oo.onuKVStoreMutex.Unlock()
+}