Creating one channel per ONU
Using state machine for OperState in OLT, NNI, PON
Using state machine for ONU OperState and added gRPC API to list ONUs with
statuses
Change-Id: I663133510ef4a672e3807cd7e0db9eca8b7ab0d2
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 2c3486e..d1b7313 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -9,10 +9,11 @@
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"net"
+ "os"
"sync"
)
-var logger = log.WithFields(log.Fields{
+var oltLogger = log.WithFields(log.Fields{
"module": "OLT",
})
@@ -28,7 +29,7 @@
}
func CreateOLT(seq int, nni int, pon int, onuPerPon int) OltDevice {
- logger.WithFields(log.Fields{
+ oltLogger.WithFields(log.Fields{
"ID": seq,
"NumNni":nni,
"NumPon":pon,
@@ -37,15 +38,19 @@
olt = OltDevice{
ID: seq,
+ OperState: getOperStateFSM(func(e *fsm.Event) {
+ oltLogger.Debugf("Changing OLT OperState from %s to %s", e.Src, e.Dst)
+ }),
NumNni:nni,
NumPon:pon,
NumOnuPerPon:onuPerPon,
Pons: []PonPort{},
Nnis: []NniPort{},
- channel: make(chan interface{}, 32),
+ channel: make(chan Message),
}
// OLT State machine
+ // NOTE do we need 2 state machines for the OLT? (InternalState and OperState)
olt.InternalState = fsm.NewFSM(
"created",
fsm.Events{
@@ -54,7 +59,7 @@
},
fsm.Callbacks{
"enter_state": func(e *fsm.Event) {
- olt.stateChange(e)
+ oltLogger.Debugf("Changing OLT InternalState from %s to %s", e.Src, e.Dst)
},
},
)
@@ -62,7 +67,9 @@
// create NNI Port
nniPort := NniPort{
ID: uint32(0),
- OperState: DOWN,
+ OperState: getOperStateFSM(func(e *fsm.Event) {
+ oltLogger.Debugf("Changing NNI OperState from %s to %s", e.Src, e.Dst)
+ }),
Type: "nni",
}
olt.Nnis = append(olt.Nnis, nniPort)
@@ -72,9 +79,13 @@
p := PonPort{
NumOnu: olt.NumOnuPerPon,
ID: uint32(i),
- OperState: DOWN,
Type: "pon",
}
+ p.OperState = getOperStateFSM(func(e *fsm.Event) {
+ oltLogger.WithFields(log.Fields{
+ "ID": p.ID,
+ }).Debugf("Changing PON Port OperState from %s to %s", e.Src, e.Dst)
+ })
// create ONU devices
for j := 0; j < onuPerPon; j++ {
@@ -98,13 +109,13 @@
address := "0.0.0.0:50060"
lis, err := net.Listen("tcp", address)
if err != nil {
- logger.Fatalf("OLT failed to listen: %v", err)
+ oltLogger.Fatalf("OLT failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
openolt.RegisterOpenoltServer(grpcServer, o)
go grpcServer.Serve(lis)
- logger.Debugf("OLT Listening on: %v", address)
+ oltLogger.Debugf("OLT Listening on: %v", address)
return nil
}
@@ -113,13 +124,13 @@
func (o OltDevice) Enable (stream openolt.Openolt_EnableIndicationServer) error {
- logger.Debug("Enable OLT called")
+ oltLogger.Debug("Enable OLT called")
wg := sync.WaitGroup{}
wg.Add(1)
// create a channel for all the OLT events
- go o.oltChannels(stream)
+ go o.processOltMessages(stream)
// enable the OLT
olt_msg := Message{
@@ -154,6 +165,7 @@
o.channel <- msg
for _, onu := range pon.Onus {
+ go onu.processOnuMessages(stream)
msg := Message{
Type: OnuDiscIndication,
Data: OnuDiscIndicationMessage{
@@ -161,7 +173,7 @@
OperState: UP,
},
}
- o.channel <- msg
+ onu.channel <- msg
}
}
@@ -189,116 +201,101 @@
return nil, errors.New(fmt.Sprintf("Cannot find NniPort with id %d in OLT %d", id, o.ID))
}
-func (o OltDevice) stateChange(e *fsm.Event) {
- logger.WithFields(log.Fields{
- "oltId": o.ID,
- "dstState": e.Dst,
- "srcState": e.Src,
- }).Debugf("OLT state has changed")
-}
-
func (o OltDevice) sendOltIndication(msg OltIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
data := &openolt.Indication_OltInd{OltInd: &openolt.OltIndication{OperState: msg.OperState.String()}}
if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Failed to send Indication_OltInd: %v", err)
+ oltLogger.Error("Failed to send Indication_OltInd: %v", err)
}
- logger.WithFields(log.Fields{
+ oltLogger.WithFields(log.Fields{
"OperState": msg.OperState,
}).Debug("Sent Indication_OltInd")
}
func (o OltDevice) sendNniIndication(msg NniIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
nni, _ := o.getNniById(msg.NniPortID)
- nni.OperState = UP
+ nni.OperState.Event("enable")
+ // NOTE Operstate may need to be an integer
operData := &openolt.Indication_IntfOperInd{IntfOperInd: &openolt.IntfOperIndication{
Type: nni.Type,
IntfId: nni.ID,
- OperState: nni.OperState.String(),
+ OperState: nni.OperState.Current(),
}}
if err := stream.Send(&openolt.Indication{Data: operData}); err != nil {
- logger.Error("Failed to send Indication_IntfOperInd for NNI: %v", err)
+ oltLogger.Error("Failed to send Indication_IntfOperInd for NNI: %v", err)
}
- logger.WithFields(log.Fields{
+ oltLogger.WithFields(log.Fields{
"Type": nni.Type,
"IntfId": nni.ID,
- "OperState": nni.OperState.String(),
+ "OperState": nni.OperState.Current(),
}).Debug("Sent Indication_IntfOperInd for NNI")
}
func (o OltDevice) sendPonIndication(msg PonIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
pon, _ := o.getPonById(msg.PonPortID)
- pon.OperState = UP
+ pon.OperState.Event("enable")
discoverData := &openolt.Indication_IntfInd{IntfInd: &openolt.IntfIndication{
IntfId: pon.ID,
- OperState: pon.OperState.String(),
+ OperState: pon.OperState.Current(),
}}
if err := stream.Send(&openolt.Indication{Data: discoverData}); err != nil {
- logger.Error("Failed to send Indication_IntfInd: %v", err)
+ oltLogger.Error("Failed to send Indication_IntfInd: %v", err)
}
- logger.WithFields(log.Fields{
+ oltLogger.WithFields(log.Fields{
"IntfId": pon.ID,
- "OperState": pon.OperState.String(),
+ "OperState": pon.OperState.Current(),
}).Debug("Sent Indication_IntfInd")
operData := &openolt.Indication_IntfOperInd{IntfOperInd: &openolt.IntfOperIndication{
Type: pon.Type,
IntfId: pon.ID,
- OperState: pon.OperState.String(),
+ OperState: pon.OperState.Current(),
}}
if err := stream.Send(&openolt.Indication{Data: operData}); err != nil {
- logger.Error("Failed to send Indication_IntfOperInd for PON: %v", err)
+ oltLogger.Error("Failed to send Indication_IntfOperInd for PON: %v", err)
}
- logger.WithFields(log.Fields{
+ oltLogger.WithFields(log.Fields{
"Type": pon.Type,
"IntfId": pon.ID,
- "OperState": pon.OperState.String(),
+ "OperState": pon.OperState.Current(),
}).Debug("Sent Indication_IntfOperInd for PON")
}
-func (o OltDevice) oltChannels(stream openolt.Openolt_EnableIndicationServer) {
- logger.Debug("Started OLT Indication Channel")
+func (o OltDevice) processOltMessages(stream openolt.Openolt_EnableIndicationServer) {
+ oltLogger.Debug("Started OLT Indication Channel")
for message := range o.channel {
- _msg, _ok := message.(Message)
- if _ok {
- logger.WithFields(log.Fields{
- "oltId": o.ID,
- "messageType": _msg.Type,
- }).Debug("Received message")
- switch _msg.Data.(type) {
- case OltIndicationMessage:
- msg, _ := _msg.Data.(OltIndicationMessage)
+ oltLogger.WithFields(log.Fields{
+ "oltId": o.ID,
+ "messageType": message.Type,
+ }).Trace("Received message")
+
+ switch message.Type {
+ case OltIndication:
+ msg, _ := message.Data.(OltIndicationMessage)
+ if msg.OperState == UP {
o.InternalState.Event("enable")
- o.sendOltIndication(msg, stream)
- case NniIndicationMessage:
- msg, _ := _msg.Data.(NniIndicationMessage)
- o.sendNniIndication(msg, stream)
- case PonIndicationMessage:
- msg, _ := _msg.Data.(PonIndicationMessage)
- o.sendPonIndication(msg, stream)
- case OnuDiscIndicationMessage:
- msg, _ := _msg.Data.(OnuDiscIndicationMessage)
- msg.Onu.InternalState.Event("discover")
- msg.Onu.sendOnuDiscIndication(msg, stream)
- case OnuIndicationMessage:
- msg, _ := _msg.Data.(OnuIndicationMessage)
- pon, _ := o.getPonById(msg.PonPortID)
- onu, _ := pon.getOnuBySn(msg.OnuSN)
- onu.InternalState.Event("enable")
- onu.sendOnuIndication(msg, stream)
- default:
- logger.Warnf("Received unkown message data %v for type %v", _msg.Data, _msg.Type)
+ o.OperState.Event("enable")
+ } else if msg.OperState == DOWN {
+ o.InternalState.Event("disable")
+ o.OperState.Event("disable")
}
- } else {
- logger.Warnf("Received unkown message %v", message)
+ o.sendOltIndication(msg, stream)
+ case NniIndication:
+ msg, _ := message.Data.(NniIndicationMessage)
+ o.sendNniIndication(msg, stream)
+ case PonIndication:
+ msg, _ := message.Data.(PonIndicationMessage)
+ o.sendPonIndication(msg, stream)
+ default:
+ oltLogger.Warnf("Received unknown message data %v for type %v in OLT channel", message.Data, message.Type)
}
}
@@ -307,9 +304,14 @@
// GRPC Endpoints
func (o OltDevice) ActivateOnu(context context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
- logger.WithFields(log.Fields{
+ oltLogger.WithFields(log.Fields{
"onuSerialNumber": onu.SerialNumber,
}).Info("Received ActivateOnu call from VOLTHA")
+
+ pon, _ := o.getPonById(onu.IntfId)
+ _onu, _ := pon.getOnuBySn(onu.SerialNumber)
+
+ // NOTE we need to immediately activate the ONU or the OMCI state machine won't start
msg := Message{
Type: OnuIndication,
Data: OnuIndicationMessage{
@@ -318,59 +320,66 @@
OperState: UP,
},
}
- o.channel <- msg
+ _onu.channel <- msg
return new(openolt.Empty) , nil
}
func (o OltDevice) DeactivateOnu(context.Context, *openolt.Onu) (*openolt.Empty, error) {
- logger.Error("DeactivateOnu not implemented")
+ oltLogger.Error("DeactivateOnu not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) DeleteOnu(context.Context, *openolt.Onu) (*openolt.Empty, error) {
- logger.Error("DeleteOnu not implemented")
+ oltLogger.Error("DeleteOnu not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) DisableOlt(context.Context, *openolt.Empty) (*openolt.Empty, error) {
- logger.Error("DisableOlt not implemented")
+ // NOTE when we disable the OLT should we disable NNI, PONs and ONUs altogether?
+ olt_msg := Message{
+ Type: OltIndication,
+ Data: OltIndicationMessage{
+ OperState: DOWN,
+ },
+ }
+ o.channel <- olt_msg
return new(openolt.Empty) , nil
}
func (o OltDevice) DisablePonIf(context.Context, *openolt.Interface) (*openolt.Empty, error) {
- logger.Error("DisablePonIf not implemented")
+ oltLogger.Error("DisablePonIf not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) EnableIndication(_ *openolt.Empty, stream openolt.Openolt_EnableIndicationServer) error {
- logger.WithField("oltId", o.ID).Info("OLT receives EnableIndication call from VOLTHA")
+ oltLogger.WithField("oltId", o.ID).Info("OLT receives EnableIndication call from VOLTHA")
o.Enable(stream)
return nil
}
func (o OltDevice) EnablePonIf(context.Context, *openolt.Interface) (*openolt.Empty, error) {
- logger.Error("EnablePonIf not implemented")
+ oltLogger.Error("EnablePonIf not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) FlowAdd(context.Context, *openolt.Flow) (*openolt.Empty, error) {
- logger.Error("FlowAdd not implemented")
+ oltLogger.Error("FlowAdd not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) FlowRemove(context.Context, *openolt.Flow) (*openolt.Empty, error) {
- logger.Error("FlowRemove not implemented")
+ oltLogger.Error("FlowRemove not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) HeartbeatCheck(context.Context, *openolt.Empty) (*openolt.Heartbeat, error) {
- logger.Error("HeartbeatCheck not implemented")
+ oltLogger.Error("HeartbeatCheck not implemented")
return new(openolt.Heartbeat) , nil
}
func (o OltDevice) GetDeviceInfo(context.Context, *openolt.Empty) (*openolt.DeviceInfo, error) {
- logger.WithField("oltId", o.ID).Info("OLT receives GetDeviceInfo call from VOLTHA")
+ oltLogger.WithField("oltId", o.ID).Info("OLT receives GetDeviceInfo call from VOLTHA")
devinfo := new(openolt.DeviceInfo)
devinfo.Vendor = "BBSim"
devinfo.Model = "asfvolt16"
@@ -391,52 +400,64 @@
return devinfo, nil
}
-func (o OltDevice) OmciMsgOut(context.Context, *openolt.OmciMsg) (*openolt.Empty, error) {
- logger.Error("OmciMsgOut not implemented")
+func (o OltDevice) OmciMsgOut(ctx context.Context, omci_msg *openolt.OmciMsg) (*openolt.Empty, error) {
+ oltLogger.Debugf("Recevied OmciMsgOut - IntfId: %d OnuId: %d", omci_msg.IntfId, omci_msg.OnuId)
+ pon, _ := o.getPonById(omci_msg.IntfId)
+ onu, _ := pon.getOnuById(omci_msg.OnuId)
+ msg := Message{
+ Type: OMCI,
+ Data: OmciMessage{
+ OnuSN: onu.SerialNumber,
+ OnuId: onu.ID,
+ msg: omci_msg,
+ },
+ }
+ onu.channel <- msg
return new(openolt.Empty) , nil
}
func (o OltDevice) OnuPacketOut(context.Context, *openolt.OnuPacket) (*openolt.Empty, error) {
- logger.Error("OnuPacketOut not implemented")
+ oltLogger.Error("OnuPacketOut not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) Reboot(context.Context, *openolt.Empty) (*openolt.Empty, error) {
- logger.Error("Reboot not implemented")
+ oltLogger.Info("Shutting Down, hope you're running in K8s...")
+ os.Exit(0)
return new(openolt.Empty) , nil
}
func (o OltDevice) ReenableOlt(context.Context, *openolt.Empty) (*openolt.Empty, error) {
- logger.Error("ReenableOlt not implemented")
+ oltLogger.Error("ReenableOlt not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) UplinkPacketOut(context context.Context, packet *openolt.UplinkPacket) (*openolt.Empty, error) {
- logger.Error("UplinkPacketOut not implemented")
+ oltLogger.Error("UplinkPacketOut not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) CollectStatistics(context.Context, *openolt.Empty) (*openolt.Empty, error) {
- logger.Error("CollectStatistics not implemented")
+ oltLogger.Error("CollectStatistics not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) CreateTconts(context context.Context, packet *openolt.Tconts) (*openolt.Empty, error) {
- logger.Error("CreateTconts not implemented")
+ oltLogger.Error("CreateTconts not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) RemoveTconts(context context.Context, packet *openolt.Tconts) (*openolt.Empty, error) {
- logger.Error("RemoveTconts not implemented")
+ oltLogger.Error("RemoveTconts not implemented")
return new(openolt.Empty) , nil
}
func (o OltDevice) GetOnuInfo(context context.Context, packet *openolt.Onu) (*openolt.OnuIndication, error) {
- logger.Error("GetOnuInfo not implemented")
+ oltLogger.Error("GetOnuInfo not implemented")
return new(openolt.OnuIndication) , nil
}
func (o OltDevice) GetPonIf(context context.Context, packet *openolt.Interface) (*openolt.IntfIndication, error) {
- logger.Error("GetPonIf not implemented")
+ oltLogger.Error("GetPonIf not implemented")
return new(openolt.IntfIndication) , nil
}
\ No newline at end of file