VOL-291 : PON simulator refactoring for cluster integration
- Added ponsim build target in Makefile
- Added new option to vcore to select comm type with ponsim
- Modified all proto files to include destination go package
Amendments:
- Clean up based on review comments
- Properly close GRPC connections in ponsim_olt adapter
- Added voltha namespace to some k8s templates
Change-Id: I2f349fa7b3550a8a8cc8fc676cc896f33fbb9372
diff --git a/ponsim/v2/common/flow_sort.go b/ponsim/v2/common/flow_sort.go
new file mode 100644
index 0000000..f62f7ca
--- /dev/null
+++ b/ponsim/v2/common/flow_sort.go
@@ -0,0 +1,17 @@
+package common
+
+import (
+ "github.com/opencord/voltha/protos/go/openflow_13"
+)
+
+type SortByPriority []*openflow_13.OfpFlowStats
+
+func (s SortByPriority) Len() int {
+ return len(s)
+}
+func (s SortByPriority) Swap(i, j int) {
+ s[i], s[j] = s[j], s[i]
+}
+func (s SortByPriority) Less(i, j int) bool {
+ return s[i].Priority < s[j].Priority
+}
diff --git a/ponsim/v2/common/interval_handler.go b/ponsim/v2/common/interval_handler.go
new file mode 100644
index 0000000..973865e
--- /dev/null
+++ b/ponsim/v2/common/interval_handler.go
@@ -0,0 +1,164 @@
+package common
+
+import (
+ "github.com/sirupsen/logrus"
+ "sync"
+ "time"
+)
+
+/*
+ IntervalHandler is used to run a routine at regular intervals and provide all the necessary
+ utilities to manage the execution.
+*/
+
+type IntervalHandler struct {
+ // Interval period in between each execution (in seconds?)
+ Interval int
+ // function to execute after each interval
+ function func()
+ // Channel listening to execution events
+ execute chan _ExecutionState
+ // Channel listening for a termination event
+ terminate chan struct{}
+ // Current execution state of the handler
+ state _ExecutionState
+ wg sync.WaitGroup
+}
+
+// Define execution state constants
+type _ExecutionState uint8
+
+const (
+ STARTED _ExecutionState = iota
+ STOPPED
+ PAUSED
+ RESUMED
+)
+
+// Execute state string equivalents
+var _ExecutionStateEnum = []string{
+ "STARTED",
+ "STOPPED",
+ "PAUSED",
+ "RESUMED",
+}
+
+func (s _ExecutionState) String() string {
+ return _ExecutionStateEnum[s]
+}
+
+/*
+NewIntervalHandler instantiates a new interval based function execution handler
+*/
+func NewIntervalHandler(interval int, function func()) *IntervalHandler {
+ handler := &IntervalHandler{
+ Interval: interval,
+ function: function,
+ }
+
+ handler.execute = make(chan _ExecutionState)
+ handler.terminate = make(chan struct{})
+ handler.state = STOPPED
+
+ return handler
+}
+
+/*
+_Execute is a routine running concurrently and listening to execution events
+*/
+func (h *IntervalHandler) _Execute() {
+ defer h.wg.Done()
+ for {
+ select {
+ case h.state = <-h.execute:
+ Logger().WithFields(logrus.Fields{
+ "handler": h,
+ }).Debug("Processing execution state")
+ switch h.state {
+ case STARTED:
+ case PAUSED:
+ case RESUMED:
+ h.state = STARTED
+ case STOPPED:
+ fallthrough
+ default:
+ h.terminate <- struct{}{}
+ }
+
+ case <-h.terminate:
+ return
+
+ default:
+ if h.state == STARTED {
+ h.function()
+ time.Sleep(time.Duration(h.Interval) * time.Second)
+ } else {
+ // TODO: replace hardcoded delay with a configurable parameter
+ time.Sleep(1 * time.Second)
+ }
+ }
+ }
+}
+
+/*
+Start initiates the interval based function execution
+*/
+func (h *IntervalHandler) Start() {
+ Logger().WithFields(logrus.Fields{
+ "handler": h,
+ }).Info("Starting interval handler")
+
+ if h.execute == nil {
+ return
+ }
+ if h.state == STOPPED {
+ go h._Execute()
+ h.execute <- STARTED
+ }
+}
+
+/*
+Pause interrupts the interval based function execution
+*/
+func (h *IntervalHandler) Pause() {
+ Logger().WithFields(logrus.Fields{
+ "handler": h,
+ }).Info("Pausing interval handler")
+
+ if h.execute == nil || h.state == STOPPED {
+ return
+ }
+ if h.state == STARTED {
+ h.execute <- PAUSED
+ }
+}
+
+/*
+Resume continues the interval based function execution
+*/
+func (h *IntervalHandler) Resume() {
+ Logger().WithFields(logrus.Fields{
+ "handler": h,
+ }).Info("Resuming interval handler")
+
+ if h.execute == nil || h.state == STOPPED {
+ return
+ }
+ if h.state == PAUSED {
+ h.execute <- RESUMED
+ }
+}
+
+/*
+Stop terminates the interval based function execution
+*/
+func (h *IntervalHandler) Stop() {
+ Logger().WithFields(logrus.Fields{
+ "handler": h,
+ }).Info("Stopping interval handler")
+
+ if h.execute == nil || h.state == STOPPED {
+ return
+ }
+ h.execute <- STOPPED
+}
diff --git a/ponsim/v2/common/interval_handler_test.go b/ponsim/v2/common/interval_handler_test.go
new file mode 100644
index 0000000..e1f060f
--- /dev/null
+++ b/ponsim/v2/common/interval_handler_test.go
@@ -0,0 +1,72 @@
+package common
+
+import (
+ "fmt"
+ "testing"
+ "time"
+)
+
+var (
+ handler *IntervalHandler
+ iteration int = 0
+ interval int = 2
+)
+
+func RepeatMessage() {
+ fmt.Printf("Ran the function %d times\n", iteration)
+ iteration += 1
+}
+
+func TestNewIntervalHandler(t *testing.T) {
+ handler = NewIntervalHandler(interval, RepeatMessage)
+
+ if handler.state != STOPPED {
+ t.Error("The handler should be in STOPPED state", handler.state)
+ }
+ if handler.Interval != interval {
+ t.Error("The handler interval doesn't match the configured value", handler.Interval)
+ }
+ if handler.function == nil {
+ t.Error("The handler does not have function configured function", handler.function)
+ }
+}
+
+func TestIntervalHandler_Start(t *testing.T) {
+ handler.Start()
+
+ time.Sleep(5 * time.Second)
+
+ if handler.state != STARTED {
+ t.Error("The handler should be in STARTED state", handler.state)
+ }
+}
+
+func TestIntervalHandler_Pause(t *testing.T) {
+ handler.Pause()
+
+ if handler.state != PAUSED {
+ t.Error("The handler should be in PAUSED state", handler.state)
+ }
+
+ time.Sleep(5 * time.Second)
+}
+
+func TestIntervalHandler_Resume(t *testing.T) {
+ handler.Resume()
+
+ time.Sleep(5 * time.Second)
+
+ if handler.state != STARTED {
+ t.Error("The handler should be in STARTED state", handler.state)
+ }
+}
+
+func TestIntervalHandler_Stop(t *testing.T) {
+ handler.Stop()
+
+ if handler.state != STOPPED {
+ t.Error("The handler should be in STOPPED state", handler.state)
+ }
+
+ time.Sleep(5 * time.Second)
+}
diff --git a/ponsim/v2/common/logger.go b/ponsim/v2/common/logger.go
new file mode 100644
index 0000000..7749dd2
--- /dev/null
+++ b/ponsim/v2/common/logger.go
@@ -0,0 +1,81 @@
+package common
+
+import (
+ "github.com/evalphobia/logrus_fluent"
+ "github.com/sirupsen/logrus"
+ "net"
+ "strconv"
+ "sync"
+)
+
+type logManager struct {
+ *logrus.Logger
+}
+
+// Singleton instance
+var mgrInstance *logManager
+var once sync.Once
+
+func (mgr *logManager) SetFluentd(fluentdHost string) {
+ var hook *logrus_fluent.FluentHook
+ var err error
+ var host string
+ var portStr string
+ var port int
+
+ if host, portStr, err = net.SplitHostPort(fluentdHost); err != nil {
+ mgr.WithFields(logrus.Fields{
+ "error": err.Error(),
+ }).Error("Failed to retrieve host/port information")
+ return
+ }
+
+ if port, err = strconv.Atoi(portStr); err != nil {
+ mgr.WithFields(logrus.Fields{
+ "error": err.Error(),
+ }).Error("Failed to convert port to integer")
+ return
+ }
+
+ if hook, err = logrus_fluent.NewWithConfig(
+ logrus_fluent.Config{
+ Host: host,
+ Port: port,
+ }); err != nil {
+ mgr.WithFields(logrus.Fields{
+ "error": err.Error(),
+ }).Error("Failed to enable Fluentd hook")
+ return
+ }
+
+ hook.SetTag("ponsim")
+
+ hook.SetLevels([]logrus.Level{
+ logrus.DebugLevel,
+ })
+
+ mgr.AddHook(hook)
+
+ mgr.WithFields(logrus.Fields{
+ "hook": hook,
+ }).Info("Added fluentd hook")
+}
+
+/**
+ * Get instance
+ *
+ * It should get initialized only once
+ */
+func Logger() *logManager {
+ once.Do(func() {
+
+ _logger := logrus.New()
+ _logger.Formatter = new(logrus.JSONFormatter)
+ _logger.Level = logrus.DebugLevel
+ //_logger.Out =
+
+ mgrInstance = &logManager{_logger}
+ })
+
+ return mgrInstance
+}
diff --git a/ponsim/v2/common/net_utils.go b/ponsim/v2/common/net_utils.go
new file mode 100644
index 0000000..c37c4ef
--- /dev/null
+++ b/ponsim/v2/common/net_utils.go
@@ -0,0 +1,107 @@
+package common
+
+import (
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/sirupsen/logrus"
+ "net"
+)
+
+func GetInterfaceIP(ifName string) string {
+ var err error
+ var netIf *net.Interface
+ var netAddrs []net.Addr
+ var netIp net.IP
+ var ipAddr string
+
+ if netIf, err = net.InterfaceByName(ifName); err == nil {
+ if netAddrs, err = netIf.Addrs(); err == nil {
+ for _, addr := range netAddrs {
+ Logger().WithFields(logrus.Fields{
+ "type": addr.Network(),
+ }).Debug("Address network type")
+ switch v := addr.(type) {
+ case *net.IPNet:
+ netIp = v.IP
+ case *net.IPAddr:
+ netIp = v.IP
+ }
+ if netIp == nil || netIp.IsLoopback() {
+ continue
+ }
+ netIp = netIp.To4()
+ if netIp == nil {
+ continue // not an ipv4 address
+ }
+ ipAddr = netIp.String()
+ break
+ }
+ }
+ }
+
+ return ipAddr
+}
+func GetHostIP(hostName string) string {
+ var err error
+ var ipAddrs []string
+ var ipAddr string
+
+ if ipAddrs, err = net.LookupHost(hostName); err == nil {
+ for _, ip := range ipAddrs {
+ if addr := net.ParseIP(ip); err == nil {
+ Logger().WithFields(logrus.Fields{
+ "ip": addr,
+ }).Debug("Host address")
+ if addr == nil /*|| addr.IsLoopback()*/ {
+ continue
+ }
+ ipAddr = ip
+ break
+ }
+ }
+ }
+
+ return ipAddr
+}
+func GetMacAddress(ifName string) net.HardwareAddr {
+ var err error
+ var netIf *net.Interface
+ var hwAddr net.HardwareAddr
+
+ if netIf, err = net.InterfaceByName(ifName); err == nil {
+ hwAddr = netIf.HardwareAddr
+ }
+
+ return hwAddr
+}
+
+func GetEthernetLayer(frame gopacket.Packet) *layers.Ethernet {
+ eth := &layers.Ethernet{}
+ if ethLayer := frame.Layer(layers.LayerTypeEthernet); ethLayer != nil {
+ eth, _ = ethLayer.(*layers.Ethernet)
+ }
+ return eth
+}
+func GetDot1QLayer(frame gopacket.Packet) *layers.Dot1Q {
+ var dot1q *layers.Dot1Q
+ //dot1q := &layers.Dot1Q{}
+ if dot1qLayer := frame.Layer(layers.LayerTypeDot1Q); dot1qLayer != nil {
+ dot1q, _ = dot1qLayer.(*layers.Dot1Q)
+ }
+ return dot1q
+}
+
+func GetIpLayer(frame gopacket.Packet) *layers.IPv4 {
+ ip := &layers.IPv4{}
+ if ipLayer := frame.Layer(layers.LayerTypeIPv4); ipLayer != nil {
+ ip, _ = ipLayer.(*layers.IPv4)
+ }
+ return ip
+}
+func GetUdpLayer(frame gopacket.Packet) *layers.UDP {
+ udp := &layers.UDP{}
+ if udpLayer := frame.Layer(layers.LayerTypeUDP); udpLayer != nil {
+ udp, _ = udpLayer.(*layers.UDP)
+ }
+ return udp
+}