VOL-291 : PON simulator refactoring for cluster integration

- Added ponsim build target in Makefile
- Added new option to vcore to select comm type with ponsim
- Modified all proto files to include destination go package

Amendments:

- Clean up based on review comments
- Properly close GRPC connections in ponsim_olt adapter
- Added voltha namespace to some k8s templates

Change-Id: I2f349fa7b3550a8a8cc8fc676cc896f33fbb9372
diff --git a/ponsim/v2/core/ponsim_olt.go b/ponsim/v2/core/ponsim_olt.go
new file mode 100644
index 0000000..2485a67
--- /dev/null
+++ b/ponsim/v2/core/ponsim_olt.go
@@ -0,0 +1,403 @@
+package core
+
+import (
+	"context"
+	"crypto/tls"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/google/gopacket"
+	"github.com/opencord/voltha/ponsim/v2/common"
+	"github.com/opencord/voltha/protos/go/ponsim"
+	"github.com/sirupsen/logrus"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/credentials"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// TODO: Pass-in the certificate information as a structure parameter
+
+/*
+PonSimOltDevice is the structure responsible for the handling of an OLT device
+*/
+type PonSimOltDevice struct {
+	PonSimDevice  `json:pon_device`
+	VCoreEndpoint string                  `json:vcore_ep`
+	MaxOnuCount   int                     `json:max_onu`
+	Onus          map[int32]*OnuRegistree `json:onu_registrees`
+	outgoing      chan []byte
+
+	counterLoop *common.IntervalHandler
+	alarmLoop   *common.IntervalHandler
+}
+
+/*
+
+ */
+type OnuRegistree struct {
+	Device *PonSimOnuDevice                      `json:onu_device`
+	Conn   *grpc.ClientConn                      `json:grpc_conn`
+	Client ponsim.PonSimCommonClient             `json:client`
+	Stream ponsim.PonSimCommon_ProcessDataClient `json:stream`
+}
+
+const (
+	BASE_PORT_NUMBER = 128
+)
+
+/*
+NewPonSimOltDevice instantiates a new OLT device structure
+*/
+func NewPonSimOltDevice(device PonSimDevice) *PonSimOltDevice {
+	olt := &PonSimOltDevice{PonSimDevice: device}
+	return olt
+}
+
+/*
+forwardToONU defines a EGRESS function to forward a packet to a specific ONU
+*/
+func (o *PonSimOltDevice) forwardToONU(onuPort int32) func(int, gopacket.Packet) {
+	return func(port int, frame gopacket.Packet) {
+		ipAddress := common.GetInterfaceIP(o.ExternalIf)
+		incoming := &ponsim.IncomingData{
+			Id:      "EGRESS.OLT." + ipAddress,
+			Address: ipAddress,
+			Port:    int32(port),
+			Payload: frame.Data(),
+		}
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+			"port":   port,
+			"frame":  frame,
+		}).Debug("Forwarding to ONU")
+
+		// Forward packet to ONU
+		if err := o.GetOnu(onuPort).Stream.Send(incoming); err != nil {
+			common.Logger().WithFields(logrus.Fields{
+				"device":    o,
+				"frameDump": frame.Dump(),
+				"incoming":  incoming,
+				"error":     err.Error(),
+			}).Error("A problem occurred while forwarding to ONU")
+		}
+
+	}
+}
+
+/*
+forwardToLAN defines an INGRESS function to forward a packet to VOLTHA
+*/
+func (o *PonSimOltDevice) forwardToLAN() func(int, gopacket.Packet) {
+	return func(port int, frame gopacket.Packet) {
+		common.Logger().WithFields(logrus.Fields{
+			"frame": frame.Dump(),
+		}).Info("Sending packet")
+
+		select {
+		case o.outgoing <- frame.Data():
+			common.Logger().WithFields(logrus.Fields{
+				"frame": frame.Dump(),
+			}).Info("Sent packet")
+		default:
+			common.Logger().WithFields(logrus.Fields{
+				"frame": frame.Dump(),
+			}).Warn("Unable to send packet")
+		}
+	}
+}
+
+/*
+Start performs setup operations for an OLT device
+*/
+func (o *PonSimOltDevice) Start(ctx context.Context) {
+	common.Logger().Info("Starting OLT device...")
+	o.PonSimDevice.Start(ctx)
+
+	// Open network interfaces for listening
+	o.connectNetworkInterfaces()
+
+	o.outgoing = make(chan []byte, 1)
+
+	// Add INGRESS operation
+	o.AddLink(2, 0, o.forwardToLAN())
+
+	// Start PM counter logging
+	o.counterLoop = common.NewIntervalHandler(90, o.Counter.LogCounts)
+	o.counterLoop.Start()
+
+	// Start alarm simulation
+	if o.AlarmsOn {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+		}).Debug("Starting alarm simulation")
+
+		alarms := NewPonSimAlarm(o.InternalIf, o.VCoreEndpoint, o.forwardToLAN())
+		o.alarmLoop = common.NewIntervalHandler(o.AlarmsFreq, alarms.GenerateAlarm)
+		o.alarmLoop.Start()
+	}
+}
+
+/*
+Stop performs cleanup operations for an OLT device
+*/
+func (o *PonSimOltDevice) Stop(ctx context.Context) {
+	common.Logger().Info("Stopping OLT device...")
+
+	// Stop PM counters loop
+	o.counterLoop.Stop()
+	o.counterLoop = nil
+
+	// Stop alarm simulation
+	if o.AlarmsOn {
+		o.alarmLoop.Stop()
+	}
+	o.alarmLoop = nil
+
+	o.ingressHandler.Close()
+	o.egressHandler.Close()
+
+	o.PonSimDevice.Stop(ctx)
+}
+
+/*
+ConnectToRemoteOnu establishes communication to a remote ONU device
+*/
+func (o *PonSimOltDevice) ConnectToRemoteOnu(onu *OnuRegistree) error {
+	var err error
+
+	host := strings.Join([]string{
+		onu.Device.Address,
+		strconv.Itoa(int(onu.Device.Port)),
+	}, ":")
+
+	common.Logger().WithFields(logrus.Fields{
+		"device": o,
+		"host":   host,
+	}).Debug("Formatting host address")
+
+	// TODO: make it secure
+	ta := credentials.NewTLS(&tls.Config{
+		//Certificates:       []tls.Certificate{peerCert},
+		//RootCAs:            caCertPool,
+		InsecureSkipVerify: true,
+	})
+
+	// GRPC communication needs to be secured
+	if onu.Conn, err = grpc.DialContext(
+		context.Background(),
+		host,
+		grpc.WithTransportCredentials(ta),
+	); err != nil {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+			"error":  err.Error(),
+		}).Error("Problem with client connection")
+	}
+
+	return err
+}
+
+/*
+Listen waits for incoming EGRESS data on the internal interface
+*/
+func (o *PonSimOltDevice) Listen(ctx context.Context, port int32) {
+	var reply *empty.Empty
+	var err error
+
+	// Establish a GRPC connection with the ONU
+	onu := o.GetOnu(port)
+
+	common.Logger().WithFields(logrus.Fields{
+		"onu": onu,
+	}).Debug("Connecting to remote ONU")
+
+	if onu.Client = ponsim.NewPonSimCommonClient(onu.Conn); onu.Client == nil {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+		}).Error("Problem establishing client connection to ONU")
+		o.RemoveOnu(ctx, port)
+		return
+	}
+
+	// Prepare stream to ONU to forward incoming data as needed
+	if onu.Stream, err = onu.Client.ProcessData(ctx); err != nil {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+		}).Error("Problem establishing stream to ONU")
+		o.RemoveOnu(ctx, port)
+		return
+	}
+
+	defer o.egressHandler.Close()
+	packetSource := gopacket.NewPacketSource(o.egressHandler, o.egressHandler.LinkType())
+	common.Logger().WithFields(logrus.Fields{
+		"device":    o,
+		"interface": o.InternalIf,
+	}).Debug("Listening to incoming EGRESS data")
+
+	// Wait for incoming EGRESS data
+	for packet := range packetSource.Packets() {
+		if dot1q := common.GetDot1QLayer(packet); dot1q != nil {
+			common.Logger().WithFields(logrus.Fields{
+				"device": o,
+				"packet": packet,
+			}).Debug("Received EGRESS packet")
+
+			o.Forward(ctx, 2, packet)
+		}
+	}
+
+	common.Logger().WithFields(logrus.Fields{
+		"device": o,
+	}).Debug("No more packets to process")
+
+	if reply, err = onu.Stream.CloseAndRecv(); err != nil {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+			"error":  err.Error(),
+		}).Error("A problem occurred while closing client stream")
+	} else {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+			"reply":  reply,
+		}).Warn("Client stream closed")
+	}
+}
+
+/*
+GetOnus returns the list of registered ONU devices
+*/
+func (o *PonSimOltDevice) GetOnus() map[int32]*OnuRegistree {
+	if o.Onus == nil {
+		o.Onus = make(map[int32]*OnuRegistree)
+	}
+
+	return o.Onus
+}
+
+/*
+GetOnu return a specific registered ONU
+*/
+func (o *PonSimOltDevice) GetOnu(index int32) *OnuRegistree {
+	var onu *OnuRegistree
+	var ok bool
+
+	if onu, ok = (o.GetOnus())[index]; ok {
+		return onu
+	}
+
+	return nil
+}
+
+func (o *PonSimOltDevice) GetOutgoing() chan []byte {
+	return o.outgoing
+}
+
+/*
+nextAvailablePort returns a port that is not already used by a registered ONU
+*/
+func (o *PonSimOltDevice) nextAvailablePort() int32 {
+	var port int32 = BASE_PORT_NUMBER
+
+	if len(o.GetOnus()) < o.MaxOnuCount {
+		for {
+			if o.GetOnu(port) != nil {
+				// port is already used
+				port += 1
+			} else {
+				// port is available... use it
+				return port
+			}
+		}
+	} else {
+		// OLT has reached its max number of ONUs
+		return -1
+	}
+}
+
+/*
+AddOnu registers an ONU device and sets up all required monitoring and connections
+*/
+func (o *PonSimOltDevice) AddOnu(onu *PonSimOnuDevice) (int32, error) {
+	var portNum int32
+	ctx := context.Background()
+
+	if portNum = o.nextAvailablePort(); portNum != -1 {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+			"port":   portNum,
+			"onu":    onu,
+		}).Info("Adding ONU")
+
+		registree := &OnuRegistree{Device: onu}
+
+		// Setup GRPC communication and check if it succeeded
+		if err := o.ConnectToRemoteOnu(registree); err == nil {
+			o.GetOnus()[portNum] = registree
+
+			o.AddLink(1, int(portNum), o.forwardToONU(portNum))
+			go o.MonitorOnu(ctx, portNum)
+			go o.Listen(ctx, portNum)
+		}
+
+	} else {
+		common.Logger().WithFields(logrus.Fields{
+			"device": o,
+		}).Warn("ONU Map is full")
+	}
+
+	return int32(portNum), nil
+}
+
+/*
+RemoveOnu removes the reference to a registered ONU
+*/
+func (o *PonSimOltDevice) RemoveOnu(ctx context.Context, onuIndex int32) error {
+	onu := o.GetOnu(onuIndex)
+	if err := onu.Conn.Close(); err != nil {
+		common.Logger().WithFields(logrus.Fields{
+			"device":   o,
+			"onu":      onu.Device,
+			"onuIndex": onuIndex,
+		}).Error("Problem closing connection to ONU")
+	}
+
+	common.Logger().WithFields(logrus.Fields{
+		"device":   o,
+		"onu":      onu,
+		"onuIndex": onuIndex,
+	}).Info("Removing ONU")
+
+	delete(o.Onus, onuIndex)
+
+	// Remove link entries for this ONU
+	o.RemoveLink(1, int(onuIndex))
+
+	return nil
+}
+
+/*
+MonitorOnu verifies the connection status of a specific ONU and cleans up as necessary
+*/
+func (o *PonSimOltDevice) MonitorOnu(ctx context.Context, onuIndex int32) {
+	for {
+		if o.GetOnu(onuIndex) != nil {
+			if conn := o.GetOnu(onuIndex).Conn; conn.GetState() == connectivity.Ready {
+				// Wait for any change to occur
+				conn.WaitForStateChange(ctx, conn.GetState())
+				// We lost communication with the ONU ... remove it
+				o.RemoveOnu(ctx, onuIndex)
+				return
+			}
+			common.Logger().WithFields(logrus.Fields{
+				"device":   o,
+				"ctx":      ctx,
+				"onuIndex": onuIndex,
+			}).Debug("ONU is not ready")
+			time.Sleep(1 * time.Second)
+		} else {
+			return
+		}
+	}
+}