#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Broadcom OpenOMCI OLT/ONU adapter handler.
"""
20
21import json
22import ast
23import structlog
24
25from collections import OrderedDict
26
27from twisted.internet import reactor, task
28from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
29
30from heartbeat import HeartBeat
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050031from pyvoltha.adapters.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
32from pyvoltha.adapters.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
33from pyvoltha.adapters.extensions.alarms.adapter_alarms import AdapterAlarms
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050035import pyvoltha.common.openflow.utils as fd
36from pyvoltha.common.utils.registry import registry
37from pyvoltha.common.config.config_backend import ConsulStore
38from pyvoltha.common.config.config_backend import EtcdStore
39from pyvoltha.protos import third_party
40from pyvoltha.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
41from pyvoltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
42from pyvoltha.adapters.extensions.omci.onu_configuration import OMCCVersion
43from pyvoltha.adapters.extensions.omci.onu_device_entry import OnuDeviceEvents, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050044 OnuDeviceEntry, IN_SYNC_KEY
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050045from omci.brcm_mib_download_task import BrcmMibDownloadTask
46from omci.brcm_tp_service_specific_task import BrcmTpServiceSpecificTask
47from omci.brcm_uni_lock_task import BrcmUniLockTask
48from omci.brcm_vlan_filter_task import BrcmVlanFilterTask
49from onu_gem_port import *
50from onu_tcont import *
51from pon_port import *
52from uni_port import *
53from onu_traffic_descriptor import *
54from pyvoltha.common.tech_profile.tech_profile import TechProfile
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050055
# Short aliases for the entity-operation and reason-code enumerations.
# NOTE(review): EntityOperations / ReasonCodes are presumably supplied by one
# of the wildcard imports above -- confirm.
OP = EntityOperations
RC = ReasonCodes

# NOTE(review): presumably references third_party only so the import is not
# reported as unused -- confirm.
_ = third_party
# Module-level logger (not bound to any device id).
log = structlog.get_logger()

# Delay, in seconds, handed to reactor.callLater() before retrying a failed
# tech-profile download or VLAN filter task.
_STARTUP_RETRY_WAIT = 20
63
64
65class BrcmOpenomciOnuHandler(object):
66
def __init__(self, adapter, device_id):
    """
    Initialise the per-ONU handler state and its KV store client.

    :param adapter: owning adapter; its ``adapter_agent`` is cached here
    :param device_id: id of the ONU device this handler manages
    :raises Exception: when the configured KV store backend is neither
                       'etcd' nor 'consul'
    """
    self.log = structlog.get_logger(device_id=device_id)
    self.log.debug('function-entry')
    self.adapter = adapter
    self.adapter_agent = adapter.adapter_agent
    self.parent_adapter = None
    self.parent_id = None
    self.device_id = device_id
    self.incoming_messages = DeferredQueue()
    self.event_messages = DeferredQueue()
    self.proxy_address = None
    self.tx_id = 0
    self._enabled = False
    self.alarms = None
    self.pm_metrics = None
    self._omcc_version = OMCCVersion.Unknown
    self._total_tcont_count = 0  # From ANI-G ME
    self._qos_flexibility = 0  # From ONT2_G ME

    self._onu_indication = None
    self._unis = dict()  # Port # -> UniPort

    self._pon = None
    # TODO: probably shouldnt be hardcoded, determine from olt maybe?
    self._pon_port_number = 100
    self.logical_device_id = None

    self._heartbeat = HeartBeat.create(self, device_id)

    # Set up OpenOMCI environment
    self._onu_omci_device = None
    self._dev_info_loaded = False
    self._deferred = None

    self._in_sync_subscription = None
    self._connectivity_subscription = None
    self._capabilities_subscription = None

    self.mac_bridge_service_profile_entity_id = 0x201
    self.gal_enet_profile_entity_id = 0x1

    # uni_id -> {tp_path -> task} and uni_id -> {tp_path -> bool}
    self._tp_service_specific_task = dict()
    self._tech_profile_download_done = dict()

    # Assigned by _add_vlan_filter_task(); declared here so the attribute
    # always exists on the instance.
    self._vlan_filter_task = None

    # Initialize KV store client
    self.args = registry('main').get_args()
    if self.args.backend == 'etcd':
        host, port = self.args.etcd.split(':', 1)
        self.kv_client = EtcdStore(host, port,
                                   TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
    elif self.args.backend == 'consul':
        host, port = self.args.consul.split(':', 1)
        self.kv_client = ConsulStore(host, port,
                                     TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
    else:
        self.log.error('Invalid-backend')
        raise Exception("Invalid-backend-for-kv-store")

    # Handle received ONU event messages
    reactor.callLater(0, self.handle_onu_events)
127
@property
def enabled(self):
    """Current value of the handler's enabled flag."""
    return self._enabled

@enabled.setter
def enabled(self, value):
    # Nothing to do when the flag already holds the requested value.
    if self._enabled == value:
        return
    self._enabled = value
136
@property
def omci_agent(self):
    """The OMCI agent exposed by the owning adapter."""
    owner = self.adapter
    return owner.omci_agent
140
@property
def omci_cc(self):
    """
    The ``omci_cc`` attribute of the OpenOMCI device entry, or None while
    no device entry has been created yet.
    """
    onu_entry = self._onu_omci_device
    if onu_entry is None:
        return None
    return onu_entry.omci_cc
144
@property
def heartbeat(self):
    """The HeartBeat object created for this handler in __init__."""
    hb = self._heartbeat
    return hb
148
@property
def uni_ports(self):
    """All UNI port objects currently registered (values of the port map)."""
    port_map = self._unis
    return port_map.values()
152
def uni_port(self, port_no_or_name):
    """
    Look up a UNI port by name or by logical port number.

    :param port_no_or_name: port name (string) or logical port number
                            (integer)
    :return: the first matching UNI port, or None when nothing matches
    :raises AssertionError: if the argument is neither a string nor an
                            integer
    """
    # 'unicode' and 'long' only exist on Python 2. Resolve them lazily so
    # the lookup keeps working there (including long port numbers) and no
    # longer dies with a NameError on Python 3.
    try:
        string_types = (str, unicode)
        integer_types = (int, long)
    except NameError:
        string_types = (str,)
        integer_types = (int,)

    if isinstance(port_no_or_name, string_types):
        return next((uni for uni in self.uni_ports
                     if uni.name == port_no_or_name), None)

    assert isinstance(port_no_or_name, integer_types), 'Invalid parameter type'
    return next((uni for uni in self.uni_ports
                 if uni.logical_port_number == port_no_or_name), None)
161
@property
def pon_port(self):
    """The PON port object, or None until _init_pon_state() has created it."""
    pon = self._pon
    return pon
165
def receive_message(self, msg):
    """
    Hand a received message to the OMCI communications channel.

    The message is silently dropped while no channel is available.
    """
    channel = self.omci_cc
    if channel is None:
        return
    channel.receive_message(msg)
169
# Called once when the adapter creates the device/onu instance
@inlineCallbacks
def activate(self, device):
    """
    Activate a newly created ONU device.

    Records the proxy address, parent id and PON port number from
    ``device`` and, when the handler is not yet enabled, marks the device
    as discovered, creates the PON port / OpenOMCI device entry and sets
    up PM metrics and alarms.

    :param device: device object; must carry ``parent_id``,
                   ``parent_port_no`` and ``proxy_address.device_id``
    :raises AssertionError: if any of those fields is missing/empty
    """
    self.log.debug('function-entry', device=device)

    assert device.parent_id
    assert device.parent_port_no
    assert device.proxy_address.device_id

    self.proxy_address = device.proxy_address
    self.parent_id = device.parent_id
    self._pon_port_number = device.parent_port_no

    if self.enabled is not True:
        self.log.info('activating-new-onu')
        # populate what we know. rest comes later after mib sync
        device.root = False
        device.vendor = 'Broadcom'
        device.connect_status = ConnectStatus.REACHABLE
        device.oper_status = OperStatus.DISCOVERED
        device.reason = 'activating-onu'

        # TODO NEW CORE: Need to either get logical device id from core or use regular device id
        # pm_metrics requires a logical device id
        #parent_device = yield self.adapter_agent.get_device(device.parent_id)
        #self.logical_device_id = parent_device.parent_id
        #assert self.logical_device_id, 'Invalid logical device ID'

        yield self.adapter_agent.device_update(device)

        self.log.debug('set-device-discovered')

        # _init_pon_state() is an inlineCallbacks coroutine that only sets
        # self._onu_omci_device after its own yield. Wait for it so the PM
        # and alarm setup below never sees a None device entry.
        yield self._init_pon_state(device)

        ############################################################################
        # Setup PM configuration for this device
        # Pass in ONU specific options
        kwargs = {
            OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
            'heartbeat': self.heartbeat,
            OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
        }
        self.pm_metrics = OnuPmMetrics(self.adapter_agent, self.device_id,
                                       self.logical_device_id, grouped=True,
                                       freq_override=False, **kwargs)
        pm_config = self.pm_metrics.make_proto()
        self._onu_omci_device.set_pm_config(self.pm_metrics.omci_pm.openomci_interval_pm)
        self.log.info("initial-pm-config", pm_config=pm_config)
        yield self.adapter_agent.device_pm_config_update(pm_config, init=True)

        ############################################################################
        # Setup Alarm handler
        self.alarms = AdapterAlarms(self.adapter_agent, device.id, self.logical_device_id)
        # Note, ONU ID and UNI intf set in add_uni_port method
        self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.alarms,
                                                                  ani_ports=[self._pon])
        self.enabled = True
    else:
        self.log.info('onu-already-activated')
229
# Called once when the adapter needs to re-create device. usually on vcore restart
def reconcile(self, device):
    """
    Re-create the handler state for an already known ONU.

    Registers for proxied messages, rebuilds the PON port / OpenOMCI
    device entry, restarts the OpenOMCI state machines and schedules a
    reboot of the ONU.

    :param device: device object; must carry ``parent_id`` and
                   ``proxy_address.device_id``
    :raises AssertionError: if either of those fields is missing/empty
    """
    self.log.debug('function-entry', device=device)

    # first we verify that we got parent reference and proxy info
    assert device.parent_id
    assert device.proxy_address.device_id

    # register for proxied messages right away
    self.proxy_address = device.proxy_address
    self.adapter_agent.register_for_proxied_messages(device.proxy_address)

    # _init_pon_state() reads self.parent_id and self._pon_port_number, so
    # record them the same way activate() does.
    self.parent_id = device.parent_id
    if device.parent_port_no:
        self._pon_port_number = device.parent_port_no

    if self.enabled is True:
        self.log.info('onu-already-activated')
        return

    self.log.info('reconciling-broadcom-onu-device')

    def restart_openomci(_result):
        # need to restart state machines on vcore restart. there is no indication to do it for us.
        self._onu_omci_device.start()
        device.reason = "restarting-openomci"
        self.adapter_agent.update_device(device)

        # TODO: this is probably a bit heavy handed
        # Force a reboot for now. We need indications to reflow to reassign tconts and gems given vcore went away
        # This may not be necessary when mib resync actually works
        reactor.callLater(1, self.reboot)

        self.enabled = True

    def init_failed(reason):
        self.log.error('reconcile-init-pon-state-failed', reason=reason)

    # _init_pon_state() is an inlineCallbacks coroutine: the OpenOMCI device
    # entry only exists once its Deferred has fired, so continue from a
    # callback instead of dereferencing self._onu_omci_device right away.
    pon_ready = self._init_pon_state(device)
    pon_ready.addCallbacks(restart_openomci, init_failed)
260
@inlineCallbacks
def handle_onu_events(self):
    """
    Wait for one message on the event queue, process it, then re-arm.

    Only the 'download_tech_profile' event is acted upon. Any exception
    raised while handling a message is logged and swallowed so the loop
    keeps running.
    """
    message = yield self.event_messages.get()
    try:
        if message['event'] == 'download_tech_profile':
            profile_path = message['event_data']
            uni = message['uni_id']
            self.load_and_configure_tech_profile(uni, profile_path)

    except Exception as e:
        self.log.error("exception-handling-onu-event", e=e)

    # Schedule ourselves again to pick up the next event
    reactor.callLater(0, self.handle_onu_events)
275
@inlineCallbacks
def _init_pon_state(self, device):
    """
    Create the PON port, report it to the adapter agent and create the
    OpenOMCI device entry for this ONU.

    :param device: device object whose ``id`` is used when reporting the port
    """
    self.log.debug('function-entry', device=device)

    pon_no = self._pon_port_number
    self._pon = PonPort.create(self, pon_no)
    self._pon.add_peer(self.parent_id, pon_no)
    self.log.debug('adding-pon-port-to-agent', pon=self._pon.get_port())

    yield self.adapter_agent.port_created(device.id, self._pon.get_port())

    self.log.debug('added-pon-port-to-agent', pon=self._pon.get_port())

    # Create the OpenOMCI ONU Device Entry for this ONU
    omci_agent = self.omci_agent
    self._onu_omci_device = omci_agent.add_device(
        self.device_id,
        self.adapter_agent,
        support_classes=self.adapter.broadcom_omci,
        custom_me_map=self.adapter.custom_me_entities())

    # Port startup
    if self._pon is not None:
        self._pon.enabled = True
296
# TODO: move to UniPort
def update_logical_port(self, logical_device_id, port_id, state):
    """
    Set the OpenFlow port state of one logical port.

    Failures are logged and swallowed; nothing is raised to the caller.

    :param logical_device_id: id of the logical device owning the port
    :param port_id: id of the logical port to update
    :param state: value stored in ``ofp_port.state``
    """
    try:
        self.log.info('updating-logical-port', logical_port_id=port_id,
                      logical_device_id=logical_device_id, state=state)
        agent = self.adapter_agent
        port = agent.get_logical_port(logical_device_id, port_id)
        port.ofp_port.state = state
        agent.update_logical_port(logical_device_id, port)
    except Exception as e:
        self.log.exception("exception-updating-port", e=e)
309
def delete(self, device):
    """
    Ask the parent adapter, when one is set, to delete this child device.

    A parent adapter lacking ``delete_child_device`` is tolerated and only
    logged.
    """
    self.log.info('delete-onu', device=device)
    parent = self.parent_adapter
    if not parent:
        self.log.debug("parent-adapter-not-available")
        return

    try:
        parent.delete_child_device(self.parent_id, device)
    except AttributeError:
        self.log.debug('parent-device-delete-child-not-implemented')
319
def _create_tconts(self, uni_id, us_scheduler):
    """
    Build a TCONT from the upstream scheduler description and add it to
    the PON port.

    :param uni_id: UNI id stored with the TCONT
    :param us_scheduler: mapping providing 'alloc_id' and 'q_sched_policy'
    """
    alloc = us_scheduler['alloc_id']
    sched_policy = us_scheduler['q_sched_policy']
    self.log.debug('create-tcont', us_scheduler=us_scheduler)

    tcont_attrs = {
        'alloc-id': alloc,
        'q_sched_policy': sched_policy,
        'uni_id': uni_id,
    }

    # TODO: Not sure what to do with any of this...
    td_attrs = {
        'name': 'not-sure-td-profile',
        'fixed-bandwidth': "not-sure-fixed",
        'assured-bandwidth': "not-sure-assured",
        'maximum-bandwidth': "not-sure-max",
        'additional-bw-eligibility-indicator': "not-sure-additional",
    }

    descriptor = OnuTrafficDescriptor.create(td_attrs)
    tcont = OnuTCont.create(self, tcont=tcont_attrs, td=descriptor)

    self._pon.add_tcont(tcont)

    self.log.debug('pon-add-tcont', tcont=tcont)
344
# Called when there is an olt up indication, providing the gem port id chosen by the olt handler
def _create_gemports(self, uni_id, gem_ports, alloc_id_ref, direction):
    """
    Create one GEM port per entry of ``gem_ports`` and add it to the PON
    port.

    :param uni_id: UNI id stored with every GEM port
    :param gem_ports: iterable of GEM port attribute mappings
    :param alloc_id_ref: alloc id the GEM ports refer to
    :param direction: direction label stored with every GEM port
    """
    self.log.debug('create-gemport',
                   gem_ports=gem_ports, direction=direction)

    for attrs in gem_ports:
        discard = attrs['discard_config'] if False else None  # placeholder, replaced below
        gem_spec = {
            'gemport_id': attrs['gemport_id'],
            'direction': direction,
            'alloc_id_ref': alloc_id_ref,
            'encryption': attrs['aes_encryption'],
            'discard_config': {
                'max_probability': attrs['discard_config']['max_probability'],
                'max_threshold': attrs['discard_config']['max_threshold'],
                'min_threshold': attrs['discard_config']['min_threshold'],
            },
            'discard_policy': attrs['discard_policy'],
            'max_q_size': attrs['max_q_size'],
            'pbit_map': attrs['pbit_map'],
            'priority_q': attrs['priority_q'],
            'scheduling_policy': attrs['scheduling_policy'],
            'weight': attrs['weight'],
            'uni_id': uni_id,
        }
        del discard

        created = OnuGemPort.create(self, gem_port=gem_spec)

        self._pon.add_gem_port(created)

        self.log.debug('pon-add-gemport', gem_port=created)
376
377 def _do_tech_profile_configuration(self, uni_id, tp):
378 num_of_tconts = tp['num_of_tconts']
379 us_scheduler = tp['us_scheduler']
380 alloc_id = us_scheduler['alloc_id']
381 self._create_tconts(uni_id, us_scheduler)
382 upstream_gem_port_attribute_list = tp['upstream_gem_port_attribute_list']
383 self._create_gemports(uni_id, upstream_gem_port_attribute_list, alloc_id, "UPSTREAM")
384 downstream_gem_port_attribute_list = tp['downstream_gem_port_attribute_list']
385 self._create_gemports(uni_id, downstream_gem_port_attribute_list, alloc_id, "DOWNSTREAM")
386
def load_and_configure_tech_profile(self, uni_id, tp_path):
    """
    Load a tech profile instance from the KV store and queue the OMCI task
    that configures it on the ONU.

    Nothing is queued when the download for this (uni_id, tp_path) pair is
    already done or still in progress. On task failure the whole method is
    re-scheduled after _STARTUP_RETRY_WAIT seconds. Exceptions are logged,
    not raised.

    :param uni_id: UNI id the tech profile applies to
    :param tp_path: KV store key of the tech profile instance
    """
    self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)

    # Make sure the per-UNI bookkeeping exists
    pending = self._tp_service_specific_task.setdefault(uni_id, dict())
    finished = self._tech_profile_download_done.setdefault(uni_id, dict())
    finished.setdefault(tp_path, False)

    if finished[tp_path]:
        self.log.info("tech-profile-config-already-done")
        return

    try:
        if tp_path in pending:
            self.log.info("tech-profile-config-already-in-progress",
                          tp_path=tp_path)
            return

        # The stored value is a Python literal; parse it into a mapping
        tp = ast.literal_eval(self.kv_client[tp_path])
        self.log.debug("tp-instance", tp=tp)
        self._do_tech_profile_configuration(uni_id, tp)

        def on_success(_results):
            self.log.info("tech-profile-config-done-successfully")
            device = self.adapter_agent.get_device(self.device_id)
            device.reason = 'tech-profile-config-download-success'
            self.adapter_agent.update_device(device)
            if tp_path in self._tp_service_specific_task[uni_id]:
                del self._tp_service_specific_task[uni_id][tp_path]
            self._tech_profile_download_done[uni_id][tp_path] = True

        def on_failure(_reason):
            self.log.warn('tech-profile-config-failure-retrying',
                          _reason=_reason)
            device = self.adapter_agent.get_device(self.device_id)
            device.reason = 'tech-profile-config-download-failure-retrying'
            self.adapter_agent.update_device(device)
            if tp_path in self._tp_service_specific_task[uni_id]:
                del self._tp_service_specific_task[uni_id][tp_path]
            # Try the whole download again later
            self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT,
                                               self.load_and_configure_tech_profile,
                                               uni_id, tp_path)

        self.log.info('downloading-tech-profile-configuration')
        task = BrcmTpServiceSpecificTask(self.omci_agent, self, uni_id)
        self._tp_service_specific_task[uni_id][tp_path] = task
        self._deferred = self._onu_omci_device.task_runner.queue_task(
            self._tp_service_specific_task[uni_id][tp_path])
        self._deferred.addCallbacks(on_success, on_failure)

    except Exception as e:
        self.log.exception("error-loading-tech-profile", e=e)
442
def update_pm_config(self, device, pm_config):
    """
    Push a new PM configuration into the PM metrics object.

    :param device: not used by this method
    :param pm_config: configuration forwarded to ``pm_metrics.update``
    """
    # TODO: This has not been tested
    self.log.info('update_pm_config', pm_config=pm_config)
    metrics = self.pm_metrics
    metrics.update(pm_config)
447
# Calling this assumes the onu is active/ready and had at least an initial mib downloaded. This gets called from
# flow decomposition that ultimately comes from onos
def update_flow_table(self, device, flows):
    """
    Translate a set of OpenFlow flows into OMCI VLAN filter tasks.

    Each flow is inspected independently. A flow that matches on an
    ethertype, or that does not set a non-zero VLAN id, is ignored. Every
    other flow results in a call to ``_add_vlan_filter_task`` for the UNI
    port the flow refers to. A failure while processing one flow is logged
    and does not stop the remaining flows.

    :param device: device object; flows are skipped entirely unless it is
                   REACHABLE and ENABLED
    :param flows: iterable of flow objects understood by the ``fd`` helpers
    """
    self.log.debug('function-entry', device=device, flows=flows)

    #
    # We need to proxy through the OLT to get to the ONU
    # Configuration from here should be using OMCI
    #

    # no point in pushing omci flows if the device isnt reachable
    if device.connect_status != ConnectStatus.REACHABLE or \
            device.admin_state != AdminState.ENABLED:
        self.log.warn("device-disabled-or-offline-skipping-flow-update",
                      admin=device.admin_state, connect=device.connect_status)
        return

    def is_downstream(port):
        # Traffic entering on the PON port flows towards the UNI
        return port == self._pon_port_number

    for flow in flows:
        # Only these two survive the parsing loops; everything else is
        # parsed purely for logging.
        _type = None
        _set_vlan_vid = None
        self.log.debug('bulk-flow-update', device_id=device.id, flow=flow)
        try:
            _in_port = fd.get_in_port(flow)
            assert _in_port is not None

            _out_port = fd.get_out_port(flow)  # may be None

            # A port is either the PON port (downstream) or not (upstream),
            # so no third case exists.
            if is_downstream(_in_port):
                self.log.debug('downstream-flow', in_port=_in_port, out_port=_out_port)
                uni_port = self.uni_port(_out_port)
            else:
                self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
                uni_port = self.uni_port(_in_port)

            self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))

            for field in fd.get_ofb_fields(flow):
                if field.type == fd.ETH_TYPE:
                    _type = field.eth_type
                    self.log.debug('field-type-eth-type',
                                   eth_type=_type)

                elif field.type == fd.IP_PROTO:
                    _proto = field.ip_proto
                    self.log.debug('field-type-ip-proto',
                                   ip_proto=_proto)

                elif field.type == fd.IN_PORT:
                    _port = field.port
                    self.log.debug('field-type-in-port',
                                   in_port=_port)

                elif field.type == fd.VLAN_VID:
                    _vlan_vid = field.vlan_vid & 0xfff
                    self.log.debug('field-type-vlan-vid',
                                   vlan=_vlan_vid)

                elif field.type == fd.VLAN_PCP:
                    _vlan_pcp = field.vlan_pcp
                    self.log.debug('field-type-vlan-pcp',
                                   pcp=_vlan_pcp)

                elif field.type == fd.UDP_DST:
                    _udp_dst = field.udp_dst
                    self.log.debug('field-type-udp-dst',
                                   udp_dst=_udp_dst)

                elif field.type == fd.UDP_SRC:
                    _udp_src = field.udp_src
                    self.log.debug('field-type-udp-src',
                                   udp_src=_udp_src)

                elif field.type == fd.IPV4_DST:
                    _ipv4_dst = field.ipv4_dst
                    self.log.debug('field-type-ipv4-dst',
                                   ipv4_dst=_ipv4_dst)

                elif field.type == fd.IPV4_SRC:
                    _ipv4_src = field.ipv4_src
                    # Logged under its own key (was mislabelled ipv4_dst)
                    self.log.debug('field-type-ipv4-src',
                                   ipv4_src=_ipv4_src)

                elif field.type == fd.METADATA:
                    _metadata = field.table_metadata
                    self.log.debug('field-type-metadata',
                                   metadata=_metadata)

                else:
                    raise NotImplementedError('field.type={}'.format(
                        field.type))

            for action in fd.get_actions(flow):

                if action.type == fd.OUTPUT:
                    _output = action.output.port
                    self.log.debug('action-type-output',
                                   output=_output, in_port=_in_port)

                elif action.type == fd.POP_VLAN:
                    self.log.debug('action-type-pop-vlan',
                                   in_port=_in_port)

                elif action.type == fd.PUSH_VLAN:
                    _push_tpid = action.push.ethertype
                    self.log.debug('action-type-push-vlan',
                                   push_tpid=_push_tpid, in_port=_in_port)
                    if action.push.ethertype != 0x8100:
                        self.log.error('unhandled-tpid',
                                       ethertype=action.push.ethertype)

                elif action.type == fd.SET_FIELD:
                    _field = action.set_field.field.ofb_field
                    assert (action.set_field.field.oxm_class ==
                            OFPXMC_OPENFLOW_BASIC)
                    self.log.debug('action-type-set-field',
                                   field=_field, in_port=_in_port)
                    if _field.type == fd.VLAN_VID:
                        _set_vlan_vid = _field.vlan_vid & 0xfff
                        self.log.debug('set-field-type-vlan-vid',
                                       vlan_vid=_set_vlan_vid)
                    else:
                        self.log.error('unsupported-action-set-field-type',
                                       field_type=_field.type)
                else:
                    self.log.error('unsupported-action-type',
                                   action_type=action.type, in_port=_in_port)

            # TODO: We only set vlan omci flows. Handle omci matching ethertypes at some point in another task
            if _type is not None:
                self.log.warn('ignoring-flow-with-ethType', ethType=_type)
            elif _set_vlan_vid is None or _set_vlan_vid == 0:
                self.log.warn('ignorning-flow-that-does-not-set-vlanid')
            else:
                self.log.warn('set-vlanid', uni_id=uni_port.port_number, set_vlan_vid=_set_vlan_vid)
                self._add_vlan_filter_task(device, uni_port, _set_vlan_vid)

        except Exception as e:
            self.log.exception('failed-to-install-flow', e=e, flow=flow)
605
606
def _add_vlan_filter_task(self, device, uni_port, _set_vlan_vid):
    """
    Queue an OMCI task that applies a VLAN id to a UNI port.

    On failure the same request is re-scheduled after _STARTUP_RETRY_WAIT
    seconds.

    :param device: device object whose ``reason`` is updated with the outcome
    :param uni_port: UNI port to tag; must not be None
    :param _set_vlan_vid: VLAN id to apply
    :raises AssertionError: if ``uni_port`` is None
    """
    assert uni_port is not None

    def on_success(_results):
        self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid)
        device.reason = 'omci-flows-pushed'
        self._vlan_filter_task = None

    def on_failure(_reason):
        self.log.warn('vlan-tagging-failure', uni_port=uni_port, vlan=_set_vlan_vid)
        device.reason = 'omci-flows-failed-retrying'
        # Try again later with the very same arguments
        self._vlan_filter_task = reactor.callLater(
            _STARTUP_RETRY_WAIT,
            self._add_vlan_filter_task, device, uni_port, _set_vlan_vid)

    self.log.info('setting-vlan-tag')
    self._vlan_filter_task = BrcmVlanFilterTask(
        self.omci_agent, self.device_id, uni_port, _set_vlan_vid)
    runner = self._onu_omci_device.task_runner
    self._deferred = runner.queue_task(self._vlan_filter_task)
    self._deferred.addCallbacks(on_success, on_failure)
625
def get_tx_id(self):
    """Advance the transaction counter by one and return the new value."""
    self.log.debug('function-entry')
    next_id = self.tx_id + 1
    self.tx_id = next_id
    return next_id
630
# TODO: Actually conform to or create a proper interface.
# this and the other functions called from the olt arent very clear.
# Called each time there is an onu "up" indication from the olt handler
def create_interface(self, data):
    """
    React to an ONU "up" indication: subscribe to OpenOMCI events,
    schedule the OpenOMCI state machine start and enable the heartbeat.

    :param data: indication payload; stored as ``_onu_indication``
    """
    self.log.debug('function-entry', data=data)
    self._onu_indication = data

    device = self.adapter_agent.get_device(self.device_id)

    self.log.debug('starting-openomci-statemachine')
    self._subscribe_to_events()
    # Start the state machines one second from now
    reactor.callLater(1, self._onu_omci_device.start)

    device.reason = "starting-openomci"
    self.adapter_agent.update_device(device)
    self._heartbeat.enabled = True
646
# Currently called each time there is an onu "down" indication from the olt handler
# TODO: possibly other reasons to "update" from the olt?
def update_interface(self, data):
    """
    React to an interface update from the OLT handler.

    Only ``oper_state == 'down'`` triggers work: the OpenOMCI state machine
    is stopped, tech-profile bookkeeping is reset, ports are disabled and
    the device is marked unreachable.

    :param data: mapping that may carry an 'oper_state' entry
    """
    self.log.debug('function-entry', data=data)
    state = data.get('oper_state', None)

    device = self.adapter_agent.get_device(self.device_id)

    if state != 'down':
        self.log.debug('not-changing-openomci-statemachine')
        return

    self.log.debug('stopping-openomci-statemachine')
    reactor.callLater(0, self._onu_omci_device.stop)

    # Let TP download happen again
    for per_uni_tasks in self._tp_service_specific_task.values():
        per_uni_tasks.clear()
    for per_uni_done in self._tech_profile_download_done.values():
        per_uni_done.clear()

    self.disable_ports(device)
    device.reason = "stopping-openomci"
    device.connect_status = ConnectStatus.UNREACHABLE
    device.oper_status = OperStatus.DISCOVERED
    self.adapter_agent.update_device(device)
672
# Not currently called by olt or anything else
def remove_interface(self, data):
    """
    Stop the OpenOMCI state machine, reset tech-profile bookkeeping and
    disable the ONU's ports.

    :param data: payload; only logged
    """
    self.log.debug('function-entry', data=data)

    device = self.adapter_agent.get_device(self.device_id)

    self.log.debug('stopping-openomci-statemachine')
    reactor.callLater(0, self._onu_omci_device.stop)

    # Let TP download happen again
    for per_uni_tasks in self._tp_service_specific_task.values():
        per_uni_tasks.clear()
    for per_uni_done in self._tech_profile_download_done.values():
        per_uni_done.clear()

    self.disable_ports(device)
    device.reason = "stopping-openomci"
    self.adapter_agent.update_device(device)

    # TODO: im sure there is more to do here
693
# Not currently called. Would be called presumably from the olt handler
def remove_gemport(self, data):
    """
    Placeholder for GEM port removal.

    Currently only logs the request and reports an error when the device
    is not reachable; no removal is performed.
    """
    self.log.debug('remove-gemport', data=data)
    onu = self.adapter_agent.get_device(self.device_id)
    reachable = ConnectStatus.REACHABLE
    if onu.connect_status != reachable:
        self.log.error('device-unreachable')
        return
701
# Not currently called. Would be called presumably from the olt handler
def remove_tcont(self, tcont_data, traffic_descriptor_data):
    """
    Placeholder for TCONT removal.

    Currently only logs the request and reports an error when the device
    is not reachable; no removal is performed.
    """
    self.log.debug('remove-tcont', tcont_data=tcont_data, traffic_descriptor_data=traffic_descriptor_data)
    onu = self.adapter_agent.get_device(self.device_id)
    reachable = ConnectStatus.REACHABLE
    if onu.connect_status != reachable:
        self.log.error('device-unreachable')
        return

    # TODO: Create some omci task that encompases this what intended
711
# Not currently called. Would be called presumably from the olt handler
def create_multicast_gemport(self, data):
    """Placeholder: logs the request only; nothing is created yet."""
    self.log.debug('function-entry', data=data)

    # TODO: create objects and populate for later omci calls
717
def disable(self, device):
    """
    Administratively disable the ONU.

    Queues a UNI lock task; whether that task succeeds or fails, the
    OpenOMCI state machine is then stopped, tech-profile bookkeeping is
    reset, ports are disabled and the device is updated. Exceptions are
    logged and not raised.

    :param device: device object being disabled
    """
    self.log.debug('function-entry', device=device)
    try:
        self.log.info('sending-uni-lock-towards-device', device=device)

        def stop_anyway(reason):
            # proceed with disable regardless if we could reach the onu. for example onu is unplugged
            self.log.debug('stopping-openomci-statemachine')
            reactor.callLater(0, self._onu_omci_device.stop)

            # Let TP download happen again
            for uni_id in self._tp_service_specific_task:
                self._tp_service_specific_task[uni_id].clear()
            for uni_id in self._tech_profile_download_done:
                self._tech_profile_download_done[uni_id].clear()

            self.disable_ports(device)
            device.oper_status = OperStatus.UNKNOWN
            device.reason = "omci-admin-lock"
            self.adapter_agent.update_device(device)

        # lock all the unis
        task = BrcmUniLockTask(self.omci_agent, self.device_id, lock=True)
        self._deferred = self._onu_omci_device.task_runner.queue_task(task)
        self._deferred.addCallbacks(stop_anyway, stop_anyway)
    except Exception as e:
        # Use the device-bound logger, like every other method here, so
        # the failure is attributed to this ONU.
        self.log.exception('exception-in-onu-disable', exception=e)
745
def reenable(self, device):
    """
    Re-enable a previously disabled ONU.

    Subscribes to OpenOMCI events, updates the device reason, schedules
    the OpenOMCI state machine start and enables the heartbeat.
    Exceptions are logged and not raised.

    :param device: device object being re-enabled
    """
    self.log.debug('function-entry', device=device)
    try:
        # Start up OpenOMCI state machines for this device
        # this will ultimately resync mib and unlock unis on successful redownloading the mib
        self.log.debug('restarting-openomci-statemachine')
        self._subscribe_to_events()
        device.reason = "restarting-openomci"
        self.adapter_agent.update_device(device)
        reactor.callLater(1, self._onu_omci_device.start)
        self._heartbeat.enabled = True
    except Exception as e:
        # Use the device-bound logger, like every other method here, so
        # the failure is attributed to this ONU.
        self.log.exception('exception-in-onu-reenable', exception=e)
759
def reboot(self):
    """
    Ask the ONU to reboot through its OpenOMCI device entry.

    Nothing is sent when the device is not reachable. After a successful
    reboot request the ports are disabled and the device is marked
    unreachable / discovered.
    """
    self.log.info('reboot-device')
    device = self.adapter_agent.get_device(self.device_id)
    if device.connect_status != ConnectStatus.REACHABLE:
        self.log.error("device-unreachable")
        return

    def on_success(_results):
        self.log.info('reboot-success', _results=_results)
        self.disable_ports(device)
        device.connect_status = ConnectStatus.UNREACHABLE
        device.oper_status = OperStatus.DISCOVERED
        device.reason = "rebooting"
        self.adapter_agent.update_device(device)

    def on_failure(_reason):
        self.log.info('reboot-failure', _reason=_reason)

    self._deferred = self._onu_omci_device.reboot()
    self._deferred.addCallbacks(on_success, on_failure)
780
def disable_ports(self, onu_device):
    """
    Disable every port of this ONU and mark its UNI logical ports as
    link-down.

    :param onu_device: device object providing ``id`` and ``parent_id``
    :raises AssertionError: if the parent device or its logical device id
                            cannot be resolved
    """
    self.log.info('disable-ports', device_id=self.device_id,
                  onu_device=onu_device)

    agent = self.adapter_agent

    # Disable all ports on that device
    agent.disable_all_ports(self.device_id)

    parent = agent.get_device(onu_device.parent_id)
    assert parent
    logical_device_id = parent.parent_id
    assert logical_device_id

    for uni in agent.get_ports(onu_device.id, Port.ETHERNET_UNI):
        # TODO: move to UniPort
        self.update_logical_port(logical_device_id,
                                 'uni-{}'.format(uni.port_no),
                                 OFPPS_LINK_DOWN)
797
def enable_ports(self, onu_device):
    """
    Enable every port of this ONU and mark its UNI logical ports as live.

    :param onu_device: device object providing ``id`` and ``parent_id``
    :raises AssertionError: if the parent device or its logical device id
                            cannot be resolved
    """
    self.log.info('enable-ports', device_id=self.device_id, onu_device=onu_device)

    agent = self.adapter_agent

    # Enable all ports on that device
    agent.enable_all_ports(self.device_id)

    parent = agent.get_device(onu_device.parent_id)
    assert parent
    logical_device_id = parent.parent_id
    assert logical_device_id

    for uni in agent.get_ports(onu_device.id, Port.ETHERNET_UNI):
        # TODO: move to UniPort
        self.update_logical_port(logical_device_id,
                                 'uni-{}'.format(uni.port_no),
                                 OFPPS_LIVE)
813
# Called just before openomci state machine is started. These listen for events from selected state machines,
# most importantly, mib in sync. Which ultimately leads to downloading the mib
def _subscribe_to_events(self):
    """
    Subscribe to the MIB database sync and OMCI capabilities events of
    the OpenOMCI device entry and remember both subscriptions.
    """
    self.log.debug('function-entry')

    # OMCI MIB Database sync status
    sync_bus = self._onu_omci_device.event_bus
    sync_topic = OnuDeviceEntry.event_bus_topic(
        self.device_id, OnuDeviceEvents.MibDatabaseSyncEvent)
    self._in_sync_subscription = sync_bus.subscribe(sync_topic,
                                                    self.in_sync_handler)

    # OMCI Capabilities
    caps_bus = self._onu_omci_device.event_bus
    caps_topic = OnuDeviceEntry.event_bus_topic(
        self.device_id, OnuDeviceEvents.OmciCapabilitiesEvent)
    self._capabilities_subscription = caps_bus.subscribe(caps_topic,
                                                         self.capabilties_handler)
830
831 # Called when the mib is in sync
def in_sync_handler(self, _topic, msg):
    """
    Event-bus callback for MIB database sync status messages.

    The first message reporting in-sync drops the subscription (so this
    runs only once) and schedules ``_mib_in_sync`` on the reactor.  Any
    exception raised while handling the message is logged, not propagated.

    :param _topic: event bus topic the message arrived on (only logged)
    :param msg: message mapping; the value under ``IN_SYNC_KEY`` is read
    """
    self.log.debug('function-entry', _topic=_topic, msg=msg)

    if self._in_sync_subscription is None:
        # No active subscription, nothing to act on
        return

    try:
        if not msg[IN_SYNC_KEY]:
            return

        # Only call this once
        self._onu_omci_device.event_bus.unsubscribe(self._in_sync_subscription)
        self._in_sync_subscription = None

        # Start up device_info load
        self.log.debug('running-mib-sync')
        reactor.callLater(0, self._mib_in_sync)

    except Exception as e:
        self.log.exception('in-sync', e=e)
850
def capabilties_handler(self, _topic, _msg):
    """
    Event-bus callback for OMCI capabilities messages.

    Currently only logs: the entry, and a completion message when a
    capabilities subscription is present.

    :param _topic: event bus topic the message arrived on (only logged)
    :param _msg: message payload (only logged)
    """
    self.log.debug('function-entry', _topic=_topic, msg=_msg)

    if self._capabilities_subscription is None:
        return

    self.log.debug('capabilities-handler-done')
855
# The MIB is in sync; we can now query what we learned and actually start pushing MEs (download) to the ONU.
# Currently uses a basic MIB download task that creates a bridge with a single GEM port and UNI, only allowing EAP.
# Implement your own MibDownloadTask if you wish to set up something different by default.
def _mib_in_sync(self):
    """
    Load device information from the synchronized MIB, then queue the
    initial MIB download.

    While ``self._dev_info_loaded`` is false, the ANI-G, UNI-G, PPTP and
    VEIP entities of the OpenOMCI configuration are inspected, one UNI port
    is created per PPTP/VEIP entity and a logical port is registered for
    each.  The ONU must report at least one ANI-G and at least one PPTP or
    VEIP entity.  Any exception during loading is logged and the whole
    method is re-scheduled after ``_STARTUP_RETRY_WAIT``.

    Once device information is loaded:
      * if the device is administratively enabled, a BrcmMibDownloadTask is
        queued; on success the device becomes ACTIVE/REACHABLE and its
        ports are enabled, on failure this method is re-scheduled after
        ``_STARTUP_RETRY_WAIT``;
      * otherwise the device is disabled.
    """
    self.log.debug('function-entry')

    omci = self._onu_omci_device
    in_sync = omci.mib_db_in_sync

    device = self.adapter_agent.get_device(self.device_id)
    device.reason = 'discovery-mibsync-complete'
    self.adapter_agent.update_device(device)

    if not self._dev_info_loaded:
        self.log.info('loading-device-data-from-mib', in_sync=in_sync, already_loaded=self._dev_info_loaded)

        config = omci.configuration

        # TODO: run this sooner somehow. shouldnt have to wait for mib sync to push an initial download
        # In Sync, we can register logical ports now. Ideally this could occur on
        # the first time we received a successful (no timeout) OMCI Rx response.
        try:
            # Read each entity collection once; a missing or empty
            # collection is treated as an empty mapping.
            ani_entities = config.ani_g_entities or {}
            uni_g_entities = config.uni_g_entities or {}
            pptp_entities = config.pptp_entities or {}
            veip_entities = config.veip_entities or {}

            # sort the lists so we get consistent port ordering.
            ani_list = sorted(ani_entities)
            uni_list = sorted(uni_g_entities)
            pptp_list = sorted(pptp_entities)
            veip_list = sorted(veip_entities)

            # These lists are never None (absent entities yield []), so the
            # requirement is checked by emptiness: an ANI plus at least one
            # PPTP or VEIP must be present.
            if not ani_list or not (pptp_list or veip_list):
                device.reason = 'onu-missing-required-elements'
                self.log.warn("no-ani-or-unis")
                self.adapter_agent.update_device(device)
                raise Exception("onu-missing-required-elements")

            # Currently logging the ani, pptp, veip, and uni for information purposes.
            # Actually act on the veip/pptp as its ME is the most correct one to use in later tasks.
            # And in some ONU the UNI-G list is incomplete or incorrect...
            for entity_id in ani_list:
                ani_value = ani_entities[entity_id]
                self.log.debug("discovered-ani", entity_id=entity_id, value=ani_value)
                # TODO: currently only one OLT PON port/ANI, so this works out. With NGPON there will be 2..?
                self._total_tcont_count = ani_value.get('total-tcont-count')
                self.log.debug("set-total-tcont-count", tcont_count=self._total_tcont_count)

            for entity_id in uni_list:
                uni_value = uni_g_entities[entity_id]
                self.log.debug("discovered-uni", entity_id=entity_id, value=uni_value)

            # PPTP entries are inserted first, then VEIP; insertion order
            # determines the uni_id assigned below.
            uni_entities = OrderedDict()
            for entity_id in pptp_list:
                pptp_value = pptp_entities[entity_id]
                self.log.debug("discovered-pptp", entity_id=entity_id, value=pptp_value)
                uni_entities[entity_id] = UniType.PPTP

            for entity_id in veip_list:
                veip_value = veip_entities[entity_id]
                self.log.debug("discovered-veip", entity_id=entity_id, value=veip_value)
                uni_entities[entity_id] = UniType.VEIP

            # uni_id only advances for UNIs that were actually added
            uni_id = 0
            for entity_id, uni_type in uni_entities.items():
                try:
                    self._add_uni_port(entity_id, uni_id, uni_type)
                    uni_id += 1
                except AssertionError as e:
                    self.log.warn("could not add UNI", entity_id=entity_id, uni_type=uni_type, e=e)

            multi_uni = len(self._unis) > 1
            for uni_port in self._unis.values():
                uni_port.add_logical_port(uni_port.port_number, multi_uni)

            self.adapter_agent.update_device(device)

            self._qos_flexibility = config.qos_configuration_flexibility or 0
            self._omcc_version = config.omcc_version or OMCCVersion.Unknown

            if self._unis:
                self._dev_info_loaded = True
            else:
                device.reason = 'no-usable-unis'
                self.adapter_agent.update_device(device)
                self.log.warn("no-usable-unis")
                raise Exception("no-usable-unis")

        except Exception as e:
            # Best effort: log and try the whole load again later
            self.log.exception('device-info-load', e=e)
            self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)

    else:
        self.log.info('device-info-already-loaded', in_sync=in_sync, already_loaded=self._dev_info_loaded)

    if self._dev_info_loaded:
        if device.admin_state == AdminState.ENABLED:
            def success(_results):
                self.log.info('mib-download-success', _results=_results)
                # Re-read the device: the download completed asynchronously
                current = self.adapter_agent.get_device(self.device_id)
                current.reason = 'initial-mib-downloaded'
                current.oper_status = OperStatus.ACTIVE
                current.connect_status = ConnectStatus.REACHABLE
                self.enable_ports(current)
                self.adapter_agent.update_device(current)
                self._mib_download_task = None

            def failure(_reason):
                self.log.warn('mib-download-failure-retrying', _reason=_reason)
                # Re-read the device rather than writing back the copy
                # captured before the asynchronous download started.
                current = self.adapter_agent.get_device(self.device_id)
                current.reason = 'initial-mib-download-failure-retrying'
                self.adapter_agent.update_device(current)
                self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)

            # Download an initial mib that creates simple bridge that can pass EAP. On success (above) finally set
            # the device to active/reachable. This then opens up the handler to openflow pushes from outside
            self.log.info('downloading-initial-mib-configuration')
            self._mib_download_task = BrcmMibDownloadTask(self.omci_agent, self)
            self._deferred = self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
            self._deferred.addCallbacks(success, failure)
        else:
            self.log.info('admin-down-disabling')
            self.disable(device)
    else:
        self.log.info('device-info-not-loaded-skipping-mib-download')
978
979
def _add_uni_port(self, entity_id, uni_id, uni_type=UniType.PPTP):
    """
    Create a UNI port for the given OMCI entity and register it.

    The port number is obtained from the parent adapter's platform helper
    (``mk_uni_port_num``) using the ONU indication's interface id and ONU
    id plus ``uni_id``.  The new port is added to this device and, through
    the parent adapter agent, to the parent device; it is recorded in
    ``self._unis``, the alarm synchronizer is given the updated UNI list,
    and the PON port's peer reference on the parent is refreshed.

    :param entity_id: OMCI entity id assigned to the new UniPort
    :param uni_id: index of this UNI on the ONU
    :param uni_type: UniType of the port, PPTP by default
    :raises RuntimeError: if the parent device's adapter agent cannot be
                          retrieved from the registry
    """
    self.log.debug('function-entry')

    device = self.adapter_agent.get_device(self.device_id)
    parent_device = self.adapter_agent.get_device(device.parent_id)

    parent_adapter_agent = registry('adapter_loader').get_agent(parent_device.adapter)
    if parent_adapter_agent is None:
        # Fail explicitly instead of dereferencing None a few lines below
        self.log.error('parent-adapter-could-not-be-retrieved')
        raise RuntimeError('parent-adapter-could-not-be-retrieved')

    # TODO: This knowledge is locked away in openolt. and it assumes one onu equals one uni...
    parent_adapter = parent_adapter_agent.adapter.devices[parent_device.id]
    uni_no = parent_adapter.platform.mk_uni_port_num(
        self._onu_indication.intf_id, self._onu_indication.onu_id, uni_id)

    # TODO: Some or parts of this likely need to move to UniPort. especially the format stuff
    uni_name = "uni-{}".format(uni_no)

    mac_bridge_port_num = uni_id + 1  # TODO +1 is only to test non-zero index

    self.log.debug('uni-port-inputs', uni_no=uni_no, uni_id=uni_id, uni_name=uni_name, uni_type=uni_type,
                   entity_id=entity_id, mac_bridge_port_num=mac_bridge_port_num)

    uni_port = UniPort.create(self, uni_name, uni_id, uni_no, uni_name, uni_type)
    uni_port.entity_id = entity_id
    uni_port.enabled = True
    uni_port.mac_bridge_port_num = mac_bridge_port_num

    self.log.debug("created-uni-port", uni=uni_port)

    self.adapter_agent.add_port(device.id, uni_port.get_port())
    parent_adapter_agent.add_port(device.parent_id, uni_port.get_port())

    self._unis[uni_port.port_number] = uni_port

    self._onu_omci_device.alarm_synchronizer.set_alarm_params(onu_id=self._onu_indication.onu_id,
                                                              uni_ports=self._unis.values())
    # TODO: this should be in the PonPortclass
    pon_port = self._pon.get_port()

    # Delete reference to my own UNI as peer from parent.
    # TODO why is this here, add_port_reference_to_parent already prunes duplicates
    me_as_peer = Port.PeerPort(device_id=device.parent_id, port_no=uni_port.port_number)
    partial_pon_port = Port(port_no=pon_port.port_no, label=pon_port.label,
                            type=pon_port.type, admin_state=pon_port.admin_state,
                            oper_status=pon_port.oper_status,
                            peers=[me_as_peer])  # only list myself as a peer to avoid deleting all other UNIs from parent
    self.adapter_agent.delete_port_reference_from_parent(self.device_id, partial_pon_port)

    pon_port.peers.extend([me_as_peer])

    self._pon._port = pon_port

    self.adapter_agent.add_port_reference_to_parent(self.device_id,
                                                    pon_port)