Refactoring for coreServer #No change in processing logic
Change-Id: Iea4d9ef4e6288b919c7e5796bbb9bc7f87ac604e
diff --git a/core/core_server.go b/core/core_server.go
index 610e88b..9886051 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -71,8 +71,8 @@
const (
INACTIVE = iota // OLT/ONUs are not instantiated
- PRE_ACTIVE // Before PacketInDaemon Running
- ACTIVE // After PacketInDaemon Running
+ PRE_ACTIVE // Before running MainPacketLoop
+ ACTIVE // After running MainPacketLoop
)
/* coreState
@@ -151,7 +151,7 @@
s.gRPCserver.Stop()
logger.Debug("gRPCserver.Stop()")
}
- s.StopPktInDaemon()
+ s.StopPktLoops()
return
}
@@ -178,7 +178,7 @@
coreCtx := context.Background()
coreCtx, corecancel := context.WithCancel(coreCtx)
s.cancel = corecancel
- if err := s.StartPktInDaemon(coreCtx, *sv); err != nil {
+ if err := s.StartPktLoops(coreCtx, *sv); err != nil {
return err
}
return nil
@@ -190,7 +190,7 @@
logger.Debug("Disable() Done")
}()
logger.Debug("Disable() Start")
- s.StopPktInDaemon()
+ s.StopPktLoops()
}
func (s *Server) updateState(state coreState) {
@@ -253,15 +253,15 @@
}
// Blocking
-func (s *Server) StartPktInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
- logger.Debug("StartPktInDaemon() Start")
+func (s *Server) StartPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+ logger.Debug("StartPktLoops () Start")
defer func() {
RemoveVeths(s.Vethnames)
s.Vethnames = []string{}
s.Ioinfos = []*Ioinfo{}
s.wg.Done()
s.updateState(PRE_ACTIVE)
- logger.Debug("StartPktInDaemon() Done")
+ logger.Debug("StartPktLoops () Done")
}()
s.wg.Add(1)
ioinfos, veths, err := createIoinfos(s.Olt.ID, s.Vethnames, s.Onumap)
@@ -277,15 +277,15 @@
child, cancel := context.WithCancel(parent)
s.cancel = cancel
- if err = s.runPacketInDaemon(child, stream); err != nil {
- logger.Error("runPacketInDaemon failed.", err)
+ if err = s.runPktLoops(child, stream); err != nil {
+ logger.Error("runPktLoops failed.", err)
return err
}
return nil
}
//Non-Blocking
-func (s *Server) StopPktInDaemon() {
+func (s *Server) StopPktLoops() {
if s.cancel != nil {
cancel := s.cancel
cancel()
@@ -326,19 +326,18 @@
}
//Blocking
-func (s *Server) runPacketInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
- logger.Debug("runPacketInDaemon Start")
- defer logger.Debug("runPacketInDaemon Done")
+func (s *Server) runPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+ logger.Debug("runPacketPktLoops Start")
+ defer logger.Debug("runPacketLoops Done")
errch := make(chan error)
- OmciRun(ctx, s.omciOut, s.omciIn, s.Onumap, errch)
- parent := ctx
- eg, child := errgroup.WithContext(parent)
+ RunOmciResponder(ctx, s.omciOut, s.omciIn, s.Onumap, errch)
+ eg, child := errgroup.WithContext(ctx)
child, cancel := context.WithCancel(child)
eg.Go (func() error {
- logger.Debug("runOMCIDaemon Start")
- defer logger.Debug("runOMCIDaemon Done")
+ logger.Debug("runOMCIResponder Start")
+ defer logger.Debug("runOMCIResponder Done")
select{
case v, ok := <- errch: // Wait for OmciInitialization
if ok { //Error
@@ -354,132 +353,134 @@
})
eg.Go (func () error {
- unichannel := make(chan Packet, 2048)
- defer func() {
- close(unichannel)
- logger.Debug("Closed unichannel ")
- }()
- for intfid, _ := range s.Onumap {
- for _, onu := range s.Onumap[intfid] {
- onuid := onu.OnuID
- ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
- if err != nil {
- utils.LoggerWithOnu(onu).Error("Fail to identifyUniIoinfo (onuid: %d): %v", onuid, err)
- return err
- }
- uhandler := ioinfo.handler
- go RecvWorker(ioinfo, uhandler, unichannel)
+ err := s.runMainPktLoop(child, stream)
+ return err
+ })
+
+ if err := eg.Wait(); err != nil {
+ logger.Error("Error happend in runPacketLoops:%s", err)
+ cancel()
+ }
+ return nil
+}
+
+func (s *Server) runMainPktLoop(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+ unichannel := make(chan Packet, 2048)
+ defer func() {
+ close(unichannel)
+ logger.Debug("Closed unichannel ")
+ }()
+ for intfid, _ := range s.Onumap {
+ for _, onu := range s.Onumap[intfid] {
+ onuid := onu.OnuID
+ ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
+ if err != nil {
+ utils.LoggerWithOnu(onu).Error("Fail to identifyUniIoinfo (onuid: %d): %v", onuid, err)
+ return err
}
+ uhandler := ioinfo.handler
+ go RecvWorker(ioinfo, uhandler, unichannel)
}
+ }
- ioinfo, err := s.IdentifyNniIoinfo("inside")
- if err != nil {
- return err
- }
- nhandler := ioinfo.handler
- nnichannel := make(chan Packet, 32)
- go RecvWorker(ioinfo, nhandler, nnichannel)
- defer func(){
- logger.Debug("PacketInDaemon thread receives close ")
- close(nnichannel)
- }()
+ ioinfo, err := s.IdentifyNniIoinfo("inside")
+ if err != nil {
+ return err
+ }
+ nhandler, nnichannel := ioinfo.handler, make(chan Packet, 32)
+ go RecvWorker(ioinfo, nhandler, nnichannel)
+ defer func(){
+ close(nnichannel)
+ }()
- data := &openolt.Indication_PktInd{}
- for {
- select {
- case msg := <-s.omciIn:
- logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
- omci := &openolt.Indication_OmciInd{OmciInd: &msg}
- if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
- logger.Error("send omci indication failed.", err)
- continue
- }
- case unipkt := <-unichannel:
- onuid := unipkt.Info.onuid
- onu, _ := s.GetOnuByID(onuid)
- utils.LoggerWithOnu(onu).Debug("Received packet from UNI in grpc Server")
- if unipkt.Info == nil || unipkt.Info.iotype != "uni" {
- logger.Debug("WARNING: This packet does not come from UNI ")
- continue
- }
+ data := &openolt.Indication_PktInd{}
+ for {
+ select {
+ case msg := <-s.omciIn:
+ logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
+ omci := &openolt.Indication_OmciInd{OmciInd: &msg}
+ if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
+ logger.Error("send omci indication failed.", err)
+ continue
+ }
+ case unipkt := <-unichannel:
+ onuid := unipkt.Info.onuid
+ onu, _ := s.GetOnuByID(onuid)
+ utils.LoggerWithOnu(onu).Debug("Received packet from UNI in grpc Server")
+ if unipkt.Info == nil || unipkt.Info.iotype != "uni" {
+ logger.Debug("WARNING: This packet does not come from UNI ")
+ continue
+ }
- intfid := unipkt.Info.intfid
- gemid, _ := getGemPortID(intfid, onuid)
- pkt := unipkt.Pkt
- layerEth := pkt.Layer(layers.LayerTypeEthernet)
- le, _ := layerEth.(*layers.Ethernet)
- ethtype := le.EthernetType
+ intfid := unipkt.Info.intfid
+ gemid, _ := getGemPortID(intfid, onuid)
+ pkt := unipkt.Pkt
+ layerEth := pkt.Layer(layers.LayerTypeEthernet)
+ le, _ := layerEth.(*layers.Ethernet)
+ ethtype := le.EthernetType
- if ethtype == 0x888e {
- utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- }).Info("Received upstream packet is EAPOL.")
- } else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
- utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- }).Info("Received upstream packet is DHCP.")
+ if ethtype == 0x888e {
+ utils.LoggerWithOnu(onu).WithFields(log.Fields{
+ "gemId": gemid,
+ }).Info("Received upstream packet is EAPOL.")
+ } else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
+ utils.LoggerWithOnu(onu).WithFields(log.Fields{
+ "gemId": gemid,
+ }).Info("Received upstream packet is DHCP.")
- //C-TAG
- sn := convB2S(onu.SerialNumber.VendorSpecific)
- if ctag, ok := s.CtagMap[sn]; ok == true {
- tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
- if err != nil {
- utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- }).Error("Fail to tag C-tag")
- } else {
- pkt = tagpkt
- }
- } else {
+ //C-TAG
+ sn := convB2S(onu.SerialNumber.VendorSpecific)
+ if ctag, ok := s.CtagMap[sn]; ok == true {
+ tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
+ if err != nil {
utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- "cTagMap": s.CtagMap,
- }).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
+ "gemId": gemid,
+ }).Error("Fail to tag C-tag")
+ } else {
+ pkt = tagpkt
}
} else {
utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- }).Info("Received upstream packet is of unknow type, skipping.")
- continue
+ "gemId": gemid,
+ "cTagMap": s.CtagMap,
+ }).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
}
-
- utils.LoggerWithOnu(onu).Info("sendPktInd - UNI Packet")
- data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
- if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Fail to send PktInd indication.", err)
- return err
- }
-
- case nnipkt := <-nnichannel:
- if nnipkt.Info == nil || nnipkt.Info.iotype != "nni" {
- logger.Debug("WARNING: This packet does not come from NNI ")
- continue
- }
- onuid := nnipkt.Info.onuid
- onu, _ := s.GetOnuByID(onuid)
-
- utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
- intfid := nnipkt.Info.intfid
- pkt := nnipkt.Pkt
- utils.LoggerWithOnu(onu).Info("sendPktInd - NNI Packet")
- data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
- if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Fail to send PktInd indication.", err)
- return err
- }
-
- case <-child.Done():
- logger.Debug("Closed nnichannel ")
- return nil
+ } else {
+ utils.LoggerWithOnu(onu).WithFields(log.Fields{
+ "gemId": gemid,
+ }).Info("Received upstream packet is of unknow type, skipping.")
+ continue
}
- }
- return nil
- })
- logger.Debug("Wait here")
- if err := eg.Wait(); err != nil {
- logger.Error("Error happend in runPacketInDaemon:%s", err)
- cancel()
+ utils.LoggerWithOnu(onu).Info("sendPktInd - UNI Packet")
+ data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
+ if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+ logger.Error("Fail to send PktInd indication.", err)
+ return err
+ }
+
+ case nnipkt := <-nnichannel:
+ if nnipkt.Info == nil || nnipkt.Info.iotype != "nni" {
+ logger.Debug("WARNING: This packet does not come from NNI ")
+ continue
+ }
+ onuid := nnipkt.Info.onuid
+ onu, _ := s.GetOnuByID(onuid)
+
+ utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
+ intfid := nnipkt.Info.intfid
+ pkt := nnipkt.Pkt
+ utils.LoggerWithOnu(onu).Info("sendPktInd - NNI Packet")
+ data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
+ if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+ logger.Error("Fail to send PktInd indication.", err)
+ return err
+ }
+
+ case <-ctx.Done():
+ logger.Debug("Closed nnichannel ")
+ return nil
+ }
}
return nil
}
diff --git a/core/omci.go b/core/omci.go
index 6a55416..87d2f70 100644
--- a/core/omci.go
+++ b/core/omci.go
@@ -64,8 +64,8 @@
var OnuOmciStateMap = map[OnuKey]*OnuOmciState{}
-func OmciRun(ctx context.Context, omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][]*device.Onu, errch chan error) {
- go func() { //For monitoring the OMCI states
+func RunOmciResponder(ctx context.Context, omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][] *device.Onu, errch chan error) {
+ go func() { //For monitoring the OMCI states TODO: This part should be eliminated because it is out of scope of this library
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for {