SEBA-758 send periodic port stats to voltha
SEBA-790 get flow, gemport, and tcont information through API
fix lint errors
Change-Id: I10909e1992eba71d8e54c976ccbcea8778e35539
diff --git a/core/core_server.go b/core/core_server.go
index a8d9d52..64bd6c7 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -29,7 +29,7 @@
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
omci "github.com/opencord/omci-sim"
- pb "github.com/opencord/voltha-bbsim/api"
+ api "github.com/opencord/voltha-bbsim/api"
"github.com/opencord/voltha-bbsim/common/logger"
"github.com/opencord/voltha-bbsim/device"
flowHandler "github.com/opencord/voltha-bbsim/flow"
@@ -49,6 +49,7 @@
SerialNumberLength = 12
OpenOltStart = "start"
OpenOltStop = "stop"
+ AutoDiscoveryDelay = 5
)
// Server structure consists of all the params required for BBsim.
@@ -78,9 +79,9 @@
eapolOut chan *byteMsg
dhcpIn chan *byteMsg
dhcpOut chan *byteMsg
- FlowMap map[FlowKey]*openolt.Flow
+ FlowMap map[device.FlowKey]*openolt.Flow
alarmCh chan *openolt.Indication
- deviceActionCh chan *pb.DeviceAction
+ deviceActionCh chan *api.DeviceAction
serverActionCh chan string
}
@@ -91,26 +92,19 @@
}
+// byteMsg carries one raw packet together with the interface ID and ONU ID it
+// is tagged with. It is the element type of the eapolIn/eapolOut and
+// dhcpIn/dhcpOut channels of Server.
type byteMsg struct {
- IntfId uint32
- OnuId uint32
+ IntfID uint32 // interface ID the packet is associated with
+ OnuID uint32 // ONU ID the packet is associated with
Byte []byte
}
+// stateReport groups a device with two device.State values, current and next.
+// NOTE(review): presumably describes an internal state transition reported by
+// updateDevIntState; confirm against the place where it is constructed.
type stateReport struct {
device device.Device
- current device.DeviceState
- next device.DeviceState
+ current device.State // presumably the state before the change (confirm)
+ next device.State // presumably the state after the change (confirm)
}
-// FlowKey used for FlowMap key
-type FlowKey struct {
- FlowID uint32
- FlowDirection string
-}
-
-//Has options (OLT id, number onu ports) from mediator
// NewCore initialize OLT and ONU objects
-func NewCore(opt *option) *Server {
+func NewCore(opt *Option) *Server {
// TODO: make it decent
oltid := opt.oltid
npon := opt.npon
@@ -145,17 +139,14 @@
eapolOut: make(chan *byteMsg, 1024),
dhcpIn: make(chan *byteMsg, 1024),
dhcpOut: make(chan *byteMsg, 1024),
- FlowMap: make(map[FlowKey]*openolt.Flow),
+ FlowMap: make(map[device.FlowKey]*openolt.Flow),
serverActionCh: make(chan string),
}
logger.Info("OLT %d created: %v", s.Olt.ID, s.Olt)
-
- nnni := s.Olt.NumNniIntf
- logger.Info("OLT ID: %d was retrieved.", s.Olt.ID)
logger.Info("OLT Serial-Number: %v", s.Olt.SerialNumber)
// Creating Onu Map
for intfid := uint32(0); intfid < npon; intfid++ {
- s.Onumap[intfid] = device.NewOnus(oltid, intfid, nonus, nnni)
+ s.Onumap[intfid] = device.NewOnus(oltid, intfid, nonus)
}
logger.Debug("Onu Map:")
@@ -221,7 +212,7 @@
grpcAddressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtGrpcPort))
restAddressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtRestPort))
// Start rest gateway for BBSim server
- go StartRestGatewayService(grpcAddressPort, restAddressPort, wg)
+ go StartRestGatewayService(grpcAddressPort, restAddressPort)
addressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtGrpcPort))
listener, apiserver, err := NewMgmtAPIServer(addressPort)
@@ -231,7 +222,7 @@
}
s.mgmtServer = apiserver
- pb.RegisterBBSimServiceServer(apiserver, s)
+ api.RegisterBBSimServiceServer(apiserver, s)
if e := apiserver.Serve(listener); e != nil {
logger.Error("Failed to run management api server %v", e)
return
@@ -259,17 +250,17 @@
// onu.Initialize()
// }
// }
- s.updateDevIntState(olt, device.OLT_INACTIVE)
+ s.updateDevIntState(olt, device.OltInactive)
logger.Debug("Enable() Done")
}()
logger.Debug("Enable() Start")
s.EnableServer = sv
+ flowHandler.InitializePacketInStream(*sv)
if err := s.activateOLT(*sv); err != nil {
return err
}
- s.updateDevIntState(olt, device.OLT_PREACTIVE)
-
+ s.updateDevIntState(olt, device.OltPreactive)
coreCtx := context.Background()
coreCtx, corecancel := context.WithCancel(coreCtx)
s.cancel = corecancel
@@ -279,6 +270,9 @@
if s.AutoONUActivate == true {
// Initialize all ONUs
+ // Allow some delay for OLT to become active at the VOLTHA side, before sending ONU discovery indication.
+ // Otherwise, ONU discovery indication may get processed at the VOLTHA before OLT state becomes active.
+ time.Sleep(AutoDiscoveryDelay * time.Second)
for intfid := range s.Onumap {
for _, onu := range s.Onumap[intfid] {
onu.Initialize()
@@ -308,7 +302,8 @@
s.StopPktLoops()
}
-func (s *Server) updateDevIntState(dev device.Device, state device.DeviceState) {
+func (s *Server) updateDevIntState(dev device.Device, state device.State) {
+ logger.Debug("updateDevIntState called state:%d", state)
current := dev.GetIntState()
dev.UpdateIntState(state)
logger.Debug("updateDevIntState called state: current %s, next %s", device.ONUState[current], device.ONUState[dev.GetIntState()])
@@ -322,8 +317,7 @@
}
}
-func (s *Server) updateOnuIntState(intfid uint32, onuid uint32, state device.DeviceState) error {
-
+func (s *Server) updateOnuIntState(intfid uint32, onuid uint32, state device.State) error {
onu, err := s.GetOnuByID(onuid, intfid)
if err != nil {
@@ -376,15 +370,17 @@
logger.Debug("activateOLT() Start")
// Activate OLT
olt := s.Olt
+ olt.OperState = "up"
if err := sendOltIndUp(stream, olt); err != nil {
+ olt.OperState = "down"
return err
}
- olt.OperState = "up"
logger.Info("OLT %s sent OltInd.", olt.Name)
// OLT sends Interface Indication to Adapter
if err := sendIntfInd(stream, olt); err != nil {
logger.Error("Fail to sendIntfInd: %v", err)
+ olt.OperState = "down"
return err
}
logger.Info("OLT %s sent IntfInd.", olt.Name)
@@ -392,6 +388,7 @@
// OLT sends Operation Indication to Adapter after activating each interface
if err := sendOperInd(stream, olt); err != nil {
logger.Error("Fail to sendOperInd: %v", err)
+ olt.OperState = "down"
return err
}
logger.Info("OLT %s sent OperInd.", olt.Name)
@@ -406,9 +403,10 @@
s.Vethnames = []string{}
s.Ioinfos = []*Ioinfo{}
s.wg.Done()
- s.updateDevIntState(s.Olt, device.OLT_PREACTIVE)
+ s.updateDevIntState(s.Olt, device.OltPreactive)
logger.Debug("StartPktLoops () Done")
}()
+ go s.sendPortStats()
s.alarmCh = make(chan *openolt.Indication, 10)
go startAlarmLoop(stream, s.alarmCh)
go s.startDeviceActionLoop()
@@ -442,7 +440,7 @@
}
func createIoinfos(oltid uint32, Vethnames []string) ([]*Ioinfo, []string, error) {
- ioinfos := []*Ioinfo{}
+ var ioinfos []*Ioinfo
var err error
var handler *pcap.Handle
nniup, nnidw := makeNniName(oltid)
@@ -546,21 +544,21 @@
close(nnichannel)
}()
logger.Debug("BEFORE OLT_ACTIVE")
- s.updateDevIntState(s.Olt, device.OLT_ACTIVE)
+ s.updateDevIntState(s.Olt, device.OltActive)
logger.Debug("AFTER OLT_ACTIVE")
data := &openolt.Indication_PktInd{}
for {
select {
case msg := <-s.omciIn:
- logger.Debug("OLT %d send omci indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
- omci := &openolt.Indication_OmciInd{OmciInd: &msg}
- if err := stream.Send(&openolt.Indication{Data: omci}); err != nil {
- logger.Error("send omci indication failed: %v", err)
+ logger.Debug("OLT %d send omciInd indication, IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
+ omciInd := &openolt.Indication_OmciInd{OmciInd: &msg}
+ if err := stream.Send(&openolt.Indication{Data: omciInd}); err != nil {
+ logger.Error("send omciInd indication failed: %v", err)
continue
}
case msg := <-s.eapolIn:
- intfid := msg.IntfId
- onuid := msg.OnuId
+ intfid := msg.IntfID
+ onuid := msg.OnuID
gemid, err := s.getGemPortID(intfid, onuid)
if err != nil {
logger.Error("Failed to getGemPortID intfid:%d onuid:%d", intfid, onuid)
@@ -575,8 +573,8 @@
return err
}
case msg := <-s.dhcpIn: // TODO: We should put omciIn, eapolIn, dhcpIn toghether
- intfid := msg.IntfId
- onuid := msg.OnuId
+ intfid := msg.IntfID
+ onuid := msg.OnuID
gemid, err := s.getGemPortID(intfid, onuid)
bytes := msg.Byte
pkt := gopacket.NewPacket(bytes, layers.LayerTypeEthernet, gopacket.Default)
@@ -626,12 +624,12 @@
onuid := nnipkt.Info.onuid
intfid := nnipkt.Info.intfid
onu, _ := s.GetOnuByID(onuid, intfid)
+ pkt := nnipkt.Pkt
device.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
- pkt := nnipkt.Pkt
data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
- if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
+ if err = stream.Send(&openolt.Indication{Data: data}); err != nil {
logger.Error("Fail to send PktInd indication: %v", err)
return err
}
@@ -656,7 +654,7 @@
ethtype := pkt.EthernetType
if ethtype == layers.EthernetTypeEAPOL {
device.LoggerWithOnu(onu).Info("Received downstream packet is EAPOL.")
- eapolPkt := byteMsg{IntfId: intfid, OnuId: onuid, Byte: rawpkt.Data()}
+ eapolPkt := byteMsg{IntfID: intfid, OnuID: onuid, Byte: rawpkt.Data()}
s.eapolOut <- &eapolPkt
return nil
} else if layerDHCP := rawpkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
@@ -677,7 +675,7 @@
}
}
logger.Debug("%s", poppkt.Dump())
- dhcpPkt := byteMsg{IntfId: intfid, OnuId: onuid, Byte: poppkt.Data()}
+ dhcpPkt := byteMsg{IntfID: intfid, OnuID: onuid, Byte: poppkt.Data()}
s.dhcpOut <- &dhcpPkt
return nil
} else {
@@ -713,30 +711,24 @@
}
ioinfo, err := s.IdentifyNniIoinfo("inside")
if err != nil {
+ logger.Error("failed to get ioinfo")
return err
}
handle := ioinfo.handler
logger.Debug("%s", poppkt.Dump())
SendNni(handle, poppkt)
+ // Send packet to nni port
+ if err := flowHandler.PacketOut(poppkt, "nni", ioinfo.intfid); err != nil {
+ logger.Error("Error in sending packet to nni port")
+ }
return nil
}
-// IsAllOnuActive checks for ONU_ACTIVE state for all the onus in the map
-func IsAllOnuActive(onumap map[uint32][]*device.Onu) bool {
- for _, onus := range onumap {
- for _, onu := range onus {
- if onu.GetIntState() != device.ONU_ACTIVE {
- return false
- }
- }
- }
- return true
-}
-
+// isAllOnuOmciActive checks for OnuOmciActive state for all the onus in the map
func (s *Server) isAllOnuOmciActive() bool {
for _, onus := range s.Onumap {
for _, onu := range onus {
- if onu.GetIntState() != device.ONU_OMCIACTIVE {
+ if onu.GetIntState() != device.OnuOmciActive {
return false
}
}
@@ -754,7 +746,9 @@
logger.Error("Failed to getGemPortID: %s", err)
return 0, err
}
- gemportid = onu.GemportID
+ for _, gemports := range onu.GemPortMap {
+ return gemports[0], nil
+ }
}
return uint32(gemportid), nil
}
@@ -839,7 +833,7 @@
s.SNmap.Range(
func(key, value interface{}) bool {
onu := value.(*device.Onu)
- if onu.InternalState == device.ONU_LOS_RAISED {
+ if onu.InternalState == device.OnuLosRaised {
return true
}
@@ -876,19 +870,19 @@
// TODO all onu and olt related actions (like alarms) should be handled using this function
func (s *Server) startDeviceActionLoop() {
logger.Debug("startDeviceActionLoop invoked")
- s.deviceActionCh = make(chan *pb.DeviceAction, 10)
+ s.deviceActionCh = make(chan *api.DeviceAction, 10)
for {
logger.Debug("Action channel loop started")
select {
case Req := <-s.deviceActionCh:
- logger.Debug("Reboot Action Type: %+v", Req.DeviceAction)
+ logger.Debug("Reboot Action Type: %+v", Req.Action)
switch Req.DeviceType {
case DeviceTypeOnu:
- value, _ := s.SNmap.Load(Req.DeviceSerialNumber)
+ value, _ := s.SNmap.Load(Req.SerialNumber)
onu := value.(*device.Onu)
- if Req.DeviceAction == SoftReboot {
+ if Req.Action == SoftReboot {
s.handleONUSoftReboot(onu.IntfID, onu.OnuID)
- } else if Req.DeviceAction == HardReboot {
+ } else if Req.Action == HardReboot {
s.handleONUHardReboot(onu)
}
case DeviceTypeOlt:
@@ -900,3 +894,27 @@
}
}
}
+
+// sendPortStats periodically reports port statistics for the OLT.
+//
+// On every pass it calls the package-level sendPortStats helper once for each
+// entry of s.Olt.NniIntfs and then once for each entry of s.Olt.PonIntfs,
+// handing it the value s.EnableServer points to, and then sleeps for ten
+// seconds. A failure on one port is logged together with the underlying error
+// and does not prevent the remaining ports from being reported.
+//
+// NOTE(review): the loop has no exit condition, so a goroutine running this
+// method never returns; confirm whether it should stop together with the
+// packet loops.
+func (s *Server) sendPortStats() {
+	// Pause between two reporting passes.
+	const interval = 10 * time.Second
+
+	for {
+		for i, port := range s.Olt.NniIntfs {
+			// send nni port stats
+			logger.Debug("Sending port stats for NNI %d", port.IntfID)
+			// Pass the address of the element itself, not of the range copy.
+			if err := sendPortStats(*s.EnableServer, &s.Olt.NniIntfs[i]); err != nil {
+				logger.Error("Failed to send port stats for NNI %d: %v", port.IntfID, err)
+			}
+		}
+
+		for i, port := range s.Olt.PonIntfs {
+			// send pon port stats
+			logger.Debug("Sending port stats for PON %d", port.IntfID)
+			// Pass the address of the element itself, not of the range copy.
+			if err := sendPortStats(*s.EnableServer, &s.Olt.PonIntfs[i]); err != nil {
+				logger.Error("Failed to send port stats for PON %d: %v", port.IntfID, err)
+			}
+		}
+
+		time.Sleep(interval)
+	}
+}