blob: 4c1806625d95af7cf01ad3f0fb4e05168956818e [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Openolt adapter.
19"""
20import arrow
21import grpc
22import structlog
23from google.protobuf.empty_pb2 import Empty
24from google.protobuf.json_format import MessageToDict
25from scapy.layers.inet import Raw
26import json
27from google.protobuf.message import Message
28from grpc._channel import _Rendezvous
29from scapy.layers.l2 import Ether, Dot1Q
30from simplejson import dumps
31from twisted.internet import reactor
32from twisted.internet.defer import inlineCallbacks, returnValue
33from twisted.internet.task import LoopingCall
34
William Kurkian44cd7bb2019-02-11 16:39:12 -050035from pyvoltha.adapters.common.frameio.frameio import BpfProgramFilter, hexify
36from pyvoltha.adapters.iadapter import OltAdapter
37from pyvoltha.common.utils.asleep import asleep
38from pyvoltha.common.utils.registry import registry
39from pyvoltha.adapters.kafka.kafka_proxy import get_kafka_proxy
40from pyvoltha.protos import openolt_pb2
41from pyvoltha.protos import third_party
42from pyvoltha.protos.common_pb2 import OperStatus, ConnectStatus
43from pyvoltha.protos.common_pb2 import LogLevel
44from pyvoltha.protos.common_pb2 import OperationResp
45from pyvoltha.protos.inter_container_pb2 import SwitchCapability, PortCapability, \
William Kurkian6f436d02019-02-06 16:25:01 -050046 InterAdapterMessageType, InterAdapterResponseBody
William Kurkian44cd7bb2019-02-11 16:39:12 -050047from pyvoltha.protos.device_pb2 import Port, PmConfig, PmConfigs, \
William Kurkian6f436d02019-02-06 16:25:01 -050048 DeviceType, DeviceTypes
William Kurkian44cd7bb2019-02-11 16:39:12 -050049from pyvoltha.protos.adapter_pb2 import Adapter
50from pyvoltha.protos.adapter_pb2 import AdapterConfig
William Kurkian33508752019-02-12 14:56:06 -050051from openolt_flow_mgr import OpenOltFlowMgr
52from openolt_alarms import OpenOltAlarmMgr
53from openolt_statistics import OpenOltStatisticsMgr
54from openolt_bw import OpenOltBW
55from openolt_platform import OpenOltPlatform
56from openolt_resource_manager import OpenOltResourceMgr
57from openolt_device import OpenoltDevice
William Kurkian6f436d02019-02-06 16:25:01 -050058
William Kurkian44cd7bb2019-02-11 16:39:12 -050059from pyvoltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
60from pyvoltha.protos.logical_device_pb2 import LogicalPort
61from pyvoltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
William Kurkian6f436d02019-02-06 16:25:01 -050062 OFPPF_1GB_FD, \
63 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
64 ofp_switch_features, ofp_desc
William Kurkian44cd7bb2019-02-11 16:39:12 -050065from pyvoltha.protos.openflow_13_pb2 import ofp_port
William Kurkian6f436d02019-02-06 16:25:01 -050066
# Bind the imported third_party module to a throwaway name (presumably so the
# side-effect import is not reported as unused -- TODO confirm).
_ = third_party

# Module-level logger shared by every class in this file.
log = structlog.get_logger()

# Default helper classes handed to OpenoltDevice through the
# 'support_classes' keyword argument (see OpenoltHandler.activate).
OpenOltDefaults = dict(
    support_classes=dict(
        platform=OpenOltPlatform,
        resource_mgr=OpenOltResourceMgr,
        flow_mgr=OpenOltFlowMgr,
        alarm_mgr=OpenOltAlarmMgr,
        stats_mgr=OpenOltStatisticsMgr,
        bw_mgr=OpenOltBW,
    ),
)
William Kurkian6f436d02019-02-06 16:25:01 -050079
class AdapterPmMetrics:
    """Per-device performance-monitoring (PM) configuration and collector.

    Keeps one PmConfig entry per counter name for the PON side and one for
    the NNI side, and owns the LoopingCall that periodically invokes the
    collection callback.
    """

    def __init__(self, device):
        """Create the default PM configuration for *device*.

        :param device: device object; only its ``id`` attribute is read here.
        """
        self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
                         'tx_256_511_pkts', 'tx_512_1023_pkts',
                         'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
                         'rx_64_pkts', 'rx_65_127_pkts',
                         'rx_128_255_pkts', 'rx_256_511_pkts',
                         'rx_512_1023_pkts', 'rx_1024_1518_pkts',
                         'rx_1519_9k_pkts'}
        self.device = device
        self.id = device.id
        self.name = 'ponsim_olt'
        self.default_freq = 150
        self.grouped = False
        self.freq_override = False
        self.pon_metrics_config = dict()
        self.nni_metrics_config = dict()
        # LoopingCall created by start_collector(); None until then.
        self.lc = None
        for m in self.pm_names:
            self.pon_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)
            self.nni_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)

    def update(self, pm_config):
        """Apply a new PM configuration.

        :param pm_config: object exposing ``default_freq`` and an iterable
            ``metrics`` whose items carry ``name`` and ``enabled``.
        """
        if self.default_freq != pm_config.default_freq:
            self.default_freq = pm_config.default_freq
            # Restart the collector at the new frequency, but only when it is
            # actually running: LoopingCall.stop() asserts on a stopped call
            # and self.lc is None before start_collector() has been invoked.
            if self.lc is not None and self.lc.running:
                self.lc.stop()
                self.lc.start(interval=self.default_freq / 10)
        for m in pm_config.metrics:
            if m.name not in self.pon_metrics_config or \
                    m.name not in self.nni_metrics_config:
                # Unknown counter: report it instead of failing the whole
                # update with a KeyError.
                log.warn('unknown-pm-metric-ignored', metric=m.name)
                continue
            self.pon_metrics_config[m.name].enabled = m.enabled
            self.nni_metrics_config[m.name].enabled = m.enabled

    def make_proto(self):
        """Build a PmConfigs message describing the current configuration.

        :return: PmConfigs with one PmConfig per counter, sorted by name.
        """
        pm_config = PmConfigs(
            id=self.id,
            default_freq=self.default_freq,
            grouped=False,
            freq_override=False)
        # PON and NNI configs hold the same names; update() keeps their
        # enabled flags in step, so the PON table is used as the source.
        for m in sorted(self.pon_metrics_config):
            pm = self.pon_metrics_config[m]
            pm_config.metrics.extend([PmConfig(name=pm.name,
                                               type=pm.type,
                                               enabled=pm.enabled)])
        return pm_config

    def collect_port_metrics(self, channel):
        """Fetch statistics over *channel* and split them per port type.

        :param channel: gRPC channel passed to the stub constructor.
        :return: dict with keys 'pon' and 'nni', each a name -> value dict.
        """
        # NOTE(review): ponsim_pb2 is not imported in the visible part of
        # this file; this call raises NameError unless it is provided
        # elsewhere -- TODO confirm / port to the openolt stub.
        stub = ponsim_pb2.PonSimStub(channel)
        stats = stub.GetStats(Empty())
        return {
            'pon': self.extract_pon_metrics(stats),
            'nni': self.extract_nni_metrics(stats),
        }

    @staticmethod
    def _extract_metrics(stats, port_name, metrics_config):
        """Return name -> value for enabled packet counters of *port_name*."""
        extracted = dict()
        for m in stats.metrics:
            if m.port_name != port_name:
                continue
            for p in m.packets:
                cfg = metrics_config.get(p.name)
                # Counters without a config entry are skipped rather than
                # raising KeyError.
                if cfg is not None and cfg.enabled:
                    extracted[p.name] = p.value
        return extracted

    def extract_pon_metrics(self, stats):
        """Return the enabled counters reported for the "pon" port."""
        return self._extract_metrics(stats, "pon", self.pon_metrics_config)

    def extract_nni_metrics(self, stats):
        """Return the enabled counters reported for the "nni" port."""
        # Filter with the NNI configuration (the original consulted the PON
        # table here).
        return self._extract_metrics(stats, "nni", self.nni_metrics_config)

    def start_collector(self, callback):
        """Start calling *callback(device_id, prefix)* periodically.

        :param callback: callable invoked by the LoopingCall with the device
            id and the 'voltha.<name>.<device id>' prefix.
        """
        log.info("starting-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
        self.lc = LoopingCall(callback, self.device.id, prefix)
        self.lc.start(interval=self.default_freq / 10)

    def stop_collector(self):
        """Stop the periodic collection if it is currently running."""
        log.info("stopping-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        # Guard against "never started" (lc is None) and "already stopped";
        # LoopingCall.stop() asserts when the call is not running.
        if self.lc is not None and self.lc.running:
            self.lc.stop()
166
167
class AdapterAlarms:
    """Placeholder alarm sender bound to one adapter/device pair."""

    def __init__(self, adapter, device):
        """Remember the owning adapter and the device alarms relate to."""
        self.lc = None
        self.device = device
        self.adapter = adapter

    # TODO: Implement code to send to kafka cluster directly instead of
    # going through the voltha core.
    def send_alarm(self, context_data, alarm_data):
        """Log that alarm sending is not implemented; arguments are unused."""
        log.debug("send-alarm-not-implemented")
179
180
181
class OpenoltAdapter(OltAdapter):
    """OLT adapter registered under the name 'openolt'.

    Device handling is delegated to OpenoltHandler (passed to the base class
    as ``device_handler_class``). Most methods below only log the call and
    raise NotImplementedError.
    """

    name = 'openolt'

    supported_device_types = [
        DeviceType(
            id=name,
            adapter=name,
            accepts_bulk_flow_update=True,
            accepts_add_remove_flow_updates=True
        )
    ]

    # System Init Methods #
    def __init__(self, core_proxy, adapter_proxy, config):
        """Initialise the base adapter and this adapter's bookkeeping."""
        super(OpenoltAdapter, self).__init__(
            core_proxy=core_proxy,
            adapter_proxy=adapter_proxy,
            config=config,
            device_handler_class=OpenoltHandler,
            name='openolt',
            vendor='Voltha project',
            version='0.4',
            device_type='openolt',
            accepts_bulk_flow_update=True,
            accepts_add_remove_flow_updates=False)
        self.adapter_proxy = adapter_proxy
        self.core_proxy = core_proxy
        self.config = config
        self.descriptor = Adapter(
            id=self.name,
            vendor='OLT white box vendor',
            version='0.1',
            config=config
        )
        log.debug('openolt.__init__', adapter_proxy=adapter_proxy)
        self.devices = dict()  # device_id -> OpenoltDevice()
        self.interface = registry('main').get_args().interface
        self.logical_device_id_to_root_device_id = dict()
        self.num_devices = 0

    def start(self):
        """Log that the adapter has started."""
        log.info('started', interface=self.interface)

    def stop(self):
        """Log that the adapter has stopped."""
        log.info('stopped', interface=self.interface)

    # Info Methods #
    def adapter_descriptor(self):
        """Return the Adapter descriptor built in __init__."""
        log.debug('get descriptor', interface=self.interface)
        return self.descriptor

    def device_types(self):
        """Return the supported device types wrapped in a DeviceTypes."""
        log.debug('get device_types', interface=self.interface,
                  items=self.supported_device_types)
        return DeviceTypes(items=self.supported_device_types)

    def health(self):
        """Not implemented; always raises NotImplementedError."""
        log.debug('get health', interface=self.interface)
        raise NotImplementedError()

    def get_device_details(self, device):
        """Not implemented; always raises NotImplementedError."""
        log.debug('get_device_details', device=device)
        raise NotImplementedError()

    # Device Operation Methods #
    def change_master_state(self, master):
        """Not implemented; always raises NotImplementedError."""
        log.debug('change_master_state', interface=self.interface,
                  master=master)
        raise NotImplementedError()

    def abandon_device(self, device):
        """Not implemented; always raises NotImplementedError."""
        log.info('abandon-device', device=device)
        raise NotImplementedError()

    # Configuration Methods #
    def update_flows_incrementally(self, device, flow_changes, group_changes):
        """Not implemented; always raises NotImplementedError."""
        log.debug('update_flows_incrementally', device=device,
                  flow_changes=flow_changes, group_changes=group_changes)
        log.info(
            'This device does not allow this, therefore it is Not implemented')
        raise NotImplementedError()

    def update_pm_config(self, device, pm_configs):
        """Not implemented; always raises NotImplementedError."""
        log.info('update_pm_config - Not implemented yet', device=device,
                 pm_configs=pm_configs)
        raise NotImplementedError()

    def receive_proxied_message(self, proxy_address, msg):
        """Not implemented; always raises NotImplementedError."""
        log.debug('receive_proxied_message - Not implemented',
                  proxy_address=proxy_address,
                  proxied_msg=msg)
        raise NotImplementedError()

    def receive_inter_adapter_message(self, msg):
        """Not implemented; always raises NotImplementedError."""
        log.info('rx_inter_adapter_msg - Not implemented')
        raise NotImplementedError()

    # Image Operations Methods #
    def download_image(self, device, request):
        """Not implemented; always raises NotImplementedError."""
        log.info('image_download - Not implemented yet', device=device,
                 request=request)
        raise NotImplementedError()

    def get_image_download_status(self, device, request):
        """Not implemented; always raises NotImplementedError."""
        log.info('get_image_download - Not implemented yet', device=device,
                 request=request)
        raise NotImplementedError()

    def cancel_image_download(self, device, request):
        """Not implemented; always raises NotImplementedError."""
        log.info('cancel_image_download - Not implemented yet', device=device)
        raise NotImplementedError()

    def activate_image_update(self, device, request):
        """Not implemented; always raises NotImplementedError."""
        log.info('activate_image_update - Not implemented yet',
                 device=device, request=request)
        raise NotImplementedError()

    def revert_image_update(self, device, request):
        """Not implemented; always raises NotImplementedError."""
        log.info('revert_image_update - Not implemented yet',
                 device=device, request=request)
        raise NotImplementedError()

    def self_test_device(self, device):
        """Not implemented; always raises NotImplementedError."""
        log.info('Not implemented yet')
        raise NotImplementedError()

    # PON Operations Methods #
    def create_interface(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.debug('create-interface - Not implemented - We do not use this',
                  data=data)
        raise NotImplementedError()

    def update_interface(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.debug('update-interface - Not implemented - We do not use this',
                  data=data)
        raise NotImplementedError()

    def remove_interface(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.debug('remove-interface - Not implemented - We do not use this',
                  data=data)
        raise NotImplementedError()

    def receive_onu_detect_state(self, proxy_address, state):
        """Not implemented; always raises NotImplementedError."""
        log.debug(
            'receive-onu-detect-state - Not implemented - We do not use this',
            proxy_address=proxy_address,
            state=state)
        raise NotImplementedError()

    def create_tcont(self, device, tcont_data, traffic_descriptor_data):
        """Not implemented; always raises NotImplementedError."""
        log.info('create-tcont - Not implemented - We do not use this',
                 tcont_data=tcont_data,
                 traffic_descriptor_data=traffic_descriptor_data)
        raise NotImplementedError()

    def update_tcont(self, device, tcont_data, traffic_descriptor_data):
        """Not implemented; always raises NotImplementedError."""
        log.info('update-tcont - Not implemented - We do not use this',
                 tcont_data=tcont_data,
                 traffic_descriptor_data=traffic_descriptor_data)
        raise NotImplementedError()

    def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
        """Not implemented; always raises NotImplementedError."""
        log.info('remove-tcont - Not implemented - We do not use this',
                 tcont_data=tcont_data,
                 traffic_descriptor_data=traffic_descriptor_data)
        raise NotImplementedError()

    def create_gemport(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info('create-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def update_gemport(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info('update-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def remove_gemport(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info('remove-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def create_multicast_gemport(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info('create-mcast-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def update_multicast_gemport(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info('update-mcast-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def remove_multicast_gemport(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info('remove-mcast-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def create_multicast_distribution_set(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info(
            'create-mcast-distribution-set - Not implemented - We do not use this',
            data=data)
        raise NotImplementedError()

    def update_multicast_distribution_set(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info(
            'update-mcast-distribution-set - Not implemented - We do not use this',
            data=data)
        raise NotImplementedError()

    def remove_multicast_distribution_set(self, device, data):
        """Not implemented; always raises NotImplementedError."""
        log.info(
            'remove-mcast-distribution-set - Not implemented - We do not use this',
            data=data)
        raise NotImplementedError()

    # Alarm Methods #
    def suppress_alarm(self, filter):
        """Not implemented; always raises NotImplementedError."""
        log.info('suppress_alarm - Not implemented yet', filter=filter)
        raise NotImplementedError()

    def unsuppress_alarm(self, filter):
        """Not implemented; always raises NotImplementedError."""
        log.info('unsuppress_alarm - Not implemented yet', filter=filter)
        raise NotImplementedError()
406
class OpenoltHandler(object):
    """Per-device handler instantiated by OpenoltAdapter.

    Holds the gRPC channel, port bookkeeping and the optional PM/alarm
    helpers for a single device id.

    NOTE(review): several methods still reference names inherited from the
    ponsim adapter (PonSimStub, FlowTable, PonSimFrame, PonSimMetricsRequest)
    that are not imported in the visible part of this file. Those code paths
    raise NameError when reached unless the names are provided elsewhere --
    TODO confirm / port to the openolt protos.
    """

    def __init__(self, adapter, device_id):
        """Bind the handler to *adapter* and *device_id*.

        :param adapter: owning adapter; its ``core_proxy`` and
            ``adapter_proxy`` attributes are reused here.
        :param device_id: id of the device this handler manages.
        """
        self.adapter = adapter
        self.core_proxy = adapter.core_proxy
        self.adapter_proxy = adapter.adapter_proxy
        self.device_id = device_id
        self.log = structlog.get_logger(device_id=device_id)
        self.channel = None
        self.io_port = None
        self.logical_device_id = None
        self.nni_port = None
        self.ofp_port_no = None
        self.interface = registry('main').get_args().interface
        # pm_metrics / alarms stay None until something assigns them; every
        # user below therefore checks for None first.
        self.pm_metrics = None
        self.alarms = None
        self.frames = None
        self.num_devices = 0
        # Set by init_device(); declared here so the attribute always exists.
        self.device = None

    @staticmethod
    def _mac_str_to_tuple(mac):
        """Convert 'aa:bb:cc:dd:ee:ff' into a tuple of six ints."""
        return tuple(int(octet, 16) for octet in mac.split(':'))

    @inlineCallbacks
    def get_channel(self):
        """Lazily open the insecure gRPC channel to the device.

        :return: (via Deferred) the channel, or None when it could not be
            created; the failure is logged, not raised.
        """
        if self.channel is None:
            try:
                device = yield self.core_proxy.get_device(self.device_id)
                self.log.info('device-info', device=device,
                              host_port=device.host_and_port)
                self.channel = grpc.insecure_channel(device.host_and_port)
            except Exception as e:
                log.exception("ponsim-connection-failure", e=e)

        # Hand the channel back so callers can use the Deferred's result.
        returnValue(self.channel)

    def close_channel(self):
        """Cancel the frame stream (if any) and drop the gRPC channel."""
        if self.channel is None:
            self.log.info('grpc-channel-already-closed')
            return

        if self.frames is not None:
            self.frames.cancel()
            self.frames = None
            self.log.info('cancelled-grpc-frame-stream')

        # NOTE(review): unsubscribing a fresh lambda that was never
        # subscribed; kept as in the original -- confirm whether an explicit
        # channel close is intended here.
        self.channel.unsubscribe(lambda *args: None)
        self.channel = None

        self.log.info('grpc-channel-closed')

    @inlineCallbacks
    def _get_nni_port(self):
        """Return (via Deferred) the device's ETHERNET_NNI ports."""
        ports = yield self.core_proxy.get_ports(self.device_id,
                                                Port.ETHERNET_NNI)
        returnValue(ports)

    def init_device(self, kwargs):
        """Create the OpenoltDevice for this handler from *kwargs*."""
        self.device = OpenoltDevice(**kwargs)

    @inlineCallbacks
    def activate(self, device):
        """Activate *device* by creating its OpenoltDevice.

        Marks the device FAILED when it has no ``host_and_port``. Any
        exception is logged and swallowed.

        :param device: device object to activate.
        """
        try:
            self.log.info('activating')
            if not device.host_and_port:
                device.oper_status = OperStatus.FAILED
                device.reason = 'No host_and_port field provided'
                # Wait for the update, like the other core_proxy calls in
                # this class do.
                yield self.core_proxy.device_update(device)
                return

            kwargs = {
                'support_classes': OpenOltDefaults['support_classes'],
                'adapter_agent': self.core_proxy,
                'device': device,
                'device_num': self.num_devices + 1
            }
            try:
                yield self.init_device(kwargs)
            except Exception as e:
                log.error('Failed to adopt OpenOLT device', error=e)
                # TODO set status to ERROR so that is clear something went
                # wrong
                raise
            else:
                self.num_devices += 1

            # TODO: the ponsim-era activation sequence was disabled here and
            # still needs porting: fetch device info over gRPC and set
            # ofp_port_no; fill in root/vendor/model/serial/mac and update
            # the device; create AdapterPmMetrics and push the initial PM
            # config; create AdapterAlarms; create the NNI and PON ports;
            # set the device REACHABLE/ACTIVE; register detected ONUs; start
            # rcv_grpc in a thread; start KPI collection.
        except Exception as e:
            log.exception("Exception-activating", e=e)

    def get_ofp_device_info(self, device):
        """Return the SwitchCapability advertised for *device*."""
        return SwitchCapability(
            desc=ofp_desc(
                hw_desc='ponsim pon',
                sw_desc='ponsim pon',
                serial_num=device.serial_number,
                dp_desc='n/a'
            ),
            switch_features=ofp_switch_features(
                n_buffers=256,  # TODO fake for now
                n_tables=2,  # TODO ditto
                capabilities=(  # TODO and ditto
                    OFPC_FLOW_STATS
                    | OFPC_TABLE_STATS
                    | OFPC_PORT_STATS
                    | OFPC_GROUP_STATS
                )
            )
        )

    def get_ofp_port_info(self, device, port_no):
        """Return the PortCapability for *port_no* of *device*.

        The hardware address is synthesised as AA:BB:CC:DD:EE:<port_no>.
        """
        # Since the adapter created the device port then it has the reference
        # of the port to return the capability. TODO: Do a lookup on the NNI
        # port number and return the appropriate attributes
        self.log.info('get_ofp_port_info', port_no=port_no,
                      info=self.ofp_port_no, device_id=device.id)
        cap = OFPPF_1GB_FD | OFPPF_FIBER
        # mac_str_to_tuple is not defined in this file; the private helper
        # performs the conversion locally.
        hw_addr = self._mac_str_to_tuple('AA:BB:CC:DD:EE:%02x' % port_no)
        return PortCapability(
            port=LogicalPort(
                ofp_port=ofp_port(
                    hw_addr=hw_addr,
                    config=0,
                    state=OFPPS_LIVE,
                    curr=cap,
                    advertised=cap,
                    peer=cap,
                    curr_speed=OFPPF_1GB_FD,
                    max_speed=OFPPF_1GB_FD
                ),
                device_id=device.id,
                device_port_no=port_no
            )
        )

    # TODO - change for core 2.0
    def reconcile(self, device):
        """Log the reconcile request; no further action is taken."""
        self.log.info('reconciling-OLT-device')

    @inlineCallbacks
    def _rcv_frame(self, frame):
        """Dispatch one received frame.

        Double-tagged frames have both tags removed and are sent to the core
        as packet-in on the inner VLAN id; single-tagged frames with a Raw
        layer are parsed as JSON and forwarded to the alarm helper.

        :param frame: raw frame payload.
        """
        pkt = Ether(frame)

        if pkt.haslayer(Dot1Q):
            outer_shim = pkt.getlayer(Dot1Q)

            # NOTE(review): branch nesting reconstructed from a source whose
            # indentation was lost -- confirm the Raw branch belongs here.
            if isinstance(outer_shim.payload, Dot1Q):
                inner_shim = outer_shim.payload
                cvid = inner_shim.vlan
                popped_frame = (
                    Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
                    inner_shim.payload
                )
                self.log.info('sending-packet-in',
                              device_id=self.device_id, port=cvid)
                yield self.core_proxy.send_packet_in(
                    device_id=self.device_id,
                    port=cvid,
                    packet=str(popped_frame))
            elif pkt.haslayer(Raw):
                raw_data = json.loads(pkt.getlayer(Raw).load)
                if self.alarms is None:
                    # No alarm helper configured; do not crash the receiver.
                    self.log.warn('alarm-handler-not-initialized')
                else:
                    self.alarms.send_alarm(self, raw_data)

    @inlineCallbacks
    def rcv_grpc(self):
        """
        This call establishes a GRPC stream to receive frames.
        """
        yield self.get_channel()
        stub = PonSimStub(self.channel)

        # Attempt to establish a grpc stream with the remote ponsim service
        self.frames = stub.ReceiveFrames(Empty())

        self.log.info('start-receiving-grpc-frames')

        try:
            for frame in self.frames:
                self.log.info('received-grpc-frame',
                              frame_len=len(frame.payload))
                yield self._rcv_frame(frame.payload)

        except _Rendezvous as e:
            # "except X as e" is valid on Python 2.6+ and 3; fall back to
            # str(e) when the exception carries no (or an empty) message.
            log.warn('grpc-connection-lost',
                     message=getattr(e, 'message', None) or str(e))

        self.log.info('stopped-receiving-grpc-frames')

    @inlineCallbacks
    def update_flow_table(self, flows):
        """Push the complete flow table *flows* to the device (port 0)."""
        yield self.get_channel()
        stub = PonSimStub(self.channel)

        self.log.info('pushing-olt-flow-table')
        stub.UpdateFlowTable(FlowTable(
            port=0,
            flows=flows
        ))
        self.log.info('success')

    def remove_from_flow_table(self, flows):
        """Log the request; incremental removal is not implemented."""
        self.log.debug('remove-from-flow-table', flows=flows)
        # TODO: Update PONSIM code to accept incremental flow changes
        # Once completed, the accepts_add_remove_flow_updates for this
        # device type can be set to True

    def add_to_flow_table(self, flows):
        """Log the request; incremental addition is not implemented."""
        self.log.debug('add-to-flow-table', flows=flows)
        # TODO: Update PONSIM code to accept incremental flow changes
        # Once completed, the accepts_add_remove_flow_updates for this
        # device type can be set to True

    def update_pm_config(self, device, pm_config):
        """Forward *pm_config* to the PM helper, when one exists."""
        log.info("handler-update-pm-config", device=device,
                 pm_config=pm_config)
        if self.pm_metrics is None:
            self.log.warn('pm-metrics-not-initialized')
            return
        self.pm_metrics.update(pm_config)

    @inlineCallbacks
    def send_proxied_message(self, proxy_address, msg):
        """Push a proxied FlowTable message to the device.

        Messages that are not FlowTable instances are ignored.
        """
        self.log.info('sending-proxied-message')
        if isinstance(msg, FlowTable):
            # get_channel() returns a Deferred; wait for it rather than
            # passing the Deferred itself to the stub.
            yield self.get_channel()
            stub = PonSimStub(self.channel)
            self.log.info('pushing-onu-flow-table', port=msg.port)
            res = stub.UpdateFlowTable(msg)
            self.core_proxy.receive_proxied_message(proxy_address, res)

    @inlineCallbacks
    def process_inter_adapter_message(self, request):
        """Handle FLOW_REQUEST and METRICS_REQUEST inter-adapter messages.

        Each handled request is answered with an InterAdapterResponseBody
        whose status is True. Exceptions are logged and swallowed.
        """
        self.log.info('process-inter-adapter-message', msg=request)
        try:
            if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
                f = FlowTable()
                if request.body:
                    request.body.Unpack(f)
                    stub = PonSimStub(self.channel)
                    self.log.info('pushing-onu-flow-table')
                    stub.UpdateFlowTable(f)
                # Send response back
                reply = InterAdapterResponseBody()
                reply.status = True
                self.log.info('sending-response-back', reply=reply)
                yield self.adapter_proxy.send_inter_adapter_message(
                    msg=reply,
                    type=InterAdapterMessageType.FLOW_RESPONSE,
                    from_adapter=self.adapter.name,
                    to_adapter=request.header.from_topic,
                    to_device_id=request.header.to_device_id,
                    message_id=request.header.id
                )
            elif request.header.type == \
                    InterAdapterMessageType.METRICS_REQUEST:
                m = PonSimMetricsRequest()
                if request.body:
                    request.body.Unpack(m)
                    stub = PonSimStub(self.channel)
                    self.log.info('proxying onu stats request', port=m.port)
                    res = stub.GetStats(m)
                    # Send response back
                    reply = InterAdapterResponseBody()
                    reply.status = True
                    reply.body.Pack(res)
                    self.log.info('sending-response-back', reply=reply)
                    yield self.adapter_proxy.send_inter_adapter_message(
                        msg=reply,
                        type=InterAdapterMessageType.METRICS_RESPONSE,
                        from_adapter=self.adapter.name,
                        to_adapter=request.header.from_topic,
                        to_device_id=request.header.to_device_id,
                        message_id=request.header.id
                    )
        except Exception as e:
            self.log.exception("error-processing-inter-adapter-message", e=e)

    def packet_out(self, egress_port, msg):
        """Send *msg* out of *egress_port* over the gRPC stub.

        Frames not destined for the NNI port get a VLAN tag equal to the
        egress port. Exceptions are logged and swallowed.
        """
        self.log.info('sending-packet-out', egress_port=egress_port,
                      msg=hexify(msg))
        try:
            pkt = Ether(msg)
            out_pkt = pkt
            if egress_port != self.nni_port.port_no:
                # don't do the vlan manipulation for the NNI port, vlans are
                # already correct
                out_pkt = (
                    Ether(src=pkt.src, dst=pkt.dst) /
                    Dot1Q(vlan=egress_port, type=pkt.type) /
                    pkt.payload
                )

            # TODO need better way of mapping logical ports to PON ports
            if egress_port == self.nni_port.port_no:
                out_port = self.nni_port.port_no
            else:
                out_port = 1

            # send over grpc stream
            stub = PonSimStub(self.channel)
            frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
                                out_port=out_port)
            stub.SendFrame(frame)
        except Exception as e:
            self.log.exception("error-processing-packet-out", e=e)

    @inlineCallbacks
    def reboot(self):
        """Simulate a reboot: UNREACHABLE, wait 10 s, REACHABLE again."""
        self.log.info('rebooting', device_id=self.device_id)

        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.UNREACHABLE)

        # Update the child devices connect state to UNREACHABLE
        yield self.core_proxy.children_state_update(
            self.device_id,
            connect_status=ConnectStatus.UNREACHABLE)

        # Sleep 10 secs, simulating a reboot
        # TODO: send alert and clear alert after the reboot
        yield asleep(10)

        # Change the connection status back to REACHABLE. With a
        # real OLT the connection state must be the actual state
        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE)

        # Update the child devices connect state to REACHABLE
        yield self.core_proxy.children_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE)

        self.log.info('rebooted', device_id=self.device_id)

    def self_test_device(self, device):
        """
        This is called to self-test a device based on a NBI call.
        :param device: A Voltha.Device object.
        :return: Will return result of self test
        :raises NotImplementedError: always; self test is not implemented.
        """
        log.info('self-test-device', device=device.id)
        raise NotImplementedError()

    @inlineCallbacks
    def disable(self):
        """Stop KPI collection, mark the device unreachable, close gRPC."""
        self.log.info('disabling', device_id=self.device_id)

        # Stopped exactly once: a second stop of the same LoopingCall would
        # trip Twisted's "not running" assertion.
        self.stop_kpi_collection()

        # Update the operational status to UNKNOWN and connection status to
        # UNREACHABLE
        yield self.core_proxy.device_state_update(
            self.device_id,
            oper_status=OperStatus.UNKNOWN,
            connect_status=ConnectStatus.UNREACHABLE)

        self.close_channel()
        self.log.info('disabled-grpc-channel')

        # TODO:
        # 1) Remove all flows from the device
        # 2) Remove the device from ponsim

        self.log.info('disabled', device_id=self.device_id)

    @inlineCallbacks
    def reenable(self):
        """Bring ports and device back to ACTIVE and restart KPI collection."""
        self.log.info('re-enabling', device_id=self.device_id)

        # Set the ofp_port_no and nni_port in case we bypassed the reconcile
        # process if the device was in DISABLED state on voltha restart
        if not self.ofp_port_no and not self.nni_port:
            yield self.get_channel()
            stub = PonSimStub(self.channel)
            info = stub.GetDeviceInfo(Empty())
            log.info('got-info', info=info)
            self.ofp_port_no = info.nni_port
            ports = yield self._get_nni_port()
            # For ponsim, we are using only 1 NNI port
            if ports.items:
                self.nni_port = ports.items[0]

        # Update the state of the NNI port
        yield self.core_proxy.port_state_update(
            self.device_id,
            port_type=Port.ETHERNET_NNI,
            port_no=self.ofp_port_no,
            oper_status=OperStatus.ACTIVE)

        # Update the state of the PON port
        yield self.core_proxy.port_state_update(
            self.device_id,
            port_type=Port.PON_OLT,
            port_no=1,
            oper_status=OperStatus.ACTIVE)

        # Set the operational state of the device to ACTIVE and connect
        # status to REACHABLE
        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE,
            oper_status=OperStatus.ACTIVE)

        # TODO: establish frame grpc-stream
        # yield reactor.callInThread(self.rcv_grpc)

        self.start_kpi_collection(self.device_id)

        self.log.info('re-enabled', device_id=self.device_id)

    def delete(self):
        """Close the gRPC channel for a device that is being deleted."""
        self.log.info('deleting', device_id=self.device_id)

        self.close_channel()
        self.log.info('disabled-grpc-channel')

        # TODO:
        # 1) Remove all flows from the device
        # 2) Remove the device from ponsim

        self.log.info('deleted', device_id=self.device_id)

    def start_kpi_collection(self, device_id):
        """Start periodic KPI collection and publication to kafka.

        Does nothing (beyond a warning) when no PM helper is configured.

        :param device_id: accepted for interface compatibility; the collector
            callback receives the device id from the PM helper.
        """
        if self.pm_metrics is None:
            self.log.warn('pm-metrics-not-initialized')
            return

        kafka_cluster_proxy = get_kafka_proxy()

        def _collect(device_id, prefix):
            """Gather port metrics and publish them as one KpiEvent."""
            try:
                # Step 1: gather metrics from device
                port_metrics = \
                    self.pm_metrics.collect_port_metrics(self.channel)

                # Step 2: prepare the KpiEvent for submission
                # we can time-stamp them here (or could use time derived
                # from OLT)
                ts = arrow.utcnow().timestamp
                kpi_event = KpiEvent(
                    type=KpiEventType.slice,
                    ts=ts,
                    prefixes={
                        # OLT NNI port
                        prefix + '.nni': MetricValuePairs(
                            metrics=port_metrics['nni']),
                        # OLT PON port
                        prefix + '.pon': MetricValuePairs(
                            metrics=port_metrics['pon'])
                    }
                )

                # Step 3: submit directly to the kafka bus
                if kafka_cluster_proxy:
                    if isinstance(kpi_event, Message):
                        kpi_event = dumps(
                            MessageToDict(kpi_event, True, True))
                    kafka_cluster_proxy.send_message("voltha.kpis",
                                                     kpi_event)

            except Exception as e:
                log.exception('failed-to-submit-kpis', e=e)

        self.pm_metrics.start_collector(_collect)

    def stop_kpi_collection(self):
        """Stop KPI collection when a PM helper is configured."""
        if self.pm_metrics is None:
            self.log.debug('pm-metrics-not-initialized')
            return
        self.pm_metrics.stop_collector()