VOL-4312: Reboot OLT test fails on Edgecore GPON ASGvOLT64 OLT
- Fixed issue where 'N*N' (where N is the number of PON ports)
number of TP Managers were getting created for an OLT device
instead of just 'N' TP managers.
It was seen that after OLT reboot burst of conncurrent TP
managers (N*N) were trying to create etcd pool connections
and at some point the etcd stopped responding and this lead
re-connection handling to OLT after reboot failing.
VOL-4313: Stop stale go routines on monitoring flow and group
messages for the ONUs after OLT reboot or OLT delete
- There are stale go routines for the ONU monitoring for flow
and group messages. Although these never caused problem it
is always recomendded to stop such routines as they consume
memory and cpu.
Change-Id: Ie4d1ce9155dbd15e1831361d50cb959402045cc8
diff --git a/VERSION b/VERSION
index 6307708..b727628 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.6.2-dev
+3.6.2
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 10bdd09..bc6ca7c 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -102,7 +102,9 @@
// Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
// A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
- incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
+ incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
+ stopMcastHandlerRoutine []chan bool
+ mcastHandlerRoutineActive []bool
adapterPreviouslyConnected bool
agentPreviouslyConnected bool
@@ -186,13 +188,17 @@
dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
// Create a slice of buffered channels for handling concurrent mcast flow/group.
dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+ dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
+ dh.mcastHandlerRoutineActive = make([]bool, MaxNumOfGroupHandlerChannels)
for i := range dh.incomingMcastFlowOrGroup {
dh.incomingMcastFlowOrGroup[i] = make(chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+ dh.stopMcastHandlerRoutine[i] = make(chan bool, 1)
// Spin up a go routine to handling incoming mcast flow/group (add/modify/remove).
// There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
// These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
// for incoming mcast flow/group to be processed serially.
- go dh.mcastFlowOrGroupChannelHandlerRoutine(dh.incomingMcastFlowOrGroup[i])
+ dh.mcastHandlerRoutineActive[i] = true
+ go dh.mcastFlowOrGroupChannelHandlerRoutine(i, dh.incomingMcastFlowOrGroup[i], dh.stopMcastHandlerRoutine[i])
}
//TODO initialize the support classes.
return &dh
@@ -1749,6 +1755,10 @@
}
dh.lockDevice.RUnlock()
dh.removeOnuIndicationChannels(ctx)
+ go dh.StopAllMcastHandlerRoutines(ctx)
+ for _, flMgr := range dh.flowMgr {
+ go flMgr.StopAllFlowHandlerRoutines(ctx)
+ }
//Reset the state
if dh.Client != nil {
if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
@@ -2053,6 +2063,11 @@
}
dh.lockDevice.RUnlock()
+ go dh.StopAllMcastHandlerRoutines(ctx)
+ for _, flMgr := range dh.flowMgr {
+ go flMgr.StopAllFlowHandlerRoutines(ctx)
+ }
+
//reset adapter reconcile flag
dh.adapterPreviouslyConnected = false
@@ -2421,66 +2436,86 @@
} else {
return errors.New("flow-and-group-both-nil")
}
- // Derive the appropriate go routine to handle the request by a simple module operation.
- // There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
- dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
- // Wait for handler to return error value
- err := <-errChan
- logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
- return err
+ mcastRoutineIdx := groupID % MaxNumOfGroupHandlerChannels
+ if dh.mcastHandlerRoutineActive[mcastRoutineIdx] {
+ // Derive the appropriate go routine to handle the request by a simple module operation.
+ // There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
+ dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
+ // Wait for handler to return error value
+ err := <-errChan
+ logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
+ return err
+ }
+ logger.Errorw(ctx, "mcast handler routine not active for onu", log.Fields{"mcastRoutineIdx": mcastRoutineIdx})
+ return fmt.Errorf("mcast-handler-routine-not-active-for-index-%v", mcastRoutineIdx)
}
// mcastFlowOrGroupChannelHandlerRoutine routine to handle incoming mcast flow/group message
-func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock) {
+func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(routineIndex int, mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock, stopHandler chan bool) {
for {
+ select {
// block on the channel to receive an incoming mcast flow/group
// process the flow completely before proceeding to handle the next flow
- mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel
- if mcastFlowOrGroupCb.flow != nil {
- if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
- logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
- log.Fields{"device-id": dh.device.Id,
- "flowToAdd": mcastFlowOrGroupCb.flow})
- // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
- err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
- } else { // flow remove
- logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
- log.Fields{"device-id": dh.device.Id,
- "flowToRemove": mcastFlowOrGroupCb.flow})
- // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
- err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
+ case mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel:
+ if mcastFlowOrGroupCb.flow != nil {
+ if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
+ log.Fields{"device-id": dh.device.Id,
+ "flowToAdd": mcastFlowOrGroupCb.flow})
+ // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+ err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else { // flow remove
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
+ log.Fields{"device-id": dh.device.Id,
+ "flowToRemove": mcastFlowOrGroupCb.flow})
+ // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+ err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ }
+ } else { // mcast group
+ if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToAdd": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToModify": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else { // group remove
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToRemove": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ }
}
- } else { // mcast group
- if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
- logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
- log.Fields{"device-id": dh.device.Id,
- "groupToAdd": mcastFlowOrGroupCb.group})
- err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
- } else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
- logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
- log.Fields{"device-id": dh.device.Id,
- "groupToModify": mcastFlowOrGroupCb.group})
- err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
- } else { // group remove
- logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
- log.Fields{"device-id": dh.device.Id,
- "groupToRemove": mcastFlowOrGroupCb.group})
- err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
- }
+ case <-stopHandler:
+ dh.mcastHandlerRoutineActive[routineIndex] = false
+ return
}
}
}
+// StopAllMcastHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
+func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context) {
+ for i, v := range dh.stopMcastHandlerRoutine {
+ if dh.mcastHandlerRoutineActive[i] {
+ v <- true
+ }
+ }
+ logger.Debug(ctx, "stopped all mcast handler routines")
+}
+
func (dh *DeviceHandler) getOltPortCounters(ctx context.Context, oltPortInfo *extension.GetOltPortCounters) *extension.SingleGetValueResponse {
singleValResp := extension.SingleGetValueResponse{
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 57103c4..f021312 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -234,12 +234,16 @@
// Create a slice of buffered channels for handling concurrent flows per ONU.
// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
dh.flowMgr[i].incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+ dh.flowMgr[i].stopFlowHandlerRoutine = make([]chan bool, MaxOnusPerPon+1)
+ dh.flowMgr[i].flowHandlerRoutineActive = make([]bool, MaxOnusPerPon+1)
for j := range dh.flowMgr[i].incomingFlows {
dh.flowMgr[i].incomingFlows[j] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+ dh.flowMgr[i].stopFlowHandlerRoutine[j] = make(chan bool, 1)
// Spin up a go routine to handling incoming flows (add/remove).
// There will be on go routine per ONU.
// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
- go dh.flowMgr[i].perOnuFlowHandlerRoutine(dh.flowMgr[i].incomingFlows[j])
+ dh.flowMgr[i].flowHandlerRoutineActive[j] = true
+ go dh.flowMgr[i].perOnuFlowHandlerRoutine(j, dh.flowMgr[i].incomingFlows[j], dh.flowMgr[i].stopFlowHandlerRoutine[j])
}
dh.flowMgr[i].onuGemInfoMap = make(map[uint32]*resourcemanager.OnuGemInfo)
}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 6136798..61da77d 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -219,7 +219,9 @@
// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
- incomingFlows []chan flowControlBlock
+ incomingFlows []chan flowControlBlock
+ stopFlowHandlerRoutine []chan bool
+ flowHandlerRoutineActive []bool
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -232,7 +234,7 @@
flowMgr.ponPortIdx = ponPortIdx
flowMgr.grpMgr = grpMgr
flowMgr.resourceMgr = rMgr
- if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
+ if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
return nil
}
@@ -243,12 +245,16 @@
// Create a slice of buffered channels for handling concurrent flows per ONU.
// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+ flowMgr.stopFlowHandlerRoutine = make([]chan bool, MaxOnusPerPon+1)
+ flowMgr.flowHandlerRoutineActive = make([]bool, MaxOnusPerPon+1)
for i := range flowMgr.incomingFlows {
flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+ flowMgr.stopFlowHandlerRoutine[i] = make(chan bool, 1)
// Spin up a go routine to handling incoming flows (add/remove).
// There will be on go routine per ONU.
// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
- go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+ flowMgr.flowHandlerRoutineActive[i] = true
+ go flowMgr.perOnuFlowHandlerRoutine(i, flowMgr.incomingFlows[i], flowMgr.stopFlowHandlerRoutine[i])
}
flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
//Load the onugem info cache from kv store on flowmanager start
@@ -937,38 +943,27 @@
}
}
-func (f *OpenOltFlowMgr) populateTechProfilePerPonPort(ctx context.Context) error {
- var tpCount int
+func (f *OpenOltFlowMgr) populateTechProfileForCurrentPonPort(ctx context.Context) error {
for _, techRange := range f.resourceMgr.DevInfo.Ranges {
for _, intfID := range techRange.IntfIds {
- var err error
- f.techprofile, err = tp.NewTechProfile(ctx, f.resourceMgr.PonRsrMgr, f.resourceMgr.PonRsrMgr.Backend,
- f.resourceMgr.PonRsrMgr.Address, f.deviceHandler.cm.Backend.PathPrefix)
- if err != nil || f.techprofile == nil {
- logger.Errorw(ctx, "failed-to-allocate-to-techprofile-for-pon-port", log.Fields{"intfID": intfID, "err": err})
- return fmt.Errorf("failed-to-allocate-tech-profile-for-pon-port--pon-%v-err-%v", intfID, err)
+ if intfID == f.ponPortIdx { // initialize only for the pon port that this flow manager is managing
+ var err error
+ f.techprofile, err = tp.NewTechProfile(ctx, f.resourceMgr.PonRsrMgr, f.resourceMgr.PonRsrMgr.Backend,
+ f.resourceMgr.PonRsrMgr.Address, f.deviceHandler.cm.Backend.PathPrefix)
+ if err != nil || f.techprofile == nil {
+ logger.Errorw(ctx, "failed-to-allocate-to-techprofile-for-pon-port", log.Fields{"intfID": intfID, "err": err})
+ return fmt.Errorf("failed-to-allocate-tech-profile-for-pon-port--pon-%v-err-%v", intfID, err)
+ }
+ logger.Debugw(ctx, "init-tech-profile-done",
+ log.Fields{
+ "intf-id": intfID,
+ "device-id": f.deviceHandler.device.Id})
+ return nil
}
- tpCount++
- logger.Debugw(ctx, "init-tech-profile-done",
- log.Fields{
- "intf-id": intfID,
- "device-id": f.deviceHandler.device.Id})
}
}
- //Make sure we have as many tech_profiles as there are pon ports on the device
- if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
- return olterrors.NewErrInvalidValue(log.Fields{
- "reason": "tP-count-does-not-match-number-of-pon-ports",
- "tech-profile-count": tpCount,
- "pon-port-count": f.resourceMgr.DevInfo.GetPonPorts(),
- "device-id": f.deviceHandler.device.Id}, nil)
- }
- logger.Infow(ctx, "populated-techprofile-for-ponports-successfully",
- log.Fields{
- "numofTech": tpCount,
- "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts(),
- "device-id": f.deviceHandler.device.Id})
- return nil
+ logger.Errorw(ctx, "pon port not found in the the device pon port range", log.Fields{"intfID": f.ponPortIdx})
+ return fmt.Errorf("pon-port-idx-not-found-in-the-device-info-pon-port-range-%v", f.ponPortIdx)
}
func (f *OpenOltFlowMgr) addUpstreamDataPathFlow(ctx context.Context, flowContext *flowContext) error {
@@ -2176,41 +2171,60 @@
if inPort != InvalidPort && outPort != InvalidPort {
_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
}
- // inPort or outPort is InvalidPort for trap-from-nni flows.
- // In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
- // Send the flowCb on the ONU flow channel
- f.incomingFlows[onuID] <- flowCb
- // Wait on the channel for flow handlers return value
- err := <-errChan
- logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
- return err
+ if f.flowHandlerRoutineActive[onuID] {
+ // inPort or outPort is InvalidPort for trap-from-nni flows.
+ // In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+ // Send the flowCb on the ONU flow channel
+ f.incomingFlows[onuID] <- flowCb
+ // Wait on the channel for flow handlers return value
+ err := <-errChan
+ logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
+ return err
+ }
+ logger.Errorw(ctx, "flow handler routine not active for onu", log.Fields{"onuID": onuID, "ponPortIdx": f.ponPortIdx})
+ return fmt.Errorf("flow-handler-routine-not-active-for-onu-%v-pon-%d", onuID, f.ponPortIdx)
}
// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
-func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(handlerRoutineIndex int, subscriberFlowChannel chan flowControlBlock, stopHandler chan bool) {
for {
+ select {
// block on the channel to receive an incoming flow
// process the flow completely before proceeding to handle the next flow
- flowCb := <-subscriberFlowChannel
- if flowCb.addFlow {
- logger.Info(flowCb.ctx, "adding-flow-start")
- startTime := time.Now()
- err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
- logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
- // Pass the return value over the return channel
- *flowCb.errChan <- err
- } else {
- logger.Info(flowCb.ctx, "removing-flow-start")
- startTime := time.Now()
- err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
- logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
- // Pass the return value over the return channel
- *flowCb.errChan <- err
+ case flowCb := <-subscriberFlowChannel:
+ if flowCb.addFlow {
+ logger.Info(flowCb.ctx, "adding-flow-start")
+ startTime := time.Now()
+ err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+ logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ } else {
+ logger.Info(flowCb.ctx, "removing-flow-start")
+ startTime := time.Now()
+ err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+ logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ }
+ case <-stopHandler:
+ f.flowHandlerRoutineActive[handlerRoutineIndex] = false
+ return
}
}
}
+// StopAllFlowHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
+func (f *OpenOltFlowMgr) StopAllFlowHandlerRoutines(ctx context.Context) {
+ for i, v := range f.stopFlowHandlerRoutine {
+ if f.flowHandlerRoutineActive[i] {
+ v <- true
+ }
+ }
+ logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
+}
+
// AddFlow add flow to device
// nolint: gocyclo
func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
diff --git a/internal/pkg/core/openolt_groupmgr.go b/internal/pkg/core/openolt_groupmgr.go
index 02a18c0..2bbbc3a 100644
--- a/internal/pkg/core/openolt_groupmgr.go
+++ b/internal/pkg/core/openolt_groupmgr.go
@@ -47,7 +47,7 @@
//NewGroupManager creates OpenOltGroupMgr object and initializes the parameters
func NewGroupManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltGroupMgr {
- logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
+ logger.Infow(ctx, "initializing-group-manager", log.Fields{"device-id": dh.device.Id})
var grpMgr OpenOltGroupMgr
grpMgr.deviceHandler = dh
grpMgr.resourceMgr = rMgr