VOL-1305 Separation of DHCP/AAA emulation part from OLT/ONUs emulation part
Change-Id: Idd2c6fb9bee7b7dca967b9bd49f6189343d1357f
diff --git a/core/core_server.go b/core/core_server.go
index bb02c17..0e0ea1b 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -17,46 +17,43 @@
package core
import (
+ "context"
"errors"
- "strconv"
- "sync"
- "time"
+ "gerrit.opencord.org/voltha-bbsim/common"
"gerrit.opencord.org/voltha-bbsim/device"
"gerrit.opencord.org/voltha-bbsim/protos"
- "gerrit.opencord.org/voltha-bbsim/common"
- "gerrit.opencord.org/voltha-bbsim/setup"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"google.golang.org/grpc"
+ "strconv"
+ "sync"
)
-type Mode int
-
-const MAX_ONUS_PER_PON = 64 // This value should be the same with the value in AdapterPlatrorm class
-
const (
- DEFAULT Mode = iota
- AAA
- BOTH
+ UNI_VETH_UP_PFX = "sim_uu"
+ UNI_VETH_DW_PFX = "sim_ud"
+ NNI_VETH_UP_PFX = "sim_nu"
+ NNI_VETH_DW_PFX = "sim_nd"
+ MAX_ONUS_PER_PON = 64 // This value should be the same with the value in AdapterPlatrorm class
)
type Server struct {
+ wg *sync.WaitGroup
Olt *device.Olt
Onumap map[uint32][]*device.Onu
Ioinfos []*Ioinfo
- Endchan chan int
- Mode Mode
- AAAWait int
- DhcpWait int
- DhcpServerIP string
- Delay int
gRPCserver *grpc.Server
- VethEnv []string
- TestFlag bool
+ gRPCAddress string
+ gRPCPort uint32
+ Vethnames []string
+ IndInterval int
Processes []string
EnableServer *openolt.Openolt_EnableIndicationServer
CtagMap map[string]uint32
+ cancel context.CancelFunc
+ state coreState
+ stateChan chan coreState
}
type Packet struct {
@@ -64,51 +61,142 @@
Pkt gopacket.Packet
}
-func (s *Server) Initialize() {
- s.VethEnv = []string{}
- s.Endchan = make(chan int)
- s.TestFlag = false
- s.Processes = []string{}
- s.Ioinfos = []*Ioinfo{}
-}
+type coreState int
-func Create(oltid uint32, npon uint32, nonus uint32, aaawait int, dhcpwait int, ip string, delay int, g *grpc.Server, mode Mode, e chan int) *Server {
- s := new(Server)
- s.Olt = device.CreateOlt(oltid, npon, 1)
+// Lifecycle states of the core. They are typed as coreState so that they
+// cannot be mixed up with plain integers by accident.
+const (
+	INACTIVE   coreState = iota // OLT/ONUs are not instantiated
+	PRE_ACTIVE                  // Before PacketInDaemon Running
+	ACTIVE                      // After PacketInDaemon Running
+)
+
+/* coreState
+INACTIVE -> PRE_ACTIVE -> ACTIVE
+ (ActivateOLT) (Enable)
+ <- <-
+ */
+
+
+func NewCore(opt *option) (*Server) {
+ // TODO: make it decent
+ oltid := opt.oltid
+ npon := opt.npon
+ nonus := opt.nonus
+ s := Server{
+ Olt: device.NewOlt(oltid, npon, 1),
+ Onumap: make(map[uint32][]*device.Onu),
+ Ioinfos: []*Ioinfo{},
+ gRPCAddress: opt.address,
+ gRPCPort: opt.port,
+ Vethnames: []string{},
+ IndInterval: opt.intvl,
+ Processes: []string{},
+ EnableServer: new(openolt.Openolt_EnableIndicationServer),
+ state: INACTIVE,
+ stateChan: make(chan coreState, 8),
+ }
+
nnni := s.Olt.NumNniIntf
logger.Info("OLT ID: %d was retrieved.\n", s.Olt.ID)
- s.Onumap = make(map[uint32][]*device.Onu)
- s.AAAWait = aaawait
- s.DhcpWait = dhcpwait
- s.DhcpServerIP = ip
- s.gRPCserver = g
- s.Delay = delay
- s.Mode = mode
- s.Endchan = e
- s.VethEnv = []string{}
- s.TestFlag = false
for intfid := nnni; intfid < npon+nnni; intfid++ {
- s.Onumap[intfid] = device.CreateOnus(oltid, intfid, nonus, nnni)
+ s.Onumap[intfid] = device.NewOnus(oltid, intfid, nonus, nnni)
}
- s.EnableServer = new(openolt.Openolt_EnableIndicationServer)
- //TODO: To be fixed
+ //TODO: To be fixed because it is hardcoded
s.CtagMap = make(map[string]uint32)
for i := 0; i < MAX_ONUS_PER_PON; i++ {
oltid := s.Olt.ID
intfid := uint32(1)
- sn := convB2S(device.CreateSN(oltid, intfid, uint32(i)))
+ sn := convB2S(device.NewSN(oltid, intfid, uint32(i)))
s.CtagMap[sn] = uint32(900 + i) // This is hard coded for BBWF
}
- return s
+ return &s
+}
+
+// Start builds the gRPC server on gRPCAddress:gRPCPort, registers this Server
+// as the Openolt service and serves requests. It blocks until the gRPC server
+// stops serving, then waits on s.wg before returning.
+// stateChan is closed when Start returns.
+func (s *Server) Start() error {
+	s.wg = &sync.WaitGroup{}
+	logger.Debug("Start() Start")
+	defer func() {
+		close(s.stateChan)
+		logger.Debug("Start() Done")
+	}()
+
+	endpoint := s.gRPCAddress + ":" + strconv.Itoa(int(s.gRPCPort))
+	lis, grpcSrv, err := NewGrpcServer(endpoint)
+	if err != nil {
+		logger.Error("Failed to create gRPC server", err)
+		return err
+	}
+	s.gRPCserver = grpcSrv
+	openolt.RegisterOpenoltServer(grpcSrv, s)
+
+	if serveErr := grpcSrv.Serve(lis); serveErr != nil {
+		logger.Error("Failed to run gRPC server", serveErr)
+		return serveErr
+	}
+	s.wg.Wait()
+	return nil
+}
+
+// Stop stops the gRPC server, if one has been created, and cancels the
+// packet-in daemon. It does not wait for the daemon to exit.
+func (s *Server) Stop() {
+	logger.Debug("Stop() Start")
+	defer logger.Debug("Stop() Done")
+	if srv := s.gRPCserver; srv != nil {
+		srv.Stop()
+		logger.Debug("gRPCserver.Stop()")
+	}
+	s.StopPktInDaemon()
+}
+
+// Enable activates the OLT over the given indication stream and then runs the
+// packet-in daemon. It blocks until activation fails or the daemon returns.
+//
+// Whatever the outcome, on return the OLT and every ONU have their status
+// re-initialized and the core state is set back to INACTIVE.
+func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
+	defer func() {
+		s.Olt.InitializeStatus()
+		for _, onus := range s.Onumap {
+			for _, onu := range onus {
+				onu.InitializeStatus()
+			}
+		}
+		s.updateState(INACTIVE)
+		logger.Debug("Enable() Done")
+	}()
+	logger.Debug("Enable() Start")
+	s.EnableServer = sv
+	if err := s.activateOLT(*sv); err != nil {
+		return err
+	}
+	s.updateState(PRE_ACTIVE)
+
+	coreCtx, corecancel := context.WithCancel(context.Background())
+	// StartPktInDaemon replaces s.cancel with the cancel function of a child
+	// context, so this one would otherwise never be called. Release it when
+	// Enable returns; calling a CancelFunc more than once is harmless.
+	defer corecancel()
+	s.cancel = corecancel
+	return s.StartPktInDaemon(coreCtx, *sv)
+}
+
+// Disable cancels the packet-in daemon and returns without waiting for it.
+func (s *Server) Disable() {
+	logger.Debug("Disable() Start")
+	defer logger.Debug("Disable() Done")
+	s.StopPktInDaemon()
+}
+
+// updateState records the new core state on the Server and publishes it on
+// stateChan so that a receiver of that channel can react to the transition.
+// The channel send blocks when the channel's buffer is full.
+func (s *Server) updateState(state coreState){
+ s.state = state
+ s.stateChan <- state
+ logger.Debug("State updated to:%d", state)
+}
func (s *Server) activateOLT(stream openolt.Openolt_EnableIndicationServer) error {
+ defer logger.Debug("activateOLT() Done")
+ logger.Debug("activateOLT() Start")
// Activate OLT
olt := s.Olt
- oltid := olt.ID
- wg := &sync.WaitGroup{}
-
if err := sendOltIndUp(stream, olt); err != nil {
return err
}
@@ -144,87 +232,59 @@
// OLT Sends OnuInd after waiting all of those ONUs up
for {
- if s.IsAllOnuActive(s.Onumap) {
+ if IsAllOnuActive(s.Onumap) {
+ logger.Debug("All the Onus are Activated.")
break
}
}
for intfid, _ := range s.Onumap {
- sendOnuInd(stream, s.Onumap[intfid], s.Delay)
+ sendOnuInd(stream, s.Onumap[intfid], s.IndInterval)
logger.Info("OLT id:%d sent ONUInd.\n", olt.ID)
}
-
- if s.Mode == DEFAULT {
- //EnableIndication's stream should be kept even after activateOLT() is finished.
- //Otherwise, OpenOLT adapter sends EnableIndication again.
- <-s.Endchan
- logger.Debug("core server thread receives close ")
- } else if s.Mode == AAA || s.Mode == BOTH {
- s.TestFlag = true
- var err error
- s.Ioinfos, s.VethEnv, err = createIoinfos(oltid, s.VethEnv, s.Onumap)
- logger.Debug("s.VethEnv:%v", s.VethEnv)
- if err != nil {
- return err
- }
-
- errchan := make(chan error)
- go func() {
- <-errchan
- close(s.Endchan)
- }()
-
- wg.Add(1)
- go func() {
- defer func() {
- logger.Debug("runPacketInDaemon Done")
- wg.Done()
- }()
-
- err := s.runPacketInDaemon(stream)
- if err != nil {
- errchan <- err
- return
- }
- }()
-
- wg.Add(1)
- go func() {
- defer func() {
- logger.Debug("exeAAATest Done")
- wg.Done()
- }()
-
- err = s.exeAAATest()
- if err != nil {
- errchan <- err
- return
- }
-
- if s.Mode == BOTH {
- go func() {
- defer func() {
- logger.Debug("exeDHCPTest Done")
- }()
-
- err := s.exeDHCPTest()
- if err != nil {
- errchan <- err
- return
- }
- }()
- }
- }()
- wg.Wait()
- cleanUpVeths(s.VethEnv) // Grace teardown
- pnames := s.Processes
- killProcesses(pnames)
- logger.Debug("Grace shutdown down")
- }
return nil
}
-func createIoinfos(oltid uint32, vethenv []string, onumap map[uint32][]*device.Onu) ([]*Ioinfo, []string, error) {
+
+// StartPktInDaemon builds the Ioinfo/veth set for this OLT and then runs the
+// packet-in loop under a cancellable child of ctx. It blocks until that loop
+// returns.
+//
+// On return the veths recorded in s.Vethnames are removed, the veth and
+// Ioinfo bookkeeping is reset, and the state is set to PRE_ACTIVE.
+func (s *Server) StartPktInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+	logger.Debug("StartPktInDaemon() Start")
+	defer func() {
+		RemoveVeths(s.Vethnames)
+		s.Vethnames = []string{}
+		s.Ioinfos = []*Ioinfo{}
+		s.wg.Done()
+		s.updateState(PRE_ACTIVE)
+		logger.Debug("StartPktInDaemon() Done")
+	}()
+	s.wg.Add(1)
+
+	infos, names, err := createIoinfos(s.Olt.ID, s.Vethnames, s.Onumap)
+	if err != nil {
+		return err
+	}
+	s.Ioinfos, s.Vethnames = infos, names
+	logger.Debug("Created vethnames:%v", s.Vethnames)
+
+	// StopPktInDaemon calls s.cancel to end the loop below.
+	daemonCtx, stop := context.WithCancel(ctx)
+	s.cancel = stop
+	return s.runPacketInDaemon(daemonCtx, stream)
+}
+
+// StopPktInDaemon invokes the stored cancel function, if any. It returns
+// immediately without waiting for the daemon.
+func (s *Server) StopPktInDaemon() {
+	if stop := s.cancel; stop != nil {
+		stop()
+	}
+}
+
+func createIoinfos(oltid uint32, Vethnames []string, onumap map[uint32][]*device.Onu) ([]*Ioinfo, []string, error) {
ioinfos := []*Ioinfo{}
var err error
for intfid, _ := range onumap {
@@ -232,36 +292,37 @@
var handler *pcap.Handle
onuid := onumap[intfid][i].OnuID
uniup, unidw := makeUniName(oltid, intfid, onuid)
- if handler, vethenv, err = setupVethHandler(uniup, unidw, vethenv); err != nil {
- return ioinfos, vethenv, err
+ if handler, Vethnames, err = setupVethHandler(uniup, unidw, Vethnames); err != nil {
+ return ioinfos, Vethnames, err
}
- iinfo := Ioinfo{name: uniup, iotype: "uni", ioloc: "inside", intfid: intfid, onuid: onuid, handler: handler}
+ iinfo := Ioinfo{Name: uniup, iotype: "uni", ioloc: "inside", intfid: intfid, onuid: onuid, handler: handler}
ioinfos = append(ioinfos, &iinfo)
- oinfo := Ioinfo{name: unidw, iotype: "uni", ioloc: "outside", intfid: intfid, onuid: onuid, handler: nil}
+ oinfo := Ioinfo{Name: unidw, iotype: "uni", ioloc: "outside", intfid: intfid, onuid: onuid, handler: nil}
ioinfos = append(ioinfos, &oinfo)
}
}
var handler *pcap.Handle
nniup, nnidw := makeNniName(oltid)
- if handler, vethenv, err = setupVethHandler(nniup, nnidw, vethenv); err != nil {
- return ioinfos, vethenv, err
+ if handler, Vethnames, err = setupVethHandler(nniup, nnidw, Vethnames); err != nil {
+ return ioinfos, Vethnames, err
}
- iinfo := Ioinfo{name: nnidw, iotype: "nni", ioloc: "inside", intfid: 1, handler: handler}
+ iinfo := Ioinfo{Name: nnidw, iotype: "nni", ioloc: "inside", intfid: 1, handler: handler}
ioinfos = append(ioinfos, &iinfo)
- oinfo := Ioinfo{name: nniup, iotype: "nni", ioloc: "outside", intfid: 1, handler: nil}
+ oinfo := Ioinfo{Name: nniup, iotype: "nni", ioloc: "outside", intfid: 1, handler: nil}
ioinfos = append(ioinfos, &oinfo)
- return ioinfos, vethenv, nil
+ return ioinfos, Vethnames, nil
}
-func (s *Server) runPacketInDaemon(stream openolt.Openolt_EnableIndicationServer) error {
+//Blocking
+func (s *Server) runPacketInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
logger.Debug("runPacketInDaemon Start")
+ defer logger.Debug("runPacketInDaemon Done")
unichannel := make(chan Packet, 2048)
- flag := false
for intfid, _ := range s.Onumap {
- for _, onu := range s.Onumap[intfid] { //TODO: should be updated for multiple-Interface
+ for _, onu := range s.Onumap[intfid] {
onuid := onu.OnuID
ioinfo, err := s.identifyUniIoinfo("inside", intfid, onuid)
if err != nil {
@@ -269,21 +330,20 @@
return err
}
uhandler := ioinfo.handler
- defer uhandler.Close()
go RecvWorker(ioinfo, uhandler, unichannel)
}
}
- ioinfo, err := s.identifyNniIoinfo("inside")
+ ioinfo, err := s.IdentifyNniIoinfo("inside")
if err != nil {
return err
}
nhandler := ioinfo.handler
- defer nhandler.Close()
nnichannel := make(chan Packet, 32)
go RecvWorker(ioinfo, nhandler, nnichannel)
data := &openolt.Indication_PktInd{}
+ s.updateState(ACTIVE)
for {
select {
case unipkt := <-unichannel:
@@ -309,7 +369,7 @@
logger.Debug("Received upstream packet is DHCP.")
//C-TAG
- onu, _ := s.getOnuByID(onuid)
+ onu, _ := getOnuByID(s.Onumap, onuid)
sn := convB2S(onu.SerialNumber.VendorSpecific)
if ctag, ok := s.CtagMap[sn]; ok == true {
tagpkt, err := PushVLAN(pkt, uint16(ctag))
@@ -348,90 +408,12 @@
return err
}
- case <-s.Endchan:
- if flag == false {
- logger.Debug("PacketInDaemon thread receives close ")
- close(unichannel)
- logger.Debug("Closed unichannel ")
- close(nnichannel)
- logger.Debug("Closed nnichannel ")
- flag = true
- return nil
- }
- }
- }
- return nil
-}
-
-func (s *Server) exeAAATest() error {
- logger.Info("exeAAATest stands by....")
- infos, err := s.getUniIoinfos("outside")
- if err != nil {
- return err
- }
-
- univeths := []string{}
- for _, info := range infos {
- univeths = append(univeths, info.name)
- }
-
- for {
- select {
- case <-s.Endchan:
- logger.Debug("exeAAATest thread receives close ")
- return nil
- case <-time.After(time.Second * time.Duration(s.AAAWait)):
- err = setup.ActivateWPASups(univeths, s.Delay)
- if err != nil {
- return err
- }
- logger.Info("WPA Supplicants are successfully activated ")
- s.Processes = append(s.Processes, "wpa_supplicant")
- logger.Debug("Running Process:%v", s.Processes)
- return nil
- }
- }
- return nil
-}
-
-func (s *Server) exeDHCPTest() error {
- logger.Info("exeDHCPTest stands by....")
- info, err := s.identifyNniIoinfo("outside")
-
- if err != nil {
- return err
- }
-
- err = setup.ActivateDHCPServer(info.name, s.DhcpServerIP)
- if err != nil {
- return err
- }
- s.Processes = append(s.Processes, "dhcpd")
- logger.Debug("Running Process:%v", s.Processes)
-
- infos, err := s.getUniIoinfos("outside")
- if err != nil {
- return err
- }
-
- univeths := []string{}
- for _, info := range infos {
- univeths = append(univeths, info.name)
- }
-
- for {
- select {
- case <-s.Endchan:
- logger.Debug("exeDHCPTest thread receives close ")
- return nil
- case <-time.After(time.Second * time.Duration(s.DhcpWait)):
- err = setup.ActivateDHCPClients(univeths, s.Delay)
- if err != nil {
- return err
- }
- logger.Info("DHCP clients are successfully activated ")
- s.Processes = append(s.Processes, "dhclient")
- logger.Debug("Running Process:%v", s.Processes)
+ case <-ctx.Done():
+ logger.Debug("PacketInDaemon thread receives close ")
+ close(unichannel)
+ logger.Debug("Closed unichannel ")
+ close(nnichannel)
+ logger.Debug("Closed nnichannel ")
return nil
}
}
@@ -473,7 +455,7 @@
logger.Error("%s", err)
return err
}
- ioinfo, err := s.identifyNniIoinfo("inside")
+ ioinfo, err := s.IdentifyNniIoinfo("inside")
if err != nil {
return err
}
@@ -482,8 +464,8 @@
return nil
}
-func (s *Server) IsAllOnuActive(regonus map[uint32][]*device.Onu) bool {
- for _, onus := range regonus {
+func IsAllOnuActive(onumap map[uint32][]*device.Onu) bool {
+ for _, onus := range onumap {
for _, onu := range onus {
if onu.GetIntStatus() != device.ONU_ACTIVATED {
return false
@@ -499,8 +481,8 @@
//return uint32(1032 + 8 * (vid - 1)), nil
}
-func (s *Server) getOnuBySN(sn *openolt.SerialNumber) (*device.Onu, error) {
- for _, onus := range s.Onumap {
+func getOnuBySN(onumap map[uint32][]*device.Onu, sn *openolt.SerialNumber) (*device.Onu, error) {
+ for _, onus := range onumap {
for _, onu := range onus {
if device.ValidateSN(*sn, *onu.SerialNumber) {
return onu, nil
@@ -512,8 +494,8 @@
return nil, err
}
-func (s *Server) getOnuByID(onuid uint32) (*device.Onu, error) {
- for _, onus := range s.Onumap {
+func getOnuByID(onumap map[uint32][]*device.Onu, onuid uint32) (*device.Onu, error) {
+ for _, onus := range onumap {
for _, onu := range onus {
if onu.OnuID == onuid {
return onu, nil
@@ -525,65 +507,6 @@
return nil, err
}
-func makeUniName(oltid uint32, intfid uint32, onuid uint32) (upif string, dwif string) {
- upif = setup.UNI_VETH_UP_PFX + strconv.Itoa(int(oltid)) + "_" + strconv.Itoa(int(intfid)) + "_" + strconv.Itoa(int(onuid))
- dwif = setup.UNI_VETH_DW_PFX + strconv.Itoa(int(oltid)) + "_" + strconv.Itoa(int(intfid)) + "_" + strconv.Itoa(int(onuid))
- return
-}
-
-func makeNniName(oltid uint32) (upif string, dwif string) {
- upif = setup.NNI_VETH_UP_PFX + strconv.Itoa(int(oltid))
- dwif = setup.NNI_VETH_DW_PFX + strconv.Itoa(int(oltid))
- return
-}
-
-func cleanUpVeths(vethenv []string) error {
- if len(vethenv) > 0 {
- logger.Debug("cleanUpVeths called ")
- setup.TearVethDown(vethenv)
- }
- return nil
-}
-
-func killProcesses(pnames []string) error {
- for _, pname := range pnames {
- setup.KillProcess(pname)
- }
- return nil
-}
-
-func setupVethHandler(inveth string, outveth string, vethenv []string) (*pcap.Handle, []string, error) {
- logger.Debug("SetupVethHandler(%s, %s) called ", inveth, outveth)
- err1 := setup.CreateVethPairs(inveth, outveth)
- vethenv = append(vethenv, inveth)
- if err1 != nil {
- setup.RemoveVeths(vethenv)
- return nil, vethenv, err1
- }
- handler, err2 := getVethHandler(inveth)
- if err2 != nil {
- setup.RemoveVeths(vethenv)
- return nil, vethenv, err2
- }
- return handler, vethenv, nil
-}
-
-func getVethHandler(vethname string) (*pcap.Handle, error) {
- var (
- device string = vethname
- snapshot_len int32 = 1518
- promiscuous bool = false
- err error
- timeout time.Duration = pcap.BlockForever
- )
- handle, err := pcap.OpenLive(device, snapshot_len, promiscuous, timeout)
- if err != nil {
- return nil, err
- }
- logger.Debug("Server handle is created for %s\n", vethname)
- return handle, nil
-}
-
func convB2S(b []byte) string {
s := ""
for _, i := range b {
diff --git a/core/grpc_service.go b/core/grpc_service.go
index dce608b..41da7f0 100644
--- a/core/grpc_service.go
+++ b/core/grpc_service.go
@@ -17,16 +17,15 @@
package core
import (
+ "gerrit.opencord.org/voltha-bbsim/common"
"gerrit.opencord.org/voltha-bbsim/device"
"gerrit.opencord.org/voltha-bbsim/protos"
- "gerrit.opencord.org/voltha-bbsim/common"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"golang.org/x/net/context"
"google.golang.org/grpc"
"log"
"net"
- "os/exec"
)
// gRPC Service
@@ -65,7 +64,7 @@
logger.Info("OLT receives ActivateONU()\n")
result := device.ValidateONU(*onu, s.Onumap)
if result == true {
- matched, error := s.getOnuBySN(onu.SerialNumber)
+ matched, error := getOnuBySN(s.Onumap, onu.SerialNumber)
if error != nil {
log.Fatalf("%s\n", error)
}
@@ -147,42 +146,24 @@
func (s *Server) Reboot(c context.Context, empty *openolt.Empty) (*openolt.Empty, error) {
logger.Info("OLT %d receives Reboot ().\n", s.Olt.ID)
// Initialize OLT & Env
- if s.TestFlag == true{
- logger.Debug("Initialized by Reboot")
- cleanUpVeths(s.VethEnv)
- close(s.Endchan)
- processes := s.Processes
- logger.Debug("Runnig Processes:", processes)
- killProcesses(processes)
- exec.Command("rm", "/var/run/dhcpd.pid").Run() //This is for DHCP server activation
- exec.Command("touch", "/var/run/dhcpd.pid").Run() //This is for DHCP server activation
- s.Initialize()
- }
- olt := s.Olt
- olt.InitializeStatus()
- for intfid, _ := range s.Onumap{
- for _, onu := range s.Onumap[intfid] {
- onu.InitializeStatus()
- }
- }
+ logger.Debug("Initialized by Reboot")
+ s.Disable()
return new(openolt.Empty), nil
}
func (s *Server) EnableIndication(empty *openolt.Empty, stream openolt.Openolt_EnableIndicationServer) error {
- defer func() {
- s.gRPCserver.Stop()
- }()
- s.EnableServer = &stream
logger.Info("OLT receives EnableInd.\n")
- if err := s.activateOLT(stream); err != nil {
- logger.Error("Failed to activate OLT: %v\n", err)
+ defer func() {
+ logger.Debug("grpc EnableIndication Done")
+ }()
+ if err := s.Enable(&stream); err != nil {
+ logger.Error("Failed to Enable Core: %v\n", err)
return err
}
- logger.Debug("Core server down.")
return nil
}
-func CreateGrpcServer(oltid uint32, npon uint32, nonus uint32, addrport string) (l net.Listener, g *grpc.Server, e error) {
+func NewGrpcServer(addrport string) (l net.Listener, g *grpc.Server, e error) {
logger.Info("Listening %s ...", addrport)
g = grpc.NewServer()
l, e = net.Listen("tcp", addrport)
diff --git a/core/io_info.go b/core/io_info.go
index 21e1b9a..8f65280 100644
--- a/core/io_info.go
+++ b/core/io_info.go
@@ -17,13 +17,14 @@
package core
import (
- "github.com/google/gopacket/pcap"
- "gerrit.opencord.org/voltha-bbsim/common"
"errors"
+ "gerrit.opencord.org/voltha-bbsim/common"
+ "github.com/google/gopacket/pcap"
+ "os/exec"
)
type Ioinfo struct {
- name string
+ Name string
iotype string //nni or uni
ioloc string //inside or outsode
intfid uint32
@@ -42,7 +43,7 @@
return nil, err
}
-func (s *Server) identifyNniIoinfo(ioloc string) (*Ioinfo, error) {
+func (s *Server) IdentifyNniIoinfo(ioloc string) (*Ioinfo, error) {
for _, ioinfo := range s.Ioinfos {
if ioinfo.iotype == "nni" && ioinfo.ioloc == ioloc {
return ioinfo, nil
@@ -53,7 +54,7 @@
return nil, err
}
-func (s *Server) getUniIoinfos(ioloc string) ([]*Ioinfo, error) {
+func (s *Server) GetUniIoinfos(ioloc string) ([]*Ioinfo, error) {
ioinfos := []*Ioinfo{}
for _, ioinfo := range s.Ioinfos {
if ioinfo.iotype == "uni" && ioinfo.ioloc == ioloc {
@@ -67,3 +68,41 @@
}
return ioinfos, nil
}
+
+// CreateVethPairs creates a veth pair named name1/name2 with the `ip link`
+// command and brings both ends up. It returns the first command error.
+func CreateVethPairs(name1 string, name2 string) (err error) {
+	if err = exec.Command("ip", "link", "add", name1, "type", "veth", "peer", "name", name2).Run(); err != nil {
+		logger.Error("Fail to createVeth() for %s and %s veth creation error: %s\n", name1, name2, err.Error())
+		return err
+	}
+	logger.Info("%s & %s was created.", name1, name2)
+
+	// Bring up both ends in order; each end keeps its own failure message.
+	ends := []struct{ ifname, failMsg string }{
+		{name1, "Fail to createVeth() veth1 up"},
+		{name2, "Fail to createVeth() veth2 up"},
+	}
+	for _, end := range ends {
+		if err = exec.Command("ip", "link", "set", end.ifname, "up").Run(); err != nil {
+			logger.Error(end.failMsg, err)
+			return err
+		}
+	}
+	logger.Info("%s & %s was up.", name1, name2)
+	return nil
+}
+
+// RemoveVeth deletes the interface called name with `ip link del`.
+// It returns the command error, if any; the success message is logged only
+// when the deletion actually succeeded.
+func RemoveVeth(name string) error {
+	if err := exec.Command("ip", "link", "del", name).Run(); err != nil {
+		logger.Error("Fail to removeVeth()", err)
+		return err
+	}
+	logger.Info("%s was removed.", name)
+	return nil
+}
+
+// RemoveVeths calls RemoveVeth for every name in names. Individual failures
+// are not returned to the caller.
+func RemoveVeths(names []string) {
+	for i := range names {
+		RemoveVeth(names[i])
+	}
+	logger.Info("RemoveVeths() :%s\n", names)
+}
diff --git a/core/io_worker.go b/core/io_worker.go
index 1fa97bd..41aec4b 100644
--- a/core/io_worker.go
+++ b/core/io_worker.go
@@ -17,12 +17,14 @@
package core
import (
+ "errors"
+ "gerrit.opencord.org/voltha-bbsim/common"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
- "gerrit.opencord.org/voltha-bbsim/common"
- "errors"
"net"
+ "strconv"
+ "time"
)
func RecvWorker(io *Ioinfo, handler *pcap.Handle, r chan Packet) {
@@ -49,7 +51,7 @@
func SendNni(handle *pcap.Handle, packet gopacket.Packet) {
err := handle.WritePacketData(packet.Data())
- if err != nil{
+ if err != nil {
logger.Error("Error in send packet to NNI e:%s\n", err)
}
logger.Debug("send packet to NNI-IF: %v \n", *handle)
@@ -139,3 +141,47 @@
}
return hwAddr
}
+
+// makeUniName returns the two veth names of a UNI: the UNI prefixes followed
+// by "<oltid>_<intfid>_<onuid>".
+func makeUniName(oltid uint32, intfid uint32, onuid uint32) (upif string, dwif string) {
+	suffix := strconv.Itoa(int(oltid)) + "_" + strconv.Itoa(int(intfid)) + "_" + strconv.Itoa(int(onuid))
+	return UNI_VETH_UP_PFX + suffix, UNI_VETH_DW_PFX + suffix
+}
+
+// makeNniName returns the two veth names of the NNI: the NNI prefixes
+// followed by the OLT id.
+func makeNniName(oltid uint32) (upif string, dwif string) {
+	suffix := strconv.Itoa(int(oltid))
+	return NNI_VETH_UP_PFX + suffix, NNI_VETH_DW_PFX + suffix
+}
+
+// setupVethHandler creates the veth pair inveth/outveth, appends inveth to
+// vethnames and opens a pcap handle on inveth.
+// If either step fails, every veth listed in vethnames is removed and the
+// error is returned together with the updated name list.
+func setupVethHandler(inveth string, outveth string, vethnames []string) (*pcap.Handle, []string, error) {
+	logger.Debug("SetupVethHandler(%s, %s) called ", inveth, outveth)
+	createErr := CreateVethPairs(inveth, outveth)
+	vethnames = append(vethnames, inveth)
+
+	// abort tears down everything recorded so far and reports the cause.
+	abort := func(cause error) (*pcap.Handle, []string, error) {
+		RemoveVeths(vethnames)
+		return nil, vethnames, cause
+	}
+	if createErr != nil {
+		return abort(createErr)
+	}
+	handle, openErr := getVethHandler(inveth)
+	if openErr != nil {
+		return abort(openErr)
+	}
+	return handle, vethnames, nil
+}
+
+// getVethHandler opens a live, non-promiscuous pcap handle on vethname with a
+// snapshot length of 1518 and a timeout of pcap.BlockForever.
+func getVethHandler(vethname string) (*pcap.Handle, error) {
+	const snapshotLen int32 = 1518
+	var timeout time.Duration = pcap.BlockForever
+
+	handle, err := pcap.OpenLive(vethname, snapshotLen, false, timeout)
+	if err != nil {
+		return nil, err
+	}
+	logger.Debug("Server handle is created for %s\n", vethname)
+	return handle, nil
+}
diff --git a/core/mediator.go b/core/mediator.go
new file mode 100644
index 0000000..496621a
--- /dev/null
+++ b/core/mediator.go
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+ "sync"
+ "gerrit.opencord.org/voltha-bbsim/common"
+ "os"
+ "os/signal"
+ "fmt"
+ "flag"
+ "strings"
+ "strconv"
+)
+
+// option holds the settings parsed from the command line by GetOptions.
+type option struct{
+ // address and port are the two halves of the -H "address:port" flag.
+ address string
+ port uint32
+ // oltid is the OLT-ID (-id).
+ oltid uint32
+ // npon is the number of PON-IF ports (-i).
+ npon uint32
+ // nonus is the number of ONUs per PON-IF port (-n).
+ nonus uint32
+ // aaawait is the wait time in seconds before activating WPA supplicants (-aw).
+ aaawait int
+ // dhcpwait is the wait time in seconds before activating DHCP clients (-dw).
+ dhcpwait int
+ // dhcpservip is the DHCP server IP address (-s).
+ dhcpservip string
+ // intvl is the indication interval (-v); NewCore copies it to Server.IndInterval.
+ intvl int
+ // intvl_test is the interval given with -V; NewTester copies it to Tester.Intvl.
+ intvl_test int
+ // Mode is the emulation mode selected with -m.
+ Mode Mode
+}
+
+// GetOptions defines the command-line flags, parses them and returns the
+// result as an option.
+//
+// The -H flag is expected in "address:port" form. A value without a port, or
+// with a non-numeric port, is reported through the logger and leaves the port
+// at 0 instead of panicking or being silently ignored.
+func GetOptions() *option {
+	o := new(option)
+	addressport := flag.String("H", ":50060", "IP address:port")
+	oltid := flag.Int("id", 0, "OLT-ID")
+	npon := flag.Int("i", 1, "Number of PON-IF ports")
+	nonus := flag.Int("n", 1, "Number of ONUs per PON-IF port")
+	modeopt := flag.String("m", "default", "Emulation mode (default, aaa, both (aaa & dhcp))")
+	aaawait := flag.Int("aw", 30, "Wait time (sec) for activation WPA supplicants")
+	dhcpwait := flag.Int("dw", 50, "Wait time (sec) for activation DHCP clients")
+	dhcpservip := flag.String("s", "182.21.0.1", "DHCP Server IP Address")
+	intvl := flag.Int("v", 1, "Interval each Indication")
+	intvl_test := flag.Int("V", 1, "Interval each Indication")
+	o.Mode = DEFAULT
+	flag.Parse()
+	switch *modeopt {
+	case "aaa":
+		o.Mode = AAA
+	case "both":
+		o.Mode = BOTH
+	}
+	o.oltid = uint32(*oltid)
+	o.npon = uint32(*npon)
+	o.nonus = uint32(*nonus)
+	o.aaawait = *aaawait
+	o.dhcpwait = *dhcpwait
+	o.dhcpservip = *dhcpservip
+	o.intvl = *intvl
+	o.intvl_test = *intvl_test
+
+	// strings.Split always yields at least one element, so index 0 is safe;
+	// index 1 exists only when the value contains a colon.
+	hostport := strings.Split(*addressport, ":")
+	o.address = hostport[0]
+	if len(hostport) < 2 {
+		logger.Error("-H %q has no port; expected address:port", *addressport)
+		return o
+	}
+	port, err := strconv.Atoi(hostport[1])
+	if err != nil {
+		logger.Error("-H %q has an invalid port: %s", *addressport, err)
+	}
+	o.port = uint32(port)
+	return o
+}
+
+// stateMachine tracks the last core state it was given and the handlers
+// registered for particular state transitions.
+type stateMachine struct{
+ // handlers is searched by transit for a matching (src, dst) pair.
+ handlers []*handler
+ // state is the state most recently passed to transit.
+ state coreState
+}
+
+// handler binds a method to one transition from state src to state dst.
+type handler struct{
+ dst coreState
+ src coreState
+ // method is what transit returns when the transition src -> dst occurs.
+ method func(s *Server) error
+}
+
+// transit moves the state machine to next and returns the method of the first
+// handler registered for the transition (previous state -> next), or nil when
+// no handler matches. The state is updated in either case.
+func (sm *stateMachine) transit(next coreState) func(s *Server) error {
+	previous := sm.state
+	sm.state = next
+	for _, h := range sm.handlers {
+		if h.src != previous || h.dst != next {
+			continue
+		}
+		logger.Debug("Hit (src:%d, dst:%d)", h.src, h.dst)
+		return h.method
+	}
+	return nil
+}
+
+// mediator ties the core Server and the Tester together: it forwards the
+// server's state changes to a stateMachine whose handlers start and stop the
+// tester.
+type mediator struct {
+ // opt is the parsed command-line configuration given to NewMediator.
+ opt *option
+ // sm, server and tester are populated by Start.
+ sm *stateMachine
+ server *Server
+ tester *Tester
+}
+
+// NewMediator returns a mediator holding the given options. The server, the
+// tester and the state machine are created later, by Start.
+func NewMediator(o *option) *mediator {
+	logger.Debug("ip:%s, baseport:%d, npon:%d, nonus:%d, mode:%d\n", o.address, o.port, o.npon, o.nonus, o.Mode)
+	return &mediator{opt: o}
+}
+
+// Start creates the core server and the tester, runs the server and the
+// mediation loop in background goroutines, and installs a SIGINT handler that
+// stops both the server and the tester. It blocks until the server goroutine
+// has returned and, if a SIGINT was received, until its handler has finished.
+func (m *mediator) Start() {
+	var wg sync.WaitGroup
+	opt := m.opt
+	server := NewCore(opt)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		if err := server.Start(); err != nil { //Blocking
+			logger.Error("%s", err)
+		}
+	}()
+
+	tester := NewTester(opt)
+	m.server = server
+	m.tester = tester
+	m.sm = &stateMachine{
+		state: INACTIVE,
+		handlers: []*handler{
+			&handler{src: PRE_ACTIVE, dst: ACTIVE, method: m.tester.Start},
+			&handler{src: ACTIVE, dst: PRE_ACTIVE, method: m.tester.Stop},
+		},
+	}
+	go func() {
+		m.Mediate()
+	}()
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt)
+	go func() {
+		defer func() {
+			logger.Debug("SIGINT catcher Done")
+			wg.Done()
+		}()
+		for sig := range c {
+			// Paired with the deferred wg.Done above so that Start waits
+			// for the shutdown calls below to be issued.
+			wg.Add(1)
+			fmt.Println("SIGINT", sig)
+			// Unregister c before closing it: the signal package must
+			// not be left with a closed channel to deliver a later
+			// SIGINT to.
+			signal.Stop(c)
+			close(c)
+			server.Stop()       //Non-blocking
+			tester.Stop(server) //Non-blocking
+			return
+		}
+	}()
+	wg.Wait()
+	logger.Debug("Reach to the end line")
+}
+
+func (m *mediator) Mediate(){
+ wg := sync.WaitGroup{}
+ defer logger.Debug("Mediate Done")
+ for corestat := range m.server.stateChan {
+ logger.Debug("Mediator receives state %d of server", corestat)
+ method := m.sm.transit(corestat)
+ if method != nil {
+ wg.Add(1)
+ defer wg.Done()
+ go func() error {
+ if err := method(m.server); err != nil{ //blocking
+ m.server.Stop()
+ return err
+ }
+ return nil
+ }()
+ }
+ }
+ wg.Wait()
+}
\ No newline at end of file
diff --git a/core/openolt_service.go b/core/openolt_service.go
index ef92704..97047bf 100644
--- a/core/openolt_service.go
+++ b/core/openolt_service.go
@@ -17,10 +17,10 @@
package core
import (
- "time"
- "gerrit.opencord.org/voltha-bbsim/protos"
- "gerrit.opencord.org/voltha-bbsim/device"
"gerrit.opencord.org/voltha-bbsim/common"
+ "gerrit.opencord.org/voltha-bbsim/device"
+ "gerrit.opencord.org/voltha-bbsim/protos"
+ "time"
)
func sendOltIndUp(stream openolt.Openolt_EnableIndicationServer, olt *device.Olt) error {
diff --git a/core/tester.go b/core/tester.go
new file mode 100644
index 0000000..ba4792b
--- /dev/null
+++ b/core/tester.go
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+ "context"
+ "gerrit.opencord.org/voltha-bbsim/common"
+ "golang.org/x/sync/errgroup"
+ "log"
+ "os/exec"
+ "time"
+ "sync"
+)
+
+// Mode selects which tests Tester.Start runs.
+type Mode int
+
+// Test modes checked by Tester.Start.
+const (
+	// DEFAULT runs no test.
+	DEFAULT Mode = iota
+	// AAA runs the AAA test only.
+	AAA
+	// BOTH runs the AAA test and the DHCP test.
+	BOTH
+)
+
+// Tester holds the settings and the runtime state of the AAA/DHCP test run
+// that Start launches.
+type Tester struct {
+ // Mode decides which tests Start runs (DEFAULT, AAA or BOTH).
+ Mode Mode
+ // AAAWait is the number of one-second ticks exeAAATest counts down before
+ // it activates the WPA supplicants.
+ AAAWait int
+ // DhcpWait is the number of one-second ticks exeDHCPTest counts down before
+ // it activates the DHCP clients.
+ DhcpWait int
+ // DhcpServerIP is the address passed to "ip addr add" before dhcpd is
+ // launched by exeDHCPTest.
+ DhcpServerIP string
+ // Processes lists the names of the external processes recorded by the
+ // tests; Initialize passes them to KillProcesses.
+ Processes []string
+ // Intvl is the interval, in seconds, between activating two consecutive
+ // interfaces.
+ Intvl int
+ // cancel cancels the context created by Start; Stop calls it.
+ cancel context.CancelFunc
+}
+
+// NewTester returns a Tester whose wait times, DHCP server address, activation
+// interval and mode are copied from opt. Processes and cancel start out unset.
+func NewTester(opt *option) *Tester {
+	return &Tester{
+		Mode:         opt.Mode,
+		AAAWait:      opt.aaawait,
+		DhcpWait:     opt.dhcpwait,
+		DhcpServerIP: opt.dhcpservip,
+		Intvl:        opt.intvl_test,
+	}
+}
+
+// Start runs the tests selected by t.Mode against s and blocks until they
+// return: DEFAULT runs nothing, AAA runs exeAAATest, BOTH additionally runs
+// exeDHCPTest concurrently. The cancel function of the run is stored in
+// t.cancel so that Stop can interrupt it.
+//
+// Whatever the outcome, the context is cancelled and Initialize is called
+// before Start returns. The first error reported by a test is logged and
+// returned; otherwise Start returns nil.
+//Blocking
+func (t *Tester) Start(s *Server) error {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.cancel = cancel
+	defer func() {
+		cancel()
+		t.Initialize()
+		logger.Debug("Tester Done")
+	}()
+	logger.Info("Tester Run() start")
+	if t.Mode != AAA && t.Mode != BOTH {
+		// DEFAULT: nothing to run.
+		return nil
+	}
+
+	// Both tests share the errgroup context, so a failure in one of them
+	// cancels the other. Otherwise the surviving test would keep eg.Wait()
+	// blocked until Stop is called.
+	eg, gctx := errgroup.WithContext(ctx)
+	eg.Go(func() error {
+		defer logger.Debug("exeAAATest Done")
+		return t.exeAAATest(gctx, s, t.AAAWait)
+	})
+	if t.Mode == BOTH {
+		eg.Go(func() error {
+			defer logger.Debug("exeDHCPTest Done")
+			return t.exeDHCPTest(gctx, s, t.DhcpWait)
+		})
+	}
+
+	if err := eg.Wait(); err != nil {
+		logger.Error("Error happened in tester: %s", err)
+		return err
+	}
+	logger.Debug("Test successfully finished")
+	return nil
+}
+
+// Stop cancels the context of the run started by Start, if there is one.
+// The server argument is not used and the result is always nil.
+func (t *Tester) Stop(s *Server) error {
+	stop := t.cancel
+	if stop == nil {
+		// Start has not stored a cancel function yet.
+		return nil
+	}
+	stop()
+	return nil
+}
+
+// Initialize kills every process recorded in t.Processes, forgets them, and
+// recreates an empty /var/run/dhcpd.pid.
+func (t *Tester) Initialize() {
+	logger.Info("Tester Initialize () called")
+	processes := t.Processes
+	logger.Debug("Runnig Process: %s", processes)
+	KillProcesses(processes)
+	// The recorded processes are gone now. Without this reset every further
+	// Start/Initialize cycle would append the same names again and pkill them
+	// repeatedly, logging spurious failures.
+	t.Processes = nil
+	// Best effort: rm fails when the pid file does not exist, which is fine.
+	exec.Command("rm", "/var/run/dhcpd.pid").Run()    //This is for DHCP server activation
+	exec.Command("touch", "/var/run/dhcpd.pid").Run() //This is for DHCP server activation
+}
+
+
+// exeAAATest collects the names of the "outside" UNI interfaces of s, counts
+// down wait one-second ticks and then activates a WPA supplicant on each
+// interface through activateWPASups, which blocks until ctx is cancelled.
+//
+// It returns nil when ctx is cancelled, and the error of GetUniIoinfos or of
+// the supplicant activation otherwise.
+//
+// NOTE(review): with wait < 1 the countdown loop never runs, so nothing is
+// activated - confirm this is intended.
+// NOTE(review): t.Processes is also appended to by exeDHCPTest, which Start
+// runs concurrently in BOTH mode, without any locking.
+func (t *Tester) exeAAATest(ctx context.Context, s *Server, wait int) error {
+	tick := time.NewTicker(time.Second)
+	defer tick.Stop()
+	logger.Info("exeAAATest stands by....")
+	infos, err := s.GetUniIoinfos("outside")
+	if err != nil {
+		return err
+	}
+
+	univeths := make([]string, 0, len(infos))
+	for _, info := range infos {
+		univeths = append(univeths, info.Name)
+	}
+
+	for sec := 1; sec <= wait; sec++ {
+		select {
+		case <-ctx.Done():
+			logger.Debug("exeAAATest thread receives close ")
+			return nil
+		case <-tick.C:
+			logger.Info("exeAAATest stands by ... %dsec\n", wait-sec)
+			if sec < wait {
+				continue
+			}
+			// Countdown finished. Run the activation synchronously so that
+			// its error reaches the caller instead of being dropped in a
+			// goroutine.
+			if err := activateWPASups(ctx, univeths, t.Intvl); err != nil {
+				return err
+			}
+			logger.Info("WPA supplicants are successfully activated ")
+			t.Processes = append(t.Processes, "wpa_supplicant")
+			logger.Debug("Running Process:%s", t.Processes)
+		}
+	}
+	return nil
+}
+
+// exeDHCPTest starts the DHCP server on the "outside" NNI interface of s,
+// counts down wait one-second ticks and then activates a DHCP client on each
+// "outside" UNI interface through activateDHCPClients, which blocks until ctx
+// is cancelled.
+//
+// It returns nil when ctx is cancelled, and otherwise the error of the
+// interface lookups, of the server start or of the client activation.
+//
+// NOTE(review): with wait < 1 the countdown loop never runs, so no client is
+// activated - confirm this is intended.
+// NOTE(review): t.Processes is also appended to by exeAAATest, which Start
+// runs concurrently in BOTH mode, without any locking.
+func (t *Tester) exeDHCPTest(ctx context.Context, s *Server, wait int) error {
+	tick := time.NewTicker(time.Second)
+	defer tick.Stop()
+	logger.Info("exeDHCPTest stands by....")
+	info, err := s.IdentifyNniIoinfo("outside")
+	if err != nil {
+		return err
+	}
+
+	err = activateDHCPServer(info.Name, t.DhcpServerIP)
+	if err != nil {
+		return err
+	}
+	t.Processes = append(t.Processes, "dhcpd")
+	logger.Debug("Running Process:%s", t.Processes)
+
+	infos, err := s.GetUniIoinfos("outside")
+	if err != nil {
+		return err
+	}
+
+	univeths := make([]string, 0, len(infos))
+	for _, info := range infos {
+		univeths = append(univeths, info.Name)
+	}
+
+	for sec := 1; sec <= wait; sec++ {
+		select {
+		case <-ctx.Done():
+			logger.Debug("exeDHCPTest thread receives close ")
+			return nil
+		case <-tick.C:
+			logger.Info("exeDHCPTest stands by ... %dsec\n", wait-sec)
+			if sec < wait {
+				continue
+			}
+			// Countdown finished. Run the activation synchronously so that
+			// its error reaches the caller instead of being dropped in a
+			// goroutine.
+			if err := activateDHCPClients(ctx, univeths, t.Intvl); err != nil {
+				return err
+			}
+			logger.Info("DHCP clients are successfully activated ")
+			t.Processes = append(t.Processes, "dhclient")
+			logger.Debug("Running Process:%s", t.Processes)
+		}
+	}
+	return nil
+}
+
+// KillProcesses calls killProcess for every name in pnames, in order. A
+// failure for one name does not stop the remaining ones, and the result is
+// always nil.
+func KillProcesses(pnames []string) error {
+	for i := 0; i < len(pnames); i++ {
+		// The error is dropped here; killProcess logs it itself.
+		_ = killProcess(pnames[i])
+	}
+	return nil
+}
+
+// activateWPASups starts a WPA supplicant on each interface in vethnames,
+// waiting intvl seconds before every start, and then blocks until ctx is
+// cancelled. A non-positive intvl starts the supplicants back to back.
+//
+// It returns nil once ctx is cancelled, or the first error reported by
+// activateWPASupplicant.
+func activateWPASups(ctx context.Context, vethnames []string, intvl int) error {
+	canceled := func() error {
+		logger.Debug("activateWPASups was canceled by context.")
+		return nil
+	}
+
+	// time.NewTicker panics on a non-positive duration, so the ticker is only
+	// created when there is a real interval to wait for.
+	var tickC <-chan time.Time
+	if intvl > 0 {
+		tick := time.NewTicker(time.Duration(intvl) * time.Second)
+		defer tick.Stop()
+		tickC = tick.C
+	}
+
+	for _, vethname := range vethnames {
+		if tickC != nil {
+			select {
+			case <-tickC:
+			case <-ctx.Done():
+				return canceled()
+			}
+		} else if ctx.Err() != nil {
+			return canceled()
+		}
+		if err := activateWPASupplicant(vethname); err != nil {
+			return err
+		}
+		logger.Debug("activateWPASupplicant for interface %v\n", vethname)
+	}
+
+	// Everything is started; stay here until the run is cancelled.
+	<-ctx.Done()
+	return canceled()
+}
+
+// activateDHCPClients starts a DHCP client on each interface in vethnames,
+// waiting intvl seconds before every start, and then blocks until ctx is
+// cancelled. A non-positive intvl starts the clients back to back.
+//
+// It returns nil once ctx is cancelled, or the first error reported by
+// activateDHCPClient.
+func activateDHCPClients(ctx context.Context, vethnames []string, intvl int) error {
+	canceled := func() error {
+		logger.Debug("activateDHCPClients was canceled by context.")
+		return nil
+	}
+
+	// time.NewTicker panics on a non-positive duration, so the ticker is only
+	// created when there is a real interval to wait for.
+	var tickC <-chan time.Time
+	if intvl > 0 {
+		tick := time.NewTicker(time.Duration(intvl) * time.Second)
+		defer tick.Stop()
+		tickC = tick.C
+	}
+
+	for _, vethname := range vethnames {
+		if tickC != nil {
+			select {
+			case <-tickC:
+			case <-ctx.Done():
+				return canceled()
+			}
+		} else if ctx.Err() != nil {
+			return canceled()
+		}
+		if err := activateDHCPClient(vethname); err != nil {
+			return err
+		}
+		logger.Debug("activateDHCPClient for interface %v\n", vethname)
+	}
+
+	// Everything is started; stay here until the run is cancelled.
+	<-ctx.Done()
+	return canceled()
+}
+
+// killProcess runs "pkill name" and waits for it to finish. A failure of the
+// command is logged and returned; on success it logs the kill and returns nil.
+func killProcess(name string) error {
+	if failure := exec.Command("pkill", name).Run(); failure != nil {
+		logger.Error("Fail to pkill %s: %v\n", name, failure)
+		return failure
+	}
+	logger.Info("Successfully killed %s\n", name)
+	return nil
+}
+
+// activateWPASupplicant launches /sbin/wpa_supplicant with the wired driver on
+// interface vethname, using /etc/wpa_supplicant/wpa_supplicant.conf. The
+// process is started but not waited for. A start failure is logged and
+// returned.
+func activateWPASupplicant(vethname string) (err error) {
+	const (
+		binary = "/sbin/wpa_supplicant"
+		config = "/etc/wpa_supplicant/wpa_supplicant.conf"
+	)
+	supplicant := exec.Command(binary, "-D", "wired", "-i", vethname, "-c", config)
+	if err = supplicant.Start(); err != nil {
+		logger.Error("Fail to activateWPASupplicant() for :%s %v\n", vethname, err)
+		return err
+	}
+	logger.Info("activateWPASupplicant() for :%s\n", vethname)
+	return nil
+}
+
+// activateDHCPClient launches /usr/local/bin/dhclient on interface vethname.
+// The process is started but not waited for. A start failure is logged and
+// returned to the caller instead of panicking, so the caller's error handling
+// can run.
+func activateDHCPClient(vethname string) (err error) {
+	logger.Debug("activateDHCPClient() start for: %s\n", vethname)
+	cmd := exec.Command("/usr/local/bin/dhclient", vethname)
+	// Start rather than Run: Run would block until dhclient exits.
+	if err = cmd.Start(); err != nil {
+		logger.Error("Fail to activateDHCPClient() for: %s", vethname)
+		// Same standard-logger output as the former log.Panic, minus the panic.
+		log.Print(err)
+		return err
+	}
+	logger.Debug("activateDHCPClient() done for: %s\n", vethname)
+	return nil
+}
+
+// activateDHCPServer assigns serverip to interface veth, brings the interface
+// up and then runs /usr/local/bin/dhcpd on it with /etc/dhcp/dhcpd.conf. Each
+// command is waited for; the first one that fails is logged and its error
+// returned. On success it returns nil.
+func activateDHCPServer(veth string, serverip string) error {
+	if failure := exec.Command("ip", "addr", "add", serverip, "dev", veth).Run(); failure != nil {
+		logger.Error("Fail to add ip to %s address: %s\n", veth, failure)
+		return failure
+	}
+	if failure := exec.Command("ip", "link", "set", veth, "up").Run(); failure != nil {
+		logger.Error("Fail to set %s up: %s\n", veth, failure)
+		return failure
+	}
+
+	const (
+		daemon = "/usr/local/bin/dhcpd"
+		config = "/etc/dhcp/dhcpd.conf"
+	)
+	if failure := exec.Command(daemon, "-cf", config, veth).Run(); failure != nil {
+		logger.Error("Fail to activateDHCP Server (): %s\n", failure)
+		return failure
+	}
+	logger.Info("DHCP Server is successfully activated !\n")
+	return nil
+}