VOL-4077: Improve storage usage on etcd
- the onu adapter now receives the tp instance in the
  inter-container kafka message and no longer needs to fetch it
  from the etcd store.
- on reconcile, the onu adapter no longer needs to go to the kv
  store to fetch the tp instance; instead it requests the tp
  instance via a new API towards the openolt adapter, which in
  turn fetches it from its cache.
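- reorganize the code in onu-metrics-manager that restores pm-data
  on reconcile, to avoid panics caused by accessing uninitialized
  data if the ani-fsm tries to add a gem port for monitoring
  before pm-data is properly initialized.
- if the tp instance cannot be fetched or applied during
  reconcile, reconciling is aborted with the new device reason
  "tech-profile-config-download-failed"; stopReconciling now
  reports whether reconciling succeeded.
- move to voltha-lib-go v5.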

Change-Id: I82a6de2772155f6e08390b671fe26d692dd02c99
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index 00cc09b..e5444d4 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -21,6 +21,7 @@
 	"context"
 	"errors"
 	"fmt"
+	"github.com/opencord/voltha-protos/v4/go/tech_profile"
 	"strconv"
 	"sync"
 	"time"
@@ -29,11 +30,11 @@
 	"github.com/golang/protobuf/ptypes"
 	"github.com/looplab/fsm"
 	me "github.com/opencord/omci-lib-go/generated"
-	"github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
-	"github.com/opencord/voltha-lib-go/v4/pkg/db"
-	"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
-	flow "github.com/opencord/voltha-lib-go/v4/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+	"github.com/opencord/voltha-lib-go/v5/pkg/db"
+	"github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+	flow "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
 	vc "github.com/opencord/voltha-protos/v4/go/common"
 	"github.com/opencord/voltha-protos/v4/go/extension"
 	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
@@ -138,6 +139,7 @@
 	drReconcileFailed                  = 13
 	drReconcileMaxTimeout              = 14
 	drReconcileCanceled                = 15
+	drTechProfileConfigDownloadFailed  = 16
 )
 
 var deviceReasonMap = map[uint8]string{
@@ -147,6 +149,7 @@
 	drDiscoveryMibsyncComplete:         "discovery-mibsync-complete",
 	drInitialMibDownloaded:             "initial-mib-downloaded",
 	drTechProfileConfigDownloadSuccess: "tech-profile-config-download-success",
+	drTechProfileConfigDownloadFailed:  "tech-profile-config-download-failed",
 	drOmciFlowsPushed:                  "omci-flows-pushed",
 	drOmciAdminLock:                    "omci-admin-lock",
 	drOnuReenabled:                     "onu-reenabled",
@@ -409,52 +412,59 @@
 			techProfMsg.UniId, dh.deviceID))
 	}
 	uniID := uint8(techProfMsg.UniId)
-	tpID, err := GetTpIDFromTpPath(techProfMsg.Path)
+	tpID, err := GetTpIDFromTpPath(techProfMsg.TpInstancePath)
 	if err != nil {
-		logger.Errorw(ctx, "error-parsing-tpid-from-tppath", log.Fields{"err": err, "tp-path": techProfMsg.Path})
+		logger.Errorw(ctx, "error-parsing-tpid-from-tppath", log.Fields{"err": err, "tp-path": techProfMsg.TpInstancePath})
 		return err
 	}
-	logger.Debugw(ctx, "unmarshal-techprof-msg-body", log.Fields{"uniID": uniID, "tp-path": techProfMsg.Path, "tpID": tpID})
+	logger.Debugw(ctx, "unmarshal-techprof-msg-body", log.Fields{"uniID": uniID, "tp-path": techProfMsg.TpInstancePath, "tpID": tpID})
 
-	if bTpModify := pDevEntry.updateOnuUniTpPath(ctx, uniID, uint8(tpID), techProfMsg.Path); bTpModify {
-		logger.Debugw(ctx, "onu-uni-tp-path-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.Path, "tpID": tpID})
-		//	if there has been some change for some uni TechProfilePath
-		//in order to allow concurrent calls to other dh instances we do not wait for execution here
-		//but doing so we can not indicate problems to the caller (who does what with that then?)
-		//by now we just assume straightforward successful execution
-		//TODO!!! Generally: In this scheme it would be good to have some means to indicate
-		//  possible problems to the caller later autonomously
+	if bTpModify := pDevEntry.updateOnuUniTpPath(ctx, uniID, uint8(tpID), techProfMsg.TpInstancePath); bTpModify {
 
-		// deadline context to ensure completion of background routines waited for
-		//20200721: 10s proved to be less in 8*8 ONU test on local vbox machine with debug, might be further adapted
-		deadline := time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
-		dctx, cancel := context.WithDeadline(context.Background(), deadline)
+		switch tpInst := techProfMsg.TechTpInstance.(type) {
+		case *ic.InterAdapterTechProfileDownloadMessage_TpInstance:
+			logger.Debugw(ctx, "onu-uni-tp-path-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.TpInstancePath, "tpID": tpID})
+			//	if there has been some change for some uni TechProfilePath
+			//in order to allow concurrent calls to other dh instances we do not wait for execution here
+			//but doing so we can not indicate problems to the caller (who does what with that then?)
+			//by now we just assume straightforward successful execution
+			//TODO!!! Generally: In this scheme it would be good to have some means to indicate
+			//  possible problems to the caller later autonomously
 
-		dh.pOnuTP.resetTpProcessingErrorIndication(uniID, tpID)
+			// deadline context to ensure completion of background routines waited for
+			//20200721: 10s proved to be less in 8*8 ONU test on local vbox machine with debug, might be further adapted
+			deadline := time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
+			dctx, cancel := context.WithDeadline(context.Background(), deadline)
 
-		var wg sync.WaitGroup
-		wg.Add(1) // for the 1 go routine to finish
-		// attention: deadline completion check and wg.Done is to be done in both routines
-		go dh.pOnuTP.configureUniTp(log.WithSpanFromContext(dctx, ctx), uniID, techProfMsg.Path, &wg)
-		dh.waitForCompletion(ctx, cancel, &wg, "TechProfDwld") //wait for background process to finish
-		if tpErr := dh.pOnuTP.getTpProcessingErrorIndication(uniID, tpID); tpErr != nil {
-			logger.Errorw(ctx, "error-processing-tp", log.Fields{"device-id": dh.deviceID, "err": tpErr, "tp-path": techProfMsg.Path})
-			return tpErr
+			dh.pOnuTP.resetTpProcessingErrorIndication(uniID, tpID)
+
+			var wg sync.WaitGroup
+			wg.Add(1) // for the 1 go routine to finish
+			// attention: deadline completion check and wg.Done is to be done in both routines
+			go dh.pOnuTP.configureUniTp(log.WithSpanFromContext(dctx, ctx), uniID, techProfMsg.TpInstancePath, *tpInst.TpInstance, &wg)
+			dh.waitForCompletion(ctx, cancel, &wg, "TechProfDwld") //wait for background process to finish
+			if tpErr := dh.pOnuTP.getTpProcessingErrorIndication(uniID, tpID); tpErr != nil {
+				logger.Errorw(ctx, "error-processing-tp", log.Fields{"device-id": dh.deviceID, "err": tpErr, "tp-path": techProfMsg.TpInstancePath})
+				return tpErr
+			}
+			deadline = time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
+			dctx2, cancel2 := context.WithDeadline(context.Background(), deadline)
+			pDevEntry.resetKvProcessingErrorIndication()
+			wg.Add(1) // for the 1 go routine to finish
+			go pDevEntry.updateOnuKvStore(log.WithSpanFromContext(dctx2, ctx), &wg)
+			dh.waitForCompletion(ctx, cancel2, &wg, "TechProfDwld") //wait for background process to finish
+			if kvErr := pDevEntry.getKvProcessingErrorIndication(); kvErr != nil {
+				logger.Errorw(ctx, "error-updating-KV", log.Fields{"device-id": dh.deviceID, "err": kvErr, "tp-path": techProfMsg.TpInstancePath})
+				return kvErr
+			}
+			return nil
+		default:
+			logger.Errorw(ctx, "unsupported-tp-instance-type", log.Fields{"tp-path": techProfMsg.TpInstancePath})
+			return fmt.Errorf("unsupported-tp-instance-type--tp-id-%v", techProfMsg.TpInstancePath)
 		}
-		deadline = time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
-		dctx2, cancel2 := context.WithDeadline(context.Background(), deadline)
-		pDevEntry.resetKvProcessingErrorIndication()
-		wg.Add(1) // for the 1 go routine to finish
-		go pDevEntry.updateOnuKvStore(log.WithSpanFromContext(dctx2, ctx), &wg)
-		dh.waitForCompletion(ctx, cancel2, &wg, "TechProfDwld") //wait for background process to finish
-		if kvErr := pDevEntry.getKvProcessingErrorIndication(); kvErr != nil {
-			logger.Errorw(ctx, "error-updating-KV", log.Fields{"device-id": dh.deviceID, "err": kvErr, "tp-path": techProfMsg.Path})
-			return kvErr
-		}
-		return nil
 	}
 	// no change, nothing really to do - return success
-	logger.Debugw(ctx, "onu-uni-tp-path-not-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.Path, "tpID": tpID})
+	logger.Debugw(ctx, "onu-uni-tp-path-not-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.TpInstancePath, "tpID": tpID})
 	return nil
 }
 
@@ -493,9 +503,9 @@
 			delGemPortMsg.UniId, dh.deviceID))
 	}
 	uniID := uint8(delGemPortMsg.UniId)
-	tpID, err := GetTpIDFromTpPath(delGemPortMsg.TpPath)
+	tpID, err := GetTpIDFromTpPath(delGemPortMsg.TpInstancePath)
 	if err != nil {
-		logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"err": err, "tp-path": delGemPortMsg.TpPath})
+		logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"err": err, "tp-path": delGemPortMsg.TpInstancePath})
 		return err
 	}
 
@@ -509,7 +519,7 @@
 
 	var wg sync.WaitGroup
 	wg.Add(1) // for the 1 go routine to finish
-	go dh.pOnuTP.deleteTpResource(log.WithSpanFromContext(dctx, ctx), uniID, tpID, delGemPortMsg.TpPath,
+	go dh.pOnuTP.deleteTpResource(log.WithSpanFromContext(dctx, ctx), uniID, tpID, delGemPortMsg.TpInstancePath,
 		cResourceGemPort, delGemPortMsg.GemPortId, &wg)
 	dh.waitForCompletion(ctx, cancel, &wg, "GemDelete") //wait for background process to finish
 
@@ -551,7 +561,7 @@
 			delTcontMsg.UniId, dh.deviceID))
 	}
 	uniID := uint8(delTcontMsg.UniId)
-	tpPath := delTcontMsg.TpPath
+	tpPath := delTcontMsg.TpInstancePath
 	tpID, err := GetTpIDFromTpPath(tpPath)
 	if err != nil {
 		logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"err": err, "tp-path": tpPath})
@@ -569,7 +579,7 @@
 
 		var wg sync.WaitGroup
 		wg.Add(2) // for the 2 go routines to finish
-		go dh.pOnuTP.deleteTpResource(log.WithSpanFromContext(dctx, ctx), uniID, tpID, delTcontMsg.TpPath,
+		go dh.pOnuTP.deleteTpResource(log.WithSpanFromContext(dctx, ctx), uniID, tpID, delTcontMsg.TpInstancePath,
 			cResourceTcont, delTcontMsg.AllocId, &wg)
 		// Removal of the tcont/alloc id mapping represents the removal of the tech profile
 		go pDevEntry.updateOnuKvStore(log.WithSpanFromContext(dctx, ctx), &wg)
@@ -822,7 +832,7 @@
 		} else {
 			logger.Errorw(ctx, "reconciling - restoring OnuTp-data failed - abort", log.Fields{"err": err, "device-id": dh.deviceID})
 		}
-		dh.stopReconciling(ctx)
+		dh.stopReconciling(ctx, false)
 		return
 	}
 	var onuIndication oop.OnuIndication
@@ -842,7 +852,7 @@
 	if pDevEntry == nil {
 		logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
 		if !dh.isSkipOnuConfigReconciling() {
-			dh.stopReconciling(ctx)
+			dh.stopReconciling(ctx, false)
 		}
 		return
 	}
@@ -855,12 +865,14 @@
 		logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
 			log.Fields{"device-id": dh.deviceID})
 		if !dh.isSkipOnuConfigReconciling() {
-			dh.stopReconciling(ctx)
+			dh.stopReconciling(ctx, true)
 		}
 		return
 	}
-	techProfsFound := false
 	flowsFound := false
+	techProfsFound := false
+	techProfInstLoadFailed := false
+outerLoop:
 	for _, uniData := range pDevEntry.sOnuPersistentData.PersUniConfig {
 		//TODO: check for uni-port specific reconcilement in case of multi-uni-port-per-onu-support
 		if len(uniData.PersTpPathMap) == 0 {
@@ -868,8 +880,31 @@
 				log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
 			continue
 		}
-		techProfsFound = true
+		techProfsFound = true // set to true if we found TP once for any UNI port
 		for tpID := range uniData.PersTpPathMap {
+			// Request the TpInstance again from the openolt adapter in case of reconcile
+			iaTechTpInst, err := dh.AdapterProxy.TechProfileInstanceRequest(ctx, uniData.PersTpPathMap[tpID],
+				dh.device.ParentPortNo, dh.device.ProxyAddress.OnuId, uint32(uniData.PersUniID),
+				dh.pOpenOnuAc.config.Topic, dh.ProxyAddressType,
+				dh.parentID, dh.ProxyAddressID)
+			if err != nil || iaTechTpInst == nil {
+				logger.Errorw(ctx, "error fetching tp instance",
+					log.Fields{"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID, "err": err})
+				techProfInstLoadFailed = true // stop loading tp instance as soon as we hit failure
+				break outerLoop
+			}
+			var tpInst tech_profile.TechProfileInstance
+			switch techTpInst := iaTechTpInst.TechTpInstance.(type) {
+			case *ic.InterAdapterTechProfileDownloadMessage_TpInstance: // supports only GPON, XGPON, XGS-PON
+				tpInst = *techTpInst.TpInstance
+				logger.Debugw(ctx, "received-tp-instance-successfully-after-reconcile", log.Fields{"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID})
+
+			default: // do not support epon or other tech
+				logger.Errorw(ctx, "unsupported-tech", log.Fields{"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID})
+				techProfInstLoadFailed = true // stop loading tp instance as soon as we hit failure
+				break outerLoop
+			}
+
 			// deadline context to ensure completion of background routines waited for
 			//20200721: 10s proved to be less in 8*8 ONU test on local vbox machine with debug, might be further adapted
 			deadline := time.Now().Add(dh.pOpenOnuAc.maxTimeoutInterAdapterComm) //allowed run time to finish before execution
@@ -878,10 +913,12 @@
 			dh.pOnuTP.resetTpProcessingErrorIndication(uniData.PersUniID, tpID)
 			var wg sync.WaitGroup
 			wg.Add(1) // for the 1 go routine to finish
-			go dh.pOnuTP.configureUniTp(log.WithSpanFromContext(dctx, ctx), uniData.PersUniID, uniData.PersTpPathMap[tpID], &wg)
+			go dh.pOnuTP.configureUniTp(log.WithSpanFromContext(dctx, ctx), uniData.PersUniID, uniData.PersTpPathMap[tpID], tpInst, &wg)
 			dh.waitForCompletion(ctx, cancel, &wg, "TechProfReconcile") //wait for background process to finish
 			if err := dh.pOnuTP.getTpProcessingErrorIndication(uniData.PersUniID, tpID); err != nil {
 				logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.deviceID})
+				techProfInstLoadFailed = true // stop loading tp instance as soon as we hit failure
+				break outerLoop
 			}
 		}
 		if len(uniData.PersFlowParams) != 0 {
@@ -892,18 +929,22 @@
 		logger.Debugw(ctx, "reconciling - no TPs have been stored before adapter restart - terminate reconcilement",
 			log.Fields{"device-id": dh.deviceID})
 		if !dh.isSkipOnuConfigReconciling() {
-			dh.stopReconciling(ctx)
+			dh.stopReconciling(ctx, true)
 		}
 		return
 	}
-	if dh.isSkipOnuConfigReconciling() {
+	if techProfInstLoadFailed {
+		dh.setDeviceReason(drTechProfileConfigDownloadFailed)
+		dh.stopReconciling(ctx, false)
+		return
+	} else if dh.isSkipOnuConfigReconciling() {
 		dh.setDeviceReason(drTechProfileConfigDownloadSuccess)
 	}
 	if !flowsFound {
 		logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
 			log.Fields{"device-id": dh.deviceID})
 		if !dh.isSkipOnuConfigReconciling() {
-			dh.stopReconciling(ctx)
+			dh.stopReconciling(ctx, true)
 		}
 	}
 }
@@ -915,7 +956,7 @@
 	if pDevEntry == nil {
 		logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
 		if !dh.isSkipOnuConfigReconciling() {
-			dh.stopReconciling(ctx)
+			dh.stopReconciling(ctx, false)
 		}
 		return
 	}
@@ -926,7 +967,7 @@
 		logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
 			log.Fields{"device-id": dh.deviceID})
 		if !dh.isSkipOnuConfigReconciling() {
-			dh.stopReconciling(ctx)
+			dh.stopReconciling(ctx, true)
 		}
 		return
 	}
@@ -951,7 +992,7 @@
 			logger.Errorw(ctx, "reconciling - onuUniPort data not found  - terminate reconcilement",
 				log.Fields{"uniNo": uniNo, "device-id": dh.deviceID})
 			if !dh.isSkipOnuConfigReconciling() {
-				dh.stopReconciling(ctx)
+				dh.stopReconciling(ctx, false)
 			}
 			return
 		}
@@ -997,7 +1038,7 @@
 		logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
 			log.Fields{"device-id": dh.deviceID})
 		if !dh.isSkipOnuConfigReconciling() {
-			dh.stopReconciling(ctx)
+			dh.stopReconciling(ctx, true)
 		}
 		return
 	}
@@ -1008,7 +1049,7 @@
 
 func (dh *deviceHandler) reconcileEnd(ctx context.Context) {
 	logger.Debugw(ctx, "reconciling - completed!", log.Fields{"device-id": dh.deviceID})
-	dh.stopReconciling(ctx)
+	dh.stopReconciling(ctx, true)
 }
 
 func (dh *deviceHandler) deleteDevicePersistencyData(ctx context.Context) error {
@@ -1752,7 +1793,7 @@
 			pDevEntry.mutexPersOnuConfig.RUnlock()
 			logger.Debugw(ctx, "reconciling - uni-ports were not unlocked before adapter restart - resume with a normal start-up",
 				log.Fields{"device-id": dh.deviceID})
-			dh.stopReconciling(ctx)
+			dh.stopReconciling(ctx, true)
 		} else {
 			pDevEntry.mutexPersOnuConfig.RUnlock()
 		}
@@ -2715,6 +2756,7 @@
 
 	logger.Debugw(ctx, "SetKVStoreBackend", log.Fields{"IpTarget": dh.pOpenOnuAc.KVStoreAddress,
 		"BasePathKvStore": aBasePathKvStore, "device-id": dh.deviceID})
+	// kvbackend := db.NewBackend(ctx, dh.pOpenOnuAc.KVStoreType, dh.pOpenOnuAc.KVStoreAddress, dh.pOpenOnuAc.KVStoreTimeout, aBasePathKvStore)
 	kvbackend := &db.Backend{
 		Client:    dh.pOpenOnuAc.kvClient,
 		StoreType: dh.pOpenOnuAc.KVStoreType,
@@ -3614,10 +3656,10 @@
 	dh.mutexReconcilingFlag.Unlock()
 }
 
-func (dh *deviceHandler) stopReconciling(ctx context.Context) {
-	logger.Debugw(ctx, "stop reconciling", log.Fields{"device-id": dh.deviceID})
+func (dh *deviceHandler) stopReconciling(ctx context.Context, success bool) {
+	logger.Debugw(ctx, "stop reconciling", log.Fields{"device-id": dh.deviceID, "success": success})
 	if dh.isReconciling() {
-		dh.chReconcilingFinished <- true
+		dh.chReconcilingFinished <- success
 	} else {
 		logger.Infow(ctx, "reconciling is not running", log.Fields{"device-id": dh.deviceID})
 	}