VOL-291 : PON simulator refactoring for cluster integration
- Added ponsim build target in Makefile
- Added new option to vcore to select comm type with ponsim
- Modified all proto files to include destination go package
Amendments:
- Clean up based on review comments
- Properly close GRPC connections in ponsim_olt adapter
- Added voltha namespace to some k8s templates
Change-Id: I2f349fa7b3550a8a8cc8fc676cc896f33fbb9372
diff --git a/ponsim/v2/core/ponsim_onu.go b/ponsim/v2/core/ponsim_onu.go
new file mode 100644
index 0000000..f58d473
--- /dev/null
+++ b/ponsim/v2/core/ponsim_onu.go
@@ -0,0 +1,388 @@
+package core
+
+import (
+ "context"
+ "crypto/tls"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/google/gopacket"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha/ponsim/v2/common"
+ "github.com/opencord/voltha/protos/go/ponsim"
+ "github.com/sirupsen/logrus"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+)
+
+// TODO: Cleanup GRPC security config
+// TODO: Pass-in the certificate information as a structure parameter
+
+/*
+PonSimOnuDevice is the structure responsible for the handling of an ONU device; it embeds PonSimDevice and adds the OLT connection and the state used to monitor it
+*/
+type PonSimOnuDevice struct {
+ PonSimDevice
+
+ ParentAddress string // OLT host used to build the GRPC dial target; overwritten from the registration reply
+ ParentPort int32 // OLT port used to build the GRPC dial target; overwritten from the registration reply
+ AssignedPort int32 // port handed to this ONU in the registration reply
+ Conn *grpc.ClientConn // GRPC connection to the OLT; set to nil by Disconnect
+
+ oltClient ponsim.PonSimCommonClient // client created over Conn by Listen
+ stream ponsim.PonSimCommon_ProcessDataClient // data stream opened by Listen and written to by forwardToOLT
+ monitor chan PonSimDeviceState // state events read by MonitorState; created by Connect, closed by Disconnect
+ state PonSimDeviceState // last state read by MonitorState; reset by Disconnect
+}
+
+/*
+NewPonSimOnuDevice instantiates a new ONU device structure.
+Only the embedded PonSimDevice is populated from the argument; all other
+fields are left at their zero value.
+*/
+func NewPonSimOnuDevice(device PonSimDevice) *PonSimOnuDevice {
+	return &PonSimOnuDevice{PonSimDevice: device}
+}
+
+/*
+forwardToOLT defines a INGRESS function to forward a packet to the parent OLT.
+The returned handler wraps the frame in an IncomingData message and sends it on
+the stream set up by Listen; a frame seen while no stream exists is dropped.
+*/
+func (o *PonSimOnuDevice) forwardToOLT() func(int, gopacket.Packet) {
+	return func(port int, frame gopacket.Packet) {
+		ipAddress := common.GetInterfaceIP(o.InternalIf)
+		incoming := &ponsim.IncomingData{
+			Id:      "INGRESS.ONU." + ipAddress,
+			Address: ipAddress,
+			Port:    int32(port),
+			Payload: frame.Data(),
+		}
+		entry := common.Logger().WithFields(logrus.Fields{
+			"device":    o,
+			"port":      port,
+			"frameDump": frame.Dump(),
+			"incoming":  incoming,
+		})
+		if o.stream == nil {
+			entry.Error("No stream to OLT, dropping packet")
+			return
+		}
+		entry.WithFields(logrus.Fields{"frame": frame}).Debug("Forwarding to OLT")
+		// Forward packet to OLT, keeping the cause of a failure in the log entry
+		if err := o.stream.Send(incoming); err != nil {
+			entry.WithFields(logrus.Fields{"error": err.Error()}).Fatal("A problem occurred while forwarding to OLT")
+		}
+	}
+}
+
+/*
+forwardToWAN defines a EGRESS function to forward a packet to the world
+*/
+func (o *PonSimOnuDevice) forwardToWAN() func(int, gopacket.Packet) {
+ return func(port int, frame gopacket.Packet) {
+ var err error
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "port": port,
+ "frame": frame,
+ }).Debug("Forwarding packet to world")
+ if err = o.ingressHandler.WritePacketData(frame.Data()); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "port": port,
+ "frame": frame,
+ }).Fatal("Problem while forwarding packet to world")
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "port": port,
+ "frame": frame,
+ }).Debug("Forwarded packet to world")
+ }
+ }
+}
+
+/*
+Start brings the ONU up: it starts the embedded base device, installs the
+two packet forwarding links and launches the OLT connection monitor
+*/
+func (o *PonSimOnuDevice) Start(ctx context.Context) {
+	o.PonSimDevice.Start(ctx)
+
+	// Link 1 carries traffic towards the OLT, link 2 towards the world
+	towardsOLT, towardsWorld := o.forwardToOLT(), o.forwardToWAN()
+	o.AddLink(1, 0, towardsOLT)
+	o.AddLink(2, 0, towardsWorld)
+
+	// Connection handling runs in its own goroutine
+	go o.MonitorConnection(ctx)
+}
+
+/*
+Stop performs cleanup operations for an ONU device: both forwarding links
+are removed and the embedded base device is then stopped
+*/
+func (o *PonSimOnuDevice) Stop(ctx context.Context) {
+	entry := common.Logger().WithFields(logrus.Fields{"device": o})
+	entry.Debug("Stopping ONU")
+
+	// Links 1 and 2 are the ones installed by Start
+	o.RemoveLink(1, 0)
+	o.RemoveLink(2, 0)
+	o.PonSimDevice.Stop(ctx)
+}
+
+/*
+Listen waits for incoming INGRESS data on the external interface: it opens the ProcessData stream to the OLT,
+forwards each packet read from ingressHandler through link 2 and closes the stream once the packet source ends
+*/
+func (o *PonSimOnuDevice) Listen(ctx context.Context) {
+	var reply *empty.Empty
+	var err error
+
+	if o.oltClient = ponsim.NewPonSimCommonClient(o.Conn); o.oltClient == nil {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+		}).Fatal("Problem establishing client connection to OLT")
+		panic("Problem establishing client connection to OLT")
+	}
+
+	// Establish GRPC connection with OLT
+	if o.stream, err = o.oltClient.ProcessData(ctx); err != nil {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+			"error":  err.Error(),
+		}).Fatal("Problem establishing stream")
+		panic(err)
+	}
+
+	defer o.ingressHandler.Close()
+	packetSource := gopacket.NewPacketSource(o.ingressHandler, o.ingressHandler.LinkType())
+	common.Logger().WithFields(logrus.Fields{
+		"device":    o,
+		"interface": o.ExternalIf,
+	}).Debug("Listening to incoming ONU data")
+
+	for packet := range packetSource.Packets() {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+			"packet": packet,
+		}).Debug("Received INGRESS packet")
+		o.Forward(ctx, 2, packet)
+	}
+
+	common.Logger().WithFields(logrus.Fields{
+		"device": o,
+	}).Debug("No more packets to process")
+	// Error and reply go in as fields; as extra log arguments they were glued to the message
+	if reply, err = o.stream.CloseAndRecv(); err != nil {
+		common.Logger().WithFields(logrus.Fields{"error": err.Error()}).Fatal("A problem occurred while closing Ingress stream")
+	} else {
+		common.Logger().WithFields(logrus.Fields{"reply": reply}).Info("Ingress stream closed")
+	}
+}
+
+/*
+Register sends a registration request to the remote OLT
+*/
+func (o *PonSimOnuDevice) Register(ctx context.Context) error {
+ var err error
+ var rreq *ponsim.RegistrationRequest
+ var rrep *ponsim.RegistrationReply
+ var client ponsim.PonSimOltClient
+
+ if o.Conn != nil {
+ if client = ponsim.NewPonSimOltClient(o.Conn); client != nil {
+ rreq = &ponsim.RegistrationRequest{
+ Id: uuid.New().String(),
+ Address: common.GetInterfaceIP(o.InternalIf),
+ Port: o.Port,
+ }
+ common.Logger().Printf("Request details %+v\n", rreq)
+
+ // TODO: Loop registration until an OLT becomes available??
+
+ rrep, err = client.Register(ctx, rreq)
+ if err != nil {
+ common.Logger().Printf("Problem with registration", err.Error())
+ } else {
+ // Save OLT address details
+ o.ParentAddress = rrep.GetParentAddress()
+ o.ParentPort = rrep.GetParentPort()
+ o.AssignedPort = rrep.GetAssignedPort()
+
+ common.Logger().Printf("Registration details - %+v\n", rrep)
+
+ o.monitor <- REGISTERED_WITH_OLT
+ }
+
+ } else {
+ common.Logger().Info("Client is NIL")
+ }
+ }
+
+ return err
+}
+
+/*
+MonitorConnection verifies the communication with the OLT until ctx is done:
+it connects when disconnected and tears down once the GRPC state changes
+*/
+func (o *PonSimOnuDevice) MonitorConnection(ctx context.Context) {
+	for ctx.Err() == nil {
+		if o.state == DISCONNECTED_FROM_PON {
+			// Establish communication with OLT
+			o.Connect(ctx)
+		}
+		if o.state == CONNECTED_TO_PON {
+			// Idle while the connection is up; a nil Conn counts as lost
+			if o.Conn != nil {
+				o.Conn.WaitForStateChange(ctx, o.Conn.GetState())
+			}
+			// The ONU-OLT connection was lost... need to cleanup
+			o.Disconnect(ctx)
+		}
+		time.Sleep(1 * time.Second)
+	}
+}
+
+/*
+Connect sets up communication and monitoring with remote OLT
+*/
+func (o *PonSimOnuDevice) Connect(ctx context.Context) {
+ o.monitor = make(chan PonSimDeviceState, 1)
+
+ // Define a waitgroup to block the current routine until
+ // a CONNECTED state is reached
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+
+ go o.MonitorState(ctx, &wg)
+
+ o.ConnectToRemoteOlt()
+
+ // Wait until we establish a connection to the remote PON
+ wg.Wait()
+}
+
+/*
+Disconnect tears down communication and monitoring with remote OLT. The egress handler, the connection and the
+monitor channel are each released only when set; the state is reset only together with the monitor channel.
+*/
+func (o *PonSimOnuDevice) Disconnect(ctx context.Context) {
+	if handler := o.egressHandler; handler != nil {
+		handler.Close()
+		o.egressHandler = nil
+	}
+
+	if conn := o.Conn; conn != nil {
+		conn.Close()
+		o.Conn = nil
+	}
+
+	if events := o.monitor; events != nil {
+		close(events)
+		o.monitor, o.state = nil, DISCONNECTED_FROM_PON
+	}
+}
+
+/*
+MonitorState follows the progress of the OLT connection.
+It consumes states from the monitor channel until the channel is closed, ctx is
+cancelled, a disconnection is reported or registration fails. CONNECTED_TO_PON
+releases wg and registers with the OLT, REGISTERED_WITH_OLT connects the network
+interfaces and CONNECTED_IO_INTERFACE starts Listen in its own goroutine.
+*/
+func (o *PonSimOnuDevice) MonitorState(ctx context.Context, wg *sync.WaitGroup) {
+	var ok bool
+	for {
+		select {
+		case o.state, ok = <-o.monitor:
+			if !ok {
+				common.Logger().WithFields(logrus.Fields{"device": o}).Warn("Monitoring channel has closed")
+				return
+			}
+
+			common.Logger().WithFields(logrus.Fields{
+				"device": o,
+				"state":  o.state,
+			}).Info("Received monitoring state")
+
+			switch o.state {
+			case CONNECTED_TO_PON:
+				// We have successfully connected to the OLT
+				// proceed with registration
+				wg.Done()
+
+				if err := o.Register(ctx); err != nil {
+					// Disconnect closes and clears the monitor channel; leave
+					// now rather than park this goroutine on a nil channel
+					o.Disconnect(ctx)
+					return
+				}
+
+			case DISCONNECTED_FROM_PON:
+				// Connection to remote OLT was lost... exit
+				common.Logger().WithFields(logrus.Fields{"device": o}).Warn("Exiting due to disconnection")
+				return
+
+			case REGISTERED_WITH_OLT:
+				// Start listening on network interfaces
+				o.connectNetworkInterfaces()
+				o.monitor <- CONNECTED_IO_INTERFACE
+
+			case CONNECTED_IO_INTERFACE:
+				// Start listening on local interfaces
+				go o.Listen(ctx)
+			}
+
+		case <-ctx.Done():
+			common.Logger().WithFields(logrus.Fields{"device": o}).Warn("Received a cancellation notification")
+			return
+		}
+	}
+}
+
+/*
+ConnectToRemoteOlt establishes GRPC communication with the remote OLT
+*/
+func (o *PonSimOnuDevice) ConnectToRemoteOlt() {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ }).Debug("Connecting to remote device")
+
+ var err error
+
+ host := strings.Join([]string{
+ o.ParentAddress,
+ strconv.Itoa(int(o.ParentPort)),
+ }, ":")
+
+ // TODO: make it secure
+ // GRPC communication needs to be secured
+ ta := credentials.NewTLS(&tls.Config{
+ //Certificates: []tls.Certificate{peerCert},
+ //RootCAs: caCertPool,
+ InsecureSkipVerify: true,
+ })
+
+ if o.Conn, err = grpc.DialContext(
+ context.Background(), host, grpc.WithTransportCredentials(ta), grpc.WithBlock(),
+ ); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ "error": err.Error(),
+ }).Error("Problem establishing connection")
+ } else {
+ // We are now connected
+ // time to move on
+ common.Logger().WithFields(logrus.Fields{
+ "device": o,
+ }).Info("Connected to OLT")
+ }
+
+ o.monitor <- CONNECTED_TO_PON
+}