blob: 84b47517d33f7967b4f89945806bc2367b7f0523 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Broadcom OpenOMCI OLT/ONU adapter handler.
19"""
20
21import json
22import ast
23import structlog
24
25from collections import OrderedDict
26
27from twisted.internet import reactor, task
28from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
29
30from heartbeat import HeartBeat
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050031from pyvoltha.adapters.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
32from pyvoltha.adapters.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
33from pyvoltha.adapters.extensions.alarms.adapter_alarms import AdapterAlarms
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050035import pyvoltha.common.openflow.utils as fd
36from pyvoltha.common.utils.registry import registry
37from pyvoltha.common.config.config_backend import ConsulStore
38from pyvoltha.common.config.config_backend import EtcdStore
William Kurkian8235c1e2019-03-05 12:58:28 -050039from voltha_protos.common_pb2 import OperStatus, ConnectStatus, AdminState
40from voltha_protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050041from pyvoltha.adapters.extensions.omci.onu_configuration import OMCCVersion
42from pyvoltha.adapters.extensions.omci.onu_device_entry import OnuDeviceEvents, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050043 OnuDeviceEntry, IN_SYNC_KEY
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050044from omci.brcm_mib_download_task import BrcmMibDownloadTask
45from omci.brcm_tp_service_specific_task import BrcmTpServiceSpecificTask
46from omci.brcm_uni_lock_task import BrcmUniLockTask
47from omci.brcm_vlan_filter_task import BrcmVlanFilterTask
48from onu_gem_port import *
49from onu_tcont import *
50from pon_port import *
51from uni_port import *
52from onu_traffic_descriptor import *
53from pyvoltha.common.tech_profile.tech_profile import TechProfile
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050054
55OP = EntityOperations
56RC = ReasonCodes
57
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050058log = structlog.get_logger()
59
60_STARTUP_RETRY_WAIT = 20
61
62
63class BrcmOpenomciOnuHandler(object):
64
65 def __init__(self, adapter, device_id):
66 self.log = structlog.get_logger(device_id=device_id)
67 self.log.debug('function-entry')
68 self.adapter = adapter
69 self.adapter_agent = adapter.adapter_agent
70 self.parent_adapter = None
71 self.parent_id = None
72 self.device_id = device_id
73 self.incoming_messages = DeferredQueue()
74 self.event_messages = DeferredQueue()
75 self.proxy_address = None
76 self.tx_id = 0
77 self._enabled = False
78 self.alarms = None
79 self.pm_metrics = None
80 self._omcc_version = OMCCVersion.Unknown
81 self._total_tcont_count = 0 # From ANI-G ME
82 self._qos_flexibility = 0 # From ONT2_G ME
83
84 self._onu_indication = None
85 self._unis = dict() # Port # -> UniPort
86
87 self._pon = None
88 # TODO: probably shouldnt be hardcoded, determine from olt maybe?
89 self._pon_port_number = 100
90 self.logical_device_id = None
91
92 self._heartbeat = HeartBeat.create(self, device_id)
93
94 # Set up OpenOMCI environment
95 self._onu_omci_device = None
96 self._dev_info_loaded = False
97 self._deferred = None
98
99 self._in_sync_subscription = None
100 self._connectivity_subscription = None
101 self._capabilities_subscription = None
102
103 self.mac_bridge_service_profile_entity_id = 0x201
104 self.gal_enet_profile_entity_id = 0x1
105
106 self._tp_service_specific_task = dict()
107 self._tech_profile_download_done = dict()
108
109 # Initialize KV store client
110 self.args = registry('main').get_args()
111 if self.args.backend == 'etcd':
112 host, port = self.args.etcd.split(':', 1)
113 self.kv_client = EtcdStore(host, port,
114 TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
115 elif self.args.backend == 'consul':
116 host, port = self.args.consul.split(':', 1)
117 self.kv_client = ConsulStore(host, port,
118 TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
119 else:
120 self.log.error('Invalid-backend')
121 raise Exception("Invalid-backend-for-kv-store")
122
123 # Handle received ONU event messages
124 reactor.callLater(0, self.handle_onu_events)
125
126 @property
127 def enabled(self):
128 return self._enabled
129
130 @enabled.setter
131 def enabled(self, value):
132 if self._enabled != value:
133 self._enabled = value
134
135 @property
136 def omci_agent(self):
137 return self.adapter.omci_agent
138
139 @property
140 def omci_cc(self):
141 return self._onu_omci_device.omci_cc if self._onu_omci_device is not None else None
142
143 @property
144 def heartbeat(self):
145 return self._heartbeat
146
147 @property
148 def uni_ports(self):
149 return self._unis.values()
150
151 def uni_port(self, port_no_or_name):
152 if isinstance(port_no_or_name, (str, unicode)):
153 return next((uni for uni in self.uni_ports
154 if uni.name == port_no_or_name), None)
155
156 assert isinstance(port_no_or_name, int), 'Invalid parameter type'
157 return next((uni for uni in self.uni_ports
158 if uni.logical_port_number == port_no_or_name), None)
159
160 @property
161 def pon_port(self):
162 return self._pon
163
164 def receive_message(self, msg):
165 if self.omci_cc is not None:
166 self.omci_cc.receive_message(msg)
167
168 # Called once when the adapter creates the device/onu instance
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500169 @inlineCallbacks
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500170 def activate(self, device):
171 self.log.debug('function-entry', device=device)
172
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500173 assert device.parent_id
Matt Jeanneret0c287892019-02-28 11:48:00 -0500174 assert device.parent_port_no
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500175 assert device.proxy_address.device_id
176
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500177 self.proxy_address = device.proxy_address
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500178 self.parent_id = device.parent_id
Matt Jeanneret0c287892019-02-28 11:48:00 -0500179 self._pon_port_number = device.parent_port_no
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500180
181 if self.enabled is not True:
182 self.log.info('activating-new-onu')
183 # populate what we know. rest comes later after mib sync
Matt Jeanneret0c287892019-02-28 11:48:00 -0500184 device.root = False
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500185 device.vendor = 'Broadcom'
186 device.connect_status = ConnectStatus.REACHABLE
187 device.oper_status = OperStatus.DISCOVERED
188 device.reason = 'activating-onu'
189
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500190 # TODO NEW CORE: Need to either get logical device id from core or use regular device id
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500191 # pm_metrics requires a logical device id
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500192 #parent_device = yield self.adapter_agent.get_device(device.parent_id)
193 #self.logical_device_id = parent_device.parent_id
194 #assert self.logical_device_id, 'Invalid logical device ID'
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500195
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500196 yield self.adapter_agent.device_update(device)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500197
198 self.log.debug('set-device-discovered')
199
200 self._init_pon_state(device)
201
202 ############################################################################
203 # Setup PM configuration for this device
204 # Pass in ONU specific options
205 kwargs = {
206 OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
207 'heartbeat': self.heartbeat,
208 OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
209 }
210 self.pm_metrics = OnuPmMetrics(self.adapter_agent, self.device_id,
211 self.logical_device_id, grouped=True,
212 freq_override=False, **kwargs)
213 pm_config = self.pm_metrics.make_proto()
214 self._onu_omci_device.set_pm_config(self.pm_metrics.omci_pm.openomci_interval_pm)
215 self.log.info("initial-pm-config", pm_config=pm_config)
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500216 yield self.adapter_agent.device_pm_config_update(pm_config, init=True)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500217
218 ############################################################################
219 # Setup Alarm handler
220 self.alarms = AdapterAlarms(self.adapter_agent, device.id, self.logical_device_id)
221 # Note, ONU ID and UNI intf set in add_uni_port method
222 self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.alarms,
223 ani_ports=[self._pon])
224 self.enabled = True
225 else:
226 self.log.info('onu-already-activated')
227
228 # Called once when the adapter needs to re-create device. usually on vcore restart
229 def reconcile(self, device):
230 self.log.debug('function-entry', device=device)
231
232 # first we verify that we got parent reference and proxy info
233 assert device.parent_id
234 assert device.proxy_address.device_id
235
236 # register for proxied messages right away
237 self.proxy_address = device.proxy_address
238 self.adapter_agent.register_for_proxied_messages(device.proxy_address)
239
240 if self.enabled is not True:
241 self.log.info('reconciling-broadcom-onu-device')
242
243 self._init_pon_state(device)
244
245 # need to restart state machines on vcore restart. there is no indication to do it for us.
246 self._onu_omci_device.start()
247 device.reason = "restarting-openomci"
248 self.adapter_agent.update_device(device)
249
250 # TODO: this is probably a bit heavy handed
251 # Force a reboot for now. We need indications to reflow to reassign tconts and gems given vcore went away
252 # This may not be necessary when mib resync actually works
253 reactor.callLater(1, self.reboot)
254
255 self.enabled = True
256 else:
257 self.log.info('onu-already-activated')
258
259 @inlineCallbacks
260 def handle_onu_events(self):
261 event_msg = yield self.event_messages.get()
262 try:
263 if event_msg['event'] == 'download_tech_profile':
264 tp_path = event_msg['event_data']
265 uni_id = event_msg['uni_id']
266 self.load_and_configure_tech_profile(uni_id, tp_path)
267
268 except Exception as e:
269 self.log.error("exception-handling-onu-event", e=e)
270
271 # Handle next event
272 reactor.callLater(0, self.handle_onu_events)
273
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500274 @inlineCallbacks
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500275 def _init_pon_state(self, device):
276 self.log.debug('function-entry', device=device)
277
278 self._pon = PonPort.create(self, self._pon_port_number)
Matt Jeanneret0c287892019-02-28 11:48:00 -0500279 self._pon.add_peer(self.parent_id, self._pon_port_number)
280 self.log.debug('adding-pon-port-to-agent', pon=self._pon.get_port())
281
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500282 yield self.adapter_agent.port_created(device.id, self._pon.get_port())
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500283
Matt Jeanneret0c287892019-02-28 11:48:00 -0500284 self.log.debug('added-pon-port-to-agent', pon=self._pon.get_port())
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500285
286 # Create and start the OpenOMCI ONU Device Entry for this ONU
287 self._onu_omci_device = self.omci_agent.add_device(self.device_id,
288 self.adapter_agent,
289 support_classes=self.adapter.broadcom_omci,
290 custom_me_map=self.adapter.custom_me_entities())
291 # Port startup
292 if self._pon is not None:
293 self._pon.enabled = True
294
295 # TODO: move to UniPort
296 def update_logical_port(self, logical_device_id, port_id, state):
297 try:
298 self.log.info('updating-logical-port', logical_port_id=port_id,
299 logical_device_id=logical_device_id, state=state)
300 logical_port = self.adapter_agent.get_logical_port(logical_device_id,
301 port_id)
302 logical_port.ofp_port.state = state
303 self.adapter_agent.update_logical_port(logical_device_id,
304 logical_port)
305 except Exception as e:
306 self.log.exception("exception-updating-port", e=e)
307
308 def delete(self, device):
309 self.log.info('delete-onu', device=device)
310 if self.parent_adapter:
311 try:
312 self.parent_adapter.delete_child_device(self.parent_id, device)
313 except AttributeError:
314 self.log.debug('parent-device-delete-child-not-implemented')
315 else:
316 self.log.debug("parent-adapter-not-available")
317
318 def _create_tconts(self, uni_id, us_scheduler):
319 alloc_id = us_scheduler['alloc_id']
320 q_sched_policy = us_scheduler['q_sched_policy']
321 self.log.debug('create-tcont', us_scheduler=us_scheduler)
322
323 tcontdict = dict()
324 tcontdict['alloc-id'] = alloc_id
325 tcontdict['q_sched_policy'] = q_sched_policy
326 tcontdict['uni_id'] = uni_id
327
328 # TODO: Not sure what to do with any of this...
329 tddata = dict()
330 tddata['name'] = 'not-sure-td-profile'
331 tddata['fixed-bandwidth'] = "not-sure-fixed"
332 tddata['assured-bandwidth'] = "not-sure-assured"
333 tddata['maximum-bandwidth'] = "not-sure-max"
334 tddata['additional-bw-eligibility-indicator'] = "not-sure-additional"
335
336 td = OnuTrafficDescriptor.create(tddata)
337 tcont = OnuTCont.create(self, tcont=tcontdict, td=td)
338
339 self._pon.add_tcont(tcont)
340
341 self.log.debug('pon-add-tcont', tcont=tcont)
342
343 # Called when there is an olt up indication, providing the gem port id chosen by the olt handler
344 def _create_gemports(self, uni_id, gem_ports, alloc_id_ref, direction):
345 self.log.debug('create-gemport',
346 gem_ports=gem_ports, direction=direction)
347
348 for gem_port in gem_ports:
349 gemdict = dict()
350 gemdict['gemport_id'] = gem_port['gemport_id']
351 gemdict['direction'] = direction
352 gemdict['alloc_id_ref'] = alloc_id_ref
353 gemdict['encryption'] = gem_port['aes_encryption']
354 gemdict['discard_config'] = dict()
355 gemdict['discard_config']['max_probability'] = \
356 gem_port['discard_config']['max_probability']
357 gemdict['discard_config']['max_threshold'] = \
358 gem_port['discard_config']['max_threshold']
359 gemdict['discard_config']['min_threshold'] = \
360 gem_port['discard_config']['min_threshold']
361 gemdict['discard_policy'] = gem_port['discard_policy']
362 gemdict['max_q_size'] = gem_port['max_q_size']
363 gemdict['pbit_map'] = gem_port['pbit_map']
364 gemdict['priority_q'] = gem_port['priority_q']
365 gemdict['scheduling_policy'] = gem_port['scheduling_policy']
366 gemdict['weight'] = gem_port['weight']
367 gemdict['uni_id'] = uni_id
368
369 gem_port = OnuGemPort.create(self, gem_port=gemdict)
370
371 self._pon.add_gem_port(gem_port)
372
373 self.log.debug('pon-add-gemport', gem_port=gem_port)
374
375 def _do_tech_profile_configuration(self, uni_id, tp):
376 num_of_tconts = tp['num_of_tconts']
377 us_scheduler = tp['us_scheduler']
378 alloc_id = us_scheduler['alloc_id']
379 self._create_tconts(uni_id, us_scheduler)
380 upstream_gem_port_attribute_list = tp['upstream_gem_port_attribute_list']
381 self._create_gemports(uni_id, upstream_gem_port_attribute_list, alloc_id, "UPSTREAM")
382 downstream_gem_port_attribute_list = tp['downstream_gem_port_attribute_list']
383 self._create_gemports(uni_id, downstream_gem_port_attribute_list, alloc_id, "DOWNSTREAM")
384
385 def load_and_configure_tech_profile(self, uni_id, tp_path):
386 self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)
387
388 if uni_id not in self._tp_service_specific_task:
389 self._tp_service_specific_task[uni_id] = dict()
390
391 if uni_id not in self._tech_profile_download_done:
392 self._tech_profile_download_done[uni_id] = dict()
393
394 if tp_path not in self._tech_profile_download_done[uni_id]:
395 self._tech_profile_download_done[uni_id][tp_path] = False
396
397 if not self._tech_profile_download_done[uni_id][tp_path]:
398 try:
399 if tp_path in self._tp_service_specific_task[uni_id]:
400 self.log.info("tech-profile-config-already-in-progress",
401 tp_path=tp_path)
402 return
403
404 tp = self.kv_client[tp_path]
405 tp = ast.literal_eval(tp)
406 self.log.debug("tp-instance", tp=tp)
407 self._do_tech_profile_configuration(uni_id, tp)
408
409 def success(_results):
410 self.log.info("tech-profile-config-done-successfully")
411 device = self.adapter_agent.get_device(self.device_id)
412 device.reason = 'tech-profile-config-download-success'
413 self.adapter_agent.update_device(device)
414 if tp_path in self._tp_service_specific_task[uni_id]:
415 del self._tp_service_specific_task[uni_id][tp_path]
416 self._tech_profile_download_done[uni_id][tp_path] = True
417
418 def failure(_reason):
419 self.log.warn('tech-profile-config-failure-retrying',
420 _reason=_reason)
421 device = self.adapter_agent.get_device(self.device_id)
422 device.reason = 'tech-profile-config-download-failure-retrying'
423 self.adapter_agent.update_device(device)
424 if tp_path in self._tp_service_specific_task[uni_id]:
425 del self._tp_service_specific_task[uni_id][tp_path]
426 self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self.load_and_configure_tech_profile,
427 uni_id, tp_path)
428
429 self.log.info('downloading-tech-profile-configuration')
430 self._tp_service_specific_task[uni_id][tp_path] = \
431 BrcmTpServiceSpecificTask(self.omci_agent, self, uni_id)
432 self._deferred = \
433 self._onu_omci_device.task_runner.queue_task(self._tp_service_specific_task[uni_id][tp_path])
434 self._deferred.addCallbacks(success, failure)
435
436 except Exception as e:
437 self.log.exception("error-loading-tech-profile", e=e)
438 else:
439 self.log.info("tech-profile-config-already-done")
440
441 def update_pm_config(self, device, pm_config):
442 # TODO: This has not been tested
443 self.log.info('update_pm_config', pm_config=pm_config)
444 self.pm_metrics.update(pm_config)
445
446 # Calling this assumes the onu is active/ready and had at least an initial mib downloaded. This gets called from
447 # flow decomposition that ultimately comes from onos
448 def update_flow_table(self, device, flows):
449 self.log.debug('function-entry', device=device, flows=flows)
450
451 #
452 # We need to proxy through the OLT to get to the ONU
453 # Configuration from here should be using OMCI
454 #
455 # self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
456
457 # no point in pushing omci flows if the device isnt reachable
458 if device.connect_status != ConnectStatus.REACHABLE or \
459 device.admin_state != AdminState.ENABLED:
460 self.log.warn("device-disabled-or-offline-skipping-flow-update",
461 admin=device.admin_state, connect=device.connect_status)
462 return
463
464 def is_downstream(port):
465 return port == self._pon_port_number
466
467 def is_upstream(port):
468 return not is_downstream(port)
469
470 for flow in flows:
471 _type = None
472 _port = None
473 _vlan_vid = None
474 _udp_dst = None
475 _udp_src = None
476 _ipv4_dst = None
477 _ipv4_src = None
478 _metadata = None
479 _output = None
480 _push_tpid = None
481 _field = None
482 _set_vlan_vid = None
483 self.log.debug('bulk-flow-update', device_id=device.id, flow=flow)
484 try:
485 _in_port = fd.get_in_port(flow)
486 assert _in_port is not None
487
488 _out_port = fd.get_out_port(flow) # may be None
489
490 if is_downstream(_in_port):
491 self.log.debug('downstream-flow', in_port=_in_port, out_port=_out_port)
492 uni_port = self.uni_port(_out_port)
493 elif is_upstream(_in_port):
494 self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
495 uni_port = self.uni_port(_in_port)
496 else:
497 raise Exception('port should be 1 or 2 by our convention')
498
499 self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))
500
501 for field in fd.get_ofb_fields(flow):
502 if field.type == fd.ETH_TYPE:
503 _type = field.eth_type
504 self.log.debug('field-type-eth-type',
505 eth_type=_type)
506
507 elif field.type == fd.IP_PROTO:
508 _proto = field.ip_proto
509 self.log.debug('field-type-ip-proto',
510 ip_proto=_proto)
511
512 elif field.type == fd.IN_PORT:
513 _port = field.port
514 self.log.debug('field-type-in-port',
515 in_port=_port)
516
517 elif field.type == fd.VLAN_VID:
518 _vlan_vid = field.vlan_vid & 0xfff
519 self.log.debug('field-type-vlan-vid',
520 vlan=_vlan_vid)
521
522 elif field.type == fd.VLAN_PCP:
523 _vlan_pcp = field.vlan_pcp
524 self.log.debug('field-type-vlan-pcp',
525 pcp=_vlan_pcp)
526
527 elif field.type == fd.UDP_DST:
528 _udp_dst = field.udp_dst
529 self.log.debug('field-type-udp-dst',
530 udp_dst=_udp_dst)
531
532 elif field.type == fd.UDP_SRC:
533 _udp_src = field.udp_src
534 self.log.debug('field-type-udp-src',
535 udp_src=_udp_src)
536
537 elif field.type == fd.IPV4_DST:
538 _ipv4_dst = field.ipv4_dst
539 self.log.debug('field-type-ipv4-dst',
540 ipv4_dst=_ipv4_dst)
541
542 elif field.type == fd.IPV4_SRC:
543 _ipv4_src = field.ipv4_src
544 self.log.debug('field-type-ipv4-src',
545 ipv4_dst=_ipv4_src)
546
547 elif field.type == fd.METADATA:
548 _metadata = field.table_metadata
549 self.log.debug('field-type-metadata',
550 metadata=_metadata)
551
552 else:
553 raise NotImplementedError('field.type={}'.format(
554 field.type))
555
556 for action in fd.get_actions(flow):
557
558 if action.type == fd.OUTPUT:
559 _output = action.output.port
560 self.log.debug('action-type-output',
561 output=_output, in_port=_in_port)
562
563 elif action.type == fd.POP_VLAN:
564 self.log.debug('action-type-pop-vlan',
565 in_port=_in_port)
566
567 elif action.type == fd.PUSH_VLAN:
568 _push_tpid = action.push.ethertype
569 self.log.debug('action-type-push-vlan',
570 push_tpid=_push_tpid, in_port=_in_port)
571 if action.push.ethertype != 0x8100:
572 self.log.error('unhandled-tpid',
573 ethertype=action.push.ethertype)
574
575 elif action.type == fd.SET_FIELD:
576 _field = action.set_field.field.ofb_field
577 assert (action.set_field.field.oxm_class ==
578 OFPXMC_OPENFLOW_BASIC)
579 self.log.debug('action-type-set-field',
580 field=_field, in_port=_in_port)
581 if _field.type == fd.VLAN_VID:
582 _set_vlan_vid = _field.vlan_vid & 0xfff
583 self.log.debug('set-field-type-vlan-vid',
584 vlan_vid=_set_vlan_vid)
585 else:
586 self.log.error('unsupported-action-set-field-type',
587 field_type=_field.type)
588 else:
589 self.log.error('unsupported-action-type',
590 action_type=action.type, in_port=_in_port)
591
592 # TODO: We only set vlan omci flows. Handle omci matching ethertypes at some point in another task
593 if _type is not None:
594 self.log.warn('ignoring-flow-with-ethType', ethType=_type)
595 elif _set_vlan_vid is None or _set_vlan_vid == 0:
596 self.log.warn('ignorning-flow-that-does-not-set-vlanid')
597 else:
598 self.log.warn('set-vlanid', uni_id=uni_port.port_number, set_vlan_vid=_set_vlan_vid)
599 self._add_vlan_filter_task(device, uni_port, _set_vlan_vid)
600
601 except Exception as e:
602 self.log.exception('failed-to-install-flow', e=e, flow=flow)
603
604
605 def _add_vlan_filter_task(self, device, uni_port, _set_vlan_vid):
606 assert uni_port is not None
607
608 def success(_results):
609 self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid)
610 device.reason = 'omci-flows-pushed'
611 self._vlan_filter_task = None
612
613 def failure(_reason):
614 self.log.warn('vlan-tagging-failure', uni_port=uni_port, vlan=_set_vlan_vid)
615 device.reason = 'omci-flows-failed-retrying'
616 self._vlan_filter_task = reactor.callLater(_STARTUP_RETRY_WAIT,
617 self._add_vlan_filter_task, device, uni_port, _set_vlan_vid)
618
619 self.log.info('setting-vlan-tag')
620 self._vlan_filter_task = BrcmVlanFilterTask(self.omci_agent, self.device_id, uni_port, _set_vlan_vid)
621 self._deferred = self._onu_omci_device.task_runner.queue_task(self._vlan_filter_task)
622 self._deferred.addCallbacks(success, failure)
623
624 def get_tx_id(self):
625 self.log.debug('function-entry')
626 self.tx_id += 1
627 return self.tx_id
628
629 # TODO: Actually conform to or create a proper interface.
630 # this and the other functions called from the olt arent very clear.
631 # Called each time there is an onu "up" indication from the olt handler
632 def create_interface(self, data):
633 self.log.debug('function-entry', data=data)
634 self._onu_indication = data
635
636 onu_device = self.adapter_agent.get_device(self.device_id)
637
638 self.log.debug('starting-openomci-statemachine')
639 self._subscribe_to_events()
640 reactor.callLater(1, self._onu_omci_device.start)
641 onu_device.reason = "starting-openomci"
642 self.adapter_agent.update_device(onu_device)
643 self._heartbeat.enabled = True
644
645 # Currently called each time there is an onu "down" indication from the olt handler
646 # TODO: possibly other reasons to "update" from the olt?
647 def update_interface(self, data):
648 self.log.debug('function-entry', data=data)
649 oper_state = data.get('oper_state', None)
650
651 onu_device = self.adapter_agent.get_device(self.device_id)
652
653 if oper_state == 'down':
654 self.log.debug('stopping-openomci-statemachine')
655 reactor.callLater(0, self._onu_omci_device.stop)
656
657 # Let TP download happen again
658 for uni_id in self._tp_service_specific_task:
659 self._tp_service_specific_task[uni_id].clear()
660 for uni_id in self._tech_profile_download_done:
661 self._tech_profile_download_done[uni_id].clear()
662
663 self.disable_ports(onu_device)
664 onu_device.reason = "stopping-openomci"
665 onu_device.connect_status = ConnectStatus.UNREACHABLE
666 onu_device.oper_status = OperStatus.DISCOVERED
667 self.adapter_agent.update_device(onu_device)
668 else:
669 self.log.debug('not-changing-openomci-statemachine')
670
671 # Not currently called by olt or anything else
672 def remove_interface(self, data):
673 self.log.debug('function-entry', data=data)
674
675 onu_device = self.adapter_agent.get_device(self.device_id)
676
677 self.log.debug('stopping-openomci-statemachine')
678 reactor.callLater(0, self._onu_omci_device.stop)
679
680 # Let TP download happen again
681 for uni_id in self._tp_service_specific_task:
682 self._tp_service_specific_task[uni_id].clear()
683 for uni_id in self._tech_profile_download_done:
684 self._tech_profile_download_done[uni_id].clear()
685
686 self.disable_ports(onu_device)
687 onu_device.reason = "stopping-openomci"
688 self.adapter_agent.update_device(onu_device)
689
690 # TODO: im sure there is more to do here
691
692 # Not currently called. Would be called presumably from the olt handler
693 def remove_gemport(self, data):
694 self.log.debug('remove-gemport', data=data)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500695 device = self.adapter_agent.get_device(self.device_id)
696 if device.connect_status != ConnectStatus.REACHABLE:
697 self.log.error('device-unreachable')
698 return
699
700 # Not currently called. Would be called presumably from the olt handler
701 def remove_tcont(self, tcont_data, traffic_descriptor_data):
702 self.log.debug('remove-tcont', tcont_data=tcont_data, traffic_descriptor_data=traffic_descriptor_data)
703 device = self.adapter_agent.get_device(self.device_id)
704 if device.connect_status != ConnectStatus.REACHABLE:
705 self.log.error('device-unreachable')
706 return
707
708 # TODO: Create some omci task that encompases this what intended
709
710 # Not currently called. Would be called presumably from the olt handler
711 def create_multicast_gemport(self, data):
712 self.log.debug('function-entry', data=data)
713
714 # TODO: create objects and populate for later omci calls
715
716 def disable(self, device):
717 self.log.debug('function-entry', device=device)
718 try:
719 self.log.info('sending-uni-lock-towards-device', device=device)
720
721 def stop_anyway(reason):
722 # proceed with disable regardless if we could reach the onu. for example onu is unplugged
723 self.log.debug('stopping-openomci-statemachine')
724 reactor.callLater(0, self._onu_omci_device.stop)
725
726 # Let TP download happen again
727 for uni_id in self._tp_service_specific_task:
728 self._tp_service_specific_task[uni_id].clear()
729 for uni_id in self._tech_profile_download_done:
730 self._tech_profile_download_done[uni_id].clear()
731
732 self.disable_ports(device)
733 device.oper_status = OperStatus.UNKNOWN
734 device.reason = "omci-admin-lock"
735 self.adapter_agent.update_device(device)
736
737 # lock all the unis
738 task = BrcmUniLockTask(self.omci_agent, self.device_id, lock=True)
739 self._deferred = self._onu_omci_device.task_runner.queue_task(task)
740 self._deferred.addCallbacks(stop_anyway, stop_anyway)
741 except Exception as e:
742 log.exception('exception-in-onu-disable', exception=e)
743
744 def reenable(self, device):
745 self.log.debug('function-entry', device=device)
746 try:
747 # Start up OpenOMCI state machines for this device
748 # this will ultimately resync mib and unlock unis on successful redownloading the mib
749 self.log.debug('restarting-openomci-statemachine')
750 self._subscribe_to_events()
751 device.reason = "restarting-openomci"
752 self.adapter_agent.update_device(device)
753 reactor.callLater(1, self._onu_omci_device.start)
754 self._heartbeat.enabled = True
755 except Exception as e:
756 log.exception('exception-in-onu-reenable', exception=e)
757
758 def reboot(self):
759 self.log.info('reboot-device')
760 device = self.adapter_agent.get_device(self.device_id)
761 if device.connect_status != ConnectStatus.REACHABLE:
762 self.log.error("device-unreachable")
763 return
764
765 def success(_results):
766 self.log.info('reboot-success', _results=_results)
767 self.disable_ports(device)
768 device.connect_status = ConnectStatus.UNREACHABLE
769 device.oper_status = OperStatus.DISCOVERED
770 device.reason = "rebooting"
771 self.adapter_agent.update_device(device)
772
773 def failure(_reason):
774 self.log.info('reboot-failure', _reason=_reason)
775
776 self._deferred = self._onu_omci_device.reboot()
777 self._deferred.addCallbacks(success, failure)
778
779 def disable_ports(self, onu_device):
780 self.log.info('disable-ports', device_id=self.device_id,
781 onu_device=onu_device)
782
783 # Disable all ports on that device
784 self.adapter_agent.disable_all_ports(self.device_id)
785
786 parent_device = self.adapter_agent.get_device(onu_device.parent_id)
787 assert parent_device
788 logical_device_id = parent_device.parent_id
789 assert logical_device_id
790 ports = self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI)
791 for port in ports:
792 port_id = 'uni-{}'.format(port.port_no)
793 # TODO: move to UniPort
794 self.update_logical_port(logical_device_id, port_id, OFPPS_LINK_DOWN)
795
796 def enable_ports(self, onu_device):
797 self.log.info('enable-ports', device_id=self.device_id, onu_device=onu_device)
798
799 # Disable all ports on that device
800 self.adapter_agent.enable_all_ports(self.device_id)
801
802 parent_device = self.adapter_agent.get_device(onu_device.parent_id)
803 assert parent_device
804 logical_device_id = parent_device.parent_id
805 assert logical_device_id
806 ports = self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI)
807 for port in ports:
808 port_id = 'uni-{}'.format(port.port_no)
809 # TODO: move to UniPort
810 self.update_logical_port(logical_device_id, port_id, OFPPS_LIVE)
811
812 # Called just before openomci state machine is started. These listen for events from selected state machines,
813 # most importantly, mib in sync. Which ultimately leads to downloading the mib
814 def _subscribe_to_events(self):
815 self.log.debug('function-entry')
816
817 # OMCI MIB Database sync status
818 bus = self._onu_omci_device.event_bus
819 topic = OnuDeviceEntry.event_bus_topic(self.device_id,
820 OnuDeviceEvents.MibDatabaseSyncEvent)
821 self._in_sync_subscription = bus.subscribe(topic, self.in_sync_handler)
822
823 # OMCI Capabilities
824 bus = self._onu_omci_device.event_bus
825 topic = OnuDeviceEntry.event_bus_topic(self.device_id,
826 OnuDeviceEvents.OmciCapabilitiesEvent)
827 self._capabilities_subscription = bus.subscribe(topic, self.capabilties_handler)
828
829 # Called when the mib is in sync
830 def in_sync_handler(self, _topic, msg):
831 self.log.debug('function-entry', _topic=_topic, msg=msg)
832 if self._in_sync_subscription is not None:
833 try:
834 in_sync = msg[IN_SYNC_KEY]
835
836 if in_sync:
837 # Only call this once
838 bus = self._onu_omci_device.event_bus
839 bus.unsubscribe(self._in_sync_subscription)
840 self._in_sync_subscription = None
841
842 # Start up device_info load
843 self.log.debug('running-mib-sync')
844 reactor.callLater(0, self._mib_in_sync)
845
846 except Exception as e:
847 self.log.exception('in-sync', e=e)
848
849 def capabilties_handler(self, _topic, _msg):
850 self.log.debug('function-entry', _topic=_topic, msg=_msg)
851 if self._capabilities_subscription is not None:
852 self.log.debug('capabilities-handler-done')
853
854 # Mib is in sync, we can now query what we learned and actually start pushing ME (download) to the ONU.
855 # Currently uses a basic mib download task that create a bridge with a single gem port and uni, only allowing EAP
856 # Implement your own MibDownloadTask if you wish to setup something different by default
857 def _mib_in_sync(self):
858 self.log.debug('function-entry')
859
860 omci = self._onu_omci_device
861 in_sync = omci.mib_db_in_sync
862
863 device = self.adapter_agent.get_device(self.device_id)
864 device.reason = 'discovery-mibsync-complete'
865 self.adapter_agent.update_device(device)
866
867 if not self._dev_info_loaded:
868 self.log.info('loading-device-data-from-mib', in_sync=in_sync, already_loaded=self._dev_info_loaded)
869
870 omci_dev = self._onu_omci_device
871 config = omci_dev.configuration
872
873 # TODO: run this sooner somehow. shouldnt have to wait for mib sync to push an initial download
874 # In Sync, we can register logical ports now. Ideally this could occur on
875 # the first time we received a successful (no timeout) OMCI Rx response.
876 try:
877
878 # sort the lists so we get consistent port ordering.
879 ani_list = sorted(config.ani_g_entities) if config.ani_g_entities else []
880 uni_list = sorted(config.uni_g_entities) if config.uni_g_entities else []
881 pptp_list = sorted(config.pptp_entities) if config.pptp_entities else []
882 veip_list = sorted(config.veip_entities) if config.veip_entities else []
883
884 if ani_list is None or (pptp_list is None and veip_list is None):
885 device.reason = 'onu-missing-required-elements'
886 self.log.warn("no-ani-or-unis")
887 self.adapter_agent.update_device(device)
888 raise Exception("onu-missing-required-elements")
889
890 # Currently logging the ani, pptp, veip, and uni for information purposes.
891 # Actually act on the veip/pptp as its ME is the most correct one to use in later tasks.
892 # And in some ONU the UNI-G list is incomplete or incorrect...
893 for entity_id in ani_list:
894 ani_value = config.ani_g_entities[entity_id]
895 self.log.debug("discovered-ani", entity_id=entity_id, value=ani_value)
896 # TODO: currently only one OLT PON port/ANI, so this works out. With NGPON there will be 2..?
897 self._total_tcont_count = ani_value.get('total-tcont-count')
898 self.log.debug("set-total-tcont-count", tcont_count=self._total_tcont_count)
899
900 for entity_id in uni_list:
901 uni_value = config.uni_g_entities[entity_id]
902 self.log.debug("discovered-uni", entity_id=entity_id, value=uni_value)
903
904 uni_entities = OrderedDict()
905 for entity_id in pptp_list:
906 pptp_value = config.pptp_entities[entity_id]
907 self.log.debug("discovered-pptp", entity_id=entity_id, value=pptp_value)
908 uni_entities[entity_id] = UniType.PPTP
909
910 for entity_id in veip_list:
911 veip_value = config.veip_entities[entity_id]
912 self.log.debug("discovered-veip", entity_id=entity_id, value=veip_value)
913 uni_entities[entity_id] = UniType.VEIP
914
915 uni_id = 0
916 for entity_id, uni_type in uni_entities.iteritems():
917 try:
918 self._add_uni_port(entity_id, uni_id, uni_type)
919 uni_id += 1
920 except AssertionError as e:
921 self.log.warn("could not add UNI", entity_id=entity_id, uni_type=uni_type, e=e)
922
923 multi_uni = len(self._unis) > 1
924 for uni_port in self._unis.itervalues():
925 uni_port.add_logical_port(uni_port.port_number, multi_uni)
926
927 self.adapter_agent.update_device(device)
928
929 self._qos_flexibility = config.qos_configuration_flexibility or 0
930 self._omcc_version = config.omcc_version or OMCCVersion.Unknown
931
932 if self._unis:
933 self._dev_info_loaded = True
934 else:
935 device.reason = 'no-usable-unis'
936 self.adapter_agent.update_device(device)
937 self.log.warn("no-usable-unis")
938 raise Exception("no-usable-unis")
939
940 except Exception as e:
941 self.log.exception('device-info-load', e=e)
942 self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)
943
944 else:
945 self.log.info('device-info-already-loaded', in_sync=in_sync, already_loaded=self._dev_info_loaded)
946
947 if self._dev_info_loaded:
948 if device.admin_state == AdminState.ENABLED:
949 def success(_results):
950 self.log.info('mib-download-success', _results=_results)
951 device = self.adapter_agent.get_device(self.device_id)
952 device.reason = 'initial-mib-downloaded'
953 device.oper_status = OperStatus.ACTIVE
954 device.connect_status = ConnectStatus.REACHABLE
955 self.enable_ports(device)
956 self.adapter_agent.update_device(device)
957 self._mib_download_task = None
958
959 def failure(_reason):
960 self.log.warn('mib-download-failure-retrying', _reason=_reason)
961 device.reason = 'initial-mib-download-failure-retrying'
962 self.adapter_agent.update_device(device)
963 self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)
964
965 # Download an initial mib that creates simple bridge that can pass EAP. On success (above) finally set
966 # the device to active/reachable. This then opens up the handler to openflow pushes from outside
967 self.log.info('downloading-initial-mib-configuration')
968 self._mib_download_task = BrcmMibDownloadTask(self.omci_agent, self)
969 self._deferred = self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
970 self._deferred.addCallbacks(success, failure)
971 else:
972 self.log.info('admin-down-disabling')
973 self.disable(device)
974 else:
975 self.log.info('device-info-not-loaded-skipping-mib-download')
976
977
978 def _add_uni_port(self, entity_id, uni_id, uni_type=UniType.PPTP):
979 self.log.debug('function-entry')
980
981 device = self.adapter_agent.get_device(self.device_id)
982 parent_device = self.adapter_agent.get_device(device.parent_id)
983
984 parent_adapter_agent = registry('adapter_loader').get_agent(parent_device.adapter)
985 if parent_adapter_agent is None:
986 self.log.error('parent-adapter-could-not-be-retrieved')
987
988 # TODO: This knowledge is locked away in openolt. and it assumes one onu equals one uni...
989 parent_device = self.adapter_agent.get_device(device.parent_id)
990 parent_adapter = parent_adapter_agent.adapter.devices[parent_device.id]
991 uni_no = parent_adapter.platform.mk_uni_port_num(
992 self._onu_indication.intf_id, self._onu_indication.onu_id, uni_id)
993
994 # TODO: Some or parts of this likely need to move to UniPort. especially the format stuff
995 uni_name = "uni-{}".format(uni_no)
996
997 mac_bridge_port_num = uni_id + 1 # TODO +1 is only to test non-zero index
998
999 self.log.debug('uni-port-inputs', uni_no=uni_no, uni_id=uni_id, uni_name=uni_name, uni_type=uni_type,
1000 entity_id=entity_id, mac_bridge_port_num=mac_bridge_port_num)
1001
1002 uni_port = UniPort.create(self, uni_name, uni_id, uni_no, uni_name, uni_type)
1003 uni_port.entity_id = entity_id
1004 uni_port.enabled = True
1005 uni_port.mac_bridge_port_num = mac_bridge_port_num
1006
1007 self.log.debug("created-uni-port", uni=uni_port)
1008
1009 self.adapter_agent.add_port(device.id, uni_port.get_port())
1010 parent_adapter_agent.add_port(device.parent_id, uni_port.get_port())
1011
1012 self._unis[uni_port.port_number] = uni_port
1013
1014 self._onu_omci_device.alarm_synchronizer.set_alarm_params(onu_id=self._onu_indication.onu_id,
1015 uni_ports=self._unis.values())
1016 # TODO: this should be in the PonPortclass
1017 pon_port = self._pon.get_port()
1018
1019 # Delete reference to my own UNI as peer from parent.
1020 # TODO why is this here, add_port_reference_to_parent already prunes duplicates
1021 me_as_peer = Port.PeerPort(device_id=device.parent_id, port_no=uni_port.port_number)
1022 partial_pon_port = Port(port_no=pon_port.port_no, label=pon_port.label,
1023 type=pon_port.type, admin_state=pon_port.admin_state,
1024 oper_status=pon_port.oper_status,
1025 peers=[me_as_peer]) # only list myself as a peer to avoid deleting all other UNIs from parent
1026 self.adapter_agent.delete_port_reference_from_parent(self.device_id, partial_pon_port)
1027
1028 pon_port.peers.extend([me_as_peer])
1029
1030 self._pon._port = pon_port
1031
1032 self.adapter_agent.add_port_reference_to_parent(self.device_id,
1033 pon_port)