VOL-1358 BBSim - Improve the mediator component
This update is for triggering the test activation per-ONU.
Each OLT/ONU instance's state update is notified to the mediator.
Change-Id: Ia702c0081720d4f1cee10929077b97cc0661c375
diff --git a/core/core_server.go b/core/core_server.go
index 9886051..667a900 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -32,6 +32,7 @@
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"golang.org/x/sync/errgroup"
+ "reflect"
)
const (
@@ -56,8 +57,7 @@
EnableServer *openolt.Openolt_EnableIndicationServer
CtagMap map[string]uint32
cancel context.CancelFunc
- state coreState
- stateChan chan coreState
+ stateRepCh chan stateReport
omciIn chan openolt.OmciIndication
omciOut chan openolt.OmciMsg
}
@@ -67,19 +67,11 @@
Pkt gopacket.Packet
}
-type coreState int
-
-const (
- INACTIVE = iota // OLT/ONUs are not instantiated
- PRE_ACTIVE // Before running MainPacketLoop
- ACTIVE // After running MainPacketLoop
-)
-
-/* coreState
-INACTIVE -> PRE_ACTIVE -> ACTIVE
- (ActivateOLT) (Enable)
- <- <-
-*/
+type stateReport struct {
+ device device.Device
+ current device.DeviceState
+ next device.DeviceState
+}
func NewCore(opt *option) *Server {
// TODO: make it decent
@@ -96,8 +88,7 @@
IndInterval: opt.intvl,
Processes: []string{},
EnableServer: nil,
- state: INACTIVE,
- stateChan: make(chan coreState, 8),
+ stateRepCh : make(chan stateReport, 8),
omciIn: make(chan openolt.OmciIndication, 1024),
omciOut: make(chan openolt.OmciMsg, 1024),
}
@@ -124,7 +115,7 @@
s.wg = &sync.WaitGroup{}
logger.Debug("Start() Start")
defer func() {
- close(s.stateChan)
+ close(s.stateRepCh )
logger.Debug("Start() Done")
}()
addressport := s.gRPCAddress + ":" + strconv.Itoa(int(s.gRPCPort))
@@ -157,15 +148,15 @@
// Blocking
func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
+ olt := s.Olt
defer func() {
- olt := s.Olt
- olt.InitializeStatus()
+ olt.Initialize()
for intfid, _ := range s.Onumap {
for _, onu := range s.Onumap[intfid] {
- onu.InitializeStatus()
+ onu.Initialize()
}
}
- s.updateState(INACTIVE)
+ s.updateDevIntState(olt, device.OLT_INACTIVE)
logger.Debug("Enable() Done")
}()
logger.Debug("Enable() Start")
@@ -173,7 +164,7 @@
if err := s.activateOLT(*sv); err != nil {
return err
}
- s.updateState(PRE_ACTIVE)
+ s.updateDevIntState(olt, device.OLT_PREACTIVE)
coreCtx := context.Background()
coreCtx, corecancel := context.WithCancel(coreCtx)
@@ -193,10 +184,17 @@
s.StopPktLoops()
}
-func (s *Server) updateState(state coreState) {
- s.state = state
- s.stateChan <- state
- logger.Debug("State updated to:%d", state)
+func (s *Server) updateDevIntState(dev device.Device, state device.DeviceState) {
+ current := dev.GetIntState()
+ dev.UpdateIntState(state)
+ s.stateRepCh <- stateReport{device: dev, current:current, next: state}
+ if reflect.TypeOf(dev) == reflect.TypeOf(&device.Olt{}){
+ logger.Debug("OLT State updated to:%d", state)
+ } else if reflect.TypeOf(dev) == reflect.TypeOf(&device.Onu{}){
+ logger.Debug("ONU State updated to:%d", state)
+ } else {
+ logger.Error("UpdateDevIntState () doesn't support this device: %s", reflect.TypeOf(dev))
+ }
}
func (s *Server) activateOLT(stream openolt.Openolt_EnableIndicationServer) error {
@@ -208,7 +206,6 @@
return err
}
olt.OperState = "up"
- *olt.InternalState = device.OLT_UP
logger.Info("OLT %s sent OltInd.", olt.Name)
// OLT sends Interface Indication to Adapter
@@ -220,7 +217,6 @@
// OLT sends Operation Indication to Adapter after activating each interface
//time.Sleep(IF_UP_TIME * time.Second)
- *olt.InternalState = device.PONIF_UP
if err := sendOperInd(stream, olt); err != nil {
logger.Error("Fail to sendOperInd: %v", err)
return err
@@ -260,7 +256,7 @@
s.Vethnames = []string{}
s.Ioinfos = []*Ioinfo{}
s.wg.Done()
- s.updateState(PRE_ACTIVE)
+ s.updateDevIntState(s.Olt, device.OLT_PREACTIVE)
logger.Debug("StartPktLoops () Done")
}()
s.wg.Add(1)
@@ -344,7 +340,7 @@
logger.Error("Error happend in Omci:%s", v)
return v
} else { //Close
- s.updateState(ACTIVE)
+ s.updateDevIntState(s.Olt, device.OLT_ACTIVE)
}
case <- child.Done():
return nil
@@ -536,7 +532,7 @@
func IsAllOnuActive(onumap map[uint32][]*device.Onu) bool {
for _, onus := range onumap {
for _, onu := range onus {
- if onu.GetIntStatus() != device.ONU_ACTIVATED {
+ if onu.GetIntState() != device.ONU_ACTIVE {
return false
}
}
diff --git a/core/grpc_service.go b/core/grpc_service.go
index 05a3d85..6c07833 100644
--- a/core/grpc_service.go
+++ b/core/grpc_service.go
@@ -83,7 +83,7 @@
}
onuid := onu.OnuId
matched.OnuID = onuid
- matched.UpdateIntStatus(device.ONU_ACTIVATED)
+ s.updateDevIntState(matched, device.ONU_ACTIVE)
logger.Debug("ONU IntfID: %d OnuID: %d activated succesufully.", onu.IntfId, onu.OnuId)
}
return new(openolt.Empty), nil
diff --git a/core/mediator.go b/core/mediator.go
index 8baeafd..b973493 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -27,6 +27,8 @@
"gerrit.opencord.org/voltha-bbsim/common/logger"
log "github.com/sirupsen/logrus"
+ "gerrit.opencord.org/voltha-bbsim/device"
+ "reflect"
)
type option struct {
@@ -79,32 +81,14 @@
return o
}
-type stateMachine struct {
- handlers []*handler
- state coreState
-}
-
type handler struct {
- dst coreState
- src coreState
+ dst device.DeviceState
+ src device.DeviceState
method func(s *Server) error
}
-func (sm *stateMachine) transit(next coreState) func(s *Server) error {
- for _, handler := range sm.handlers {
- if handler.src == sm.state && handler.dst == next {
- logger.Debug("Hit (src:%d, dst:%d)", handler.src, handler.dst)
- sm.state = next
- return handler.method
- }
- }
- sm.state = next
- return nil
-}
-
type mediator struct {
opt *option
- sm *stateMachine
server *Server
tester *Tester
}
@@ -138,15 +122,9 @@
tester := NewTester(opt)
m.server = server
m.tester = tester
- m.sm = &stateMachine{
- state: INACTIVE,
- handlers: []*handler{
- &handler{src: PRE_ACTIVE, dst: ACTIVE, method: m.tester.Start},
- &handler{src: ACTIVE, dst: PRE_ACTIVE, method: m.tester.Stop},
- },
- }
+
go func() {
- m.Mediate()
+ m.Mediate(server)
}()
c := make(chan os.Signal, 1)
@@ -169,23 +147,36 @@
logger.Debug("Reach to the end line")
}
-func (m *mediator) Mediate() {
- wg := sync.WaitGroup{}
+func (m *mediator) Mediate(s *Server) {
defer logger.Debug("Mediate Done")
- for corestat := range m.server.stateChan {
- logger.Debug("Mediator receives state %d of server", corestat)
- method := m.sm.transit(corestat)
- if method != nil {
- wg.Add(1)
- defer wg.Done()
- go func() error {
- if err := method(m.server); err != nil { //blocking
- m.server.Stop()
- return err
- }
- return nil
- }()
+ for sr := range m.server.stateRepCh {
+ next := sr.next
+ current := sr.current
+ dev := sr.device
+ if reflect.TypeOf(dev) == reflect.TypeOf(&device.Olt{}){
+ logger.Debug("Received OLT Device %v Current: %d Next: %d", dev, current, next)
+ if err := transitOlt(s, current, next, m.tester, m.opt); err != nil {
+ logger.Error("%v", err)
+ }
+ } else if reflect.TypeOf(dev) == reflect.TypeOf(&device.Onu{}) {
+ logger.Debug("Received ONU Device %v Current: %d Next: %d", dev, current, next)
+ key := dev.GetDevkey()
+ if err := transitOnu(s, key, current, next, m.tester, m.opt); err != nil {
+ logger.Error("%v", err)
+ }
}
}
- wg.Wait()
}
+
+func transitOlt (s *Server, current device.DeviceState, next device.DeviceState, tester *Tester, o *option) error {
+ if current == device.OLT_PREACTIVE && next == device.OLT_ACTIVE {
+ tester.Start(s)
+ } else if current == device.OLT_ACTIVE && next == device.OLT_PREACTIVE{
+ tester.Stop(s)
+ }
+ return nil
+}
+
+func transitOnu (s *Server, key device.Devkey, current device.DeviceState, next device.DeviceState, tester *Tester, o *option) error {
+ return nil
+}
\ No newline at end of file
diff --git a/device/device_olt.go b/device/device_olt.go
index 41df918..618e668 100644
--- a/device/device_olt.go
+++ b/device/device_olt.go
@@ -16,7 +16,21 @@
package device
-type oltState int
+import "sync"
+
+type DeviceState int
+
+type Device interface {
+ Initialize()
+ UpdateIntState(intstate DeviceState)
+ GetIntState() DeviceState
+ GetDevkey() Devkey
+}
+
+type Devkey struct {
+ ID uint32
+ Intfid uint32
+}
type Olt struct {
ID uint32
@@ -26,10 +40,11 @@
SerialNumber string
Manufacture string
Name string
- InternalState *oltState
+ InternalState DeviceState
OperState string
Intfs []intf
HeartbeatSignature uint32
+ mu *sync.Mutex
}
type intf struct {
@@ -38,11 +53,16 @@
OperState string
}
+/* OltState
+OLT_INACTIVE -> OLT_PREACTIVE -> ACTIVE
+ (ActivateOLT) (Enable)
+ <- <-
+*/
+
const (
- PRE_ENABLE oltState = iota
- OLT_UP
- PONIF_UP
- ONU_DISCOVERED
+ OLT_INACTIVE DeviceState = iota // OLT/ONUs are not instantiated
+ OLT_PREACTIVE // Before PacketInDaemon Running
+ OLT_ACTIVE // After PacketInDaemon Running
)
func NewOlt(oltid uint32, npon uint32, nnni uint32) *Olt {
@@ -51,11 +71,11 @@
olt.NumPonIntf = npon
olt.NumNniIntf = nnni
olt.Name = "BBSIM OLT"
- olt.InternalState = new(oltState)
- *olt.InternalState = PRE_ENABLE
+ olt.InternalState = OLT_INACTIVE
olt.OperState = "up"
olt.Intfs = make([]intf, olt.NumPonIntf+olt.NumNniIntf)
olt.HeartbeatSignature = oltid
+ olt.mu = &sync.Mutex{}
for i := uint32(0); i < olt.NumNniIntf; i++ {
olt.Intfs[i].IntfID = i
olt.Intfs[i].OperState = "up"
@@ -69,8 +89,8 @@
return &olt
}
-func (olt *Olt) InitializeStatus() {
- *olt.InternalState = PRE_ENABLE
+func (olt *Olt) Initialize() {
+ olt.InternalState = OLT_INACTIVE
olt.OperState = "up"
for i := uint32(0); i < olt.NumNniIntf; i++ {
olt.Intfs[i].IntfID = i
@@ -83,3 +103,19 @@
olt.Intfs[i].Type = "pon"
}
}
+
+func (olt *Olt) GetIntState() DeviceState {
+ olt.mu.Lock()
+ defer olt.mu.Unlock()
+ return olt.InternalState
+}
+
+func (olt *Olt) GetDevkey () Devkey {
+ return Devkey{ID: olt.ID}
+}
+
+func (olt *Olt) UpdateIntState(intstate DeviceState) {
+ olt.mu.Lock()
+ defer olt.mu.Unlock()
+ olt.InternalState = intstate
+}
\ No newline at end of file
diff --git a/device/device_onu.go b/device/device_onu.go
index 24f2cbf..c1a7aad 100644
--- a/device/device_onu.go
+++ b/device/device_onu.go
@@ -25,15 +25,13 @@
log "github.com/sirupsen/logrus"
)
-type onuState int
-
const (
- ONU_PRE_ACTIVATED onuState = iota
- ONU_ACTIVATED
+ ONU_PREACTIVATED DeviceState = iota
+ ONU_ACTIVE
)
type Onu struct {
- InternalState *onuState
+ InternalState DeviceState
OltID uint32
IntfID uint32
OperState string
@@ -51,8 +49,7 @@
onus := []*Onu{}
for i := 0; i < int(nonus); i++ {
onu := Onu{}
- onu.InternalState = new(onuState)
- *onu.InternalState = ONU_PRE_ACTIVATED
+ onu.InternalState = ONU_PREACTIVATED
onu.mu = &sync.Mutex{}
onu.IntfID = intfid
onu.OltID = oltid
@@ -65,9 +62,9 @@
return onus
}
-func (onu *Onu) InitializeStatus() {
+func (onu *Onu) Initialize() {
onu.OperState = "up"
- *onu.InternalState = ONU_PRE_ACTIVATED
+ onu.InternalState = ONU_PREACTIVATED
}
func ValidateONU(targetonu openolt.Onu, regonus map[uint32][]*Onu) bool {
@@ -95,14 +92,18 @@
}
}
-func (onu *Onu) UpdateIntStatus(intstatus onuState) {
+func (onu *Onu) UpdateIntState(intstate DeviceState) {
onu.mu.Lock()
defer onu.mu.Unlock()
- *onu.InternalState = intstatus
+ onu.InternalState = intstate
}
-func (onu *Onu) GetIntStatus() onuState {
+func (onu *Onu) GetDevkey () Devkey {
+ return Devkey{ID: onu.OnuID, Intfid:onu.IntfID}
+}
+
+func (onu *Onu) GetIntState() DeviceState {
onu.mu.Lock()
defer onu.mu.Unlock()
- return *onu.InternalState
+ return onu.InternalState
}