# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import base64
16import binascii
17import json
18import structlog
19from twisted.internet import reactor, defer
20from twisted.internet.defer import inlineCallbacks, returnValue, succeed
21from pyvoltha.common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
22from pyvoltha.protos.device_pb2 import Device
23
24from adtran_olt_handler import AdtranOltHandler
25from adapters.adtran_common.net.adtran_rest import RestInvalidResponseCode
26
# Upper bound on the expedite counter: once this many out-of-sync passes have
# been counted in a row, the next hardware sync falls back to the normal delay
_MAX_EXPEDITE_COUNT = 5
# Delay (seconds) before the next hardware sync when a sync is being expedited
_EXPEDITE_SECS = 2
# Initial value of an ONU's hardware-sync interval (seconds)
_HW_SYNC_SECS = 60
30
31
32class Onu(object):
33 """
34 Wraps an ONU
35 """
36 DEFAULT_PASSWORD = ''
37
def __init__(self, onu_info):
    """
    Build the ONU wrapper from a provisioning dictionary.

    :param onu_info: (dict) ONU settings.  The keys read here are 'onu-id',
                     'pon', 'serial-number', 'device-id', 'password',
                     'uni-ports', 'enabled' and, optionally, 'upstream-fec'
    :raises ValueError: if the 'onu-id' entry is None
    """
    onu_id = onu_info['onu-id']
    self._onu_id = onu_id
    if onu_id is None:
        raise ValueError('No ONU ID available')

    pon_port = onu_info['pon']
    self._olt = pon_port.olt
    self._pon_id = pon_port.pon_id
    self._name = '{}@{}'.format(pon_port.physical_port_name, onu_id)
    self.log = structlog.get_logger(pon_id=self._pon_id, onu_id=onu_id)

    # Cleared during delete/cleanup
    self._valid = True

    # The serial number is kept both as given and in its base-64 form
    serial_number = onu_info['serial-number']
    self._serial_number_base64 = Onu.string_to_serial_number(serial_number)
    self._serial_number_string = serial_number

    self._device_id = onu_info['device-id']
    self._password = onu_info['password']
    self._created = False

    olt = self.olt
    self._proxy_address = Device.ProxyAddress(
        device_id=olt.device_id,
        channel_id=olt.pon_id_to_port_number(self._pon_id),
        onu_id=onu_id,
        onu_session_id=onu_id)

    # Hardware synchronization bookkeeping
    self._sync_tick = _HW_SYNC_SECS
    self._expedite_sync = False
    self._expedite_count = 0
    self._resync_flows = False
    self._sync_deferred = None      # For sync of ONT config to hardware

    self._gem_ports = {}            # gem-id -> GemPort
    self._tconts = {}               # alloc-id -> TCont
    self._uni_ports = onu_info['uni-ports']

    # Provisionable items
    self._enabled = onu_info['enabled']
    self._upstream_fec_enable = onu_info.get('upstream-fec')

    # KPI related items
    self._rssi = -9999
    self._equalization_delay = 0
    self._fiber_length = 0
    self._timestamp = None          # Last time the KPI items were updated
78
79 def __str__(self):
80 return "ONU-{}:{}, SN: {}/{}".format(self._pon_id, self._onu_id,
81 self._serial_number_string, self._serial_number_base64)
82
83 @staticmethod
84 def serial_number_to_string(value):
85 sval = base64.decodestring(value)
86 unique = [elem.encode("hex") for elem in sval[4:8]]
87 return '{}{}{}{}{}'.format(sval[:4], unique[0], unique[1], unique[2], unique[3]).upper()
88
89 @staticmethod
90 def string_to_serial_number(value):
91 bvendor = [octet for octet in value[:4]]
92 bunique = [binascii.a2b_hex(value[offset:offset + 2]) for offset in xrange(4, 12, 2)]
93 bvalue = ''.join(bvendor + bunique)
94 return base64.b64encode(bvalue)
95
@property
def olt(self):
    """The OLT object taken from the PON port at construction (None after delete())"""
    return self._olt

@property
def pon(self):
    """The PON port object, looked up by PON ID in the OLT's southbound ports"""
    return self.olt.southbound_ports[self._pon_id]

@property
def intf_id(self):
    """The interface ID reported by this ONU's PON port"""
    return self.pon.intf_id

@property
def pon_id(self):
    """ID of the PON port this ONU belongs to"""
    return self._pon_id

@property
def onu_id(self):
    """ID of this ONU (the 'onu-id' value supplied at construction)"""
    return self._onu_id

@property
def device_id(self):
    """The 'device-id' value supplied at construction"""
    return self._device_id

@property
def name(self):
    """Name in the form '<PON physical port name>@<ONU ID>'"""
    return self._name
123
@property
def upstream_fec_enable(self):
    """Upstream FEC setting of this ONU"""
    return self._upstream_fec_enable

@upstream_fec_enable.setter
def upstream_fec_enable(self, value):
    """
    Change the upstream FEC setting.  When the value actually changes, the
    PON port's upstream FEC is refreshed from its 'any enabled' summary.

    :param value: (bool) new upstream FEC setting
    """
    assert isinstance(value, bool), 'upstream FEC enabled is a boolean'
    if self._upstream_fec_enable == value:
        return

    self._upstream_fec_enable = value

    # Recalculate PON upstream FEC
    pon_port = self.pon
    pon_port.upstream_fec_enable = pon_port.any_upstream_fec_enabled
136
@property
def password(self):
    """
    Get password.  Base 64 format
    """
    return self._password

@password.setter
def password(self, value):
    """
    Set the password.  Only takes effect while no password is stored and
    the new value is not None; in that case an ONU-active alarm carrying
    the decoded registration ID is raised as well.

    :param value: (str) base 64 encoded value
    """
    if self._password is not None or value is None:
        return

    self._password = value

    # Decode and drop the NUL padding on both ends
    decoded = value.decode('base64')
    reg_id = decoded.strip('\00')

    # Must remove any non-printable characters
    reg_id = ''.join(ch if 31 < ord(ch) < 127 else '_' for ch in reg_id)

    # Generate alarm here for regID
    from voltha.extensions.alarms.onu.onu_active_alarm import OnuActiveAlarm
    self.log.info('onu-Active-Alarm', serial_number=self._serial_number_string)

    olt = self._olt
    device = olt.adapter_agent.get_device(olt.device_id)

    alarm = OnuActiveAlarm(olt.alarms, olt.device_id, self._pon_id,
                           self._serial_number_string, reg_id,
                           device.serial_number,
                           ipv4_address=device.ipv4_address)
    alarm.raise_alarm()
163
@property
def enabled(self):
    """Administrative enable state of this ONU"""
    return self._enabled

@enabled.setter
def enabled(self, value):
    """
    Change the enable state.  On a change: flag the flows for resync, push
    the 'enable' leaf to hardware, start or stop the hardware sync and
    refresh the PON port's upstream FEC.

    :param value: new enable state
    """
    if self._enabled == value:
        return

    self._enabled = value
    self._resync_flows = True

    self.set_config('enable', self._enabled)

    action = self.start if self._enabled else self.stop
    action()

    # Recalculate PON upstream FEC
    pon_port = self.pon
    pon_port.upstream_fec_enable = pon_port.any_upstream_fec_enabled
183
@property
def uni_ports(self):
    """The 'uni-ports' value supplied at construction"""
    return self._uni_ports

@property
def logical_port(self):
    """Return the logical PORT number of this ONU's UNI"""
    # TODO: once we support multiple UNIs, this needs to be revisited
    # Only the first entry of the UNI port collection is used
    return self._uni_ports[0]

@property
def gem_ports(self):
    """The GemPort objects currently tracked for this ONU"""
    return self._gem_ports.values()

@property
def proxy_address(self):
    """The Device.ProxyAddress built for this ONU at construction"""
    return self._proxy_address

@property
def serial_number_64(self):
    """Serial number in base-64 encoded form"""
    return self._serial_number_base64

@property
def serial_number(self):
    """Serial number in the string form supplied at construction"""
    return self._serial_number_string
209
@property
def timestamp(self):
    # Last time the KPI items were updated
    return self._timestamp

@timestamp.setter
def timestamp(self, value):
    # Stored as given; no validation or conversion is done here
    self._timestamp = value

@property
def rssi(self):
    """The received signal strength indication of the ONU"""
    return self._rssi

@rssi.setter
def rssi(self, value):
    # Only written when the value changes, leaving room for a future notification
    if self._rssi != value:
        self._rssi = value
        # TODO: Notify anyone?

@property
def equalization_delay(self):
    """Equalization delay (bits)"""
    return self._equalization_delay

@equalization_delay.setter
def equalization_delay(self, value):
    # Only written when the value changes, leaving room for a future notification
    if self._equalization_delay != value:
        self._equalization_delay = value
        # TODO: Notify anyone?

@property
def fiber_length(self):
    """Distance to ONU"""
    return self._fiber_length

@fiber_length.setter
def fiber_length(self, value):
    # Only written when the value changes, leaving room for a future notification
    if self._fiber_length != value:
        self._fiber_length = value
        # TODO: Notify anyone?
251
252 def _cancel_deferred(self):
253 d, self._sync_deferred = self._sync_deferred, None
254
255 if d is not None and not d.called:
256 try:
257 d.cancel()
258 except Exception:
259 pass
260
@inlineCallbacks
def create(self, reflow=False):
    """
    Create (or reflow) this ONU to hardware

    The ONU is POSTed to the PON's ONU config list when it has not been
    created yet or when a reflow is requested.  If the POST fails, the
    ONU's own configuration is read back to see whether the hardware
    already holds this ONU with the same serial number; if so it is
    treated as created.  A hardware sync is always (re)scheduled
    afterwards.

    :param reflow: (boolean) Flag, if True, indicating if this is a reflow ONU
                             information after an unmanaged OLT hardware reboot
    :return: (deferred) fires with 'created'
    """
    self.log.debug('create', reflow=reflow)
    self._cancel_deferred()

    data = json.dumps({'onu-id': self._onu_id,
                       'serial-number': self._serial_number_base64,
                       'enable': self._enabled})
    uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(self._pon_id)
    name = 'onu-create-{}-{}-{}: {}'.format(self._pon_id, self._onu_id,
                                            self._serial_number_base64, self._enabled)

    # A first-time create gets its initial hardware sync sooner
    first_sync = self._sync_tick if self._created else 5

    if not self._created or reflow:
        try:
            yield self.olt.rest_client.request('POST', uri, data=data, name=name)
            self._created = True

        except Exception as e:
            self.log.exception('onu-create', e=e)

            # See if it failed due to already being configured.  Query this
            # ONU's own config entry (not the PON-wide list) and accept it
            # only when the hardware serial number matches ours.
            try:
                results = yield self._get_config()
                self.log.debug('onu-create-check', results=results)

                # NOTE(review): the response is presumably a single mapping
                # (or a list of them) with a 'serial-number' key -- confirm
                # against the OLT REST API
                entries = results if isinstance(results, (list, tuple)) else [results]
                if any(isinstance(entry, dict) and
                       entry.get('serial-number') == self._serial_number_base64
                       for entry in entries):
                    self._created = True

            except Exception as _e:
                self.log.warn('onu-exists-check', pon_id=self.pon_id, onu_id=self.onu_id,
                              serial_number=self.serial_number, e=_e)

    self._sync_deferred = reactor.callLater(first_sync, self._sync_hardware)

    # Recalculate PON upstream FEC
    self.pon.upstream_fec_enable = self.pon.any_upstream_fec_enabled
    returnValue('created')
306
@inlineCallbacks
def delete(self):
    """
    Tear this ONU down: remove its GEM ports and TCONTs, issue the REST
    DELETE for the ONU itself and release its resource-manager resources.

    :return: (deferred) fires with 'deleted'
    """
    self._valid = False
    self._cancel_deferred()

    # Snapshot the IDs first; the remove calls below edit the dictionaries
    gem_ids = list(self._gem_ports.keys())
    alloc_ids = list(self._tconts.keys())

    # Remove from H/W (GEM ports first, then TCONTs)
    pending = [self.remove_gem_id(gem_id) for gem_id in gem_ids]
    try:
        yield defer.gatherResults(pending, consumeErrors=True)
    except Exception as _e:
        pass

    pending = [self.remove_tcont(alloc_id) for alloc_id in alloc_ids]
    try:
        yield defer.gatherResults(pending, consumeErrors=True)
    except Exception as _e:
        pass

    self._gem_ports.clear()
    self._tconts.clear()

    # Detach from the OLT but keep a local reference for the final requests
    olt = self._olt
    self._olt = None

    uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, self._onu_id)
    name = 'onu-delete-{}-{}-{}: {}'.format(self._pon_id, self._onu_id,
                                            self._serial_number_base64, self._enabled)
    try:
        yield olt.rest_client.request('DELETE', uri, name=name)

    except RestInvalidResponseCode as e:
        # A 404 is not reported
        if e.code != 404:
            self.log.exception('onu-delete', e=e)

    except Exception as e:
        self.log.exception('onu-delete', e=e)

    # Release resource manager resources for this ONU
    olt.resource_mgr.free_pon_resources_for_onu((self.pon_id, self.onu_id))

    returnValue('deleted')
360
def start(self):
    """Cancel any pending hardware sync and schedule one to run immediately"""
    self._cancel_deferred()
    self._sync_deferred = reactor.callLater(0, self._sync_hardware)

def stop(self):
    """Cancel any pending hardware sync without scheduling a new one"""
    self._cancel_deferred()

def restart(self):
    """
    Re-create this ONU on hardware unless it is being deleted.

    :return: (deferred) fires with 'Deleting' if the ONU is no longer valid,
             otherwise the deferred returned by create()
    """
    if not self._valid:
        return succeed('Deleting')

    self._cancel_deferred()
    # NOTE(review): create() starts by calling _cancel_deferred(), so this
    # zero-delay sync looks like it is cancelled again right away -- confirm
    # whether an immediate sync was intended here
    self._sync_deferred = reactor.callLater(0, self._sync_hardware)

    return self.create()
376
def _sync_hardware(self):
    """
    Compare this ONU's provisioned state with what the hardware reports and
    queue corrections, then schedule the next sync.

    One pass reads the ONU config from the OLT and, in order, reconciles
    the 'enable' and 'serial-number' leaves, the TCONTs, the GEM ports and
    finally the flows.  Each later stage only runs when the earlier stages
    queued no corrections.  If anything had to be corrected the next pass
    is expedited (a limited number of times in a row).

    When the PON port is not enabled the hardware is not read at all and
    only the next pass is scheduled.
    """
    from codec.olt_config import OltConfig
    self.log.debug('sync-hardware')

    def read_config(results):
        """Decode the hardware config and queue whatever is out of sync"""
        self.log.debug('read-config', results=results)

        dl = []

        try:
            config = OltConfig.Pon.Onu.decode([results])
            assert self.onu_id in config, 'sync-onu-not-found-{}'.format(self.onu_id)
            config = config[self.onu_id]

            if self._enabled != config.enable:
                dl.append(self.set_config('enable', self._enabled))

            if self.serial_number_64 != config.serial_number_64:
                dl.append(self.set_config('serial-number', self.serial_number_64))

            if self._enabled:
                # Sync TCONTs if everything else in sync
                if len(dl) == 0:
                    dl.extend(sync_tconts(config.tconts))

                # Sync GEM Ports if everything else in sync

                if len(dl) == 0:
                    dl.extend(sync_gem_ports(config.gem_ports))

                if len(dl) == 0:
                    sync_flows()

        except Exception as e:
            self.log.exception('hw-sync-read-config', e=e)

        # Run h/w sync again a bit faster if we had to sync anything
        self._expedite_sync = len(dl) > 0

        # TODO: do checks
        return defer.gatherResults(dl, consumeErrors=True)

    def sync_tconts(hw_tconts):
        """Queue removal of extra, creation of missing and update of changed TCONTs"""
        hw_alloc_ids = frozenset(hw_tconts.iterkeys())
        my_alloc_ids = frozenset(self._tconts.iterkeys())
        dl = []

        try:
            extra_alloc_ids = hw_alloc_ids - my_alloc_ids
            dl.extend(sync_delete_extra_tconts(extra_alloc_ids))

            missing_alloc_ids = my_alloc_ids - hw_alloc_ids
            dl.extend(sync_add_missing_tconts(missing_alloc_ids))

            matching_alloc_ids = my_alloc_ids & hw_alloc_ids
            matching_hw_tconts = {alloc_id: tcont
                                  for alloc_id, tcont in hw_tconts.iteritems()
                                  if alloc_id in matching_alloc_ids}
            dl.extend(sync_matching_tconts(matching_hw_tconts))

        except Exception as e2:
            self.log.exception('hw-sync-tconts', e=e2)

        return dl

    def sync_delete_extra_tconts(alloc_ids):
        return [self.remove_tcont(alloc_id) for alloc_id in alloc_ids]

    def sync_add_missing_tconts(alloc_ids):
        return [self.add_tcont(self._tconts[alloc_id], reflow=True) for alloc_id in alloc_ids]

    def sync_matching_tconts(hw_tconts):
        """Re-push every TCONT whose traffic descriptor differs from hardware"""
        from xpon.traffic_descriptor import TrafficDescriptor

        dl = []
        # TODO: sync TD & Best Effort. Only other TCONT leaf is the key
        for alloc_id, hw_tcont in hw_tconts.iteritems():
            my_tcont = self._tconts[alloc_id]
            my_td = my_tcont.traffic_descriptor
            hw_td = hw_tcont.traffic_descriptor
            if my_td is None:
                continue

            # The hardware side is compared against the string form
            my_additional = TrafficDescriptor.AdditionalBwEligibility.\
                to_string(my_td.additional_bandwidth_eligibility)

            reflow = hw_td is None or \
                my_td.fixed_bandwidth != hw_td.fixed_bandwidth or \
                my_td.assured_bandwidth != hw_td.assured_bandwidth or \
                my_td.maximum_bandwidth != hw_td.maximum_bandwidth or \
                my_additional != hw_td.additional_bandwidth_eligibility

            # Best-effort parameters only matter for best-effort sharing
            if not reflow and \
                    my_td.additional_bandwidth_eligibility == \
                    TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING and \
                    my_td.best_effort is not None:

                hw_be = hw_td.best_effort
                my_be = my_td.best_effort

                reflow = hw_be is None or \
                    my_be.bandwidth != hw_be.bandwidth or \
                    my_be.priority != hw_be.priority or \
                    my_be.weight != hw_be.weight

            if reflow:
                dl.append(my_tcont.add_to_hardware(self.olt.rest_client))
        return dl

    def sync_gem_ports(hw_gem_ports):
        """Queue removal of extra, creation of missing and update of changed GEM ports"""
        hw_gems_ids = frozenset(hw_gem_ports.iterkeys())
        my_gems_ids = frozenset(self._gem_ports.iterkeys())
        dl = []

        try:
            extra_gems_ids = hw_gems_ids - my_gems_ids
            dl.extend(sync_delete_extra_gem_ports(extra_gems_ids))

            missing_gem_ids = my_gems_ids - hw_gems_ids
            dl.extend(sync_add_missing_gem_ports(missing_gem_ids))

            matching_gem_ids = my_gems_ids & hw_gems_ids
            matching_hw_gem_ports = {gem_id: gem_port
                                     for gem_id, gem_port in hw_gem_ports.iteritems()
                                     if gem_id in matching_gem_ids}

            dl.extend(sync_matching_gem_ports(matching_hw_gem_ports))
            # Any GEM port correction means the flows must be re-pushed too
            self._resync_flows |= len(dl) > 0

        except Exception as ex:
            self.log.exception('hw-sync-gem-ports', e=ex)

        return dl

    def sync_delete_extra_gem_ports(gem_ids):
        return [self.remove_gem_id(gem_id) for gem_id in gem_ids]

    def sync_add_missing_gem_ports(gem_ids):
        return [self.add_gem_port(self._gem_ports[gem_id], reflow=True)
                for gem_id in gem_ids]

    def sync_matching_gem_ports(hw_gem_ports):
        """PATCH every GEM port whose alloc-id, encryption or OMCI flag differs"""
        dl = []
        for gem_id, hw_gem_port in hw_gem_ports.iteritems():
            gem_port = self._gem_ports[gem_id]

            if gem_port.alloc_id != hw_gem_port.alloc_id or\
                    gem_port.encryption != hw_gem_port.encryption or\
                    gem_port.omci_transport != hw_gem_port.omci_transport:
                dl.append(gem_port.add_to_hardware(self.olt.rest_client,
                                                   operation='PATCH'))
        return dl

    def sync_flows():
        from flow.flow_entry import FlowEntry

        # Consume the resync flag so a forced reflow happens only once
        reflow, self._resync_flows = self._resync_flows, False
        return FlowEntry.sync_flows_by_onu(self, reflow=reflow)

    def failure(_reason):
        # Deliberately silent; the next scheduled pass simply tries again
        # self.log.error('hardware-sync-get-config-failed', reason=_reason)
        pass

    def reschedule(_):
        """Schedule the next pass; a disabled ONU is synced 5x less often"""
        import random
        delay = self._sync_tick if self._enabled else 5 * self._sync_tick

        # Speed up sequential resync a limited number of times if out of sync
        # With 60 second initial an typical worst case resync of 4 times, this
        # should resync an ONU and all it's gem-ports and tconts within <90 seconds
        if self._expedite_sync and self._enabled:
            self._expedite_count += 1
            if self._expedite_count < _MAX_EXPEDITE_COUNT:
                delay = _EXPEDITE_SECS
        else:
            self._expedite_count = 0

        # Spread the syncs out with +/-10% jitter.  Float division is
        # required: with integers, Python 2 floors (-2 / 10 == -1 while
        # 2 / 10 == 0), which skewed the expedited delay to 1..2 seconds.
        jitter = delay / 10.0
        delay += random.uniform(-jitter, jitter)
        self._sync_deferred = reactor.callLater(delay, self._sync_hardware)
        self._expedite_sync = False

    # If PON is not enabled, skip hw-sync. If ONU not enabled, do it but less
    # frequently
    if not self.pon.enabled:
        return reschedule('not-enabled')

    try:
        self._sync_deferred = self._get_config()
        self._sync_deferred.addCallbacks(read_config, failure)
        # Always schedule the next pass, whether the read worked or not
        self._sync_deferred.addBoth(reschedule)

    except Exception as e:
        self.log.exception('hw-sync-main', e=e)
        return reschedule('sync-exception')
571
def _get_config(self):
    """
    Issue a REST GET for this ONU's configuration.

    :return: whatever the OLT REST client's request() returns (used as a
             deferred by the hardware sync)
    """
    pon_id, onu_id = self._pon_id, self.onu_id
    uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(pon_id, onu_id)
    request_name = 'pon-get-onu_config-{}-{}'.format(pon_id, onu_id)
    return self.olt.rest_client.request('GET', uri, name=request_name)
576
def set_config(self, leaf, value):
    """
    PATCH a single configuration leaf of this ONU on the OLT.

    :param leaf: (str) name of the leaf to set
    :param value: value for the leaf (must be JSON serializable)
    :return: whatever the OLT REST client's request() returns
    """
    self.log.debug('set-config', leaf=leaf, value=value)

    payload = json.dumps({leaf: value})
    pon_id, onu_id = self._pon_id, self._onu_id
    uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(pon_id, onu_id)
    request_name = 'onu-set-config-{}-{}-{}: {}'.format(pon_id, onu_id, leaf, value)

    return self.olt.rest_client.request('PATCH', uri, data=payload,
                                        name=request_name)
583
@property
def alloc_ids(self):
    """
    Alloc-ids of every T-CONT tracked for this ONU, as a frozenset
    """
    # Iterating the dictionary yields its keys (the alloc-ids)
    return frozenset(self._tconts)
590
@inlineCallbacks
def add_tcont(self, tcont, reflow=False):
    """
    Track a T-CONT under its alloc-id and push it to hardware

    :param tcont: (TCont) Object that maintains the TCONT properties
    :param reflow: (boolean) If true, force add (used during h/w resync)
    :return: (deferred) fires with 'Deleting' when the ONU is being deleted,
             'already created' for a known alloc-id without reflow,
             'resync needed' when the hardware request failed, otherwise
             the result of the hardware request
    """
    if not self._valid:
        returnValue('Deleting')

    already_known = not reflow and tcont.alloc_id in self._tconts
    if already_known:
        returnValue('already created')

    self.log.info('add', tcont=tcont, reflow=reflow)
    self._tconts[tcont.alloc_id] = tcont

    try:
        outcome = yield tcont.add_to_hardware(self.olt.rest_client)

    except Exception as e:
        # Leave it to the periodic hardware sync to repair
        self.log.exception('tcont', tcont=tcont, reflow=reflow, e=e)
        outcome = 'resync needed'

    returnValue(outcome)
617
@inlineCallbacks
def remove_tcont(self, alloc_id):
    """
    Stop tracking a T-CONT and remove it from hardware

    :param alloc_id: alloc-id of the T-CONT to remove
    :return: (deferred) fires with 'nop' for an unknown alloc-id, None when
             the hardware request failed with an invalid response code,
             otherwise the result of the hardware request.  Any other
             failure is logged and re-raised.
    """
    tcont = self._tconts.get(alloc_id)
    if tcont is None:
        returnValue('nop')

    self._tconts.pop(alloc_id)

    outcome = None
    try:
        outcome = yield tcont.remove_from_hardware(self.olt.rest_client)

    except RestInvalidResponseCode as e:
        # A 404 is not reported
        if e.code != 404:
            self.log.exception('tcont-delete', e=e)

    except Exception as e:
        self.log.exception('delete', e=e)
        raise

    returnValue(outcome)
639
def gem_port(self, gem_id):
    """
    Look up a tracked GEM port

    :param gem_id: GEM port ID
    :return: the GemPort stored under gem_id, or None if it is not tracked
    """
    return self._gem_ports.get(gem_id)
642
def gem_ids(self, tech_profile_id):
    """
    Get the sorted IDs of this ONU's non-multicast GEM ports that belong
    to the given tech profile

    :param tech_profile_id: tech profile ID; asserted to be at least
                            DEFAULT_TECH_PROFILE_TABLE_ID
    :return: (list) sorted GEM port IDs
    """
    assert tech_profile_id >= DEFAULT_TECH_PROFILE_TABLE_ID

    selected = []
    for gem_id, gem in self._gem_ports.items():
        if gem.multicast:
            continue
        if tech_profile_id == gem.tech_profile_id:
            selected.append(gem_id)

    selected.sort()
    return selected
649
@inlineCallbacks
def add_gem_port(self, gem_port, reflow=False):
    """
    Track a GEM Port under its gem-id and push it to hardware

    :param gem_port: (GemPort) GEM Port to add
    :param reflow: (boolean) If true, force add (used during h/w resync)
    :return: (deferred) fires with 'Deleting' when the ONU is being deleted,
             'nop' for a known gem-id without reflow, 'resync needed' when
             the hardware request failed, otherwise the result of the
             hardware request
    """
    if not self._valid:
        returnValue('Deleting')

    already_known = not reflow and gem_port.gem_id in self._gem_ports
    if already_known:
        returnValue('nop')

    self.log.info('add', gem_port=gem_port, reflow=reflow)
    self._gem_ports[gem_port.gem_id] = gem_port

    try:
        outcome = yield gem_port.add_to_hardware(self.olt.rest_client)

    except Exception as e:
        # Leave it to the periodic hardware sync to repair
        self.log.exception('gem-port', gem_port=gem_port, reflow=reflow, e=e)
        outcome = 'resync needed'

    returnValue(outcome)
676
@inlineCallbacks
def remove_gem_id(self, gem_id):
    """
    Stop tracking a GEM port and remove it from hardware

    :param gem_id: ID of the GEM port to remove
    :return: (deferred) fires with 'nop' for an unknown gem-id, otherwise
             'done'.  An invalid-response-code failure is logged unless it
             is a 404; any other failure is logged and re-raised.
    """
    gem_port = self._gem_ports.get(gem_id)

    if gem_port is None:
        returnValue('nop')

    del self._gem_ports[gem_id]
    try:
        yield gem_port.remove_from_hardware(self.olt.rest_client)

    except RestInvalidResponseCode as e:
        if e.code != 404:
            # Logged under the GEM port event name (previously the
            # copy-pasted 'onu-delete'), matching the handler below
            self.log.exception('gem-port-delete', e=e)

    except Exception as ex:
        self.log.exception('gem-port-delete', e=ex)
        raise

    returnValue('done')
697
698 @staticmethod
699 def gem_id_to_gvid(gem_id):
700 """Calculate GEM VID (gvid) for a given GEM port id"""
701 return gem_id - 2048