VOL-1360 BBSim - Error handling in AAA/DHCP client activation trigger
Change-Id: Idd327ed5de38ee4026c786e5c384dfa63624ad0a
diff --git a/core/core_server.go b/core/core_server.go
index 69d6d69..610e88b 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -31,6 +31,7 @@
"github.com/google/gopacket/pcap"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
+ "golang.org/x/sync/errgroup"
)
const (
@@ -328,131 +329,157 @@
func (s *Server) runPacketInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
logger.Debug("runPacketInDaemon Start")
defer logger.Debug("runPacketInDaemon Done")
- unichannel := make(chan Packet, 2048)
+ errch := make(chan error)
+ OmciRun(ctx, s.omciOut, s.omciIn, s.Onumap, errch)
+ parent := ctx
+ eg, child := errgroup.WithContext(parent)
+ child, cancel := context.WithCancel(child)
- logger.Debug("runOMCIDaemon Start")
- defer logger.Debug("runOMCIDaemon Done")
- errch := make (chan error)
- OmciRun(s.omciOut, s.omciIn, s.Onumap, errch)
- go func(){
- <-errch // Wait for OmciInitialization
- s.updateState(ACTIVE)
- }()
-
- for intfid, _ := range s.Onumap {
- for _, onu := range s.Onumap[intfid] {
- onuid := onu.OnuID
- ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
- if err != nil {
- utils.LoggerWithOnu(onu).Error("Fail to identifyUniIoinfo (onuid: %d): %v", onuid, err)
- return err
+ eg.Go (func() error {
+ logger.Debug("runOMCIDaemon Start")
+ defer logger.Debug("runOMCIDaemon Done")
+ select{
+ case v, ok := <- errch: // Wait for OmciInitialization
+ if ok { //Error
+ logger.Error("Error happend in Omci:%s", v)
+ return v
+ } else { //Close
+ s.updateState(ACTIVE)
}
- uhandler := ioinfo.handler
- go RecvWorker(ioinfo, uhandler, unichannel)
+ case <- child.Done():
+ return nil
}
- }
+ return nil
+ })
- ioinfo, err := s.IdentifyNniIoinfo("inside")
- if err != nil {
- return err
- }
- nhandler := ioinfo.handler
- nnichannel := make(chan Packet, 32)
- go RecvWorker(ioinfo, nhandler, nnichannel)
-
- data := &openolt.Indication_PktInd{}
- for {
- select {
- case msg := <-s.omciIn:
- logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
- omci := &openolt.Indication_OmciInd{OmciInd: &msg}
- if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
- logger.Error("send omci indication failed.", err)
- continue
+ eg.Go (func () error {
+ unichannel := make(chan Packet, 2048)
+ defer func() {
+ close(unichannel)
+ logger.Debug("Closed unichannel ")
+ }()
+ for intfid, _ := range s.Onumap {
+ for _, onu := range s.Onumap[intfid] {
+ onuid := onu.OnuID
+ ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
+ if err != nil {
+ utils.LoggerWithOnu(onu).Error("Fail to identifyUniIoinfo (onuid: %d): %v", onuid, err)
+ return err
+ }
+ uhandler := ioinfo.handler
+ go RecvWorker(ioinfo, uhandler, unichannel)
}
- case unipkt := <-unichannel:
- onuid := unipkt.Info.onuid
- onu, _ := s.GetOnuByID(onuid)
- utils.LoggerWithOnu(onu).Debug("Received packet from UNI in grpc Server")
- if unipkt.Info == nil || unipkt.Info.iotype != "uni" {
- logger.Debug("WARNING: This packet does not come from UNI ")
- continue
- }
+ }
- intfid := unipkt.Info.intfid
- gemid, _ := getGemPortID(intfid, onuid)
- pkt := unipkt.Pkt
- layerEth := pkt.Layer(layers.LayerTypeEthernet)
- le, _ := layerEth.(*layers.Ethernet)
- ethtype := le.EthernetType
+ ioinfo, err := s.IdentifyNniIoinfo("inside")
+ if err != nil {
+ return err
+ }
+ nhandler := ioinfo.handler
+ nnichannel := make(chan Packet, 32)
+ go RecvWorker(ioinfo, nhandler, nnichannel)
+ defer func(){
+ logger.Debug("PacketInDaemon thread receives close ")
+ close(nnichannel)
+ }()
- if ethtype == 0x888e {
- utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- }).Info("Received upstream packet is EAPOL.")
- } else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
- utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- }).Info("Received upstream packet is DHCP.")
+ data := &openolt.Indication_PktInd{}
+ for {
+ select {
+ case msg := <-s.omciIn:
+ logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
+ omci := &openolt.Indication_OmciInd{OmciInd: &msg}
+ if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
+ logger.Error("send omci indication failed.", err)
+ continue
+ }
+ case unipkt := <-unichannel:
+ onuid := unipkt.Info.onuid
+ onu, _ := s.GetOnuByID(onuid)
+ utils.LoggerWithOnu(onu).Debug("Received packet from UNI in grpc Server")
+ if unipkt.Info == nil || unipkt.Info.iotype != "uni" {
+ logger.Debug("WARNING: This packet does not come from UNI ")
+ continue
+ }
- //C-TAG
- sn := convB2S(onu.SerialNumber.VendorSpecific)
- if ctag, ok := s.CtagMap[sn]; ok == true {
- tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
- if err != nil {
- utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- }).Error("Fail to tag C-tag")
+ intfid := unipkt.Info.intfid
+ gemid, _ := getGemPortID(intfid, onuid)
+ pkt := unipkt.Pkt
+ layerEth := pkt.Layer(layers.LayerTypeEthernet)
+ le, _ := layerEth.(*layers.Ethernet)
+ ethtype := le.EthernetType
+
+ if ethtype == 0x888e {
+ utils.LoggerWithOnu(onu).WithFields(log.Fields{
+ "gemId": gemid,
+ }).Info("Received upstream packet is EAPOL.")
+ } else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
+ utils.LoggerWithOnu(onu).WithFields(log.Fields{
+ "gemId": gemid,
+ }).Info("Received upstream packet is DHCP.")
+
+ //C-TAG
+ sn := convB2S(onu.SerialNumber.VendorSpecific)
+ if ctag, ok := s.CtagMap[sn]; ok == true {
+ tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
+ if err != nil {
+ utils.LoggerWithOnu(onu).WithFields(log.Fields{
+ "gemId": gemid,
+ }).Error("Fail to tag C-tag")
+ } else {
+ pkt = tagpkt
+ }
} else {
- pkt = tagpkt
+ utils.LoggerWithOnu(onu).WithFields(log.Fields{
+ "gemId": gemid,
+ "cTagMap": s.CtagMap,
+ }).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
}
} else {
utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- "cTagMap": s.CtagMap,
- }).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
+ "gemId": gemid,
+ }).Info("Received upstream packet is of unknow type, skipping.")
+ continue
}
- } else {
- utils.LoggerWithOnu(onu).WithFields(log.Fields{
- "gemId": gemid,
- }).Info("Received upstream packet is of unknow type, skipping.")
- continue
- }
- utils.LoggerWithOnu(onu).Info("sendPktInd - UNI Packet")
- data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
- if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Fail to send PktInd indication.", err)
- return err
- }
+ utils.LoggerWithOnu(onu).Info("sendPktInd - UNI Packet")
+ data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
+ if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+ logger.Error("Fail to send PktInd indication.", err)
+ return err
+ }
- case nnipkt := <-nnichannel:
- if nnipkt.Info == nil || nnipkt.Info.iotype != "nni" {
- logger.Debug("WARNING: This packet does not come from NNI ")
- continue
- }
- onuid := nnipkt.Info.onuid
- onu, _ := s.GetOnuByID(onuid)
+ case nnipkt := <-nnichannel:
+ if nnipkt.Info == nil || nnipkt.Info.iotype != "nni" {
+ logger.Debug("WARNING: This packet does not come from NNI ")
+ continue
+ }
+ onuid := nnipkt.Info.onuid
+ onu, _ := s.GetOnuByID(onuid)
- utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
- intfid := nnipkt.Info.intfid
- pkt := nnipkt.Pkt
- utils.LoggerWithOnu(onu).Info("sendPktInd - NNI Packet")
- data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
- if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Fail to send PktInd indication.", err)
- return err
- }
+ utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
+ intfid := nnipkt.Info.intfid
+ pkt := nnipkt.Pkt
+ utils.LoggerWithOnu(onu).Info("sendPktInd - NNI Packet")
+ data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
+ if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+ logger.Error("Fail to send PktInd indication.", err)
+ return err
+ }
- case <-ctx.Done():
- logger.Debug("PacketInDaemon thread receives close ")
- close(unichannel)
- logger.Debug("Closed unichannel ")
- close(nnichannel)
- logger.Debug("Closed nnichannel ")
- return nil
+ case <-child.Done():
+ logger.Debug("Closed nnichannel ")
+ return nil
+ }
}
+ return nil
+ })
+
+ logger.Debug("Wait here")
+ if err := eg.Wait(); err != nil {
+ logger.Error("Error happend in runPacketInDaemon:%s", err)
+ cancel()
}
return nil
}
diff --git a/core/io_worker.go b/core/io_worker.go
index 2533a19..077b2c1 100644
--- a/core/io_worker.go
+++ b/core/io_worker.go
@@ -22,13 +22,12 @@
"strconv"
"time"
- "gerrit.opencord.org/voltha-bbsim/device"
-
"gerrit.opencord.org/voltha-bbsim/common/logger"
- "gerrit.opencord.org/voltha-bbsim/common/utils"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
+ "gerrit.opencord.org/voltha-bbsim/device"
+ "gerrit.opencord.org/voltha-bbsim/common/utils"
)
func RecvWorker(io *Ioinfo, handler *pcap.Handle, r chan Packet) {
diff --git a/core/mediator.go b/core/mediator.go
index ee25264..8baeafd 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -51,8 +51,8 @@
npon := flag.Int("i", 1, "Number of PON-IF ports")
nonus := flag.Int("n", 1, "Number of ONUs per PON-IF port")
modeopt := flag.String("m", "default", "Emulation mode (default, aaa, both (aaa & dhcp))")
- aaawait := flag.Int("aw", 30, "Wait time (sec) for activation WPA supplicants")
- dhcpwait := flag.Int("dw", 50, "Wait time (sec) for activation DHCP clients")
+ aaawait := flag.Int("aw", 10, "Wait time (sec) for activation WPA supplicants")
+ dhcpwait := flag.Int("dw", 20, "Wait time (sec) for activation DHCP clients")
dhcpservip := flag.String("s", "182.21.0.1", "DHCP Server IP Address")
intvl := flag.Int("v", 1, "Interval each Indication")
intvl_test := flag.Int("V", 1, "Interval each Indication")
diff --git a/core/omci.go b/core/omci.go
index acf7bbe..3a4335d 100644
--- a/core/omci.go
+++ b/core/omci.go
@@ -24,6 +24,8 @@
"gerrit.opencord.org/voltha-bbsim/protos"
"gerrit.opencord.org/voltha-bbsim/device"
"time"
+ "context"
+ "errors"
)
//
@@ -106,7 +108,7 @@
DONE
)
-type OmciMsgHandler func(class OmciClass, content OmciContent, key OnuKey) []byte
+type OmciMsgHandler func(class OmciClass, content OmciContent, key OnuKey) ([]byte, error)
var Handlers = map[OmciMsgType]OmciMsgHandler{
MibReset: mibReset,
@@ -120,64 +122,82 @@
var OnuOmciStateMap = map[OnuKey]*OnuOmciState{}
-func OmciRun(omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][] *device.Onu, errch chan error) {
+func OmciRun(ctx context.Context, omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, onumap map[uint32][] *device.Onu, errch chan error) {
go func() { //For monitoring the OMCI states
+ t := time.NewTicker(1 * time.Second)
+ defer t.Stop()
for {
- time.Sleep(1 * time.Second)
- if isAllOmciInitDone(onumap) {
- logger.Info("OmciRun - All the omci init process were done")
- close(errch)
- break
+ select{
+ case <- t.C:
+ logger.Debug("Monitor omci init state")
+ if isAllOmciInitDone(onumap) {
+ logger.Info("OmciRun - All the omci initialization wes done")
+ close(errch)
+ return
+ }
+ case <- ctx.Done():
+ logger.Debug("Omci Monitoring process was done")
+ return
}
}
}()
go func(){
+ defer logger.Debug("Omci response process was done")
for {
var resp openolt.OmciIndication
+ select{
+ case m := <-omciOut:
+ transactionId, deviceId, msgType, class, instance, content, err := ParsePkt(m.Pkt)
+ if err != nil {
+ errch <- err
+ return
+ }
- m := <-omciOut
+ logger.Debug("OmciRun - transactionId: %d msgType: %d, ME Class: %d, ME Instance: %d",
+ transactionId, msgType, class, instance)
- transactionId, deviceId, msgType, class, instance, content := ParsePkt(m.Pkt)
+ key := OnuKey{m.IntfId, m.OnuId}
+ if _, ok := OnuOmciStateMap[key]; !ok {
+ OnuOmciStateMap[key] = NewOnuOmciState()
+ }
- logger.Debug("OmciRun - transactionId: %d msgType: %d, ME Class: %d, ME Instance: %d",
- transactionId, msgType, class, instance)
+ if _, ok := Handlers[msgType]; !ok {
+ logger.Warn("Ignore omci msg (msgType %d not handled)", msgType)
+ continue
+ }
- key := OnuKey{m.IntfId, m.OnuId}
- if _, ok := OnuOmciStateMap[key]; !ok {
- OnuOmciStateMap[key] = NewOnuOmciState()
+ resp.Pkt, err = Handlers[msgType](class, content, key)
+ if err != nil {
+ errch <- err
+ return
+ }
+ resp.Pkt[0] = byte(transactionId >> 8)
+ resp.Pkt[1] = byte(transactionId & 0xFF)
+ resp.Pkt[2] = 0x2<<4 | byte(msgType)
+ resp.Pkt[3] = deviceId
+ resp.IntfId = m.IntfId
+ resp.OnuId = m.OnuId
+ omciIn <- resp
+ case <-ctx.Done():
+ return
}
-
- if _, ok := Handlers[msgType]; !ok {
- logger.Warn("Ignore omci msg (msgType %d not handled)", msgType)
- continue
- }
-
- resp.Pkt = Handlers[msgType](class, content, key)
-
- resp.Pkt[0] = byte(transactionId >> 8)
- resp.Pkt[1] = byte(transactionId & 0xFF)
- resp.Pkt[2] = 0x2<<4 | byte(msgType)
- resp.Pkt[3] = deviceId
- resp.IntfId = m.IntfId
- resp.OnuId = m.OnuId
- omciIn <- resp
}
}()
}
-func ParsePkt(pkt []byte) (uint16, uint8, OmciMsgType, OmciClass, uint16, OmciContent) {
+func ParsePkt(pkt []byte) (uint16, uint8, OmciMsgType, OmciClass, uint16, OmciContent, error) {
var m OmciMessage
r := bytes.NewReader(HexDecode(pkt))
if err := binary.Read(r, binary.BigEndian, &m); err != nil {
logger.Error("binary.Read failed: %s", err)
+ return 0, 0, 0, 0, 0, OmciContent{}, errors.New("binary.Read failed")
}
logger.Debug("OmciRun - TransactionId: %d MessageType: %d, ME Class: %d, ME Instance: %d, Content: %x",
m.TransactionId, m.MessageType&0x0F, m.MessageId.Class, m.MessageId.Instance, m.Content)
- return m.TransactionId, m.DeviceId, m.MessageType & 0x0F, m.MessageId.Class, m.MessageId.Instance, m.Content
-
+ return m.TransactionId, m.DeviceId, m.MessageType & 0x0F, m.MessageId.Class, m.MessageId.Instance, m.Content, nil
}
func HexDecode(pkt []byte) []byte {
@@ -198,7 +218,7 @@
return &OnuOmciState{gemPortId: 0, mibUploadCtr: 0, uniGInstance: 1, pptpInstance: 1}
}
-func mibReset(class OmciClass, content OmciContent, key OnuKey) []byte {
+func mibReset(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
var pkt []byte
logger.Debug("Omci MibReset")
@@ -210,10 +230,10 @@
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
- return pkt
+ return pkt, nil
}
-func mibUpload(class OmciClass, content OmciContent, key OnuKey) []byte {
+func mibUpload(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
var pkt []byte
logger.Debug("Omci MibUpload")
@@ -228,10 +248,10 @@
pkt[9] = NumMibUploads // Number of subsequent MibUploadNext cmds
- return pkt
+ return pkt, nil
}
-func mibUploadNext(class OmciClass, content OmciContent, key OnuKey) []byte {
+func mibUploadNext(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
var pkt []byte
logger.Debug("Omci MibUploadNext")
@@ -271,14 +291,14 @@
state.pptpInstance++
default:
logger.Error("Invalid MibUpload request %d", state.mibUploadCtr)
+ return nil, errors.New("Invalid MibUpload request")
}
state.mibUploadCtr++
-
- return pkt
+ return pkt, nil
}
-func set(class OmciClass, content OmciContent, key OnuKey) []byte {
+func set(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
var pkt []byte
pkt = []byte{
@@ -291,15 +311,16 @@
logger.Debug("Omci Set")
- return pkt
+ return pkt, nil
}
-func create(class OmciClass, content OmciContent, key OnuKey) []byte {
+func create(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
var pkt []byte
if class == GEMPortNetworkCTP {
if onuOmciState, ok := OnuOmciStateMap[key]; !ok {
logger.Error("ONU Key Error - IntfId: %d, OnuId:", key.IntfId, key.OnuId)
+ return nil, errors.New("ONU Key Error")
} else {
onuOmciState.gemPortId = binary.BigEndian.Uint16(content[:2])
logger.Debug("Gem Port Id %d", onuOmciState.gemPortId)
@@ -317,10 +338,10 @@
logger.Debug("Omci Create")
- return pkt
+ return pkt, nil
}
-func get(class OmciClass, content OmciContent, key OnuKey) []byte {
+func get(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
var pkt []byte
pkt = []byte{
@@ -333,10 +354,10 @@
logger.Debug("Omci Get")
- return pkt
+ return pkt, nil
}
-func getAllAlarms(class OmciClass, content OmciContent, key OnuKey) []byte {
+func getAllAlarms(class OmciClass, content OmciContent, key OnuKey) ([]byte, error) {
var pkt []byte
pkt = []byte{
@@ -349,7 +370,7 @@
logger.Debug("Omci GetAllAlarms")
- return pkt
+ return pkt, nil
}
func isAllOmciInitDone(onumap map[uint32][] *device.Onu) bool {