SEBA-432
SEBA-565
SEBA-654 (alarms)
implemented
fix Jenkins make errors
fix merge conflicts
address review comments
Change-Id: Ia2e95afb33ce55054afa1fcbd9beb6ada62dd764
diff --git a/core/core_server.go b/core/core_server.go
index ba0727a..4c4fcab 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -18,54 +18,73 @@
import (
"context"
+ "encoding/hex"
"errors"
+ "reflect"
"strconv"
"sync"
- "reflect"
- omci "github.com/opencord/omci-sim"
+ pb "gerrit.opencord.org/voltha-bbsim/api"
"gerrit.opencord.org/voltha-bbsim/common/logger"
"gerrit.opencord.org/voltha-bbsim/common/utils"
"gerrit.opencord.org/voltha-bbsim/device"
+ flowHandler "gerrit.opencord.org/voltha-bbsim/flow"
openolt "gerrit.opencord.org/voltha-bbsim/protos"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
+ omci "github.com/opencord/omci-sim"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
+// Constants
const (
- NNI_VETH_NORTH_PFX = "nni_north"
- NNI_VETH_SOUTH_PFX = "nni_south"
- MAX_ONUS_PER_PON = 64 // This value should be the same with the value in AdapterPlatrorm class
+ NniVethNorthPfx = "nni_north"
+ NniVethSouthPfx = "nni_south"
+ MaxPonPorts = 64
+ MaxOnusPerPon = 64 // This value should be the same as the value in AdapterPlatform class
+ VendorIDLength = 4
+ SerialNumberLength = 12
+ OpenOltStart = "start"
+ OpenOltStop = "stop"
)
// Server structure consists of all the params required for BBsim.
type Server struct {
- wg *sync.WaitGroup
- Olt *device.Olt
- Onumap map[uint32][]*device.Onu
- Ioinfos []*Ioinfo
- gRPCserver *grpc.Server
- gRPCAddress string
- gRPCPort uint32
- Vethnames []string
- IndInterval int
- Processes []string
- EnableServer *openolt.Openolt_EnableIndicationServer
- CtagMap map[string]uint32
- cancel context.CancelFunc
- stateRepCh chan stateReport
- omciIn chan openolt.OmciIndication
- omciOut chan openolt.OmciMsg
- eapolIn chan *byteMsg
- eapolOut chan *byteMsg
- dhcpIn chan *byteMsg
- dhcpOut chan *byteMsg
+ wg *sync.WaitGroup
+ Olt *device.Olt
+ Onumap map[uint32][]*device.Onu
+ SNmap sync.Map
+ AutoONUActivate bool
+ Ioinfos []*Ioinfo
+ gRPCserver *grpc.Server
+ gRPCAddress string
+ gRPCPort uint32
+ mgmtServer *grpc.Server
+ mgmtGrpcPort uint32
+ mgmtRestPort uint32
+ Vethnames []string
+ IndInterval int
+ Processes []string
+ EnableServer *openolt.Openolt_EnableIndicationServer
+ CtagMap map[string]uint32
+ cancel context.CancelFunc
+ stateRepCh chan stateReport
+ omciIn chan openolt.OmciIndication
+ omciOut chan openolt.OmciMsg
+ eapolIn chan *byteMsg
+ eapolOut chan *byteMsg
+ dhcpIn chan *byteMsg
+ dhcpOut chan *byteMsg
+ FlowMap map[FlowKey]*openolt.Flow
+ alarmCh chan *openolt.Indication
+ deviceActionCh chan *pb.DeviceAction
+ serverActionCh chan string
}
+// Packet structure
type Packet struct {
Info *Ioinfo
Pkt gopacket.Packet
@@ -83,56 +102,89 @@
next device.DeviceState
}
+// FlowKey is the key type of FlowMap
+type FlowKey struct {
+ FlowID uint32
+ FlowDirection string
+}
+
// NewCore initialize OLT and ONU objects
func NewCore(opt *option) *Server {
// TODO: make it decent
oltid := opt.oltid
npon := opt.npon
- nonus := opt.nonus
- s := Server{
- Olt: device.NewOlt(oltid, npon, 1),
- Onumap: make(map[uint32][]*device.Onu),
- Ioinfos: []*Ioinfo{},
- gRPCAddress: opt.address,
- gRPCPort: opt.port,
- Vethnames: []string{},
- IndInterval: opt.intvl,
- Processes: []string{},
- EnableServer: nil,
- stateRepCh: make(chan stateReport, 8),
- omciIn: make(chan openolt.OmciIndication, 1024),
- omciOut: make(chan openolt.OmciMsg, 1024),
- eapolIn: make(chan *byteMsg, 1024),
- eapolOut: make(chan *byteMsg, 1024),
- dhcpIn: make(chan *byteMsg, 1024),
- dhcpOut: make(chan *byteMsg, 1024),
+ if npon > MaxPonPorts {
+ logger.Warn("Provided number of PON ports exceeds limit of %d", MaxPonPorts)
+ logger.Info("Setting number of PON ports to %d", MaxPonPorts)
+ npon = MaxPonPorts
}
+ nonus := opt.nonus
+ if nonus > MaxOnusPerPon {
+ logger.Warn("Provided number of ONUs per PON port exceeds limit of %d", MaxOnusPerPon)
+ logger.Info("Setting number of ONUs per PON port to %d", MaxOnusPerPon)
+ nonus = MaxOnusPerPon
+ }
+ s := Server{
+ Olt: device.NewOlt(oltid, npon, 1), // TODO nnni is to be taken from options
+ Onumap: make(map[uint32][]*device.Onu),
+ Ioinfos: []*Ioinfo{},
+ gRPCAddress: opt.address,
+ gRPCPort: opt.port,
+ Vethnames: []string{},
+ IndInterval: opt.intvl,
+ AutoONUActivate: !opt.interactiveOnuActivation,
+ Processes: []string{},
+ mgmtGrpcPort: opt.mgmtGrpcPort,
+ mgmtRestPort: opt.mgmtRestPort,
+ EnableServer: nil,
+ stateRepCh: make(chan stateReport, 8),
+ omciIn: make(chan openolt.OmciIndication, 1024),
+ omciOut: make(chan openolt.OmciMsg, 1024),
+ eapolIn: make(chan *byteMsg, 1024),
+ eapolOut: make(chan *byteMsg, 1024),
+ dhcpIn: make(chan *byteMsg, 1024),
+ dhcpOut: make(chan *byteMsg, 1024),
+ FlowMap: make(map[FlowKey]*openolt.Flow),
+ serverActionCh: make(chan string),
+ }
+ logger.Info("OLT %d created: %v", s.Olt.ID, s.Olt)
nnni := s.Olt.NumNniIntf
logger.Info("OLT ID: %d was retrieved.", s.Olt.ID)
- for intfid := nnni; intfid < npon+nnni; intfid++ {
+ logger.Info("OLT Serial-Number: %v", s.Olt.SerialNumber)
+ // Creating Onu Map
+ for intfid := uint32(0); intfid < npon; intfid++ {
s.Onumap[intfid] = device.NewOnus(oltid, intfid, nonus, nnni)
}
- //TODO: To be fixed because it is hardcoded
+ logger.Debug("Onu Map:")
+ for _, onus := range s.Onumap {
+ for _, onu := range onus {
+ logger.Debug("%+v", *onu)
+ }
+ }
+
+ // TODO: To be fixed because it is hardcoded
s.CtagMap = make(map[string]uint32)
- for i := 0; i < MAX_ONUS_PER_PON; i++ {
+ for i := 0; i < MaxOnusPerPon; i++ {
oltid := s.Olt.ID
intfid := uint32(1)
- sn := convB2S(device.NewSN(oltid, intfid, uint32(i)))
+ sn := utils.ConvB2S(device.NewSN(oltid, intfid, uint32(i)))
s.CtagMap[sn] = uint32(900 + i) // This is hard coded for BBWF
}
+
+ flowHandler.InitializeFlowManager(s.Olt.ID)
return &s
}
-// Start starts the BBSim and openolt gRPC servers (blocking)
+// Start starts the openolt gRPC server (blocking)
func (s *Server) Start() error {
- s.wg = &sync.WaitGroup{}
- logger.Debug("Start() Start")
+ logger.Debug("Starting OpenOLT gRPC Server")
defer func() {
- close(s.stateRepCh)
- logger.Debug("Start() Done")
+ logger.Debug("OpenOLT gRPC Server Stopped")
}()
+
+ // Start Openolt gRPC server
addressport := s.gRPCAddress + ":" + strconv.Itoa(int(s.gRPCPort))
listener, gserver, err := NewGrpcServer(addressport)
if err != nil {
@@ -145,32 +197,67 @@
logger.Error("Failed to run gRPC server: %v", err)
return err
}
- s.wg.Wait()
return nil
}
-// Stop stops the BBSim and openolt gRPC servers (non-blocking).
+// Stop stops the openolt gRPC server and the packet loops (non-blocking).
func (s *Server) Stop() {
- logger.Debug("Stop() Start")
- defer logger.Debug("Stop() Done")
+ logger.Debug("Stopping OpenOLT gRPC Server & PktLoops")
+ defer logger.Debug("OpenOLT gRPC Server & PktLoops Stopped")
+
if s.gRPCserver != nil {
s.gRPCserver.Stop()
logger.Debug("gRPCserver.Stop()")
}
+
s.StopPktLoops()
return
}
+func (s *Server) startMgmtServer(wg *sync.WaitGroup) {
+ defer logger.Debug("Management api server exited")
+
+ grpcAddressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtGrpcPort))
+ restAddressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtRestPort))
+ // Start rest gateway for BBSim server
+ go StartRestGatewayService(grpcAddressPort, restAddressPort, wg)
+ addressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtGrpcPort))
+
+ listener, apiserver, err := NewMgmtAPIServer(addressPort)
+ if err != nil {
+ logger.Error("Unable to create management api server %v", err)
+ return
+ }
+
+ s.mgmtServer = apiserver
+ pb.RegisterBBSimServiceServer(apiserver, s)
+ if e := apiserver.Serve(listener); e != nil {
+ logger.Error("Failed to run management api server %v", e)
+ return
+ }
+
+}
+
+func (s *Server) stopMgmtServer() error {
+ if s.mgmtServer != nil {
+ s.mgmtServer.GracefulStop()
+ logger.Debug("Management server stopped")
+ return nil
+ }
+ return errors.New("can not stop management server, server not created")
+}
+
// Enable invokes methods for activation of OLT and ONU (blocking)
func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
olt := s.Olt
defer func() {
olt.Initialize()
- for intfid := range s.Onumap {
- for _, onu := range s.Onumap[intfid] {
- onu.Initialize()
- }
- }
+ // The lines below are commented out because we don't want to change the ONU state on restart
+ // for intfid := range s.Onumap {
+ // for _, onu := range s.Onumap[intfid] {
+ // onu.Initialize()
+ // }
+ // }
s.updateDevIntState(olt, device.OLT_INACTIVE)
logger.Debug("Enable() Done")
}()
@@ -185,11 +272,29 @@
coreCtx := context.Background()
coreCtx, corecancel := context.WithCancel(coreCtx)
s.cancel = corecancel
- go s.sendDiscovertoONUs(*sv)
- if err := s.StartPktLoops(coreCtx, *sv); err != nil {
- return err
+ errorchan := make(chan error, 5)
+ go s.StartPktLoops(coreCtx, *sv, errorchan)
+
+ if s.AutoONUActivate == true {
+ // Initialize all ONUs
+ for intfid := range s.Onumap {
+ for _, onu := range s.Onumap[intfid] {
+ onu.Initialize()
+ }
+ }
+ // Activate all ONUs
+ s.activateONUs(*sv, s.Onumap)
}
+
+ select {
+ case err := <-errorchan:
+ if err != nil {
+ logger.Debug("Error: %v", err)
+ return err
+ }
+ }
+
return nil
}
@@ -224,10 +329,28 @@
s.updateDevIntState(onu, state)
return nil
}
-func (s *Server) Activate(onu *device.Onu) error {
- utils.LoggerWithOnu(onu).Info("sending ONUInd Onuid")
- go sendOnuIndtoONU(*s.EnableServer, onu)
- return nil
+
+func (s *Server) activateOnu(onu *device.Onu) {
+ snKey := stringifySerialNumber(onu.SerialNumber)
+ s.SNmap.Store(snKey, onu)
+ device.UpdateOnusOpStatus(onu.IntfID, onu, "up")
+
+ err := sendOnuDiscInd(*s.EnableServer, onu)
+ if err != nil {
+ logger.Error(err.Error())
+ return
+ }
+ logger.Info("OLT id:%d sent ONUDiscInd.", s.Olt.ID)
+ logger.Debug("activateONUs Entry in SNmap %v", snKey)
+}
+
+func (s *Server) activateONUs(stream openolt.Openolt_EnableIndicationServer, Onumap map[uint32][]*device.Onu) {
+ // Add all ONUs to SerialNumber Map
+ for intfid := range Onumap {
+ for _, onu := range Onumap[intfid] {
+ s.activateOnu(onu)
+ }
+ }
}
func (s *Server) activateOLT(stream openolt.Openolt_EnableIndicationServer) error {
@@ -249,37 +372,16 @@
logger.Info("OLT %s sent IntfInd.", olt.Name)
// OLT sends Operation Indication to Adapter after activating each interface
- //time.Sleep(IF_UP_TIME * time.Second)
if err := sendOperInd(stream, olt); err != nil {
logger.Error("Fail to sendOperInd: %v", err)
return err
}
logger.Info("OLT %s sent OperInd.", olt.Name)
-
return nil
}
-func (s *Server) sendDiscovertoONUs(stream openolt.Openolt_EnableIndicationServer) {
- // OLT sends ONU Discover Indication to Adapter after ONU discovery
- for intfid := range s.Onumap {
- device.UpdateOnusOpStatus(intfid, s.Onumap[intfid], "up")
- }
-
- // Initialize all ONUs
- for intfid := range s.Onumap {
- for _, onu := range s.Onumap[intfid] {
- onu.Initialize()
- }
- }
-
- // Send discovery indication for all ONUs
- for intfid, _ := range s.Onumap {
- sendOnuDiscInd(stream, s.Onumap[intfid], s.IndInterval)
- logger.Info("OLT sent ONUDiscInd for intfId:%d.", intfid)
- }
-}
// StartPktLoops creates veth pairs and invokes runPktLoops (blocking)
-func (s *Server) StartPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+func (s *Server) StartPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer, errorchan chan error) {
logger.Debug("StartPktLoops () Start")
defer func() {
RemoveVeths(s.Vethnames)
@@ -289,11 +391,14 @@
s.updateDevIntState(s.Olt, device.OLT_PREACTIVE)
logger.Debug("StartPktLoops () Done")
}()
+ s.alarmCh = make(chan *openolt.Indication, 10)
+ go startAlarmLoop(stream, s.alarmCh)
+ go s.startDeviceActionLoop()
s.wg.Add(1)
ioinfos, veths, err := createIoinfos(s.Olt.ID, s.Vethnames)
if err != nil {
logger.Error("createIoinfos failed: %v", err)
- return err
+ errorchan <- err
}
s.Ioinfos = ioinfos
s.Vethnames = veths
@@ -305,9 +410,9 @@
if err = s.runPktLoops(child, stream); err != nil {
logger.Error("runPktLoops failed: %v", err)
- return err
+ errorchan <- err
}
- return nil
+ errorchan <- nil
}
// StopPktLoops (non-blocking)
@@ -335,13 +440,13 @@
return ioinfos, Vethnames, nil
}
-//Blocking
+// Blocking
func (s *Server) runPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
logger.Debug("runPacketPktLoops Start")
defer logger.Debug("runPacketLoops Done")
errchOmci := make(chan error)
- RunOmciResponder(ctx, s.omciOut, s.omciIn, errchOmci)
+ s.RunOmciResponder(ctx, s.omciOut, s.omciIn, errchOmci)
eg, child := errgroup.WithContext(ctx)
child, cancel := context.WithCancel(child)
@@ -356,7 +461,7 @@
defer logger.Debug("runOMCIResponder Done")
select {
case v, ok := <-errchOmci: // Wait for OmciInitialization
- if ok { //Error
+ if ok { // Error
logger.Error("Error happend in Omci: %s", v)
return v
}
@@ -371,7 +476,7 @@
defer logger.Debug("runEapolResponder Done")
select {
case v, ok := <-errchEapol:
- if ok { //Error
+ if ok { // Error
logger.Error("Error happend in Eapol:%s", v)
return v
}
@@ -386,7 +491,7 @@
defer logger.Debug("runDhcpResponder Done")
select {
case v, ok := <-errchDhcp:
- if ok { //Error
+ if ok { // Error
logger.Error("Error happend in Dhcp:%s", v)
return v
}
@@ -451,7 +556,7 @@
logger.Error("Fail to send EAPOL PktInd indication. %v", err)
return err
}
- case msg := <-s.dhcpIn: //TODO: We should put omciIn, eapolIn, dhcpIn toghether
+ case msg := <-s.dhcpIn: // TODO: We should put omciIn, eapolIn, dhcpIn together
intfid := msg.IntfId
onuid := msg.OnuId
gemid, err := s.getGemPortID(intfid, onuid)
@@ -468,7 +573,7 @@
logger.Error("Failed to GetOnuByID:%d", onuid)
continue
}
- sn := convB2S(onu.SerialNumber.VendorSpecific)
+ sn := utils.ConvB2S(onu.SerialNumber.VendorSpecific)
if ctag, ok := s.CtagMap[sn]; ok == true {
tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
if err != nil {
@@ -499,7 +604,12 @@
logger.Debug("WARNING: This packet does not come from NNI ")
continue
}
+
+ onuid := nnipkt.Info.onuid
intfid := nnipkt.Info.intfid
+ onu, _ := s.GetOnuByID(onuid, intfid)
+
+ utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
pkt := nnipkt.Pkt
data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
@@ -513,7 +623,6 @@
return nil
}
}
- return nil
}
func (s *Server) onuPacketOut(intfid uint32, onuid uint32, rawpkt gopacket.Packet) error {
@@ -622,7 +731,7 @@
}
}
}
- err := errors.New("No mathced SN is found ")
+ err := errors.New("no matching serial number found")
logger.Error("%s", err)
return nil, err
}
@@ -638,7 +747,7 @@
return onu, nil
}
}
- err := errors.New("No matched OnuID is found ")
+ err := errors.New("no matching OnuID found")
logger.WithFields(log.Fields{
"onumap": onumap,
"onuid": onuid,
@@ -647,10 +756,111 @@
return nil, err
}
-func convB2S(b []byte) string {
- s := ""
- for _, i := range b {
- s = s + strconv.FormatInt(int64(i/16), 16) + strconv.FormatInt(int64(i%16), 16)
+// getOnuFromSNmap returns the ONU stored in SNmap for the given serial number and whether it was found
+func (s *Server) getOnuFromSNmap(serialNumber *openolt.SerialNumber) (*device.Onu, bool) {
+ snkey := stringifySerialNumber(serialNumber)
+
+ logger.Debug("getOnuFromSNmap received serial number %s", snkey)
+
+ if onu, exist := s.SNmap.Load(snkey); exist {
+ logger.Info("Serial number found in map")
+ return onu.(*device.Onu), true
}
- return s
+ logger.Info("Serial number not found in map")
+ return nil, false
+}
+
+func stringifySerialNumber(serialNum *openolt.SerialNumber) string {
+ return string(serialNum.VendorId) + utils.ConvB2S(serialNum.VendorSpecific)
+}
+
+func getOpenoltSerialNumber(SerialNumber string) (*openolt.SerialNumber, error) {
+ if len(SerialNumber) != SerialNumberLength {
+ logger.Error("Invalid serial number %s", SerialNumber)
+ return nil, errors.New("invalid serial number")
+ }
+ // First four characters are vendorId
+ vendorID := SerialNumber[:VendorIDLength]
+ vendorSpecific := SerialNumber[VendorIDLength:]
+
+ vsbyte, _ := hex.DecodeString(vendorSpecific)
+
+ // Convert to Openolt serial number
+ serialNum := new(openolt.SerialNumber)
+ serialNum.VendorId = []byte(vendorID)
+ serialNum.VendorSpecific = vsbyte
+
+ return serialNum, nil
+}
+
+// TODO move to device_onu.go
+func (s *Server) sendOnuIndicationsOnOltReboot() {
+ if AutoONUActivate == 1 {
+ // In auto-activate mode, ONU indications are sent in Enable()
+ return
+ }
+
+ s.SNmap.Range(
+ func(key, value interface{}) bool {
+ onu := value.(*device.Onu)
+ if onu.InternalState == device.ONU_LOS_RAISED {
+ return true
+ }
+
+ err := sendOnuDiscInd(*s.EnableServer, onu)
+ if err != nil {
+ logger.Error(err.Error())
+ }
+
+ return true
+ })
+}
+
+// StartServerActionLoop reads from the server-action channel and starts or stops the server according to the value received
+func (s *Server) StartServerActionLoop(wg *sync.WaitGroup) {
+ for {
+ select {
+ case Req := <-s.serverActionCh:
+ logger.Debug("Request Received On serverActionCh: %+v", Req)
+ switch Req {
+ case "start":
+ logger.Debug("Server Start Request Received On ServerActionChannel")
+ go s.Start() // Start blocks, so it runs in its own goroutine
+ case "stop":
+ logger.Debug("Server Stop Request Received On ServerActionChannel")
+ s.Stop()
+ default:
+ logger.Error("Invalid value received in deviceActionCh")
+ }
+ }
+ }
+}
+
+// startDeviceActionLoop reads from the device-action channel and performs ONU and OLT reboot-related actions
+// TODO all onu and olt related actions (like alarms) should be handled using this function
+func (s *Server) startDeviceActionLoop() {
+ logger.Debug("startDeviceActionLoop invoked")
+ s.deviceActionCh = make(chan *pb.DeviceAction, 10)
+ for {
+ logger.Debug("Action channel loop started")
+ select {
+ case Req := <-s.deviceActionCh:
+ logger.Debug("Reboot Action Type: %+v", Req.DeviceAction)
+ switch Req.DeviceType {
+ case DeviceTypeOnu:
+ value, _ := s.SNmap.Load(Req.DeviceSerialNumber)
+ onu := value.(*device.Onu)
+ if Req.DeviceAction == SoftReboot {
+ s.handleONUSoftReboot(onu.IntfID, onu.OnuID)
+ } else if Req.DeviceAction == HardReboot {
+ s.handleONUHardReboot(onu)
+ }
+ case DeviceTypeOlt:
+ logger.Debug("Reboot For OLT Received")
+ s.handleOLTReboot()
+ default:
+ logger.Error("Invalid value received in deviceActionCh")
+ }
+ }
+ }
}