blob: 42dd0a99c2eb47994ade71e231eaf2758fca8a60 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Broadcom OpenOMCI OLT/ONU adapter handler.
19"""
20
21import json
22import ast
23import structlog
24
25from collections import OrderedDict
26
27from twisted.internet import reactor, task
28from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
29
30from heartbeat import HeartBeat
31from voltha.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
32from voltha.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
33from voltha.extensions.alarms.adapter_alarms import AdapterAlarms
34
35from common.utils.indexpool import IndexPool
36import voltha.core.flow_decomposer as fd
37from voltha.registry import registry
38from voltha.core.config.config_backend import ConsulStore
39from voltha.core.config.config_backend import EtcdStore
40from voltha.protos import third_party
41from voltha.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
42from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
43from voltha.protos.bbf_fiber_tcont_body_pb2 import TcontsConfigData
44from voltha.protos.bbf_fiber_gemport_body_pb2 import GemportsConfigData
45from voltha.extensions.omci.onu_configuration import OMCCVersion
46from voltha.extensions.omci.onu_device_entry import OnuDeviceEvents, \
47 OnuDeviceEntry, IN_SYNC_KEY
48from voltha.adapters.brcm_openomci_onu.omci.brcm_mib_download_task import BrcmMibDownloadTask
49from voltha.adapters.brcm_openomci_onu.omci.brcm_tp_service_specific_task import BrcmTpServiceSpecificTask
50from voltha.adapters.brcm_openomci_onu.omci.brcm_uni_lock_task import BrcmUniLockTask
51from voltha.adapters.brcm_openomci_onu.omci.brcm_vlan_filter_task import BrcmVlanFilterTask
52from voltha.adapters.brcm_openomci_onu.onu_gem_port import *
53from voltha.adapters.brcm_openomci_onu.onu_tcont import *
54from voltha.adapters.brcm_openomci_onu.pon_port import *
55from voltha.adapters.brcm_openomci_onu.uni_port import *
56from voltha.adapters.brcm_openomci_onu.onu_traffic_descriptor import *
57from common.tech_profile.tech_profile import TechProfile
58
# Short aliases; EntityOperations / ReasonCodes are not imported by name in
# this file, so they presumably arrive through one of the wildcard imports
# above -- TODO confirm.
OP = EntityOperations
RC = ReasonCodes

# Bind the third_party import to a throw-away name (presumably so the import,
# needed only for its side effects, is not reported as unused).
_ = third_party
log = structlog.get_logger()

# Delay, in seconds, handed to reactor.callLater() before a failed
# tech-profile or VLAN-filter task is retried.
_STARTUP_RETRY_WAIT = 20
66
67
68class BrcmOpenomciOnuHandler(object):
69
def __init__(self, adapter, device_id):
    """
    Create the handler for one ONU device.

    Besides initialising bookkeeping state this builds the heartbeat
    object, opens a KV-store client rooted at the tech-profile path prefix
    and schedules the first run of the ONU event loop.

    :param adapter: owning adapter object; its ``adapter_agent`` is cached
    :param device_id: id of the device this handler manages
    :raises Exception: if the configured KV-store backend is neither
                       'etcd' nor 'consul'
    """
    self.log = structlog.get_logger(device_id=device_id)
    self.log.debug('function-entry')

    # Adapter wiring and message queues
    self.adapter = adapter
    self.adapter_agent = adapter.adapter_agent
    self.parent_adapter = None
    self.parent_id = None
    self.device_id = device_id
    self.incoming_messages = DeferredQueue()
    self.event_messages = DeferredQueue()
    self.proxy_address = None
    self.tx_id = 0
    self._enabled = False
    self.alarms = None
    self.pm_metrics = None
    self._omcc_version = OMCCVersion.Unknown
    self._total_tcont_count = 0  # From ANI-G ME
    self._qos_flexibility = 0  # From ONT2_G ME

    self._onu_indication = None
    self._unis = {}  # Port # -> UniPort

    self._pon = None
    # TODO: probably shouldnt be hardcoded, determine from olt maybe?
    self._pon_port_number = 100
    self.logical_device_id = None

    self._heartbeat = HeartBeat.create(self, device_id)

    # Set up OpenOMCI environment
    self._onu_omci_device = None
    self._dev_info_loaded = False
    self._deferred = None

    # Event-bus subscriptions, filled in by _subscribe_to_events()
    self._in_sync_subscription = None
    self._connectivity_subscription = None
    self._capabilities_subscription = None

    self.mac_bridge_service_profile_entity_id = 0x201
    self.gal_enet_profile_entity_id = 0x1

    # Both keyed by uni_id, then by tech-profile path
    self._tp_service_specific_task = {}
    self._tech_profile_download_done = {}

    # Initialize KV store client
    self.args = registry('main').get_args()
    backend = self.args.backend
    if backend == 'etcd':
        endpoint, store_class = self.args.etcd, EtcdStore
    elif backend == 'consul':
        endpoint, store_class = self.args.consul, ConsulStore
    else:
        self.log.error('Invalid-backend')
        raise Exception("Invalid-backend-for-kv-store")

    # Endpoint is given as "host:port"
    host, port = endpoint.split(':', 1)
    self.kv_client = store_class(host, port,
                                 TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)

    # Handle received ONU event messages
    reactor.callLater(0, self.handle_onu_events)
130
@property
def enabled(self):
    """Flag that activate() and reconcile() set to True once they have run."""
    return self._enabled

@enabled.setter
def enabled(self, value):
    """Store ``value``; the assignment is skipped when nothing changes."""
    changed = self._enabled != value
    if changed:
        self._enabled = value
139
@property
def omci_agent(self):
    """The OpenOMCI agent, as exposed by the owning adapter."""
    agent = self.adapter.omci_agent
    return agent
143
@property
def omci_cc(self):
    """
    OMCI communications channel of the OpenOMCI device entry.

    :return: ``omci_cc`` of the device entry, or None while no device
             entry has been created yet
    """
    if self._onu_omci_device is None:
        return None
    return self._onu_omci_device.omci_cc
147
@property
def heartbeat(self):
    """The HeartBeat object created for this device in __init__."""
    beat = self._heartbeat
    return beat
151
@property
def uni_ports(self):
    """All UNI port objects currently held in the port-number -> UniPort map."""
    ports = self._unis.values()
    return ports
155
def uni_port(self, port_no_or_name):
    """
    Look up a UNI port by name or by logical port number.

    :param port_no_or_name: (str/unicode) port name, or (int) logical
                            port number
    :return: the first matching UNI port, or None when nothing matches
    """
    by_name = isinstance(port_no_or_name, (str, unicode))
    if not by_name:
        assert isinstance(port_no_or_name, int), 'Invalid parameter type'

    for candidate in self.uni_ports:
        key = candidate.name if by_name else candidate.logical_port_number
        if key == port_no_or_name:
            return candidate
    return None
164
@property
def pon_port(self):
    """The PON port object, or None until _init_pon_state() has created it."""
    pon = self._pon
    return pon
168
def receive_message(self, msg):
    """
    Hand a received message to the OMCI communications channel.

    The message is silently dropped while no channel is available.

    :param msg: message to pass on unchanged
    """
    channel = self.omci_cc
    if channel is None:
        return
    channel.receive_message(msg)
172
173 # Called once when the adapter creates the device/onu instance
def activate(self, device):
    """
    Activate a newly created ONU device.

    Registers for proxied messages and records the parent device (plus the
    parent adapter when the parent is of type 'openolt').  Unless the
    handler is already enabled, the device is then marked DISCOVERED, the
    PON port and OpenOMCI device entry are created, and PM metrics and the
    alarm handler are set up.

    :param device: device object for this ONU; must carry a parent_id and
                   a proxy address with a device_id
    """
    self.log.debug('function-entry', device=device)

    # first we verify that we got parent reference and proxy info
    assert device.parent_id
    assert device.proxy_address.device_id

    # register for proxied messages right away
    self.proxy_address = device.proxy_address
    self.adapter_agent.register_for_proxied_messages(device.proxy_address)
    self.parent_id = device.parent_id
    parent_device = self.adapter_agent.get_device(self.parent_id)
    if parent_device.type == 'openolt':
        loader = registry('adapter_loader')
        self.parent_adapter = loader.get_agent(parent_device.adapter).adapter

    if self.enabled is True:
        self.log.info('onu-already-activated')
        return

    self.log.info('activating-new-onu')
    # populate what we know. rest comes later after mib sync
    device.root = True
    device.vendor = 'Broadcom'
    device.connect_status = ConnectStatus.REACHABLE
    device.oper_status = OperStatus.DISCOVERED
    device.reason = 'activating-onu'

    # pm_metrics requires a logical device id
    parent_device = self.adapter_agent.get_device(device.parent_id)
    self.logical_device_id = parent_device.parent_id
    assert self.logical_device_id, 'Invalid logical device ID'

    self.adapter_agent.update_device(device)
    self.log.debug('set-device-discovered')

    self._init_pon_state(device)

    # ------------------------------------------------------------------
    # Setup PM configuration for this device, passing in ONU specific
    # options.  Must follow _init_pon_state() so the OpenOMCI device
    # entry exists.
    pm_options = {
        OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
        'heartbeat': self.heartbeat,
        OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
    }
    self.pm_metrics = OnuPmMetrics(self.adapter_agent, self.device_id,
                                   self.logical_device_id, grouped=True,
                                   freq_override=False, **pm_options)
    pm_config = self.pm_metrics.make_proto()
    interval_pm = self.pm_metrics.omci_pm.openomci_interval_pm
    self._onu_omci_device.set_pm_config(interval_pm)
    self.log.info("initial-pm-config", pm_config=pm_config)
    self.adapter_agent.update_device_pm_config(pm_config, init=True)

    # ------------------------------------------------------------------
    # Setup Alarm handler
    self.alarms = AdapterAlarms(self.adapter_agent, device.id, self.logical_device_id)
    # Note, ONU ID and UNI intf set in add_uni_port method
    synchronizer = self._onu_omci_device.alarm_synchronizer
    synchronizer.set_alarm_params(mgr=self.alarms, ani_ports=[self._pon])

    self.enabled = True
235
236 # Called once when the adapter needs to re-create device. usually on vcore restart
def reconcile(self, device):
    """
    Re-create handler state for an already known device.

    Registers for proxied messages and, unless the handler is already
    enabled, rebuilds the PON state, restarts the OpenOMCI state machines
    and schedules a reboot of the ONU one second later.

    :param device: device object for this ONU; must carry a parent_id and
                   a proxy address with a device_id
    """
    self.log.debug('function-entry', device=device)

    # first we verify that we got parent reference and proxy info
    assert device.parent_id
    assert device.proxy_address.device_id

    # register for proxied messages right away
    self.proxy_address = device.proxy_address
    self.adapter_agent.register_for_proxied_messages(device.proxy_address)

    if self.enabled is True:
        self.log.info('onu-already-activated')
        return

    self.log.info('reconciling-broadcom-onu-device')
    self._init_pon_state(device)

    # need to restart state machines on vcore restart. there is no indication to do it for us.
    self._onu_omci_device.start()
    device.reason = "restarting-openomci"
    self.adapter_agent.update_device(device)

    # TODO: this is probably a bit heavy handed
    # Force a reboot for now. We need indications to reflow to reassign tconts and gems given vcore went away
    # This may not be necessary when mib resync actually works
    reactor.callLater(1, self.reboot)

    self.enabled = True
266
@inlineCallbacks
def handle_onu_events(self):
    """
    Process one message from ``event_messages`` and re-arm itself.

    Only 'download_tech_profile' events are acted upon; any exception
    raised while handling a message is logged and the loop continues.
    """
    event_msg = yield self.event_messages.get()
    try:
        is_tp_download = event_msg['event'] == 'download_tech_profile'
        if is_tp_download:
            profile_path = event_msg['event_data']
            uni = event_msg['uni_id']
            self.load_and_configure_tech_profile(uni, profile_path)

    except Exception as e:
        self.log.error("exception-handling-onu-event", e=e)

    # Handle next event
    reactor.callLater(0, self.handle_onu_events)
281
def _init_pon_state(self, device):
    """
    Create the PON port and the OpenOMCI device entry for this ONU.

    The PON port is added to the adapter agent, the logical device id is
    taken from the parent device, and the PON port is finally enabled.

    :param device: device object for this ONU
    """
    self.log.debug('function-entry', device=device)

    self._pon = PonPort.create(self, self._pon_port_number)
    pon_proto = self._pon.get_port()
    self.adapter_agent.add_port(device.id, pon_proto)

    self.log.debug('added-pon-port-to-agent', pon=self._pon)

    parent = self.adapter_agent.get_device(device.parent_id)
    self.logical_device_id = parent.parent_id

    self.adapter_agent.update_device(device)

    # Create and start the OpenOMCI ONU Device Entry for this ONU
    self._onu_omci_device = self.omci_agent.add_device(
        self.device_id,
        self.adapter_agent,
        support_classes=self.adapter.broadcom_omci,
        custom_me_map=self.adapter.custom_me_entities())

    # Port startup
    if self._pon is not None:
        self._pon.enabled = True
303
304 # TODO: move to UniPort
def update_logical_port(self, logical_device_id, port_id, state):
    """
    Set the OpenFlow port state of one logical port.

    Any exception raised while reading or writing the port is logged and
    swallowed.

    :param logical_device_id: id of the logical device owning the port
    :param port_id: id of the logical port to update
    :param state: value stored in ``ofp_port.state``
    """
    try:
        self.log.info('updating-logical-port', logical_port_id=port_id,
                      logical_device_id=logical_device_id, state=state)
        agent = self.adapter_agent
        port = agent.get_logical_port(logical_device_id, port_id)
        port.ofp_port.state = state
        agent.update_logical_port(logical_device_id, port)
    except Exception as e:
        self.log.exception("exception-updating-port", e=e)
316
def delete(self, device):
    """
    Ask the parent adapter to delete this child device.

    Nothing is done beyond logging when no parent adapter was recorded or
    when the parent adapter lacks ``delete_child_device``.

    :param device: device object being deleted
    """
    self.log.info('delete-onu', device=device)
    if not self.parent_adapter:
        self.log.debug("parent-adapter-not-available")
        return

    try:
        self.parent_adapter.delete_child_device(self.parent_id, device)
    except AttributeError:
        self.log.debug('parent-device-delete-child-not-implemented')
326
def _create_tconts(self, uni_id, us_scheduler):
    """
    Build a T-CONT from an upstream scheduler and add it to the PON port.

    :param uni_id: UNI id stored with the T-CONT
    :param us_scheduler: mapping that provides 'alloc_id' and
                         'q_sched_policy'
    """
    alloc_id = us_scheduler['alloc_id']
    q_sched_policy = us_scheduler['q_sched_policy']
    self.log.debug('create-tcont', us_scheduler=us_scheduler)

    tcont_attrs = {
        'alloc-id': alloc_id,
        'q_sched_policy': q_sched_policy,
        'uni_id': uni_id,
    }

    # TODO: Not sure what to do with any of this...
    td_attrs = {
        'name': 'not-sure-td-profile',
        'fixed-bandwidth': "not-sure-fixed",
        'assured-bandwidth': "not-sure-assured",
        'maximum-bandwidth': "not-sure-max",
        'additional-bw-eligibility-indicator': "not-sure-additional",
    }

    traffic_descriptor = OnuTrafficDescriptor.create(td_attrs)
    tcont = OnuTCont.create(self, tcont=tcont_attrs, td=traffic_descriptor)

    self._pon.add_tcont(tcont)

    self.log.debug('pon-add-tcont', tcont=tcont)
351
352 # Called when there is an olt up indication, providing the gem port id chosen by the olt handler
def _create_gemports(self, uni_id, gem_ports, alloc_id_ref, direction):
    """
    Build a GEM port object per attribute entry and add it to the PON port.

    :param uni_id: UNI id stored with every GEM port
    :param gem_ports: iterable of mappings describing the GEM ports
    :param alloc_id_ref: alloc id the GEM ports refer to
    :param direction: direction label stored with every GEM port
    """
    self.log.debug('create-gemport',
                   gem_ports=gem_ports, direction=direction)

    for attrs in gem_ports:
        discard_cfg = attrs['discard_config']
        gemdict = {
            'gemport_id': attrs['gemport_id'],
            'direction': direction,
            'alloc_id_ref': alloc_id_ref,
            'encryption': attrs['aes_encryption'],
            'discard_config': {
                'max_probability': discard_cfg['max_probability'],
                'max_threshold': discard_cfg['max_threshold'],
                'min_threshold': discard_cfg['min_threshold'],
            },
            'discard_policy': attrs['discard_policy'],
            'max_q_size': attrs['max_q_size'],
            'pbit_map': attrs['pbit_map'],
            'priority_q': attrs['priority_q'],
            'scheduling_policy': attrs['scheduling_policy'],
            'weight': attrs['weight'],
            'uni_id': uni_id,
        }

        onu_gem_port = OnuGemPort.create(self, gem_port=gemdict)

        self._pon.add_gem_port(onu_gem_port)

        self.log.debug('pon-add-gemport', gem_port=onu_gem_port)
383
def _do_tech_profile_configuration(self, uni_id, tp):
    """
    Create the T-CONT and the GEM ports described by a tech profile.

    :param uni_id: UNI id the profile applies to
    :param tp: tech-profile mapping; must provide 'num_of_tconts',
               'us_scheduler' and both gem port attribute lists
    """
    # Not used further; the lookup is kept so a missing key still raises.
    num_of_tconts = tp['num_of_tconts']
    scheduler = tp['us_scheduler']
    alloc_id = scheduler['alloc_id']
    self._create_tconts(uni_id, scheduler)

    gem_lists = (
        ('upstream_gem_port_attribute_list', "UPSTREAM"),
        ('downstream_gem_port_attribute_list', "DOWNSTREAM"),
    )
    for list_key, direction in gem_lists:
        self._create_gemports(uni_id, tp[list_key], alloc_id, direction)
393
def load_and_configure_tech_profile(self, uni_id, tp_path):
    """
    Load a tech profile from the KV store and queue its OMCI download.

    The call is a no-op when the profile has already been downloaded for
    this UNI or when a download task for it is still registered.  When the
    queued task fails, the whole procedure is retried after
    ``_STARTUP_RETRY_WAIT`` seconds.

    :param uni_id: UNI id the tech profile applies to
    :param tp_path: KV-store path of the tech-profile instance
    """
    self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)

    pending_tasks = self._tp_service_specific_task.setdefault(uni_id, dict())
    download_done = self._tech_profile_download_done.setdefault(uni_id, dict())
    download_done.setdefault(tp_path, False)

    if download_done[tp_path]:
        self.log.info("tech-profile-config-already-done")
        return

    if tp_path in pending_tasks:
        self.log.info("tech-profile-config-already-in-progress",
                      tp_path=tp_path)
        return

    try:
        # The stored value is a Python literal; literal_eval parses it
        # without executing code.
        tp = ast.literal_eval(self.kv_client[tp_path])
        self.log.debug("tp-instance", tp=tp)
        self._do_tech_profile_configuration(uni_id, tp)

        def success(_results):
            self.log.info("tech-profile-config-done-successfully")
            device = self.adapter_agent.get_device(self.device_id)
            device.reason = 'tech-profile-config-download-success'
            self.adapter_agent.update_device(device)
            if tp_path in self._tp_service_specific_task[uni_id]:
                del self._tp_service_specific_task[uni_id][tp_path]
            self._tech_profile_download_done[uni_id][tp_path] = True

        def failure(_reason):
            self.log.warn('tech-profile-config-failure-retrying',
                          _reason=_reason)
            device = self.adapter_agent.get_device(self.device_id)
            device.reason = 'tech-profile-config-download-failure-retrying'
            self.adapter_agent.update_device(device)
            if tp_path in self._tp_service_specific_task[uni_id]:
                del self._tp_service_specific_task[uni_id][tp_path]
            self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self.load_and_configure_tech_profile,
                                               uni_id, tp_path)

        self.log.info('downloading-tech-profile-configuration')
        self._tp_service_specific_task[uni_id][tp_path] = \
            BrcmTpServiceSpecificTask(self.omci_agent, self, uni_id)
        self._deferred = \
            self._onu_omci_device.task_runner.queue_task(self._tp_service_specific_task[uni_id][tp_path])
        self._deferred.addCallbacks(success, failure)

    except Exception as e:
        self.log.exception("error-loading-tech-profile", e=e)
        # Drop a task that was registered but never successfully queued;
        # otherwise every later call would report "already-in-progress"
        # and the download would never be attempted again.
        self._tp_service_specific_task.get(uni_id, {}).pop(tp_path, None)
449
def update_pm_config(self, device, pm_config):
    """
    Apply a new PM configuration to the metrics object.

    :param device: device object (not used here)
    :param pm_config: PM configuration passed to ``pm_metrics.update``
    """
    # TODO: This has not been tested
    self.log.info('update_pm_config', pm_config=pm_config)
    metrics = self.pm_metrics
    metrics.update(pm_config)
454
455 # Calling this assumes the onu is active/ready and had at least an initial mib downloaded. This gets called from
456 # flow decomposition that ultimately comes from onos
def update_flow_table(self, device, flows):
    """
    Inspect a bulk flow update and queue VLAN filter tasks where needed.

    Every flow is decoded and logged.  Only a flow that has no ETH_TYPE
    match and whose SET_FIELD action sets a non-zero VLAN id results in a
    call to ``_add_vlan_filter_task``; all other flows are ignored with a
    warning.  A flow that fails to decode is logged and skipped, it does
    not abort the remaining flows.

    Calling this assumes the onu is active/ready and had at least an
    initial mib downloaded.

    :param device: device object for this ONU
    :param flows: iterable of flows to inspect
    """
    self.log.debug('function-entry', device=device, flows=flows)

    #
    # We need to proxy through the OLT to get to the ONU
    # Configuration from here should be using OMCI
    #

    # no point in pushing omci flows if the device isnt reachable
    if device.connect_status != ConnectStatus.REACHABLE or \
            device.admin_state != AdminState.ENABLED:
        self.log.warn("device-disabled-or-offline-skipping-flow-update",
                      admin=device.admin_state, connect=device.connect_status)
        return

    def is_downstream(port):
        return port == self._pon_port_number

    for flow in flows:
        _type = None
        _set_vlan_vid = None
        self.log.debug('bulk-flow-update', device_id=device.id, flow=flow)
        try:
            _in_port = fd.get_in_port(flow)
            assert _in_port is not None

            _out_port = fd.get_out_port(flow)  # may be None

            # Anything not entering on the PON port is treated as
            # upstream, so the UNI is the in-port in that case.
            if is_downstream(_in_port):
                self.log.debug('downstream-flow', in_port=_in_port, out_port=_out_port)
                uni_port = self.uni_port(_out_port)
            else:
                self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
                uni_port = self.uni_port(_in_port)

            self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))

            # Match fields: only the ethertype is used below, the rest is
            # decoded for logging and to reject unknown field types.
            for field in fd.get_ofb_fields(flow):
                if field.type == fd.ETH_TYPE:
                    _type = field.eth_type
                    self.log.debug('field-type-eth-type',
                                   eth_type=_type)

                elif field.type == fd.IP_PROTO:
                    self.log.debug('field-type-ip-proto',
                                   ip_proto=field.ip_proto)

                elif field.type == fd.IN_PORT:
                    self.log.debug('field-type-in-port',
                                   in_port=field.port)

                elif field.type == fd.VLAN_VID:
                    self.log.debug('field-type-vlan-vid',
                                   vlan=field.vlan_vid & 0xfff)

                elif field.type == fd.VLAN_PCP:
                    self.log.debug('field-type-vlan-pcp',
                                   pcp=field.vlan_pcp)

                elif field.type == fd.UDP_DST:
                    self.log.debug('field-type-udp-dst',
                                   udp_dst=field.udp_dst)

                elif field.type == fd.UDP_SRC:
                    self.log.debug('field-type-udp-src',
                                   udp_src=field.udp_src)

                elif field.type == fd.IPV4_DST:
                    self.log.debug('field-type-ipv4-dst',
                                   ipv4_dst=field.ipv4_dst)

                elif field.type == fd.IPV4_SRC:
                    # keyword was 'ipv4_dst' before, mislabelling the value
                    self.log.debug('field-type-ipv4-src',
                                   ipv4_src=field.ipv4_src)

                elif field.type == fd.METADATA:
                    self.log.debug('field-type-metadata',
                                   metadata=field.table_metadata)

                else:
                    raise NotImplementedError('field.type={}'.format(
                        field.type))

            # Actions: only a SET_FIELD on the VLAN id is acted upon.
            for action in fd.get_actions(flow):

                if action.type == fd.OUTPUT:
                    self.log.debug('action-type-output',
                                   output=action.output.port, in_port=_in_port)

                elif action.type == fd.POP_VLAN:
                    self.log.debug('action-type-pop-vlan',
                                   in_port=_in_port)

                elif action.type == fd.PUSH_VLAN:
                    _push_tpid = action.push.ethertype
                    self.log.debug('action-type-push-vlan',
                                   push_tpid=_push_tpid, in_port=_in_port)
                    if _push_tpid != 0x8100:
                        self.log.error('unhandled-tpid',
                                       ethertype=_push_tpid)

                elif action.type == fd.SET_FIELD:
                    _field = action.set_field.field.ofb_field
                    assert (action.set_field.field.oxm_class ==
                            OFPXMC_OPENFLOW_BASIC)
                    self.log.debug('action-type-set-field',
                                   field=_field, in_port=_in_port)
                    if _field.type == fd.VLAN_VID:
                        _set_vlan_vid = _field.vlan_vid & 0xfff
                        self.log.debug('set-field-type-vlan-vid',
                                       vlan_vid=_set_vlan_vid)
                    else:
                        self.log.error('unsupported-action-set-field-type',
                                       field_type=_field.type)
                else:
                    self.log.error('unsupported-action-type',
                                   action_type=action.type, in_port=_in_port)

            # TODO: We only set vlan omci flows. Handle omci matching ethertypes at some point in another task
            if _type is not None:
                self.log.warn('ignoring-flow-with-ethType', ethType=_type)
            elif _set_vlan_vid is None or _set_vlan_vid == 0:
                self.log.warn('ignorning-flow-that-does-not-set-vlanid')
            else:
                self.log.warn('set-vlanid', uni_id=uni_port.port_number, set_vlan_vid=_set_vlan_vid)
                self._add_vlan_filter_task(device, uni_port, _set_vlan_vid)

        except Exception as e:
            self.log.exception('failed-to-install-flow', e=e, flow=flow)
612
613
def _add_vlan_filter_task(self, device, uni_port, _set_vlan_vid):
    """
    Queue an OMCI task that sets the VLAN filter for one UNI port.

    On failure the same task is scheduled again after
    ``_STARTUP_RETRY_WAIT`` seconds.  The outcome is recorded as the
    device reason and written back through the adapter agent.

    :param device: device object for this ONU
    :param uni_port: UNI port the VLAN applies to; must not be None
    :param _set_vlan_vid: VLAN id to configure
    """
    assert uni_port is not None

    def record_reason(reason):
        # Keep the caller's copy in step, then persist the reason on a
        # freshly read device.  Previously the reason was only set on the
        # passed-in object and never written back.
        device.reason = reason
        current = self.adapter_agent.get_device(self.device_id)
        current.reason = reason
        self.adapter_agent.update_device(current)

    def success(_results):
        self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid)
        self._vlan_filter_task = None
        record_reason('omci-flows-pushed')

    def failure(_reason):
        self.log.warn('vlan-tagging-failure', uni_port=uni_port, vlan=_set_vlan_vid)
        # Schedule the retry first so a failing device update cannot
        # prevent it.
        self._vlan_filter_task = reactor.callLater(_STARTUP_RETRY_WAIT,
                                                   self._add_vlan_filter_task, device, uni_port, _set_vlan_vid)
        record_reason('omci-flows-failed-retrying')

    self.log.info('setting-vlan-tag')
    self._vlan_filter_task = BrcmVlanFilterTask(self.omci_agent, self.device_id, uni_port, _set_vlan_vid)
    self._deferred = self._onu_omci_device.task_runner.queue_task(self._vlan_filter_task)
    self._deferred.addCallbacks(success, failure)
632
def get_tx_id(self):
    """
    Advance the transaction counter by one.

    :return: the new counter value
    """
    self.log.debug('function-entry')
    next_id = self.tx_id + 1
    self.tx_id = next_id
    return next_id
637
638 # TODO: Actually conform to or create a proper interface.
639 # this and the other functions called from the olt arent very clear.
640 # Called each time there is an onu "up" indication from the olt handler
def create_interface(self, data):
    """
    React to an ONU "up" indication.

    Stores the indication, subscribes to the OpenOMCI events, schedules
    the start of the OpenOMCI state machine one second later and enables
    the heartbeat.

    :param data: indication data, kept in ``_onu_indication``
    """
    self.log.debug('function-entry', data=data)
    self._onu_indication = data

    device = self.adapter_agent.get_device(self.device_id)

    self.log.debug('starting-openomci-statemachine')
    self._subscribe_to_events()
    reactor.callLater(1, self._onu_omci_device.start)

    device.reason = "starting-openomci"
    self.adapter_agent.update_device(device)

    self._heartbeat.enabled = True
653
654 # Currently called each time there is an onu "down" indication from the olt handler
655 # TODO: possibly other reasons to "update" from the olt?
def update_interface(self, data):
    """
    React to an interface update; only ``oper_state == 'down'`` is handled.

    On 'down' the OpenOMCI state machine is stopped, the tech-profile
    bookkeeping is cleared so a later download can run again, all ports
    are disabled and the device is marked UNREACHABLE / DISCOVERED.

    :param data: mapping that may carry an 'oper_state' entry
    """
    self.log.debug('function-entry', data=data)
    oper_state = data.get('oper_state', None)

    onu_device = self.adapter_agent.get_device(self.device_id)

    if oper_state != 'down':
        self.log.debug('not-changing-openomci-statemachine')
        return

    self.log.debug('stopping-openomci-statemachine')
    reactor.callLater(0, self._onu_omci_device.stop)

    # Let TP download happen again
    for per_uni_tasks in self._tp_service_specific_task.values():
        per_uni_tasks.clear()
    for per_uni_done in self._tech_profile_download_done.values():
        per_uni_done.clear()

    self.disable_ports(onu_device)
    onu_device.reason = "stopping-openomci"
    onu_device.connect_status = ConnectStatus.UNREACHABLE
    onu_device.oper_status = OperStatus.DISCOVERED
    self.adapter_agent.update_device(onu_device)
679
680 # Not currently called by olt or anything else
def remove_interface(self, data):
    """
    Stop OpenOMCI for this ONU and disable its ports.

    The tech-profile bookkeeping is cleared so a later download can run
    again.

    :param data: indication data (only logged)
    """
    self.log.debug('function-entry', data=data)

    device = self.adapter_agent.get_device(self.device_id)

    self.log.debug('stopping-openomci-statemachine')
    reactor.callLater(0, self._onu_omci_device.stop)

    # Let TP download happen again
    for per_uni_tasks in self._tp_service_specific_task.values():
        per_uni_tasks.clear()
    for per_uni_done in self._tech_profile_download_done.values():
        per_uni_done.clear()

    self.disable_ports(device)
    device.reason = "stopping-openomci"
    self.adapter_agent.update_device(device)

    # TODO: im sure there is more to do here
700
701 # Not currently called. Would be called presumably from the olt handler
def remove_gemport(self, data):
    """
    Placeholder for GEM port removal.

    Copies ``data`` into a GemportsConfigData message and logs an error
    when the device is not reachable; no removal is performed yet.

    :param data: GEM port configuration to copy from
    """
    self.log.debug('remove-gemport', data=data)
    gem_config = GemportsConfigData()
    gem_config.CopyFrom(data)

    device = self.adapter_agent.get_device(self.device_id)
    reachable = device.connect_status == ConnectStatus.REACHABLE
    if not reachable:
        self.log.error('device-unreachable')
710
711 # Not currently called. Would be called presumably from the olt handler
def remove_tcont(self, tcont_data, traffic_descriptor_data):
    """
    Placeholder for T-CONT removal.

    Logs an error when the device is not reachable; no removal is
    performed yet.

    :param tcont_data: T-CONT description (only logged)
    :param traffic_descriptor_data: traffic descriptor (only logged)
    """
    self.log.debug('remove-tcont', tcont_data=tcont_data, traffic_descriptor_data=traffic_descriptor_data)
    device = self.adapter_agent.get_device(self.device_id)
    reachable = device.connect_status == ConnectStatus.REACHABLE
    if not reachable:
        self.log.error('device-unreachable')

    # TODO: Create some omci task that encompases this what intended
720
721 # Not currently called. Would be called presumably from the olt handler
def create_multicast_gemport(self, data):
    """
    Placeholder for multicast GEM port creation; only logs the call.

    :param data: multicast GEM port description (only logged)
    """
    self.log.debug('function-entry', data=data)

    # TODO: create objects and populate for later omci calls
726
def disable(self, device):
    """
    Administratively disable the ONU.

    Queues a UNI lock task; whether that task succeeds or fails, the
    OpenOMCI state machine is then stopped, the tech-profile bookkeeping
    is cleared, all ports are disabled and the device is updated with
    oper status UNKNOWN.  Exceptions raised while queueing are logged and
    swallowed.

    :param device: device object for this ONU
    """
    self.log.debug('function-entry', device=device)
    try:
        self.log.info('sending-uni-lock-towards-device', device=device)

        def stop_anyway(reason):
            # proceed with disable regardless if we could reach the onu. for example onu is unplugged
            self.log.debug('stopping-openomci-statemachine')
            reactor.callLater(0, self._onu_omci_device.stop)

            # Let TP download happen again
            for uni_id in self._tp_service_specific_task:
                self._tp_service_specific_task[uni_id].clear()
            for uni_id in self._tech_profile_download_done:
                self._tech_profile_download_done[uni_id].clear()

            self.disable_ports(device)
            device.oper_status = OperStatus.UNKNOWN
            device.reason = "omci-admin-lock"
            self.adapter_agent.update_device(device)

        # lock all the unis
        # (named lock_task so the imported twisted 'task' module is not shadowed)
        lock_task = BrcmUniLockTask(self.omci_agent, self.device_id, lock=True)
        self._deferred = self._onu_omci_device.task_runner.queue_task(lock_task)
        self._deferred.addCallbacks(stop_anyway, stop_anyway)
    except Exception as e:
        # Use the handler's own logger so the bound device_id is included.
        self.log.exception('exception-in-onu-disable', exception=e)
754
def reenable(self, device):
    """
    Re-enable a previously disabled ONU.

    Subscribes to the OpenOMCI events again, updates the device reason,
    schedules the start of the OpenOMCI state machine one second later
    and enables the heartbeat.  Exceptions are logged and swallowed.

    :param device: device object for this ONU
    """
    self.log.debug('function-entry', device=device)
    try:
        # Start up OpenOMCI state machines for this device
        # this will ultimately resync mib and unlock unis on successful redownloading the mib
        self.log.debug('restarting-openomci-statemachine')
        self._subscribe_to_events()
        device.reason = "restarting-openomci"
        self.adapter_agent.update_device(device)
        reactor.callLater(1, self._onu_omci_device.start)
        self._heartbeat.enabled = True
    except Exception as e:
        # Use the handler's own logger so the bound device_id is included.
        self.log.exception('exception-in-onu-reenable', exception=e)
768
def reboot(self):
    """
    Ask the ONU to reboot through the OpenOMCI device entry.

    Nothing is sent when the device is not reachable.  After a successful
    reboot request the ports are disabled and the device is marked
    UNREACHABLE / DISCOVERED with reason "rebooting".
    """
    self.log.info('reboot-device')
    device = self.adapter_agent.get_device(self.device_id)
    if device.connect_status != ConnectStatus.REACHABLE:
        self.log.error("device-unreachable")
        return

    def on_reboot_requested(_results):
        self.log.info('reboot-success', _results=_results)
        self.disable_ports(device)
        device.connect_status = ConnectStatus.UNREACHABLE
        device.oper_status = OperStatus.DISCOVERED
        device.reason = "rebooting"
        self.adapter_agent.update_device(device)

    def on_reboot_failed(_reason):
        self.log.info('reboot-failure', _reason=_reason)

    pending = self._onu_omci_device.reboot()
    self._deferred = pending
    pending.addCallbacks(on_reboot_requested, on_reboot_failed)
789
def disable_ports(self, onu_device):
    """
    Disable all ports of this device and mark its UNI logical ports down.

    :param onu_device: device object for this ONU; its parent device must
                       exist and carry a logical device id
    """
    self.log.info('disable-ports', device_id=self.device_id,
                  onu_device=onu_device)

    # Disable all ports on that device
    self.adapter_agent.disable_all_ports(self.device_id)

    parent_device = self.adapter_agent.get_device(onu_device.parent_id)
    assert parent_device
    logical_device_id = parent_device.parent_id
    assert logical_device_id

    uni_ports = self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI)
    for uni in uni_ports:
        # TODO: move to UniPort
        logical_port_id = 'uni-{}'.format(uni.port_no)
        self.update_logical_port(logical_device_id, logical_port_id, OFPPS_LINK_DOWN)
806
def enable_ports(self, onu_device):
    """
    Enable all ports of this device and mark its UNI logical ports live.

    :param onu_device: device object for this ONU; its parent device must
                       exist and carry a logical device id
    """
    self.log.info('enable-ports', device_id=self.device_id, onu_device=onu_device)

    # Enable all ports on that device
    self.adapter_agent.enable_all_ports(self.device_id)

    parent_device = self.adapter_agent.get_device(onu_device.parent_id)
    assert parent_device
    logical_device_id = parent_device.parent_id
    assert logical_device_id

    uni_ports = self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI)
    for uni in uni_ports:
        # TODO: move to UniPort
        logical_port_id = 'uni-{}'.format(uni.port_no)
        self.update_logical_port(logical_device_id, logical_port_id, OFPPS_LIVE)
822
823 # Called just before openomci state machine is started. These listen for events from selected state machines,
824 # most importantly, mib in sync. Which ultimately leads to downloading the mib
def _subscribe_to_events(self):
    """
    Subscribe to the OpenOMCI events this handler reacts to.

    Two subscriptions are made on the device entry's event bus: MIB
    database sync events go to ``in_sync_handler`` and OMCI capabilities
    events go to ``capabilties_handler``.
    """
    self.log.debug('function-entry')

    # OMCI MIB Database sync status
    sync_bus = self._onu_omci_device.event_bus
    sync_topic = OnuDeviceEntry.event_bus_topic(
        self.device_id, OnuDeviceEvents.MibDatabaseSyncEvent)
    self._in_sync_subscription = sync_bus.subscribe(sync_topic, self.in_sync_handler)

    # OMCI Capabilities
    caps_bus = self._onu_omci_device.event_bus
    caps_topic = OnuDeviceEntry.event_bus_topic(
        self.device_id, OnuDeviceEvents.OmciCapabilitiesEvent)
    self._capabilities_subscription = caps_bus.subscribe(caps_topic, self.capabilties_handler)
839
840 # Called when the mib is in sync
def in_sync_handler(self, _topic, msg):
    """
    Event-bus callback for MIB database sync events.

    The first message reporting "in sync" cancels the subscription (so
    this runs only once) and schedules ``_mib_in_sync``.  Exceptions are
    logged and swallowed.

    :param _topic: topic the message arrived on (only logged)
    :param msg: mapping carrying the sync flag under ``IN_SYNC_KEY``
    """
    self.log.debug('function-entry', _topic=_topic, msg=msg)
    if self._in_sync_subscription is None:
        return

    try:
        if not msg[IN_SYNC_KEY]:
            return

        # Only call this once
        self._onu_omci_device.event_bus.unsubscribe(self._in_sync_subscription)
        self._in_sync_subscription = None

        # Start up device_info load
        self.log.debug('running-mib-sync')
        reactor.callLater(0, self._mib_in_sync)

    except Exception as e:
        self.log.exception('in-sync', e=e)
859
def capabilties_handler(self, _topic, _msg):
    """
    Event-bus callback for OMCI capabilities events.

    Currently only logs: once on entry, and once more when a capabilities
    subscription is held.

    :param _topic: event bus topic the message arrived on (logged only)
    :param _msg: event payload (logged only)
    """
    log = self.log
    log.debug('function-entry', _topic=_topic, msg=_msg)

    if self._capabilities_subscription is None:
        return

    log.debug('capabilities-handler-done')
864
# Mib is in sync, we can now query what we learned and actually start pushing ME (download) to the ONU.
# Currently uses a basic mib download task that creates a bridge with a single gem port and uni, only allowing EAP.
# Implement your own MibDownloadTask if you wish to set up something different by default.
def _mib_in_sync(self):
    """
    Act on a synchronized MIB: load device info once, then queue the initial MIB download.

    Steps performed:

    1. Set the device reason to 'discovery-mibsync-complete'.
    2. If device info has not been loaded yet, read the ANI-G, UNI-G, PPTP and
       VEIP entities from the ONU configuration, create one UNI port per
       PPTP/VEIP entity and register the logical ports. Any failure is logged
       and this method is rescheduled after ``_STARTUP_RETRY_WAIT``.
    3. If device info is loaded and the device is admin-enabled, queue a
       ``BrcmMibDownloadTask``. On success the device becomes
       ACTIVE/REACHABLE and its ports are enabled; on failure this method is
       rescheduled. If the device is not admin-enabled it is disabled instead.
    """
    self.log.debug('function-entry')

    omci = self._onu_omci_device
    in_sync = omci.mib_db_in_sync

    device = self.adapter_agent.get_device(self.device_id)
    device.reason = 'discovery-mibsync-complete'
    self.adapter_agent.update_device(device)

    if not self._dev_info_loaded:
        self.log.info('loading-device-data-from-mib', in_sync=in_sync, already_loaded=self._dev_info_loaded)

        config = omci.configuration

        # TODO: run this sooner somehow. shouldnt have to wait for mib sync to push an initial download
        # In Sync, we can register logical ports now. Ideally this could occur on
        # the first time we received a successful (no timeout) OMCI Rx response.
        try:

            # sort the lists so we get consistent port ordering.
            ani_list = sorted(config.ani_g_entities) if config.ani_g_entities else []
            uni_list = sorted(config.uni_g_entities) if config.uni_g_entities else []
            pptp_list = sorted(config.pptp_entities) if config.pptp_entities else []
            veip_list = sorted(config.veip_entities) if config.veip_entities else []

            # The lists above are never None (missing data is normalised to []),
            # so emptiness is what signals a missing element: we need at least
            # one ANI and at least one PPTP or VEIP.
            if not ani_list or not (pptp_list or veip_list):
                device.reason = 'onu-missing-required-elements'
                self.log.warn("no-ani-or-unis")
                self.adapter_agent.update_device(device)
                raise Exception("onu-missing-required-elements")

            # Currently logging the ani, pptp, veip, and uni for information purposes.
            # Actually act on the veip/pptp as its ME is the most correct one to use in later tasks.
            # And in some ONU the UNI-G list is incomplete or incorrect...
            for entity_id in ani_list:
                ani_value = config.ani_g_entities[entity_id]
                self.log.debug("discovered-ani", entity_id=entity_id, value=ani_value)
                # TODO: currently only one OLT PON port/ANI, so this works out.  With NGPON there will be 2..?
                self._total_tcont_count = ani_value.get('total-tcont-count')
                self.log.debug("set-total-tcont-count", tcont_count=self._total_tcont_count)

            for entity_id in uni_list:
                uni_value = config.uni_g_entities[entity_id]
                self.log.debug("discovered-uni", entity_id=entity_id, value=uni_value)

            # Ordered so UNI ids are handed out deterministically: PPTPs first, then VEIPs
            uni_entities = OrderedDict()
            for entity_id in pptp_list:
                pptp_value = config.pptp_entities[entity_id]
                self.log.debug("discovered-pptp", entity_id=entity_id, value=pptp_value)
                uni_entities[entity_id] = UniType.PPTP

            for entity_id in veip_list:
                veip_value = config.veip_entities[entity_id]
                self.log.debug("discovered-veip", entity_id=entity_id, value=veip_value)
                uni_entities[entity_id] = UniType.VEIP

            uni_id = 0
            for entity_id, uni_type in uni_entities.items():
                try:
                    self._add_uni_port(entity_id, uni_id, uni_type)
                    # Only consume a uni_id when the port was actually added
                    uni_id += 1
                except AssertionError as e:
                    self.log.warn("could not add UNI", entity_id=entity_id, uni_type=uni_type, e=e)

            multi_uni = len(self._unis) > 1
            for uni_port in self._unis.values():
                uni_port.add_logical_port(uni_port.port_number, multi_uni)

            self.adapter_agent.update_device(device)

            self._qos_flexibility = config.qos_configuration_flexibility or 0
            self._omcc_version = config.omcc_version or OMCCVersion.Unknown

            if self._unis:
                self._dev_info_loaded = True
            else:
                device.reason = 'no-usable-unis'
                self.adapter_agent.update_device(device)
                self.log.warn("no-usable-unis")
                raise Exception("no-usable-unis")

        except Exception as e:
            # Any failure while loading (including the explicit raises above)
            # is logged and the whole load is retried later.
            self.log.exception('device-info-load', e=e)
            self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)

    else:
        self.log.info('device-info-already-loaded', in_sync=in_sync, already_loaded=self._dev_info_loaded)

    if self._dev_info_loaded:
        if device.admin_state == AdminState.ENABLED:
            def success(_results):
                self.log.info('mib-download-success', _results=_results)
                device = self.adapter_agent.get_device(self.device_id)
                device.reason = 'initial-mib-downloaded'
                device.oper_status = OperStatus.ACTIVE
                device.connect_status = ConnectStatus.REACHABLE
                self.enable_ports(device)
                self.adapter_agent.update_device(device)
                self._mib_download_task = None

            def failure(_reason):
                self.log.warn('mib-download-failure-retrying', _reason=_reason)
                # Re-read the device (as success() does) so that a stale copy
                # captured before the download does not overwrite newer state.
                device = self.adapter_agent.get_device(self.device_id)
                device.reason = 'initial-mib-download-failure-retrying'
                self.adapter_agent.update_device(device)
                self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)

            # Download an initial mib that creates simple bridge that can pass EAP.  On success (above) finally set
            # the device to active/reachable.   This then opens up the handler to openflow pushes from outside
            self.log.info('downloading-initial-mib-configuration')
            self._mib_download_task = BrcmMibDownloadTask(self.omci_agent, self)
            self._deferred = self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
            self._deferred.addCallbacks(success, failure)
        else:
            self.log.info('admin-down-disabling')
            self.disable(device)
    else:
        self.log.info('device-info-not-loaded-skipping-mib-download')
987
988
def _add_uni_port(self, entity_id, uni_id, uni_type=UniType.PPTP):
    """
    Create a UNI port for one discovered PPTP/VEIP entity and register it.

    The port is added to this device and to the parent device, stored in
    ``self._unis`` keyed by port number, passed to the alarm synchronizer,
    and listed as a peer of the PON port on the parent.

    :param entity_id: OMCI entity id of the PPTP/VEIP this UNI represents
    :param uni_id: index of this UNI on the ONU, as assigned by the caller
    :param uni_type: ``UniType`` of the entity (defaults to ``UniType.PPTP``)
    :raises Exception: if the parent device's adapter agent cannot be retrieved
    """
    self.log.debug('function-entry')

    device = self.adapter_agent.get_device(self.device_id)
    parent_device = self.adapter_agent.get_device(device.parent_id)

    parent_adapter_agent = registry('adapter_loader').get_agent(parent_device.adapter)
    if parent_adapter_agent is None:
        # Fail with a clear error instead of an AttributeError on None below
        self.log.error('parent-adapter-could-not-be-retrieved')
        raise Exception('parent-adapter-could-not-be-retrieved')

    # TODO: This knowledge is locked away in openolt.  and it assumes one onu equals one uni...
    parent_adapter = parent_adapter_agent.adapter.devices[parent_device.id]
    uni_no = parent_adapter.platform.mk_uni_port_num(
        self._onu_indication.intf_id, self._onu_indication.onu_id, uni_id)

    # TODO: Some or parts of this likely need to move to UniPort. especially the format stuff
    uni_name = "uni-{}".format(uni_no)

    mac_bridge_port_num = uni_id + 1  # TODO +1 is only to test non-zero index

    self.log.debug('uni-port-inputs', uni_no=uni_no, uni_id=uni_id, uni_name=uni_name, uni_type=uni_type,
                   entity_id=entity_id, mac_bridge_port_num=mac_bridge_port_num)

    uni_port = UniPort.create(self, uni_name, uni_id, uni_no, uni_name, uni_type)
    uni_port.entity_id = entity_id
    uni_port.enabled = True
    uni_port.mac_bridge_port_num = mac_bridge_port_num

    self.log.debug("created-uni-port", uni=uni_port)

    # Register the port on this device and on the parent device
    self.adapter_agent.add_port(device.id, uni_port.get_port())
    parent_adapter_agent.add_port(device.parent_id, uni_port.get_port())

    self._unis[uni_port.port_number] = uni_port

    self._onu_omci_device.alarm_synchronizer.set_alarm_params(onu_id=self._onu_indication.onu_id,
                                                              uni_ports=self._unis.values())
    # TODO: this should be in the PonPortclass
    pon_port = self._pon.get_port()

    # Delete reference to my own UNI as peer from parent.
    # TODO why is this here, add_port_reference_to_parent already prunes duplicates
    me_as_peer = Port.PeerPort(device_id=device.parent_id, port_no=uni_port.port_number)
    partial_pon_port = Port(port_no=pon_port.port_no, label=pon_port.label,
                            type=pon_port.type, admin_state=pon_port.admin_state,
                            oper_status=pon_port.oper_status,
                            peers=[me_as_peer])  # only list myself as a peer to avoid deleting all other UNIs from parent
    self.adapter_agent.delete_port_reference_from_parent(self.device_id, partial_pon_port)

    pon_port.peers.extend([me_as_peer])

    self._pon._port = pon_port

    self.adapter_agent.add_port_reference_to_parent(self.device_id,
                                                    pon_port)