#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Openolt adapter.
"""
import json

import arrow
import grpc
import structlog
from google.protobuf.empty_pb2 import Empty
from google.protobuf.json_format import MessageToDict
from google.protobuf.message import Message
from grpc._channel import _Rendezvous
from scapy.layers.inet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from simplejson import dumps
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall

from voltha.adapters.common.frameio.frameio import BpfProgramFilter, hexify
from voltha.adapters.iadapter import OltAdapter
from voltha.common.utils.asleep import asleep
from voltha.common.utils.registry import registry
from voltha.adapters.kafka.kafka_proxy import get_kafka_proxy
from voltha.protos import openolt_pb2
from voltha.protos import third_party
from voltha.protos.common_pb2 import OperStatus, ConnectStatus
from voltha.protos.common_pb2 import LogLevel
from voltha.protos.common_pb2 import OperationResp
from voltha.protos.inter_container_pb2 import SwitchCapability, \
    PortCapability, InterAdapterMessageType, InterAdapterResponseBody
from voltha.protos.device_pb2 import Port, PmConfig, PmConfigs, \
    DeviceType, DeviceTypes
from voltha.protos.adapter_pb2 import Adapter
from voltha.protos.adapter_pb2 import AdapterConfig
from voltha.adapters.openolt.openolt_flow_mgr import OpenOltFlowMgr
from voltha.adapters.openolt.openolt_alarms import OpenOltAlarmMgr
from voltha.adapters.openolt.openolt_statistics import OpenOltStatisticsMgr
from voltha.adapters.openolt.openolt_bw import OpenOltBW
from voltha.adapters.openolt.openolt_platform import OpenOltPlatform
from voltha.adapters.openolt.openolt_resource_manager import \
    OpenOltResourceMgr
from voltha.adapters.openolt.openolt_device import OpenoltDevice

from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
from voltha.protos.logical_device_pb2 import LogicalPort
from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
    OFPPF_1GB_FD, \
    OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
    ofp_switch_features, ofp_desc
from voltha.protos.openflow_13_pb2 import ofp_port
from voltha.protos.ponsim_pb2 import FlowTable, PonSimFrame, \
    PonSimMetricsRequest, PonSimStub

# Keep a reference so the ``third_party`` import is not reported as unused.
# NOTE(review): presumably the module is imported for its side effects
# (making third-party protobuf definitions available) -- confirm.
_ = third_party

# Module-level logger; per-device loggers are created in OpenoltHandler.
log = structlog.get_logger()

# Default helper classes for an OLT device.  OpenoltHandler.activate passes
# the 'support_classes' mapping to OpenoltDevice as a keyword argument.
OpenOltDefaults = {
    'support_classes': {
        'platform': OpenOltPlatform,
        'resource_mgr': OpenOltResourceMgr,
        'flow_mgr': OpenOltFlowMgr,
        'alarm_mgr': OpenOltAlarmMgr,
        'stats_mgr': OpenOltStatisticsMgr,
        'bw_mgr': OpenOltBW
    }
}

class AdapterPmMetrics:
    """Performance-metric (PM) configuration and collection for one device.

    Holds one ``PmConfig`` per metric name for the PON port and one for the
    NNI port (both start enabled), builds the ``PmConfigs`` message that
    describes them, extracts the enabled counters from a stats reply and
    drives periodic collection through a Twisted ``LoopingCall``.
    """

    def __init__(self, device):
        """
        :param device: device object; only its ``id`` attribute is read.
        """
        self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
                         'tx_256_511_pkts', 'tx_512_1023_pkts',
                         'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
                         'rx_64_pkts', 'rx_65_127_pkts',
                         'rx_128_255_pkts', 'rx_256_511_pkts',
                         'rx_512_1023_pkts', 'rx_1024_1518_pkts',
                         'rx_1519_9k_pkts'}
        self.device = device
        self.id = device.id
        self.name = 'ponsim_olt'
        # The collection interval handed to LoopingCall is default_freq / 10.
        self.default_freq = 150
        self.grouped = False
        self.freq_override = False
        self.pon_metrics_config = dict()
        self.nni_metrics_config = dict()
        # LoopingCall created by start_collector(); None until then.
        self.lc = None
        for m in self.pm_names:
            self.pon_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)
            self.nni_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)

    def update(self, pm_config):
        """Apply a new PM configuration.

        A changed ``default_freq`` is stored and, if the collector is
        currently running, the collector is restarted with the new interval.
        The ``enabled`` flag of every metric listed in ``pm_config.metrics``
        is copied to both the PON and the NNI configuration; metric names
        this object does not know are logged and skipped.

        :param pm_config: object exposing ``default_freq`` and ``metrics``
                          (items with ``name`` and ``enabled``).
        """
        if self.default_freq != pm_config.default_freq:
            self.default_freq = pm_config.default_freq
            # Restart only a collector that is actually running:
            # LoopingCall.stop() must not be called on a stopped loop, and
            # self.lc is None until start_collector() has been called.
            if self.lc is not None and self.lc.running:
                self.lc.stop()
                self.lc.start(interval=self.default_freq / 10)
        for m in pm_config.metrics:
            if m.name not in self.pon_metrics_config:
                log.warn('unknown-pm-metric', metric=m.name)
                continue
            self.pon_metrics_config[m.name].enabled = m.enabled
            self.nni_metrics_config[m.name].enabled = m.enabled

    def make_proto(self):
        """Build the ``PmConfigs`` message describing the current settings.

        Metrics are emitted in sorted name order.

        :return: a ``PmConfigs`` instance.
        """
        pm_config = PmConfigs(
            id=self.id,
            default_freq=self.default_freq,
            grouped=False,
            freq_override=False)
        for m in sorted(self.pon_metrics_config):
            # PON and NNI configurations carry the same metric names, so
            # either dictionary can be used here.
            pm = self.pon_metrics_config[m]
            pm_config.metrics.extend([PmConfig(name=pm.name,
                                               type=pm.type,
                                               enabled=pm.enabled)])
        return pm_config

    def collect_port_metrics(self, channel):
        """Fetch statistics over ``channel`` and split them per port.

        :param channel: gRPC channel used to build the ``PonSimStub``.
        :return: dict with keys ``'pon'`` and ``'nni'``, each mapping metric
                 name to value for the enabled metrics.
        """
        rtrn_port_metrics = dict()
        # PonSimStub is imported by name; the ponsim_pb2 module itself is
        # not in scope in this file.
        stub = PonSimStub(channel)
        stats = stub.GetStats(Empty())
        rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
        rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
        return rtrn_port_metrics

    @staticmethod
    def _extract_metrics(stats, port_name, metrics_config):
        """Return {name: value} for enabled counters of port ``port_name``.

        Counters whose name has no entry in ``metrics_config`` are skipped.
        """
        extracted = dict()
        for m in stats.metrics:
            if m.port_name != port_name:
                continue
            for p in m.packets:
                cfg = metrics_config.get(p.name)
                if cfg is not None and cfg.enabled:
                    extracted[p.name] = p.value
        return extracted

    def extract_pon_metrics(self, stats):
        """Return the enabled counters reported for the ``"pon"`` port."""
        return self._extract_metrics(stats, "pon", self.pon_metrics_config)

    def extract_nni_metrics(self, stats):
        """Return the enabled counters reported for the ``"nni"`` port."""
        return self._extract_metrics(stats, "nni", self.nni_metrics_config)

    def start_collector(self, callback):
        """Start calling ``callback(device_id, prefix)`` periodically.

        ``prefix`` is ``'voltha.<name>.<device id>'``.  A collector that is
        still running from an earlier call is stopped first so that only one
        loop is active.

        :param callback: callable taking ``(device_id, prefix)``.
        """
        log.info("starting-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        if self.lc is not None and self.lc.running:
            self.lc.stop()
        prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
        self.lc = LoopingCall(callback, self.device.id, prefix)
        self.lc.start(interval=self.default_freq / 10)

    def stop_collector(self):
        """Stop periodic collection; a no-op when it is not running."""
        log.info("stopping-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        if self.lc is not None and self.lc.running:
            self.lc.stop()


class AdapterAlarms:
    """Placeholder alarm sender for a device; nothing is published yet."""

    def __init__(self, adapter, device):
        """
        :param adapter: adapter object, stored as-is.
        :param device: device object, stored as-is.
        """
        self.adapter = adapter
        self.device = device
        self.lc = None

    # TODO: Implement code to send to kafka cluster directly instead of
    # going through the voltha core.
    def send_alarm(self, context_data, alarm_data):
        """Log that alarm sending is not implemented; both args are ignored.

        :return: None
        """
        log.debug("send-alarm-not-implemented")
        return



class OpenoltAdapter(OltAdapter):
    """Adapter entry point for 'openolt' devices.

    Registers ``OpenoltHandler`` as the per-device handler class with the
    ``OltAdapter`` base class.  Every operation defined directly on this
    class, apart from start/stop and the descriptor/device-type getters,
    logs the call and raises ``NotImplementedError``.
    """
    name = 'openolt'

    # NOTE(review): accepts_add_remove_flow_updates is True here but False
    # in the super().__init__() call below, and update_flows_incrementally()
    # raises NotImplementedError -- confirm which value is intended.
    supported_device_types = [
        DeviceType(
            id=name,
            adapter=name,
            accepts_bulk_flow_update=True,
            accepts_add_remove_flow_updates=True
        )
    ]

    # System Init Methods #
    def __init__(self, core_proxy, adapter_proxy, config):
        """
        :param core_proxy: proxy object used to talk to the core; stored
                           and handed to the base class.
        :param adapter_proxy: proxy object used to talk to other adapters;
                              stored and handed to the base class.
        :param config: adapter configuration; stored, handed to the base
                       class and embedded in the ``Adapter`` descriptor.
        """
        super(OpenoltAdapter, self).__init__(
            core_proxy=core_proxy,
            adapter_proxy=adapter_proxy,
            config=config,
            device_handler_class=OpenoltHandler,
            name='openolt',
            vendor='Voltha project',
            version='0.4',
            device_type='openolt',
            accepts_bulk_flow_update=True,
            accepts_add_remove_flow_updates=False)
        self.adapter_proxy = adapter_proxy
        self.core_proxy = core_proxy
        self.config = config
        self.descriptor = Adapter(
            id=self.name,
            vendor='OLT white box vendor',
            version='0.1',
            config=config
        )
        log.debug('openolt.__init__', adapter_proxy=adapter_proxy)
        self.devices = dict()  # device_id -> OpenoltDevice()
        self.interface = registry('main').get_args().interface
        self.logical_device_id_to_root_device_id = dict()
        self.num_devices = 0

    def start(self):
        """Log that the adapter has started."""
        log.info('started', interface=self.interface)

    def stop(self):
        """Log that the adapter has stopped."""
        log.info('stopped', interface=self.interface)

    # Info Methods #
    def adapter_descriptor(self):
        """Return the ``Adapter`` descriptor built in ``__init__``."""
        log.debug('get descriptor', interface=self.interface)
        return self.descriptor

    def device_types(self):
        """Return ``supported_device_types`` wrapped in ``DeviceTypes``."""
        log.debug('get device_types', interface=self.interface,
                  items=self.supported_device_types)
        return DeviceTypes(items=self.supported_device_types)

    def health(self):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('get health', interface=self.interface)
        raise NotImplementedError()

    def get_device_details(self, device):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('get_device_details', device=device)
        raise NotImplementedError()

    # Device Operation Methods #
    def change_master_state(self, master):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('change_master_state', interface=self.interface,
                  master=master)
        raise NotImplementedError()

    def abandon_device(self, device):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('abandon-device', device=device)
        raise NotImplementedError()

    # Configuration Methods #
    def update_flows_incrementally(self, device, flow_changes, group_changes):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('update_flows_incrementally', device=device,
                  flow_changes=flow_changes, group_changes=group_changes)
        log.info('This device does not allow this, therefore it is Not '
                 'implemented')
        raise NotImplementedError()

    def update_pm_config(self, device, pm_configs):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('update_pm_config - Not implemented yet', device=device,
                 pm_configs=pm_configs)
        raise NotImplementedError()

    def receive_proxied_message(self, proxy_address, msg):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('receive_proxied_message - Not implemented',
                  proxy_address=proxy_address,
                  proxied_msg=msg)
        raise NotImplementedError()

    def receive_inter_adapter_message(self, msg):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('rx_inter_adapter_msg - Not implemented')
        raise NotImplementedError()

    # Image Operations Methods #
    def download_image(self, device, request):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('image_download - Not implemented yet', device=device,
                 request=request)
        raise NotImplementedError()

    def get_image_download_status(self, device, request):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('get_image_download - Not implemented yet', device=device,
                 request=request)
        raise NotImplementedError()

    def cancel_image_download(self, device, request):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('cancel_image_download - Not implemented yet', device=device)
        raise NotImplementedError()

    def activate_image_update(self, device, request):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('activate_image_update - Not implemented yet',
                 device=device, request=request)
        raise NotImplementedError()

    def revert_image_update(self, device, request):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('revert_image_update - Not implemented yet',
                 device=device, request=request)
        raise NotImplementedError()

    def self_test_device(self, device):
        """Not implemented; always raises ``NotImplementedError``."""
        # from voltha.protos.voltha_pb2 import SelfTestResponse
        log.info('Not implemented yet')
        raise NotImplementedError()

    # PON Operations Methods #
    def create_interface(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('create-interface - Not implemented - We do not use this',
                  data=data)
        raise NotImplementedError()

    def update_interface(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('update-interface - Not implemented - We do not use this',
                  data=data)
        raise NotImplementedError()

    def remove_interface(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('remove-interface - Not implemented - We do not use this',
                  data=data)
        raise NotImplementedError()

    def receive_onu_detect_state(self, proxy_address, state):
        """Not implemented; always raises ``NotImplementedError``."""
        log.debug('receive-onu-detect-state - Not implemented - We do not '
                  'use this', proxy_address=proxy_address,
                  state=state)
        raise NotImplementedError()

    def create_tcont(self, device, tcont_data, traffic_descriptor_data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('create-tcont - Not implemented - We do not use this',
                 tcont_data=tcont_data,
                 traffic_descriptor_data=traffic_descriptor_data)
        raise NotImplementedError()

    def update_tcont(self, device, tcont_data, traffic_descriptor_data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('update-tcont - Not implemented - We do not use this',
                 tcont_data=tcont_data,
                 traffic_descriptor_data=traffic_descriptor_data)
        raise NotImplementedError()

    def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('remove-tcont - Not implemented - We do not use this',
                 tcont_data=tcont_data,
                 traffic_descriptor_data=traffic_descriptor_data)
        raise NotImplementedError()

    def create_gemport(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('create-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def update_gemport(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('update-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def remove_gemport(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('remove-gemport - Not implemented - We do not use this',
                 data=data)
        raise NotImplementedError()

    def create_multicast_gemport(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('create-mcast-gemport - Not implemented - We do not use '
                 'this', data=data)
        raise NotImplementedError()

    def update_multicast_gemport(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('update-mcast-gemport - Not implemented - We do not use '
                 'this', data=data)
        raise NotImplementedError()

    def remove_multicast_gemport(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('remove-mcast-gemport - Not implemented - We do not use '
                 'this', data=data)
        raise NotImplementedError()

    def create_multicast_distribution_set(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('create-mcast-distribution-set - Not implemented - We do '
                 'not use this', data=data)
        raise NotImplementedError()

    def update_multicast_distribution_set(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('update-mcast-distribution-set - Not implemented - We do '
                 'not use this', data=data)
        raise NotImplementedError()

    def remove_multicast_distribution_set(self, device, data):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('remove-mcast-distribution-set - Not implemented - We do '
                 'not use this', data=data)
        raise NotImplementedError()

    # Alarm Methods #
    def suppress_alarm(self, filter):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('suppress_alarm - Not implemented yet', filter=filter)
        raise NotImplementedError()

    def unsuppress_alarm(self, filter):
        """Not implemented; always raises ``NotImplementedError``."""
        log.info('unsuppress_alarm - Not implemented yet', filter=filter)
        raise NotImplementedError()


class OpenoltHandler(object):
    """Per-device handler created by ``OpenoltAdapter``.

    Activation builds an ``OpenoltDevice`` from ``OpenOltDefaults``.  The
    remaining methods (frame stream, flow-table push, packet-out, KPI
    collection, ...) talk to the device through ``PonSimStub`` over a gRPC
    channel opened lazily by ``get_channel``.
    """

    def __init__(self, adapter, device_id):
        """
        :param adapter: owning adapter; its ``core_proxy``, ``adapter_proxy``
                        and ``name`` are used.
        :param device_id: id of the device this handler serves.
        """
        self.adapter = adapter
        self.core_proxy = adapter.core_proxy
        self.adapter_proxy = adapter.adapter_proxy
        self.device_id = device_id
        self.log = structlog.get_logger(device_id=device_id)
        self.channel = None
        self.io_port = None
        self.logical_device_id = None
        # nni_port / ofp_port_no are only filled in by reenable(); pm_metrics
        # and alarms are not assigned anywhere in this class at present, so
        # code using them must cope with None.
        self.nni_port = None
        self.ofp_port_no = None
        self.interface = registry('main').get_args().interface
        self.pm_metrics = None
        self.alarms = None
        self.frames = None
        self.num_devices = 0

    @staticmethod
    def _mac_str_to_tuple(mac):
        """Convert 'AA:BB:CC:DD:EE:FF' into a tuple of six ints."""
        return tuple(int(octet, 16) for octet in mac.split(':'))

    @inlineCallbacks
    def get_channel(self):
        """Open the gRPC channel to the device if it is not open yet.

        The device record is fetched from the core and its ``host_and_port``
        is used as the channel target.  Failures are logged, not raised, so
        ``self.channel`` may still be None afterwards.  The Deferred returned
        by this method fires with None; read ``self.channel`` after yielding.
        """
        if self.channel is None:
            try:
                device = yield self.core_proxy.get_device(self.device_id)
                self.log.info('device-info', device=device,
                              host_port=device.host_and_port)
                self.channel = grpc.insecure_channel(device.host_and_port)
            except Exception as e:
                log.exception("ponsim-connection-failure", e=e)

        # returnValue(self.channel)

    def close_channel(self):
        """Cancel the frame stream (if any) and drop the gRPC channel."""
        if self.channel is None:
            self.log.info('grpc-channel-already-closed')
            return

        if self.frames is not None:
            self.frames.cancel()
            self.frames = None
            self.log.info('cancelled-grpc-frame-stream')

        self.channel.unsubscribe(lambda *args: None)
        self.channel = None

        self.log.info('grpc-channel-closed')

    @inlineCallbacks
    def _get_nni_port(self):
        """Fire with the ETHERNET_NNI ports the core holds for this device."""
        ports = yield self.core_proxy.get_ports(self.device_id,
                                                Port.ETHERNET_NNI)
        returnValue(ports)

    def init_device(self, kwargs):
        """Create the ``OpenoltDevice`` for this handler.

        :param kwargs: dict expanded into ``OpenoltDevice``'s keyword
                       arguments.
        """
        self.device = OpenoltDevice(**kwargs)

    @inlineCallbacks
    def activate(self, device):
        """Activate ``device`` by creating its ``OpenoltDevice``.

        A device without ``host_and_port`` is marked FAILED in the core and
        nothing else happens.  Any exception is logged and swallowed here.

        :param device: device record; ``host_and_port`` is read and, on the
                       failure path, ``oper_status``/``reason`` are written.
        """
        try:
            self.log.info('activating')
            if not device.host_and_port:
                device.oper_status = OperStatus.FAILED
                device.reason = 'No host_and_port field provided'
                # Yield so a failing update is caught and logged below.
                yield self.core_proxy.device_update(device)
                return

            kwargs = {
                'support_classes': OpenOltDefaults['support_classes'],
                'adapter_agent': self.core_proxy,
                'device': device,
                'device_num': self.num_devices + 1
            }
            try:
                yield self.init_device(kwargs)
            except Exception as e:
                log.error('Failed to adopt OpenOLT device', error=e)
                # TODO set status to ERROR so that is clear something went
                # wrong
                raise
            else:
                self.num_devices += 1

            # NOTE: the former ponsim activation sequence (GetDeviceInfo,
            # device update, initial PM config via AdapterPmMetrics, alarm
            # handler, NNI/PON port creation, state update to
            # REACHABLE/ACTIVE, ONU registration, starting rcv_grpc in a
            # thread and starting KPI collection) is disabled.
        except Exception as e:
            log.exception("Exception-activating", e=e)

    def get_ofp_device_info(self, device):
        """Return the ``SwitchCapability`` advertised for ``device``.

        Only ``device.serial_number`` is read; the other values are fixed.
        """
        return SwitchCapability(
            desc=ofp_desc(
                hw_desc='ponsim pon',
                sw_desc='ponsim pon',
                serial_num=device.serial_number,
                dp_desc='n/a'
            ),
            switch_features=ofp_switch_features(
                n_buffers=256,  # TODO fake for now
                n_tables=2,  # TODO ditto
                capabilities=(  # TODO and ditto
                    OFPC_FLOW_STATS
                    | OFPC_TABLE_STATS
                    | OFPC_PORT_STATS
                    | OFPC_GROUP_STATS
                )
            )
        )

    def get_ofp_port_info(self, device, port_no):
        """Return the ``PortCapability`` for port ``port_no`` of ``device``.

        The hardware address is synthesised as ``AA:BB:CC:DD:EE:<port_no>``.
        """
        # Since the adapter created the device port then it has the reference
        # of the port to return the capability.
        # TODO: Do a lookup on the NNI port number and return the
        # appropriate attributes
        self.log.info('get_ofp_port_info', port_no=port_no,
                      info=self.ofp_port_no, device_id=device.id)
        cap = OFPPF_1GB_FD | OFPPF_FIBER
        return PortCapability(
            port=LogicalPort(
                ofp_port=ofp_port(
                    hw_addr=self._mac_str_to_tuple(
                        'AA:BB:CC:DD:EE:%02x' % port_no),
                    config=0,
                    state=OFPPS_LIVE,
                    curr=cap,
                    advertised=cap,
                    peer=cap,
                    curr_speed=OFPPF_1GB_FD,
                    max_speed=OFPPF_1GB_FD
                ),
                device_id=device.id,
                device_port_no=port_no
            )
        )

    # TODO - change for core 2.0
    def reconcile(self, device):
        """Log the reconcile request; nothing else is done yet."""
        self.log.info('reconciling-OLT-device')

    @inlineCallbacks
    def _rcv_frame(self, frame):
        """Handle one frame received from the device.

        A double-tagged frame has both tags popped and is sent to the core
        as a packet-in on the port numbered by the inner VLAN id.  An
        untagged frame with a raw payload is parsed as JSON and passed to
        the alarm handler.  Other frames are ignored.
        """
        pkt = Ether(frame)

        if pkt.haslayer(Dot1Q):
            outer_shim = pkt.getlayer(Dot1Q)

            if isinstance(outer_shim.payload, Dot1Q):
                inner_shim = outer_shim.payload
                cvid = inner_shim.vlan
                popped_frame = (
                    Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
                    inner_shim.payload
                )
                self.log.info('sending-packet-in', device_id=self.device_id,
                              port=cvid)
                yield self.core_proxy.send_packet_in(
                    device_id=self.device_id,
                    port=cvid,
                    packet=str(popped_frame))
        elif pkt.haslayer(Raw):
            if self.alarms is None:
                # No alarm handler has been set up; drop the frame.
                self.log.warn('alarm-frame-dropped-no-alarm-handler')
            else:
                raw_data = json.loads(pkt.getlayer(Raw).load)
                self.alarms.send_alarm(self, raw_data)

    @inlineCallbacks
    def rcv_grpc(self):
        """
        This call establishes a GRPC stream to receive frames.
        """
        yield self.get_channel()
        stub = PonSimStub(self.channel)

        # Attempt to establish a grpc stream with the remote ponsim service
        self.frames = stub.ReceiveFrames(Empty())

        self.log.info('start-receiving-grpc-frames')

        try:
            for frame in self.frames:
                self.log.info('received-grpc-frame',
                              frame_len=len(frame.payload))
                yield self._rcv_frame(frame.payload)

        except _Rendezvous as e:
            log.warn('grpc-connection-lost',
                     message=getattr(e, 'message', None) or str(e))

        self.log.info('stopped-receiving-grpc-frames')

    @inlineCallbacks
    def update_flow_table(self, flows):
        """Push ``flows`` to the device as the flow table of port 0."""
        yield self.get_channel()
        stub = PonSimStub(self.channel)

        self.log.info('pushing-olt-flow-table')
        stub.UpdateFlowTable(FlowTable(
            port=0,
            flows=flows
        ))
        self.log.info('success')

    def remove_from_flow_table(self, flows):
        """Log the request; incremental removal is not supported yet."""
        self.log.debug('remove-from-flow-table', flows=flows)
        # TODO: Update PONSIM code to accept incremental flow changes
        # Once completed, the accepts_add_remove_flow_updates for this
        # device type can be set to True

    def add_to_flow_table(self, flows):
        """Log the request; incremental addition is not supported yet."""
        self.log.debug('add-to-flow-table', flows=flows)
        # TODO: Update PONSIM code to accept incremental flow changes
        # Once completed, the accepts_add_remove_flow_updates for this
        # device type can be set to True

    def update_pm_config(self, device, pm_config):
        """Forward ``pm_config`` to the PM metrics object.

        NOTE(review): ``self.pm_metrics`` is None unless something outside
        this class assigns it -- confirm before relying on this call.
        """
        log.info("handler-update-pm-config", device=device,
                 pm_config=pm_config)
        self.pm_metrics.update(pm_config)

    @inlineCallbacks
    def send_proxied_message(self, proxy_address, msg):
        """Push a proxied ``FlowTable`` to the device and relay the reply.

        Messages of any other type are only logged.  The channel is opened
        first; ``get_channel`` returns a Deferred, so it must be yielded and
        ``self.channel`` read afterwards.
        """
        self.log.info('sending-proxied-message')
        if isinstance(msg, FlowTable):
            yield self.get_channel()
            stub = PonSimStub(self.channel)
            self.log.info('pushing-onu-flow-table', port=msg.port)
            res = stub.UpdateFlowTable(msg)
            self.core_proxy.receive_proxied_message(proxy_address, res)

    def _send_inter_adapter_reply(self, request, reply, response_type):
        """Send ``reply`` back to the adapter that issued ``request``."""
        self.log.info('sending-response-back', reply=reply)
        return self.adapter_proxy.send_inter_adapter_message(
            msg=reply,
            type=response_type,
            from_adapter=self.adapter.name,
            to_adapter=request.header.from_topic,
            to_device_id=request.header.to_device_id,
            message_id=request.header.id
        )

    @inlineCallbacks
    def process_inter_adapter_message(self, request):
        """Serve a FLOW_REQUEST or METRICS_REQUEST from another adapter.

        The request body is unpacked, forwarded to the device through
        ``PonSimStub`` and a response with ``status=True`` is sent back (the
        metrics response also carries the stats).  Requests with an empty
        body or another type are ignored.  Exceptions are logged, not raised.
        """
        self.log.info('process-inter-adapter-message', msg=request)
        try:
            msg_type = request.header.type
            if msg_type == InterAdapterMessageType.FLOW_REQUEST:
                f = FlowTable()
                if request.body:
                    request.body.Unpack(f)
                    yield self.get_channel()
                    stub = PonSimStub(self.channel)
                    self.log.info('pushing-onu-flow-table')
                    stub.UpdateFlowTable(f)
                    # Send response back
                    reply = InterAdapterResponseBody()
                    reply.status = True
                    yield self._send_inter_adapter_reply(
                        request, reply,
                        InterAdapterMessageType.FLOW_RESPONSE)
            elif msg_type == InterAdapterMessageType.METRICS_REQUEST:
                m = PonSimMetricsRequest()
                if request.body:
                    request.body.Unpack(m)
                    yield self.get_channel()
                    stub = PonSimStub(self.channel)
                    self.log.info('proxying onu stats request', port=m.port)
                    res = stub.GetStats(m)
                    # Send response back
                    reply = InterAdapterResponseBody()
                    reply.status = True
                    reply.body.Pack(res)
                    yield self._send_inter_adapter_reply(
                        request, reply,
                        InterAdapterMessageType.METRICS_RESPONSE)
        except Exception as e:
            self.log.exception("error-processing-inter-adapter-message", e=e)

    def packet_out(self, egress_port, msg):
        """Send frame ``msg`` out of ``egress_port`` via the device.

        Frames for a non-NNI port get a VLAN tag equal to ``egress_port``
        and are sent on port 1; frames for the NNI port are sent unchanged
        on the NNI port.  Exceptions are logged, not raised.
        """
        self.log.info('sending-packet-out', egress_port=egress_port,
                      msg=hexify(msg))
        try:
            pkt = Ether(msg)
            out_pkt = pkt
            if egress_port != self.nni_port.port_no:
                # don't do the vlan manipulation for the NNI port, vlans are
                # already correct
                out_pkt = (
                    Ether(src=pkt.src, dst=pkt.dst) /
                    Dot1Q(vlan=egress_port, type=pkt.type) /
                    pkt.payload
                )

            # TODO need better way of mapping logical ports to PON ports
            if egress_port == self.nni_port.port_no:
                out_port = self.nni_port.port_no
            else:
                out_port = 1

            # send over grpc stream
            stub = PonSimStub(self.channel)
            frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
                                out_port=out_port)
            stub.SendFrame(frame)
        except Exception as e:
            self.log.exception("error-processing-packet-out", e=e)

    @inlineCallbacks
    def reboot(self):
        """Simulate a reboot: mark the device and its children UNREACHABLE,
        wait 10 seconds, then mark them REACHABLE again."""
        self.log.info('rebooting', device_id=self.device_id)

        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.UNREACHABLE)

        # Update the child devices connect state to UNREACHABLE
        yield self.core_proxy.children_state_update(
            self.device_id,
            connect_status=ConnectStatus.UNREACHABLE)

        # Sleep 10 secs, simulating a reboot
        # TODO: send alert and clear alert after the reboot
        yield asleep(10)

        # Change the connection status back to REACHABLE.  With a
        # real OLT the connection state must be the actual state
        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE)

        # Update the child devices connect state to REACHABLE
        yield self.core_proxy.children_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE)

        self.log.info('rebooted', device_id=self.device_id)

    def self_test_device(self, device):
        """
        This is called to self-test a device based on a NBI call.
        :param device: A Voltha.Device object.
        :return: Will return result of self test
        """
        log.info('self-test-device', device=device.id)
        raise NotImplementedError()

    @inlineCallbacks
    def disable(self):
        """Stop KPI collection, mark the device UNKNOWN/UNREACHABLE in the
        core and close the gRPC channel."""
        self.log.info('disabling', device_id=self.device_id)

        self.stop_kpi_collection()

        # Update the operational status to UNKNOWN and connection status to
        # UNREACHABLE
        yield self.core_proxy.device_state_update(
            self.device_id,
            oper_status=OperStatus.UNKNOWN,
            connect_status=ConnectStatus.UNREACHABLE)

        self.close_channel()
        self.log.info('disabled-grpc-channel')

        # TODO:
        # 1) Remove all flows from the device
        # 2) Remove the device from ponsim

        self.log.info('disabled', device_id=self.device_id)

    @inlineCallbacks
    def reenable(self):
        """Bring a disabled device back: refresh the NNI port information
        if it is missing, mark the NNI port, PON port and device ACTIVE and
        restart KPI collection."""
        self.log.info('re-enabling', device_id=self.device_id)

        # Set the ofp_port_no and nni_port in case we bypassed the reconcile
        # process if the device was in DISABLED state on voltha restart
        if not self.ofp_port_no and not self.nni_port:
            yield self.get_channel()
            stub = PonSimStub(self.channel)
            info = stub.GetDeviceInfo(Empty())
            log.info('got-info', info=info)
            self.ofp_port_no = info.nni_port
            ports = yield self._get_nni_port()
            # For ponsim, we are using only 1 NNI port
            if ports.items:
                self.nni_port = ports.items[0]

        # Update the state of the NNI port
        yield self.core_proxy.port_state_update(
            self.device_id,
            port_type=Port.ETHERNET_NNI,
            port_no=self.ofp_port_no,
            oper_status=OperStatus.ACTIVE)

        # Update the state of the PON port
        yield self.core_proxy.port_state_update(
            self.device_id,
            port_type=Port.PON_OLT,
            port_no=1,
            oper_status=OperStatus.ACTIVE)

        # Set the operational state of the device to ACTIVE and connect
        # status to REACHABLE
        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE,
            oper_status=OperStatus.ACTIVE)

        # TODO: establish frame grpc-stream
        # yield reactor.callInThread(self.rcv_grpc)

        self.start_kpi_collection(self.device_id)

        self.log.info('re-enabled', device_id=self.device_id)

    def delete(self):
        """Close the gRPC channel; device-side cleanup is still a TODO."""
        self.log.info('deleting', device_id=self.device_id)

        self.close_channel()
        self.log.info('disabled-grpc-channel')

        # TODO:
        # 1) Remove all flows from the device
        # 2) Remove the device from ponsim

        self.log.info('deleted', device_id=self.device_id)

    def start_kpi_collection(self, device_id):
        """Start periodic KPI collection and publication to kafka.

        Each cycle gathers the port metrics over the gRPC channel, wraps
        them in a ``KpiEvent`` and, when a kafka proxy is available, sends
        the event as JSON on topic ``voltha.kpis``.  Does nothing (beyond a
        warning) when no PM metrics object has been set up.
        """
        if self.pm_metrics is None:
            self.log.warn('pm-metrics-not-initialised', device_id=device_id)
            return

        kafka_cluster_proxy = get_kafka_proxy()

        def _collect(device_id, prefix):

            try:
                # Step 1: gather metrics from device
                port_metrics = \
                    self.pm_metrics.collect_port_metrics(self.channel)

                # Step 2: prepare the KpiEvent for submission
                # we can time-stamp them here (or could use time derived
                # from OLT)
                ts = arrow.utcnow().timestamp
                kpi_event = KpiEvent(
                    type=KpiEventType.slice,
                    ts=ts,
                    prefixes={
                        # OLT NNI port
                        prefix + '.nni': MetricValuePairs(
                            metrics=port_metrics['nni']),
                        # OLT PON port
                        prefix + '.pon': MetricValuePairs(
                            metrics=port_metrics['pon'])
                    }
                )

                # Step 3: submit directly to the kafka bus
                if kafka_cluster_proxy:
                    if isinstance(kpi_event, Message):
                        kpi_event = dumps(
                            MessageToDict(kpi_event, True, True))
                    kafka_cluster_proxy.send_message("voltha.kpis",
                                                     kpi_event)

            except Exception as e:
                log.exception('failed-to-submit-kpis', e=e)

        self.pm_metrics.start_collector(_collect)

    def stop_kpi_collection(self):
        """Stop periodic KPI collection; a no-op if it was never set up."""
        if self.pm_metrics is None:
            self.log.debug('kpi-collection-not-started')
            return
        self.pm_metrics.stop_collector()