blob: c13739e15feb346e5213aa956eecc35c7112778c [file] [log] [blame]
Chip Boling32aab302019-01-23 10:50:18 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17from transitions import Machine
18from twisted.internet import reactor
19from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry, OnuDeviceEvents, IN_SYNC_KEY
20from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
21
22
23class OnuOmciCapabilities(object):
24 """
25 OpenOMCI ONU OMCI Capabilities State machine
26 """
27 DEFAULT_STATES = ['disabled', 'out_of_sync', 'in_sync', 'idle']
28
29 DEFAULT_TRANSITIONS = [
30 {'trigger': 'start', 'source': 'disabled', 'dest': 'out_of_sync'},
31 {'trigger': 'synchronized', 'source': 'out_of_sync', 'dest': 'in_sync'},
32
33 {'trigger': 'success', 'source': 'in_sync', 'dest': 'idle'},
34 {'trigger': 'failure', 'source': 'in_sync', 'dest': 'out_of_sync'},
35
36 {'trigger': 'not_synchronized', 'source': 'idle', 'dest': 'out_of_sync'},
37
38 # Do wildcard 'stop' trigger last so it covers all previous states
39 {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
40 ]
41 DEFAULT_RETRY = 10 # Seconds to delay after task failure/timeout/poll
42
43 def __init__(self, agent, device_id, tasks,
44 advertise_events=False,
45 states=DEFAULT_STATES,
46 transitions=DEFAULT_TRANSITIONS,
47 initial_state='disabled',
48 timeout_delay=DEFAULT_RETRY):
49 """
50 Class initialization
51
52 :param agent: (OpenOmciAgent) Agent
53 :param device_id: (str) ONU Device ID
54 :param tasks: (dict) Tasks to run
55 :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
56 :param states: (list) List of valid states
57 :param transitions: (dict) Dictionary of triggers and state changes
58 :param initial_state: (str) Initial state machine state
59 :param timeout_delay: (int/float) Number of seconds after a timeout or poll
60 """
61 self.log = structlog.get_logger(device_id=device_id)
62
63 self._agent = agent
64 self._device_id = device_id
65 self._device = None
66 self._timeout_delay = timeout_delay
67
68 self._get_capabilities_task = tasks['get-capabilities']
69 self._advertise_events = advertise_events
70
71 self._deferred = None
72 self._current_task = None
73 self._task_deferred = None
74 self._supported_entities = frozenset()
75 self._supported_msg_types = frozenset()
76
77 self._subscriptions = { # RxEvent.enum -> Subscription Object
78 OnuDeviceEvents.MibDatabaseSyncEvent: None
79 }
80 self._sub_mapping = {
81 OnuDeviceEvents.MibDatabaseSyncEvent: self.on_mib_sync_event
82 }
83 # Statistics and attributes
84 # TODO: add any others if it will support problem diagnosis
85
86 # Set up state machine to manage states
87 self.machine = Machine(model=self, states=states,
88 transitions=transitions,
89 initial=initial_state,
90 queued=True,
91 name='{}-{}'.format(self.__class__.__name__,
92 device_id))
93
94 def _cancel_deferred(self):
95 d1, self._deferred = self._deferred, None
96 d2, self._task_deferred = self._task_deferred, None
97
98 for d in [d1, d2]:
99 try:
100 if d is not None and not d.called:
101 d.cancel()
102 except:
103 pass
104
105 def _cancel_tasks(self):
106 task, self._current_task = self._current_task, None
107 if task is not None:
108 task.stop()
109
110 def __str__(self):
111 return 'OnuOmciCapabilities: Device ID: {}, State:{}'.format(self._device_id, self.state)
112
113 def delete(self):
114 """
115 Cleanup any state information
116 """
117 self.stop()
118
119 @property
120 def device_id(self):
121 return self._device_id
122
123 @property
124 def supported_managed_entities(self):
125 """
126 Return a set of the Managed Entity class IDs supported on this ONU
127 None is returned if no MEs have been discovered
128
129 :return: (set of ints)
130 """
131 return self._supported_entities if len(self._supported_entities) else None
132
133 @property
134 def supported_message_types(self):
135 """
136 Return a set of the Message Types supported on this ONU
137 None is returned if no message types have been discovered
138
139 :return: (set of EntityOperations)
140 """
141 return self._supported_msg_types if len(self._supported_msg_types) else None
142
143 @property
144 def advertise_events(self):
145 return self._advertise_events
146
147 @advertise_events.setter
148 def advertise_events(self, value):
149 if not isinstance(value, bool):
150 raise TypeError('Advertise event is a boolean')
151 self._advertise_events = value
152
153 def advertise(self, event, info):
154 """Advertise an event on the OpenOMCI event bus"""
155 from datetime import datetime
156
157 if self._advertise_events:
158 self._agent.advertise(event,
159 {
160 'state-machine': self.machine.name,
161 'info': info,
162 'time': str(datetime.utcnow())
163 })
164
165 def on_enter_disabled(self):
166 """
167 State machine is being stopped
168 """
169 self.advertise(OpenOmciEventType.state_change, self.state)
170 self._cancel_deferred()
171 self._cancel_tasks()
172
173 self._supported_entities = frozenset()
174 self._supported_msg_types = frozenset()
175
176 # Drop Response and Autonomous notification subscriptions
177 for event, sub in self._subscriptions.iteritems():
178 if sub is not None:
179 self._subscriptions[event] = None
180 self._device.event_bus.unsubscribe(sub)
181
182 def on_enter_out_of_sync(self):
183 """
184 State machine has just started or the MIB database has transitioned
185 to an out-of-synchronization state
186 """
187 self.advertise(OpenOmciEventType.state_change, self.state)
188 self._cancel_deferred()
189 self._device = self._agent.get_device(self._device_id)
190
191 # Subscribe to events of interest
192 try:
193 for event, sub in self._sub_mapping.iteritems():
194 if self._subscriptions[event] is None:
195 self._subscriptions[event] = \
196 self._device.event_bus.subscribe(
197 topic=OnuDeviceEntry.event_bus_topic(self._device_id,
198 event),
199 callback=sub)
200
201 except Exception as e:
202 self.log.exception('subscription-setup', e=e)
203
204 # Periodically check/poll for in-sync in case subscription was missed or
205 # already in sync
206 self._deferred = reactor.callLater(0, self.check_in_sync)
207
208 def check_in_sync(self):
209 if self._device.mib_db_in_sync:
210 self.synchronized()
211 else:
212 self._deferred = reactor.callLater(self._timeout_delay,
213 self.check_in_sync)
214
215 def on_enter_in_sync(self):
216 """
217 State machine has just transitioned to an in-synchronization state
218 """
219 self.advertise(OpenOmciEventType.state_change, self.state)
220 self._cancel_deferred()
221
222 def success(results):
223 self.log.debug('capabilities-success', results=results)
224 self._supported_entities = self._current_task.supported_managed_entities
225 self._supported_msg_types = self._current_task.supported_message_types
226 self._current_task = None
227 self._deferred = reactor.callLater(0, self.success)
228
229 def failure(reason):
230 self.log.info('capabilities-failure', reason=reason)
231 self._supported_entities = frozenset()
232 self._supported_msg_types = frozenset()
233 self._current_task = None
234 self._deferred = reactor.callLater(self._timeout_delay, self.failure)
235
236 # Schedule a task to read the ONU's OMCI capabilities
237 self._current_task = self._get_capabilities_task(self._agent, self._device_id)
238 self._task_deferred = self._device.task_runner.queue_task(self._current_task)
239 self._task_deferred.addCallbacks(success, failure)
240
241 def on_enter_idle(self):
242 """
243 Notify any subscribers for a capabilities event and wait until
244 stopped or ONU MIB database goes out of sync
245 """
246 self.advertise(OpenOmciEventType.state_change, self.state)
247 self._cancel_deferred()
248 self._device.publish_omci_capabilities_event()
249
250 def on_mib_sync_event(self, _topic, msg):
251 """
252 Handle In-Sync/Out-of-Sync for the MIB database
253 :param _topic: (str) Subscription topic
254 :param msg: (dict) In-Sync event data
255 """
256 if self._subscriptions.get(OnuDeviceEvents.MibDatabaseSyncEvent) is None:
257 return
258
259 if msg[IN_SYNC_KEY]:
260 self.synchronized()
261 else:
262 self.not_synchronized()