Refactoring for coreServer #No change in processing logic

Change-Id: Iea4d9ef4e6288b919c7e5796bbb9bc7f87ac604e
diff --git a/core/core_server.go b/core/core_server.go
index 610e88b..9886051 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -71,8 +71,8 @@
 
 const (
 	INACTIVE   = iota // OLT/ONUs are not instantiated
-	PRE_ACTIVE        // Before PacketInDaemon Running
-	ACTIVE            // After PacketInDaemon Running
+	PRE_ACTIVE        // Before running MainPacketLoop
+	ACTIVE            // After running MainPacketLoop
 )
 
 /* coreState
@@ -151,7 +151,7 @@
 		s.gRPCserver.Stop()
 		logger.Debug("gRPCserver.Stop()")
 	}
-	s.StopPktInDaemon()
+	s.StopPktLoops()
 	return
 }
 
@@ -178,7 +178,7 @@
 	coreCtx := context.Background()
 	coreCtx, corecancel := context.WithCancel(coreCtx)
 	s.cancel = corecancel
-	if err := s.StartPktInDaemon(coreCtx, *sv); err != nil {
+	if err := s.StartPktLoops(coreCtx, *sv); err != nil {
 		return err
 	}
 	return nil
@@ -190,7 +190,7 @@
 		logger.Debug("Disable() Done")
 	}()
 	logger.Debug("Disable() Start")
-	s.StopPktInDaemon()
+	s.StopPktLoops()
 }
 
 func (s *Server) updateState(state coreState) {
@@ -253,15 +253,15 @@
 }
 
 // Blocking
-func (s *Server) StartPktInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
-	logger.Debug("StartPktInDaemon() Start")
+func (s *Server) StartPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+	logger.Debug("StartPktLoops () Start")
 	defer func() {
 		RemoveVeths(s.Vethnames)
 		s.Vethnames = []string{}
 		s.Ioinfos = []*Ioinfo{}
 		s.wg.Done()
 		s.updateState(PRE_ACTIVE)
-		logger.Debug("StartPktInDaemon() Done")
+		logger.Debug("StartPktLoops () Done")
 	}()
 	s.wg.Add(1)
 	ioinfos, veths, err := createIoinfos(s.Olt.ID, s.Vethnames, s.Onumap)
@@ -277,15 +277,15 @@
 	child, cancel := context.WithCancel(parent)
 	s.cancel = cancel
 
-	if err = s.runPacketInDaemon(child, stream); err != nil {
-		logger.Error("runPacketInDaemon failed.", err)
+	if err = s.runPktLoops(child, stream); err != nil {
+		logger.Error("runPktLoops failed.", err)
 		return err
 	}
 	return nil
 }
 
 //Non-Blocking
-func (s *Server) StopPktInDaemon() {
+func (s *Server) StopPktLoops() {
 	if s.cancel != nil {
 		cancel := s.cancel
 		cancel()
@@ -326,19 +326,18 @@
 }
 
 //Blocking
-func (s *Server) runPacketInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
-	logger.Debug("runPacketInDaemon Start")
-	defer logger.Debug("runPacketInDaemon Done")
+func (s *Server) runPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+	logger.Debug("runPacketPktLoops Start")
+	defer logger.Debug("runPacketLoops Done")
 
 	errch := make(chan error)
-	OmciRun(ctx, s.omciOut, s.omciIn, s.Onumap, errch)
-	parent := ctx
-	eg, child := errgroup.WithContext(parent)
+	RunOmciResponder(ctx, s.omciOut, s.omciIn, s.Onumap, errch)
+	eg, child := errgroup.WithContext(ctx)
 	child, cancel := context.WithCancel(child)
 
 	eg.Go (func() error {
-		logger.Debug("runOMCIDaemon Start")
-		defer logger.Debug("runOMCIDaemon Done")
+		logger.Debug("runOMCIResponder Start")
+		defer logger.Debug("runOMCIResponder Done")
 		select{
 		case v, ok := <- errch:	// Wait for OmciInitialization
 			if ok {	//Error
@@ -354,132 +353,134 @@
 	})
 
 	eg.Go (func () error {
-		unichannel := make(chan Packet, 2048)
-		defer func() {
-			close(unichannel)
-			logger.Debug("Closed unichannel ")
-		}()
-		for intfid, _ := range s.Onumap {
-			for _, onu := range s.Onumap[intfid] {
-				onuid := onu.OnuID
-				ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
-				if err != nil {
-					utils.LoggerWithOnu(onu).Error("Fail to identifyUniIoinfo (onuid: %d): %v", onuid, err)
-					return err
-				}
-				uhandler := ioinfo.handler
-				go RecvWorker(ioinfo, uhandler, unichannel)
+		err := s.runMainPktLoop(child, stream)
+		return err
+	})
+
+	if err := eg.Wait(); err != nil {
+		logger.Error("Error happend in runPacketLoops:%s", err)
+		cancel()
+	}
+	return nil
+}
+
+func (s *Server) runMainPktLoop(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+	unichannel := make(chan Packet, 2048)
+	defer func() {
+		close(unichannel)
+		logger.Debug("Closed unichannel ")
+	}()
+	for intfid, _ := range s.Onumap {
+		for _, onu := range s.Onumap[intfid] {
+			onuid := onu.OnuID
+			ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
+			if err != nil {
+				utils.LoggerWithOnu(onu).Error("Fail to identifyUniIoinfo (onuid: %d): %v", onuid, err)
+				return err
 			}
+			uhandler := ioinfo.handler
+			go RecvWorker(ioinfo, uhandler, unichannel)
 		}
+	}
 
-		ioinfo, err := s.IdentifyNniIoinfo("inside")
-		if err != nil {
-			return err
-		}
-		nhandler := ioinfo.handler
-		nnichannel := make(chan Packet, 32)
-		go RecvWorker(ioinfo, nhandler, nnichannel)
-		defer func(){
-			logger.Debug("PacketInDaemon thread receives close ")
-			close(nnichannel)
-		}()
+	ioinfo, err := s.IdentifyNniIoinfo("inside")
+	if err != nil {
+		return err
+	}
+	nhandler, nnichannel := ioinfo.handler, make(chan Packet, 32)
+	go RecvWorker(ioinfo, nhandler, nnichannel)
+	defer func(){
+		close(nnichannel)
+	}()
 
-		data := &openolt.Indication_PktInd{}
-		for {
-			select {
-			case msg := <-s.omciIn:
-				logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
-				omci := &openolt.Indication_OmciInd{OmciInd: &msg}
-				if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
-					logger.Error("send omci indication failed.", err)
-					continue
-				}
-			case unipkt := <-unichannel:
-				onuid := unipkt.Info.onuid
-				onu, _ := s.GetOnuByID(onuid)
-				utils.LoggerWithOnu(onu).Debug("Received packet from UNI in grpc Server")
-				if unipkt.Info == nil || unipkt.Info.iotype != "uni" {
-					logger.Debug("WARNING: This packet does not come from UNI ")
-					continue
-				}
+	data := &openolt.Indication_PktInd{}
+	for {
+		select {
+		case msg := <-s.omciIn:
+			logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
+			omci := &openolt.Indication_OmciInd{OmciInd: &msg}
+			if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
+				logger.Error("send omci indication failed.", err)
+				continue
+			}
+		case unipkt := <-unichannel:
+			onuid := unipkt.Info.onuid
+			onu, _ := s.GetOnuByID(onuid)
+			utils.LoggerWithOnu(onu).Debug("Received packet from UNI in grpc Server")
+			if unipkt.Info == nil || unipkt.Info.iotype != "uni" {
+				logger.Debug("WARNING: This packet does not come from UNI ")
+				continue
+			}
 
-				intfid := unipkt.Info.intfid
-				gemid, _ := getGemPortID(intfid, onuid)
-				pkt := unipkt.Pkt
-				layerEth := pkt.Layer(layers.LayerTypeEthernet)
-				le, _ := layerEth.(*layers.Ethernet)
-				ethtype := le.EthernetType
+			intfid := unipkt.Info.intfid
+			gemid, _ := getGemPortID(intfid, onuid)
+			pkt := unipkt.Pkt
+			layerEth := pkt.Layer(layers.LayerTypeEthernet)
+			le, _ := layerEth.(*layers.Ethernet)
+			ethtype := le.EthernetType
 
-				if ethtype == 0x888e {
-					utils.LoggerWithOnu(onu).WithFields(log.Fields{
-						"gemId": gemid,
-					}).Info("Received upstream packet is EAPOL.")
-				} else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
-					utils.LoggerWithOnu(onu).WithFields(log.Fields{
-						"gemId": gemid,
-					}).Info("Received upstream packet is DHCP.")
+			if ethtype == 0x888e {
+				utils.LoggerWithOnu(onu).WithFields(log.Fields{
+					"gemId": gemid,
+				}).Info("Received upstream packet is EAPOL.")
+			} else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
+				utils.LoggerWithOnu(onu).WithFields(log.Fields{
+					"gemId": gemid,
+				}).Info("Received upstream packet is DHCP.")
 
-					//C-TAG
-					sn := convB2S(onu.SerialNumber.VendorSpecific)
-					if ctag, ok := s.CtagMap[sn]; ok == true {
-						tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
-						if err != nil {
-							utils.LoggerWithOnu(onu).WithFields(log.Fields{
-								"gemId": gemid,
-							}).Error("Fail to tag C-tag")
-						} else {
-							pkt = tagpkt
-						}
-					} else {
+				//C-TAG
+				sn := convB2S(onu.SerialNumber.VendorSpecific)
+				if ctag, ok := s.CtagMap[sn]; ok == true {
+					tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
+					if err != nil {
 						utils.LoggerWithOnu(onu).WithFields(log.Fields{
-							"gemId":   gemid,
-							"cTagMap": s.CtagMap,
-						}).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
+							"gemId": gemid,
+						}).Error("Fail to tag C-tag")
+					} else {
+						pkt = tagpkt
 					}
 				} else {
 					utils.LoggerWithOnu(onu).WithFields(log.Fields{
-						"gemId": gemid,
-					}).Info("Received upstream packet is of unknow type, skipping.")
-					continue
+						"gemId":   gemid,
+						"cTagMap": s.CtagMap,
+					}).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
 				}
-
-				utils.LoggerWithOnu(onu).Info("sendPktInd - UNI Packet")
-				data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
-				if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-					logger.Error("Fail to send PktInd indication.", err)
-					return err
-				}
-
-			case nnipkt := <-nnichannel:
-				if nnipkt.Info == nil || nnipkt.Info.iotype != "nni" {
-					logger.Debug("WARNING: This packet does not come from NNI ")
-					continue
-				}
-				onuid := nnipkt.Info.onuid
-				onu, _ := s.GetOnuByID(onuid)
-
-				utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
-				intfid := nnipkt.Info.intfid
-				pkt := nnipkt.Pkt
-				utils.LoggerWithOnu(onu).Info("sendPktInd - NNI Packet")
-				data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
-				if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-					logger.Error("Fail to send PktInd indication.", err)
-					return err
-				}
-
-			case <-child.Done():
-				logger.Debug("Closed nnichannel ")
-				return nil
+			} else {
+				utils.LoggerWithOnu(onu).WithFields(log.Fields{
+					"gemId": gemid,
+				}).Info("Received upstream packet is of unknow type, skipping.")
+				continue
 			}
-		}
-		return nil
-	})
 
-	logger.Debug("Wait here")
-	if err := eg.Wait(); err != nil {
-		logger.Error("Error happend in runPacketInDaemon:%s", err)
-		cancel()
+			utils.LoggerWithOnu(onu).Info("sendPktInd - UNI Packet")
+			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
+			if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+				logger.Error("Fail to send PktInd indication.", err)
+				return err
+			}
+
+		case nnipkt := <-nnichannel:
+			if nnipkt.Info == nil || nnipkt.Info.iotype != "nni" {
+				logger.Debug("WARNING: This packet does not come from NNI ")
+				continue
+			}
+			onuid := nnipkt.Info.onuid
+			onu, _ := s.GetOnuByID(onuid)
+
+			utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
+			intfid := nnipkt.Info.intfid
+			pkt := nnipkt.Pkt
+			utils.LoggerWithOnu(onu).Info("sendPktInd - NNI Packet")
+			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
+			if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+				logger.Error("Fail to send PktInd indication.", err)
+				return err
+			}
+
+		case <-ctx.Done():
+			logger.Debug("Closed nnichannel ")
+			return nil
+		}
 	}
 	return nil
 }
diff --git a/core/omci.go b/core/omci.go
index 6a55416..87d2f70 100644
--- a/core/omci.go
+++ b/core/omci.go
@@ -64,8 +64,8 @@
 
 var OnuOmciStateMap = map[OnuKey]*OnuOmciState{}
 
-func OmciRun(ctx context.Context, omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][]*device.Onu, errch chan error) {
-	go func() { //For monitoring the OMCI states
+func RunOmciResponder(ctx context.Context, omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][] *device.Onu, errch chan error) {
+	go func() { //For monitoring the OMCI states TODO: This part should be eliminated because it is out of scope of this library
 		t := time.NewTicker(1 * time.Second)
 		defer t.Stop()
 		for {