[VOL-4061] Adding protection to avoid pushing flows to disabled ONU
Do not terminate processOltMessage loop on stream closure, it's needed
to emulate the gRPC disconnect
Change-Id: Ia5af48e880dac51337c9f851b65c44c44cf85127
diff --git a/internal/bbsim/devices/helpers.go b/internal/bbsim/devices/helpers.go
index 722a1f1..0d793d8 100644
--- a/internal/bbsim/devices/helpers.go
+++ b/internal/bbsim/devices/helpers.go
@@ -18,7 +18,6 @@
import (
"math/rand"
- "strconv"
"time"
"github.com/looplab/fsm"
@@ -61,15 +60,6 @@
)
}
-// deprecated
-func onuSnToString(sn *openolt.SerialNumber) string {
- s := string(sn.VendorId)
- for _, i := range sn.VendorSpecific {
- s = s + strconv.FormatInt(int64(i/16), 16) + strconv.FormatInt(int64(i%16), 16)
- }
- return s
-}
-
func publishEvent(eventType string, intfID int32, onuID int32, onuSerial string) {
if olt.PublishEvents {
currentTime := time.Now()
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 305a7ce..4ffcaa7 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -742,9 +742,11 @@
case <-ctx.Done():
oltLogger.Debug("OLT Indication processing canceled via context")
break loop
- case <-stream.Context().Done():
- oltLogger.Debug("OLT Indication processing canceled via stream context")
- break loop
+ // do not terminate this loop if the stream is closed,
+ // when we restart the gRPC server it will automatically reconnect and we need this loop to send indications
+ //case <-stream.Context().Done():
+ // oltLogger.Debug("OLT Indication processing canceled via stream context")
+ // break loop
case message, ok := <-ch:
if !ok {
if ctx.Err() != nil {
@@ -867,10 +869,6 @@
// GRPC Endpoints
func (o *OltDevice) ActivateOnu(context context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
- oltLogger.WithFields(log.Fields{
- "OnuSn": onuSnToString(onu.SerialNumber),
- }).Info("Received ActivateOnu call from VOLTHA")
- publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
pon, _ := o.GetPonById(onu.IntfId)
@@ -879,6 +877,12 @@
olt.GemPortIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
+
+ publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), _onu.Sn())
+ oltLogger.WithFields(log.Fields{
+ "OnuSn": _onu.Sn(),
+ }).Info("Received ActivateOnu call from VOLTHA")
+
_onu.SetID(onu.OnuId)
if err := _onu.InternalState.Event(OnuTxEnable); err != nil {
@@ -970,6 +974,7 @@
},
}
o.channel <- oltMsg
+
return new(openolt.Empty), nil
}
@@ -1097,11 +1102,26 @@
}).Error("Can't find Onu")
return nil, err
}
+
+ // if the ONU is disabled reject the flow
+ // as per VOL-4061 there is a small window during which the ONU is disabled
+ // but the port has not been reported as down to ONOS
+ if onu.InternalState.Is(OnuStatePonDisabled) || onu.InternalState.Is(OnuStateDisabled) {
+ oltLogger.WithFields(log.Fields{
+ "OnuId": flow.OnuId,
+ "IntfId": flow.AccessIntfId,
+ "Flow": flow,
+ "SerialNumber": onu.Sn(),
+ "InternalState": onu.InternalState.Current(),
+ }).Error("rejected-flow-because-of-onu-state")
+ return nil, fmt.Errorf("onu-%s-is-currently-%s", onu.Sn(), onu.InternalState.Current())
+ }
+
if !o.enablePerf {
onu.Flows = append(onu.Flows, flowKey)
// Generate event on first flow for ONU
if len(onu.Flows) == 1 {
- publishEvent("Flow-add-received", int32(onu.PonPortID), int32(onu.ID), onuSnToString(onu.SerialNumber))
+ publishEvent("Flow-add-received", int32(onu.PonPortID), int32(onu.ID), onu.Sn())
}
}
@@ -1191,7 +1211,7 @@
return new(openolt.Empty), nil
}
onu.DeleteFlow(flowKey)
- publishEvent("Flow-remove-received", int32(onu.PonPortID), int32(onu.ID), onuSnToString(onu.SerialNumber))
+ publishEvent("Flow-remove-received", int32(onu.PonPortID), int32(onu.ID), onu.Sn())
}
// delete from olt flows