Migration to new event defination for OpenONU adapter
This change is for the migration of alarms and KPIs for earlier
separate defination of AlarmEvent and KPIEvent to a generic format
called Event where it could be an alarm or a KPI event
An event manager adapter_events handles the event submission.
Change-Id: If088a1876fbbae2975ef77c4364dd96bbe361c8d
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index 4c4a8f0..a2510d9 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -19,6 +19,7 @@
"""
import ast
+import arrow
import structlog
from collections import OrderedDict
@@ -27,10 +28,10 @@
from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
from heartbeat import HeartBeat
-from pyvoltha.adapters.extensions.alarms.onu.onu_active_alarm import OnuActiveAlarm
-from pyvoltha.adapters.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
-from pyvoltha.adapters.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
-from pyvoltha.adapters.extensions.alarms.adapter_alarms import AdapterAlarms
+from pyvoltha.adapters.extensions.events.device_events.onu.onu_active_event import OnuActiveEvent
+from pyvoltha.adapters.extensions.events.kpi.onu.onu_pm_metrics import OnuPmMetrics
+from pyvoltha.adapters.extensions.events.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
+from pyvoltha.adapters.extensions.events.adapter_events import AdapterEvents
import pyvoltha.common.openflow.utils as fd
from pyvoltha.common.utils.registry import registry
@@ -79,7 +80,7 @@
self.proxy_address = None
self.tx_id = 0
self._enabled = False
- self.alarms = None
+ self.events = None
self.pm_metrics = None
self._omcc_version = OMCCVersion.Unknown
self._total_tcont_count = 0 # From ANI-G ME
@@ -239,6 +240,10 @@
self.log.debug('pon state initialized', device=device)
############################################################################
+ # Setup Alarm handler
+ self.events = AdapterEvents(self.core_proxy, device.id, self.logical_device_id,
+ device.serial_number)
+ ############################################################################
# Setup PM configuration for this device
# Pass in ONU specific options
kwargs = {
@@ -247,7 +252,7 @@
OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
}
self.log.debug('create-OnuPmMetrics', serial_number=device.serial_number)
- self.pm_metrics = OnuPmMetrics(self.core_proxy, self.device_id,
+ self.pm_metrics = OnuPmMetrics(self.events, self.core_proxy, self.device_id,
self.logical_device_id, device.serial_number,
grouped=True, freq_override=False, **kwargs)
pm_config = self.pm_metrics.make_proto()
@@ -255,12 +260,8 @@
self.log.info("initial-pm-config", pm_config=pm_config)
yield self.core_proxy.device_pm_config_update(pm_config, init=True)
- ############################################################################
- # Setup Alarm handler
- self.alarms = AdapterAlarms(self.core_proxy, device.id, self.logical_device_id,
- device.serial_number)
# Note, ONU ID and UNI intf set in add_uni_port method
- self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.alarms,
+ self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.events,
ani_ports=[self._pon])
#Start collecting stats from the device after a brief pause
@@ -1036,7 +1037,7 @@
oper_status=OperStatus.ACTIVE, connect_status=ConnectStatus.REACHABLE)
yield self.core_proxy.device_update(device)
self._mib_download_task = None
- yield self.onu_active_alarm()
+ yield self.onu_active_event()
@inlineCallbacks
def failure(_reason):
@@ -1097,27 +1098,31 @@
return intf_id << 11 | onu_id << 4 | uni_id
@inlineCallbacks
- def onu_active_alarm(self):
+ def onu_active_event(self):
self.log.debug('function-entry')
try:
device = yield self.core_proxy.get_device(self.device_id)
parent_device = yield self.core_proxy.get_device(self.parent_id)
olt_serial_number = parent_device.serial_number
+ raised_ts = arrow.utcnow().timestamp
self.log.debug("onu-indication-context-data",
pon_id=self._onu_indication.intf_id,
+ onu_id=self._onu_indication.onu_id,
registration_id=self.device_id,
device_id=self.device_id,
onu_serial_number=device.serial_number,
- olt_serial_number=olt_serial_number)
+ olt_serial_number=olt_serial_number,
+ raised_ts=raised_ts)
- self.log.debug("Trying to raise alarm")
- OnuActiveAlarm(self.alarms, self.device_id,
+ self.log.debug("Trying-to-raise-onu-active-event")
+ OnuActiveEvent(self.events, self.device_id,
self._onu_indication.intf_id,
device.serial_number,
str(self.device_id),
- olt_serial_number).raise_alarm()
- except Exception as active_alarm_error:
- self.log.exception('onu-activated-alarm-error',
- errmsg=active_alarm_error.message)
+ olt_serial_number,raised_ts,
+ onu_id=self._onu_indication.onu_id).send(True)
+ except Exception as active_event_error:
+ self.log.exception('onu-activated-event-error',
+ errmsg=active_event_error.message)
diff --git a/python/adapters/brcm_openomci_onu/main.py b/python/adapters/brcm_openomci_onu/main.py
index d0eadcb..54f171c 100755
--- a/python/adapters/brcm_openomci_onu/main.py
+++ b/python/adapters/brcm_openomci_onu/main.py
@@ -59,6 +59,7 @@
accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
etcd=os.environ.get('ETCD', 'localhost:2379'),
core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
+ event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
interface=os.environ.get('INTERFACE', get_my_primary_interface()),
instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
@@ -223,6 +224,13 @@
default=defs['core_topic'],
help=_help)
+ _help = 'topic of events on the kafka bus'
+ parser.add_argument('-et', '--event_topic',
+ dest='event_topic',
+ action='store',
+ default=defs['event_topic'],
+ help=_help)
+
args = parser.parse_args()
# post-processing
@@ -280,6 +288,7 @@
self.instance_id = self.args.instance_id + '_' + str(current_time)
self.core_topic = args.core_topic
+ self.event_topic = args.event_topic
self.listening_topic = args.name
self.startup_components()
@@ -349,6 +358,7 @@
self.core_proxy = CoreProxy(
kafka_proxy=None,
default_core_topic=self.core_topic,
+ default_event_topic=self.event_topic,
my_listening_topic=self.listening_topic)
self.adapter_proxy = AdapterProxy(
diff --git a/python/compose/system-test.yml b/python/compose/system-test.yml
index fd89e20..069c856 100644
--- a/python/compose/system-test.yml
+++ b/python/compose/system-test.yml
@@ -168,6 +168,7 @@
"--kafka_adapter=${DOCKER_HOST_IP}:9092",
"--kafka_cluster=${DOCKER_HOST_IP}:9092",
"--core_topic=rwcore"
+ "--event_topic=voltha.events"
]
networks:
- default
diff --git a/python/requirements.txt b/python/requirements.txt
index b264018..365d76f 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -60,4 +60,4 @@
python-consul==0.6.2
afkak==3.0.0.dev20181106
voltha-protos>=0.1.4
-pyvoltha==0.2.4
+pyvoltha==2.2.0