Minor fixes & tweaks to improve error handing in preparation for BBSim REST API pull request.
Added comments to reduce golint warnings.
Main contributors: Pragya Arya, Vishesh Prasidh
Change-Id: I6f0b67a39dd0b8da0288306ac4f66098df53b18d
diff --git a/core/core_server.go b/core/core_server.go
index ba49118..917ed25 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -21,11 +21,9 @@
"errors"
"strconv"
"sync"
-
- omci "github.com/opencord/omci-sim"
-
"reflect"
+ omci "github.com/opencord/omci-sim"
"gerrit.opencord.org/voltha-bbsim/common/logger"
"gerrit.opencord.org/voltha-bbsim/common/utils"
"gerrit.opencord.org/voltha-bbsim/device"
@@ -44,6 +42,7 @@
MAX_ONUS_PER_PON = 64 // This value should be the same with the value in AdapterPlatrorm class
)
+// Server structure consists of all the params required for BBsim.
type Server struct {
wg *sync.WaitGroup
Olt *device.Olt
@@ -84,6 +83,7 @@
next device.DeviceState
}
+// NewCore initialize OLT and ONU objects
func NewCore(opt *option) *Server {
// TODO: make it decent
oltid := opt.oltid
@@ -125,7 +125,7 @@
return &s
}
-//Blocking
+// Start starts the BBSim and openolt gRPC servers (blocking)
func (s *Server) Start() error {
s.wg = &sync.WaitGroup{}
logger.Debug("Start() Start")
@@ -149,7 +149,7 @@
return nil
}
-//Non-Blocking
+// Stop stops the BBSim and openolt gRPC servers (non-blocking).
func (s *Server) Stop() {
logger.Debug("Stop() Start")
defer logger.Debug("Stop() Done")
@@ -161,12 +161,12 @@
return
}
-// Blocking
+// Enable invokes methods for activation of OLT and ONU (blocking)
func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
olt := s.Olt
defer func() {
olt.Initialize()
- for intfid, _ := range s.Onumap {
+ for intfid := range s.Onumap {
for _, onu := range s.Onumap[intfid] {
onu.Initialize()
}
@@ -190,7 +190,7 @@
return nil
}
-//Non-Blocking
+// Disable stops packet loops (non-blocking)
func (s *Server) Disable() {
defer func() {
logger.Debug("Disable() Done")
@@ -249,11 +249,19 @@
logger.Info("OLT %s sent OperInd.", olt.Name)
// OLT sends ONU Discover Indication to Adapter after ONU discovery
- for intfid, _ := range s.Onumap {
+ for intfid := range s.Onumap {
device.UpdateOnusOpStatus(intfid, s.Onumap[intfid], "up")
}
- for intfid, _ := range s.Onumap {
+ // Initialize all ONUs
+ for intfid := range s.Onumap {
+ for _, onu := range s.Onumap[intfid] {
+ onu.Initialize()
+ }
+ }
+
+ // Send discovery indication for all ONUs
+ for intfid := range s.Onumap {
sendOnuDiscInd(stream, s.Onumap[intfid])
logger.Info("OLT id:%d sent ONUDiscInd.", olt.ID)
}
@@ -266,14 +274,14 @@
}
}
- for intfid, _ := range s.Onumap {
+ for intfid := range s.Onumap {
sendOnuInd(stream, s.Onumap[intfid], s.IndInterval)
logger.Info("OLT id:%d sent ONUInd.", olt.ID)
}
return nil
}
-// Blocking
+// StartPktLoops creates veth pairs and invokes runPktLoops (blocking)
func (s *Server) StartPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
logger.Debug("StartPktLoops () Start")
defer func() {
@@ -305,7 +313,7 @@
return nil
}
-//Non-Blocking
+// StopPktLoops (non-blocking)
func (s *Server) StopPktLoops() {
if s.cancel != nil {
cancel := s.cancel
@@ -496,7 +504,11 @@
}
onuid := nnipkt.Info.onuid
intfid := nnipkt.Info.intfid
- onu, _ := s.GetOnuByID(onuid, intfid)
+ onu, err := s.GetOnuByID(onuid, intfid)
+ if err != nil {
+ logger.Error("Failed processing NNI packet: %v", err)
+ continue
+ }
utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
@@ -517,7 +529,11 @@
func (s *Server) onuPacketOut(intfid uint32, onuid uint32, rawpkt gopacket.Packet) error {
layerEth := rawpkt.Layer(layers.LayerTypeEthernet)
- onu, _ := s.GetOnuByID(onuid, intfid)
+ onu, err := s.GetOnuByID(onuid, intfid)
+ if err != nil {
+ logger.Error("Failed processing onuPacketOut: %v", err)
+ return err
+ }
if layerEth != nil {
pkt, _ := layerEth.(*layers.Ethernet)
@@ -571,6 +587,7 @@
return nil
}
+// IsAllOnuActive checks for ONU_ACTIVE state for all the onus in the map
func IsAllOnuActive(onumap map[uint32][]*device.Onu) bool {
for _, onus := range onumap {
for _, onu := range onus {
@@ -598,13 +615,13 @@
gemportid, err := omci.GetGemPortId(intfid, onuid)
if err != nil {
logger.Warn("Failed to getGemPortID from OMCI lib: %s", err)
+ onu, err := s.GetOnuByID(onuid, intfid)
+ if err != nil {
+ logger.Error("Failed to getGemPortID: %s", err)
+ return 0, err
+ }
+ gemportid = onu.GemportID
}
- onu, err := s.GetOnuByID(onuid, intfid)
- if err != nil {
- logger.Error("Failed to getGemPortID: %s", err)
- return 0, err
- }
- gemportid = onu.GemportID
return uint32(gemportid), nil
}
@@ -621,6 +638,7 @@
return nil, err
}
+// GetOnuByID returns ONU object as per onuID and intfID
func (s *Server) GetOnuByID(onuid uint32, intfid uint32) (*device.Onu, error) {
return getOnuByID(s.Onumap, onuid, intfid)
}
@@ -635,6 +653,7 @@
logger.WithFields(log.Fields{
"onumap": onumap,
"onuid": onuid,
+ "intfid": intfid,
}).Error(err)
return nil, err
}
diff --git a/core/eapol.go b/core/eapol.go
index 8dc7e7b..85a9f07 100644
--- a/core/eapol.go
+++ b/core/eapol.go
@@ -72,6 +72,7 @@
return resp
}
+//RunEapolResponder starts go routine which processes and responds for received eapol messages
func RunEapolResponder(ctx context.Context, eapolOut chan *byteMsg, eapolIn chan *byteMsg, errch chan error) {
responder := getEAPResponder()
responder.eapolIn = eapolIn
diff --git a/core/grpc_service.go b/core/grpc_service.go
index bc92483..d088e05 100644
--- a/core/grpc_service.go
+++ b/core/grpc_service.go
@@ -18,7 +18,6 @@
import (
"net"
- "strconv"
"gerrit.opencord.org/voltha-bbsim/common/logger"
"gerrit.opencord.org/voltha-bbsim/common/utils"
@@ -30,9 +29,11 @@
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
-// gRPC Service
+// DisableOlt method sends OLT down indication
func (s *Server) DisableOlt(c context.Context, empty *openolt.Empty) (*openolt.Empty, error) {
logger.Debug("OLT receives DisableOLT()")
if s.EnableServer != nil {
@@ -44,16 +45,26 @@
return new(openolt.Empty), nil
}
+// ReenableOlt method sends OLT up indication for re-enabling OLT
func (s *Server) ReenableOlt(c context.Context, empty *openolt.Empty) (*openolt.Empty, error) {
logger.Debug("OLT receives Reenable()")
+ if s.EnableServer != nil {
+ if err := sendOltIndUp(*s.EnableServer, s.Olt); err != nil {
+ logger.Error("Failed to send OLT UP indication for reenable OLT: %v", err)
+ return new(openolt.Empty), err
+ }
+ logger.Debug("Successfuly sent OLT UP indication")
+ }
return new(openolt.Empty), nil
}
+// CollectStatistics method invoked by VOLTHA to get OLT statistics
func (s *Server) CollectStatistics(c context.Context, empty *openolt.Empty) (*openolt.Empty, error) {
logger.Debug("OLT receives CollectStatistics()")
return new(openolt.Empty), nil
}
+// GetDeviceInfo returns OLT info
func (s *Server) GetDeviceInfo(c context.Context, empty *openolt.Empty) (*openolt.DeviceInfo, error) {
logger.Debug("OLT receives GetDeviceInfo()")
devinfo := new(openolt.DeviceInfo)
@@ -71,56 +82,88 @@
devinfo.GemportIdEnd = 65535
devinfo.FlowIdStart = 1
devinfo.FlowIdEnd = 16383
- devinfo.DeviceSerialNumber = "BBSIMOLT00"+strconv.FormatInt(int64(s.Olt.ID), 10)
-
+ devinfo.DeviceSerialNumber = s.Olt.SerialNumber
return devinfo, nil
}
+// ActivateOnu method handles ONU activation request from VOLTHA
func (s *Server) ActivateOnu(c context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
logger.Debug("OLT receives ActivateONU()")
- result := device.ValidateONU(*onu, s.Onumap)
- if result == true {
- matched, error := getOnuBySN(s.Onumap, onu.SerialNumber)
- if error != nil {
- logger.Fatal("%s", error)
- }
- onuid := onu.OnuId
- matched.OnuID = onuid
- s.updateDevIntState(matched, device.ONU_ACTIVE)
- logger.Debug("ONU IntfID: %d OnuID: %d activated succesufully.", onu.IntfId, onu.OnuId)
+
+ matched, err := getOnuBySN(s.Onumap, onu.SerialNumber)
+ if err != nil {
+ logger.Fatal("%s", err)
+ return new(openolt.Empty), status.Errorf(codes.NotFound, "ONU not found with serial number %v", onu.SerialNumber)
}
+ onuid := onu.OnuId
+ matched.OnuID = onuid
+ s.updateDevIntState(matched, device.ONU_ACTIVE)
+ logger.Debug("ONU IntfID: %d OnuID: %d activated succesufully.", onu.IntfId, onu.OnuId)
+
return new(openolt.Empty), nil
}
+// CreateTconts method should handle Tcont creation
func (s *Server) CreateTconts(c context.Context, tconts *openolt.Tconts) (*openolt.Empty, error) {
logger.Debug("OLT receives CreateTconts()")
return new(openolt.Empty), nil
}
+// RemoveTconts method should handle t-cont removal
func (s *Server) RemoveTconts(c context.Context, tconts *openolt.Tconts) (*openolt.Empty, error) {
logger.Debug("OLT receives RemoveTconts()")
return new(openolt.Empty), nil
}
+// DeactivateOnu method should handle ONU deactivation
func (s *Server) DeactivateOnu(c context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
logger.Debug("OLT receives DeactivateONU()")
return new(openolt.Empty), nil
}
+// DeleteOnu handles ONU deletion request from VOLTHA
func (s *Server) DeleteOnu(c context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
logger.Debug("OLT receives DeleteONU()")
+ Onu, err := s.GetOnuByID(onu.OnuId, onu.IntfId)
+ if err != nil {
+ return new(openolt.Empty), err
+ }
+
+ // Mark ONU internal state as ONU_FREE and reset onuID
+ Onu.InternalState = device.ONU_FREE
+ Onu.OnuID = 0
+
return new(openolt.Empty), nil
}
+// OmciMsgOut receives OMCI messages from voltha
func (s *Server) OmciMsgOut(c context.Context, msg *openolt.OmciMsg) (*openolt.Empty, error) {
logger.Debug("OLT %d receives OmciMsgOut to IF %v (ONU-ID: %v) pkt:%x.", s.Olt.ID, msg.IntfId, msg.OnuId, msg.Pkt)
+ // Get ONU state
+ onu, err := s.GetOnuByID(msg.OnuId, msg.IntfId)
+ if err != nil {
+ logger.Error("ONU not found intfID %d, onuID %d", msg.IntfId, msg.OnuId)
+ return new(openolt.Empty), err
+ }
+ state := onu.GetIntState()
+ logger.Debug("ONU-ID: %v, ONU state: %d", msg.OnuId, state)
+
+ // If ONU is ONU_INACTIVE or ONU_FREE do not send omci response
+ if state == device.ONU_INACTIVE || state == device.ONU_FREE {
+ logger.Info("ONU (IF %v ONU-ID: %v) is not ACTIVE, so not processing OmciMsg", msg.IntfId, msg.OnuId)
+ return new(openolt.Empty), nil
+ }
s.omciOut <- *msg
return new(openolt.Empty), nil
}
func (s *Server) OnuPacketOut(c context.Context, packet *openolt.OnuPacket) (*openolt.Empty, error) {
- onu, _ := s.GetOnuByID(packet.OnuId, packet.IntfId)
+ onu, err := s.GetOnuByID(packet.OnuId, packet.IntfId)
+ if err != nil {
+ logger.Error("Failed in OnuPacketOut, %v", err)
+ return new(openolt.Empty), err
+ }
utils.LoggerWithOnu(onu).Debugf("OLT %d receives OnuPacketOut () to IF-ID:%d ONU-ID %d.", s.Olt.ID, packet.IntfId, packet.OnuId)
onuid := packet.OnuId
intfid := packet.IntfId
@@ -141,6 +184,7 @@
return new(openolt.Empty), nil
}
+// FlowAdd method should handle flows addition to datapath for OLT and ONU
func (s *Server) FlowAdd(c context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
logger.Debug("OLT %d receives FlowAdd() IntfID:%d OnuID:%d EType:%x GemPortID:%d", s.Olt.ID, flow.AccessIntfId, flow.OnuId, flow.Classifier.EthType, flow.GemportId)
onu, err := s.GetOnuByID(uint32(flow.OnuId), uint32(flow.AccessIntfId))
@@ -166,6 +210,7 @@
return new(openolt.Empty), nil
}
+// FlowRemove should handle flow deletion from datapath
func (s *Server) FlowRemove(c context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
onu, _ := s.GetOnuByID(uint32(flow.OnuId), uint32(flow.AccessIntfId))
@@ -194,6 +239,7 @@
return new(openolt.Empty), nil
}
+// Reboot method handles reboot of OLT
func (s *Server) Reboot(c context.Context, empty *openolt.Empty) (*openolt.Empty, error) {
logger.Debug("OLT %d receives Reboot ().", s.Olt.ID)
// Initialize OLT & Env
@@ -202,6 +248,7 @@
return new(openolt.Empty), nil
}
+// EnableIndication starts sending indications for OLT and ONU
func (s *Server) EnableIndication(empty *openolt.Empty, stream openolt.Openolt_EnableIndicationServer) error {
logger.Debug("OLT receives EnableInd.")
defer func() {
@@ -214,6 +261,7 @@
return nil
}
+// NewGrpcServer starts openolt gRPC server
func NewGrpcServer(addrport string) (l net.Listener, g *grpc.Server, e error) {
logger.Debug("Listening %s ...", addrport)
g = grpc.NewServer()
diff --git a/core/mediator.go b/core/mediator.go
index 5565715..4e8833a 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -46,6 +46,7 @@
Debuglvl string
}
+// GetOptions receives command line options and stores them in option structure
func GetOptions() *option {
o := new(option)
addressport := flag.String("H", ":50060", "IP address:port")
@@ -87,6 +88,7 @@
testmanager *TestManager
}
+// NewMediator returns a new mediator object
func NewMediator(o *option) *mediator {
m := new(mediator)
m.opt = o
@@ -100,6 +102,7 @@
return m
}
+// Start mediator
func (m *mediator) Start() {
var wg sync.WaitGroup
opt := m.opt
@@ -140,6 +143,7 @@
logger.Debug("Reach to the end line")
}
+// Mediate method is invoked on OLT and ONU state change
func (m *mediator) Mediate() {
defer logger.Debug("Mediate Done")
for sr := range m.server.stateRepCh {
@@ -180,8 +184,7 @@
if err := tm.StartTester(key, t); err != nil {
logger.Error("Cannot Start Executer error:%v", err)
}
- } else if (current == device.ONU_OMCIACTIVE || current == device.ONU_ACTIVE) &&
- next == device.ONU_INACTIVE {
+ } else if current == device.ONU_OMCIACTIVE && next == device.ONU_INACTIVE {
if err := tm.StopTester(key); err != nil {
logger.Error("Cannot Start Executer error:%v", err)
}
diff --git a/core/omci.go b/core/omci.go
index 433cb57..711bbc7 100644
--- a/core/omci.go
+++ b/core/omci.go
@@ -24,6 +24,7 @@
omci "github.com/opencord/omci-sim"
)
+// RunOmciResponder starts a go routine to process/respond to OMCI messages from VOLTHA
func RunOmciResponder(ctx context.Context, omciOut chan openolt.OmciMsg, omciIn chan openolt.OmciIndication, errch chan error) {
go func() {
defer logger.Debug("Omci response process was done")
@@ -57,8 +58,8 @@
}()
}
+// HexDecode converts the hex encoding to binary
func HexDecode(pkt []byte) []byte {
- // Convert the hex encoding to binary
// TODO - Change openolt adapter to send raw binary instead of hex encoded
p := make([]byte, len(pkt)/2)
for i, j := 0, 0; i < len(pkt); i, j = i+2, j+1 {
diff --git a/core/tester.go b/core/tester.go
index 279d2c8..2349fae 100644
--- a/core/tester.go
+++ b/core/tester.go
@@ -86,7 +86,10 @@
func (tm *TestManager) StartTester (key device.Devkey, t *Tester) error {
logger.Debug("StartTester called with key:%v", key)
if t.Mode == DEFAULT {
- //Empty
+ _, child := errgroup.WithContext(tm.ctx)
+ child, cancel := context.WithCancel(child)
+ t.ctx = child
+ t.cancel = cancel
} else if t.Mode == AAA || t.Mode == BOTH {
eg, child := errgroup.WithContext(tm.ctx)
child, cancel := context.WithCancel(child)
diff --git a/device/device_olt.go b/device/device_olt.go
index 618e668..daffcec 100644
--- a/device/device_olt.go
+++ b/device/device_olt.go
@@ -16,10 +16,14 @@
package device
-import "sync"
+import (
+ "strconv"
+ "sync"
+)
type DeviceState int
+// Device interface provides common methods for OLT and ONU devices
type Device interface {
Initialize()
UpdateIntState(intstate DeviceState)
@@ -32,6 +36,7 @@
Intfid uint32
}
+// Olt structure consists required fields for OLT
type Olt struct {
ID uint32
NumPonIntf uint32
@@ -65,6 +70,7 @@
OLT_ACTIVE // After PacketInDaemon Running
)
+// NewOlt creates and return new Olt object
func NewOlt(oltid uint32, npon uint32, nnni uint32) *Olt {
olt := Olt{}
olt.ID = oltid
@@ -73,6 +79,8 @@
olt.Name = "BBSIM OLT"
olt.InternalState = OLT_INACTIVE
olt.OperState = "up"
+ olt.Manufacture = "BBSIM"
+ olt.SerialNumber = "BBSIMOLT00" + strconv.FormatInt(int64(oltid), 10)
olt.Intfs = make([]intf, olt.NumPonIntf+olt.NumNniIntf)
olt.HeartbeatSignature = oltid
olt.mu = &sync.Mutex{}
@@ -89,6 +97,7 @@
return &olt
}
+// Initialize method initializes NNI and PON ports
func (olt *Olt) Initialize() {
olt.InternalState = OLT_INACTIVE
olt.OperState = "up"
@@ -104,16 +113,19 @@
}
}
+// GetIntState returns internal state of OLT
func (olt *Olt) GetIntState() DeviceState {
olt.mu.Lock()
defer olt.mu.Unlock()
return olt.InternalState
}
+// GetDevkey returns device key of OLT
func (olt *Olt) GetDevkey () Devkey {
return Devkey{ID: olt.ID}
}
+// UpdateIntState method updates OLT internal state
func (olt *Olt) UpdateIntState(intstate DeviceState) {
olt.mu.Lock()
defer olt.mu.Unlock()
diff --git a/device/device_onu.go b/device/device_onu.go
index c8e0c4d..1e1ee88 100644
--- a/device/device_onu.go
+++ b/device/device_onu.go
@@ -29,8 +29,10 @@
ONU_INACTIVE DeviceState = iota //TODO: Each stage name should be more accurate
ONU_ACTIVE
ONU_OMCIACTIVE
+ ONU_FREE
)
+// Onu structure stores information of ONUs
type Onu struct {
InternalState DeviceState
OltID uint32
@@ -42,20 +44,22 @@
mu *sync.Mutex
}
+// NewSN constructs and returns serial number based on the OLT ID, intf ID and ONU ID
func NewSN(oltid uint32, intfid uint32, onuid uint32) []byte {
sn := []byte{0, byte(oltid % 256), byte(intfid), byte(onuid)}
return sn
}
+// NewOnus initializes and returns slice of Onu objects
func NewOnus(oltid uint32, intfid uint32, nonus uint32, nnni uint32) []*Onu {
onus := []*Onu{}
for i := 0; i < int(nonus); i++ {
onu := Onu{}
- onu.InternalState = ONU_INACTIVE
+ onu.InternalState = ONU_FREE
onu.mu = &sync.Mutex{}
onu.IntfID = intfid
onu.OltID = oltid
- onu.OperState = "up"
+ onu.OperState = "down"
onu.SerialNumber = new(openolt.SerialNumber)
onu.SerialNumber.VendorId = []byte("BBSM")
onu.SerialNumber.VendorSpecific = NewSN(oltid, intfid, uint32(i))
@@ -65,11 +69,13 @@
return onus
}
+// Initialize method initializes ONU state to up and ONU_INACTIVE
func (onu *Onu) Initialize() {
onu.OperState = "up"
onu.InternalState = ONU_INACTIVE
}
+// ValidateONU method validate ONU based on the serial number in onuMap
func ValidateONU(targetonu openolt.Onu, regonus map[uint32][]*Onu) bool {
for _, onus := range regonus {
for _, onu := range onus {
@@ -81,13 +87,15 @@
return false
}
+// ValidateSN compares two serial numbers and returns result as true/false
func ValidateSN(sn1 openolt.SerialNumber, sn2 openolt.SerialNumber) bool {
return reflect.DeepEqual(sn1.VendorId, sn2.VendorId) && reflect.DeepEqual(sn1.VendorSpecific, sn2.VendorSpecific)
}
+// UpdateOnusOpStatus method updates ONU oper status
func UpdateOnusOpStatus(ponif uint32, onus []*Onu, opstatus string) {
for _, onu := range onus {
- onu.OperState = "up"
+ onu.OperState = opstatus
logger.WithFields(log.Fields{
"onu": onu.SerialNumber,
"pon_interface": ponif,
@@ -95,16 +103,19 @@
}
}
+// UpdateIntState method updates ONU internal state
func (onu *Onu) UpdateIntState(intstate DeviceState) {
onu.mu.Lock()
defer onu.mu.Unlock()
onu.InternalState = intstate
}
+// GetDevkey returns ONU device key
func (onu *Onu) GetDevkey () Devkey {
return Devkey{ID: onu.OnuID, Intfid:onu.IntfID}
}
+// GetIntState returns ONU internal state
func (onu *Onu) GetIntState() DeviceState {
onu.mu.Lock()
defer onu.mu.Unlock()