VOL-1357 BBSim - OMCI status management

AddFlow() triggers the update of ONU status into ONU_OMCIACTIVE.
The ONU status update is notified to the mediator, and then, it activates the tester.

Change-Id: I1a077026b872868a74a4b5d9849b65cf52baeaa4
diff --git a/core/core_server.go b/core/core_server.go
index 2fc494b..c57a3e9 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -200,6 +200,15 @@
 	}
 }
 
+func (s *Server) updateOnuIntState (intfid uint32, onuid uint32, state device.DeviceState) error {
+	onu, err := s.GetOnuByID(onuid)	//TODO: IntfID should be included ?
+	if err != nil {
+		return err
+	}
+	s.updateDevIntState(onu, state)
+	return nil
+}
+
 func (s *Server) activateOLT(stream openolt.Openolt_EnableIndicationServer) error {
 	defer logger.Debug("activateOLT() Done")
 	logger.Debug("activateOLT() Start")
@@ -368,6 +377,7 @@
 	defer func() {
 		close(unichannel)
 		logger.Debug("Closed unichannel ")
+		logger.Debug("runMainPktLoop Done")
 	}()
 	for intfid, _ := range s.Onumap {
 		for _, onu := range s.Onumap[intfid] {
@@ -418,7 +428,7 @@
 			le, _ := layerEth.(*layers.Ethernet)
 			ethtype := le.EthernetType
 
-			if ethtype == 0x888e {
+			if ethtype == layers.EthernetTypeEAPOL {
 				utils.LoggerWithOnu(onu).WithFields(log.Fields{
 					"gemId": gemid,
 				}).Info("Received upstream packet is EAPOL.")
@@ -491,7 +501,7 @@
 	if layerEth != nil {
 		pkt, _ := layerEth.(*layers.Ethernet)
 		ethtype := pkt.EthernetType
-		if ethtype == 0x888e {
+		if ethtype == layers.EthernetTypeEAPOL {
 			utils.LoggerWithOnu(onu).Info("Received downstream packet is EAPOL.")
 		} else if layerDHCP := rawpkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
 			utils.LoggerWithOnu(onu).WithFields(log.Fields{
@@ -543,6 +553,17 @@
 	return true
 }
 
+func (s *Server) isAllOnuOmciActive() bool {
+	for _, onus := range s.Onumap {
+		for _, onu := range onus{
+			if onu.GetIntState() != device.ONU_OMCIACTIVE {
+				return false
+			}
+		}
+	}
+	return true
+}
+
 func getGemPortID(intfid uint32, onuid uint32) (uint32, error) {
 	// FIXME - check for errors
 	return uint32(omci.GetGemPortId(intfid, onuid)), nil
diff --git a/core/grpc_service.go b/core/grpc_service.go
index 6c07833..f7bf672 100644
--- a/core/grpc_service.go
+++ b/core/grpc_service.go
@@ -25,6 +25,7 @@
 	"gerrit.opencord.org/voltha-bbsim/protos"
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
+	omci "github.com/opencord/omci-sim"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -138,14 +139,27 @@
 }
 
 func (s *Server) FlowAdd(c context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
+	logger.Debug("OLT %d receives FlowAdd() IntfID:%d OnuID:%d EType:%x:.", s.Olt.ID, flow.AccessIntfId, flow.OnuId, flow.Classifier.EthType)
+	onu, err := s.GetOnuByID(uint32(flow.OnuId))
 
-	onu, _ := s.GetOnuByID(uint32(flow.OnuId))
+	if err == nil {
+		intfid := onu.IntfID
+		onuid := onu.OnuID
 
-	utils.LoggerWithOnu(onu).WithFields(log.Fields{
-		"olt":   s.Olt.ID,
-		"c_tag": flow.Action.IVid,
-	}).Debug("OLT receives FlowAdd().")
+		utils.LoggerWithOnu(onu).WithFields(log.Fields{
+			"olt":   s.Olt.ID,
+			"c_tag": flow.Action.IVid,
+		}).Debug("OLT receives FlowAdd().")
 
+		if flow.Classifier.EthType == uint32(layers.EthernetTypeEAPOL) {
+			omcistate := omci.GetOnuOmciState(onu.OnuID, onu.IntfID)
+			if omcistate == omci.DONE {
+				s.updateOnuIntState(intfid, onuid, device.ONU_OMCIACTIVE)
+			} else {
+				logger.Error("FlowAdd() OMCI state %d is not \"DONE\"", omci.GetOnuOmciState(onu.OnuID, onu.IntfID))
+			}
+		}
+	}
 	return new(openolt.Empty), nil
 }
 
diff --git a/core/mediator.go b/core/mediator.go
index b973493..89715c6 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -170,7 +170,7 @@
 
 func transitOlt (s *Server, current device.DeviceState, next device.DeviceState, tester *Tester, o *option) error {
 	if current == device.OLT_PREACTIVE && next == device.OLT_ACTIVE {
-		tester.Start(s)
+
 	} else if current == device.OLT_ACTIVE && next == device.OLT_PREACTIVE{
 		tester.Stop(s)
 	}
@@ -178,5 +178,12 @@
 }
 
 func transitOnu (s *Server, key device.Devkey, current device.DeviceState, next device.DeviceState, tester *Tester, o *option) error {
+	if current == device.ONU_ACTIVE && next == device.ONU_OMCIACTIVE {
+		if s.isAllOnuOmciActive(){	//TODO: This should be per-ONU control, not by cheking All ONU's status
+			tester.Start(s)
+		}
+	} else if (current == device.ONU_OMCIACTIVE || current == device.ONU_ACTIVE) &&
+		next == device.ONU_INACTIVE {
+	}
 	return nil
 }
\ No newline at end of file
diff --git a/core/omci.go b/core/omci.go
index e35fd90..1d1a78c 100644
--- a/core/omci.go
+++ b/core/omci.go
@@ -17,8 +17,6 @@
 package core
 
 import (
-	"time"
-
 	"context"
 
 	"gerrit.opencord.org/voltha-bbsim/common/logger"
@@ -28,25 +26,6 @@
 )
 
 func RunOmciResponder(ctx context.Context, omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][]*device.Onu, errch chan error) {
-	go func() { //For monitoring the OMCI states TODO: This part should be eliminated because it is out of scope of this library
-		t := time.NewTicker(1 * time.Second)
-		defer t.Stop()
-		for {
-			select {
-			case <-t.C:
-				logger.Debug("Monitor omci init state")
-				if isAllOmciInitDone(onumap) {
-					logger.Info("OmciRun - All the omci initialization wes done")
-					close(errch)
-					return
-				}
-			case <-ctx.Done():
-				logger.Debug("Omci Monitoring process was done")
-				return
-			}
-		}
-	}()
-
 	go func() {
 		defer logger.Debug("Omci response process was done")
 
@@ -93,13 +72,3 @@
 	return p
 }
 
-func isAllOmciInitDone(onumap map[uint32][]*device.Onu) bool {
-	for _, onus := range onumap {
-		for _, onu := range onus {
-			if omci.GetOnuOmciState(onu.OnuID, onu.IntfID) == omci.INCOMPLETE {
-				return false
-			}
-		}
-	}
-	return true
-}
diff --git a/device/device_onu.go b/device/device_onu.go
index c1a7aad..79a4be2 100644
--- a/device/device_onu.go
+++ b/device/device_onu.go
@@ -26,8 +26,9 @@
 )
 
 const (
-	ONU_PREACTIVATED DeviceState = iota
+	ONU_INACTIVE   DeviceState = iota	//TODO: Each stage name should be more accurate
 	ONU_ACTIVE
+	ONU_OMCIACTIVE
 )
 
 type Onu struct {
@@ -49,7 +50,7 @@
 	onus := []*Onu{}
 	for i := 0; i < int(nonus); i++ {
 		onu := Onu{}
-		onu.InternalState = ONU_PREACTIVATED
+		onu.InternalState = ONU_INACTIVE
 		onu.mu = &sync.Mutex{}
 		onu.IntfID = intfid
 		onu.OltID = oltid
@@ -64,7 +65,7 @@
 
 func (onu *Onu) Initialize() {
 	onu.OperState = "up"
-	onu.InternalState = ONU_PREACTIVATED
+	onu.InternalState = ONU_INACTIVE
 }
 
 func ValidateONU(targetonu openolt.Onu, regonus map[uint32][]*Onu) bool {