blob: aaeff9fae58c871ff9d24eae761facdbd1e94480 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Openolt adapter.
19"""
20import arrow
21import grpc
22import structlog
23from google.protobuf.empty_pb2 import Empty
24from google.protobuf.json_format import MessageToDict
25from scapy.layers.inet import Raw
26import json
27from google.protobuf.message import Message
28from grpc._channel import _Rendezvous
29from scapy.layers.l2 import Ether, Dot1Q
30from simplejson import dumps
31from twisted.internet import reactor
32from twisted.internet.defer import inlineCallbacks, returnValue
33from twisted.internet.task import LoopingCall
34
William Kurkian44cd7bb2019-02-11 16:39:12 -050035from pyvoltha.adapters.common.frameio.frameio import BpfProgramFilter, hexify
36from pyvoltha.adapters.iadapter import OltAdapter
37from pyvoltha.common.utils.asleep import asleep
38from pyvoltha.common.utils.registry import registry
39from pyvoltha.adapters.kafka.kafka_proxy import get_kafka_proxy
40from pyvoltha.protos import openolt_pb2
41from pyvoltha.protos import third_party
42from pyvoltha.protos.common_pb2 import OperStatus, ConnectStatus
43from pyvoltha.protos.common_pb2 import LogLevel
44from pyvoltha.protos.common_pb2 import OperationResp
45from pyvoltha.protos.inter_container_pb2 import SwitchCapability, PortCapability, \
William Kurkian6f436d02019-02-06 16:25:01 -050046 InterAdapterMessageType, InterAdapterResponseBody
William Kurkian44cd7bb2019-02-11 16:39:12 -050047from pyvoltha.protos.device_pb2 import Port, PmConfig, PmConfigs, \
William Kurkian6f436d02019-02-06 16:25:01 -050048 DeviceType, DeviceTypes
William Kurkian44cd7bb2019-02-11 16:39:12 -050049from pyvoltha.protos.adapter_pb2 import Adapter
50from pyvoltha.protos.adapter_pb2 import AdapterConfig
William Kurkianfefd4642019-02-07 15:30:03 -050051from voltha.adapters.openolt.openolt_flow_mgr import OpenOltFlowMgr
52from voltha.adapters.openolt.openolt_alarms import OpenOltAlarmMgr
53from voltha.adapters.openolt.openolt_statistics import OpenOltStatisticsMgr
54from voltha.adapters.openolt.openolt_bw import OpenOltBW
55from voltha.adapters.openolt.openolt_platform import OpenOltPlatform
56from voltha.adapters.openolt.openolt_resource_manager import OpenOltResourceMgr
57from voltha.adapters.openolt.openolt_device import OpenoltDevice
William Kurkian6f436d02019-02-06 16:25:01 -050058
William Kurkian44cd7bb2019-02-11 16:39:12 -050059from pyvoltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
60from pyvoltha.protos.logical_device_pb2 import LogicalPort
61from pyvoltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
William Kurkian6f436d02019-02-06 16:25:01 -050062 OFPPF_1GB_FD, \
63 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
64 ofp_switch_features, ofp_desc
William Kurkian44cd7bb2019-02-11 16:39:12 -050065from pyvoltha.protos.openflow_13_pb2 import ofp_port
William Kurkian6f436d02019-02-06 16:25:01 -050066
67_ = third_party
68log = structlog.get_logger()
William Kurkianfefd4642019-02-07 15:30:03 -050069OpenOltDefaults = {
70 'support_classes': {
71 'platform': OpenOltPlatform,
72 'resource_mgr': OpenOltResourceMgr,
73 'flow_mgr': OpenOltFlowMgr,
74 'alarm_mgr': OpenOltAlarmMgr,
75 'stats_mgr': OpenOltStatisticsMgr,
76 'bw_mgr': OpenOltBW
77 }
78}
William Kurkian6f436d02019-02-06 16:25:01 -050079
80class AdapterPmMetrics:
81 def __init__(self, device):
82 self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
83 'tx_256_511_pkts', 'tx_512_1023_pkts',
84 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
85 'rx_64_pkts', 'rx_65_127_pkts',
86 'rx_128_255_pkts', 'rx_256_511_pkts',
87 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
88 'rx_1519_9k_pkts'}
89 self.device = device
90 self.id = device.id
91 self.name = 'ponsim_olt'
92 self.default_freq = 150
93 self.grouped = False
94 self.freq_override = False
95 self.pon_metrics_config = dict()
96 self.nni_metrics_config = dict()
97 self.lc = None
98 for m in self.pm_names:
99 self.pon_metrics_config[m] = PmConfig(name=m,
100 type=PmConfig.COUNTER,
101 enabled=True)
102 self.nni_metrics_config[m] = PmConfig(name=m,
103 type=PmConfig.COUNTER,
104 enabled=True)
105
106 def update(self, pm_config):
107 if self.default_freq != pm_config.default_freq:
108 # Update the callback to the new frequency.
109 self.default_freq = pm_config.default_freq
110 self.lc.stop()
111 self.lc.start(interval=self.default_freq / 10)
112 for m in pm_config.metrics:
113 self.pon_metrics_config[m.name].enabled = m.enabled
114 self.nni_metrics_config[m.name].enabled = m.enabled
115
116 def make_proto(self):
117 pm_config = PmConfigs(
118 id=self.id,
119 default_freq=self.default_freq,
120 grouped=False,
121 freq_override=False)
122 for m in sorted(self.pon_metrics_config):
123 pm = self.pon_metrics_config[m] # Either will do they're the same
124 pm_config.metrics.extend([PmConfig(name=pm.name,
125 type=pm.type,
126 enabled=pm.enabled)])
127 return pm_config
128
129 def collect_port_metrics(self, channel):
130 rtrn_port_metrics = dict()
131 stub = ponsim_pb2.PonSimStub(channel)
132 stats = stub.GetStats(Empty())
133 rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
134 rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
135 return rtrn_port_metrics
136
137 def extract_pon_metrics(self, stats):
138 rtrn_pon_metrics = dict()
139 for m in stats.metrics:
140 if m.port_name == "pon":
141 for p in m.packets:
142 if self.pon_metrics_config[p.name].enabled:
143 rtrn_pon_metrics[p.name] = p.value
144 return rtrn_pon_metrics
145
146 def extract_nni_metrics(self, stats):
147 rtrn_pon_metrics = dict()
148 for m in stats.metrics:
149 if m.port_name == "nni":
150 for p in m.packets:
151 if self.pon_metrics_config[p.name].enabled:
152 rtrn_pon_metrics[p.name] = p.value
153 return rtrn_pon_metrics
154
155 def start_collector(self, callback):
156 log.info("starting-pm-collection", device_name=self.name,
157 device_id=self.device.id)
158 prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
159 self.lc = LoopingCall(callback, self.device.id, prefix)
160 self.lc.start(interval=self.default_freq / 10)
161
162 def stop_collector(self):
163 log.info("stopping-pm-collection", device_name=self.name,
164 device_id=self.device.id)
165 self.lc.stop()
166
167
168class AdapterAlarms:
169 def __init__(self, adapter, device):
170 self.adapter = adapter
171 self.device = device
172 self.lc = None
173
174 # TODO: Implement code to send to kafka cluster directly instead of
175 # going through the voltha core.
176 def send_alarm(self, context_data, alarm_data):
177 log.debug("send-alarm-not-implemented")
178 return
179
180
181
182class OpenoltAdapter(OltAdapter):
183 name = 'openolt'
184
185 supported_device_types = [
186 DeviceType(
187 id=name,
188 adapter=name,
189 accepts_bulk_flow_update=True,
190 accepts_add_remove_flow_updates=True
191 )
192 ]
193
194 # System Init Methods #
195 def __init__(self, core_proxy, adapter_proxy, config):
196 super(OpenoltAdapter, self).__init__(core_proxy=core_proxy,
197 adapter_proxy=adapter_proxy,
198 config=config,
199 device_handler_class=OpenoltHandler,
200 name='openolt',
201 vendor='Voltha project',
202 version='0.4',
203 device_type='openolt',
204 accepts_bulk_flow_update=True,
205 accepts_add_remove_flow_updates=False)
206 self.adapter_proxy = adapter_proxy
207 self.core_proxy = core_proxy
208 self.config = config
209 self.descriptor = Adapter(
210 id=self.name,
211 vendor='OLT white box vendor',
212 version='0.1',
213 config=config
214 )
215 log.debug('openolt.__init__', adapter_proxy=adapter_proxy)
216 self.devices = dict() # device_id -> OpenoltDevice()
217 self.interface = registry('main').get_args().interface
218 self.logical_device_id_to_root_device_id = dict()
219 self.num_devices = 0
220
221 def start(self):
222 log.info('started', interface=self.interface)
223
224 def stop(self):
225 log.info('stopped', interface=self.interface)
226
227
228 # Info Methods #
229 def adapter_descriptor(self):
230 log.debug('get descriptor', interface=self.interface)
231 return self.descriptor
232
233 def device_types(self):
234 log.debug('get device_types', interface=self.interface,
235 items=self.supported_device_types)
236 return DeviceTypes(items=self.supported_device_types)
237
238 def health(self):
239 log.debug('get health', interface=self.interface)
240 raise NotImplementedError()
241
242 def get_device_details(self, device):
243 log.debug('get_device_details', device=device)
244 raise NotImplementedError()
245
246
247 # Device Operation Methods #
248 def change_master_state(self, master):
249 log.debug('change_master_state', interface=self.interface,
250 master=master)
251 raise NotImplementedError()
252
253 def abandon_device(self, device):
254 log.info('abandon-device', device=device)
255 raise NotImplementedError()
256
257
258 # Configuration Methods #
259 def update_flows_incrementally(self, device, flow_changes, group_changes):
260 log.debug('update_flows_incrementally', device=device,
261 flow_changes=flow_changes, group_changes=group_changes)
262 log.info('This device does not allow this, therefore it is Not '
263 'implemented')
264 raise NotImplementedError()
265
266 def update_pm_config(self, device, pm_configs):
267 log.info('update_pm_config - Not implemented yet', device=device,
268 pm_configs=pm_configs)
269 raise NotImplementedError()
270
271 def receive_proxied_message(self, proxy_address, msg):
272 log.debug('receive_proxied_message - Not implemented',
273 proxy_address=proxy_address,
274 proxied_msg=msg)
275 raise NotImplementedError()
276
277 def receive_inter_adapter_message(self, msg):
278 log.info('rx_inter_adapter_msg - Not implemented')
279 raise NotImplementedError()
280
281
282 # Image Operations Methods #
283 def download_image(self, device, request):
284 log.info('image_download - Not implemented yet', device=device,
285 request=request)
286 raise NotImplementedError()
287
288 def get_image_download_status(self, device, request):
289 log.info('get_image_download - Not implemented yet', device=device,
290 request=request)
291 raise NotImplementedError()
292
293 def cancel_image_download(self, device, request):
294 log.info('cancel_image_download - Not implemented yet', device=device)
295 raise NotImplementedError()
296
297 def activate_image_update(self, device, request):
298 log.info('activate_image_update - Not implemented yet',
299 device=device, request=request)
300 raise NotImplementedError()
301
302 def revert_image_update(self, device, request):
303 log.info('revert_image_update - Not implemented yet',
304 device=device, request=request)
305 raise NotImplementedError()
306
307 def self_test_device(self, device):
308 # from voltha.protos.voltha_pb2 import SelfTestResponse
309 log.info('Not implemented yet')
310 raise NotImplementedError()
311
312
313 # PON Operations Methods #
314 def create_interface(self, device, data):
315 log.debug('create-interface - Not implemented - We do not use this',
316 data=data)
317 raise NotImplementedError()
318
319 def update_interface(self, device, data):
320 log.debug('update-interface - Not implemented - We do not use this',
321 data=data)
322 raise NotImplementedError()
323
324 def remove_interface(self, device, data):
325 log.debug('remove-interface - Not implemented - We do not use this',
326 data=data)
327 raise NotImplementedError()
328
329 def receive_onu_detect_state(self, proxy_address, state):
330 log.debug('receive-onu-detect-state - Not implemented - We do not '
331 'use this', proxy_address=proxy_address,
332 state=state)
333 raise NotImplementedError()
334
335 def create_tcont(self, device, tcont_data, traffic_descriptor_data):
336 log.info('create-tcont - Not implemented - We do not use this',
337 tcont_data=tcont_data,
338 traffic_descriptor_data=traffic_descriptor_data)
339 raise NotImplementedError()
340
341 def update_tcont(self, device, tcont_data, traffic_descriptor_data):
342 log.info('update-tcont - Not implemented - We do not use this',
343 tcont_data=tcont_data,
344 traffic_descriptor_data=traffic_descriptor_data)
345 raise NotImplementedError()
346
347 def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
348 log.info('remove-tcont - Not implemented - We do not use this',
349 tcont_data=tcont_data,
350 traffic_descriptor_data=traffic_descriptor_data)
351 raise NotImplementedError()
352
353 def create_gemport(self, device, data):
354 log.info('create-gemport - Not implemented - We do not use this',
355 data=data)
356 raise NotImplementedError()
357
358 def update_gemport(self, device, data):
359 log.info('update-gemport - Not implemented - We do not use this',
360 data=data)
361 raise NotImplementedError()
362
363 def remove_gemport(self, device, data):
364 log.info('remove-gemport - Not implemented - We do not use this',
365 data=data)
366 raise NotImplementedError()
367
368 def create_multicast_gemport(self, device, data):
369 log.info('create-mcast-gemport - Not implemented - We do not use '
370 'this', data=data)
371 raise NotImplementedError()
372
373 def update_multicast_gemport(self, device, data):
374 log.info('update-mcast-gemport - Not implemented - We do not use '
375 'this', data=data)
376 raise NotImplementedError()
377
378 def remove_multicast_gemport(self, device, data):
379 log.info('remove-mcast-gemport - Not implemented - We do not use '
380 'this', data=data)
381 raise NotImplementedError()
382
383 def create_multicast_distribution_set(self, device, data):
384 log.info('create-mcast-distribution-set - Not implemented - We do '
385 'not use this', data=data)
386 raise NotImplementedError()
387
388 def update_multicast_distribution_set(self, device, data):
389 log.info('update-mcast-distribution-set - Not implemented - We do '
390 'not use this', data=data)
391 raise NotImplementedError()
392
393 def remove_multicast_distribution_set(self, device, data):
394 log.info('remove-mcast-distribution-set - Not implemented - We do '
395 'not use this', data=data)
396 raise NotImplementedError()
397
398
399 # Alarm Methods #
400 def suppress_alarm(self, filter):
401 log.info('suppress_alarm - Not implemented yet', filter=filter)
402 raise NotImplementedError()
403
404 def unsuppress_alarm(self, filter):
405 log.info('unsuppress_alarm - Not implemented yet', filter=filter)
406 raise NotImplementedError()
407
408class OpenoltHandler(object):
409 def __init__(self, adapter, device_id):
410 self.adapter = adapter
411 self.core_proxy = adapter.core_proxy
412 self.adapter_proxy = adapter.adapter_proxy
413 self.device_id = device_id
414 self.log = structlog.get_logger(device_id=device_id)
415 self.channel = None
416 self.io_port = None
417 self.logical_device_id = None
418 self.nni_port = None
419 self.ofp_port_no = None
420 self.interface = registry('main').get_args().interface
421 self.pm_metrics = None
422 self.alarms = None
423 self.frames = None
William Kurkianfefd4642019-02-07 15:30:03 -0500424 self.num_devices = 0
William Kurkian6f436d02019-02-06 16:25:01 -0500425
426 @inlineCallbacks
427 def get_channel(self):
428 if self.channel is None:
429 try:
430 device = yield self.core_proxy.get_device(self.device_id)
431 self.log.info('device-info', device=device,
432 host_port=device.host_and_port)
433 self.channel = grpc.insecure_channel(device.host_and_port)
434 except Exception as e:
435 log.exception("ponsim-connection-failure", e=e)
436
437 # returnValue(self.channel)
438
439 def close_channel(self):
440 if self.channel is None:
441 self.log.info('grpc-channel-already-closed')
442 return
443 else:
444 if self.frames is not None:
445 self.frames.cancel()
446 self.frames = None
447 self.log.info('cancelled-grpc-frame-stream')
448
449 self.channel.unsubscribe(lambda *args: None)
450 self.channel = None
451
452 self.log.info('grpc-channel-closed')
453
454 @inlineCallbacks
455 def _get_nni_port(self):
456 ports = yield self.core_proxy.get_ports(self.device_id,
457 Port.ETHERNET_NNI)
458 returnValue(ports)
William Kurkianfefd4642019-02-07 15:30:03 -0500459
460 def init_device(self, kwargs):
461 self.device = OpenoltDevice(**kwargs)
William Kurkian6f436d02019-02-06 16:25:01 -0500462
463 @inlineCallbacks
464 def activate(self, device):
465 try:
466 self.log.info('activating')
William Kurkian6f436d02019-02-06 16:25:01 -0500467 if not device.host_and_port:
468 device.oper_status = OperStatus.FAILED
469 device.reason = 'No host_and_port field provided'
470 self.core_proxy.device_update(device)
471 return
William Kurkianfefd4642019-02-07 15:30:03 -0500472
William Kurkian6f436d02019-02-06 16:25:01 -0500473 kwargs = {
474 'support_classes': OpenOltDefaults['support_classes'],
William Kurkianfefd4642019-02-07 15:30:03 -0500475 'adapter_agent': self.core_proxy,
William Kurkian6f436d02019-02-06 16:25:01 -0500476 'device': device,
477 'device_num': self.num_devices + 1
478 }
479 try:
William Kurkianfefd4642019-02-07 15:30:03 -0500480 yield self.init_device(kwargs)
William Kurkian6f436d02019-02-06 16:25:01 -0500481 except Exception as e:
482 log.error('Failed to adopt OpenOLT device', error=e)
483 # TODO set status to ERROR so that is clear something went wrong
William Kurkianfefd4642019-02-07 15:30:03 -0500484 #del self.devices[device.id]
William Kurkian6f436d02019-02-06 16:25:01 -0500485 raise
486 else:
487 self.num_devices += 1
488
489 """
490 yield self.get_channel()
491 stub = PonSimStub(self.channel)
492 info = stub.GetDeviceInfo(Empty())
493 log.info('got-info', info=info, device_id=device.id)
494 self.ofp_port_no = info.nni_port
495
496 device.root = True
497 device.vendor = 'ponsim'
498 device.model = 'n/a'
499 device.serial_number = device.host_and_port
500 device.mac_address = "AA:BB:CC:DD:EE:FF"
501 yield self.core_proxy.device_update(device)
502
503 # Now set the initial PM configuration for this device
504 self.pm_metrics = AdapterPmMetrics(device)
505 pm_config = self.pm_metrics.make_proto()
506 log.info("initial-pm-config", pm_config=pm_config)
507 self.core_proxy.device_pm_config_update(pm_config, init=True)
508
509 # Setup alarm handler
510 self.alarms = AdapterAlarms(self.adapter, device)
511
512 nni_port = Port(
513 port_no=info.nni_port,
514 label='NNI facing Ethernet port',
515 type=Port.ETHERNET_NNI,
516 oper_status=OperStatus.ACTIVE
517 )
518 self.nni_port = nni_port
519 yield self.core_proxy.port_created(device.id, nni_port)
520 yield self.core_proxy.port_created(device.id, Port(
521 port_no=1,
522 label='PON port',
523 type=Port.PON_OLT,
524 oper_status=OperStatus.ACTIVE
525 ))
526
527 yield self.core_proxy.device_state_update(device.id,
528 connect_status=ConnectStatus.REACHABLE,
529 oper_status=OperStatus.ACTIVE)
530
531 # register ONUS
532 self.log.info('onu-found', onus=info.onus, len=len(info.onus))
533 for onu in info.onus:
534 vlan_id = onu.uni_port
535 yield self.core_proxy.child_device_detected(
536 parent_device_id=device.id,
537 parent_port_no=1,
538 child_device_type='ponsim_onu',
539 channel_id=vlan_id,
540 )
541
542 self.log.info('starting-frame-grpc-stream')
543 reactor.callInThread(self.rcv_grpc)
544 self.log.info('started-frame-grpc-stream')
545
546 # Start collecting stats from the device after a brief pause
547 self.start_kpi_collection(device.id)
William Kurkianfefd4642019-02-07 15:30:03 -0500548 """
William Kurkian6f436d02019-02-06 16:25:01 -0500549 except Exception as e:
550 log.exception("Exception-activating", e=e)
551
552 def get_ofp_device_info(self, device):
553 return SwitchCapability(
554 desc=ofp_desc(
555 hw_desc='ponsim pon',
556 sw_desc='ponsim pon',
557 serial_num=device.serial_number,
558 dp_desc='n/a'
559 ),
560 switch_features=ofp_switch_features(
561 n_buffers=256, # TODO fake for now
562 n_tables=2, # TODO ditto
563 capabilities=( # TODO and ditto
564 OFPC_FLOW_STATS
565 | OFPC_TABLE_STATS
566 | OFPC_PORT_STATS
567 | OFPC_GROUP_STATS
568 )
569 )
570 )
571
572 def get_ofp_port_info(self, device, port_no):
573 # Since the adapter created the device port then it has the reference of the port to
574 # return the capability. TODO: Do a lookup on the NNI port number and return the
575 # appropriate attributes
576 self.log.info('get_ofp_port_info', port_no=port_no,
577 info=self.ofp_port_no, device_id=device.id)
578 cap = OFPPF_1GB_FD | OFPPF_FIBER
579 return PortCapability(
580 port=LogicalPort(
581 ofp_port=ofp_port(
582 hw_addr=mac_str_to_tuple(
583 'AA:BB:CC:DD:EE:%02x' % port_no),
584 config=0,
585 state=OFPPS_LIVE,
586 curr=cap,
587 advertised=cap,
588 peer=cap,
589 curr_speed=OFPPF_1GB_FD,
590 max_speed=OFPPF_1GB_FD
591 ),
592 device_id=device.id,
593 device_port_no=port_no
594 )
595 )
596
597 # TODO - change for core 2.0
598 def reconcile(self, device):
599 self.log.info('reconciling-OLT-device')
600
601 @inlineCallbacks
602 def _rcv_frame(self, frame):
603 pkt = Ether(frame)
604
605 if pkt.haslayer(Dot1Q):
606 outer_shim = pkt.getlayer(Dot1Q)
607
608 if isinstance(outer_shim.payload, Dot1Q):
609 inner_shim = outer_shim.payload
610 cvid = inner_shim.vlan
611 popped_frame = (
612 Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
613 inner_shim.payload
614 )
615 self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
616 yield self.core_proxy.send_packet_in(device_id=self.device_id,
617 port=cvid,
618 packet=str(popped_frame))
619 elif pkt.haslayer(Raw):
620 raw_data = json.loads(pkt.getlayer(Raw).load)
621 self.alarms.send_alarm(self, raw_data)
622
623 @inlineCallbacks
624 def rcv_grpc(self):
625 """
626 This call establishes a GRPC stream to receive frames.
627 """
628 yield self.get_channel()
629 stub = PonSimStub(self.channel)
630 # stub = PonSimStub(self.get_channel())
631
632 # Attempt to establish a grpc stream with the remote ponsim service
633 self.frames = stub.ReceiveFrames(Empty())
634
635 self.log.info('start-receiving-grpc-frames')
636
637 try:
638 for frame in self.frames:
639 self.log.info('received-grpc-frame',
640 frame_len=len(frame.payload))
641 yield self._rcv_frame(frame.payload)
642
643 except _Rendezvous, e:
644 log.warn('grpc-connection-lost', message=e.message)
645
646 self.log.info('stopped-receiving-grpc-frames')
647
648 @inlineCallbacks
649 def update_flow_table(self, flows):
650 yield self.get_channel()
651 stub = PonSimStub(self.channel)
652
653 self.log.info('pushing-olt-flow-table')
654 stub.UpdateFlowTable(FlowTable(
655 port=0,
656 flows=flows
657 ))
658 self.log.info('success')
659
660 def remove_from_flow_table(self, flows):
661 self.log.debug('remove-from-flow-table', flows=flows)
662 # TODO: Update PONSIM code to accept incremental flow changes
663 # Once completed, the accepts_add_remove_flow_updates for this
664 # device type can be set to True
665
666 def add_to_flow_table(self, flows):
667 self.log.debug('add-to-flow-table', flows=flows)
668 # TODO: Update PONSIM code to accept incremental flow changes
669 # Once completed, the accepts_add_remove_flow_updates for this
670 # device type can be set to True
671
672 def update_pm_config(self, device, pm_config):
673 log.info("handler-update-pm-config", device=device,
674 pm_config=pm_config)
675 self.pm_metrics.update(pm_config)
676
677 def send_proxied_message(self, proxy_address, msg):
678 self.log.info('sending-proxied-message')
679 if isinstance(msg, FlowTable):
680 stub = PonSimStub(self.get_channel())
681 self.log.info('pushing-onu-flow-table', port=msg.port)
682 res = stub.UpdateFlowTable(msg)
683 self.core_proxy.receive_proxied_message(proxy_address, res)
684
685 @inlineCallbacks
686 def process_inter_adapter_message(self, request):
687 self.log.info('process-inter-adapter-message', msg=request)
688 try:
689 if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
690 f = FlowTable()
691 if request.body:
692 request.body.Unpack(f)
693 stub = PonSimStub(self.channel)
694 self.log.info('pushing-onu-flow-table')
695 res = stub.UpdateFlowTable(f)
696 # Send response back
697 reply = InterAdapterResponseBody()
698 reply.status = True
699 self.log.info('sending-response-back', reply=reply)
700 yield self.adapter_proxy.send_inter_adapter_message(
701 msg=reply,
702 type=InterAdapterMessageType.FLOW_RESPONSE,
703 from_adapter=self.adapter.name,
704 to_adapter=request.header.from_topic,
705 to_device_id=request.header.to_device_id,
706 message_id=request.header.id
707 )
708 elif request.header.type == InterAdapterMessageType.METRICS_REQUEST:
709 m = PonSimMetricsRequest()
710 if request.body:
711 request.body.Unpack(m)
712 stub = PonSimStub(self.channel)
713 self.log.info('proxying onu stats request', port=m.port)
714 res = stub.GetStats(m)
715 # Send response back
716 reply = InterAdapterResponseBody()
717 reply.status = True
718 reply.body.Pack(res)
719 self.log.info('sending-response-back', reply=reply)
720 yield self.adapter_proxy.send_inter_adapter_message(
721 msg=reply,
722 type=InterAdapterMessageType.METRICS_RESPONSE,
723 from_adapter=self.adapter.name,
724 to_adapter=request.header.from_topic,
725 to_device_id=request.header.to_device_id,
726 message_id=request.header.id
727 )
728 except Exception as e:
729 self.log.exception("error-processing-inter-adapter-message", e=e)
730
731 def packet_out(self, egress_port, msg):
732 self.log.info('sending-packet-out', egress_port=egress_port,
733 msg=hexify(msg))
734 try:
735 pkt = Ether(msg)
736 out_pkt = pkt
737 if egress_port != self.nni_port.port_no:
738 # don't do the vlan manipulation for the NNI port, vlans are already correct
739 out_pkt = (
740 Ether(src=pkt.src, dst=pkt.dst) /
741 Dot1Q(vlan=egress_port, type=pkt.type) /
742 pkt.payload
743 )
744
745 # TODO need better way of mapping logical ports to PON ports
746 out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
747
748 # send over grpc stream
749 stub = PonSimStub(self.channel)
750 frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
751 out_port=out_port)
752 stub.SendFrame(frame)
753 except Exception as e:
754 self.log.exception("error-processing-packet-out", e=e)
755
756
757 @inlineCallbacks
758 def reboot(self):
759 self.log.info('rebooting', device_id=self.device_id)
760
761 yield self.core_proxy.device_state_update(self.device_id,
762 connect_status=ConnectStatus.UNREACHABLE)
763
764 # Update the child devices connect state to UNREACHABLE
765 yield self.core_proxy.children_state_update(self.device_id,
766 connect_status=ConnectStatus.UNREACHABLE)
767
768 # Sleep 10 secs, simulating a reboot
769 # TODO: send alert and clear alert after the reboot
770 yield asleep(10)
771
772 # Change the connection status back to REACHABLE. With a
773 # real OLT the connection state must be the actual state
774 yield self.core_proxy.device_state_update(self.device_id,
775 connect_status=ConnectStatus.REACHABLE)
776
777 # Update the child devices connect state to REACHABLE
778 yield self.core_proxy.children_state_update(self.device_id,
779 connect_status=ConnectStatus.REACHABLE)
780
781 self.log.info('rebooted', device_id=self.device_id)
782
783 def self_test_device(self, device):
784 """
785 This is called to Self a device based on a NBI call.
786 :param device: A Voltha.Device object.
787 :return: Will return result of self test
788 """
789 log.info('self-test-device', device=device.id)
790 raise NotImplementedError()
791
792 @inlineCallbacks
793 def disable(self):
794 self.log.info('disabling', device_id=self.device_id)
795
796 self.stop_kpi_collection()
797
798 # Update the operational status to UNKNOWN and connection status to UNREACHABLE
799 yield self.core_proxy.device_state_update(self.device_id,
800 oper_status=OperStatus.UNKNOWN,
801 connect_status=ConnectStatus.UNREACHABLE)
802
803 self.close_channel()
804 self.log.info('disabled-grpc-channel')
805
806 self.stop_kpi_collection()
807
808 # TODO:
809 # 1) Remove all flows from the device
810 # 2) Remove the device from ponsim
811
812 self.log.info('disabled', device_id=self.device_id)
813
814 @inlineCallbacks
815 def reenable(self):
816 self.log.info('re-enabling', device_id=self.device_id)
817
818 # Set the ofp_port_no and nni_port in case we bypassed the reconcile
819 # process if the device was in DISABLED state on voltha restart
820 if not self.ofp_port_no and not self.nni_port:
821 yield self.get_channel()
822 stub = PonSimStub(self.channel)
823 info = stub.GetDeviceInfo(Empty())
824 log.info('got-info', info=info)
825 self.ofp_port_no = info.nni_port
826 ports = yield self._get_nni_port()
827 # For ponsim, we are using only 1 NNI port
828 if ports.items:
829 self.nni_port = ports.items[0]
830
831 # Update the state of the NNI port
832 yield self.core_proxy.port_state_update(self.device_id,
833 port_type=Port.ETHERNET_NNI,
834 port_no=self.ofp_port_no,
835 oper_status=OperStatus.ACTIVE)
836
837 # Update the state of the PON port
838 yield self.core_proxy.port_state_update(self.device_id,
839 port_type=Port.PON_OLT,
840 port_no=1,
841 oper_status=OperStatus.ACTIVE)
842
843 # Set the operational state of the device to ACTIVE and connect status to REACHABLE
844 yield self.core_proxy.device_state_update(self.device_id,
845 connect_status=ConnectStatus.REACHABLE,
846 oper_status=OperStatus.ACTIVE)
847
848 # TODO: establish frame grpc-stream
849 # yield reactor.callInThread(self.rcv_grpc)
850
851 self.start_kpi_collection(self.device_id)
852
853 self.log.info('re-enabled', device_id=self.device_id)
854
855 def delete(self):
856 self.log.info('deleting', device_id=self.device_id)
857
858 self.close_channel()
859 self.log.info('disabled-grpc-channel')
860
861 # TODO:
862 # 1) Remove all flows from the device
863 # 2) Remove the device from ponsim
864
865 self.log.info('deleted', device_id=self.device_id)
866
867 def start_kpi_collection(self, device_id):
868
869 kafka_cluster_proxy = get_kafka_proxy()
870
871 def _collect(device_id, prefix):
872
873 try:
874 # Step 1: gather metrics from device
875 port_metrics = \
876 self.pm_metrics.collect_port_metrics(self.channel)
877
878 # Step 2: prepare the KpiEvent for submission
879 # we can time-stamp them here (or could use time derived from OLT
880 ts = arrow.utcnow().timestamp
881 kpi_event = KpiEvent(
882 type=KpiEventType.slice,
883 ts=ts,
884 prefixes={
885 # OLT NNI port
886 prefix + '.nni': MetricValuePairs(
887 metrics=port_metrics['nni']),
888 # OLT PON port
889 prefix + '.pon': MetricValuePairs(
890 metrics=port_metrics['pon'])
891 }
892 )
893
894 # Step 3: submit directly to the kafka bus
895 if kafka_cluster_proxy:
896 if isinstance(kpi_event, Message):
897 kpi_event = dumps(MessageToDict(kpi_event, True, True))
898 kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
899
900 except Exception as e:
901 log.exception('failed-to-submit-kpis', e=e)
902
903 self.pm_metrics.start_collector(_collect)
904
905 def stop_kpi_collection(self):
906 self.pm_metrics.stop_collector()