blob: f5dc8bf51d8fd7442cd0c17f7430cee8142f1474 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Broadcom OpenOMCI OLT/ONU adapter handler.
19"""
20
21import json
22import ast
23import structlog
24
25from collections import OrderedDict
26
27from twisted.internet import reactor, task
28from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
29
30from heartbeat import HeartBeat
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050031from pyvoltha.adapters.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
32from pyvoltha.adapters.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
33from pyvoltha.adapters.extensions.alarms.adapter_alarms import AdapterAlarms
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050035import pyvoltha.common.openflow.utils as fd
36from pyvoltha.common.utils.registry import registry
37from pyvoltha.common.config.config_backend import ConsulStore
38from pyvoltha.common.config.config_backend import EtcdStore
39from pyvoltha.protos import third_party
40from pyvoltha.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
41from pyvoltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
42from pyvoltha.adapters.extensions.omci.onu_configuration import OMCCVersion
43from pyvoltha.adapters.extensions.omci.onu_device_entry import OnuDeviceEvents, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050044 OnuDeviceEntry, IN_SYNC_KEY
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050045from omci.brcm_mib_download_task import BrcmMibDownloadTask
46from omci.brcm_tp_service_specific_task import BrcmTpServiceSpecificTask
47from omci.brcm_uni_lock_task import BrcmUniLockTask
48from omci.brcm_vlan_filter_task import BrcmVlanFilterTask
49from onu_gem_port import *
50from onu_tcont import *
51from pon_port import *
52from uni_port import *
53from onu_traffic_descriptor import *
54from pyvoltha.common.tech_profile.tech_profile import TechProfile
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050055
# Delay handed to reactor.callLater() before a failed OMCI task is retried.
_STARTUP_RETRY_WAIT = 20

# Short aliases. EntityOperations / ReasonCodes are not imported by name in
# this file -- presumably they arrive through one of the star imports
# (TODO confirm).
OP = EntityOperations
RC = ReasonCodes

# Reference third_party once so the import is not flagged as unused.
_ = third_party

# Module-level logger; handler instances create their own bound logger.
log = structlog.get_logger()
63
64
65class BrcmOpenomciOnuHandler(object):
66
def __init__(self, adapter, device_id):
    """
    Initialise handler state for one ONU and connect to the KV store.

    :param adapter: owning adapter; its ``adapter_agent`` is cached on the
                    handler
    :param device_id: id of the ONU device handled by this instance
    :raises Exception: when the configured backend is neither 'etcd' nor
                       'consul'
    """
    self.log = structlog.get_logger(device_id=device_id)
    self.log.debug('function-entry')

    # Adapter and device identity
    self.adapter = adapter
    self.adapter_agent = adapter.adapter_agent
    self.parent_adapter = None
    self.parent_id = None
    self.device_id = device_id

    # Queues and general bookkeeping
    self.incoming_messages = DeferredQueue()
    self.event_messages = DeferredQueue()
    self.proxy_address = None
    self.tx_id = 0
    self._enabled = False
    self.alarms = None
    self.pm_metrics = None
    self._omcc_version = OMCCVersion.Unknown
    self._total_tcont_count = 0  # From ANI-G ME
    self._qos_flexibility = 0  # From ONT2_G ME

    self._onu_indication = None
    self._unis = {}  # Port # -> UniPort

    self._pon = None
    # TODO: probably shouldnt be hardcoded, determine from olt maybe?
    self._pon_port_number = 100
    self.logical_device_id = None

    self._heartbeat = HeartBeat.create(self, device_id)

    # Set up OpenOMCI environment
    self._onu_omci_device = None
    self._dev_info_loaded = False
    self._deferred = None

    self._in_sync_subscription = None
    self._connectivity_subscription = None
    self._capabilities_subscription = None

    self.mac_bridge_service_profile_entity_id = 0x201
    self.gal_enet_profile_entity_id = 0x1

    self._tp_service_specific_task = {}
    self._tech_profile_download_done = {}

    # Initialize KV store client. The backend name doubles as the name of
    # the args attribute holding its "host:port" string.
    self.args = registry('main').get_args()
    backend = self.args.backend
    store_classes = {'etcd': EtcdStore, 'consul': ConsulStore}
    if backend not in store_classes:
        self.log.error('Invalid-backend')
        raise Exception("Invalid-backend-for-kv-store")

    host, port = getattr(self.args, backend).split(':', 1)
    self.kv_client = store_classes[backend](
        host, port, TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)

    # Handle received ONU event messages
    reactor.callLater(0, self.handle_onu_events)
127
@property
def enabled(self):
    """Current value of the handler's enabled flag."""
    return self._enabled

@enabled.setter
def enabled(self, value):
    """Store ``value`` as the enabled flag; a no-op when it is unchanged."""
    if self._enabled == value:
        return
    self._enabled = value
136
@property
def omci_agent(self):
    """The ``omci_agent`` attribute of the owning adapter."""
    owner = self.adapter
    return owner.omci_agent
140
@property
def omci_cc(self):
    """
    The ``omci_cc`` of the OpenOMCI device entry, or None while no device
    entry has been created yet.
    """
    onu_entry = self._onu_omci_device
    if onu_entry is None:
        return None
    return onu_entry.omci_cc
144
@property
def heartbeat(self):
    """The HeartBeat object created for this handler in ``__init__``."""
    beat = self._heartbeat
    return beat
148
@property
def uni_ports(self):
    """All UniPort objects currently held in the port-number map."""
    port_map = self._unis
    return port_map.values()
152
def uni_port(self, port_no_or_name):
    """
    Find a UNI port by name or by logical port number.

    :param port_no_or_name: a string is matched against each port's
                            ``name``; an int is matched against each port's
                            ``logical_port_number``
    :return: the first matching port from ``self.uni_ports``, or None when
             nothing matches
    :raises AssertionError: when the argument is neither a string nor an int
    """
    try:
        string_types = (str, unicode)
    except NameError:
        # Python 3 has no 'unicode' builtin; str already covers all text.
        string_types = (str,)

    if isinstance(port_no_or_name, string_types):
        candidates = (uni for uni in self.uni_ports
                      if uni.name == port_no_or_name)
    else:
        assert isinstance(port_no_or_name, int), 'Invalid parameter type'
        candidates = (uni for uni in self.uni_ports
                      if uni.logical_port_number == port_no_or_name)

    return next(candidates, None)
161
@property
def pon_port(self):
    """The handler's PON port object (None until one has been created)."""
    pon = self._pon
    return pon
165
def receive_message(self, msg):
    """
    Forward ``msg`` to the OMCI communication channel.

    The message is dropped silently while ``self.omci_cc`` is None.
    """
    channel = self.omci_cc
    if channel is None:
        return
    channel.receive_message(msg)
169
# Called once when the adapter creates the device/onu instance
def activate(self, device):
    """
    Register the ONU with the adapter agent and, on first activation, set up
    its PON state, PM metrics and alarm handling.

    :param device: device object; must carry ``parent_id`` and a
                   ``proxy_address`` with a ``device_id``
    :raises AssertionError: if parent/proxy info or the logical device id is
                            missing
    """
    self.log.debug('function-entry', device=device)

    # Parent reference and proxy info are prerequisites for everything below
    assert device.parent_id
    assert device.proxy_address.device_id

    # register for proxied messages right away
    self.proxy_address = device.proxy_address
    self.adapter_agent.register_for_proxied_messages(device.proxy_address)
    self.parent_id = device.parent_id
    parent_device = self.adapter_agent.get_device(self.parent_id)
    if parent_device.type == 'openolt':
        loader = registry('adapter_loader')
        self.parent_adapter = loader.get_agent(parent_device.adapter).adapter

    if self.enabled is True:
        self.log.info('onu-already-activated')
        return

    self.log.info('activating-new-onu')
    # populate what we know. rest comes later after mib sync
    device.root = True
    device.vendor = 'Broadcom'
    device.connect_status = ConnectStatus.REACHABLE
    device.oper_status = OperStatus.DISCOVERED
    device.reason = 'activating-onu'

    # pm_metrics requires a logical device id
    parent_device = self.adapter_agent.get_device(device.parent_id)
    self.logical_device_id = parent_device.parent_id
    assert self.logical_device_id, 'Invalid logical device ID'

    self.adapter_agent.update_device(device)
    self.log.debug('set-device-discovered')

    self._init_pon_state(device)

    # ---- PM configuration for this device, with ONU specific options ----
    pm_options = {
        OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
        'heartbeat': self.heartbeat,
        OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
    }
    self.pm_metrics = OnuPmMetrics(self.adapter_agent, self.device_id,
                                   self.logical_device_id, grouped=True,
                                   freq_override=False, **pm_options)
    pm_config = self.pm_metrics.make_proto()
    self._onu_omci_device.set_pm_config(self.pm_metrics.omci_pm.openomci_interval_pm)
    self.log.info("initial-pm-config", pm_config=pm_config)
    self.adapter_agent.update_device_pm_config(pm_config, init=True)

    # ---- Alarm handler ----
    self.alarms = AdapterAlarms(self.adapter_agent, device.id, self.logical_device_id)
    # Note, ONU ID and UNI intf set in add_uni_port method
    synchronizer = self._onu_omci_device.alarm_synchronizer
    synchronizer.set_alarm_params(mgr=self.alarms, ani_ports=[self._pon])

    self.enabled = True
232
# Called once when the adapter needs to re-create device. usually on vcore restart
def reconcile(self, device):
    """
    Re-register the ONU and, unless the handler is already enabled, rebuild
    its PON state, restart OpenOMCI and schedule a reboot.

    :param device: device object; must carry ``parent_id`` and a
                   ``proxy_address`` with a ``device_id``
    :raises AssertionError: if parent or proxy info is missing
    """
    self.log.debug('function-entry', device=device)

    # Parent reference and proxy info must be present
    assert device.parent_id
    assert device.proxy_address.device_id

    # register for proxied messages right away
    self.proxy_address = device.proxy_address
    self.adapter_agent.register_for_proxied_messages(device.proxy_address)

    if self.enabled is True:
        self.log.info('onu-already-activated')
        return

    self.log.info('reconciling-broadcom-onu-device')
    self._init_pon_state(device)

    # need to restart state machines on vcore restart. there is no indication to do it for us.
    self._onu_omci_device.start()
    device.reason = "restarting-openomci"
    self.adapter_agent.update_device(device)

    # TODO: this is probably a bit heavy handed
    # Force a reboot for now. We need indications to reflow to reassign tconts and gems given vcore went away
    # This may not be necessary when mib resync actually works
    reactor.callLater(1, self.reboot)

    self.enabled = True
263
@inlineCallbacks
def handle_onu_events(self):
    """
    Wait for one message on ``self.event_messages``, process it, then
    re-schedule itself for the next message.

    Only 'download_tech_profile' events are acted on. Any exception raised
    while processing is logged and does not stop the loop.
    """
    message = yield self.event_messages.get()
    try:
        if message['event'] == 'download_tech_profile':
            profile_path = message['event_data']
            uni = message['uni_id']
            self.load_and_configure_tech_profile(uni, profile_path)

    except Exception as e:
        self.log.error("exception-handling-onu-event", e=e)

    # Re-arm for the next event
    reactor.callLater(0, self.handle_onu_events)
278
def _init_pon_state(self, device):
    """
    Create the PON port, register it with the adapter agent, record the
    logical device id and create the OpenOMCI device entry for this ONU.

    :param device: the ONU device object (``id`` and ``parent_id`` are read)
    """
    self.log.debug('function-entry', device=device)

    self._pon = PonPort.create(self, self._pon_port_number)
    pon_proto = self._pon.get_port()
    self.adapter_agent.add_port(device.id, pon_proto)

    self.log.debug('added-pon-port-to-agent', pon=self._pon)

    # The logical device id is taken from the parent's parent_id
    parent = self.adapter_agent.get_device(device.parent_id)
    self.logical_device_id = parent.parent_id

    self.adapter_agent.update_device(device)

    # Create and start the OpenOMCI ONU Device Entry for this ONU
    self._onu_omci_device = self.omci_agent.add_device(
        self.device_id,
        self.adapter_agent,
        support_classes=self.adapter.broadcom_omci,
        custom_me_map=self.adapter.custom_me_entities())

    # Port startup
    if self._pon is not None:
        self._pon.enabled = True
300
# TODO: move to UniPort
def update_logical_port(self, logical_device_id, port_id, state):
    """
    Set the OpenFlow port state of one logical port and push the change
    through the adapter agent.

    Any exception is logged and swallowed; nothing is returned.
    """
    try:
        self.log.info('updating-logical-port', logical_port_id=port_id,
                      logical_device_id=logical_device_id, state=state)
        agent = self.adapter_agent
        port = agent.get_logical_port(logical_device_id, port_id)
        port.ofp_port.state = state
        agent.update_logical_port(logical_device_id, port)
    except Exception as e:
        self.log.exception("exception-updating-port", e=e)
313
def delete(self, device):
    """
    Ask the parent adapter to delete this child device.

    A parent adapter that lacks ``delete_child_device`` (AttributeError) and
    a missing parent adapter are both only logged.
    """
    self.log.info('delete-onu', device=device)
    if not self.parent_adapter:
        self.log.debug("parent-adapter-not-available")
        return

    try:
        self.parent_adapter.delete_child_device(self.parent_id, device)
    except AttributeError:
        self.log.debug('parent-device-delete-child-not-implemented')
323
def _create_tconts(self, uni_id, us_scheduler):
    """
    Build one T-CONT from the upstream scheduler settings and add it to the
    PON port.

    :param uni_id: UNI the T-CONT is associated with
    :param us_scheduler: mapping providing 'alloc_id' and 'q_sched_policy'
    """
    alloc_id = us_scheduler['alloc_id']
    q_sched_policy = us_scheduler['q_sched_policy']
    self.log.debug('create-tcont', us_scheduler=us_scheduler)

    tcont_attrs = {
        'alloc-id': alloc_id,
        'q_sched_policy': q_sched_policy,
        'uni_id': uni_id,
    }

    # TODO: Not sure what to do with any of this...
    td_attrs = {
        'name': 'not-sure-td-profile',
        'fixed-bandwidth': "not-sure-fixed",
        'assured-bandwidth': "not-sure-assured",
        'maximum-bandwidth': "not-sure-max",
        'additional-bw-eligibility-indicator': "not-sure-additional",
    }

    traffic_descriptor = OnuTrafficDescriptor.create(td_attrs)
    tcont = OnuTCont.create(self, tcont=tcont_attrs, td=traffic_descriptor)

    self._pon.add_tcont(tcont)

    self.log.debug('pon-add-tcont', tcont=tcont)
348
# Called when there is an olt up indication, providing the gem port id chosen by the olt handler
def _create_gemports(self, uni_id, gem_ports, alloc_id_ref, direction):
    """
    Create an OnuGemPort for every entry in ``gem_ports`` and add each one
    to the PON port.

    :param uni_id: UNI the GEM ports are associated with
    :param gem_ports: iterable of GEM port attribute mappings
    :param alloc_id_ref: alloc id stored with every GEM port
    :param direction: direction label stored with every GEM port
    """
    self.log.debug('create-gemport',
                   gem_ports=gem_ports, direction=direction)

    for attrs in gem_ports:
        # Keys are read in a fixed order so that a missing attribute
        # surfaces as the same KeyError regardless of refactoring.
        gem_id = attrs['gemport_id']
        encryption = attrs['aes_encryption']
        discard_config = {
            'max_probability': attrs['discard_config']['max_probability'],
            'max_threshold': attrs['discard_config']['max_threshold'],
            'min_threshold': attrs['discard_config']['min_threshold'],
        }
        gem_config = {
            'gemport_id': gem_id,
            'direction': direction,
            'alloc_id_ref': alloc_id_ref,
            'encryption': encryption,
            'discard_config': discard_config,
            'discard_policy': attrs['discard_policy'],
            'max_q_size': attrs['max_q_size'],
            'pbit_map': attrs['pbit_map'],
            'priority_q': attrs['priority_q'],
            'scheduling_policy': attrs['scheduling_policy'],
            'weight': attrs['weight'],
            'uni_id': uni_id,
        }

        onu_gem_port = OnuGemPort.create(self, gem_port=gem_config)

        self._pon.add_gem_port(onu_gem_port)

        self.log.debug('pon-add-gemport', gem_port=onu_gem_port)
380
381 def _do_tech_profile_configuration(self, uni_id, tp):
382 num_of_tconts = tp['num_of_tconts']
383 us_scheduler = tp['us_scheduler']
384 alloc_id = us_scheduler['alloc_id']
385 self._create_tconts(uni_id, us_scheduler)
386 upstream_gem_port_attribute_list = tp['upstream_gem_port_attribute_list']
387 self._create_gemports(uni_id, upstream_gem_port_attribute_list, alloc_id, "UPSTREAM")
388 downstream_gem_port_attribute_list = tp['downstream_gem_port_attribute_list']
389 self._create_gemports(uni_id, downstream_gem_port_attribute_list, alloc_id, "DOWNSTREAM")
390
def load_and_configure_tech_profile(self, uni_id, tp_path):
    """
    Load a tech profile instance from the KV store and queue the OMCI task
    that downloads it to the ONU.

    Nothing is done when the profile was already downloaded for this UNI or
    a download task for it is still registered. On task failure the whole
    method is re-scheduled after _STARTUP_RETRY_WAIT. Exceptions raised
    while loading or queueing are logged and swallowed.

    :param uni_id: UNI the profile applies to
    :param tp_path: KV store key of the tech profile instance
    """
    self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)

    # Make sure the per-UNI bookkeeping exists before consulting it
    self._tp_service_specific_task.setdefault(uni_id, dict())
    self._tech_profile_download_done.setdefault(uni_id, dict()).setdefault(tp_path, False)

    if self._tech_profile_download_done[uni_id][tp_path]:
        self.log.info("tech-profile-config-already-done")
        return

    try:
        if tp_path in self._tp_service_specific_task[uni_id]:
            self.log.info("tech-profile-config-already-in-progress",
                          tp_path=tp_path)
            return

        # The stored value is parsed as a Python literal
        profile = ast.literal_eval(self.kv_client[tp_path])
        self.log.debug("tp-instance", tp=profile)
        self._do_tech_profile_configuration(uni_id, profile)

        def _set_reason(text):
            # Persist the outcome on a freshly read device object
            current = self.adapter_agent.get_device(self.device_id)
            current.reason = text
            self.adapter_agent.update_device(current)

        def _drop_task():
            self._tp_service_specific_task[uni_id].pop(tp_path, None)

        def success(_results):
            self.log.info("tech-profile-config-done-successfully")
            _set_reason('tech-profile-config-download-success')
            _drop_task()
            self._tech_profile_download_done[uni_id][tp_path] = True

        def failure(_reason):
            self.log.warn('tech-profile-config-failure-retrying',
                          _reason=_reason)
            _set_reason('tech-profile-config-download-failure-retrying')
            _drop_task()
            self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT,
                                               self.load_and_configure_tech_profile,
                                               uni_id, tp_path)

        self.log.info('downloading-tech-profile-configuration')
        download_task = BrcmTpServiceSpecificTask(self.omci_agent, self, uni_id)
        self._tp_service_specific_task[uni_id][tp_path] = download_task
        self._deferred = self._onu_omci_device.task_runner.queue_task(
            self._tp_service_specific_task[uni_id][tp_path])
        self._deferred.addCallbacks(success, failure)

    except Exception as e:
        self.log.exception("error-loading-tech-profile", e=e)
446
def update_pm_config(self, device, pm_config):
    """
    Hand a new PM configuration to ``self.pm_metrics``.

    :param device: accepted for interface compatibility; not used here
    :param pm_config: configuration passed to ``pm_metrics.update``
    """
    # TODO: This has not been tested
    self.log.info('update_pm_config', pm_config=pm_config)
    metrics = self.pm_metrics
    metrics.update(pm_config)
451
# Calling this assumes the onu is active/ready and had at least an initial mib downloaded. This gets called from
# flow decomposition that ultimately comes from onos
def update_flow_table(self, device, flows):
    """
    Inspect each flow and queue a VLAN filter task for flows that set a
    VLAN id.

    We need to proxy through the OLT to get to the ONU, so configuration
    from here uses OMCI. Flows are skipped entirely when the device is not
    reachable or not admin-enabled. A flow that matches on an ethertype, or
    that does not set a non-zero VLAN id, is ignored with a warning. Any
    exception raised while handling one flow is logged and the remaining
    flows are still processed.

    :param device: ONU device object (``connect_status``, ``admin_state``
                   and ``id`` are read)
    :param flows: iterable of flow objects understood by the ``fd`` helpers
    """
    self.log.debug('function-entry', device=device, flows=flows)

    # no point in pushing omci flows if the device isnt reachable
    if device.connect_status != ConnectStatus.REACHABLE or \
            device.admin_state != AdminState.ENABLED:
        self.log.warn("device-disabled-or-offline-skipping-flow-update",
                      admin=device.admin_state, connect=device.connect_status)
        return

    # Supported match fields: type -> (log event, log keyword, extractor).
    # Apart from the ethertype, the values are only logged.
    match_fields = {
        fd.ETH_TYPE: ('field-type-eth-type', 'eth_type', lambda f: f.eth_type),
        fd.IP_PROTO: ('field-type-ip-proto', 'ip_proto', lambda f: f.ip_proto),
        fd.IN_PORT: ('field-type-in-port', 'in_port', lambda f: f.port),
        fd.VLAN_VID: ('field-type-vlan-vid', 'vlan', lambda f: f.vlan_vid & 0xfff),
        fd.VLAN_PCP: ('field-type-vlan-pcp', 'pcp', lambda f: f.vlan_pcp),
        fd.UDP_DST: ('field-type-udp-dst', 'udp_dst', lambda f: f.udp_dst),
        fd.UDP_SRC: ('field-type-udp-src', 'udp_src', lambda f: f.udp_src),
        fd.IPV4_DST: ('field-type-ipv4-dst', 'ipv4_dst', lambda f: f.ipv4_dst),
        # The source address used to be logged under the 'ipv4_dst' key
        fd.IPV4_SRC: ('field-type-ipv4-src', 'ipv4_src', lambda f: f.ipv4_src),
        fd.METADATA: ('field-type-metadata', 'metadata', lambda f: f.table_metadata),
    }

    for flow in flows:
        _type = None
        _set_vlan_vid = None
        self.log.debug('bulk-flow-update', device_id=device.id, flow=flow)
        try:
            _in_port = fd.get_in_port(flow)
            assert _in_port is not None

            _out_port = fd.get_out_port(flow)  # may be None

            # A flow entering on the PON port is downstream and targets the
            # UNI it leaves on; every other flow is upstream and targets
            # the UNI it entered on.
            if _in_port == self._pon_port_number:
                self.log.debug('downstream-flow', in_port=_in_port, out_port=_out_port)
                uni_port = self.uni_port(_out_port)
            else:
                self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
                uni_port = self.uni_port(_in_port)

            self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))

            for field in fd.get_ofb_fields(flow):
                if field.type not in match_fields:
                    raise NotImplementedError('field.type={}'.format(
                        field.type))
                event, key, extract = match_fields[field.type]
                value = extract(field)
                self.log.debug(event, **{key: value})
                if field.type == fd.ETH_TYPE:
                    _type = value

            for action in fd.get_actions(flow):

                if action.type == fd.OUTPUT:
                    self.log.debug('action-type-output',
                                   output=action.output.port, in_port=_in_port)

                elif action.type == fd.POP_VLAN:
                    self.log.debug('action-type-pop-vlan',
                                   in_port=_in_port)

                elif action.type == fd.PUSH_VLAN:
                    self.log.debug('action-type-push-vlan',
                                   push_tpid=action.push.ethertype, in_port=_in_port)
                    if action.push.ethertype != 0x8100:
                        self.log.error('unhandled-tpid',
                                       ethertype=action.push.ethertype)

                elif action.type == fd.SET_FIELD:
                    _field = action.set_field.field.ofb_field
                    assert (action.set_field.field.oxm_class ==
                            OFPXMC_OPENFLOW_BASIC)
                    self.log.debug('action-type-set-field',
                                   field=_field, in_port=_in_port)
                    if _field.type == fd.VLAN_VID:
                        _set_vlan_vid = _field.vlan_vid & 0xfff
                        self.log.debug('set-field-type-vlan-vid',
                                       vlan_vid=_set_vlan_vid)
                    else:
                        self.log.error('unsupported-action-set-field-type',
                                       field_type=_field.type)
                else:
                    self.log.error('unsupported-action-type',
                                   action_type=action.type, in_port=_in_port)

            # TODO: We only set vlan omci flows. Handle omci matching ethertypes at some point in another task
            if _type is not None:
                self.log.warn('ignoring-flow-with-ethType', ethType=_type)
            elif _set_vlan_vid is None or _set_vlan_vid == 0:
                self.log.warn('ignorning-flow-that-does-not-set-vlanid')
            else:
                if uni_port is None:
                    # Fail with a clear message instead of an AttributeError
                    # on None; still reported by the handler below.
                    raise ValueError('no-uni-port-for-flow in_port={} out_port={}'.format(
                        _in_port, _out_port))
                self.log.warn('set-vlanid', uni_id=uni_port.port_number, set_vlan_vid=_set_vlan_vid)
                self._add_vlan_filter_task(device, uni_port, _set_vlan_vid)

        except Exception as e:
            self.log.exception('failed-to-install-flow', e=e, flow=flow)
609
610
def _add_vlan_filter_task(self, device, uni_port, _set_vlan_vid):
    """
    Queue an OMCI task that applies a VLAN filter to a UNI port.

    On failure the same request is re-scheduled after _STARTUP_RETRY_WAIT.
    The outcome is written to the device's ``reason`` and persisted through
    the adapter agent, matching what the tech-profile download callbacks do.

    :param device: ONU device object supplied by the caller
    :param uni_port: target UNI port; must not be None
    :param _set_vlan_vid: VLAN id to configure
    :raises AssertionError: if ``uni_port`` is None
    """
    assert uni_port is not None

    def record_reason(text):
        # Keep the caller's object in step, but persist through a freshly
        # read device so newer state is not overwritten by a stale copy.
        device.reason = text
        current = self.adapter_agent.get_device(self.device_id)
        current.reason = text
        self.adapter_agent.update_device(current)

    def success(_results):
        self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid)
        self._vlan_filter_task = None
        record_reason('omci-flows-pushed')

    def failure(_reason):
        self.log.warn('vlan-tagging-failure', uni_port=uni_port, vlan=_set_vlan_vid)
        # Schedule the retry first so a failure to persist the reason
        # cannot prevent it.
        self._vlan_filter_task = reactor.callLater(_STARTUP_RETRY_WAIT,
                                                   self._add_vlan_filter_task,
                                                   device, uni_port, _set_vlan_vid)
        record_reason('omci-flows-failed-retrying')

    self.log.info('setting-vlan-tag')
    self._vlan_filter_task = BrcmVlanFilterTask(self.omci_agent, self.device_id, uni_port, _set_vlan_vid)
    self._deferred = self._onu_omci_device.task_runner.queue_task(self._vlan_filter_task)
    self._deferred.addCallbacks(success, failure)
629
def get_tx_id(self):
    """Increment the handler's transaction counter and return the new value."""
    self.log.debug('function-entry')
    next_id = self.tx_id + 1
    self.tx_id = next_id
    return self.tx_id
634
# TODO: Actually conform to or create a proper interface.
# this and the other functions called from the olt arent very clear.
# Called each time there is an onu "up" indication from the olt handler
def create_interface(self, data):
    """
    Remember the ONU indication, subscribe to OpenOMCI events, schedule the
    OpenOMCI state machine start and enable the heartbeat.

    :param data: indication payload; stored as ``self._onu_indication``
    """
    self.log.debug('function-entry', data=data)
    self._onu_indication = data

    device = self.adapter_agent.get_device(self.device_id)

    self.log.debug('starting-openomci-statemachine')
    self._subscribe_to_events()
    # The state machine start is deferred by one second
    reactor.callLater(1, self._onu_omci_device.start)

    device.reason = "starting-openomci"
    self.adapter_agent.update_device(device)

    self._heartbeat.enabled = True
650
# Currently called each time there is an onu "down" indication from the olt handler
# TODO: possibly other reasons to "update" from the olt?
def update_interface(self, data):
    """
    React to an interface update from the OLT handler.

    Only ``oper_state == 'down'`` has an effect: OpenOMCI is stopped, the
    tech-profile bookkeeping is cleared, ports are disabled and the device
    is marked unreachable/discovered.

    :param data: mapping; its 'oper_state' entry (if any) is read
    """
    self.log.debug('function-entry', data=data)
    oper_state = data.get('oper_state', None)

    onu_device = self.adapter_agent.get_device(self.device_id)

    if oper_state != 'down':
        self.log.debug('not-changing-openomci-statemachine')
        return

    self.log.debug('stopping-openomci-statemachine')
    reactor.callLater(0, self._onu_omci_device.stop)

    # Let TP download happen again
    for pending_tasks in self._tp_service_specific_task.values():
        pending_tasks.clear()
    for done_flags in self._tech_profile_download_done.values():
        done_flags.clear()

    self.disable_ports(onu_device)
    onu_device.reason = "stopping-openomci"
    onu_device.connect_status = ConnectStatus.UNREACHABLE
    onu_device.oper_status = OperStatus.DISCOVERED
    self.adapter_agent.update_device(onu_device)
676
# Not currently called by olt or anything else
def remove_interface(self, data):
    """
    Stop OpenOMCI, clear the tech-profile bookkeeping and disable the ONU's
    ports.

    :param data: only logged
    """
    self.log.debug('function-entry', data=data)

    device = self.adapter_agent.get_device(self.device_id)

    self.log.debug('stopping-openomci-statemachine')
    reactor.callLater(0, self._onu_omci_device.stop)

    # Let TP download happen again
    for pending_tasks in self._tp_service_specific_task.values():
        pending_tasks.clear()
    for done_flags in self._tech_profile_download_done.values():
        done_flags.clear()

    self.disable_ports(device)
    device.reason = "stopping-openomci"
    self.adapter_agent.update_device(device)

    # TODO: im sure there is more to do here
697
# Not currently called. Would be called presumably from the olt handler
def remove_gemport(self, data):
    """
    Placeholder for GEM port removal.

    Currently only logs, and reports an error when the device is not
    reachable; no GEM port is actually removed.
    """
    self.log.debug('remove-gemport', data=data)
    onu_device = self.adapter_agent.get_device(self.device_id)
    reachable = onu_device.connect_status == ConnectStatus.REACHABLE
    if not reachable:
        self.log.error('device-unreachable')
705
# Not currently called. Would be called presumably from the olt handler
def remove_tcont(self, tcont_data, traffic_descriptor_data):
    """
    Placeholder for T-CONT removal.

    Currently only logs, and reports an error when the device is not
    reachable; no T-CONT is actually removed.
    """
    self.log.debug('remove-tcont', tcont_data=tcont_data, traffic_descriptor_data=traffic_descriptor_data)
    onu_device = self.adapter_agent.get_device(self.device_id)
    reachable = onu_device.connect_status == ConnectStatus.REACHABLE
    if not reachable:
        self.log.error('device-unreachable')

    # TODO: Create some omci task that encompases this what intended
715
# Not currently called. Would be called presumably from the olt handler
def create_multicast_gemport(self, data):
    """Placeholder for multicast GEM port creation; only logs ``data``."""
    self.log.debug('function-entry', data=data)

    # TODO: create objects and populate for later omci calls
    return None
721
def disable(self, device):
    """
    Admin-disable the ONU: queue a UNI lock task and, whether or not the
    lock succeeds, stop OpenOMCI and mark the device locked.

    Exceptions raised while queueing the task are logged through the
    handler's own logger and swallowed.

    :param device: ONU device object; its oper_status and reason are updated
                   once the lock task finishes
    """
    self.log.debug('function-entry', device=device)
    try:
        self.log.info('sending-uni-lock-towards-device', device=device)

        def stop_anyway(reason):
            # proceed with disable regardless if we could reach the onu. for example onu is unplugged
            self.log.debug('stopping-openomci-statemachine')
            reactor.callLater(0, self._onu_omci_device.stop)

            # Let TP download happen again
            for uni_id in self._tp_service_specific_task:
                self._tp_service_specific_task[uni_id].clear()
            for uni_id in self._tech_profile_download_done:
                self._tech_profile_download_done[uni_id].clear()

            self.disable_ports(device)
            device.oper_status = OperStatus.UNKNOWN
            device.reason = "omci-admin-lock"
            self.adapter_agent.update_device(device)

        # lock all the unis
        task = BrcmUniLockTask(self.omci_agent, self.device_id, lock=True)
        self._deferred = self._onu_omci_device.task_runner.queue_task(task)
        self._deferred.addCallbacks(stop_anyway, stop_anyway)
    except Exception as e:
        # self.log (not the module-level logger) keeps the device_id binding
        self.log.exception('exception-in-onu-disable', exception=e)
749
def reenable(self, device):
    """
    Re-enable the ONU by restarting the OpenOMCI state machines.

    Exceptions are logged through the handler's own logger and swallowed.

    :param device: ONU device object; its reason is updated and persisted
    """
    self.log.debug('function-entry', device=device)
    try:
        # Start up OpenOMCI state machines for this device
        # this will ultimately resync mib and unlock unis on successful redownloading the mib
        self.log.debug('restarting-openomci-statemachine')
        self._subscribe_to_events()
        device.reason = "restarting-openomci"
        self.adapter_agent.update_device(device)
        reactor.callLater(1, self._onu_omci_device.start)
        self._heartbeat.enabled = True
    except Exception as e:
        # self.log (not the module-level logger) keeps the device_id binding
        self.log.exception('exception-in-onu-reenable', exception=e)
763
def reboot(self):
    """
    Request an ONU reboot through the OpenOMCI device entry.

    Nothing is sent when the device is not reachable. After a successful
    reboot request the ports are disabled and the device is marked
    unreachable/discovered with reason "rebooting".
    """
    self.log.info('reboot-device')
    device = self.adapter_agent.get_device(self.device_id)
    if device.connect_status != ConnectStatus.REACHABLE:
        self.log.error("device-unreachable")
        return

    def on_reboot_requested(_results):
        self.log.info('reboot-success', _results=_results)
        self.disable_ports(device)
        device.connect_status = ConnectStatus.UNREACHABLE
        device.oper_status = OperStatus.DISCOVERED
        device.reason = "rebooting"
        self.adapter_agent.update_device(device)

    def on_reboot_failed(_reason):
        self.log.info('reboot-failure', _reason=_reason)

    self._deferred = self._onu_omci_device.reboot()
    self._deferred.addCallbacks(on_reboot_requested, on_reboot_failed)
784
def disable_ports(self, onu_device):
    """
    Disable every port of the ONU and set the matching logical UNI ports to
    link-down.

    :param onu_device: ONU device object (``id`` and ``parent_id`` are read)
    :raises AssertionError: if the parent device or its logical device id
                            cannot be resolved
    """
    self.log.info('disable-ports', device_id=self.device_id,
                  onu_device=onu_device)

    # Disable all ports on that device
    self.adapter_agent.disable_all_ports(self.device_id)

    parent = self.adapter_agent.get_device(onu_device.parent_id)
    assert parent
    logical_device_id = parent.parent_id
    assert logical_device_id

    for uni in self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI):
        # TODO: move to UniPort
        self.update_logical_port(logical_device_id,
                                 'uni-{}'.format(uni.port_no),
                                 OFPPS_LINK_DOWN)
801
def enable_ports(self, onu_device):
    """
    Enable every port of the ONU and set the matching logical UNI ports to
    live.

    :param onu_device: ONU device object (``id`` and ``parent_id`` are read)
    :raises AssertionError: if the parent device or its logical device id
                            cannot be resolved
    """
    self.log.info('enable-ports', device_id=self.device_id, onu_device=onu_device)

    # Enable all ports on that device
    self.adapter_agent.enable_all_ports(self.device_id)

    parent = self.adapter_agent.get_device(onu_device.parent_id)
    assert parent
    logical_device_id = parent.parent_id
    assert logical_device_id

    for uni in self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI):
        # TODO: move to UniPort
        self.update_logical_port(logical_device_id,
                                 'uni-{}'.format(uni.port_no),
                                 OFPPS_LIVE)
817
# Called just before openomci state machine is started. These listen for events from selected state machines,
# most importantly, mib in sync. Which ultimately leads to downloading the mib
def _subscribe_to_events(self):
    """
    Subscribe the handler to the MIB database sync and OMCI capabilities
    events on the OpenOMCI device's event bus, keeping both subscriptions.
    """
    self.log.debug('function-entry')

    event_bus = self._onu_omci_device.event_bus

    # OMCI MIB Database sync status
    sync_topic = OnuDeviceEntry.event_bus_topic(
        self.device_id, OnuDeviceEvents.MibDatabaseSyncEvent)
    self._in_sync_subscription = event_bus.subscribe(sync_topic, self.in_sync_handler)

    # OMCI Capabilities
    capabilities_topic = OnuDeviceEntry.event_bus_topic(
        self.device_id, OnuDeviceEvents.OmciCapabilitiesEvent)
    self._capabilities_subscription = event_bus.subscribe(capabilities_topic,
                                                          self.capabilties_handler)
834
# Called when the mib is in sync
def in_sync_handler(self, _topic, msg):
    """
    Event bus callback for MIB database sync events.

    The first time ``msg`` reports in-sync, the subscription is dropped (so
    this runs only once) and ``_mib_in_sync`` is scheduled. Exceptions are
    logged and swallowed.

    :param _topic: event topic; only logged
    :param msg: event payload indexed by IN_SYNC_KEY
    """
    self.log.debug('function-entry', _topic=_topic, msg=msg)
    if self._in_sync_subscription is None:
        return

    try:
        if not msg[IN_SYNC_KEY]:
            return

        # Only call this once
        event_bus = self._onu_omci_device.event_bus
        event_bus.unsubscribe(self._in_sync_subscription)
        self._in_sync_subscription = None

        # Start up device_info load
        self.log.debug('running-mib-sync')
        reactor.callLater(0, self._mib_in_sync)

    except Exception as e:
        self.log.exception('in-sync', e=e)
854
def capabilties_handler(self, _topic, _msg):
    """
    Event bus callback for OMCI capabilities events; currently only logs.

    :param _topic: event topic; only logged
    :param _msg: event payload; only logged
    """
    self.log.debug('function-entry', _topic=_topic, msg=_msg)
    if self._capabilities_subscription is None:
        return
    self.log.debug('capabilities-handler-done')
859
860 # Mib is in sync, we can now query what we learned and actually start pushing ME (download) to the ONU.
861 # Currently uses a basic mib download task that creates a bridge with a single gem port and uni, only allowing EAP
862 # Implement your own MibDownloadTask if you wish to set up something different by default
def _mib_in_sync(self):
    """
    Continue ONU bring-up once the OMCI MIB database is in sync.

    First pass (``_dev_info_loaded`` is false): read the ANI-G, UNI-G, PPTP and
    VEIP entities from the ONU configuration, create one UNI port per PPTP/VEIP
    entity, register the logical ports and record the QoS flexibility and OMCC
    version. If the ONU reports no ANI-G, or neither a PPTP nor a VEIP, or no
    UNI could be created, the device reason is updated and this method is
    re-scheduled after ``_STARTUP_RETRY_WAIT``.

    Once the device info is loaded: if the device is administratively enabled
    the initial MIB download task is queued (on success the device becomes
    ACTIVE/REACHABLE and its ports are enabled; on failure this method is
    re-scheduled); otherwise the device is disabled.
    """
    self.log.debug('function-entry')

    omci_dev = self._onu_omci_device
    in_sync = omci_dev.mib_db_in_sync

    device = self.adapter_agent.get_device(self.device_id)
    device.reason = 'discovery-mibsync-complete'
    self.adapter_agent.update_device(device)

    if not self._dev_info_loaded:
        self.log.info('loading-device-data-from-mib', in_sync=in_sync, already_loaded=self._dev_info_loaded)

        config = omci_dev.configuration

        # TODO: run this sooner somehow. shouldnt have to wait for mib sync to push an initial download
        # In Sync, we can register logical ports now. Ideally this could occur on
        # the first time we received a successful (no timeout) OMCI Rx response.
        try:
            # Read each entity table once; a missing table is treated as empty.
            ani_entities = config.ani_g_entities or {}
            uni_g_entities = config.uni_g_entities or {}
            pptp_entities = config.pptp_entities or {}
            veip_entities = config.veip_entities or {}

            # sort the lists so we get consistent port ordering.
            ani_list = sorted(ani_entities)
            uni_list = sorted(uni_g_entities)
            pptp_list = sorted(pptp_entities)
            veip_list = sorted(veip_entities)

            # The lists are never None (missing tables become empty), so test
            # for emptiness: an ANI plus at least one PPTP or VEIP is required.
            if not ani_list or not (pptp_list or veip_list):
                device.reason = 'onu-missing-required-elements'
                self.log.warn("no-ani-or-unis")
                self.adapter_agent.update_device(device)
                raise Exception("onu-missing-required-elements")

            # Currently logging the ani, pptp, veip, and uni for information purposes.
            # Actually act on the veip/pptp as its ME is the most correct one to use in later tasks.
            # And in some ONU the UNI-G list is incomplete or incorrect...
            for entity_id in ani_list:
                ani_value = ani_entities[entity_id]
                self.log.debug("discovered-ani", entity_id=entity_id, value=ani_value)
                # TODO: currently only one OLT PON port/ANI, so this works out. With NGPON there will be 2..?
                self._total_tcont_count = ani_value.get('total-tcont-count')
                self.log.debug("set-total-tcont-count", tcont_count=self._total_tcont_count)

            for entity_id in uni_list:
                uni_value = uni_g_entities[entity_id]
                self.log.debug("discovered-uni", entity_id=entity_id, value=uni_value)

            # PPTPs first, then VEIPs; insertion order decides the uni_id assignment below.
            uni_entities = OrderedDict()
            for entity_id in pptp_list:
                pptp_value = pptp_entities[entity_id]
                self.log.debug("discovered-pptp", entity_id=entity_id, value=pptp_value)
                uni_entities[entity_id] = UniType.PPTP

            for entity_id in veip_list:
                veip_value = veip_entities[entity_id]
                self.log.debug("discovered-veip", entity_id=entity_id, value=veip_value)
                uni_entities[entity_id] = UniType.VEIP

            # uni_id only advances for UNIs that were actually added
            uni_id = 0
            for entity_id, uni_type in uni_entities.items():
                try:
                    self._add_uni_port(entity_id, uni_id, uni_type)
                except AssertionError as e:
                    self.log.warn("could not add UNI", entity_id=entity_id, uni_type=uni_type, e=e)
                else:
                    uni_id += 1

            multi_uni = len(self._unis) > 1
            for uni_port in self._unis.values():
                uni_port.add_logical_port(uni_port.port_number, multi_uni)

            self.adapter_agent.update_device(device)

            self._qos_flexibility = config.qos_configuration_flexibility or 0
            self._omcc_version = config.omcc_version or OMCCVersion.Unknown

            if self._unis:
                self._dev_info_loaded = True
            else:
                device.reason = 'no-usable-unis'
                self.adapter_agent.update_device(device)
                self.log.warn("no-usable-unis")
                raise Exception("no-usable-unis")

        except Exception as e:
            # Anything going wrong while loading device info is logged and retried later
            self.log.exception('device-info-load', e=e)
            self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)

    else:
        self.log.info('device-info-already-loaded', in_sync=in_sync, already_loaded=self._dev_info_loaded)

    if not self._dev_info_loaded:
        self.log.info('device-info-not-loaded-skipping-mib-download')
        return

    if device.admin_state != AdminState.ENABLED:
        self.log.info('admin-down-disabling')
        self.disable(device)
        return

    def success(_results):
        self.log.info('mib-download-success', _results=_results)
        device = self.adapter_agent.get_device(self.device_id)
        device.reason = 'initial-mib-downloaded'
        device.oper_status = OperStatus.ACTIVE
        device.connect_status = ConnectStatus.REACHABLE
        self.enable_ports(device)
        self.adapter_agent.update_device(device)
        self._mib_download_task = None

    def failure(_reason):
        self.log.warn('mib-download-failure-retrying', _reason=_reason)
        # Re-read the device (as success() does) rather than writing back the
        # copy captured before the download was queued, which may be stale by now.
        device = self.adapter_agent.get_device(self.device_id)
        device.reason = 'initial-mib-download-failure-retrying'
        self.adapter_agent.update_device(device)
        self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)

    # Download an initial mib that creates simple bridge that can pass EAP. On success (above) finally set
    # the device to active/reachable. This then opens up the handler to openflow pushes from outside
    self.log.info('downloading-initial-mib-configuration')
    self._mib_download_task = BrcmMibDownloadTask(self.omci_agent, self)
    self._deferred = self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
    self._deferred.addCallbacks(success, failure)
982
983
984 def _add_uni_port(self, entity_id, uni_id, uni_type=UniType.PPTP):
985 self.log.debug('function-entry')
986
987 device = self.adapter_agent.get_device(self.device_id)
988 parent_device = self.adapter_agent.get_device(device.parent_id)
989
990 parent_adapter_agent = registry('adapter_loader').get_agent(parent_device.adapter)
991 if parent_adapter_agent is None:
992 self.log.error('parent-adapter-could-not-be-retrieved')
993
994 # TODO: This knowledge is locked away in openolt. and it assumes one onu equals one uni...
995 parent_device = self.adapter_agent.get_device(device.parent_id)
996 parent_adapter = parent_adapter_agent.adapter.devices[parent_device.id]
997 uni_no = parent_adapter.platform.mk_uni_port_num(
998 self._onu_indication.intf_id, self._onu_indication.onu_id, uni_id)
999
1000 # TODO: Some or parts of this likely need to move to UniPort. especially the format stuff
1001 uni_name = "uni-{}".format(uni_no)
1002
1003 mac_bridge_port_num = uni_id + 1 # TODO +1 is only to test non-zero index
1004
1005 self.log.debug('uni-port-inputs', uni_no=uni_no, uni_id=uni_id, uni_name=uni_name, uni_type=uni_type,
1006 entity_id=entity_id, mac_bridge_port_num=mac_bridge_port_num)
1007
1008 uni_port = UniPort.create(self, uni_name, uni_id, uni_no, uni_name, uni_type)
1009 uni_port.entity_id = entity_id
1010 uni_port.enabled = True
1011 uni_port.mac_bridge_port_num = mac_bridge_port_num
1012
1013 self.log.debug("created-uni-port", uni=uni_port)
1014
1015 self.adapter_agent.add_port(device.id, uni_port.get_port())
1016 parent_adapter_agent.add_port(device.parent_id, uni_port.get_port())
1017
1018 self._unis[uni_port.port_number] = uni_port
1019
1020 self._onu_omci_device.alarm_synchronizer.set_alarm_params(onu_id=self._onu_indication.onu_id,
1021 uni_ports=self._unis.values())
1022 # TODO: this should be in the PonPortclass
1023 pon_port = self._pon.get_port()
1024
1025 # Delete reference to my own UNI as peer from parent.
1026 # TODO why is this here, add_port_reference_to_parent already prunes duplicates
1027 me_as_peer = Port.PeerPort(device_id=device.parent_id, port_no=uni_port.port_number)
1028 partial_pon_port = Port(port_no=pon_port.port_no, label=pon_port.label,
1029 type=pon_port.type, admin_state=pon_port.admin_state,
1030 oper_status=pon_port.oper_status,
1031 peers=[me_as_peer]) # only list myself as a peer to avoid deleting all other UNIs from parent
1032 self.adapter_agent.delete_port_reference_from_parent(self.device_id, partial_pon_port)
1033
1034 pon_port.peers.extend([me_as_peer])
1035
1036 self._pon._port = pon_port
1037
1038 self.adapter_agent.add_port_reference_to_parent(self.device_id,
1039 pon_port)