[VOL-4061] Adding protection to avoid pushing flows to disabled ONU
Do not terminate processOltMessage loop on stream closure, it's needed
to emulate the gRPC disconnect

Change-Id: Ia5af48e880dac51337c9f851b65c44c44cf85127
diff --git a/internal/bbsim/devices/helpers.go b/internal/bbsim/devices/helpers.go
index 722a1f1..0d793d8 100644
--- a/internal/bbsim/devices/helpers.go
+++ b/internal/bbsim/devices/helpers.go
@@ -18,7 +18,6 @@
 
 import (
 	"math/rand"
-	"strconv"
 	"time"
 
 	"github.com/looplab/fsm"
@@ -61,15 +60,6 @@
 	)
 }
 
-// deprecated
-func onuSnToString(sn *openolt.SerialNumber) string {
-	s := string(sn.VendorId)
-	for _, i := range sn.VendorSpecific {
-		s = s + strconv.FormatInt(int64(i/16), 16) + strconv.FormatInt(int64(i%16), 16)
-	}
-	return s
-}
-
 func publishEvent(eventType string, intfID int32, onuID int32, onuSerial string) {
 	if olt.PublishEvents {
 		currentTime := time.Now()
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 305a7ce..4ffcaa7 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -742,9 +742,11 @@
 		case <-ctx.Done():
 			oltLogger.Debug("OLT Indication processing canceled via context")
 			break loop
-		case <-stream.Context().Done():
-			oltLogger.Debug("OLT Indication processing canceled via stream context")
-			break loop
+		// do not terminate this loop if the stream is closed,
+		// when we restart the gRPC server it will automatically reconnect and we need this loop to send indications
+		//case <-stream.Context().Done():
+		//	oltLogger.Debug("OLT Indication processing canceled via stream context")
+		//	break loop
 		case message, ok := <-ch:
 			if !ok {
 				if ctx.Err() != nil {
@@ -867,10 +869,6 @@
 // GRPC Endpoints
 
 func (o *OltDevice) ActivateOnu(context context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
-	oltLogger.WithFields(log.Fields{
-		"OnuSn": onuSnToString(onu.SerialNumber),
-	}).Info("Received ActivateOnu call from VOLTHA")
-	publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
 
 	pon, _ := o.GetPonById(onu.IntfId)
 
@@ -879,6 +877,12 @@
 	olt.GemPortIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
 
 	_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
+
+	publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), _onu.Sn())
+	oltLogger.WithFields(log.Fields{
+		"OnuSn": _onu.Sn(),
+	}).Info("Received ActivateOnu call from VOLTHA")
+
 	_onu.SetID(onu.OnuId)
 
 	if err := _onu.InternalState.Event(OnuTxEnable); err != nil {
@@ -970,6 +974,7 @@
 		},
 	}
 	o.channel <- oltMsg
+
 	return new(openolt.Empty), nil
 }
 
@@ -1097,11 +1102,26 @@
 			}).Error("Can't find Onu")
 			return nil, err
 		}
+
+		// if the ONU is disabled reject the flow
+		// as per VOL-4061 there is a small window during which the ONU is disabled
+		// but the port has not been reported as down to ONOS
+		if onu.InternalState.Is(OnuStatePonDisabled) || onu.InternalState.Is(OnuStateDisabled) {
+			oltLogger.WithFields(log.Fields{
+				"OnuId":         flow.OnuId,
+				"IntfId":        flow.AccessIntfId,
+				"Flow":          flow,
+				"SerialNumber":  onu.Sn(),
+				"InternalState": onu.InternalState.Current(),
+			}).Error("rejected-flow-because-of-onu-state")
+			return nil, fmt.Errorf("onu-%s-is-currently-%s", onu.Sn(), onu.InternalState.Current())
+		}
+
 		if !o.enablePerf {
 			onu.Flows = append(onu.Flows, flowKey)
 			// Generate event on first flow for ONU
 			if len(onu.Flows) == 1 {
-				publishEvent("Flow-add-received", int32(onu.PonPortID), int32(onu.ID), onuSnToString(onu.SerialNumber))
+				publishEvent("Flow-add-received", int32(onu.PonPortID), int32(onu.ID), onu.Sn())
 			}
 		}
 
@@ -1191,7 +1211,7 @@
 				return new(openolt.Empty), nil
 			}
 			onu.DeleteFlow(flowKey)
-			publishEvent("Flow-remove-received", int32(onu.PonPortID), int32(onu.ID), onuSnToString(onu.SerialNumber))
+			publishEvent("Flow-remove-received", int32(onu.PonPortID), int32(onu.ID), onu.Sn())
 		}
 
 		// delete from olt flows