blob: 61fdc2c906f966eb51e7b5a086e3732c6878fd49 [file] [log] [blame]
Chip Boling8e042f62019-02-12 16:14:34 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17import ast
18from pon_port import PonPort
19from uni_port import UniPort
20from heartbeat import HeartBeat
21from omci.omci import OMCI
22from onu_traffic_descriptor import OnuTrafficDescriptor
23from onu_tcont import OnuTCont
24from onu_gem_port import OnuGemPort
25
26from pyvoltha.adapters.extensions.alarms.adapter_alarms import AdapterAlarms
27from pyvoltha.adapters.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
28from pyvoltha.adapters.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
29
30from twisted.internet import reactor
31from twisted.internet.defer import DeferredQueue, inlineCallbacks
32from twisted.internet.defer import returnValue
33
34from pyvoltha.common.utils.registry import registry
35from pyvoltha.protos import third_party
36from pyvoltha.protos.common_pb2 import OperStatus, ConnectStatus
37from pyvoltha.common.tech_profile.tech_profile import TechProfile
38from pyvoltha.adapters.common.kvstore.consul_client import Consul
39from pyvoltha.adapters.common.kvstore.etcd_client import EtcdClient
40
41import adtran_olt.resources.adtranolt_platform as platform
42from adapters.adtran_common.flow.flow_entry import FlowEntry
43from omci.adtn_install_flow import AdtnInstallFlowTask
44from omci.adtn_remove_flow import AdtnRemoveFlowTask
45from omci.adtn_tp_service_specific_task import AdtnTpServiceSpecificTask
46from pyvoltha.common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
47
# Reference the imported module so the import is not reported as unused
_ = third_party

_MAXIMUM_PORT = 17          # Only one PON and UNI port at this time
_ONU_REBOOT_MIN = 90        # IBONT 602 takes about 3 minutes
_ONU_REBOOT_RETRY = 10
_STARTUP_RETRY_WAIT = 20    # Delay handed to reactor.callLater before a retry
53
54
55class AdtranOnuHandler(object):
def __init__(self, adapter, device_id):
    """
    Create the handler for one ONU device.

    Sets up the OpenOMCI wrapper, the single PON port, the heartbeat, the
    tech-profile bookkeeping and the KV store client, then schedules the ONU
    event loop (handle_onu_events) on the reactor.

    :param adapter: Adapter that owns this handler. Its 'adapter_agent' and
                    'omci_agent' attributes are read here.
    :param device_id: ID of the ONU device managed by this handler

    :raises Exception: If the configured KV store backend is neither 'etcd'
                       nor 'consul'
    """
    self.adapter = adapter
    self.adapter_agent = adapter.adapter_agent
    self.device_id = device_id
    self.log = structlog.get_logger(device_id=device_id)
    self.logical_device_id = None
    self.proxy_address = None
    self._enabled = False
    self.pm_metrics = None
    self.alarms = None

    self._openomci = OMCI(self, adapter.omci_agent)
    self._in_sync_subscription = None

    self._pon_port_number = 1

    self._unis = dict()         # Port # -> UniPort
    self._pon = PonPort.create(self, self._pon_port_number)
    self._heartbeat = HeartBeat.create(self, device_id)
    self._deferred = None

    # Flow entries, keyed by flow ID
    self._flows = dict()

    # OMCI resources     # TODO: Some of these could be dynamically chosen
    self.vlan_tcis_1 = 0x900
    self.mac_bridge_service_profile_entity_id = self.vlan_tcis_1
    self.gal_enet_profile_entity_id = 0

    # Technology profile related values
    self.incoming_messages = DeferredQueue()
    self.event_messages = DeferredQueue()
    self._tp_service_specific_task = dict()
    self._tech_profile_download_done = dict()

    # Pick the KV store client class and its 'host:port' address
    self.args = registry('main').get_args()
    backend = self.args.backend

    if backend == 'etcd':
        client_class, address = EtcdClient, self.args.etcd
    elif backend == 'consul':
        client_class, address = Consul, self.args.consul
    else:
        self.log.error('Invalid-backend')
        raise Exception("Invalid-backend-for-kv-store")

    host, port = address.split(':', 1)
    self.kv_client = client_class(host, port,
                                  TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)

    # Start handling received ONU event messages
    reactor.callLater(0, self.handle_onu_events)
107
def __str__(self):
    """Return a short label made of the class name and the device ID."""
    label = "AdtranOnuHandler: {}"
    return label.format(self.device_id)
110
def _cancel_deferred(self):
    """
    Cancel the pending operation tracked in 'self._deferred', if any.

    The reference is cleared first. The pending object is then cancelled
    when its 'called' attribute is false. Errors raised while cancelling
    are ignored because this is best-effort cleanup.
    """
    pending, self._deferred = self._deferred, None

    if pending is None:
        return

    try:
        if not pending.called:
            pending.cancel()

    except Exception:
        # Best effort only. Unlike a bare 'except:', this still lets
        # KeyboardInterrupt and SystemExit propagate.
        pass
118
@property
def enabled(self):
    """Current enabled state of the handler (bool)."""
    return self._enabled

@enabled.setter
def enabled(self, value):
    """
    Change the enabled state.

    start() or stop() is called only when the state actually changes.

    :param value: (bool) Requested state
    """
    assert isinstance(value, bool), 'enabled is a boolean'

    if value == self._enabled:
        return      # No transition, nothing to do

    self._enabled = value
    transition = self.start if value else self.stop
    transition()
132
@property
def openomci(self):
    """The OMCI wrapper object created for this handler in __init__."""
    omci = self._openomci
    return omci
136
@property
def heartbeat(self):
    """The HeartBeat object created for this handler in __init__."""
    beat = self._heartbeat
    return beat
140
@property
def uni_ports(self):
    """All known UNI ports (the values of the port-number -> UniPort map)."""
    port_map = self._unis
    return port_map.values()
144
def uni_port(self, port_no_or_name):
    """
    Look up a UNI port by its name or by its port number.

    :param port_no_or_name: (str, unicode or int) Port name or port number
    :return: The matching UNI port, or None if there is no such port
    """
    if not isinstance(port_no_or_name, (str, unicode)):
        # Numeric lookup straight from the port-number map
        assert isinstance(port_no_or_name, int), 'Invalid parameter type'
        return self._unis.get(port_no_or_name)

    # Name lookup: first port whose name matches
    for candidate in self.uni_ports:
        if candidate.name == port_no_or_name:
            return candidate

    return None
152
def pon_port(self, port_no=None):
    """
    Return the PON port.

    :param port_no: (int) Optional port number to match against
    :return: The PON port when 'port_no' is None or equals its port number,
             otherwise None
    """
    if port_no is None or port_no == self._pon.port_number:
        return self._pon

    return None
155
@property
def pon_ports(self):
    """List holding the handler's single PON port."""
    only_pon = self._pon
    return [only_pon]
159
def start(self):
    """
    Bring the handler up. Called by the 'enabled' setter on a transition
    to True.

    Order: cancel any pending deferred, register for inter-adapter messages,
    subscribe to OpenOMCI events and enable OpenOMCI, enable the PON and UNI
    ports, then enable the heartbeat.
    """
    assert self._enabled, 'Start should only be called if enabled'
    self._cancel_deferred()

    # Adapter messages
    self.adapter_agent.register_for_inter_adapter_messages()

    # OpenOMCI
    self._subscribe_to_events()
    self._openomci.enabled = True

    # Ports: PON first, then every UNI
    pon = self._pon
    if pon is not None:
        pon.enabled = True

    for uni in self.uni_ports:
        uni.enabled = True

    # Heartbeat last
    self._heartbeat.enabled = True
180
def stop(self):
    """
    Shut the handler down. Called by the 'enabled' setter on a transition
    to False.

    Order: cancel any pending deferred, unregister from inter-adapter
    messages, disable the heartbeat, drop the OpenOMCI event subscription,
    disable the UNI and PON ports, then disable OpenOMCI.
    """
    assert not self._enabled, 'Stop should only be called if disabled'
    self._cancel_deferred()

    # Adapter messages
    self.adapter_agent.unregister_for_inter_adapter_messages()

    # Heartbeat
    self._heartbeat.enabled = False

    # OMCI event subscription
    self._unsubscribe_to_events()

    # Ports: every UNI first, then the PON
    for uni in self.uni_ports:
        uni.enabled = False

    pon = self._pon
    if pon is not None:
        pon.enabled = False

    self._openomci.enabled = False
201
def receive_message(self, msg):
    """
    Hand a received message to OpenOMCI. Messages are dropped while the
    handler is disabled.

    :param msg: Received message, passed through unchanged
    """
    if not self.enabled:
        return

    # TODO: Have OpenOMCI actually receive the messages
    self.openomci.receive_message(msg)
206
def activate(self, device):
    """
    Activate a newly created ONU device.

    Registers for proxied messages, fills in the initial device info, adds
    the PON port, sets up PM metrics and the alarm handler, and schedules
    both the deferred enable and the start of statistics collection.

    Any failure is logged and reported on the device (reason, UNREACHABLE
    connect status, FAILED oper status) instead of being raised.

    :param device: Device object for this ONU. It is updated in place and
                   pushed through the adapter agent.
    """
    self.log.info('activating')

    try:
        # First verify that we got parent reference and proxy info
        assert device.parent_id, 'Invalid Parent ID'
        assert device.proxy_address.device_id, 'Invalid Device ID'

        # Register for proxied messages right away
        self.proxy_address = device.proxy_address
        self.adapter_agent.register_for_proxied_messages(device.proxy_address)

        # Initialize device info
        device.root = False
        device.vendor = 'Adtran Inc.'
        device.model = 'n/a'
        device.hardware_version = 'n/a'
        device.firmware_version = 'n/a'
        device.reason = ''
        device.connect_status = ConnectStatus.UNKNOWN

        # Register physical ports.  Should have at least one of each
        self.adapter_agent.add_port(device.id, self._pon.get_port())

        def xpon_not_found():
            self.enabled = True

        # Schedule xPON 'not found' startup for 10 seconds from now. We will
        # easily get a vONT-ANI create within that time if xPON is being used
        # as this is how we are initially launched and activated in the first
        # place if xPON is in use.
        reactor.callLater(10, xpon_not_found)   # TODO: Clean up old xPON delay

        # The logical device ID is the parent ID of our parent (OLT) device
        parent_device = self.adapter_agent.get_device(device.parent_id)

        self.logical_device_id = parent_device.parent_id
        self.adapter_agent.update_device(device)

        ############################################################################
        # Setup PM configuration for this device
        # Pass in ONU specific options
        kwargs = {
            OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
            'heartbeat': self.heartbeat,
            OnuOmciPmMetrics.OMCI_DEV_KEY: self.openomci.onu_omci_device
        }
        self.pm_metrics = OnuPmMetrics(self.adapter_agent, self.device_id,
                                       self.logical_device_id, grouped=True,
                                       freq_override=False, **kwargs)
        pm_config = self.pm_metrics.make_proto()
        self.openomci.set_pm_config(self.pm_metrics.omci_pm.openomci_interval_pm)
        self.log.info("initial-pm-config", pm_config=pm_config)
        self.adapter_agent.update_device_pm_config(pm_config, init=True)

        ############################################################################
        # Setup Alarm handler
        self.alarms = AdapterAlarms(self.adapter_agent, device.id, self.logical_device_id)
        self.openomci.onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.alarms,
                                                                          ani_ports=[self._pon])
        ############################################################################
        # Start collecting stats from the device after a brief pause
        reactor.callLater(30, self.pm_metrics.start_collector)

    except Exception as e:
        self.log.exception('activate-failure', e=e)
        # Format the exception itself rather than 'e.message', which is
        # deprecated, Python 2 only, and empty for multi-argument exceptions
        device.reason = 'Failed to activate: {}'.format(e)
        device.connect_status = ConnectStatus.UNREACHABLE
        device.oper_status = OperStatus.FAILED
        self.adapter_agent.update_device(device)
280
def reconcile(self, device):
    """
    Re-attach this handler to an ONU device that already exists.

    Re-registers for proxied and inter-adapter messages, enables the
    handler and marks the device REACHABLE and ACTIVE.

    :param device: Device object for this ONU
    """
    self.log.info('reconciling-ONU-device-starts')

    # A parent reference and proxy info are both required
    assert device.parent_id
    assert device.proxy_address.device_id
    # assert device.proxy_address.channel_id
    self._cancel_deferred()

    agent = self.adapter_agent

    # Register for proxied messages right away
    self.proxy_address = device.proxy_address
    agent.register_for_proxied_messages(device.proxy_address)

    # Register for adapter messages
    agent.register_for_inter_adapter_messages()

    # Report the device as REACHABLE before enabling the handler
    device.connect_status = ConnectStatus.REACHABLE
    agent.update_device(device)
    self.enabled = True

    # TODO: Verify that the uni, pon and logical ports exists

    # Work on a fresh copy and mark it REACHABLE and ACTIVE
    latest = agent.get_device(device.id)
    latest.connect_status = ConnectStatus.REACHABLE
    latest.oper_status = OperStatus.ACTIVE
    latest.reason = ''
    agent.update_device(latest)

    self.log.info('reconciling-ONU-device-ends')
312
@inlineCallbacks
def handle_onu_events(self):
    """
    Wait for one message on the ONU event queue, process it, then
    reschedule itself for the next one.

    Only the 'download_tech_profile' event is acted on. Processing errors
    are logged and do not stop the loop.
    """
    # TODO: Add 'shutdown' message to exit loop
    message = yield self.event_messages.get()

    try:
        if message['event'] == 'download_tech_profile':
            profile_path = message['event_data']
            uni = message['uni_id']
            self.load_and_configure_tech_profile(uni, profile_path)

    except Exception as e:
        self.log.error("exception-handling-onu-event", e=e)

    # Handle next event
    reactor.callLater(0, self.handle_onu_events)
328
def _tp_path_to_tp_id(self, tp_path):
    """
    Extract the Tech Profile ID from a Tech Profile path.

    The ID is the second '/'-separated component of the path.

    :param tp_path: (str) Tech Profile path
    :return: (int) Tech Profile ID. DEFAULT_TECH_PROFILE_TABLE_ID is returned
             when the path has fewer than three components or when the
             second component is not an integer.
    """
    parts = tp_path.split('/')

    if len(parts) > 2:
        try:
            # Index the split components, not the characters of the raw path
            return int(parts[1])

        except ValueError:
            pass

    # Malformed path: fall back to the default instead of returning None
    return DEFAULT_TECH_PROFILE_TABLE_ID
336
def _create_tcont(self, uni_id, us_scheduler, tech_profile_id):
    """
    Build a TCONT from the upstream scheduler and add it to the PON port.

    :param uni_id: (int) UNI ID on the PON
    :param us_scheduler: (dict) Upstream scheduler. 'alloc_id' is required,
                         'q_sched_policy' is optional.
    :param tech_profile_id: (int) Tech Profile ID

    :return: (OnuTCont) Created TCONT
    """
    self.log.debug('create-tcont', us_scheduler=us_scheduler, profile_id=tech_profile_id)

    # Per TCONT (ME #262) values; anything unrecognized maps to 0
    policy_values = {'strictpriority': 1, 'wrr': 2}
    policy_name = us_scheduler.get('q_sched_policy', 'none').lower()
    q_sched_policy = policy_values.get(policy_name, 0)

    tcont_data = {
        'tech-profile-id': tech_profile_id,
        'uni-id': uni_id,
        'alloc-id': us_scheduler['alloc_id'],
        'q-sched-policy': q_sched_policy
    }
    # TODO: Support TD if shaping on ONU is to be performed
    traffic_descriptor = OnuTrafficDescriptor(0, 0, 0)
    new_tcont = OnuTCont.create(self, tcont_data, traffic_descriptor)
    self._pon.add_tcont(new_tcont)
    return new_tcont
365
# Called when there is an olt up indication, providing the gem port id chosen by the olt handler
def _create_gemports(self, upstream_ports, downstream_ports, tcont, uni_id, tech_profile_id):
    """
    Create the GEM Ports for a specific tech profile and add them to the PON.

    A GEM Port ID found in both lists becomes one bidirectional port, built
    from its upstream attributes.

    :param upstream_ports: (list of dict) Upstream GEM Port attributes
    :param downstream_ports: (list of dict) Downstream GEM Port attributes
    :param tcont: (OnuTCont) Associated TCONT
    :param uni_id: (int) UNI Instance ID
    :param tech_profile_id: (int) Tech Profile ID
    """
    self.log.debug('create-gemports', upstream=upstream_ports,
                   downstream_ports=downstream_ports,
                   tcont=tcont, tech_id=tech_profile_id)

    # Index both attribute lists by GEM Port ID
    us_by_id = {attrs['gemport_id']: attrs for attrs in upstream_ports}
    ds_by_id = {attrs['gemport_id']: attrs for attrs in downstream_ports}

    us_ids = set(us_by_id.keys())
    ds_ids = set(ds_by_id.keys())
    both_ids = us_ids & ds_ids

    # Direction -> list of GEM attributes to create with that direction
    attrs_by_direction = {
        OnuGemPort.UPSTREAM: [us_by_id[gid] for gid in us_ids - both_ids],
        OnuGemPort.DOWNSTREAM: [ds_by_id[gid] for gid in ds_ids - both_ids],
        OnuGemPort.BIDIRECTIONAL: [us_by_id[gid] for gid in both_ids]
    }
    for direction, attr_list in attrs_by_direction.items():
        for attrs in attr_list:
            gem_data = {
                'gemport-id': attrs['gemport_id'],
                'direction': direction,
                'encryption': attrs['aes_encryption'].lower() == 'true',
                'discard-policy': attrs['discard_policy'],
                'max-q-size': attrs['max_q_size'],
                'pbit-map': attrs['pbit_map'],
                'priority-q': attrs['priority_q'],
                'scheduling-policy': attrs['scheduling_policy'],
                'weight': attrs['weight'],
                'uni-id': uni_id,
                'discard-config': {
                    'max-probability': attrs['discard_config']['max_probability'],
                    'max-threshold': attrs['discard_config']['max_threshold'],
                    'min-threshold': attrs['discard_config']['min_threshold'],
                },
            }
            new_port = OnuGemPort.create(self, gem_data, tcont.alloc_id,
                                         tech_profile_id, uni_id,
                                         self._pon.next_gem_entity_id)
            self._pon.add_gem_port(new_port)
419
def _do_tech_profile_configuration(self, uni_id, tp, tech_profile_id):
    """
    Create the TCONT and then the GEM Ports described by a tech profile.

    :param uni_id: (int) UNI ID
    :param tp: (dict) Tech profile instance. Reads 'us_scheduler',
               'upstream_gem_port_attribute_list' and
               'downstream_gem_port_attribute_list'.
    :param tech_profile_id: (int) Tech Profile ID
    """
    tcont = self._create_tcont(uni_id, tp['us_scheduler'], tech_profile_id)

    self._create_gemports(tp['upstream_gem_port_attribute_list'],
                          tp['downstream_gem_port_attribute_list'],
                          tcont, uni_id, tech_profile_id)
427
def load_and_configure_tech_profile(self, uni_id, tp_path):
    """
    Load a tech profile from the KV store and download it to the ONU.

    Nothing is done when the profile was already downloaded for this UNI or
    when a download for it is still in progress. A failed download is
    retried after _STARTUP_RETRY_WAIT. Errors raised while setting up the
    download are logged, not raised.

    :param uni_id: (int) UNI ID
    :param tp_path: (str) Tech Profile path, used as the KV store key
    """
    self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)

    # Make sure the per-UNI bookkeeping entries exist
    self._tp_service_specific_task.setdefault(uni_id, dict())
    done_by_path = self._tech_profile_download_done.setdefault(uni_id, dict())
    done_by_path.setdefault(tp_path, False)

    if done_by_path[tp_path]:
        self.log.info("tech-profile-config-already-done")
        return

    try:
        if tp_path in self._tp_service_specific_task[uni_id]:
            self.log.info("tech-profile-config-already-in-progress",
                          tp_path=tp_path)
            return

        # The stored profile is the text form of a Python literal
        tp = ast.literal_eval(self.kv_client[tp_path])
        self.log.debug("tp-instance", tp=tp)

        tech_profile_id = self._tp_path_to_tp_id(tp_path)
        self._do_tech_profile_configuration(uni_id, tp, tech_profile_id)

        def success(_results):
            self.log.info("tech-profile-config-done-successfully")
            device = self.adapter_agent.get_device(self.device_id)
            device.reason = ''
            self.adapter_agent.update_device(device)

            if tp_path in self._tp_service_specific_task[uni_id]:
                del self._tp_service_specific_task[uni_id][tp_path]

            self._tech_profile_download_done[uni_id][tp_path] = True

        def failure(_reason):
            self.log.warn('tech-profile-config-failure-retrying', reason=_reason)
            device = self.adapter_agent.get_device(self.device_id)
            device.reason = 'Tech Profile config failed-retrying'
            self.adapter_agent.update_device(device)

            if tp_path in self._tp_service_specific_task[uni_id]:
                del self._tp_service_specific_task[uni_id][tp_path]

            # Try the whole load/download again later
            self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT,
                                               self.load_and_configure_tech_profile,
                                               uni_id, tp_path)

        self.log.info('downloading-tech-profile-configuration')
        download_task = AdtnTpServiceSpecificTask(self.openomci.omci_agent, self, uni_id)

        # Record the task so a second request sees it as in progress
        self._tp_service_specific_task[uni_id][tp_path] = download_task
        self._deferred = self.openomci.onu_omci_device.task_runner.queue_task(download_task)
        self._deferred.addCallbacks(success, failure)

    except Exception as e:
        self.log.exception("error-loading-tech-profile", e=e)
489
def update_pm_config(self, _device, pm_config):
    """
    Apply a new PM configuration to the PM metrics object.

    :param _device: Unused
    :param pm_config: New PM configuration, passed to pm_metrics.update()
    """
    # TODO: This has not been tested
    self.log.info('update_pm_config', pm_config=pm_config)

    metrics = self.pm_metrics
    metrics.update(pm_config)
494
def update_flow_table(self, flows):
    """
    Bring the ONU flows in line with a bulk flow update.

    New flows are queued for installation through OpenOMCI. Flows that are
    currently tracked but missing from 'flows' are queued for removal.

    This was previously decorated with @inlineCallbacks without containing a
    'yield', which makes Twisted raise TypeError on every call. It is now a
    plain function that returns an already-fired Deferred.

    :param flows: (list) Flows from the bulk update
    :return: (Deferred) Fired with 'nop' when 'flows' is empty, else None
    """
    # Function-scope import: 'succeed' is not among the module-level imports
    from twisted.internet.defer import succeed

    if len(flows) == 0:
        return succeed('nop')   # TODO: Do we need to delete all flows if empty?

    self.log.debug('bulk-flow-update', flows=flows)
    valid_flows = set()
    supported_directions = FlowEntry.upstream_flow_types | FlowEntry.downstream_flow_types

    def failed(_reason, fid):
        # The flow may already have been dropped by a later bulk update
        self._flows.pop(fid, None)

    for flow in flows:
        # Decode it
        flow_entry = FlowEntry.create(flow, self)

        # Check for a failed decode before touching any attribute
        if flow_entry is None:
            continue

        # Already handled?
        # NOTE(review): a known flow is still re-queued for install below,
        # as in the original code. Confirm whether it should be skipped.
        if flow_entry.flow_id in self._flows:
            valid_flows.add(flow_entry.flow_id)

        if flow_entry.flow_direction not in supported_directions:
            continue

        is_upstream = flow_entry.flow_direction in FlowEntry.upstream_flow_types

        # Ignore untagged upstream etherType flows. These are trapped at the
        # OLT and the default flows during initial OMCI service download will
        # send them to the Default VLAN (4091) port for us
        if is_upstream and flow_entry.vlan_vid is None and flow_entry.etype is not None:
            continue

        # Also ignore upstream untagged/priority tag that sets priority tag
        # since that is already installed and any user-data flows for upstream
        # priority tag data will be at a higher level.  Also should ignore the
        # corresponding priority-tagged to priority-tagged flow as well.
        if (flow_entry.vlan_vid == 0 and flow_entry.set_vlan_vid == 0) or \
                (flow_entry.vlan_vid is None and flow_entry.set_vlan_vid == 0
                 and not is_upstream):
            continue

        # Add it to hardware
        try:
            task = AdtnInstallFlowTask(self.openomci.omci_agent, self, flow_entry)
            d = self.openomci.onu_omci_device.task_runner.queue_task(task)
            d.addErrback(failed, flow_entry.flow_id)

            valid_flows.add(flow_entry.flow_id)
            self._flows[flow_entry.flow_id] = flow_entry

        except Exception as e:
            self.log.exception('flow-add', e=e, flow=flow_entry)

    # Now check for flows that were missing in the bulk update
    deleted_flows = set(self._flows.keys()) - valid_flows

    for flow_id in deleted_flows:
        del_flow = self._flows[flow_id]
        try:
            task = AdtnRemoveFlowTask(self.openomci.omci_agent, self, del_flow)
            self.openomci.onu_omci_device.task_runner.queue_task(task)
            # TODO: Change to success/failure callback checks later
            # d.addCallback(success, flow_entry.flow_id)
            del self._flows[flow_id]

        except Exception as e:
            self.log.exception('flow-remove', e=e, flow=del_flow)

    return succeed(None)
562
563
def remove_from_flow_table(self, _flows):
    """
    Remove flows from the device. Not supported by this handler.

    :param _flows: (list) Flows

    :raises NotImplementedError: Always
    """
    raise NotImplementedError()
571
def add_to_flow_table(self, _flows):
    """
    Add flows to the device. Not supported by this handler.

    :param _flows: (list) Flows

    :raises NotImplementedError: Always
    """
    raise NotImplementedError()
579
@inlineCallbacks
def reboot(self):
    """
    Reboot the ONU.

    Marks the device ACTIVATING/UNREACHABLE, sends the OMCI reboot, disables
    OpenOMCI and schedules _finish_reboot() after _ONU_REBOOT_MIN to restore
    the previous status values.

    An error raised while sending the reboot is logged and re-raised.
    """
    self.log.info('rebooting', device_id=self.device_id)
    self._cancel_deferred()

    try:
        # Drop registration for adapter messages. Re-register after the
        # reboot only if we were registered to begin with.
        self.adapter_agent.unregister_for_inter_adapter_messages()
        reregister = True

    except KeyError:
        reregister = False

    # Update the operational status to ACTIVATING and connect status to
    # UNREACHABLE
    device = self.adapter_agent.get_device(self.device_id)

    previous_oper_status = device.oper_status
    previous_conn_status = device.connect_status

    device.oper_status = OperStatus.ACTIVATING
    device.connect_status = ConnectStatus.UNREACHABLE
    device.reason = 'Attempting reboot'
    self.adapter_agent.update_device(device)

    # TODO: send alert and clear alert after the reboot
    try:
        ######################################################
        # MIB Reset
        yield self.openomci.onu_omci_device.reboot(timeout=1)

    except Exception as e:
        self.log.exception('send-reboot', e=e)
        raise

    # Reboot in progress. A reboot may take up to 3 min 30 seconds
    # Go ahead and pause less than that and start to look
    # for it being alive
    device.reason = 'reboot in progress'
    self.adapter_agent.update_device(device)

    # Disable OpenOMCI. This used 'self.omci', which does not exist on
    # this class; the OMCI wrapper is exposed as 'self.openomci'.
    self.openomci.enabled = False
    self._deferred = reactor.callLater(_ONU_REBOOT_MIN,
                                       self._finish_reboot,
                                       previous_oper_status,
                                       previous_conn_status,
                                       reregister)
629
def _finish_reboot(self, previous_oper_status, previous_conn_status,
                   reregister):
    """
    Complete a reboot started by reboot(). Scheduled there via callLater.

    Re-enables OpenOMCI, restores the device status values saved before the
    reboot and optionally re-registers for inter-adapter messages.

    The @inlineCallbacks decorator was removed: this function contains no
    'yield', so Twisted raised TypeError each time it was called.

    :param previous_oper_status: Operational status to restore
    :param previous_conn_status: Connect status to restore
    :param reregister: (bool) Re-register for inter-adapter messages
    """
    # Restart OpenOMCI. This used 'self.omci', which does not exist on
    # this class; the OMCI wrapper is exposed as 'self.openomci'.
    self.openomci.enabled = True

    device = self.adapter_agent.get_device(self.device_id)
    device.oper_status = previous_oper_status
    device.connect_status = previous_conn_status
    device.reason = ''
    self.adapter_agent.update_device(device)

    if reregister:
        self.adapter_agent.register_for_inter_adapter_messages()

    self.log.info('reboot-complete', device_id=self.device_id)
646
def self_test_device(self, device):
    """
    Run a self test on a device in response to an NBI call.

    Self test is not supported, so the response always says so.

    :param device: A Voltha.Device object.
    :return: SelfTestResponse with result NOT_SUPPORTED
    """
    from pyvoltha.protos.voltha_pb2 import SelfTestResponse

    self.log.info('self-test-device', device=device.id)

    # TODO: Support self test?
    not_supported = SelfTestResponse.NOT_SUPPORTED
    return SelfTestResponse(result=not_supported)
657
def disable(self):
    """
    Disable the ONU device.

    Disables all ports, marks the device UNKNOWN/UNREACHABLE, removes the
    UNI logical ports and the PON port reference from the parent, and
    unregisters from proxied messages. Any error in those steps is ignored.
    The handler itself is always disabled at the end.
    """
    self.log.info('disabling', device_id=self.device_id)
    try:
        agent = self.adapter_agent

        # Get the latest device reference
        device = agent.get_device(self.device_id)

        # Disable all ports on that device
        agent.disable_all_ports(self.device_id)

        # Update the device operational status to UNKNOWN
        device.oper_status = OperStatus.UNKNOWN
        device.connect_status = ConnectStatus.UNREACHABLE
        device.reason = 'Disabled'
        agent.update_device(device)

        # Remove the uni logical port from the OLT, if still present
        parent_device = agent.get_device(device.parent_id)
        assert parent_device

        for uni in self.uni_ports:
            # port_id = 'uni-{}'.format(uni.port_number)
            port_id = uni.port_id_name()
            try:
                logical_device_id = parent_device.parent_id
                assert logical_device_id
                logical_port = agent.get_logical_port(logical_device_id, port_id)
                agent.delete_logical_port(logical_device_id, logical_port)

            except KeyError:
                self.log.info('logical-port-not-found', device_id=self.device_id,
                              portid=port_id)

        # Remove pon port from parent and disable
        if self._pon is not None:
            agent.delete_port_reference_from_parent(self.device_id,
                                                    self._pon.get_port())
            self._pon.enabled = False

        # Unregister for proxied message
        agent.unregister_for_proxied_messages(device.proxy_address)

    except Exception as _e:
        pass    # This is expected if OLT has deleted the ONU device handler

    # And disable OMCI as well
    self.enabled = False
    self.log.info('disabled')
704
def reenable(self):
    """
    Re-enable a previously disabled ONU device.

    Re-registers for proxied messages, re-enables the ports, restores the
    PON port reference on the parent, re-creates the UNI ports and their
    logical ports, enables the handler and marks the device ACTIVE and
    REACHABLE. Any error is logged, not raised.
    """
    self.log.info('re-enabling', device_id=self.device_id)
    try:
        # Get the latest device reference
        device = self.adapter_agent.get_device(self.device_id)
        self._cancel_deferred()

        # First we verify that we got parent reference and proxy info
        assert device.parent_id
        assert device.proxy_address.device_id
        # assert device.proxy_address.channel_id

        # Re-register for proxied messages right away
        self.proxy_address = device.proxy_address
        self.adapter_agent.register_for_proxied_messages(
            device.proxy_address)

        # Re-enable the ports on that device
        self.adapter_agent.enable_all_ports(self.device_id)

        # Add the pon port reference to the parent
        if self._pon is not None:
            self._pon.enabled = True
            self.adapter_agent.add_port_reference_to_parent(device.id,
                                                            self._pon.get_port())
        # Update the connect status to REACHABLE
        device.connect_status = ConnectStatus.REACHABLE
        self.adapter_agent.update_device(device)

        # re-add uni port to logical device
        parent_device = self.adapter_agent.get_device(device.parent_id)
        self.logical_device_id = parent_device.parent_id
        assert self.logical_device_id, 'Invalid logical device ID'

        # reestablish logical ports for each UNI
        multi_uni = len(self.uni_ports) > 1
        for uni in self.uni_ports:
            self.adapter_agent.add_port(device.id, uni.get_port())
            uni.add_logical_port(uni.logical_port_number, multi_uni)

        device = self.adapter_agent.get_device(device.id)
        device.oper_status = OperStatus.ACTIVE
        device.connect_status = ConnectStatus.REACHABLE
        device.reason = ''

        self.enabled = True
        self.adapter_agent.update_device(device)

        self.log.info('re-enabled')

    # 'as e' matches every other handler in this file; the old
    # 'except Exception, e' form is Python 2 only syntax
    except Exception as e:
        self.log.exception('error-re-enabling', e=e)
757
def delete(self):
    """
    Delete this handler's resources.

    Stops and deletes every UNI port and then the PON port, ignoring any
    error along the way, then deletes the OpenOMCI wrapper and clears the
    handler's reference to it.
    """
    self.log.info('deleting', device_id=self.device_id)

    try:
        for uni in self._unis.values():
            uni.stop()
            uni.delete()

        pon = self._pon
        pon.stop()
        pon.delete()

    except Exception as _e:
        pass    # Expected if the OLT deleted us from the device handler

    # OpenOMCI cleanup: drop our reference first, then delete the object
    omci = self._openomci
    self._openomci = None
    omci.delete()
775
def add_uni_ports(self):
    """
    Create one UNI port per PPTP entity reported by the ONU.

    Called after in-sync is achieved and not in xPON mode. Each port is
    added to this device and to the parent device, recorded in the
    port-number map, and the PON port peer entry is pointed at it.

    If the parent adapter agent cannot be retrieved, the error is logged
    and only the add to the parent device is skipped.
    """
    # TODO: We have two methods adding UNI ports.  Go to one
    # TODO: Should this be moved to the omci.py module for this ONU?

    # This is only for working WITHOUT xPON
    pptp_entities = self.openomci.onu_omci_device.configuration.pptp_entities
    device = self.adapter_agent.get_device(self.device_id)

    multi_uni = len(pptp_entities) > 1

    # uni_id counts the entities in iteration order, starting at zero
    for uni_id, (entity_id, _pptp) in enumerate(pptp_entities.items()):
        intf_id = self.proxy_address.channel_id
        onu_id = self.proxy_address.onu_id
        uni_no = platform.mk_uni_port_num(intf_id, onu_id, uni_id=uni_id)
        uni_name = "uni-{}".format(uni_no)
        mac_bridge_port_num = uni_id + 1

        uni_port = UniPort.create(self, uni_name, uni_no, uni_name)
        uni_port.entity_id = entity_id
        uni_port.enabled = True
        uni_port.mac_bridge_port_num = mac_bridge_port_num
        uni_port.add_logical_port(uni_port.port_number, multi_uni)
        self.log.debug("created-uni-port", uni=uni_port)

        self.adapter_agent.add_port(device.id, uni_port.get_port())
        parent_device = self.adapter_agent.get_device(device.parent_id)

        parent_adapter_agent = registry('adapter_loader').get_agent(parent_device.adapter)
        if parent_adapter_agent is None:
            # Without an agent the port cannot be added to the parent.
            # Previously this fell through and called add_port() on None.
            self.log.error('olt-adapter-agent-could-not-be-retrieved')
        else:
            parent_adapter_agent.add_port(device.parent_id, uni_port.get_port())

        self._unis[uni_port.port_number] = uni_port
        self.openomci.onu_omci_device.alarm_synchronizer.set_alarm_params(onu_id=self.proxy_address.onu_id,
                                                                          uni_ports=self._unis.values())
        # TODO: this should be in the PonPort class
        pon_port = self._pon.get_port()
        self.adapter_agent.delete_port_reference_from_parent(self.device_id,
                                                             pon_port)
        # Find index where this ONU peer is (should almost always be zero)
        d = [i for i, e in enumerate(pon_port.peers) if
             e.port_no == intf_id and e.device_id == device.parent_id]

        if len(d) > 0:
            pon_port.peers[d[0]].port_no = uni_port.port_number
            self.adapter_agent.add_port_reference_to_parent(self.device_id,
                                                            pon_port)
        self.adapter_agent.update_device(device)
        uni_port.enabled = True
829
def rx_inter_adapter_message(self, msg):
    """
    Handle a message from another adapter. Not supported by this handler.

    :param msg: Inter-adapter message (unused)

    :raises NotImplementedError: Always
    """
    # NotImplemented is a non-callable constant used for rich comparisons;
    # the exception class is NotImplementedError
    raise NotImplementedError('Not currently supported')
832
def _subscribe_to_events(self):
    """
    Subscribe in_sync_handler() to the OpenOMCI MIB database sync event for
    this device and remember the subscription.
    """
    from pyvoltha.adapters.extensions.omci.onu_device_entry import OnuDeviceEvents, \
        OnuDeviceEntry

    # OMCI MIB Database sync status
    event_bus = self.openomci.onu_omci_device.event_bus
    sync_topic = OnuDeviceEntry.event_bus_topic(self.device_id,
                                                OnuDeviceEvents.MibDatabaseSyncEvent)
    self._in_sync_subscription = event_bus.subscribe(sync_topic, self.in_sync_handler)
842
def _unsubscribe_to_events(self):
    """
    Drop the in-sync event subscription, if there is one. The stored
    reference is cleared before the event bus is told to unsubscribe.
    """
    subscription = self._in_sync_subscription
    self._in_sync_subscription = None

    if subscription is None:
        return

    self.openomci.onu_omci_device.event_bus.unsubscribe(subscription)
849
def in_sync_handler(self, _topic, msg):
    """
    Event bus callback for the MIB database sync event.

    On an in-sync report while no UNI ports exist yet, the subscription is
    dropped and the UNI ports are created. Errors are logged, not raised.

    :param _topic: Event bus topic (unused)
    :param msg: (dict) Event message; the IN_SYNC_KEY entry is read
    """
    # Nothing to do once the subscription has been dropped
    if self._in_sync_subscription is None:
        return

    try:
        from pyvoltha.adapters.extensions.omci.onu_device_entry import IN_SYNC_KEY

        # Do not proceed if we have not got our vENET information yet.
        if msg[IN_SYNC_KEY] and len(self.uni_ports) == 0:
            # Drop subscription....
            self._unsubscribe_to_events()

            # Set up UNI Ports. The UNI ports are currently created when the xPON
            # vENET information is created. Once xPON is removed, we need to create
            # them from the information provided from the MIB upload UNI-G and other
            # UNI related MEs.
            self.add_uni_ports()

    except Exception as e:
        self.log.exception('in-sync', e=e)