VOL-3882 Changing onuIndications channel to per PON instead of per ONU
Change-Id: Ieac25ae243a6e439f9aac46743cbfb83087528d7
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 8eaf0bf..ec5bf84 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -94,9 +94,9 @@
stopIndications chan bool
isReadIndicationRoutineActive bool
- totalPonPorts uint32
- perOnuChannel map[string]onuIndicationChannels
- perOnuChannelLock sync.Mutex
+ totalPonPorts uint32
+ perPonOnuIndicationChannel map[uint32]onuIndicationChannels
+ perPonOnuIndicationChannelLock sync.Mutex
// Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
// A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
@@ -115,14 +115,13 @@
rdiRaised bool
}
-type perOnuIndication struct {
- ctx context.Context
- indication *oop.Indication
- serialNumber string
+type onuIndicationMsg struct {
+ ctx context.Context
+ indication *oop.Indication
}
type onuIndicationChannels struct {
- indicationChannel chan perOnuIndication
+ indicationChannel chan onuIndicationMsg
stopChannel chan struct{}
}
@@ -179,7 +178,7 @@
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
dh.activePorts = sync.Map{}
dh.stopIndications = make(chan bool, 1)
- dh.perOnuChannel = make(map[string]onuIndicationChannels)
+ dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
// Create a slice of buffered channels for handling concurrent mcast flow/group.
dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
for i := range dh.incomingMcastFlowOrGroup {
@@ -554,18 +553,16 @@
onuDiscInd := indication.GetOnuDiscInd()
logger.Infow(ctx, "received-onu-discovery-indication", log.Fields{"OnuDiscInd": onuDiscInd, "device-id": dh.device.Id})
- sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
//put message to channel and return immediately
- dh.putOnuIndicationToChannel(ctx, indication, sn)
+ dh.putOnuIndicationToChannel(ctx, indication, onuDiscInd.GetIntfId())
case *oop.Indication_OnuInd:
span, ctx := log.CreateChildSpan(ctx, "onu-indication", log.Fields{"device-id": dh.device.Id})
defer span.Finish()
onuInd := indication.GetOnuInd()
logger.Infow(ctx, "received-onu-indication", log.Fields{"OnuInd": onuInd, "device-id": dh.device.Id})
- sn := dh.stringifySerialNumber(onuInd.SerialNumber)
//put message to channel and return immediately
- dh.putOnuIndicationToChannel(ctx, indication, sn)
+ dh.putOnuIndicationToChannel(ctx, indication, onuInd.GetIntfId())
case *oop.Indication_OmciInd:
span, ctx := log.CreateChildSpan(ctx, "omci-indication", log.Fields{"device-id": dh.device.Id})
defer span.Finish()
@@ -1129,10 +1126,11 @@
return nil
}
-func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) error {
+func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication) error {
channelID := onuDiscInd.GetIntfId()
parentPortNo := IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
+ sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
kwargs := make(map[string]interface{})
@@ -1263,7 +1261,7 @@
return nil
}
-func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication, serialNumber string) error {
+func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication) error {
kwargs := make(map[string]interface{})
ponPort := IntfIDToPortNo(onuInd.GetIntfId(), voltha.Port_PON_OLT)
@@ -1275,6 +1273,7 @@
"intfId": onuInd.GetIntfId(),
"device-id": dh.device.Id})
onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.OnuId)
+ serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
errFields := log.Fields{"device-id": dh.device.Id}
@@ -1394,15 +1393,18 @@
}
func (dh *DeviceHandler) stringifyVendorSpecific(vendorSpecific []byte) string {
- tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
- fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
- fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
- fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
- fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
- fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
- fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
- fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
- return tmp
+ if len(vendorSpecific) > 3 {
+ tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
+ fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
+ fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
+ fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
+ fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
+ fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
+ fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
+ fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
+ return tmp
+ }
+ return ""
}
//UpdateFlowsBulk upates the bulk flow
@@ -1733,6 +1735,7 @@
return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
}
}
+ dh.removeOnuIndicationChannels(ctx)
// There is no need to update the core about operation status and connection status of the OLT.
// The OLT is getting deleted anyway and the core might have already cleared the OLT device from its DB.
// So any attempt to update the operation status and connection status of the OLT will result in core throwing an error back,
@@ -2215,7 +2218,6 @@
}
dh.onus.Delete(onuKey)
dh.discOnus.Delete(onuDevice.(*OnuDevice).serialNumber)
- dh.removeOnuIndicationChannels(ctx, onuDevice.(*OnuDevice).serialNumber)
return nil
}
@@ -2329,69 +2331,65 @@
return intfID
}
-func (dh *DeviceHandler) getOnuIndicationChannel(ctx context.Context, sn string) chan perOnuIndication {
- dh.perOnuChannelLock.Lock()
- if ch, ok := dh.perOnuChannel[sn]; ok {
- dh.perOnuChannelLock.Unlock()
+func (dh *DeviceHandler) getOnuIndicationChannel(ctx context.Context, intfID uint32) chan onuIndicationMsg {
+ dh.perPonOnuIndicationChannelLock.Lock()
+ if ch, ok := dh.perPonOnuIndicationChannel[intfID]; ok {
+ dh.perPonOnuIndicationChannelLock.Unlock()
return ch.indicationChannel
}
channels := onuIndicationChannels{
//We create a buffered channel here to avoid calling function to be blocked
- //in case of multiple indications from the same ONU,
+ //in case of multiple indications from the ONUs,
//especially in the case where indications are buffered in OLT.
- indicationChannel: make(chan perOnuIndication, 5),
+ indicationChannel: make(chan onuIndicationMsg, 500),
stopChannel: make(chan struct{}),
}
- dh.perOnuChannel[sn] = channels
- dh.perOnuChannelLock.Unlock()
- go dh.perOnuIndicationsRoutine(&channels)
+ dh.perPonOnuIndicationChannel[intfID] = channels
+ dh.perPonOnuIndicationChannelLock.Unlock()
+ go dh.onuIndicationsRoutine(&channels)
return channels.indicationChannel
}
-func (dh *DeviceHandler) removeOnuIndicationChannels(ctx context.Context, sn string) {
- logger.Debugw(ctx, "remove-onu-indication-channels", log.Fields{"sn": sn})
- dh.perOnuChannelLock.Lock()
- defer dh.perOnuChannelLock.Unlock()
- if ch, ok := dh.perOnuChannel[sn]; ok {
- close(ch.stopChannel)
- delete(dh.perOnuChannel, sn)
+func (dh *DeviceHandler) removeOnuIndicationChannels(ctx context.Context) {
+ logger.Debug(ctx, "remove-onu-indication-channels", log.Fields{"device-id": dh.device.Id})
+ dh.perPonOnuIndicationChannelLock.Lock()
+ defer dh.perPonOnuIndicationChannelLock.Unlock()
+ for _, v := range dh.perPonOnuIndicationChannel {
+ close(v.stopChannel)
}
-
+ dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
}
-func (dh *DeviceHandler) putOnuIndicationToChannel(ctx context.Context, indication *oop.Indication, sn string) {
- ind := perOnuIndication{
- ctx: ctx,
- indication: indication,
- serialNumber: sn,
+func (dh *DeviceHandler) putOnuIndicationToChannel(ctx context.Context, indication *oop.Indication, intfID uint32) {
+ ind := onuIndicationMsg{
+ ctx: ctx,
+ indication: indication,
}
- logger.Debugw(ctx, "put-onu-indication-to-channel", log.Fields{"indication": indication, "sn": sn})
+ logger.Debugw(ctx, "put-onu-indication-to-channel", log.Fields{"indication": indication, "intfID": intfID})
// Send the onuIndication on the ONU channel
- dh.getOnuIndicationChannel(ctx, sn) <- ind
+ dh.getOnuIndicationChannel(ctx, intfID) <- ind
}
-func (dh *DeviceHandler) perOnuIndicationsRoutine(onuChannels *onuIndicationChannels) {
+func (dh *DeviceHandler) onuIndicationsRoutine(onuChannels *onuIndicationChannels) {
for {
select {
// process one indication per onu, before proceeding to the next one
case onuInd := <-onuChannels.indicationChannel:
logger.Debugw(onuInd.ctx, "calling-indication", log.Fields{"device-id": dh.device.Id,
- "ind": onuInd.indication, "sn": onuInd.serialNumber})
+ "ind": onuInd.indication})
switch onuInd.indication.Data.(type) {
case *oop.Indication_OnuInd:
- if err := dh.onuIndication(onuInd.ctx, onuInd.indication.GetOnuInd(), onuInd.serialNumber); err != nil {
+ if err := dh.onuIndication(onuInd.ctx, onuInd.indication.GetOnuInd()); err != nil {
_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{
"type": "onu-indication",
- "device-id": dh.device.Id,
- "sn": onuInd.serialNumber}, err).Log()
+ "device-id": dh.device.Id}, err).Log()
}
case *oop.Indication_OnuDiscInd:
- if err := dh.onuDiscIndication(onuInd.ctx, onuInd.indication.GetOnuDiscInd(), onuInd.serialNumber); err != nil {
+ if err := dh.onuDiscIndication(onuInd.ctx, onuInd.indication.GetOnuDiscInd()); err != nil {
_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{
"type": "onu-discovery",
- "device-id": dh.device.Id,
- "sn": onuInd.serialNumber}, err).Log()
+ "device-id": dh.device.Id}, err).Log()
}
}
case <-onuChannels.stopChannel:
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index bba8601..9770f15 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -1089,7 +1089,7 @@
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- _ = tt.devicehandler.onuDiscIndication(ctx, tt.args.onuDiscInd, tt.args.sn)
+ _ = tt.devicehandler.onuDiscIndication(ctx, tt.args.onuDiscInd)
//TODO: actually verify test cases
})
}