VOL-1305 Separation of DHCP/AAA emulation part from OLT/ONUs emulation part
Change-Id: Idd2c6fb9bee7b7dca967b9bd49f6189343d1357f
diff --git a/core/core_server.go b/core/core_server.go
index bb02c17..0e0ea1b 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -17,46 +17,43 @@
package core
import (
+ "context"
"errors"
- "strconv"
- "sync"
- "time"
+ "gerrit.opencord.org/voltha-bbsim/common"
"gerrit.opencord.org/voltha-bbsim/device"
"gerrit.opencord.org/voltha-bbsim/protos"
- "gerrit.opencord.org/voltha-bbsim/common"
- "gerrit.opencord.org/voltha-bbsim/setup"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"google.golang.org/grpc"
+ "strconv"
+ "sync"
)
-type Mode int
-
-const MAX_ONUS_PER_PON = 64 // This value should be the same with the value in AdapterPlatrorm class
-
const (
- DEFAULT Mode = iota
- AAA
- BOTH
+ UNI_VETH_UP_PFX = "sim_uu"
+ UNI_VETH_DW_PFX = "sim_ud"
+ NNI_VETH_UP_PFX = "sim_nu"
+ NNI_VETH_DW_PFX = "sim_nd"
+ MAX_ONUS_PER_PON = 64 // This value should be the same with the value in AdapterPlatrorm class
)
type Server struct {
+ wg *sync.WaitGroup
Olt *device.Olt
Onumap map[uint32][]*device.Onu
Ioinfos []*Ioinfo
- Endchan chan int
- Mode Mode
- AAAWait int
- DhcpWait int
- DhcpServerIP string
- Delay int
gRPCserver *grpc.Server
- VethEnv []string
- TestFlag bool
+ gRPCAddress string
+ gRPCPort uint32
+ Vethnames []string
+ IndInterval int
Processes []string
EnableServer *openolt.Openolt_EnableIndicationServer
CtagMap map[string]uint32
+ cancel context.CancelFunc
+ state coreState
+ stateChan chan coreState
}
type Packet struct {
@@ -64,51 +61,142 @@
Pkt gopacket.Packet
}
-func (s *Server) Initialize() {
- s.VethEnv = []string{}
- s.Endchan = make(chan int)
- s.TestFlag = false
- s.Processes = []string{}
- s.Ioinfos = []*Ioinfo{}
-}
+type coreState int
-func Create(oltid uint32, npon uint32, nonus uint32, aaawait int, dhcpwait int, ip string, delay int, g *grpc.Server, mode Mode, e chan int) *Server {
- s := new(Server)
- s.Olt = device.CreateOlt(oltid, npon, 1)
+const (
+ INACTIVE = iota // OLT/ONUs are not instantiated
+ PRE_ACTIVE // Before PacketInDaemon Running
+ ACTIVE // After PacketInDaemon Running
+)
+
+/* coreState
+INACTIVE -> PRE_ACTIVE -> ACTIVE
+ (ActivateOLT) (Enable)
+ <- <-
+ */
+
+
+func NewCore(opt *option) (*Server) {
+ // TODO: make it decent
+ oltid := opt.oltid
+ npon := opt.npon
+ nonus := opt.nonus
+ s := Server{
+ Olt: device.NewOlt(oltid, npon, 1),
+ Onumap: make(map[uint32][]*device.Onu),
+ Ioinfos: []*Ioinfo{},
+ gRPCAddress: opt.address,
+ gRPCPort: opt.port,
+ Vethnames: []string{},
+ IndInterval: opt.intvl,
+ Processes: []string{},
+ EnableServer: new(openolt.Openolt_EnableIndicationServer),
+ state: INACTIVE,
+ stateChan: make(chan coreState, 8),
+ }
+
nnni := s.Olt.NumNniIntf
logger.Info("OLT ID: %d was retrieved.\n", s.Olt.ID)
- s.Onumap = make(map[uint32][]*device.Onu)
- s.AAAWait = aaawait
- s.DhcpWait = dhcpwait
- s.DhcpServerIP = ip
- s.gRPCserver = g
- s.Delay = delay
- s.Mode = mode
- s.Endchan = e
- s.VethEnv = []string{}
- s.TestFlag = false
for intfid := nnni; intfid < npon+nnni; intfid++ {
- s.Onumap[intfid] = device.CreateOnus(oltid, intfid, nonus, nnni)
+ s.Onumap[intfid] = device.NewOnus(oltid, intfid, nonus, nnni)
}
- s.EnableServer = new(openolt.Openolt_EnableIndicationServer)
- //TODO: To be fixed
+ //TODO: To be fixed because it is hardcoded
s.CtagMap = make(map[string]uint32)
for i := 0; i < MAX_ONUS_PER_PON; i++ {
oltid := s.Olt.ID
intfid := uint32(1)
- sn := convB2S(device.CreateSN(oltid, intfid, uint32(i)))
+ sn := convB2S(device.NewSN(oltid, intfid, uint32(i)))
s.CtagMap[sn] = uint32(900 + i) // This is hard coded for BBWF
}
- return s
+ return &s
+}
+
+//Blocking
+func (s *Server) Start () error {
+ s.wg = &sync.WaitGroup{}
+ logger.Debug("Start() Start")
+ defer func(){
+ close(s.stateChan)
+ logger.Debug("Start() Done")
+ }()
+ addressport := s.gRPCAddress + ":" + strconv.Itoa(int(s.gRPCPort))
+ listener, gserver, err := NewGrpcServer(addressport)
+ if err != nil {
+ logger.Error("Failed to create gRPC server", err)
+ return err
+ }
+ s.gRPCserver = gserver
+ openolt.RegisterOpenoltServer(gserver, s)
+ if err := gserver.Serve(listener); err != nil {
+ logger.Error("Failed to run gRPC server", err)
+ return err
+ }
+ s.wg.Wait()
+ return nil
+}
+
+//Non-Blocking
+func (s *Server) Stop () {
+ logger.Debug("Stop() Start")
+ defer logger.Debug("Stop() Done")
+ if s.gRPCserver != nil {
+ s.gRPCserver.Stop()
+ logger.Debug("gRPCserver.Stop()")
+ }
+ s.StopPktInDaemon()
+ return
+}
+
+// Blocking
+func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
+ defer func(){
+ olt := s.Olt
+ olt.InitializeStatus()
+ for intfid, _ := range s.Onumap {
+ for _, onu := range s.Onumap[intfid] {
+ onu.InitializeStatus()
+ }
+ }
+ s.updateState(INACTIVE)
+ logger.Debug("Enable() Done")
+ }()
+ logger.Debug("Enable() Start")
+ s.EnableServer = sv
+ if err := s.activateOLT(*sv); err != nil {
+ return err
+ }
+ s.updateState(PRE_ACTIVE)
+
+ coreCtx := context.Background()
+ coreCtx, corecancel := context.WithCancel(coreCtx)
+ s.cancel = corecancel
+ if err := s.StartPktInDaemon(coreCtx, *sv) ; err != nil {
+ return err
+ }
+ return nil
+}
+
+//Non-Blocking
+func (s *Server) Disable() {
+ defer func(){
+ logger.Debug("Disable() Done")
+ }()
+ logger.Debug("Disable() Start")
+ s.StopPktInDaemon()
+}
+
+func (s *Server) updateState(state coreState){
+ s.state = state
+ s.stateChan <- state
+ logger.Debug("State updated to:%d", state)
}
func (s *Server) activateOLT(stream openolt.Openolt_EnableIndicationServer) error {
+ defer logger.Debug("activateOLT() Done")
+ logger.Debug("activateOLT() Start")
// Activate OLT
olt := s.Olt
- oltid := olt.ID
- wg := &sync.WaitGroup{}
-
if err := sendOltIndUp(stream, olt); err != nil {
return err
}
@@ -144,87 +232,59 @@
// OLT Sends OnuInd after waiting all of those ONUs up
for {
- if s.IsAllOnuActive(s.Onumap) {
+ if IsAllOnuActive(s.Onumap) {
+ logger.Debug("All the Onus are Activated.")
break
}
}
for intfid, _ := range s.Onumap {
- sendOnuInd(stream, s.Onumap[intfid], s.Delay)
+ sendOnuInd(stream, s.Onumap[intfid], s.IndInterval)
logger.Info("OLT id:%d sent ONUInd.\n", olt.ID)
}
-
- if s.Mode == DEFAULT {
- //EnableIndication's stream should be kept even after activateOLT() is finished.
- //Otherwise, OpenOLT adapter sends EnableIndication again.
- <-s.Endchan
- logger.Debug("core server thread receives close ")
- } else if s.Mode == AAA || s.Mode == BOTH {
- s.TestFlag = true
- var err error
- s.Ioinfos, s.VethEnv, err = createIoinfos(oltid, s.VethEnv, s.Onumap)
- logger.Debug("s.VethEnv:%v", s.VethEnv)
- if err != nil {
- return err
- }
-
- errchan := make(chan error)
- go func() {
- <-errchan
- close(s.Endchan)
- }()
-
- wg.Add(1)
- go func() {
- defer func() {
- logger.Debug("runPacketInDaemon Done")
- wg.Done()
- }()
-
- err := s.runPacketInDaemon(stream)
- if err != nil {
- errchan <- err
- return
- }
- }()
-
- wg.Add(1)
- go func() {
- defer func() {
- logger.Debug("exeAAATest Done")
- wg.Done()
- }()
-
- err = s.exeAAATest()
- if err != nil {
- errchan <- err
- return
- }
-
- if s.Mode == BOTH {
- go func() {
- defer func() {
- logger.Debug("exeDHCPTest Done")
- }()
-
- err := s.exeDHCPTest()
- if err != nil {
- errchan <- err
- return
- }
- }()
- }
- }()
- wg.Wait()
- cleanUpVeths(s.VethEnv) // Grace teardown
- pnames := s.Processes
- killProcesses(pnames)
- logger.Debug("Grace shutdown down")
- }
return nil
}
-func createIoinfos(oltid uint32, vethenv []string, onumap map[uint32][]*device.Onu) ([]*Ioinfo, []string, error) {
+
+// Blocking
+func (s *Server) StartPktInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+ logger.Debug("StartPktInDaemon() Start")
+ defer func(){
+ RemoveVeths(s.Vethnames)
+ s.Vethnames = []string{}
+ s.Ioinfos = []*Ioinfo{}
+ s.wg.Done()
+ s.updateState(PRE_ACTIVE)
+ logger.Debug("StartPktInDaemon() Done")
+ }()
+ s.wg.Add(1)
+ ioinfos, veths, err := createIoinfos(s.Olt.ID, s.Vethnames, s.Onumap)
+ if err != nil {
+ return err
+ }
+ s.Ioinfos = ioinfos
+ s.Vethnames = veths
+ logger.Debug("Created vethnames:%v", s.Vethnames)
+
+ parent := ctx
+ child, cancel := context.WithCancel(parent)
+ s.cancel = cancel
+
+ if err = s.runPacketInDaemon(child, stream); err != nil {
+ return err
+ }
+ return nil
+}
+
+//Non-Blocking
+func (s *Server) StopPktInDaemon() {
+ if s.cancel != nil {
+ cancel := s.cancel
+ cancel()
+ }
+}
+
+func createIoinfos(oltid uint32, Vethnames []string, onumap map[uint32][]*device.Onu) ([]*Ioinfo, []string, error) {
ioinfos := []*Ioinfo{}
var err error
for intfid, _ := range onumap {
@@ -232,36 +292,37 @@
var handler *pcap.Handle
onuid := onumap[intfid][i].OnuID
uniup, unidw := makeUniName(oltid, intfid, onuid)
- if handler, vethenv, err = setupVethHandler(uniup, unidw, vethenv); err != nil {
- return ioinfos, vethenv, err
+ if handler, Vethnames, err = setupVethHandler(uniup, unidw, Vethnames); err != nil {
+ return ioinfos, Vethnames, err
}
- iinfo := Ioinfo{name: uniup, iotype: "uni", ioloc: "inside", intfid: intfid, onuid: onuid, handler: handler}
+ iinfo := Ioinfo{Name: uniup, iotype: "uni", ioloc: "inside", intfid: intfid, onuid: onuid, handler: handler}
ioinfos = append(ioinfos, &iinfo)
- oinfo := Ioinfo{name: unidw, iotype: "uni", ioloc: "outside", intfid: intfid, onuid: onuid, handler: nil}
+ oinfo := Ioinfo{Name: unidw, iotype: "uni", ioloc: "outside", intfid: intfid, onuid: onuid, handler: nil}
ioinfos = append(ioinfos, &oinfo)
}
}
var handler *pcap.Handle
nniup, nnidw := makeNniName(oltid)
- if handler, vethenv, err = setupVethHandler(nniup, nnidw, vethenv); err != nil {
- return ioinfos, vethenv, err
+ if handler, Vethnames, err = setupVethHandler(nniup, nnidw, Vethnames); err != nil {
+ return ioinfos, Vethnames, err
}
- iinfo := Ioinfo{name: nnidw, iotype: "nni", ioloc: "inside", intfid: 1, handler: handler}
+ iinfo := Ioinfo{Name: nnidw, iotype: "nni", ioloc: "inside", intfid: 1, handler: handler}
ioinfos = append(ioinfos, &iinfo)
- oinfo := Ioinfo{name: nniup, iotype: "nni", ioloc: "outside", intfid: 1, handler: nil}
+ oinfo := Ioinfo{Name: nniup, iotype: "nni", ioloc: "outside", intfid: 1, handler: nil}
ioinfos = append(ioinfos, &oinfo)
- return ioinfos, vethenv, nil
+ return ioinfos, Vethnames, nil
}
-func (s *Server) runPacketInDaemon(stream openolt.Openolt_EnableIndicationServer) error {
+//Blocking
+func (s *Server) runPacketInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
logger.Debug("runPacketInDaemon Start")
+ defer logger.Debug("runPacketInDaemon Done")
unichannel := make(chan Packet, 2048)
- flag := false
for intfid, _ := range s.Onumap {
- for _, onu := range s.Onumap[intfid] { //TODO: should be updated for multiple-Interface
+ for _, onu := range s.Onumap[intfid] {
onuid := onu.OnuID
ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
if err != nil {
@@ -269,21 +330,20 @@
return err
}
uhandler := ioinfo.handler
- defer uhandler.Close()
go RecvWorker(ioinfo, uhandler, unichannel)
}
}
- ioinfo, err := s.identifyNniIoinfo("inside")
+ ioinfo, err := s.IdentifyNniIoinfo("inside")
if err != nil {
return err
}
nhandler := ioinfo.handler
- defer nhandler.Close()
nnichannel := make(chan Packet, 32)
go RecvWorker(ioinfo, nhandler, nnichannel)
data := &openolt.Indication_PktInd{}
+ s.updateState(ACTIVE)
for {
select {
case unipkt := <-unichannel:
@@ -309,7 +369,7 @@
logger.Debug("Received upstream packet is DHCP.")
//C-TAG
- onu, _ := s.getOnuByID(onuid)
+ onu, _ := getOnuByID(s.Onumap, onuid)
sn := convB2S(onu.SerialNumber.VendorSpecific)
if ctag, ok := s.CtagMap[sn]; ok == true {
tagpkt, err := PushVLAN(pkt, uint16(ctag))
@@ -348,90 +408,12 @@
return err
}
- case <-s.Endchan:
- if flag == false {
- logger.Debug("PacketInDaemon thread receives close ")
- close(unichannel)
- logger.Debug("Closed unichannel ")
- close(nnichannel)
- logger.Debug("Closed nnichannel ")
- flag = true
- return nil
- }
- }
- }
- return nil
-}
-
-func (s *Server) exeAAATest() error {
- logger.Info("exeAAATest stands by....")
- infos, err := s.getUniIoinfos("outside")
- if err != nil {
- return err
- }
-
- univeths := []string{}
- for _, info := range infos {
- univeths = append(univeths, info.name)
- }
-
- for {
- select {
- case <-s.Endchan:
- logger.Debug("exeAAATest thread receives close ")
- return nil
- case <-time.After(time.Second * time.Duration(s.AAAWait)):
- err = setup.ActivateWPASups(univeths, s.Delay)
- if err != nil {
- return err
- }
- logger.Info("WPA Supplicants are successfully activated ")
- s.Processes = append(s.Processes, "wpa_supplicant")
- logger.Debug("Running Process:%v", s.Processes)
- return nil
- }
- }
- return nil
-}
-
-func (s *Server) exeDHCPTest() error {
- logger.Info("exeDHCPTest stands by....")
- info, err := s.identifyNniIoinfo("outside")
-
- if err != nil {
- return err
- }
-
- err = setup.ActivateDHCPServer(info.name, s.DhcpServerIP)
- if err != nil {
- return err
- }
- s.Processes = append(s.Processes, "dhcpd")
- logger.Debug("Running Process:%v", s.Processes)
-
- infos, err := s.getUniIoinfos("outside")
- if err != nil {
- return err
- }
-
- univeths := []string{}
- for _, info := range infos {
- univeths = append(univeths, info.name)
- }
-
- for {
- select {
- case <-s.Endchan:
- logger.Debug("exeDHCPTest thread receives close ")
- return nil
- case <-time.After(time.Second * time.Duration(s.DhcpWait)):
- err = setup.ActivateDHCPClients(univeths, s.Delay)
- if err != nil {
- return err
- }
- logger.Info("DHCP clients are successfully activated ")
- s.Processes = append(s.Processes, "dhclient")
- logger.Debug("Running Process:%v", s.Processes)
+ case <-ctx.Done():
+ logger.Debug("PacketInDaemon thread receives close ")
+ close(unichannel)
+ logger.Debug("Closed unichannel ")
+ close(nnichannel)
+ logger.Debug("Closed nnichannel ")
return nil
}
}
@@ -473,7 +455,7 @@
logger.Error("%s", err)
return err
}
- ioinfo, err := s.identifyNniIoinfo("inside")
+ ioinfo, err := s.IdentifyNniIoinfo("inside")
if err != nil {
return err
}
@@ -482,8 +464,8 @@
return nil
}
-func (s *Server) IsAllOnuActive(regonus map[uint32][]*device.Onu) bool {
- for _, onus := range regonus {
+func IsAllOnuActive(onumap map[uint32][]*device.Onu) bool {
+ for _, onus := range onumap {
for _, onu := range onus {
if onu.GetIntStatus() != device.ONU_ACTIVATED {
return false
@@ -499,8 +481,8 @@
//return uint32(1032 + 8 * (vid - 1)), nil
}
-func (s *Server) getOnuBySN(sn *openolt.SerialNumber) (*device.Onu, error) {
- for _, onus := range s.Onumap {
+func getOnuBySN(onumap map[uint32][]*device.Onu, sn *openolt.SerialNumber) (*device.Onu, error) {
+ for _, onus := range onumap {
for _, onu := range onus {
if device.ValidateSN(*sn, *onu.SerialNumber) {
return onu, nil
@@ -512,8 +494,8 @@
return nil, err
}
-func (s *Server) getOnuByID(onuid uint32) (*device.Onu, error) {
- for _, onus := range s.Onumap {
+func getOnuByID(onumap map[uint32][]*device.Onu, onuid uint32) (*device.Onu, error) {
+ for _, onus := range onumap {
for _, onu := range onus {
if onu.OnuID == onuid {
return onu, nil
@@ -525,65 +507,6 @@
return nil, err
}
-func makeUniName(oltid uint32, intfid uint32, onuid uint32) (upif string, dwif string) {
- upif = setup.UNI_VETH_UP_PFX + strconv.Itoa(int(oltid)) + "_" + strconv.Itoa(int(intfid)) + "_" + strconv.Itoa(int(onuid))
- dwif = setup.UNI_VETH_DW_PFX + strconv.Itoa(int(oltid)) + "_" + strconv.Itoa(int(intfid)) + "_" + strconv.Itoa(int(onuid))
- return
-}
-
-func makeNniName(oltid uint32) (upif string, dwif string) {
- upif = setup.NNI_VETH_UP_PFX + strconv.Itoa(int(oltid))
- dwif = setup.NNI_VETH_DW_PFX + strconv.Itoa(int(oltid))
- return
-}
-
-func cleanUpVeths(vethenv []string) error {
- if len(vethenv) > 0 {
- logger.Debug("cleanUpVeths called ")
- setup.TearVethDown(vethenv)
- }
- return nil
-}
-
-func killProcesses(pnames []string) error {
- for _, pname := range pnames {
- setup.KillProcess(pname)
- }
- return nil
-}
-
-func setupVethHandler(inveth string, outveth string, vethenv []string) (*pcap.Handle, []string, error) {
- logger.Debug("SetupVethHandler(%s, %s) called ", inveth, outveth)
- err1 := setup.CreateVethPairs(inveth, outveth)
- vethenv = append(vethenv, inveth)
- if err1 != nil {
- setup.RemoveVeths(vethenv)
- return nil, vethenv, err1
- }
- handler, err2 := getVethHandler(inveth)
- if err2 != nil {
- setup.RemoveVeths(vethenv)
- return nil, vethenv, err2
- }
- return handler, vethenv, nil
-}
-
-func getVethHandler(vethname string) (*pcap.Handle, error) {
- var (
- device string = vethname
- snapshot_len int32 = 1518
- promiscuous bool = false
- err error
- timeout time.Duration = pcap.BlockForever
- )
- handle, err := pcap.OpenLive(device, snapshot_len, promiscuous, timeout)
- if err != nil {
- return nil, err
- }
- logger.Debug("Server handle is created for %s\n", vethname)
- return handle, nil
-}
-
func convB2S(b []byte) string {
s := ""
for _, i := range b {