VOL-291 : PON simulator refactoring for cluster integration
- Added ponsim build target in Makefile
- Added new option to vcore to select comm type with ponsim
- Modified all proto files to include destination go package
Amendments:
- Clean up based on review comments
- Properly close GRPC connections in ponsim_olt adapter
- Added voltha namespace to some k8s templates
Change-Id: I2f349fa7b3550a8a8cc8fc676cc896f33fbb9372
diff --git a/ponsim/v2/core/ponsim_olt.go b/ponsim/v2/core/ponsim_olt.go
new file mode 100644
index 0000000..2485a67
--- /dev/null
+++ b/ponsim/v2/core/ponsim_olt.go
@@ -0,0 +1,403 @@
+package core
+
+import (
+ "context"
+ "crypto/tls"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/google/gopacket"
+ "github.com/opencord/voltha/ponsim/v2/common"
+ "github.com/opencord/voltha/protos/go/ponsim"
+ "github.com/sirupsen/logrus"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/credentials"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// TODO: Pass-in the certificate information as a structure parameter
+
+/*
+PonSimOltDevice is the structure responsible for the handling of an OLT device
+*/
+type PonSimOltDevice struct {
+ PonSimDevice `json:pon_device`
+ VCoreEndpoint string `json:vcore_ep`
+ MaxOnuCount int `json:max_onu`
+ Onus map[int32]*OnuRegistree `json:onu_registrees`
+ outgoing chan []byte
+
+ counterLoop *common.IntervalHandler
+ alarmLoop *common.IntervalHandler
+}
+
+/*
+
+ */
+type OnuRegistree struct {
+ Device *PonSimOnuDevice `json:onu_device`
+ Conn *grpc.ClientConn `json:grpc_conn`
+ Client ponsim.PonSimCommonClient `json:client`
+ Stream ponsim.PonSimCommon_ProcessDataClient `json:stream`
+}
+
+const (
+ BASE_PORT_NUMBER = 128
+)
+
+/*
+NewPonSimOltDevice instantiates a new OLT device structure
+*/
+func NewPonSimOltDevice(device PonSimDevice) *PonSimOltDevice {
+ olt := &PonSimOltDevice{PonSimDevice: device}
+ return olt
+}
+
+/*
+forwardToONU defines a EGRESS function to forward a packet to a specific ONU
+*/
+func (o *PonSimOltDevice) forwardToONU(onuPort int32) func(int, gopacket.Packet) {
+ return func(port int, frame gopacket.Packet) {
+ ipAddress := common.GetInterfaceIP(o.ExternalIf)
+ incoming := &ponsim.IncomingData{
+ Id: "EGRESS.OLT." + ipAddress,
+ Address: ipAddress,
+ Port: int32(port),
+ Payload: frame.Data(),
+ }
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "port": port,
+ "frame": frame,
+ }).Debug("Forwarding to ONU")
+
+ // Forward packet to ONU
+ if err := o.GetOnu(onuPort).Stream.Send(incoming); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "frameDump": frame.Dump(),
+ "incoming": incoming,
+ "error": err.Error(),
+ }).Error("A problem occurred while forwarding to ONU")
+ }
+
+ }
+}
+
+/*
+forwardToLAN defines an INGRESS function to forward a packet to VOLTHA
+*/
+func (o *PonSimOltDevice) forwardToLAN() func(int, gopacket.Packet) {
+ return func(port int, frame gopacket.Packet) {
+ common.Logger().WithFields(logrus.Fields{
+ "frame": frame.Dump(),
+ }).Info("Sending packet")
+
+ select {
+ case o.outgoing <- frame.Data():
+ common.Logger().WithFields(logrus.Fields{
+ "frame": frame.Dump(),
+ }).Info("Sent packet")
+ default:
+ common.Logger().WithFields(logrus.Fields{
+ "frame": frame.Dump(),
+ }).Warn("Unable to send packet")
+ }
+ }
+}
+
+/*
+Start performs setup operations for an OLT device
+*/
+func (o *PonSimOltDevice) Start(ctx context.Context) {
+ common.Logger().Info("Starting OLT device...")
+ o.PonSimDevice.Start(ctx)
+
+ // Open network interfaces for listening
+ o.connectNetworkInterfaces()
+
+ o.outgoing = make(chan []byte, 1)
+
+ // Add INGRESS operation
+ o.AddLink(2, 0, o.forwardToLAN())
+
+ // Start PM counter logging
+ o.counterLoop = common.NewIntervalHandler(90, o.Counter.LogCounts)
+ o.counterLoop.Start()
+
+ // Start alarm simulation
+ if o.AlarmsOn {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ }).Debug("Starting alarm simulation")
+
+ alarms := NewPonSimAlarm(o.InternalIf, o.VCoreEndpoint, o.forwardToLAN())
+ o.alarmLoop = common.NewIntervalHandler(o.AlarmsFreq, alarms.GenerateAlarm)
+ o.alarmLoop.Start()
+ }
+}
+
+/*
+Stop performs cleanup operations for an OLT device
+*/
+func (o *PonSimOltDevice) Stop(ctx context.Context) {
+ common.Logger().Info("Stopping OLT device...")
+
+ // Stop PM counters loop
+ o.counterLoop.Stop()
+ o.counterLoop = nil
+
+ // Stop alarm simulation
+ if o.AlarmsOn {
+ o.alarmLoop.Stop()
+ }
+ o.alarmLoop = nil
+
+ o.ingressHandler.Close()
+ o.egressHandler.Close()
+
+ o.PonSimDevice.Stop(ctx)
+}
+
+/*
+ConnectToRemoteOnu establishes communication to a remote ONU device
+*/
+func (o *PonSimOltDevice) ConnectToRemoteOnu(onu *OnuRegistree) error {
+ var err error
+
+ host := strings.Join([]string{
+ onu.Device.Address,
+ strconv.Itoa(int(onu.Device.Port)),
+ }, ":")
+
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "host": host,
+ }).Debug("Formatting host address")
+
+ // TODO: make it secure
+ ta := credentials.NewTLS(&tls.Config{
+ //Certificates: []tls.Certificate{peerCert},
+ //RootCAs: caCertPool,
+ InsecureSkipVerify: true,
+ })
+
+ // GRPC communication needs to be secured
+ if onu.Conn, err = grpc.DialContext(
+ context.Background(),
+ host,
+ grpc.WithTransportCredentials(ta),
+ ); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "error": err.Error(),
+ }).Error("Problem with client connection")
+ }
+
+ return err
+}
+
+/*
+Listen waits for incoming EGRESS data on the internal interface
+*/
+func (o *PonSimOltDevice) Listen(ctx context.Context, port int32) {
+ var reply *empty.Empty
+ var err error
+
+ // Establish a GRPC connection with the ONU
+ onu := o.GetOnu(port)
+
+ common.Logger().WithFields(logrus.Fields{
+ "onu": onu,
+ }).Debug("Connecting to remote ONU")
+
+ if onu.Client = ponsim.NewPonSimCommonClient(onu.Conn); onu.Client == nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ }).Error("Problem establishing client connection to ONU")
+ o.RemoveOnu(ctx, port)
+ return
+ }
+
+ // Prepare stream to ONU to forward incoming data as needed
+ if onu.Stream, err = onu.Client.ProcessData(ctx); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ }).Error("Problem establishing stream to ONU")
+ o.RemoveOnu(ctx, port)
+ return
+ }
+
+ defer o.egressHandler.Close()
+ packetSource := gopacket.NewPacketSource(o.egressHandler, o.egressHandler.LinkType())
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "interface": o.InternalIf,
+ }).Debug("Listening to incoming EGRESS data")
+
+ // Wait for incoming EGRESS data
+ for packet := range packetSource.Packets() {
+ if dot1q := common.GetDot1QLayer(packet); dot1q != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "packet": packet,
+ }).Debug("Received EGRESS packet")
+
+ o.Forward(ctx, 2, packet)
+ }
+ }
+
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ }).Debug("No more packets to process")
+
+ if reply, err = onu.Stream.CloseAndRecv(); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "error": err.Error(),
+ }).Error("A problem occurred while closing client stream")
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "reply": reply,
+ }).Warn("Client stream closed")
+ }
+}
+
+/*
+GetOnus returns the list of registered ONU devices
+*/
+func (o *PonSimOltDevice) GetOnus() map[int32]*OnuRegistree {
+ if o.Onus == nil {
+ o.Onus = make(map[int32]*OnuRegistree)
+ }
+
+ return o.Onus
+}
+
+/*
+GetOnu return a specific registered ONU
+*/
+func (o *PonSimOltDevice) GetOnu(index int32) *OnuRegistree {
+ var onu *OnuRegistree
+ var ok bool
+
+ if onu, ok = (o.GetOnus())[index]; ok {
+ return onu
+ }
+
+ return nil
+}
+
+func (o *PonSimOltDevice) GetOutgoing() chan []byte {
+ return o.outgoing
+}
+
+/*
+nextAvailablePort returns a port that is not already used by a registered ONU
+*/
+func (o *PonSimOltDevice) nextAvailablePort() int32 {
+ var port int32 = BASE_PORT_NUMBER
+
+ if len(o.GetOnus()) < o.MaxOnuCount {
+ for {
+ if o.GetOnu(port) != nil {
+ // port is already used
+ port += 1
+ } else {
+ // port is available... use it
+ return port
+ }
+ }
+ } else {
+ // OLT has reached its max number of ONUs
+ return -1
+ }
+}
+
+/*
+AddOnu registers an ONU device and sets up all required monitoring and connections
+*/
+func (o *PonSimOltDevice) AddOnu(onu *PonSimOnuDevice) (int32, error) {
+ var portNum int32
+ ctx := context.Background()
+
+ if portNum = o.nextAvailablePort(); portNum != -1 {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "port": portNum,
+ "onu": onu,
+ }).Info("Adding ONU")
+
+ registree := &OnuRegistree{Device: onu}
+
+ // Setup GRPC communication and check if it succeeded
+ if err := o.ConnectToRemoteOnu(registree); err == nil {
+ o.GetOnus()[portNum] = registree
+
+ o.AddLink(1, int(portNum), o.forwardToONU(portNum))
+ go o.MonitorOnu(ctx, portNum)
+ go o.Listen(ctx, portNum)
+ }
+
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ }).Warn("ONU Map is full")
+ }
+
+ return int32(portNum), nil
+}
+
+/*
+RemoveOnu removes the reference to a registered ONU
+*/
+func (o *PonSimOltDevice) RemoveOnu(ctx context.Context, onuIndex int32) error {
+ onu := o.GetOnu(onuIndex)
+ if err := onu.Conn.Close(); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "onu": onu.Device,
+ "onuIndex": onuIndex,
+ }).Error("Problem closing connection to ONU")
+ }
+
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "onu": onu,
+ "onuIndex": onuIndex,
+ }).Info("Removing ONU")
+
+ delete(o.Onus, onuIndex)
+
+ // Remove link entries for this ONU
+ o.RemoveLink(1, int(onuIndex))
+
+ return nil
+}
+
+/*
+MonitorOnu verifies the connection status of a specific ONU and cleans up as necessary
+*/
+func (o *PonSimOltDevice) MonitorOnu(ctx context.Context, onuIndex int32) {
+ for {
+ if o.GetOnu(onuIndex) != nil {
+ if conn := o.GetOnu(onuIndex).Conn; conn.GetState() == connectivity.Ready {
+ // Wait for any change to occur
+ conn.WaitForStateChange(ctx, conn.GetState())
+ // We lost communication with the ONU ... remove it
+ o.RemoveOnu(ctx, onuIndex)
+ return
+ }
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "ctx": ctx,
+ "onuIndex": onuIndex,
+ }).Debug("ONU is not ready")
+ time.Sleep(1 * time.Second)
+ } else {
+ return
+ }
+ }
+}