changes for olt reconnect, reboot redesign

Change-Id: I192c01f7672b62956b4b55e19b8cd0d33ca7cfae

fixes for device state handling

Change-Id: I6765b7b53b2c130c70ac37cad28373cec7397908

olt reconnect, reboot redesign changes

Change-Id: I2a4981bc815d0961ffbf7e36ba7cfb06243e8319
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 2d3c7c5..500dbf1 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -21,6 +21,7 @@
 	"context"
 	"encoding/binary"
 	"encoding/hex"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"net"
@@ -72,6 +73,7 @@
 	oltPortInfoTimeout     = 3
 
 	defaultPortSpeedMbps = 1000
+	heartbeatPath        = "heartbeat"
 )
 
 //DeviceHandler will interact with the OLT device.
@@ -93,6 +95,7 @@
 	groupMgr                *OpenOltGroupMgr
 	eventMgr                *OpenOltEventMgr
 	resourceMgr             []*rsrcMgr.OpenOltResourceMgr
+	kvStore                 *db.Backend // backend kv store connection handle
 
 	deviceInfo *oop.DeviceInfo
 
@@ -120,6 +123,7 @@
 	agentPreviouslyConnected   bool
 
 	isDeviceDeletionInProgress bool
+	heartbeatSignature         uint32
 }
 
 //OnuDevice represents ONU related info
@@ -185,6 +189,7 @@
 //NewDeviceHandler creates a new device handler
 func NewDeviceHandler(cc *vgrpc.Client, ep eventif.EventProxy, device *voltha.Device, adapter *OpenOLT, cm *config.ConfigManager, cfg *conf.AdapterFlags) *DeviceHandler {
 	var dh DeviceHandler
+	ctx := context.Background()
 	dh.cm = cm
 	dh.coreClient = cc
 	dh.EventProxy = ep
@@ -201,6 +206,13 @@
 	dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
 	dh.childAdapterClients = make(map[string]*vgrpc.Client)
 	dh.cfg = cfg
+	kvStoreDevicePath := fmt.Sprintf(dh.cm.Backend.PathPrefix, "/%s/", dh.device.Id)
+	dh.kvStore = SetKVClient(ctx, dh.openOLT.KVStoreType, dh.openOLT.KVStoreAddress, dh.device.Id, kvStoreDevicePath)
+	if dh.kvStore == nil {
+		logger.Error(ctx, "Failed to setup KV store")
+		return nil
+	}
+
 	// Create a slice of buffered channels for handling concurrent mcast flow/group.
 	dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
 	dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
@@ -219,6 +231,33 @@
 	return &dh
 }
 
+// newKVClient returns a kvstore client for storeType; "etcd" is the only type handled here.
+func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
+	logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
+	if storeType != "etcd" {
+		return nil, errors.New("unsupported-kv-store")
+	}
+	return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
+}
+
+// SetKVClient creates a KV client for the given backend type and address and wraps it in a db.Backend.
+// NOTE(review): Fatalw presumably terminates the process, so the nil return is likely unreachable - confirm.
+func SetKVClient(ctx context.Context, backend string, addr string, DeviceID string, basePathKvStore string) *db.Backend {
+	client, err := newKVClient(ctx, backend, addr, rsrcMgr.KvstoreTimeout)
+	if err != nil {
+		logger.Fatalw(ctx, "Failed to init KV client\n", log.Fields{"err": err})
+		return nil
+	}
+	prefix := fmt.Sprintf(rsrcMgr.BasePathKvStore, basePathKvStore, DeviceID)
+	return &db.Backend{
+		Client:     client,
+		StoreType:  backend,
+		Address:    addr,
+		Timeout:    rsrcMgr.KvstoreTimeout,
+		PathPrefix: prefix,
+	}
+}
+
 // start save the device to the data model
 func (dh *DeviceHandler) start(ctx context.Context) {
 	dh.lockDevice.Lock()
@@ -744,18 +783,6 @@
 	raisedTs := time.Now().Unix()
 	go dh.eventMgr.oltCommunicationEvent(ctx, dh.device, raisedTs)
 
-	//check adapter and agent reconcile status
-	//reboot olt if needed (olt disconnection case)
-	if dh.adapterPreviouslyConnected != dh.agentPreviouslyConnected {
-		logger.Warnw(ctx, "different-reconcile-status-between-adapter-and-agent-rebooting-device",
-			log.Fields{
-				"device-id":      dh.device.Id,
-				"adapter-status": dh.adapterPreviouslyConnected,
-				"agent-status":   dh.agentPreviouslyConnected,
-			})
-		_ = dh.RebootDevice(ctx, dh.device)
-	}
-
 	return nil
 }
 
@@ -839,6 +866,7 @@
 		}
 	}
 
+	logger.Debugw(ctx, "Dailing grpc", log.Fields{"device-id": dh.device.Id})
 	// Use Interceptors to automatically inject and publish Open Tracing Spans by this GRPC client
 	dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(),
 		grpc.WithInsecure(),
@@ -905,13 +933,6 @@
 			return olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
 
-		// Start reading indications
-		go func() {
-			if err = dh.readIndications(ctx); err != nil {
-				_ = olterrors.NewErrAdapter("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
-			}
-		}()
-
 		go startHeartbeatCheck(ctx, dh)
 
 		return nil
@@ -931,12 +952,6 @@
 		return olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 	}
 
-	// Start reading indications
-	go func() {
-		if err := dh.readIndications(ctx); err != nil {
-			_ = olterrors.NewErrAdapter("read-indications-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
-		}
-	}()
 	go dh.updateLocalDevice(ctx)
 
 	if device.PmConfigs != nil {
@@ -1368,13 +1383,88 @@
 	return nil
 }
 
-func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication) error {
+//getChildDevice returns the ONU with the given serial number, looking first in the local onus cache and
+//falling back to the core; a device fetched from the core is added to the cache before being returned.
+func (dh *DeviceHandler) getChildDevice(ctx context.Context, sn string, parentPortNo uint32) *OnuDevice {
+	var cached *OnuDevice
+	dh.onus.Range(func(_ interface{}, value interface{}) bool {
+		if candidate := value.(*OnuDevice); candidate.serialNumber == sn {
+			cached = candidate
+			return false // match found, stop iterating
+		}
+		return true
+	})
+	//Cache hit, no need to query the core
+	if cached != nil {
+		logger.Debugw(ctx, "Got child device from cache", log.Fields{"onudev": cached.serialNumber})
+		return cached
+	}
+	filter := &ca.ChildDeviceFilter{
+		ParentId:     dh.device.Id,
+		SerialNumber: sn,
+		ParentPortNo: parentPortNo,
+	}
+	//The lookup error is ignored; a nil device is treated as not found
+	coreDev, _ := dh.getChildDeviceFromCore(ctx, filter)
+	if coreDev == nil {
+		return nil
+	}
+	onuID := coreDev.ProxyAddress.OnuId
+	ponIntf := plt.PortNoToIntfID(parentPortNo, voltha.Port_PON_OLT)
+	key := dh.formOnuKey(ponIntf, onuID)
+	entry := NewOnuDevice(coreDev.Id, coreDev.Type, coreDev.SerialNumber, coreDev.ProxyAddress.OnuId, ponIntf, coreDev.ProxyAddress.DeviceId, false, coreDev.AdapterEndpoint)
+	dh.onus.Store(key, entry)
+	logger.Debugw(ctx, "got child device from core", log.Fields{"onudev": coreDev})
+	return entry
+}
+
+//checkForResourceExistance reports whether the discovery indication for sn must be ignored. When
+//CheckOnuDevExistenceAtOnuDiscovery is set the discovery is ignored if the core still has the child device;
+//otherwise it is ignored if any UNI of the known ONU still has a techprofile ID in the resource manager.
+func (dh *DeviceHandler) checkForResourceExistance(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) (bool, error) {
 	channelID := onuDiscInd.GetIntfId()
 	parentPortNo := plt.IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
+	checkDevInCore := dh.openOLT.CheckOnuDevExistenceAtOnuDiscovery
 
-	sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
-	logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
+	if checkDevInCore {
+		//The lookup error is ignored; only a non-nil device counts as present.
+		onuDevice, _ := dh.getChildDeviceFromCore(ctx, &ca.ChildDeviceFilter{
+			ParentId:     dh.device.Id,
+			SerialNumber: sn,
+			ParentPortNo: parentPortNo,
+		})
+		if onuDevice != nil {
+			logger.Infow(ctx, "Child device still present ignoring discovery indication", log.Fields{"sn": sn})
+			return true, nil
+		}
+		logger.Infow(ctx, "No device present in core , continuing with discovery", log.Fields{"sn": sn})
+		return false, nil
+	}
+
+	//Otherwise look for techprofile IDs still recorded for the ONU, if the ONU is known at all.
+	onuDev := dh.getChildDevice(ctx, sn, parentPortNo)
+	if onuDev == nil {
+		return false, nil
+	}
+	onuGemInfo, err := dh.resourceMgr[channelID].GetOnuGemInfo(ctx, onuDev.onuID)
+	if err != nil {
+		logger.Warnw(ctx, "Unable to find onuGemInfo", log.Fields{"onuID": onuDev.onuID})
+		return false, err
+	}
+	if onuGemInfo == nil {
+		return false, nil
+	}
+	for _, uni := range onuGemInfo.UniPorts {
+		tpIDs := dh.resourceMgr[channelID].GetTechProfileIDForOnu(ctx, onuDev.onuID, plt.UniIDFromPortNum(uni))
+		if len(tpIDs) != 0 {
+			logger.Warnw(ctx, "Techprofile present for ONU, ignoring onu discovery", log.Fields{"onuID": onuDev.onuID})
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+func (dh *DeviceHandler) processDiscONULOSClear(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) bool {
 	var alarmInd oop.OnuAlarmIndication
 	raisedTs := time.Now().Unix()
 	if _, loaded := dh.discOnus.LoadOrStore(sn, true); loaded {
@@ -1405,9 +1495,29 @@
 		})
 
 		logger.Warnw(ctx, "onu-sn-is-already-being-processed", log.Fields{"sn": sn})
+		return true
+	}
+	return false
+}
+
+func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication) error {
+	channelID := onuDiscInd.GetIntfId()
+	parentPortNo := plt.IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
+
+	sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
+	logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
+
+	tpInstExists, errtp := dh.checkForResourceExistance(ctx, onuDiscInd, sn)
+	if errtp != nil {
+		return errtp
+	}
+	if tpInstExists {
+		//ignore the discovery if tpinstance is present.
 		return nil
 	}
-
+	if onuBeingProcessed := dh.processDiscONULOSClear(ctx, onuDiscInd, sn); onuBeingProcessed {
+		return nil
+	}
 	var onuID uint32
 
 	// check the ONU is already know to the OLT
@@ -2283,6 +2393,7 @@
 
 	// start the heartbeat check towards the OLT.
 	var timerCheck *time.Timer
+	dh.heartbeatSignature = dh.getHeartbeatSignature(ctx)
 
 	for {
 		heartbeatTimer := time.NewTimer(dh.openOLT.HeartbeatCheckInterval)
@@ -2302,9 +2413,36 @@
 					}
 					timerCheck = nil
 				}
-				logger.Debugw(ctx, "heartbeat",
-					log.Fields{"signature": heartBeat,
-						"device-id": dh.device.Id})
+				if dh.heartbeatSignature == 0 || dh.heartbeatSignature == heartBeat.HeartbeatSignature {
+					if dh.heartbeatSignature == 0 {
+						// First time the signature will be 0, update the signture to DB when not found.
+						dh.updateHeartbeatSignature(ctx, heartBeat.HeartbeatSignature)
+						dh.heartbeatSignature = heartBeat.HeartbeatSignature
+					}
+					logger.Infow(ctx, "heartbeat signature", log.Fields{"sign": dh.heartbeatSignature})
+
+					dh.lockDevice.RLock()
+					// Stop the read indication only if it the routine is active
+					// The read indication would have already stopped due to failure on the gRPC stream following OLT going unreachable
+					// Sending message on the 'stopIndication' channel again will cause the readIndication routine to immediately stop
+					// on next execution of the readIndication routine.
+					if !dh.isReadIndicationRoutineActive {
+						// Start reading indications
+						go func() {
+							if err = dh.readIndications(ctx); err != nil {
+								_ = olterrors.NewErrAdapter("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+							}
+						}()
+					}
+					dh.lockDevice.RUnlock()
+
+				} else {
+					logger.Warn(ctx, "Heartbeat signature changed, OLT is rebooted. Cleaningup resources.")
+					dh.updateHeartbeatSignature(ctx, heartBeat.HeartbeatSignature)
+					dh.heartbeatSignature = heartBeat.HeartbeatSignature
+					go dh.updateStateRebooted(ctx)
+				}
+
 			}
 			cancel()
 		case <-dh.stopHeartbeatCheck:
@@ -2327,7 +2465,7 @@
 		return
 	}
 
-	logger.Debugw(ctx, "update-state-unreachable", log.Fields{"device-id": dh.device.Id, "connect-status": device.ConnectStatus,
+	logger.Warnw(ctx, "update-state-unreachable", log.Fields{"device-id": dh.device.Id, "connect-status": device.ConnectStatus,
 		"admin-state": device.AdminState, "oper-status": device.OperStatus})
 	if device.ConnectStatus == voltha.ConnectStatus_REACHABLE {
 		if err = dh.updateDeviceStateInCore(ctx, &ca.DeviceStateFilter{
@@ -2337,14 +2475,15 @@
 		}); err != nil {
 			_ = olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
-
-		if err = dh.updatePortsStateInCore(ctx, &ca.PortStateFilter{
-			DeviceId:       dh.device.Id,
-			PortTypeFilter: 0,
-			OperStatus:     voltha.OperStatus_UNKNOWN,
-		}); err != nil {
-			_ = olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
-		}
+		/*
+			if err = dh.updatePortsStateInCore(ctx, &ca.PortStateFilter{
+				DeviceId:       dh.device.Id,
+				PortTypeFilter: 0,
+				OperStatus:     voltha.OperStatus_UNKNOWN,
+			}); err != nil {
+				_ = olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
+			}
+		*/
 
 		//raise olt communication failure event
 		raisedTs := time.Now().Unix()
@@ -2354,12 +2493,10 @@
 		dh.device = cloned // update local copy of the device
 		go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
 
-		dh.cleanupDeviceResources(ctx)
 		// Stop the Stats collector
 		dh.stopCollector <- true
 		// stop the heartbeat check routine
 		dh.stopHeartbeatCheck <- true
-
 		dh.lockDevice.RLock()
 		// Stop the read indication only if it the routine is active
 		// The read indication would have already stopped due to failure on the gRPC stream following OLT going unreachable
@@ -2369,28 +2506,94 @@
 			dh.stopIndications <- true
 		}
 		dh.lockDevice.RUnlock()
-
-		var wg sync.WaitGroup
-		wg.Add(1) // for the multicast handler routine
-		go dh.StopAllMcastHandlerRoutines(ctx, &wg)
-		for _, flMgr := range dh.flowMgr {
-			wg.Add(1) // for the flow handler routine
-			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
-		}
-		if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
-			logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
-		} else {
-			logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
-		}
-
-		//reset adapter reconcile flag
-		dh.adapterPreviouslyConnected = false
-
 		dh.transitionMap.Handle(ctx, DeviceInit)
 
 	}
 }
 
+// updateStateRebooted is run when the heartbeat signature reported by the OLT differs from the stored one.
+// It marks the device REBOOTED in the core, stops the indication, stats and heartbeat routines, cleans up
+// device resources, waits until the core lists no child devices and then triggers the DeviceInit transition.
+func (dh *DeviceHandler) updateStateRebooted(ctx context.Context) {
+	device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
+	if err != nil || device == nil {
+		// The core can return an error for GetDevice once the OLT device has been deleted. That is harmless
+		// because the necessary adapter cleanup was already done by the DeleteDevice API handler routine.
+		_ = olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err).Log()
+		// Immediately return, otherwise accessing a null 'device' struct would cause panic
+		return
+	}
+
+	logger.Warnw(ctx, "update-state-rebooted", log.Fields{"device-id": dh.device.Id, "connect-status": device.ConnectStatus,
+		"admin-state": device.AdminState, "oper-status": device.OperStatus, "conn-status": voltha.ConnectStatus_UNREACHABLE})
+	if err = dh.updateDeviceStateInCore(ctx, &ca.DeviceStateFilter{
+		DeviceId:   dh.device.Id,
+		OperStatus: voltha.OperStatus_REBOOTED,
+		ConnStatus: voltha.ConnectStatus_REACHABLE,
+	}); err != nil {
+		_ = olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+	}
+
+	dh.lockDevice.RLock()
+	// Signal the read indication routine to stop only if it is active; it may already have stopped
+	// after a failure on the gRPC stream.
+	if dh.isReadIndicationRoutineActive {
+		dh.stopIndications <- true
+	}
+	dh.lockDevice.RUnlock()
+
+	//raise olt communication failure event
+	raisedTs := time.Now().Unix()
+	cloned := proto.Clone(device).(*voltha.Device)
+	cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+	cloned.OperStatus = voltha.OperStatus_UNKNOWN
+	dh.device = cloned // update local copy of the device
+	go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
+
+	dh.cleanupDeviceResources(ctx)
+	// Stop the Stats collector
+	dh.stopCollector <- true
+	// stop the heartbeat check routine
+	dh.stopHeartbeatCheck <- true
+
+	var wg sync.WaitGroup
+	wg.Add(1) // for the multicast handler routine
+	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+	for _, flMgr := range dh.flowMgr {
+		wg.Add(1) // for the flow handler routine
+		go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+	}
+	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+	} else {
+		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+	}
+
+	//reset adapter reconcile flag
+	dh.adapterPreviouslyConnected = false
+	// Wait for the core to clear all child devices; back off after every unsuccessful poll, errors included.
+	for {
+		childDevices, err := dh.getChildDevicesFromCore(ctx, dh.device.Id)
+		if err == nil && childDevices != nil && len(childDevices.Items) == 0 {
+			logger.Infow(ctx, "All childDevices cleared from core, proceed with device init", log.Fields{"deviceID": dh.device.Id})
+			break
+		}
+		if err != nil || childDevices == nil {
+			logger.Errorw(ctx, "Failed to get child devices from core", log.Fields{"deviceID": dh.device.Id, "err": err})
+		} else {
+			logger.Warn(ctx, "Not all child devices are cleared, continuing to wait")
+		}
+		select {
+		case <-ctx.Done():
+			logger.Warnw(ctx, "context done while waiting for child devices to clear", log.Fields{"deviceID": dh.device.Id})
+			return
+		case <-time.After(5 * time.Second):
+		}
+	}
+	logger.Infow(ctx, "cleanup complete after reboot , moving to init", log.Fields{"deviceID": device.Id})
+	dh.transitionMap.Handle(ctx, DeviceInit)
+}
+
 // EnablePort to enable Pon interface
 func (dh *DeviceHandler) EnablePort(ctx context.Context, port *voltha.Port) error {
 	logger.Debugw(ctx, "enable-port", log.Fields{"Device": dh.device, "port": port})
@@ -3368,3 +3571,34 @@
 		return false // timed out
 	}
 }
+
+// updateHeartbeatSignature JSON-encodes signature and writes it to dh.kvStore under heartbeatPath.
+// Nothing is returned to the caller: an encoding or store failure is only logged, together with
+// the underlying error so that the cause is visible in the logs.
+func (dh *DeviceHandler) updateHeartbeatSignature(ctx context.Context, signature uint32) {
+	if val, err := json.Marshal(signature); err != nil {
+		logger.Errorw(ctx, "failed-to-marshal", log.Fields{"err": err})
+	} else if err = dh.kvStore.Put(ctx, heartbeatPath, val); err != nil {
+		logger.Errorw(ctx, "failed-to-store-hearbeat-signature", log.Fields{"err": err})
+	}
+}
+
+// getHeartbeatSignature returns the signature stored under heartbeatPath in dh.kvStore, or 0 if it is absent or unreadable.
+func (dh *DeviceHandler) getHeartbeatSignature(ctx context.Context) uint32 {
+	var signature uint32
+	kvPair, err := dh.kvStore.Get(ctx, heartbeatPath)
+	if err != nil || kvPair == nil {
+		logger.Warnw(ctx, "heartbeat signature not available in kv store", log.Fields{"err": err})
+		return 0
+	}
+	raw, err := kvstore.ToByte(kvPair.Value)
+	if err != nil {
+		logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"err": err})
+		return 0
+	}
+	if err = json.Unmarshal(raw, &signature); err != nil {
+		logger.Errorw(ctx, "Failed to unmarshal signature", log.Fields{"err": err})
+		return 0
+	}
+	return signature
+}