VOL-2120 OLT and ONU oper_status shows Active even when management interface is down in OLT

Introduced heartbeat check toward the device. openolt adapter will keep checking for heartbeat.

The device state will be updated once the configured timers expires

Change-Id: I49e1247f412cee2dcf6a510b52c348c8fed2304d
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 9c65cee..fcf625f 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -76,11 +76,12 @@
 	eventMgr      *OpenOltEventMgr
 	resourceMgr   *rsrcMgr.OpenOltResourceMgr
 
-	discOnus      sync.Map
-	onus          sync.Map
-	portStats     *OpenOltStatisticsMgr
-	metrics       *pmmetrics.PmMetrics
-	stopCollector chan bool
+	discOnus           sync.Map
+	onus               sync.Map
+	portStats          *OpenOltStatisticsMgr
+	metrics            *pmmetrics.PmMetrics
+	stopCollector      chan bool
+	stopHeartbeatCheck chan bool
 }
 
 //OnuDevice represents ONU related info
@@ -133,6 +134,7 @@
 	dh.exitChannel = make(chan int, 1)
 	dh.lockDevice = sync.RWMutex{}
 	dh.stopCollector = make(chan bool, 2)
+	dh.stopHeartbeatCheck = make(chan bool, 2)
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	//TODO initialize the support classes.
 	return &dh
@@ -643,6 +645,7 @@
 	}
 
 	go startCollector(dh)
+	go startHeartbeatCheck(dh)
 }
 
 //GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -1130,7 +1133,7 @@
 	dh.discOnus = sync.Map{}
 	dh.onus = sync.Map{}
 
-	go dh.notifyChildDevices()
+	go dh.notifyChildDevices("unreachable")
 	cloned := proto.Clone(device).(*voltha.Device)
 	// Update the all ports state on that device to disable
 	if err := dh.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
@@ -1142,11 +1145,11 @@
 	return nil
 }
 
-func (dh *DeviceHandler) notifyChildDevices() {
+func (dh *DeviceHandler) notifyChildDevices(state string) {
 
 	// Update onu state as unreachable in onu adapter
 	onuInd := oop.OnuIndication{}
-	onuInd.OperState = "unreachable"
+	onuInd.OperState = state
 	//get the child device for the parent device
 	onuDevices, err := dh.coreProxy.GetChildDevices(context.TODO(), dh.device.Id)
 	if err != nil {
@@ -1326,6 +1329,8 @@
 	log.Debug("Removed-device-from-Resource-manager-KV-store")
 	// Stop the Stats collector
 	dh.stopCollector <- true
+	// stop the heartbeat check routine
+	dh.stopHeartbeatCheck <- true
 	//Reset the state
 	if dh.Client != nil {
 		if _, err := dh.Client.Reboot(context.Background(), new(oop.Empty)); err != nil {
@@ -1473,3 +1478,52 @@
 func (dh *DeviceHandler) formOnuKey(intfID, onuID uint32) string {
 	return "" + strconv.Itoa(int(intfID)) + "." + strconv.Itoa(int(onuID))
 }
+
+func startHeartbeatCheck(dh *DeviceHandler) {
+	// start the heartbeat check towards the OLT.
+	var timerCheck *time.Timer
+
+	for {
+		heartbeatTimer := time.NewTimer(dh.openOLT.HeartbeatCheckInterval)
+		select {
+		case <-heartbeatTimer.C:
+			ctx, cancel := context.WithTimeout(context.Background(), dh.openOLT.GrpcTimeoutInterval)
+			if heartBeat, err := dh.Client.HeartbeatCheck(ctx, new(oop.Empty)); err != nil {
+				log.Error("Hearbeat failed")
+				if timerCheck == nil {
+					// start a after func, when expired will update the state to the core
+					timerCheck = time.AfterFunc(dh.openOLT.HeartbeatFailReportInterval, dh.updateStateUnreachable)
+				}
+			} else {
+				if timerCheck != nil {
+					if timerCheck.Stop() {
+						log.Debug("We got hearbeat within the timeout")
+					} else {
+
+						log.Debug("We got hearbeat after the timeout expired, changing the states")
+						go dh.notifyChildDevices("up")
+						if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
+							voltha.OperStatus_ACTIVE); err != nil {
+							log.Errorw("Failed to update device state", log.Fields{"deviceID": dh.device.Id, "error": err})
+						}
+					}
+					timerCheck = nil
+				}
+				log.Debugw("Hearbeat", log.Fields{"signature": heartBeat})
+			}
+			cancel()
+		case <-dh.stopHeartbeatCheck:
+			log.Debug("Stopping heart beat check")
+			return
+		}
+	}
+}
+
+func (dh *DeviceHandler) updateStateUnreachable() {
+
+	go dh.notifyChildDevices("unreachable")
+	if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+		log.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.device.Id, "error": err})
+		return
+	}
+}
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index 4c506a5..375b137 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -22,10 +22,12 @@
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
 	"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-openolt-adapter/config"
 	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
 	"github.com/opencord/voltha-protos/v2/go/openflow_13"
 	"github.com/opencord/voltha-protos/v2/go/voltha"
@@ -33,35 +35,42 @@
 
 //OpenOLT structure holds the OLT information
 type OpenOLT struct {
-	deviceHandlers        map[string]*DeviceHandler
-	coreProxy             adapterif.CoreProxy
-	adapterProxy          adapterif.AdapterProxy
-	eventProxy            adapterif.EventProxy
-	kafkaICProxy          *kafka.InterContainerProxy
-	numOnus               int
-	KVStoreHost           string
-	KVStorePort           int
-	KVStoreType           string
-	exitChannel           chan int
-	lockDeviceHandlersMap sync.RWMutex
+	deviceHandlers              map[string]*DeviceHandler
+	coreProxy                   adapterif.CoreProxy
+	adapterProxy                adapterif.AdapterProxy
+	eventProxy                  adapterif.EventProxy
+	kafkaICProxy                *kafka.InterContainerProxy
+	config                      *config.AdapterFlags
+	numOnus                     int
+	KVStoreHost                 string
+	KVStorePort                 int
+	KVStoreType                 string
+	exitChannel                 chan int
+	HeartbeatCheckInterval      time.Duration
+	HeartbeatFailReportInterval time.Duration
+	GrpcTimeoutInterval         time.Duration
+	lockDeviceHandlersMap       sync.RWMutex
 }
 
 //NewOpenOLT returns a new instance of OpenOLT
 func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy,
 	coreProxy adapterif.CoreProxy, adapterProxy adapterif.AdapterProxy,
-	eventProxy adapterif.EventProxy, onuNumber int, kvStoreHost string,
-	kvStorePort int, KVStoreType string) *OpenOLT {
+	eventProxy adapterif.EventProxy, cfg *config.AdapterFlags) *OpenOLT {
 	var openOLT OpenOLT
 	openOLT.exitChannel = make(chan int, 1)
 	openOLT.deviceHandlers = make(map[string]*DeviceHandler)
 	openOLT.kafkaICProxy = kafkaICProxy
-	openOLT.numOnus = onuNumber
+	openOLT.config = cfg
+	openOLT.numOnus = cfg.OnuNumber
 	openOLT.coreProxy = coreProxy
 	openOLT.adapterProxy = adapterProxy
 	openOLT.eventProxy = eventProxy
-	openOLT.KVStoreHost = kvStoreHost
-	openOLT.KVStorePort = kvStorePort
-	openOLT.KVStoreType = KVStoreType
+	openOLT.KVStoreHost = cfg.KVStoreHost
+	openOLT.KVStorePort = cfg.KVStorePort
+	openOLT.KVStoreType = cfg.KVStoreType
+	openOLT.HeartbeatCheckInterval = cfg.HeartbeatCheckInterval
+	openOLT.HeartbeatFailReportInterval = cfg.HeartbeatFailReportInterval
+	openOLT.GrpcTimeoutInterval = cfg.GrpcTimeoutInterval
 	openOLT.lockDeviceHandlersMap = sync.RWMutex{}
 	return &openOLT
 }
diff --git a/adaptercore/openolt_test.go b/adaptercore/openolt_test.go
index ec1ca43..47e9ce3 100644
--- a/adaptercore/openolt_test.go
+++ b/adaptercore/openolt_test.go
@@ -32,6 +32,7 @@
 	com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
 	fu "github.com/opencord/voltha-lib-go/v2/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+	"github.com/opencord/voltha-openolt-adapter/config"
 	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
 	"github.com/opencord/voltha-protos/v2/go/openflow_13"
 	ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
@@ -102,22 +103,22 @@
 
 func TestNewOpenOLT(t *testing.T) {
 	tests := []struct {
-		name   string
-		fields *fields
-		want   *OpenOLT
+		name        string
+		fields      *fields
+		configFlags *config.AdapterFlags
+		want        *OpenOLT
 	}{
-		{"newopenolt-1", &fields{numOnus: 1, KVStorePort: 1, KVStoreType: "consul", KVStoreHost: "1.1.1.1"},
+		{"newopenolt-1", &fields{}, &config.AdapterFlags{OnuNumber: 1, KVStorePort: 1, KVStoreType: "consul", KVStoreHost: "1.1.1.1"},
 			&OpenOLT{numOnus: 1, KVStorePort: 1, KVStoreType: "consul", KVStoreHost: "1.1.1.1"}},
-		{"newopenolt-2", &fields{numOnus: 2, KVStorePort: 2, KVStoreType: "etcd", KVStoreHost: "2.2.2.2"},
+		{"newopenolt-2", &fields{}, &config.AdapterFlags{OnuNumber: 2, KVStorePort: 2, KVStoreType: "etcd", KVStoreHost: "2.2.2.2"},
 			&OpenOLT{numOnus: 2, KVStorePort: 2, KVStoreType: "etcd", KVStoreHost: "2.2.2.2"}},
-		{"newopenolt-3", &fields{numOnus: 3, KVStorePort: 3, KVStoreType: "consul", KVStoreHost: "3.3.3.3"},
+		{"newopenolt-3", &fields{}, &config.AdapterFlags{OnuNumber: 3, KVStorePort: 3, KVStoreType: "consul", KVStoreHost: "3.3.3.3"},
 			&OpenOLT{numOnus: 3, KVStorePort: 3, KVStoreType: "consul", KVStoreHost: "3.3.3.3"}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			if got := NewOpenOLT(tt.fields.ctx, tt.fields.kafkaICProxy, tt.fields.coreProxy, tt.fields.adapterProxy,
-				tt.fields.eventProxy, tt.fields.numOnus, tt.fields.KVStoreHost, tt.fields.KVStorePort,
-				tt.fields.KVStoreType); reflect.TypeOf(got) != reflect.TypeOf(tt.want) && got != nil {
+				tt.fields.eventProxy, tt.configFlags); reflect.TypeOf(got) != reflect.TypeOf(tt.want) && got != nil {
 				t.Errorf("NewOpenOLT() error = %v, wantErr %v", got, tt.want)
 			}
 		})
diff --git a/config/config.go b/config/config.go
index d575231..babd8ea 100644
--- a/config/config.go
+++ b/config/config.go
@@ -48,31 +48,40 @@
 	defaultProbePort            = 8080
 	defaultLiveProbeInterval    = 60 * time.Second
 	defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
+	//defaultHearbeatFailReportInterval is the time in seconds the adapter will keep checking the hardware for heartbeat.
+	defaultHearbeatCheckInterval = 30 * time.Second
+	// defaultHearbeatFailReportInterval is the time adapter will wait before updating the state to the core.
+	defaultHearbeatFailReportInterval = 180 * time.Second
+	//defaultGrpcTimeoutInterval is the time in seconds a grpc call will wait before returning error.
+	defaultGrpcTimeoutInterval = 2 * time.Second
 )
 
 // AdapterFlags represents the set of configurations used by the read-write adaptercore service
 type AdapterFlags struct {
 	// Command line parameters
-	InstanceID           string
-	KafkaAdapterHost     string
-	KafkaAdapterPort     int
-	KafkaClusterHost     string
-	KafkaClusterPort     int
-	KVStoreType          string
-	KVStoreTimeout       int // in seconds
-	KVStoreHost          string
-	KVStorePort          int
-	Topic                string
-	CoreTopic            string
-	EventTopic           string
-	LogLevel             int
-	OnuNumber            int
-	Banner               bool
-	DisplayVersionOnly   bool
-	ProbeHost            string
-	ProbePort            int
-	LiveProbeInterval    time.Duration
-	NotLiveProbeInterval time.Duration
+	InstanceID                  string
+	KafkaAdapterHost            string
+	KafkaAdapterPort            int
+	KafkaClusterHost            string
+	KafkaClusterPort            int
+	KVStoreType                 string
+	KVStoreTimeout              int // in seconds
+	KVStoreHost                 string
+	KVStorePort                 int
+	Topic                       string
+	CoreTopic                   string
+	EventTopic                  string
+	LogLevel                    int
+	OnuNumber                   int
+	Banner                      bool
+	DisplayVersionOnly          bool
+	ProbeHost                   string
+	ProbePort                   int
+	LiveProbeInterval           time.Duration
+	NotLiveProbeInterval        time.Duration
+	HeartbeatCheckInterval      time.Duration
+	HeartbeatFailReportInterval time.Duration
+	GrpcTimeoutInterval         time.Duration
 }
 
 func init() {
@@ -82,26 +91,29 @@
 // NewAdapterFlags returns a new RWCore config
 func NewAdapterFlags() *AdapterFlags {
 	var adapterFlags = AdapterFlags{ // Default values
-		InstanceID:           defaultInstanceid,
-		KafkaAdapterHost:     defaultKafkaadapterhost,
-		KafkaAdapterPort:     defaultKafkaadapterport,
-		KafkaClusterHost:     defaultKafkaclusterhost,
-		KafkaClusterPort:     defaultKafkaclusterport,
-		KVStoreType:          defaultKvstoretype,
-		KVStoreTimeout:       defaultKvstoretimeout,
-		KVStoreHost:          defaultKvstorehost,
-		KVStorePort:          defaultKvstoreport,
-		Topic:                defaultTopic,
-		CoreTopic:            defaultCoretopic,
-		EventTopic:           defaultEventtopic,
-		LogLevel:             defaultLoglevel,
-		OnuNumber:            defaultOnunumber,
-		Banner:               defaultBanner,
-		DisplayVersionOnly:   defaultDisplayVersionOnly,
-		ProbeHost:            defaultProbeHost,
-		ProbePort:            defaultProbePort,
-		LiveProbeInterval:    defaultLiveProbeInterval,
-		NotLiveProbeInterval: defaultNotLiveProbeInterval,
+		InstanceID:                  defaultInstanceid,
+		KafkaAdapterHost:            defaultKafkaadapterhost,
+		KafkaAdapterPort:            defaultKafkaadapterport,
+		KafkaClusterHost:            defaultKafkaclusterhost,
+		KafkaClusterPort:            defaultKafkaclusterport,
+		KVStoreType:                 defaultKvstoretype,
+		KVStoreTimeout:              defaultKvstoretimeout,
+		KVStoreHost:                 defaultKvstorehost,
+		KVStorePort:                 defaultKvstoreport,
+		Topic:                       defaultTopic,
+		CoreTopic:                   defaultCoretopic,
+		EventTopic:                  defaultEventtopic,
+		LogLevel:                    defaultLoglevel,
+		OnuNumber:                   defaultOnunumber,
+		Banner:                      defaultBanner,
+		DisplayVersionOnly:          defaultDisplayVersionOnly,
+		ProbeHost:                   defaultProbeHost,
+		ProbePort:                   defaultProbePort,
+		LiveProbeInterval:           defaultLiveProbeInterval,
+		NotLiveProbeInterval:        defaultNotLiveProbeInterval,
+		HeartbeatCheckInterval:      defaultHearbeatCheckInterval,
+		HeartbeatFailReportInterval: defaultHearbeatFailReportInterval,
+		GrpcTimeoutInterval:         defaultGrpcTimeoutInterval,
 	}
 	return &adapterFlags
 }
@@ -166,8 +178,16 @@
 	help = fmt.Sprintf("Number of seconds for liveliness check if probe is not running")
 	flag.DurationVar(&(so.NotLiveProbeInterval), "not_live_probe_interval", defaultNotLiveProbeInterval, help)
 
-	flag.Parse()
+	help = fmt.Sprintf("Number of seconds for heartbeat check interval.")
+	flag.DurationVar(&(so.HeartbeatCheckInterval), "hearbeat_check_interval", defaultHearbeatCheckInterval, help)
 
+	help = fmt.Sprintf("Number of seconds adapter has to wait before reporting core on the hearbeat check failure.")
+	flag.DurationVar(&(so.HeartbeatFailReportInterval), "hearbeat_fail_interval", defaultHearbeatFailReportInterval, help)
+
+	help = fmt.Sprintf("Number of seconds for GRPC timeout.")
+	flag.DurationVar(&(so.GrpcTimeoutInterval), "grpc_timeout_interval", defaultGrpcTimeoutInterval, help)
+
+	flag.Parse()
 	containerName := getContainerInfo()
 	if len(containerName) > 0 {
 		so.InstanceID = containerName
diff --git a/main.go b/main.go
index 0e2e186..f284969 100644
--- a/main.go
+++ b/main.go
@@ -124,8 +124,7 @@
 
 	// Create the open OLT adapter
 	if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
-		a.config.OnuNumber,
-		a.config.KVStoreHost, a.config.KVStorePort, a.config.KVStoreType); err != nil {
+		a.config); err != nil {
 		log.Fatal("error-starting-inter-container-proxy")
 	}
 
@@ -334,11 +333,11 @@
 }
 
 func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy,
-	cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy, onuNumber int, kvStoreHost string,
-	kvStorePort int, KVStoreType string) (*ac.OpenOLT, error) {
+	cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
+	cfg *config.AdapterFlags) (*ac.OpenOLT, error) {
 	log.Info("starting-open-olt")
 	var err error
-	sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, onuNumber, kvStoreHost, kvStorePort, KVStoreType)
+	sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, cfg)
 
 	if err = sOLT.Start(ctx); err != nil {
 		log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
diff --git a/main_test.go b/main_test.go
index f25d132..cffd0aa 100644
--- a/main_test.go
+++ b/main_test.go
@@ -112,8 +112,7 @@
 
 	ad := newMockAdapter()
 	oolt, err := ad.startOpenOLT(context.TODO(), nil,
-		ad.coreProxy, ad.adapterProxy, ad.eventProxy, 1, ad.config.KVStoreHost,
-		ad.config.KVStorePort, ad.config.KVStoreType)
+		ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config)
 	if oolt != nil {
 		t.Log("Open OLT ", oolt)
 	}
@@ -167,8 +166,7 @@
 	ad.kip.Start()
 
 	oolt, _ := ad.startOpenOLT(context.TODO(), nil,
-		ad.coreProxy, ad.adapterProxy, ad.eventProxy, 1, ad.config.KVStoreHost,
-		ad.config.KVStorePort, ad.config.KVStoreType)
+		ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config)
 	printBanner()
 	printVersion()
 	ctx := context.TODO()