VOL-1360 BBSim - Error handling in AAA/DHCP client activation trigger

Change-Id: Idd327ed5de38ee4026c786e5c384dfa63624ad0a
diff --git a/core/core_server.go b/core/core_server.go
index 69d6d69..610e88b 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -31,6 +31,7 @@
 	"github.com/google/gopacket/pcap"
 	log "github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
+	"golang.org/x/sync/errgroup"
 )
 
 const (
@@ -328,131 +329,157 @@
 func (s *Server) runPacketInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
 	logger.Debug("runPacketInDaemon Start")
 	defer logger.Debug("runPacketInDaemon Done")
-	unichannel := make(chan Packet, 2048)
 
+	errch := make(chan error)
+	OmciRun(ctx, s.omciOut, s.omciIn, s.Onumap, errch)
+	parent := ctx
+	eg, child := errgroup.WithContext(parent)
+	child, cancel := context.WithCancel(child)
 
-	logger.Debug("runOMCIDaemon Start")
-	defer logger.Debug("runOMCIDaemon Done")
-	errch := make (chan error)
-	OmciRun(s.omciOut, s.omciIn, s.Onumap, errch)
-	go func(){
-		<-errch	// Wait for OmciInitialization
-		s.updateState(ACTIVE)
-	}()
-
-	for intfid, _ := range s.Onumap {
-		for _, onu := range s.Onumap[intfid] {
-			onuid := onu.OnuID
-			ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
-			if err != nil {
-				utils.LoggerWithOnu(onu).Error("Fail to identifyUniIoinfo (onuid: %d): %v", onuid, err)
-				return err
+	eg.Go (func() error {
+		logger.Debug("runOMCIDaemon Start")
+		defer logger.Debug("runOMCIDaemon Done")
+		select{
+		case v, ok := <- errch:	// Wait for OmciInitialization
+			if ok {	//Error
+				logger.Error("Error happend in Omci:%s", v)
+				return v
+			} else {	//Close
+				s.updateState(ACTIVE)
 			}
-			uhandler := ioinfo.handler
-			go RecvWorker(ioinfo, uhandler, unichannel)
+		case <- child.Done():
+			return nil
 		}
-	}
+		return nil
+	})
 
-	ioinfo, err := s.IdentifyNniIoinfo("inside")
-	if err != nil {
-		return err
-	}
-	nhandler := ioinfo.handler
-	nnichannel := make(chan Packet, 32)
-	go RecvWorker(ioinfo, nhandler, nnichannel)
-
-	data := &openolt.Indication_PktInd{}
-	for {
-		select {
-		case msg := <-s.omciIn:
-			logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
-			omci := &openolt.Indication_OmciInd{OmciInd: &msg}
-			if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
-				logger.Error("send omci indication failed.", err)
-				continue
+	eg.Go (func () error {
+		unichannel := make(chan Packet, 2048)
+		defer func() {
+			close(unichannel)
+			logger.Debug("Closed unichannel ")
+		}()
+		for intfid, _ := range s.Onumap {
+			for _, onu := range s.Onumap[intfid] {
+				onuid := onu.OnuID
+				ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
+				if err != nil {
+					utils.LoggerWithOnu(onu).Error("Fail to identifyUniIoinfo (onuid: %d): %v", onuid, err)
+					return err
+				}
+				uhandler := ioinfo.handler
+				go RecvWorker(ioinfo, uhandler, unichannel)
 			}
-		case unipkt := <-unichannel:
-			onuid := unipkt.Info.onuid
-			onu, _ := s.GetOnuByID(onuid)
-			utils.LoggerWithOnu(onu).Debug("Received packet from UNI in grpc Server")
-			if unipkt.Info == nil || unipkt.Info.iotype != "uni" {
-				logger.Debug("WARNING: This packet does not come from UNI ")
-				continue
-			}
+		}
 
-			intfid := unipkt.Info.intfid
-			gemid, _ := getGemPortID(intfid, onuid)
-			pkt := unipkt.Pkt
-			layerEth := pkt.Layer(layers.LayerTypeEthernet)
-			le, _ := layerEth.(*layers.Ethernet)
-			ethtype := le.EthernetType
+		ioinfo, err := s.IdentifyNniIoinfo("inside")
+		if err != nil {
+			return err
+		}
+		nhandler := ioinfo.handler
+		nnichannel := make(chan Packet, 32)
+		go RecvWorker(ioinfo, nhandler, nnichannel)
+		defer func(){
+			logger.Debug("PacketInDaemon thread receives close ")
+			close(nnichannel)
+		}()
 
-			if ethtype == 0x888e {
-				utils.LoggerWithOnu(onu).WithFields(log.Fields{
-					"gemId": gemid,
-				}).Info("Received upstream packet is EAPOL.")
-			} else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
-				utils.LoggerWithOnu(onu).WithFields(log.Fields{
-					"gemId": gemid,
-				}).Info("Received upstream packet is DHCP.")
+		data := &openolt.Indication_PktInd{}
+		for {
+			select {
+			case msg := <-s.omciIn:
+				logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
+				omci := &openolt.Indication_OmciInd{OmciInd: &msg}
+				if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
+					logger.Error("send omci indication failed.", err)
+					continue
+				}
+			case unipkt := <-unichannel:
+				onuid := unipkt.Info.onuid
+				onu, _ := s.GetOnuByID(onuid)
+				utils.LoggerWithOnu(onu).Debug("Received packet from UNI in grpc Server")
+				if unipkt.Info == nil || unipkt.Info.iotype != "uni" {
+					logger.Debug("WARNING: This packet does not come from UNI ")
+					continue
+				}
 
-				//C-TAG
-				sn := convB2S(onu.SerialNumber.VendorSpecific)
-				if ctag, ok := s.CtagMap[sn]; ok == true {
-					tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
-					if err != nil {
-						utils.LoggerWithOnu(onu).WithFields(log.Fields{
-							"gemId": gemid,
-						}).Error("Fail to tag C-tag")
+				intfid := unipkt.Info.intfid
+				gemid, _ := getGemPortID(intfid, onuid)
+				pkt := unipkt.Pkt
+				layerEth := pkt.Layer(layers.LayerTypeEthernet)
+				le, _ := layerEth.(*layers.Ethernet)
+				ethtype := le.EthernetType
+
+				if ethtype == 0x888e {
+					utils.LoggerWithOnu(onu).WithFields(log.Fields{
+						"gemId": gemid,
+					}).Info("Received upstream packet is EAPOL.")
+				} else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
+					utils.LoggerWithOnu(onu).WithFields(log.Fields{
+						"gemId": gemid,
+					}).Info("Received upstream packet is DHCP.")
+
+					//C-TAG
+					sn := convB2S(onu.SerialNumber.VendorSpecific)
+					if ctag, ok := s.CtagMap[sn]; ok == true {
+						tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
+						if err != nil {
+							utils.LoggerWithOnu(onu).WithFields(log.Fields{
+								"gemId": gemid,
+							}).Error("Fail to tag C-tag")
+						} else {
+							pkt = tagpkt
+						}
 					} else {
-						pkt = tagpkt
+						utils.LoggerWithOnu(onu).WithFields(log.Fields{
+							"gemId":   gemid,
+							"cTagMap": s.CtagMap,
+						}).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
 					}
 				} else {
 					utils.LoggerWithOnu(onu).WithFields(log.Fields{
-						"gemId":   gemid,
-						"cTagMap": s.CtagMap,
-					}).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
+						"gemId": gemid,
+					}).Info("Received upstream packet is of unknow type, skipping.")
+					continue
 				}
-			} else {
-				utils.LoggerWithOnu(onu).WithFields(log.Fields{
-					"gemId": gemid,
-				}).Info("Received upstream packet is of unknow type, skipping.")
-				continue
-			}
 
-			utils.LoggerWithOnu(onu).Info("sendPktInd - UNI Packet")
-			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
-			if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-				logger.Error("Fail to send PktInd indication.", err)
-				return err
-			}
+				utils.LoggerWithOnu(onu).Info("sendPktInd - UNI Packet")
+				data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
+				if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+					logger.Error("Fail to send PktInd indication.", err)
+					return err
+				}
 
-		case nnipkt := <-nnichannel:
-			if nnipkt.Info == nil || nnipkt.Info.iotype != "nni" {
-				logger.Debug("WARNING: This packet does not come from NNI ")
-				continue
-			}
-			onuid := nnipkt.Info.onuid
-			onu, _ := s.GetOnuByID(onuid)
+			case nnipkt := <-nnichannel:
+				if nnipkt.Info == nil || nnipkt.Info.iotype != "nni" {
+					logger.Debug("WARNING: This packet does not come from NNI ")
+					continue
+				}
+				onuid := nnipkt.Info.onuid
+				onu, _ := s.GetOnuByID(onuid)
 
-			utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
-			intfid := nnipkt.Info.intfid
-			pkt := nnipkt.Pkt
-			utils.LoggerWithOnu(onu).Info("sendPktInd - NNI Packet")
-			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
-			if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-				logger.Error("Fail to send PktInd indication.", err)
-				return err
-			}
+				utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
+				intfid := nnipkt.Info.intfid
+				pkt := nnipkt.Pkt
+				utils.LoggerWithOnu(onu).Info("sendPktInd - NNI Packet")
+				data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
+				if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+					logger.Error("Fail to send PktInd indication.", err)
+					return err
+				}
 
-		case <-ctx.Done():
-			logger.Debug("PacketInDaemon thread receives close ")
-			close(unichannel)
-			logger.Debug("Closed unichannel ")
-			close(nnichannel)
-			logger.Debug("Closed nnichannel ")
-			return nil
+			case <-child.Done():
+				logger.Debug("Closed nnichannel ")
+				return nil
+			}
 		}
+		return nil
+	})
+
+	logger.Debug("Wait here")
+	if err := eg.Wait(); err != nil {
+		logger.Error("Error happend in runPacketInDaemon:%s", err)
+		cancel()
 	}
 	return nil
 }
diff --git a/core/io_worker.go b/core/io_worker.go
index 2533a19..077b2c1 100644
--- a/core/io_worker.go
+++ b/core/io_worker.go
@@ -22,13 +22,12 @@
 	"strconv"
 	"time"
 
-	"gerrit.opencord.org/voltha-bbsim/device"
-
 	"gerrit.opencord.org/voltha-bbsim/common/logger"
-	"gerrit.opencord.org/voltha-bbsim/common/utils"
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"github.com/google/gopacket/pcap"
+	"gerrit.opencord.org/voltha-bbsim/device"
+	"gerrit.opencord.org/voltha-bbsim/common/utils"
 )
 
 func RecvWorker(io *Ioinfo, handler *pcap.Handle, r chan Packet) {
diff --git a/core/mediator.go b/core/mediator.go
index ee25264..8baeafd 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -51,8 +51,8 @@
 	npon := flag.Int("i", 1, "Number of PON-IF ports")
 	nonus := flag.Int("n", 1, "Number of ONUs per PON-IF port")
 	modeopt := flag.String("m", "default", "Emulation mode (default, aaa, both (aaa & dhcp))")
-	aaawait := flag.Int("aw", 30, "Wait time (sec) for activation WPA supplicants")
-	dhcpwait := flag.Int("dw", 50, "Wait time (sec) for activation DHCP clients")
+	aaawait := flag.Int("aw", 10, "Wait time (sec) for activation WPA supplicants")
+	dhcpwait := flag.Int("dw", 20, "Wait time (sec) for activation DHCP clients")
 	dhcpservip := flag.String("s", "182.21.0.1", "DHCP Server IP Address")
 	intvl := flag.Int("v", 1, "Interval each Indication")
 	intvl_test := flag.Int("V", 1, "Interval each Indication")
diff --git a/core/omci.go b/core/omci.go
index acf7bbe..3a4335d 100644
--- a/core/omci.go
+++ b/core/omci.go
@@ -24,6 +24,8 @@
 	"gerrit.opencord.org/voltha-bbsim/protos"
 	"gerrit.opencord.org/voltha-bbsim/device"
 	"time"
+	"context"
+	"errors"
 )
 
 //
@@ -106,7 +108,7 @@
 	DONE
 )
 
-type OmciMsgHandler func(class OmciClass, content OmciContent, key OnuKey) []byte
+type OmciMsgHandler func(class OmciClass, content OmciContent, key OnuKey) ([]byte, error)
 
 var Handlers = map[OmciMsgType]OmciMsgHandler{
 	MibReset:      mibReset,
@@ -120,64 +122,82 @@
 
 var OnuOmciStateMap = map[OnuKey]*OnuOmciState{}
 
-func OmciRun(omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][] *device.Onu, errch chan error) {
+func OmciRun(ctx context.Context, omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][] *device.Onu, errch chan error) {
 	go func() { //For monitoring the OMCI states
+		t := time.NewTicker(1 * time.Second)
+		defer t.Stop()
 		for {
-			time.Sleep(1 * time.Second)
-			if isAllOmciInitDone(onumap) {
-				logger.Info("OmciRun - All the omci init process were done")
-				close(errch)
-				break
+			select{
+			case <- t.C:
+				logger.Debug("Monitor omci init state")
+				if isAllOmciInitDone(onumap) {
+					logger.Info("OmciRun - All the omci initialization wes done")
+					close(errch)
+					return
+				}
+			case <- ctx.Done():
+				logger.Debug("Omci Monitoring process was done")
+				return
 			}
 		}
 	}()
 
 	go func(){
+		defer logger.Debug("Omci response process was done")
 		for {
 			var resp openolt.OmciIndication
+			select{
+				case m := <-omciOut:
+					transactionId, deviceId, msgType, class, instance, content, err := ParsePkt(m.Pkt)
+					if err != nil {
+						errch <- err
+						return
+					}
 
-			m := <-omciOut
+					logger.Debug("OmciRun - transactionId: %d msgType: %d, ME Class: %d, ME Instance: %d",
+						transactionId, msgType, class, instance)
 
-			transactionId, deviceId, msgType, class, instance, content := ParsePkt(m.Pkt)
+					key := OnuKey{m.IntfId, m.OnuId}
+					if _, ok := OnuOmciStateMap[key]; !ok {
+						OnuOmciStateMap[key] = NewOnuOmciState()
+					}
 
-			logger.Debug("OmciRun - transactionId: %d msgType: %d, ME Class: %d, ME Instance: %d",
-				transactionId, msgType, class, instance)
+					if _, ok := Handlers[msgType]; !ok {
+						logger.Warn("Ignore omci msg (msgType %d not handled)", msgType)
+						continue
+					}
 
-			key := OnuKey{m.IntfId, m.OnuId}
-			if _, ok := OnuOmciStateMap[key]; !ok {
-				OnuOmciStateMap[key] = NewOnuOmciState()
+					resp.Pkt, err = Handlers[msgType](class, content, key)
+					if err != nil {
+						errch <- err
+						return
+					}
+					resp.Pkt[0] = byte(transactionId >> 8)
+					resp.Pkt[1] = byte(transactionId & 0xFF)
+					resp.Pkt[2] = 0x2<<4 | byte(msgType)
+					resp.Pkt[3] = deviceId
+					resp.IntfId = m.IntfId
+					resp.OnuId = m.OnuId
+					omciIn <- resp
+				case <-ctx.Done():
+					return
 			}
-
-			if _, ok := Handlers[msgType]; !ok {
-				logger.Warn("Ignore omci msg (msgType %d not handled)", msgType)
-				continue
-			}
-
-			resp.Pkt = Handlers[msgType](class, content, key)
-
-			resp.Pkt[0] = byte(transactionId >> 8)
-			resp.Pkt[1] = byte(transactionId & 0xFF)
-			resp.Pkt[2] = 0x2<<4 | byte(msgType)
-			resp.Pkt[3] = deviceId
-			resp.IntfId = m.IntfId
-			resp.OnuId = m.OnuId
-			omciIn <- resp
 		}
 	}()
 }
 
-func ParsePkt(pkt []byte) (uint16, uint8, OmciMsgType, OmciClass, uint16, OmciContent) {
+func ParsePkt(pkt []byte) (uint16, uint8, OmciMsgType, OmciClass, uint16, OmciContent, error) {
 	var m OmciMessage
 
 	r := bytes.NewReader(HexDecode(pkt))
 
 	if err := binary.Read(r, binary.BigEndian, &m); err != nil {
 		logger.Error("binary.Read failed: %s", err)
+		return 0, 0, 0, 0, 0, OmciContent{}, errors.New("binary.Read failed")
 	}
 	logger.Debug("OmciRun - TransactionId: %d MessageType: %d, ME Class: %d, ME Instance: %d, Content: %x",
 		m.TransactionId, m.MessageType&0x0F, m.MessageId.Class, m.MessageId.Instance, m.Content)
-	return m.TransactionId, m.DeviceId, m.MessageType & 0x0F, m.MessageId.Class, m.MessageId.Instance, m.Content
-
+	return m.TransactionId, m.DeviceId, m.MessageType & 0x0F, m.MessageId.Class, m.MessageId.Instance, m.Content, nil
 }
 
 func HexDecode(pkt []byte) []byte {
@@ -198,7 +218,7 @@
 	return &OnuOmciState{gemPortId: 0, mibUploadCtr: 0, uniGInstance: 1, pptpInstance: 1}
 }
 
-func mibReset(class OmciClass, content OmciContent, key OnuKey) []byte {
+func mibReset(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
 	var pkt []byte
 
 	logger.Debug("Omci MibReset")
@@ -210,10 +230,10 @@
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-	return pkt
+	return pkt, nil
 }
 
-func mibUpload(class OmciClass, content OmciContent, key OnuKey) []byte {
+func mibUpload(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
 	var pkt []byte
 
 	logger.Debug("Omci MibUpload")
@@ -228,10 +248,10 @@
 
 	pkt[9] = NumMibUploads // Number of subsequent MibUploadNext cmds
 
-	return pkt
+	return pkt, nil
 }
 
-func mibUploadNext(class OmciClass, content OmciContent, key OnuKey) []byte {
+func mibUploadNext(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
 	var pkt []byte
 
 	logger.Debug("Omci MibUploadNext")
@@ -271,14 +291,14 @@
 		state.pptpInstance++
 	default:
 		logger.Error("Invalid MibUpload request %d", state.mibUploadCtr)
+		return nil, errors.New("Invalid MibUpload request")
 	}
 
 	state.mibUploadCtr++
-
-	return pkt
+	return pkt, nil
 }
 
-func set(class OmciClass, content OmciContent, key OnuKey) []byte {
+func set(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
 	var pkt []byte
 
 	pkt = []byte{
@@ -291,15 +311,16 @@
 
 	logger.Debug("Omci Set")
 
-	return pkt
+	return pkt, nil
 }
 
-func create(class OmciClass, content OmciContent, key OnuKey) []byte {
+func create(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
 	var pkt []byte
 
 	if class == GEMPortNetworkCTP {
 		if onuOmciState, ok := OnuOmciStateMap[key]; !ok {
 			logger.Error("ONU Key Error - IntfId: %d, OnuId:", key.IntfId, key.OnuId)
+			return nil, errors.New("ONU Key Error")
 		} else {
 			onuOmciState.gemPortId = binary.BigEndian.Uint16(content[:2])
 			logger.Debug("Gem Port Id %d", onuOmciState.gemPortId)
@@ -317,10 +338,10 @@
 
 	logger.Debug("Omci Create")
 
-	return pkt
+	return pkt, nil
 }
 
-func get(class OmciClass, content OmciContent, key OnuKey) []byte {
+func get(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
 	var pkt []byte
 
 	pkt = []byte{
@@ -333,10 +354,10 @@
 
 	logger.Debug("Omci Get")
 
-	return pkt
+	return pkt, nil
 }
 
-func getAllAlarms(class OmciClass, content OmciContent, key OnuKey) []byte {
+func getAllAlarms(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
 	var pkt []byte
 
 	pkt = []byte{
@@ -349,7 +370,7 @@
 
 	logger.Debug("Omci GetAllAlarms")
 
-	return pkt
+	return pkt, nil
 }
 
 func isAllOmciInitDone(onumap map[uint32][] *device.Onu) bool {