Chip Boling | 32aab30 | 2019-01-23 10:50:18 -0600 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2017 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | import structlog |
| 17 | from transitions import Machine |
| 18 | from twisted.internet import reactor |
| 19 | from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry, OnuDeviceEvents, IN_SYNC_KEY |
| 20 | from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType |
| 21 | |
| 22 | |
| 23 | class OnuOmciCapabilities(object): |
| 24 | """ |
| 25 | OpenOMCI ONU OMCI Capabilities State machine |
| 26 | """ |
| 27 | DEFAULT_STATES = ['disabled', 'out_of_sync', 'in_sync', 'idle'] |
| 28 | |
| 29 | DEFAULT_TRANSITIONS = [ |
| 30 | {'trigger': 'start', 'source': 'disabled', 'dest': 'out_of_sync'}, |
| 31 | {'trigger': 'synchronized', 'source': 'out_of_sync', 'dest': 'in_sync'}, |
| 32 | |
| 33 | {'trigger': 'success', 'source': 'in_sync', 'dest': 'idle'}, |
| 34 | {'trigger': 'failure', 'source': 'in_sync', 'dest': 'out_of_sync'}, |
| 35 | |
| 36 | {'trigger': 'not_synchronized', 'source': 'idle', 'dest': 'out_of_sync'}, |
| 37 | |
| 38 | # Do wildcard 'stop' trigger last so it covers all previous states |
| 39 | {'trigger': 'stop', 'source': '*', 'dest': 'disabled'}, |
| 40 | ] |
| 41 | DEFAULT_RETRY = 10 # Seconds to delay after task failure/timeout/poll |
| 42 | |
| 43 | def __init__(self, agent, device_id, tasks, |
| 44 | advertise_events=False, |
| 45 | states=DEFAULT_STATES, |
| 46 | transitions=DEFAULT_TRANSITIONS, |
| 47 | initial_state='disabled', |
| 48 | timeout_delay=DEFAULT_RETRY): |
| 49 | """ |
| 50 | Class initialization |
| 51 | |
| 52 | :param agent: (OpenOmciAgent) Agent |
| 53 | :param device_id: (str) ONU Device ID |
| 54 | :param tasks: (dict) Tasks to run |
| 55 | :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus |
| 56 | :param states: (list) List of valid states |
| 57 | :param transitions: (dict) Dictionary of triggers and state changes |
| 58 | :param initial_state: (str) Initial state machine state |
| 59 | :param timeout_delay: (int/float) Number of seconds after a timeout or poll |
| 60 | """ |
| 61 | self.log = structlog.get_logger(device_id=device_id) |
| 62 | |
| 63 | self._agent = agent |
| 64 | self._device_id = device_id |
| 65 | self._device = None |
| 66 | self._timeout_delay = timeout_delay |
| 67 | |
| 68 | self._get_capabilities_task = tasks['get-capabilities'] |
| 69 | self._advertise_events = advertise_events |
| 70 | |
| 71 | self._deferred = None |
| 72 | self._current_task = None |
| 73 | self._task_deferred = None |
| 74 | self._supported_entities = frozenset() |
| 75 | self._supported_msg_types = frozenset() |
| 76 | |
| 77 | self._subscriptions = { # RxEvent.enum -> Subscription Object |
| 78 | OnuDeviceEvents.MibDatabaseSyncEvent: None |
| 79 | } |
| 80 | self._sub_mapping = { |
| 81 | OnuDeviceEvents.MibDatabaseSyncEvent: self.on_mib_sync_event |
| 82 | } |
| 83 | # Statistics and attributes |
| 84 | # TODO: add any others if it will support problem diagnosis |
| 85 | |
| 86 | # Set up state machine to manage states |
| 87 | self.machine = Machine(model=self, states=states, |
| 88 | transitions=transitions, |
| 89 | initial=initial_state, |
| 90 | queued=True, |
| 91 | name='{}-{}'.format(self.__class__.__name__, |
| 92 | device_id)) |
| 93 | |
| 94 | def _cancel_deferred(self): |
| 95 | d1, self._deferred = self._deferred, None |
| 96 | d2, self._task_deferred = self._task_deferred, None |
| 97 | |
| 98 | for d in [d1, d2]: |
| 99 | try: |
| 100 | if d is not None and not d.called: |
| 101 | d.cancel() |
| 102 | except: |
| 103 | pass |
| 104 | |
| 105 | def _cancel_tasks(self): |
| 106 | task, self._current_task = self._current_task, None |
| 107 | if task is not None: |
| 108 | task.stop() |
| 109 | |
| 110 | def __str__(self): |
| 111 | return 'OnuOmciCapabilities: Device ID: {}, State:{}'.format(self._device_id, self.state) |
| 112 | |
| 113 | def delete(self): |
| 114 | """ |
| 115 | Cleanup any state information |
| 116 | """ |
| 117 | self.stop() |
| 118 | |
| 119 | @property |
| 120 | def device_id(self): |
| 121 | return self._device_id |
| 122 | |
| 123 | @property |
| 124 | def supported_managed_entities(self): |
| 125 | """ |
| 126 | Return a set of the Managed Entity class IDs supported on this ONU |
| 127 | None is returned if no MEs have been discovered |
| 128 | |
| 129 | :return: (set of ints) |
| 130 | """ |
| 131 | return self._supported_entities if len(self._supported_entities) else None |
| 132 | |
| 133 | @property |
| 134 | def supported_message_types(self): |
| 135 | """ |
| 136 | Return a set of the Message Types supported on this ONU |
| 137 | None is returned if no message types have been discovered |
| 138 | |
| 139 | :return: (set of EntityOperations) |
| 140 | """ |
| 141 | return self._supported_msg_types if len(self._supported_msg_types) else None |
| 142 | |
| 143 | @property |
| 144 | def advertise_events(self): |
| 145 | return self._advertise_events |
| 146 | |
| 147 | @advertise_events.setter |
| 148 | def advertise_events(self, value): |
| 149 | if not isinstance(value, bool): |
| 150 | raise TypeError('Advertise event is a boolean') |
| 151 | self._advertise_events = value |
| 152 | |
| 153 | def advertise(self, event, info): |
| 154 | """Advertise an event on the OpenOMCI event bus""" |
| 155 | from datetime import datetime |
| 156 | |
| 157 | if self._advertise_events: |
| 158 | self._agent.advertise(event, |
| 159 | { |
| 160 | 'state-machine': self.machine.name, |
| 161 | 'info': info, |
| 162 | 'time': str(datetime.utcnow()) |
| 163 | }) |
| 164 | |
| 165 | def on_enter_disabled(self): |
| 166 | """ |
| 167 | State machine is being stopped |
| 168 | """ |
| 169 | self.advertise(OpenOmciEventType.state_change, self.state) |
| 170 | self._cancel_deferred() |
| 171 | self._cancel_tasks() |
| 172 | |
| 173 | self._supported_entities = frozenset() |
| 174 | self._supported_msg_types = frozenset() |
| 175 | |
| 176 | # Drop Response and Autonomous notification subscriptions |
| 177 | for event, sub in self._subscriptions.iteritems(): |
| 178 | if sub is not None: |
| 179 | self._subscriptions[event] = None |
| 180 | self._device.event_bus.unsubscribe(sub) |
| 181 | |
| 182 | def on_enter_out_of_sync(self): |
| 183 | """ |
| 184 | State machine has just started or the MIB database has transitioned |
| 185 | to an out-of-synchronization state |
| 186 | """ |
| 187 | self.advertise(OpenOmciEventType.state_change, self.state) |
| 188 | self._cancel_deferred() |
| 189 | self._device = self._agent.get_device(self._device_id) |
| 190 | |
| 191 | # Subscribe to events of interest |
| 192 | try: |
| 193 | for event, sub in self._sub_mapping.iteritems(): |
| 194 | if self._subscriptions[event] is None: |
| 195 | self._subscriptions[event] = \ |
| 196 | self._device.event_bus.subscribe( |
| 197 | topic=OnuDeviceEntry.event_bus_topic(self._device_id, |
| 198 | event), |
| 199 | callback=sub) |
| 200 | |
| 201 | except Exception as e: |
| 202 | self.log.exception('subscription-setup', e=e) |
| 203 | |
| 204 | # Periodically check/poll for in-sync in case subscription was missed or |
| 205 | # already in sync |
| 206 | self._deferred = reactor.callLater(0, self.check_in_sync) |
| 207 | |
| 208 | def check_in_sync(self): |
| 209 | if self._device.mib_db_in_sync: |
| 210 | self.synchronized() |
| 211 | else: |
| 212 | self._deferred = reactor.callLater(self._timeout_delay, |
| 213 | self.check_in_sync) |
| 214 | |
| 215 | def on_enter_in_sync(self): |
| 216 | """ |
| 217 | State machine has just transitioned to an in-synchronization state |
| 218 | """ |
| 219 | self.advertise(OpenOmciEventType.state_change, self.state) |
| 220 | self._cancel_deferred() |
| 221 | |
| 222 | def success(results): |
| 223 | self.log.debug('capabilities-success', results=results) |
| 224 | self._supported_entities = self._current_task.supported_managed_entities |
| 225 | self._supported_msg_types = self._current_task.supported_message_types |
| 226 | self._current_task = None |
| 227 | self._deferred = reactor.callLater(0, self.success) |
| 228 | |
| 229 | def failure(reason): |
| 230 | self.log.info('capabilities-failure', reason=reason) |
| 231 | self._supported_entities = frozenset() |
| 232 | self._supported_msg_types = frozenset() |
| 233 | self._current_task = None |
| 234 | self._deferred = reactor.callLater(self._timeout_delay, self.failure) |
| 235 | |
| 236 | # Schedule a task to read the ONU's OMCI capabilities |
| 237 | self._current_task = self._get_capabilities_task(self._agent, self._device_id) |
| 238 | self._task_deferred = self._device.task_runner.queue_task(self._current_task) |
| 239 | self._task_deferred.addCallbacks(success, failure) |
| 240 | |
| 241 | def on_enter_idle(self): |
| 242 | """ |
| 243 | Notify any subscribers for a capabilities event and wait until |
| 244 | stopped or ONU MIB database goes out of sync |
| 245 | """ |
| 246 | self.advertise(OpenOmciEventType.state_change, self.state) |
| 247 | self._cancel_deferred() |
| 248 | self._device.publish_omci_capabilities_event() |
| 249 | |
| 250 | def on_mib_sync_event(self, _topic, msg): |
| 251 | """ |
| 252 | Handle In-Sync/Out-of-Sync for the MIB database |
| 253 | :param _topic: (str) Subscription topic |
| 254 | :param msg: (dict) In-Sync event data |
| 255 | """ |
| 256 | if self._subscriptions.get(OnuDeviceEvents.MibDatabaseSyncEvent) is None: |
| 257 | return |
| 258 | |
| 259 | if msg[IN_SYNC_KEY]: |
| 260 | self.synchronized() |
| 261 | else: |
| 262 | self.not_synchronized() |