SEBA-651 DHCP emulation triggered by DHCP flow install

Change-Id: Ibaeab5ee6daf19ee1d9214ce48363fba6c71fd27
diff --git a/README.md b/README.md
index 004f413..d0d08a2 100644
--- a/README.md
+++ b/README.md
@@ -95,11 +95,11 @@
   -H string
       IP address:port (default ":50060")
   -aw int
-      Wait time (sec) for activation WPA supplicants (default 10)
+      Wait time (sec) for activation WPA supplicants after EAPOL flow entry installed (default 2)
   -d string
       Debug Level(TRACE DEBUG INFO WARN ERROR) (default "DEBUG")
   -dw int
-      Wait time (sec) for activation DHCP clients (default 20)
+      Wait time (sec) for activation DHCP clients after DHCP flow entry installed (default 2)
   -i int
       Number of PON-IF ports (default 1)
   -id int
diff --git a/core/grpc_service.go b/core/grpc_service.go
index c030c18..cb1ee95 100644
--- a/core/grpc_service.go
+++ b/core/grpc_service.go
@@ -200,6 +200,8 @@
 			"c_tag": flow.Action.IVid,
 		}).Debug("OLT receives FlowAdd().")
 
+
+		// EAPOL flow
 		if flow.Classifier.EthType == uint32(layers.EthernetTypeEAPOL) {
 			omcistate := omci.GetOnuOmciState(onu.IntfID, onu.OnuID)
 			if omcistate != omci.DONE {
@@ -207,6 +209,19 @@
 			}
 			s.updateOnuIntState(intfid, onuid, device.ONU_OMCIACTIVE)
 		}
+
+		// DHCP flow
+		if flow.Classifier.EthType == uint32(layers.EthernetTypeIPv4) {
+			logger.Debug("Received flow's srcPort:%d dstPort:%d", flow.Classifier.SrcPort, flow.Classifier.DstPort)
+			if flow.Classifier.SrcPort == uint32(68) && flow.Classifier.DstPort == uint32(67) {
+				logger.Debug("OLT %d receives DHCP flow IntfID:%d OnuID:%d EType:%x GemPortID:%d", s.Olt.ID, flow.AccessIntfId, flow.OnuId, flow.Classifier.EthType, flow.GemportId)
+				omcistate := omci.GetOnuOmciState(onu.IntfID, onu.OnuID)
+				if omcistate != omci.DONE {
+					logger.Warn("FlowAdd() OMCI state %d is not \"DONE\"", omci.GetOnuOmciState(onu.OnuID, onu.IntfID))
+				}
+				s.updateOnuIntState(intfid, onuid, device.ONU_AUTHENTICATED)
+			}
+		}
 	}
 	return new(openolt.Empty), nil
 }
@@ -215,7 +230,7 @@
 func (s *Server) FlowRemove(c context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
 	logger.Debug("OLT %d receives FlowRemove()", s.Olt.ID)
 	onu, err := s.GetOnuByID(uint32(flow.OnuId), uint32(flow.AccessIntfId))
-	if err == nil{
+	if err == nil {
 		utils.LoggerWithOnu(onu).WithFields(log.Fields{
 			"olt":   s.Olt.ID,
 			"c_tag": flow.Action.IVid,
diff --git a/core/mediator.go b/core/mediator.go
index 4e8833a..b1d098d 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -31,6 +31,14 @@
 	"reflect"
 )
 
+const (
+	DEFAULT Mode = iota
+	AAA
+	BOTH
+)
+
+type Mode int
+
 type option struct {
 	address     string
 	port        uint32
@@ -54,8 +62,8 @@
 	npon := flag.Int("i", 1, "Number of PON-IF ports")
 	nonus := flag.Int("n", 1, "Number of ONUs per PON-IF port")
 	modeopt := flag.String("m", "default", "Emulation mode (default, aaa, both (aaa & dhcp))")
-	aaawait := flag.Int("aw", 10, "Wait time (sec) for activation WPA supplicants")
-	dhcpwait := flag.Int("dw", 20, "Wait time (sec) for activation DHCP clients")
+	aaawait := flag.Int("aw", 2, "Wait time (sec) for activation WPA supplicants after EAPOL flow entry installed")
+	dhcpwait := flag.Int("dw", 2, "Wait time (sec) for activation DHCP clients after DHCP flow entry installed")
 	dhcpservip := flag.String("s", "182.21.0.128", "DHCP Server IP Address")
 	intvl := flag.Int("v", 1000, "Interval each Indication (ms)")
 	kafkaBroker := flag.String("k", "", "Kafka broker")
@@ -179,14 +187,29 @@
 
 func transitOnu (key device.Devkey, current device.DeviceState, next device.DeviceState, tm *TestManager, o *option) error {
 	logger.Debug("trnsitOnu called with key: %v, current: %d, next: %d", key, current, next)
-	if current == device.ONU_ACTIVE && next == device.ONU_OMCIACTIVE {
-		t := tm.CreateTester(o, key)
-		if err := tm.StartTester(key, t); err != nil {
-			logger.Error("Cannot Start Executer error:%v", err)
+	if o.Mode == AAA || o.Mode == BOTH {
+		if current == device.ONU_ACTIVE && next == device.ONU_OMCIACTIVE {
+			t := tm.CreateTester("AAA", o, key, activateWPASupplicant, o.aaawait)
+			if err := tm.StartTester(t); err != nil {
+				logger.Error("Cannot Start AAA Executer error:%v", err)
+			}
+		} else if current == device.ONU_OMCIACTIVE && next == device.ONU_INACTIVE {
+			if err := tm.StopTester("AAA", key); err != nil {
+				logger.Error("Cannot Stop AAA Executer error:%v", err)
+			}
 		}
-	} else if current == device.ONU_OMCIACTIVE && next == device.ONU_INACTIVE {
-		if err := tm.StopTester(key); err != nil {
-			logger.Error("Cannot Start Executer error:%v", err)
+	}
+
+	if o.Mode == BOTH{
+		if current == device.ONU_OMCIACTIVE && next == device.ONU_AUTHENTICATED {
+			t := tm.CreateTester("DHCP", o, key, activateDHCPClient, o.dhcpwait)
+			if err := tm.StartTester(t); err != nil {
+				logger.Error("Cannot Start DHCP Executer error:%v", err)
+			}
+		} else if current == device.ONU_AUTHENTICATED && next == device.ONU_INACTIVE {
+			if err := tm.StopTester("DHCP", key); err != nil {
+				logger.Error("Cannot Stop DHCP Executer error:%v", err)
+			}
 		}
 	}
 	return nil
diff --git a/core/tester.go b/core/tester.go
index 2349fae..4d1118e 100644
--- a/core/tester.go
+++ b/core/tester.go
@@ -20,32 +20,25 @@
 	"context"
 	"os/exec"
 	"gerrit.opencord.org/voltha-bbsim/common/logger"
-	"golang.org/x/sync/errgroup"
 	"time"
 	"strconv"
 	"gerrit.opencord.org/voltha-bbsim/device"
 	"fmt"
 )
 
-const (
-	DEFAULT Mode = iota
-	AAA
-	BOTH
-)
-
-type Mode int
-
 type TestManager struct {
 	DhcpServerIP string
 	Pid          []int
-	testers      map[device.Devkey]*Tester
+	testers      map[string]map[device.Devkey]*Tester
 	ctx          context.Context
 	cancel       context.CancelFunc
 }
 
 type Tester struct {
+	Type string
 	Key device.Devkey
-	Mode Mode
+	Testfunc func(device.Devkey) error
+	Waitsec  int
 	ctx          context.Context
 	cancel       context.CancelFunc
 }
@@ -56,11 +49,13 @@
 	return t
 }
 
-func (*TestManager) CreateTester(opt *option, key device.Devkey) *Tester{
+func (*TestManager) CreateTester(testtype string, opt *option, key device.Devkey, fn func(device.Devkey) error, waitsec int) *Tester{
 	logger.Debug("CreateTester() called")
 	t := new(Tester)
-	t.Mode = opt.Mode
+	t.Type = testtype
 	t.Key = key
+	t.Testfunc = fn
+	t.Waitsec = waitsec
 	return t
 }
 
@@ -69,7 +64,9 @@
 	ctx, cancel := context.WithCancel(context.Background())
 	tm.ctx = ctx
 	tm.cancel = cancel
-	tm.testers = map[device.Devkey]*Tester{}
+	tm.testers = make(map[string]map[device.Devkey]*Tester)
+	tm.testers["AAA"] = map[device.Devkey]*Tester{}
+	tm.testers["DHCP"] = map[device.Devkey]*Tester{}
 	logger.Info("TestManager start")
 	return nil
 }
@@ -83,67 +80,50 @@
 	return nil
 }
 
-func (tm *TestManager) StartTester (key device.Devkey, t *Tester) error {
-	logger.Debug("StartTester called with key:%v", key)
-	if t.Mode == DEFAULT {
-		_, child := errgroup.WithContext(tm.ctx)
-		child, cancel := context.WithCancel(child)
-		t.ctx = child
-		t.cancel = cancel
-	} else if t.Mode == AAA || t.Mode == BOTH {
-		eg, child := errgroup.WithContext(tm.ctx)
-		child, cancel := context.WithCancel(child)
-		t.ctx = child
-		t.cancel = cancel
-		eg.Go(func() error {
-			err := activateWPASupplicant(key)
-			if err != nil {
-				return err
-			}
-			return nil
-		})
+func (tm *TestManager) StartTester (t *Tester) error {
+	testtype := t.Type
+	key := t.Key
+	waitsec := t.Waitsec
 
-		if t.Mode == BOTH {
-			waitForDHCP := 3
-			eg.Go(func() error {
-				tick := time.NewTicker(time.Second)
-				counter := 0
-				defer func() {
-					tick.Stop()
-					logger.Debug("exeDHCPTest Done")
-				}()
+	logger.Debug("StartTester type:%s called with key:%v", testtype, key)
+	child, cancel := context.WithCancel(tm.ctx)
+	t.ctx = child
+	t.cancel = cancel
+	go func() error {
+		tick := time.NewTicker(time.Second)
+		counter := 0
+		defer func() {
+			tick.Stop()
+			logger.Debug("Tester type:%s with key %v Done", testtype, key)
+		}()
 
-			L:
-				for counter < waitForDHCP {
-					select{
-					case <-tick.C:
-						counter ++
-						if counter == waitForDHCP {	// TODO: This should be fixed
-							break L
-						}
-					case <-child.Done():
-						return nil
-					}
+	L:
+		for counter < waitsec {
+			select{
+			case <-tick.C:
+				counter ++
+				if counter == waitsec {	// TODO: This should be fixed
+					break L
 				}
-				err := activateDHCPClient(key)
-				if err != nil {
-					return err
-				}
+			case <-child.Done():
 				return nil
-			})
+			}
 		}
-		if err := eg.Wait(); err != nil {
+		err := t.Testfunc(key)
+		if err != nil {
 			return err
 		}
-	}
-	tm.testers[key] = t
+		return nil
+	}()
+
+	tm.testers[testtype][key] = t
 	return nil
 }
 
-func (tm *TestManager) StopTester (key device.Devkey) error {
-	ts := tm.testers[key]
+func (tm *TestManager) StopTester (testtype string, key device.Devkey) error {
+	ts := tm.testers[testtype][key]
 	ts.cancel()
-	delete(tm.testers, key)
+	delete(tm.testers[testtype], key)
 	return nil
 }
 
diff --git a/device/device_onu.go b/device/device_onu.go
index 1e1ee88..f2338d7 100644
--- a/device/device_onu.go
+++ b/device/device_onu.go
@@ -29,6 +29,7 @@
 	ONU_INACTIVE   DeviceState = iota	//TODO: Each stage name should be more accurate
 	ONU_ACTIVE
 	ONU_OMCIACTIVE
+	ONU_AUTHENTICATED
 	ONU_FREE
 )