VOL-4077: Improve storage usage on etcd
- the onu adapter will now receive the tp instance on the
inter-container kafka message and need not reach the etcd store
to fetch it.
- on reconcile, the onu adapter need not go to the kv store to
fetch the tp instance; instead it requests the tp instance via a
new API towards the openolt adapter, which in turn fetches it
from its cache.
- re-org the code in onu-metrics-manager to restore pm-data
on reconcile, to avoid panics from accessing uninitialized data
if the ani-fsm were to try adding a gem port for monitoring
before pm-data is initialized properly.
Change-Id: I82a6de2772155f6e08390b671fe26d692dd02c99
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index 00cc09b..e5444d4 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -21,6 +21,7 @@
"context"
"errors"
"fmt"
+ "github.com/opencord/voltha-protos/v4/go/tech_profile"
"strconv"
"sync"
"time"
@@ -29,11 +30,11 @@
"github.com/golang/protobuf/ptypes"
"github.com/looplab/fsm"
me "github.com/opencord/omci-lib-go/generated"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- flow "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ flow "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
vc "github.com/opencord/voltha-protos/v4/go/common"
"github.com/opencord/voltha-protos/v4/go/extension"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
@@ -138,6 +139,7 @@
drReconcileFailed = 13
drReconcileMaxTimeout = 14
drReconcileCanceled = 15
+ drTechProfileConfigDownloadFailed = 16
)
var deviceReasonMap = map[uint8]string{
@@ -147,6 +149,7 @@
drDiscoveryMibsyncComplete: "discovery-mibsync-complete",
drInitialMibDownloaded: "initial-mib-downloaded",
drTechProfileConfigDownloadSuccess: "tech-profile-config-download-success",
+ drTechProfileConfigDownloadFailed: "tech-profile-config-download-failed",
drOmciFlowsPushed: "omci-flows-pushed",
drOmciAdminLock: "omci-admin-lock",
drOnuReenabled: "onu-reenabled",
@@ -409,52 +412,59 @@
techProfMsg.UniId, dh.deviceID))
}
uniID := uint8(techProfMsg.UniId)
- tpID, err := GetTpIDFromTpPath(techProfMsg.Path)
+ tpID, err := GetTpIDFromTpPath(techProfMsg.TpInstancePath)
if err != nil {
- logger.Errorw(ctx, "error-parsing-tpid-from-tppath", log.Fields{"err": err, "tp-path": techProfMsg.Path})
+ logger.Errorw(ctx, "error-parsing-tpid-from-tppath", log.Fields{"err": err, "tp-path": techProfMsg.TpInstancePath})
return err
}
- logger.Debugw(ctx, "unmarshal-techprof-msg-body", log.Fields{"uniID": uniID, "tp-path": techProfMsg.Path, "tpID": tpID})
+ logger.Debugw(ctx, "unmarshal-techprof-msg-body", log.Fields{"uniID": uniID, "tp-path": techProfMsg.TpInstancePath, "tpID": tpID})
- if bTpModify := pDevEntry.updateOnuUniTpPath(ctx, uniID, uint8(tpID), techProfMsg.Path); bTpModify {
- logger.Debugw(ctx, "onu-uni-tp-path-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.Path, "tpID": tpID})
- // if there has been some change for some uni TechProfilePath
- //in order to allow concurrent calls to other dh instances we do not wait for execution here
- //but doing so we can not indicate problems to the caller (who does what with that then?)
- //by now we just assume straightforward successful execution
- //TODO!!! Generally: In this scheme it would be good to have some means to indicate
- // possible problems to the caller later autonomously
+ if bTpModify := pDevEntry.updateOnuUniTpPath(ctx, uniID, uint8(tpID), techProfMsg.TpInstancePath); bTpModify {
- // deadline context to ensure completion of background routines waited for
- //20200721: 10s proved to be less in 8*8 ONU test on local vbox machine with debug, might be further adapted
- deadline := time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
- dctx, cancel := context.WithDeadline(context.Background(), deadline)
+ switch tpInst := techProfMsg.TechTpInstance.(type) {
+ case *ic.InterAdapterTechProfileDownloadMessage_TpInstance:
+ logger.Debugw(ctx, "onu-uni-tp-path-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.TpInstancePath, "tpID": tpID})
+ // if there has been some change for some uni TechProfilePath
+ //in order to allow concurrent calls to other dh instances we do not wait for execution here
+ //but doing so we can not indicate problems to the caller (who does what with that then?)
+ //by now we just assume straightforward successful execution
+ //TODO!!! Generally: In this scheme it would be good to have some means to indicate
+ // possible problems to the caller later autonomously
- dh.pOnuTP.resetTpProcessingErrorIndication(uniID, tpID)
+ // deadline context to ensure completion of background routines waited for
+ //20200721: 10s proved to be less in 8*8 ONU test on local vbox machine with debug, might be further adapted
+ deadline := time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
+ dctx, cancel := context.WithDeadline(context.Background(), deadline)
- var wg sync.WaitGroup
- wg.Add(1) // for the 1 go routine to finish
- // attention: deadline completion check and wg.Done is to be done in both routines
- go dh.pOnuTP.configureUniTp(log.WithSpanFromContext(dctx, ctx), uniID, techProfMsg.Path, &wg)
- dh.waitForCompletion(ctx, cancel, &wg, "TechProfDwld") //wait for background process to finish
- if tpErr := dh.pOnuTP.getTpProcessingErrorIndication(uniID, tpID); tpErr != nil {
- logger.Errorw(ctx, "error-processing-tp", log.Fields{"device-id": dh.deviceID, "err": tpErr, "tp-path": techProfMsg.Path})
- return tpErr
+ dh.pOnuTP.resetTpProcessingErrorIndication(uniID, tpID)
+
+ var wg sync.WaitGroup
+ wg.Add(1) // for the 1 go routine to finish
+ // attention: deadline completion check and wg.Done is to be done in both routines
+ go dh.pOnuTP.configureUniTp(log.WithSpanFromContext(dctx, ctx), uniID, techProfMsg.TpInstancePath, *tpInst.TpInstance, &wg)
+ dh.waitForCompletion(ctx, cancel, &wg, "TechProfDwld") //wait for background process to finish
+ if tpErr := dh.pOnuTP.getTpProcessingErrorIndication(uniID, tpID); tpErr != nil {
+ logger.Errorw(ctx, "error-processing-tp", log.Fields{"device-id": dh.deviceID, "err": tpErr, "tp-path": techProfMsg.TpInstancePath})
+ return tpErr
+ }
+ deadline = time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
+ dctx2, cancel2 := context.WithDeadline(context.Background(), deadline)
+ pDevEntry.resetKvProcessingErrorIndication()
+ wg.Add(1) // for the 1 go routine to finish
+ go pDevEntry.updateOnuKvStore(log.WithSpanFromContext(dctx2, ctx), &wg)
+ dh.waitForCompletion(ctx, cancel2, &wg, "TechProfDwld") //wait for background process to finish
+ if kvErr := pDevEntry.getKvProcessingErrorIndication(); kvErr != nil {
+ logger.Errorw(ctx, "error-updating-KV", log.Fields{"device-id": dh.deviceID, "err": kvErr, "tp-path": techProfMsg.TpInstancePath})
+ return kvErr
+ }
+ return nil
+ default:
+ logger.Errorw(ctx, "unsupported-tp-instance-type", log.Fields{"tp-path": techProfMsg.TpInstancePath})
+ return fmt.Errorf("unsupported-tp-instance-type--tp-id-%v", techProfMsg.TpInstancePath)
}
- deadline = time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
- dctx2, cancel2 := context.WithDeadline(context.Background(), deadline)
- pDevEntry.resetKvProcessingErrorIndication()
- wg.Add(1) // for the 1 go routine to finish
- go pDevEntry.updateOnuKvStore(log.WithSpanFromContext(dctx2, ctx), &wg)
- dh.waitForCompletion(ctx, cancel2, &wg, "TechProfDwld") //wait for background process to finish
- if kvErr := pDevEntry.getKvProcessingErrorIndication(); kvErr != nil {
- logger.Errorw(ctx, "error-updating-KV", log.Fields{"device-id": dh.deviceID, "err": kvErr, "tp-path": techProfMsg.Path})
- return kvErr
- }
- return nil
}
// no change, nothing really to do - return success
- logger.Debugw(ctx, "onu-uni-tp-path-not-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.Path, "tpID": tpID})
+ logger.Debugw(ctx, "onu-uni-tp-path-not-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.TpInstancePath, "tpID": tpID})
return nil
}
@@ -493,9 +503,9 @@
delGemPortMsg.UniId, dh.deviceID))
}
uniID := uint8(delGemPortMsg.UniId)
- tpID, err := GetTpIDFromTpPath(delGemPortMsg.TpPath)
+ tpID, err := GetTpIDFromTpPath(delGemPortMsg.TpInstancePath)
if err != nil {
- logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"err": err, "tp-path": delGemPortMsg.TpPath})
+ logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"err": err, "tp-path": delGemPortMsg.TpInstancePath})
return err
}
@@ -509,7 +519,7 @@
var wg sync.WaitGroup
wg.Add(1) // for the 1 go routine to finish
- go dh.pOnuTP.deleteTpResource(log.WithSpanFromContext(dctx, ctx), uniID, tpID, delGemPortMsg.TpPath,
+ go dh.pOnuTP.deleteTpResource(log.WithSpanFromContext(dctx, ctx), uniID, tpID, delGemPortMsg.TpInstancePath,
cResourceGemPort, delGemPortMsg.GemPortId, &wg)
dh.waitForCompletion(ctx, cancel, &wg, "GemDelete") //wait for background process to finish
@@ -551,7 +561,7 @@
delTcontMsg.UniId, dh.deviceID))
}
uniID := uint8(delTcontMsg.UniId)
- tpPath := delTcontMsg.TpPath
+ tpPath := delTcontMsg.TpInstancePath
tpID, err := GetTpIDFromTpPath(tpPath)
if err != nil {
logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"err": err, "tp-path": tpPath})
@@ -569,7 +579,7 @@
var wg sync.WaitGroup
wg.Add(2) // for the 2 go routines to finish
- go dh.pOnuTP.deleteTpResource(log.WithSpanFromContext(dctx, ctx), uniID, tpID, delTcontMsg.TpPath,
+ go dh.pOnuTP.deleteTpResource(log.WithSpanFromContext(dctx, ctx), uniID, tpID, delTcontMsg.TpInstancePath,
cResourceTcont, delTcontMsg.AllocId, &wg)
// Removal of the tcont/alloc id mapping represents the removal of the tech profile
go pDevEntry.updateOnuKvStore(log.WithSpanFromContext(dctx, ctx), &wg)
@@ -822,7 +832,7 @@
} else {
logger.Errorw(ctx, "reconciling - restoring OnuTp-data failed - abort", log.Fields{"err": err, "device-id": dh.deviceID})
}
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, false)
return
}
var onuIndication oop.OnuIndication
@@ -842,7 +852,7 @@
if pDevEntry == nil {
logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, false)
}
return
}
@@ -855,12 +865,14 @@
logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, true)
}
return
}
- techProfsFound := false
flowsFound := false
+ techProfsFound := false
+ techProfInstLoadFailed := false
+outerLoop:
for _, uniData := range pDevEntry.sOnuPersistentData.PersUniConfig {
//TODO: check for uni-port specific reconcilement in case of multi-uni-port-per-onu-support
if len(uniData.PersTpPathMap) == 0 {
@@ -868,8 +880,31 @@
log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
continue
}
- techProfsFound = true
+ techProfsFound = true // set to true if we found TP once for any UNI port
for tpID := range uniData.PersTpPathMap {
+ // Request the TpInstance again from the openolt adapter in case of reconcile
+ iaTechTpInst, err := dh.AdapterProxy.TechProfileInstanceRequest(ctx, uniData.PersTpPathMap[tpID],
+ dh.device.ParentPortNo, dh.device.ProxyAddress.OnuId, uint32(uniData.PersUniID),
+ dh.pOpenOnuAc.config.Topic, dh.ProxyAddressType,
+ dh.parentID, dh.ProxyAddressID)
+ if err != nil || iaTechTpInst == nil {
+ logger.Errorw(ctx, "error fetching tp instance",
+ log.Fields{"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID, "err": err})
+ techProfInstLoadFailed = true // stop loading tp instance as soon as we hit failure
+ break outerLoop
+ }
+ var tpInst tech_profile.TechProfileInstance
+ switch techTpInst := iaTechTpInst.TechTpInstance.(type) {
+ case *ic.InterAdapterTechProfileDownloadMessage_TpInstance: // supports only GPON, XGPON, XGS-PON
+ tpInst = *techTpInst.TpInstance
+ logger.Debugw(ctx, "received-tp-instance-successfully-after-reconcile", log.Fields{"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID})
+
+ default: // do not support epon or other tech
+ logger.Errorw(ctx, "unsupported-tech", log.Fields{"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID})
+ techProfInstLoadFailed = true // stop loading tp instance as soon as we hit failure
+ break outerLoop
+ }
+
// deadline context to ensure completion of background routines waited for
//20200721: 10s proved to be less in 8*8 ONU test on local vbox machine with debug, might be further adapted
deadline := time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
@@ -878,10 +913,12 @@
dh.pOnuTP.resetTpProcessingErrorIndication(uniData.PersUniID, tpID)
var wg sync.WaitGroup
wg.Add(1) // for the 1 go routine to finish
- go dh.pOnuTP.configureUniTp(log.WithSpanFromContext(dctx, ctx), uniData.PersUniID, uniData.PersTpPathMap[tpID], &wg)
+ go dh.pOnuTP.configureUniTp(log.WithSpanFromContext(dctx, ctx), uniData.PersUniID, uniData.PersTpPathMap[tpID], tpInst, &wg)
dh.waitForCompletion(ctx, cancel, &wg, "TechProfReconcile") //wait for background process to finish
if err := dh.pOnuTP.getTpProcessingErrorIndication(uniData.PersUniID, tpID); err != nil {
logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.deviceID})
+ techProfInstLoadFailed = true // stop loading tp instance as soon as we hit failure
+ break outerLoop
}
}
if len(uniData.PersFlowParams) != 0 {
@@ -892,18 +929,22 @@
logger.Debugw(ctx, "reconciling - no TPs have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, true)
}
return
}
- if dh.isSkipOnuConfigReconciling() {
+ if techProfInstLoadFailed {
+ dh.setDeviceReason(drTechProfileConfigDownloadFailed)
+ dh.stopReconciling(ctx, false)
+ return
+ } else if dh.isSkipOnuConfigReconciling() {
dh.setDeviceReason(drTechProfileConfigDownloadSuccess)
}
if !flowsFound {
logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, true)
}
}
}
@@ -915,7 +956,7 @@
if pDevEntry == nil {
logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, false)
}
return
}
@@ -926,7 +967,7 @@
logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, true)
}
return
}
@@ -951,7 +992,7 @@
logger.Errorw(ctx, "reconciling - onuUniPort data not found - terminate reconcilement",
log.Fields{"uniNo": uniNo, "device-id": dh.deviceID})
if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, false)
}
return
}
@@ -997,7 +1038,7 @@
logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, true)
}
return
}
@@ -1008,7 +1049,7 @@
func (dh *deviceHandler) reconcileEnd(ctx context.Context) {
logger.Debugw(ctx, "reconciling - completed!", log.Fields{"device-id": dh.deviceID})
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, true)
}
func (dh *deviceHandler) deleteDevicePersistencyData(ctx context.Context) error {
@@ -1752,7 +1793,7 @@
pDevEntry.mutexPersOnuConfig.RUnlock()
logger.Debugw(ctx, "reconciling - uni-ports were not unlocked before adapter restart - resume with a normal start-up",
log.Fields{"device-id": dh.deviceID})
- dh.stopReconciling(ctx)
+ dh.stopReconciling(ctx, true)
} else {
pDevEntry.mutexPersOnuConfig.RUnlock()
}
@@ -2715,6 +2756,7 @@
logger.Debugw(ctx, "SetKVStoreBackend", log.Fields{"IpTarget": dh.pOpenOnuAc.KVStoreAddress,
"BasePathKvStore": aBasePathKvStore, "device-id": dh.deviceID})
+ // kvbackend := db.NewBackend(ctx, dh.pOpenOnuAc.KVStoreType, dh.pOpenOnuAc.KVStoreAddress, dh.pOpenOnuAc.KVStoreTimeout, aBasePathKvStore)
kvbackend := &db.Backend{
Client: dh.pOpenOnuAc.kvClient,
StoreType: dh.pOpenOnuAc.KVStoreType,
@@ -3614,10 +3656,10 @@
dh.mutexReconcilingFlag.Unlock()
}
-func (dh *deviceHandler) stopReconciling(ctx context.Context) {
- logger.Debugw(ctx, "stop reconciling", log.Fields{"device-id": dh.deviceID})
+func (dh *deviceHandler) stopReconciling(ctx context.Context, success bool) {
+ logger.Debugw(ctx, "stop reconciling", log.Fields{"device-id": dh.deviceID, "success": success})
if dh.isReconciling() {
- dh.chReconcilingFinished <- true
+ dh.chReconcilingFinished <- success
} else {
logger.Infow(ctx, "reconciling is not running", log.Fields{"device-id": dh.deviceID})
}