VOL-2120 OLT and ONU oper_status show Active even when the management interface is down in the OLT
Introduced a heartbeat check toward the device. The openolt adapter keeps checking for the heartbeat.
The device state is updated once the configured timer expires.
Change-Id: I49e1247f412cee2dcf6a510b52c348c8fed2304d
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 9c65cee..fcf625f 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -76,11 +76,12 @@
eventMgr *OpenOltEventMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
- discOnus sync.Map
- onus sync.Map
- portStats *OpenOltStatisticsMgr
- metrics *pmmetrics.PmMetrics
- stopCollector chan bool
+ discOnus sync.Map
+ onus sync.Map
+ portStats *OpenOltStatisticsMgr
+ metrics *pmmetrics.PmMetrics
+ stopCollector chan bool
+ stopHeartbeatCheck chan bool
}
//OnuDevice represents ONU related info
@@ -133,6 +134,7 @@
dh.exitChannel = make(chan int, 1)
dh.lockDevice = sync.RWMutex{}
dh.stopCollector = make(chan bool, 2)
+ dh.stopHeartbeatCheck = make(chan bool, 2)
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
//TODO initialize the support classes.
return &dh
@@ -643,6 +645,7 @@
}
go startCollector(dh)
+ go startHeartbeatCheck(dh)
}
//GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -1130,7 +1133,7 @@
dh.discOnus = sync.Map{}
dh.onus = sync.Map{}
- go dh.notifyChildDevices()
+ go dh.notifyChildDevices("unreachable")
cloned := proto.Clone(device).(*voltha.Device)
// Update the all ports state on that device to disable
if err := dh.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
@@ -1142,11 +1145,11 @@
return nil
}
-func (dh *DeviceHandler) notifyChildDevices() {
+func (dh *DeviceHandler) notifyChildDevices(state string) {
// Update onu state as unreachable in onu adapter
onuInd := oop.OnuIndication{}
- onuInd.OperState = "unreachable"
+ onuInd.OperState = state
//get the child device for the parent device
onuDevices, err := dh.coreProxy.GetChildDevices(context.TODO(), dh.device.Id)
if err != nil {
@@ -1326,6 +1329,8 @@
log.Debug("Removed-device-from-Resource-manager-KV-store")
// Stop the Stats collector
dh.stopCollector <- true
+ // stop the heartbeat check routine
+ dh.stopHeartbeatCheck <- true
//Reset the state
if dh.Client != nil {
if _, err := dh.Client.Reboot(context.Background(), new(oop.Empty)); err != nil {
@@ -1473,3 +1478,70 @@
func (dh *DeviceHandler) formOnuKey(intfID, onuID uint32) string {
return "" + strconv.Itoa(int(intfID)) + "." + strconv.Itoa(int(onuID))
}
+
+// startHeartbeatCheck periodically probes the OLT with a HeartbeatCheck gRPC call
+// until a value is received on dh.stopHeartbeatCheck.
+//
+// The first failed probe arms a one-shot timer of HeartbeatFailReportInterval that
+// invokes dh.updateStateUnreachable when it fires. A later successful probe disarms
+// the timer or, if it has already fired, reports the device and its ONUs as up again.
+func startHeartbeatCheck(dh *DeviceHandler) {
+	// timerCheck is non-nil only while a failure report is armed or has already fired.
+	var timerCheck *time.Timer
+
+	// One timer is re-armed after every probe, so the interval is measured from the end
+	// of one check to the start of the next, and the timer is released on exit.
+	heartbeatTimer := time.NewTimer(dh.openOLT.HeartbeatCheckInterval)
+	defer heartbeatTimer.Stop()
+
+	for {
+		select {
+		case <-heartbeatTimer.C:
+			ctx, cancel := context.WithTimeout(context.Background(), dh.openOLT.GrpcTimeoutInterval)
+			heartBeat, err := dh.Client.HeartbeatCheck(ctx, new(oop.Empty))
+			cancel()
+			if err != nil {
+				// Keep the cause in the log instead of discarding it.
+				log.Errorw("Hearbeat failed", log.Fields{"deviceID": dh.device.Id, "error": err})
+				if timerCheck == nil {
+					// start an after func; when it expires it will update the state to the core
+					timerCheck = time.AfterFunc(dh.openOLT.HeartbeatFailReportInterval, dh.updateStateUnreachable)
+				}
+			} else {
+				if timerCheck != nil {
+					if timerCheck.Stop() {
+						log.Debug("We got hearbeat within the timeout")
+					} else {
+						// Stop reported false: the timer already fired and the unreachable
+						// report was triggered, so push the recovered state.
+						log.Debug("We got hearbeat after the timeout expired, changing the states")
+						go dh.notifyChildDevices("up")
+						if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
+							voltha.OperStatus_ACTIVE); err != nil {
+							log.Errorw("Failed to update device state", log.Fields{"deviceID": dh.device.Id, "error": err})
+						}
+					}
+					timerCheck = nil
+				}
+				log.Debugw("Hearbeat", log.Fields{"signature": heartBeat})
+			}
+			// The timer has fired and its channel was drained above, so Reset is safe here.
+			heartbeatTimer.Reset(dh.openOLT.HeartbeatCheckInterval)
+		case <-dh.stopHeartbeatCheck:
+			log.Debug("Stopping heart beat check")
+			// Disarm a pending failure report so it does not fire after the check has been stopped.
+			if timerCheck != nil {
+				timerCheck.Stop()
+			}
+			return
+		}
+	}
+}
+
+// updateStateUnreachable notifies the child devices with the "unreachable" state and
+// reports the device to the core as UNREACHABLE with an UNKNOWN operational status.
+// A failed core update is logged and otherwise ignored.
+func (dh *DeviceHandler) updateStateUnreachable() {
+	go dh.notifyChildDevices("unreachable")
+	if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+		log.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.device.Id, "error": err})
+	}
+}
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index 4c506a5..375b137 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -22,10 +22,12 @@
"errors"
"fmt"
"sync"
+ "time"
"github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-openolt-adapter/config"
ic "github.com/opencord/voltha-protos/v2/go/inter_container"
"github.com/opencord/voltha-protos/v2/go/openflow_13"
"github.com/opencord/voltha-protos/v2/go/voltha"
@@ -33,35 +35,42 @@
//OpenOLT structure holds the OLT information
type OpenOLT struct {
- deviceHandlers map[string]*DeviceHandler
- coreProxy adapterif.CoreProxy
- adapterProxy adapterif.AdapterProxy
- eventProxy adapterif.EventProxy
- kafkaICProxy *kafka.InterContainerProxy
- numOnus int
- KVStoreHost string
- KVStorePort int
- KVStoreType string
- exitChannel chan int
- lockDeviceHandlersMap sync.RWMutex
+ deviceHandlers map[string]*DeviceHandler
+ coreProxy adapterif.CoreProxy
+ adapterProxy adapterif.AdapterProxy
+ eventProxy adapterif.EventProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ config *config.AdapterFlags
+ numOnus int
+ KVStoreHost string
+ KVStorePort int
+ KVStoreType string
+ exitChannel chan int
+ HeartbeatCheckInterval time.Duration
+ HeartbeatFailReportInterval time.Duration
+ GrpcTimeoutInterval time.Duration
+ lockDeviceHandlersMap sync.RWMutex
}
//NewOpenOLT returns a new instance of OpenOLT
func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy,
coreProxy adapterif.CoreProxy, adapterProxy adapterif.AdapterProxy,
- eventProxy adapterif.EventProxy, onuNumber int, kvStoreHost string,
- kvStorePort int, KVStoreType string) *OpenOLT {
+ eventProxy adapterif.EventProxy, cfg *config.AdapterFlags) *OpenOLT {
var openOLT OpenOLT
openOLT.exitChannel = make(chan int, 1)
openOLT.deviceHandlers = make(map[string]*DeviceHandler)
openOLT.kafkaICProxy = kafkaICProxy
- openOLT.numOnus = onuNumber
+ openOLT.config = cfg
+ openOLT.numOnus = cfg.OnuNumber
openOLT.coreProxy = coreProxy
openOLT.adapterProxy = adapterProxy
openOLT.eventProxy = eventProxy
- openOLT.KVStoreHost = kvStoreHost
- openOLT.KVStorePort = kvStorePort
- openOLT.KVStoreType = KVStoreType
+ openOLT.KVStoreHost = cfg.KVStoreHost
+ openOLT.KVStorePort = cfg.KVStorePort
+ openOLT.KVStoreType = cfg.KVStoreType
+ openOLT.HeartbeatCheckInterval = cfg.HeartbeatCheckInterval
+ openOLT.HeartbeatFailReportInterval = cfg.HeartbeatFailReportInterval
+ openOLT.GrpcTimeoutInterval = cfg.GrpcTimeoutInterval
openOLT.lockDeviceHandlersMap = sync.RWMutex{}
return &openOLT
}
diff --git a/adaptercore/openolt_test.go b/adaptercore/openolt_test.go
index ec1ca43..47e9ce3 100644
--- a/adaptercore/openolt_test.go
+++ b/adaptercore/openolt_test.go
@@ -32,6 +32,7 @@
com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
fu "github.com/opencord/voltha-lib-go/v2/pkg/flows"
"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+ "github.com/opencord/voltha-openolt-adapter/config"
ic "github.com/opencord/voltha-protos/v2/go/inter_container"
"github.com/opencord/voltha-protos/v2/go/openflow_13"
ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
@@ -102,22 +103,22 @@
func TestNewOpenOLT(t *testing.T) {
tests := []struct {
- name string
- fields *fields
- want *OpenOLT
+ name string
+ fields *fields
+ configFlags *config.AdapterFlags
+ want *OpenOLT
}{
- {"newopenolt-1", &fields{numOnus: 1, KVStorePort: 1, KVStoreType: "consul", KVStoreHost: "1.1.1.1"},
+ {"newopenolt-1", &fields{}, &config.AdapterFlags{OnuNumber: 1, KVStorePort: 1, KVStoreType: "consul", KVStoreHost: "1.1.1.1"},
&OpenOLT{numOnus: 1, KVStorePort: 1, KVStoreType: "consul", KVStoreHost: "1.1.1.1"}},
- {"newopenolt-2", &fields{numOnus: 2, KVStorePort: 2, KVStoreType: "etcd", KVStoreHost: "2.2.2.2"},
+ {"newopenolt-2", &fields{}, &config.AdapterFlags{OnuNumber: 2, KVStorePort: 2, KVStoreType: "etcd", KVStoreHost: "2.2.2.2"},
&OpenOLT{numOnus: 2, KVStorePort: 2, KVStoreType: "etcd", KVStoreHost: "2.2.2.2"}},
- {"newopenolt-3", &fields{numOnus: 3, KVStorePort: 3, KVStoreType: "consul", KVStoreHost: "3.3.3.3"},
+ {"newopenolt-3", &fields{}, &config.AdapterFlags{OnuNumber: 3, KVStorePort: 3, KVStoreType: "consul", KVStoreHost: "3.3.3.3"},
&OpenOLT{numOnus: 3, KVStorePort: 3, KVStoreType: "consul", KVStoreHost: "3.3.3.3"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewOpenOLT(tt.fields.ctx, tt.fields.kafkaICProxy, tt.fields.coreProxy, tt.fields.adapterProxy,
- tt.fields.eventProxy, tt.fields.numOnus, tt.fields.KVStoreHost, tt.fields.KVStorePort,
- tt.fields.KVStoreType); reflect.TypeOf(got) != reflect.TypeOf(tt.want) && got != nil {
+ tt.fields.eventProxy, tt.configFlags); reflect.TypeOf(got) != reflect.TypeOf(tt.want) && got != nil {
t.Errorf("NewOpenOLT() error = %v, wantErr %v", got, tt.want)
}
})