blob: 817a1389542d10b460cc80876f1f4b5d2cdda75b [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Broadcom OpenOMCI OLT/ONU adapter handler.
19"""
20
21import json
22import ast
23import structlog
24
25from collections import OrderedDict
26
27from twisted.internet import reactor, task
28from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
29
30from heartbeat import HeartBeat
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050031from pyvoltha.adapters.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
32from pyvoltha.adapters.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
33from pyvoltha.adapters.extensions.alarms.adapter_alarms import AdapterAlarms
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050035import pyvoltha.common.openflow.utils as fd
36from pyvoltha.common.utils.registry import registry
37from pyvoltha.common.config.config_backend import ConsulStore
38from pyvoltha.common.config.config_backend import EtcdStore
39from pyvoltha.protos import third_party
40from pyvoltha.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
41from pyvoltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
42from pyvoltha.adapters.extensions.omci.onu_configuration import OMCCVersion
43from pyvoltha.adapters.extensions.omci.onu_device_entry import OnuDeviceEvents, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050044 OnuDeviceEntry, IN_SYNC_KEY
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050045from omci.brcm_mib_download_task import BrcmMibDownloadTask
46from omci.brcm_tp_service_specific_task import BrcmTpServiceSpecificTask
47from omci.brcm_uni_lock_task import BrcmUniLockTask
48from omci.brcm_vlan_filter_task import BrcmVlanFilterTask
49from onu_gem_port import *
50from onu_tcont import *
51from pon_port import *
52from uni_port import *
53from onu_traffic_descriptor import *
54from pyvoltha.common.tech_profile.tech_profile import TechProfile
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050055
56OP = EntityOperations
57RC = ReasonCodes
58
59_ = third_party
60log = structlog.get_logger()
61
62_STARTUP_RETRY_WAIT = 20
63
64
65class BrcmOpenomciOnuHandler(object):
66
67 def __init__(self, adapter, device_id):
68 self.log = structlog.get_logger(device_id=device_id)
69 self.log.debug('function-entry')
70 self.adapter = adapter
71 self.adapter_agent = adapter.adapter_agent
72 self.parent_adapter = None
73 self.parent_id = None
74 self.device_id = device_id
75 self.incoming_messages = DeferredQueue()
76 self.event_messages = DeferredQueue()
77 self.proxy_address = None
78 self.tx_id = 0
79 self._enabled = False
80 self.alarms = None
81 self.pm_metrics = None
82 self._omcc_version = OMCCVersion.Unknown
83 self._total_tcont_count = 0 # From ANI-G ME
84 self._qos_flexibility = 0 # From ONT2_G ME
85
86 self._onu_indication = None
87 self._unis = dict() # Port # -> UniPort
88
89 self._pon = None
90 # TODO: probably shouldnt be hardcoded, determine from olt maybe?
91 self._pon_port_number = 100
92 self.logical_device_id = None
93
94 self._heartbeat = HeartBeat.create(self, device_id)
95
96 # Set up OpenOMCI environment
97 self._onu_omci_device = None
98 self._dev_info_loaded = False
99 self._deferred = None
100
101 self._in_sync_subscription = None
102 self._connectivity_subscription = None
103 self._capabilities_subscription = None
104
105 self.mac_bridge_service_profile_entity_id = 0x201
106 self.gal_enet_profile_entity_id = 0x1
107
108 self._tp_service_specific_task = dict()
109 self._tech_profile_download_done = dict()
110
111 # Initialize KV store client
112 self.args = registry('main').get_args()
113 if self.args.backend == 'etcd':
114 host, port = self.args.etcd.split(':', 1)
115 self.kv_client = EtcdStore(host, port,
116 TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
117 elif self.args.backend == 'consul':
118 host, port = self.args.consul.split(':', 1)
119 self.kv_client = ConsulStore(host, port,
120 TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
121 else:
122 self.log.error('Invalid-backend')
123 raise Exception("Invalid-backend-for-kv-store")
124
125 # Handle received ONU event messages
126 reactor.callLater(0, self.handle_onu_events)
127
128 @property
129 def enabled(self):
130 return self._enabled
131
132 @enabled.setter
133 def enabled(self, value):
134 if self._enabled != value:
135 self._enabled = value
136
137 @property
138 def omci_agent(self):
139 return self.adapter.omci_agent
140
141 @property
142 def omci_cc(self):
143 return self._onu_omci_device.omci_cc if self._onu_omci_device is not None else None
144
145 @property
146 def heartbeat(self):
147 return self._heartbeat
148
149 @property
150 def uni_ports(self):
151 return self._unis.values()
152
153 def uni_port(self, port_no_or_name):
154 if isinstance(port_no_or_name, (str, unicode)):
155 return next((uni for uni in self.uni_ports
156 if uni.name == port_no_or_name), None)
157
158 assert isinstance(port_no_or_name, int), 'Invalid parameter type'
159 return next((uni for uni in self.uni_ports
160 if uni.logical_port_number == port_no_or_name), None)
161
162 @property
163 def pon_port(self):
164 return self._pon
165
166 def receive_message(self, msg):
167 if self.omci_cc is not None:
168 self.omci_cc.receive_message(msg)
169
170 # Called once when the adapter creates the device/onu instance
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500171 @inlineCallbacks
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500172 def activate(self, device):
173 self.log.debug('function-entry', device=device)
174
175 # first we verify that we got parent reference and proxy info
176 assert device.parent_id
177 assert device.proxy_address.device_id
178
179 # register for proxied messages right away
180 self.proxy_address = device.proxy_address
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500181 self.parent_id = device.parent_id
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500182 # TODO NEW CORE: seems no reason for this now
183 #parent_device = yield self.adapter_agent.get_device(self.parent_id)
184 #if parent_device.type == 'openolt':
185 # self.parent_adapter = registry('adapter_loader'). \
186 # get_agent(parent_device.adapter).adapter
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500187
188 if self.enabled is not True:
189 self.log.info('activating-new-onu')
190 # populate what we know. rest comes later after mib sync
191 device.root = True
192 device.vendor = 'Broadcom'
193 device.connect_status = ConnectStatus.REACHABLE
194 device.oper_status = OperStatus.DISCOVERED
195 device.reason = 'activating-onu'
196
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500197 # TODO NEW CORE: Need to either get logical device id from core or use regular device id
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500198 # pm_metrics requires a logical device id
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500199 #parent_device = yield self.adapter_agent.get_device(device.parent_id)
200 #self.logical_device_id = parent_device.parent_id
201 #assert self.logical_device_id, 'Invalid logical device ID'
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500202
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500203 yield self.adapter_agent.device_update(device)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500204
205 self.log.debug('set-device-discovered')
206
207 self._init_pon_state(device)
208
209 ############################################################################
210 # Setup PM configuration for this device
211 # Pass in ONU specific options
212 kwargs = {
213 OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
214 'heartbeat': self.heartbeat,
215 OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
216 }
217 self.pm_metrics = OnuPmMetrics(self.adapter_agent, self.device_id,
218 self.logical_device_id, grouped=True,
219 freq_override=False, **kwargs)
220 pm_config = self.pm_metrics.make_proto()
221 self._onu_omci_device.set_pm_config(self.pm_metrics.omci_pm.openomci_interval_pm)
222 self.log.info("initial-pm-config", pm_config=pm_config)
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500223 yield self.adapter_agent.device_pm_config_update(pm_config, init=True)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500224
225 ############################################################################
226 # Setup Alarm handler
227 self.alarms = AdapterAlarms(self.adapter_agent, device.id, self.logical_device_id)
228 # Note, ONU ID and UNI intf set in add_uni_port method
229 self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.alarms,
230 ani_ports=[self._pon])
231 self.enabled = True
232 else:
233 self.log.info('onu-already-activated')
234
235 # Called once when the adapter needs to re-create device. usually on vcore restart
236 def reconcile(self, device):
237 self.log.debug('function-entry', device=device)
238
239 # first we verify that we got parent reference and proxy info
240 assert device.parent_id
241 assert device.proxy_address.device_id
242
243 # register for proxied messages right away
244 self.proxy_address = device.proxy_address
245 self.adapter_agent.register_for_proxied_messages(device.proxy_address)
246
247 if self.enabled is not True:
248 self.log.info('reconciling-broadcom-onu-device')
249
250 self._init_pon_state(device)
251
252 # need to restart state machines on vcore restart. there is no indication to do it for us.
253 self._onu_omci_device.start()
254 device.reason = "restarting-openomci"
255 self.adapter_agent.update_device(device)
256
257 # TODO: this is probably a bit heavy handed
258 # Force a reboot for now. We need indications to reflow to reassign tconts and gems given vcore went away
259 # This may not be necessary when mib resync actually works
260 reactor.callLater(1, self.reboot)
261
262 self.enabled = True
263 else:
264 self.log.info('onu-already-activated')
265
266 @inlineCallbacks
267 def handle_onu_events(self):
268 event_msg = yield self.event_messages.get()
269 try:
270 if event_msg['event'] == 'download_tech_profile':
271 tp_path = event_msg['event_data']
272 uni_id = event_msg['uni_id']
273 self.load_and_configure_tech_profile(uni_id, tp_path)
274
275 except Exception as e:
276 self.log.error("exception-handling-onu-event", e=e)
277
278 # Handle next event
279 reactor.callLater(0, self.handle_onu_events)
280
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500281 @inlineCallbacks
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500282 def _init_pon_state(self, device):
283 self.log.debug('function-entry', device=device)
284
285 self._pon = PonPort.create(self, self._pon_port_number)
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500286 yield self.adapter_agent.port_created(device.id, self._pon.get_port())
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500287
288 self.log.debug('added-pon-port-to-agent', pon=self._pon)
289
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500290 # TODO NEW CORE: Need to either get logical device id from core or use regular device id
291 #parent_device = yield self.adapter_agent.get_device(device.parent_id)
292 #self.logical_device_id = parent_device.parent_id
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500293
Matt Jeanneret84e56f62019-02-26 10:48:09 -0500294 #self.adapter_agent.update_device(device)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500295
296 # Create and start the OpenOMCI ONU Device Entry for this ONU
297 self._onu_omci_device = self.omci_agent.add_device(self.device_id,
298 self.adapter_agent,
299 support_classes=self.adapter.broadcom_omci,
300 custom_me_map=self.adapter.custom_me_entities())
301 # Port startup
302 if self._pon is not None:
303 self._pon.enabled = True
304
305 # TODO: move to UniPort
306 def update_logical_port(self, logical_device_id, port_id, state):
307 try:
308 self.log.info('updating-logical-port', logical_port_id=port_id,
309 logical_device_id=logical_device_id, state=state)
310 logical_port = self.adapter_agent.get_logical_port(logical_device_id,
311 port_id)
312 logical_port.ofp_port.state = state
313 self.adapter_agent.update_logical_port(logical_device_id,
314 logical_port)
315 except Exception as e:
316 self.log.exception("exception-updating-port", e=e)
317
318 def delete(self, device):
319 self.log.info('delete-onu', device=device)
320 if self.parent_adapter:
321 try:
322 self.parent_adapter.delete_child_device(self.parent_id, device)
323 except AttributeError:
324 self.log.debug('parent-device-delete-child-not-implemented')
325 else:
326 self.log.debug("parent-adapter-not-available")
327
328 def _create_tconts(self, uni_id, us_scheduler):
329 alloc_id = us_scheduler['alloc_id']
330 q_sched_policy = us_scheduler['q_sched_policy']
331 self.log.debug('create-tcont', us_scheduler=us_scheduler)
332
333 tcontdict = dict()
334 tcontdict['alloc-id'] = alloc_id
335 tcontdict['q_sched_policy'] = q_sched_policy
336 tcontdict['uni_id'] = uni_id
337
338 # TODO: Not sure what to do with any of this...
339 tddata = dict()
340 tddata['name'] = 'not-sure-td-profile'
341 tddata['fixed-bandwidth'] = "not-sure-fixed"
342 tddata['assured-bandwidth'] = "not-sure-assured"
343 tddata['maximum-bandwidth'] = "not-sure-max"
344 tddata['additional-bw-eligibility-indicator'] = "not-sure-additional"
345
346 td = OnuTrafficDescriptor.create(tddata)
347 tcont = OnuTCont.create(self, tcont=tcontdict, td=td)
348
349 self._pon.add_tcont(tcont)
350
351 self.log.debug('pon-add-tcont', tcont=tcont)
352
353 # Called when there is an olt up indication, providing the gem port id chosen by the olt handler
354 def _create_gemports(self, uni_id, gem_ports, alloc_id_ref, direction):
355 self.log.debug('create-gemport',
356 gem_ports=gem_ports, direction=direction)
357
358 for gem_port in gem_ports:
359 gemdict = dict()
360 gemdict['gemport_id'] = gem_port['gemport_id']
361 gemdict['direction'] = direction
362 gemdict['alloc_id_ref'] = alloc_id_ref
363 gemdict['encryption'] = gem_port['aes_encryption']
364 gemdict['discard_config'] = dict()
365 gemdict['discard_config']['max_probability'] = \
366 gem_port['discard_config']['max_probability']
367 gemdict['discard_config']['max_threshold'] = \
368 gem_port['discard_config']['max_threshold']
369 gemdict['discard_config']['min_threshold'] = \
370 gem_port['discard_config']['min_threshold']
371 gemdict['discard_policy'] = gem_port['discard_policy']
372 gemdict['max_q_size'] = gem_port['max_q_size']
373 gemdict['pbit_map'] = gem_port['pbit_map']
374 gemdict['priority_q'] = gem_port['priority_q']
375 gemdict['scheduling_policy'] = gem_port['scheduling_policy']
376 gemdict['weight'] = gem_port['weight']
377 gemdict['uni_id'] = uni_id
378
379 gem_port = OnuGemPort.create(self, gem_port=gemdict)
380
381 self._pon.add_gem_port(gem_port)
382
383 self.log.debug('pon-add-gemport', gem_port=gem_port)
384
385 def _do_tech_profile_configuration(self, uni_id, tp):
386 num_of_tconts = tp['num_of_tconts']
387 us_scheduler = tp['us_scheduler']
388 alloc_id = us_scheduler['alloc_id']
389 self._create_tconts(uni_id, us_scheduler)
390 upstream_gem_port_attribute_list = tp['upstream_gem_port_attribute_list']
391 self._create_gemports(uni_id, upstream_gem_port_attribute_list, alloc_id, "UPSTREAM")
392 downstream_gem_port_attribute_list = tp['downstream_gem_port_attribute_list']
393 self._create_gemports(uni_id, downstream_gem_port_attribute_list, alloc_id, "DOWNSTREAM")
394
395 def load_and_configure_tech_profile(self, uni_id, tp_path):
396 self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)
397
398 if uni_id not in self._tp_service_specific_task:
399 self._tp_service_specific_task[uni_id] = dict()
400
401 if uni_id not in self._tech_profile_download_done:
402 self._tech_profile_download_done[uni_id] = dict()
403
404 if tp_path not in self._tech_profile_download_done[uni_id]:
405 self._tech_profile_download_done[uni_id][tp_path] = False
406
407 if not self._tech_profile_download_done[uni_id][tp_path]:
408 try:
409 if tp_path in self._tp_service_specific_task[uni_id]:
410 self.log.info("tech-profile-config-already-in-progress",
411 tp_path=tp_path)
412 return
413
414 tp = self.kv_client[tp_path]
415 tp = ast.literal_eval(tp)
416 self.log.debug("tp-instance", tp=tp)
417 self._do_tech_profile_configuration(uni_id, tp)
418
419 def success(_results):
420 self.log.info("tech-profile-config-done-successfully")
421 device = self.adapter_agent.get_device(self.device_id)
422 device.reason = 'tech-profile-config-download-success'
423 self.adapter_agent.update_device(device)
424 if tp_path in self._tp_service_specific_task[uni_id]:
425 del self._tp_service_specific_task[uni_id][tp_path]
426 self._tech_profile_download_done[uni_id][tp_path] = True
427
428 def failure(_reason):
429 self.log.warn('tech-profile-config-failure-retrying',
430 _reason=_reason)
431 device = self.adapter_agent.get_device(self.device_id)
432 device.reason = 'tech-profile-config-download-failure-retrying'
433 self.adapter_agent.update_device(device)
434 if tp_path in self._tp_service_specific_task[uni_id]:
435 del self._tp_service_specific_task[uni_id][tp_path]
436 self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self.load_and_configure_tech_profile,
437 uni_id, tp_path)
438
439 self.log.info('downloading-tech-profile-configuration')
440 self._tp_service_specific_task[uni_id][tp_path] = \
441 BrcmTpServiceSpecificTask(self.omci_agent, self, uni_id)
442 self._deferred = \
443 self._onu_omci_device.task_runner.queue_task(self._tp_service_specific_task[uni_id][tp_path])
444 self._deferred.addCallbacks(success, failure)
445
446 except Exception as e:
447 self.log.exception("error-loading-tech-profile", e=e)
448 else:
449 self.log.info("tech-profile-config-already-done")
450
451 def update_pm_config(self, device, pm_config):
452 # TODO: This has not been tested
453 self.log.info('update_pm_config', pm_config=pm_config)
454 self.pm_metrics.update(pm_config)
455
456 # Calling this assumes the onu is active/ready and had at least an initial mib downloaded. This gets called from
457 # flow decomposition that ultimately comes from onos
458 def update_flow_table(self, device, flows):
459 self.log.debug('function-entry', device=device, flows=flows)
460
461 #
462 # We need to proxy through the OLT to get to the ONU
463 # Configuration from here should be using OMCI
464 #
465 # self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
466
467 # no point in pushing omci flows if the device isnt reachable
468 if device.connect_status != ConnectStatus.REACHABLE or \
469 device.admin_state != AdminState.ENABLED:
470 self.log.warn("device-disabled-or-offline-skipping-flow-update",
471 admin=device.admin_state, connect=device.connect_status)
472 return
473
474 def is_downstream(port):
475 return port == self._pon_port_number
476
477 def is_upstream(port):
478 return not is_downstream(port)
479
480 for flow in flows:
481 _type = None
482 _port = None
483 _vlan_vid = None
484 _udp_dst = None
485 _udp_src = None
486 _ipv4_dst = None
487 _ipv4_src = None
488 _metadata = None
489 _output = None
490 _push_tpid = None
491 _field = None
492 _set_vlan_vid = None
493 self.log.debug('bulk-flow-update', device_id=device.id, flow=flow)
494 try:
495 _in_port = fd.get_in_port(flow)
496 assert _in_port is not None
497
498 _out_port = fd.get_out_port(flow) # may be None
499
500 if is_downstream(_in_port):
501 self.log.debug('downstream-flow', in_port=_in_port, out_port=_out_port)
502 uni_port = self.uni_port(_out_port)
503 elif is_upstream(_in_port):
504 self.log.debug('upstream-flow', in_port=_in_port, out_port=_out_port)
505 uni_port = self.uni_port(_in_port)
506 else:
507 raise Exception('port should be 1 or 2 by our convention')
508
509 self.log.debug('flow-ports', in_port=_in_port, out_port=_out_port, uni_port=str(uni_port))
510
511 for field in fd.get_ofb_fields(flow):
512 if field.type == fd.ETH_TYPE:
513 _type = field.eth_type
514 self.log.debug('field-type-eth-type',
515 eth_type=_type)
516
517 elif field.type == fd.IP_PROTO:
518 _proto = field.ip_proto
519 self.log.debug('field-type-ip-proto',
520 ip_proto=_proto)
521
522 elif field.type == fd.IN_PORT:
523 _port = field.port
524 self.log.debug('field-type-in-port',
525 in_port=_port)
526
527 elif field.type == fd.VLAN_VID:
528 _vlan_vid = field.vlan_vid & 0xfff
529 self.log.debug('field-type-vlan-vid',
530 vlan=_vlan_vid)
531
532 elif field.type == fd.VLAN_PCP:
533 _vlan_pcp = field.vlan_pcp
534 self.log.debug('field-type-vlan-pcp',
535 pcp=_vlan_pcp)
536
537 elif field.type == fd.UDP_DST:
538 _udp_dst = field.udp_dst
539 self.log.debug('field-type-udp-dst',
540 udp_dst=_udp_dst)
541
542 elif field.type == fd.UDP_SRC:
543 _udp_src = field.udp_src
544 self.log.debug('field-type-udp-src',
545 udp_src=_udp_src)
546
547 elif field.type == fd.IPV4_DST:
548 _ipv4_dst = field.ipv4_dst
549 self.log.debug('field-type-ipv4-dst',
550 ipv4_dst=_ipv4_dst)
551
552 elif field.type == fd.IPV4_SRC:
553 _ipv4_src = field.ipv4_src
554 self.log.debug('field-type-ipv4-src',
555 ipv4_dst=_ipv4_src)
556
557 elif field.type == fd.METADATA:
558 _metadata = field.table_metadata
559 self.log.debug('field-type-metadata',
560 metadata=_metadata)
561
562 else:
563 raise NotImplementedError('field.type={}'.format(
564 field.type))
565
566 for action in fd.get_actions(flow):
567
568 if action.type == fd.OUTPUT:
569 _output = action.output.port
570 self.log.debug('action-type-output',
571 output=_output, in_port=_in_port)
572
573 elif action.type == fd.POP_VLAN:
574 self.log.debug('action-type-pop-vlan',
575 in_port=_in_port)
576
577 elif action.type == fd.PUSH_VLAN:
578 _push_tpid = action.push.ethertype
579 self.log.debug('action-type-push-vlan',
580 push_tpid=_push_tpid, in_port=_in_port)
581 if action.push.ethertype != 0x8100:
582 self.log.error('unhandled-tpid',
583 ethertype=action.push.ethertype)
584
585 elif action.type == fd.SET_FIELD:
586 _field = action.set_field.field.ofb_field
587 assert (action.set_field.field.oxm_class ==
588 OFPXMC_OPENFLOW_BASIC)
589 self.log.debug('action-type-set-field',
590 field=_field, in_port=_in_port)
591 if _field.type == fd.VLAN_VID:
592 _set_vlan_vid = _field.vlan_vid & 0xfff
593 self.log.debug('set-field-type-vlan-vid',
594 vlan_vid=_set_vlan_vid)
595 else:
596 self.log.error('unsupported-action-set-field-type',
597 field_type=_field.type)
598 else:
599 self.log.error('unsupported-action-type',
600 action_type=action.type, in_port=_in_port)
601
602 # TODO: We only set vlan omci flows. Handle omci matching ethertypes at some point in another task
603 if _type is not None:
604 self.log.warn('ignoring-flow-with-ethType', ethType=_type)
605 elif _set_vlan_vid is None or _set_vlan_vid == 0:
606 self.log.warn('ignorning-flow-that-does-not-set-vlanid')
607 else:
608 self.log.warn('set-vlanid', uni_id=uni_port.port_number, set_vlan_vid=_set_vlan_vid)
609 self._add_vlan_filter_task(device, uni_port, _set_vlan_vid)
610
611 except Exception as e:
612 self.log.exception('failed-to-install-flow', e=e, flow=flow)
613
614
615 def _add_vlan_filter_task(self, device, uni_port, _set_vlan_vid):
616 assert uni_port is not None
617
618 def success(_results):
619 self.log.info('vlan-tagging-success', uni_port=uni_port, vlan=_set_vlan_vid)
620 device.reason = 'omci-flows-pushed'
621 self._vlan_filter_task = None
622
623 def failure(_reason):
624 self.log.warn('vlan-tagging-failure', uni_port=uni_port, vlan=_set_vlan_vid)
625 device.reason = 'omci-flows-failed-retrying'
626 self._vlan_filter_task = reactor.callLater(_STARTUP_RETRY_WAIT,
627 self._add_vlan_filter_task, device, uni_port, _set_vlan_vid)
628
629 self.log.info('setting-vlan-tag')
630 self._vlan_filter_task = BrcmVlanFilterTask(self.omci_agent, self.device_id, uni_port, _set_vlan_vid)
631 self._deferred = self._onu_omci_device.task_runner.queue_task(self._vlan_filter_task)
632 self._deferred.addCallbacks(success, failure)
633
634 def get_tx_id(self):
635 self.log.debug('function-entry')
636 self.tx_id += 1
637 return self.tx_id
638
639 # TODO: Actually conform to or create a proper interface.
640 # this and the other functions called from the olt arent very clear.
641 # Called each time there is an onu "up" indication from the olt handler
642 def create_interface(self, data):
643 self.log.debug('function-entry', data=data)
644 self._onu_indication = data
645
646 onu_device = self.adapter_agent.get_device(self.device_id)
647
648 self.log.debug('starting-openomci-statemachine')
649 self._subscribe_to_events()
650 reactor.callLater(1, self._onu_omci_device.start)
651 onu_device.reason = "starting-openomci"
652 self.adapter_agent.update_device(onu_device)
653 self._heartbeat.enabled = True
654
655 # Currently called each time there is an onu "down" indication from the olt handler
656 # TODO: possibly other reasons to "update" from the olt?
657 def update_interface(self, data):
658 self.log.debug('function-entry', data=data)
659 oper_state = data.get('oper_state', None)
660
661 onu_device = self.adapter_agent.get_device(self.device_id)
662
663 if oper_state == 'down':
664 self.log.debug('stopping-openomci-statemachine')
665 reactor.callLater(0, self._onu_omci_device.stop)
666
667 # Let TP download happen again
668 for uni_id in self._tp_service_specific_task:
669 self._tp_service_specific_task[uni_id].clear()
670 for uni_id in self._tech_profile_download_done:
671 self._tech_profile_download_done[uni_id].clear()
672
673 self.disable_ports(onu_device)
674 onu_device.reason = "stopping-openomci"
675 onu_device.connect_status = ConnectStatus.UNREACHABLE
676 onu_device.oper_status = OperStatus.DISCOVERED
677 self.adapter_agent.update_device(onu_device)
678 else:
679 self.log.debug('not-changing-openomci-statemachine')
680
681 # Not currently called by olt or anything else
682 def remove_interface(self, data):
683 self.log.debug('function-entry', data=data)
684
685 onu_device = self.adapter_agent.get_device(self.device_id)
686
687 self.log.debug('stopping-openomci-statemachine')
688 reactor.callLater(0, self._onu_omci_device.stop)
689
690 # Let TP download happen again
691 for uni_id in self._tp_service_specific_task:
692 self._tp_service_specific_task[uni_id].clear()
693 for uni_id in self._tech_profile_download_done:
694 self._tech_profile_download_done[uni_id].clear()
695
696 self.disable_ports(onu_device)
697 onu_device.reason = "stopping-openomci"
698 self.adapter_agent.update_device(onu_device)
699
700 # TODO: im sure there is more to do here
701
702 # Not currently called. Would be called presumably from the olt handler
703 def remove_gemport(self, data):
704 self.log.debug('remove-gemport', data=data)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500705 device = self.adapter_agent.get_device(self.device_id)
706 if device.connect_status != ConnectStatus.REACHABLE:
707 self.log.error('device-unreachable')
708 return
709
710 # Not currently called. Would be called presumably from the olt handler
711 def remove_tcont(self, tcont_data, traffic_descriptor_data):
712 self.log.debug('remove-tcont', tcont_data=tcont_data, traffic_descriptor_data=traffic_descriptor_data)
713 device = self.adapter_agent.get_device(self.device_id)
714 if device.connect_status != ConnectStatus.REACHABLE:
715 self.log.error('device-unreachable')
716 return
717
718 # TODO: Create some omci task that encompases this what intended
719
720 # Not currently called. Would be called presumably from the olt handler
721 def create_multicast_gemport(self, data):
722 self.log.debug('function-entry', data=data)
723
724 # TODO: create objects and populate for later omci calls
725
726 def disable(self, device):
727 self.log.debug('function-entry', device=device)
728 try:
729 self.log.info('sending-uni-lock-towards-device', device=device)
730
731 def stop_anyway(reason):
732 # proceed with disable regardless if we could reach the onu. for example onu is unplugged
733 self.log.debug('stopping-openomci-statemachine')
734 reactor.callLater(0, self._onu_omci_device.stop)
735
736 # Let TP download happen again
737 for uni_id in self._tp_service_specific_task:
738 self._tp_service_specific_task[uni_id].clear()
739 for uni_id in self._tech_profile_download_done:
740 self._tech_profile_download_done[uni_id].clear()
741
742 self.disable_ports(device)
743 device.oper_status = OperStatus.UNKNOWN
744 device.reason = "omci-admin-lock"
745 self.adapter_agent.update_device(device)
746
747 # lock all the unis
748 task = BrcmUniLockTask(self.omci_agent, self.device_id, lock=True)
749 self._deferred = self._onu_omci_device.task_runner.queue_task(task)
750 self._deferred.addCallbacks(stop_anyway, stop_anyway)
751 except Exception as e:
752 log.exception('exception-in-onu-disable', exception=e)
753
754 def reenable(self, device):
755 self.log.debug('function-entry', device=device)
756 try:
757 # Start up OpenOMCI state machines for this device
758 # this will ultimately resync mib and unlock unis on successful redownloading the mib
759 self.log.debug('restarting-openomci-statemachine')
760 self._subscribe_to_events()
761 device.reason = "restarting-openomci"
762 self.adapter_agent.update_device(device)
763 reactor.callLater(1, self._onu_omci_device.start)
764 self._heartbeat.enabled = True
765 except Exception as e:
766 log.exception('exception-in-onu-reenable', exception=e)
767
768 def reboot(self):
769 self.log.info('reboot-device')
770 device = self.adapter_agent.get_device(self.device_id)
771 if device.connect_status != ConnectStatus.REACHABLE:
772 self.log.error("device-unreachable")
773 return
774
775 def success(_results):
776 self.log.info('reboot-success', _results=_results)
777 self.disable_ports(device)
778 device.connect_status = ConnectStatus.UNREACHABLE
779 device.oper_status = OperStatus.DISCOVERED
780 device.reason = "rebooting"
781 self.adapter_agent.update_device(device)
782
783 def failure(_reason):
784 self.log.info('reboot-failure', _reason=_reason)
785
786 self._deferred = self._onu_omci_device.reboot()
787 self._deferred.addCallbacks(success, failure)
788
789 def disable_ports(self, onu_device):
790 self.log.info('disable-ports', device_id=self.device_id,
791 onu_device=onu_device)
792
793 # Disable all ports on that device
794 self.adapter_agent.disable_all_ports(self.device_id)
795
796 parent_device = self.adapter_agent.get_device(onu_device.parent_id)
797 assert parent_device
798 logical_device_id = parent_device.parent_id
799 assert logical_device_id
800 ports = self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI)
801 for port in ports:
802 port_id = 'uni-{}'.format(port.port_no)
803 # TODO: move to UniPort
804 self.update_logical_port(logical_device_id, port_id, OFPPS_LINK_DOWN)
805
806 def enable_ports(self, onu_device):
807 self.log.info('enable-ports', device_id=self.device_id, onu_device=onu_device)
808
809 # Disable all ports on that device
810 self.adapter_agent.enable_all_ports(self.device_id)
811
812 parent_device = self.adapter_agent.get_device(onu_device.parent_id)
813 assert parent_device
814 logical_device_id = parent_device.parent_id
815 assert logical_device_id
816 ports = self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI)
817 for port in ports:
818 port_id = 'uni-{}'.format(port.port_no)
819 # TODO: move to UniPort
820 self.update_logical_port(logical_device_id, port_id, OFPPS_LIVE)
821
822 # Called just before openomci state machine is started. These listen for events from selected state machines,
823 # most importantly, mib in sync. Which ultimately leads to downloading the mib
824 def _subscribe_to_events(self):
825 self.log.debug('function-entry')
826
827 # OMCI MIB Database sync status
828 bus = self._onu_omci_device.event_bus
829 topic = OnuDeviceEntry.event_bus_topic(self.device_id,
830 OnuDeviceEvents.MibDatabaseSyncEvent)
831 self._in_sync_subscription = bus.subscribe(topic, self.in_sync_handler)
832
833 # OMCI Capabilities
834 bus = self._onu_omci_device.event_bus
835 topic = OnuDeviceEntry.event_bus_topic(self.device_id,
836 OnuDeviceEvents.OmciCapabilitiesEvent)
837 self._capabilities_subscription = bus.subscribe(topic, self.capabilties_handler)
838
839 # Called when the mib is in sync
840 def in_sync_handler(self, _topic, msg):
841 self.log.debug('function-entry', _topic=_topic, msg=msg)
842 if self._in_sync_subscription is not None:
843 try:
844 in_sync = msg[IN_SYNC_KEY]
845
846 if in_sync:
847 # Only call this once
848 bus = self._onu_omci_device.event_bus
849 bus.unsubscribe(self._in_sync_subscription)
850 self._in_sync_subscription = None
851
852 # Start up device_info load
853 self.log.debug('running-mib-sync')
854 reactor.callLater(0, self._mib_in_sync)
855
856 except Exception as e:
857 self.log.exception('in-sync', e=e)
858
859 def capabilties_handler(self, _topic, _msg):
860 self.log.debug('function-entry', _topic=_topic, msg=_msg)
861 if self._capabilities_subscription is not None:
862 self.log.debug('capabilities-handler-done')
863
864 # Mib is in sync, we can now query what we learned and actually start pushing ME (download) to the ONU.
865 # Currently uses a basic mib download task that create a bridge with a single gem port and uni, only allowing EAP
866 # Implement your own MibDownloadTask if you wish to setup something different by default
867 def _mib_in_sync(self):
868 self.log.debug('function-entry')
869
870 omci = self._onu_omci_device
871 in_sync = omci.mib_db_in_sync
872
873 device = self.adapter_agent.get_device(self.device_id)
874 device.reason = 'discovery-mibsync-complete'
875 self.adapter_agent.update_device(device)
876
877 if not self._dev_info_loaded:
878 self.log.info('loading-device-data-from-mib', in_sync=in_sync, already_loaded=self._dev_info_loaded)
879
880 omci_dev = self._onu_omci_device
881 config = omci_dev.configuration
882
883 # TODO: run this sooner somehow. shouldnt have to wait for mib sync to push an initial download
884 # In Sync, we can register logical ports now. Ideally this could occur on
885 # the first time we received a successful (no timeout) OMCI Rx response.
886 try:
887
888 # sort the lists so we get consistent port ordering.
889 ani_list = sorted(config.ani_g_entities) if config.ani_g_entities else []
890 uni_list = sorted(config.uni_g_entities) if config.uni_g_entities else []
891 pptp_list = sorted(config.pptp_entities) if config.pptp_entities else []
892 veip_list = sorted(config.veip_entities) if config.veip_entities else []
893
894 if ani_list is None or (pptp_list is None and veip_list is None):
895 device.reason = 'onu-missing-required-elements'
896 self.log.warn("no-ani-or-unis")
897 self.adapter_agent.update_device(device)
898 raise Exception("onu-missing-required-elements")
899
900 # Currently logging the ani, pptp, veip, and uni for information purposes.
901 # Actually act on the veip/pptp as its ME is the most correct one to use in later tasks.
902 # And in some ONU the UNI-G list is incomplete or incorrect...
903 for entity_id in ani_list:
904 ani_value = config.ani_g_entities[entity_id]
905 self.log.debug("discovered-ani", entity_id=entity_id, value=ani_value)
906 # TODO: currently only one OLT PON port/ANI, so this works out. With NGPON there will be 2..?
907 self._total_tcont_count = ani_value.get('total-tcont-count')
908 self.log.debug("set-total-tcont-count", tcont_count=self._total_tcont_count)
909
910 for entity_id in uni_list:
911 uni_value = config.uni_g_entities[entity_id]
912 self.log.debug("discovered-uni", entity_id=entity_id, value=uni_value)
913
914 uni_entities = OrderedDict()
915 for entity_id in pptp_list:
916 pptp_value = config.pptp_entities[entity_id]
917 self.log.debug("discovered-pptp", entity_id=entity_id, value=pptp_value)
918 uni_entities[entity_id] = UniType.PPTP
919
920 for entity_id in veip_list:
921 veip_value = config.veip_entities[entity_id]
922 self.log.debug("discovered-veip", entity_id=entity_id, value=veip_value)
923 uni_entities[entity_id] = UniType.VEIP
924
925 uni_id = 0
926 for entity_id, uni_type in uni_entities.iteritems():
927 try:
928 self._add_uni_port(entity_id, uni_id, uni_type)
929 uni_id += 1
930 except AssertionError as e:
931 self.log.warn("could not add UNI", entity_id=entity_id, uni_type=uni_type, e=e)
932
933 multi_uni = len(self._unis) > 1
934 for uni_port in self._unis.itervalues():
935 uni_port.add_logical_port(uni_port.port_number, multi_uni)
936
937 self.adapter_agent.update_device(device)
938
939 self._qos_flexibility = config.qos_configuration_flexibility or 0
940 self._omcc_version = config.omcc_version or OMCCVersion.Unknown
941
942 if self._unis:
943 self._dev_info_loaded = True
944 else:
945 device.reason = 'no-usable-unis'
946 self.adapter_agent.update_device(device)
947 self.log.warn("no-usable-unis")
948 raise Exception("no-usable-unis")
949
950 except Exception as e:
951 self.log.exception('device-info-load', e=e)
952 self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)
953
954 else:
955 self.log.info('device-info-already-loaded', in_sync=in_sync, already_loaded=self._dev_info_loaded)
956
957 if self._dev_info_loaded:
958 if device.admin_state == AdminState.ENABLED:
959 def success(_results):
960 self.log.info('mib-download-success', _results=_results)
961 device = self.adapter_agent.get_device(self.device_id)
962 device.reason = 'initial-mib-downloaded'
963 device.oper_status = OperStatus.ACTIVE
964 device.connect_status = ConnectStatus.REACHABLE
965 self.enable_ports(device)
966 self.adapter_agent.update_device(device)
967 self._mib_download_task = None
968
969 def failure(_reason):
970 self.log.warn('mib-download-failure-retrying', _reason=_reason)
971 device.reason = 'initial-mib-download-failure-retrying'
972 self.adapter_agent.update_device(device)
973 self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)
974
975 # Download an initial mib that creates simple bridge that can pass EAP. On success (above) finally set
976 # the device to active/reachable. This then opens up the handler to openflow pushes from outside
977 self.log.info('downloading-initial-mib-configuration')
978 self._mib_download_task = BrcmMibDownloadTask(self.omci_agent, self)
979 self._deferred = self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
980 self._deferred.addCallbacks(success, failure)
981 else:
982 self.log.info('admin-down-disabling')
983 self.disable(device)
984 else:
985 self.log.info('device-info-not-loaded-skipping-mib-download')
986
987
988 def _add_uni_port(self, entity_id, uni_id, uni_type=UniType.PPTP):
989 self.log.debug('function-entry')
990
991 device = self.adapter_agent.get_device(self.device_id)
992 parent_device = self.adapter_agent.get_device(device.parent_id)
993
994 parent_adapter_agent = registry('adapter_loader').get_agent(parent_device.adapter)
995 if parent_adapter_agent is None:
996 self.log.error('parent-adapter-could-not-be-retrieved')
997
998 # TODO: This knowledge is locked away in openolt. and it assumes one onu equals one uni...
999 parent_device = self.adapter_agent.get_device(device.parent_id)
1000 parent_adapter = parent_adapter_agent.adapter.devices[parent_device.id]
1001 uni_no = parent_adapter.platform.mk_uni_port_num(
1002 self._onu_indication.intf_id, self._onu_indication.onu_id, uni_id)
1003
1004 # TODO: Some or parts of this likely need to move to UniPort. especially the format stuff
1005 uni_name = "uni-{}".format(uni_no)
1006
1007 mac_bridge_port_num = uni_id + 1 # TODO +1 is only to test non-zero index
1008
1009 self.log.debug('uni-port-inputs', uni_no=uni_no, uni_id=uni_id, uni_name=uni_name, uni_type=uni_type,
1010 entity_id=entity_id, mac_bridge_port_num=mac_bridge_port_num)
1011
1012 uni_port = UniPort.create(self, uni_name, uni_id, uni_no, uni_name, uni_type)
1013 uni_port.entity_id = entity_id
1014 uni_port.enabled = True
1015 uni_port.mac_bridge_port_num = mac_bridge_port_num
1016
1017 self.log.debug("created-uni-port", uni=uni_port)
1018
1019 self.adapter_agent.add_port(device.id, uni_port.get_port())
1020 parent_adapter_agent.add_port(device.parent_id, uni_port.get_port())
1021
1022 self._unis[uni_port.port_number] = uni_port
1023
1024 self._onu_omci_device.alarm_synchronizer.set_alarm_params(onu_id=self._onu_indication.onu_id,
1025 uni_ports=self._unis.values())
1026 # TODO: this should be in the PonPortclass
1027 pon_port = self._pon.get_port()
1028
1029 # Delete reference to my own UNI as peer from parent.
1030 # TODO why is this here, add_port_reference_to_parent already prunes duplicates
1031 me_as_peer = Port.PeerPort(device_id=device.parent_id, port_no=uni_port.port_number)
1032 partial_pon_port = Port(port_no=pon_port.port_no, label=pon_port.label,
1033 type=pon_port.type, admin_state=pon_port.admin_state,
1034 oper_status=pon_port.oper_status,
1035 peers=[me_as_peer]) # only list myself as a peer to avoid deleting all other UNIs from parent
1036 self.adapter_agent.delete_port_reference_from_parent(self.device_id, partial_pon_port)
1037
1038 pon_port.peers.extend([me_as_peer])
1039
1040 self._pon._port = pon_port
1041
1042 self.adapter_agent.add_port_reference_to_parent(self.device_id,
1043 pon_port)