blob: 2c8773097f59504aa1615b9428f5b6504f002e96 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Openolt adapter.
19"""
20import arrow
21import grpc
22import structlog
23from google.protobuf.empty_pb2 import Empty
24from google.protobuf.json_format import MessageToDict
25from scapy.layers.inet import Raw
26import json
27from google.protobuf.message import Message
28from grpc._channel import _Rendezvous
29from scapy.layers.l2 import Ether, Dot1Q
30from simplejson import dumps
31from twisted.internet import reactor
32from twisted.internet.defer import inlineCallbacks, returnValue
33from twisted.internet.task import LoopingCall
34
35from python.adapters.common.frameio.frameio import BpfProgramFilter, hexify
36from python.adapters.iadapter import OltAdapter
37from python.common.utils.asleep import asleep
38from python.common.utils.registry import registry
39from python.adapters.kafka.kafka_proxy import get_kafka_proxy
40from python.protos import openolt_pb2
41from python.protos import third_party
42from python.protos.common_pb2 import OperStatus, ConnectStatus
43from python.protos.common_pb2 import LogLevel
44from python.protos.common_pb2 import OperationResp
45from python.protos.inter_container_pb2 import SwitchCapability, PortCapability, \
46 InterAdapterMessageType, InterAdapterResponseBody
47from python.protos.device_pb2 import Port, PmConfig, PmConfigs, \
48 DeviceType, DeviceTypes
49from python.protos.adapter_pb2 import Adapter
50from python.protos.adapter_pb2 import AdapterConfig
51
52
53from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
54from python.protos.logical_device_pb2 import LogicalPort
55from python.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
56 OFPPF_1GB_FD, \
57 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
58 ofp_switch_features, ofp_desc
59from python.protos.openflow_13_pb2 import ofp_port
60from python.protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest, PonSimStub
61
62_ = third_party
63log = structlog.get_logger()
64#OpenOltDefaults = {
65# 'support_classes': {
66# 'platform': OpenOltPlatform,
67# 'resource_mgr': OpenOltResourceMgr,
68# 'flow_mgr': OpenOltFlowMgr,
69# 'alarm_mgr': OpenOltAlarmMgr,
70# 'stats_mgr': OpenOltStatisticsMgr,
71# 'bw_mgr': OpenOltBW
72# }
73#}
74
75class AdapterPmMetrics:
76 def __init__(self, device):
77 self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
78 'tx_256_511_pkts', 'tx_512_1023_pkts',
79 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
80 'rx_64_pkts', 'rx_65_127_pkts',
81 'rx_128_255_pkts', 'rx_256_511_pkts',
82 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
83 'rx_1519_9k_pkts'}
84 self.device = device
85 self.id = device.id
86 self.name = 'ponsim_olt'
87 self.default_freq = 150
88 self.grouped = False
89 self.freq_override = False
90 self.pon_metrics_config = dict()
91 self.nni_metrics_config = dict()
92 self.lc = None
93 for m in self.pm_names:
94 self.pon_metrics_config[m] = PmConfig(name=m,
95 type=PmConfig.COUNTER,
96 enabled=True)
97 self.nni_metrics_config[m] = PmConfig(name=m,
98 type=PmConfig.COUNTER,
99 enabled=True)
100
101 def update(self, pm_config):
102 if self.default_freq != pm_config.default_freq:
103 # Update the callback to the new frequency.
104 self.default_freq = pm_config.default_freq
105 self.lc.stop()
106 self.lc.start(interval=self.default_freq / 10)
107 for m in pm_config.metrics:
108 self.pon_metrics_config[m.name].enabled = m.enabled
109 self.nni_metrics_config[m.name].enabled = m.enabled
110
111 def make_proto(self):
112 pm_config = PmConfigs(
113 id=self.id,
114 default_freq=self.default_freq,
115 grouped=False,
116 freq_override=False)
117 for m in sorted(self.pon_metrics_config):
118 pm = self.pon_metrics_config[m] # Either will do they're the same
119 pm_config.metrics.extend([PmConfig(name=pm.name,
120 type=pm.type,
121 enabled=pm.enabled)])
122 return pm_config
123
124 def collect_port_metrics(self, channel):
125 rtrn_port_metrics = dict()
126 stub = ponsim_pb2.PonSimStub(channel)
127 stats = stub.GetStats(Empty())
128 rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
129 rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
130 return rtrn_port_metrics
131
132 def extract_pon_metrics(self, stats):
133 rtrn_pon_metrics = dict()
134 for m in stats.metrics:
135 if m.port_name == "pon":
136 for p in m.packets:
137 if self.pon_metrics_config[p.name].enabled:
138 rtrn_pon_metrics[p.name] = p.value
139 return rtrn_pon_metrics
140
141 def extract_nni_metrics(self, stats):
142 rtrn_pon_metrics = dict()
143 for m in stats.metrics:
144 if m.port_name == "nni":
145 for p in m.packets:
146 if self.pon_metrics_config[p.name].enabled:
147 rtrn_pon_metrics[p.name] = p.value
148 return rtrn_pon_metrics
149
150 def start_collector(self, callback):
151 log.info("starting-pm-collection", device_name=self.name,
152 device_id=self.device.id)
153 prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
154 self.lc = LoopingCall(callback, self.device.id, prefix)
155 self.lc.start(interval=self.default_freq / 10)
156
157 def stop_collector(self):
158 log.info("stopping-pm-collection", device_name=self.name,
159 device_id=self.device.id)
160 self.lc.stop()
161
162
163class AdapterAlarms:
164 def __init__(self, adapter, device):
165 self.adapter = adapter
166 self.device = device
167 self.lc = None
168
169 # TODO: Implement code to send to kafka cluster directly instead of
170 # going through the voltha core.
171 def send_alarm(self, context_data, alarm_data):
172 log.debug("send-alarm-not-implemented")
173 return
174
175
176
177class OpenoltAdapter(OltAdapter):
178 name = 'openolt'
179
180 supported_device_types = [
181 DeviceType(
182 id=name,
183 adapter=name,
184 accepts_bulk_flow_update=True,
185 accepts_add_remove_flow_updates=True
186 )
187 ]
188
189 # System Init Methods #
190 def __init__(self, core_proxy, adapter_proxy, config):
191 super(OpenoltAdapter, self).__init__(core_proxy=core_proxy,
192 adapter_proxy=adapter_proxy,
193 config=config,
194 device_handler_class=OpenoltHandler,
195 name='openolt',
196 vendor='Voltha project',
197 version='0.4',
198 device_type='openolt',
199 accepts_bulk_flow_update=True,
200 accepts_add_remove_flow_updates=False)
201 self.adapter_proxy = adapter_proxy
202 self.core_proxy = core_proxy
203 self.config = config
204 self.descriptor = Adapter(
205 id=self.name,
206 vendor='OLT white box vendor',
207 version='0.1',
208 config=config
209 )
210 log.debug('openolt.__init__', adapter_proxy=adapter_proxy)
211 self.devices = dict() # device_id -> OpenoltDevice()
212 self.interface = registry('main').get_args().interface
213 self.logical_device_id_to_root_device_id = dict()
214 self.num_devices = 0
215
216 def start(self):
217 log.info('started', interface=self.interface)
218
219 def stop(self):
220 log.info('stopped', interface=self.interface)
221
222
223 # Info Methods #
224 def adapter_descriptor(self):
225 log.debug('get descriptor', interface=self.interface)
226 return self.descriptor
227
228 def device_types(self):
229 log.debug('get device_types', interface=self.interface,
230 items=self.supported_device_types)
231 return DeviceTypes(items=self.supported_device_types)
232
233 def health(self):
234 log.debug('get health', interface=self.interface)
235 raise NotImplementedError()
236
237 def get_device_details(self, device):
238 log.debug('get_device_details', device=device)
239 raise NotImplementedError()
240
241
242 # Device Operation Methods #
243 def change_master_state(self, master):
244 log.debug('change_master_state', interface=self.interface,
245 master=master)
246 raise NotImplementedError()
247
248 def abandon_device(self, device):
249 log.info('abandon-device', device=device)
250 raise NotImplementedError()
251
252
253 # Configuration Methods #
254 def update_flows_incrementally(self, device, flow_changes, group_changes):
255 log.debug('update_flows_incrementally', device=device,
256 flow_changes=flow_changes, group_changes=group_changes)
257 log.info('This device does not allow this, therefore it is Not '
258 'implemented')
259 raise NotImplementedError()
260
261 def update_pm_config(self, device, pm_configs):
262 log.info('update_pm_config - Not implemented yet', device=device,
263 pm_configs=pm_configs)
264 raise NotImplementedError()
265
266 def receive_proxied_message(self, proxy_address, msg):
267 log.debug('receive_proxied_message - Not implemented',
268 proxy_address=proxy_address,
269 proxied_msg=msg)
270 raise NotImplementedError()
271
272 def receive_inter_adapter_message(self, msg):
273 log.info('rx_inter_adapter_msg - Not implemented')
274 raise NotImplementedError()
275
276
277 # Image Operations Methods #
278 def download_image(self, device, request):
279 log.info('image_download - Not implemented yet', device=device,
280 request=request)
281 raise NotImplementedError()
282
283 def get_image_download_status(self, device, request):
284 log.info('get_image_download - Not implemented yet', device=device,
285 request=request)
286 raise NotImplementedError()
287
288 def cancel_image_download(self, device, request):
289 log.info('cancel_image_download - Not implemented yet', device=device)
290 raise NotImplementedError()
291
292 def activate_image_update(self, device, request):
293 log.info('activate_image_update - Not implemented yet',
294 device=device, request=request)
295 raise NotImplementedError()
296
297 def revert_image_update(self, device, request):
298 log.info('revert_image_update - Not implemented yet',
299 device=device, request=request)
300 raise NotImplementedError()
301
302 def self_test_device(self, device):
303 # from voltha.protos.voltha_pb2 import SelfTestResponse
304 log.info('Not implemented yet')
305 raise NotImplementedError()
306
307
308 # PON Operations Methods #
309 def create_interface(self, device, data):
310 log.debug('create-interface - Not implemented - We do not use this',
311 data=data)
312 raise NotImplementedError()
313
314 def update_interface(self, device, data):
315 log.debug('update-interface - Not implemented - We do not use this',
316 data=data)
317 raise NotImplementedError()
318
319 def remove_interface(self, device, data):
320 log.debug('remove-interface - Not implemented - We do not use this',
321 data=data)
322 raise NotImplementedError()
323
324 def receive_onu_detect_state(self, proxy_address, state):
325 log.debug('receive-onu-detect-state - Not implemented - We do not '
326 'use this', proxy_address=proxy_address,
327 state=state)
328 raise NotImplementedError()
329
330 def create_tcont(self, device, tcont_data, traffic_descriptor_data):
331 log.info('create-tcont - Not implemented - We do not use this',
332 tcont_data=tcont_data,
333 traffic_descriptor_data=traffic_descriptor_data)
334 raise NotImplementedError()
335
336 def update_tcont(self, device, tcont_data, traffic_descriptor_data):
337 log.info('update-tcont - Not implemented - We do not use this',
338 tcont_data=tcont_data,
339 traffic_descriptor_data=traffic_descriptor_data)
340 raise NotImplementedError()
341
342 def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
343 log.info('remove-tcont - Not implemented - We do not use this',
344 tcont_data=tcont_data,
345 traffic_descriptor_data=traffic_descriptor_data)
346 raise NotImplementedError()
347
348 def create_gemport(self, device, data):
349 log.info('create-gemport - Not implemented - We do not use this',
350 data=data)
351 raise NotImplementedError()
352
353 def update_gemport(self, device, data):
354 log.info('update-gemport - Not implemented - We do not use this',
355 data=data)
356 raise NotImplementedError()
357
358 def remove_gemport(self, device, data):
359 log.info('remove-gemport - Not implemented - We do not use this',
360 data=data)
361 raise NotImplementedError()
362
363 def create_multicast_gemport(self, device, data):
364 log.info('create-mcast-gemport - Not implemented - We do not use '
365 'this', data=data)
366 raise NotImplementedError()
367
368 def update_multicast_gemport(self, device, data):
369 log.info('update-mcast-gemport - Not implemented - We do not use '
370 'this', data=data)
371 raise NotImplementedError()
372
373 def remove_multicast_gemport(self, device, data):
374 log.info('remove-mcast-gemport - Not implemented - We do not use '
375 'this', data=data)
376 raise NotImplementedError()
377
378 def create_multicast_distribution_set(self, device, data):
379 log.info('create-mcast-distribution-set - Not implemented - We do '
380 'not use this', data=data)
381 raise NotImplementedError()
382
383 def update_multicast_distribution_set(self, device, data):
384 log.info('update-mcast-distribution-set - Not implemented - We do '
385 'not use this', data=data)
386 raise NotImplementedError()
387
388 def remove_multicast_distribution_set(self, device, data):
389 log.info('remove-mcast-distribution-set - Not implemented - We do '
390 'not use this', data=data)
391 raise NotImplementedError()
392
393
394 # Alarm Methods #
395 def suppress_alarm(self, filter):
396 log.info('suppress_alarm - Not implemented yet', filter=filter)
397 raise NotImplementedError()
398
399 def unsuppress_alarm(self, filter):
400 log.info('unsuppress_alarm - Not implemented yet', filter=filter)
401 raise NotImplementedError()
402
403class OpenoltHandler(object):
404 def __init__(self, adapter, device_id):
405 self.adapter = adapter
406 self.core_proxy = adapter.core_proxy
407 self.adapter_proxy = adapter.adapter_proxy
408 self.device_id = device_id
409 self.log = structlog.get_logger(device_id=device_id)
410 self.channel = None
411 self.io_port = None
412 self.logical_device_id = None
413 self.nni_port = None
414 self.ofp_port_no = None
415 self.interface = registry('main').get_args().interface
416 self.pm_metrics = None
417 self.alarms = None
418 self.frames = None
419
420 @inlineCallbacks
421 def get_channel(self):
422 if self.channel is None:
423 try:
424 device = yield self.core_proxy.get_device(self.device_id)
425 self.log.info('device-info', device=device,
426 host_port=device.host_and_port)
427 self.channel = grpc.insecure_channel(device.host_and_port)
428 except Exception as e:
429 log.exception("ponsim-connection-failure", e=e)
430
431 # returnValue(self.channel)
432
433 def close_channel(self):
434 if self.channel is None:
435 self.log.info('grpc-channel-already-closed')
436 return
437 else:
438 if self.frames is not None:
439 self.frames.cancel()
440 self.frames = None
441 self.log.info('cancelled-grpc-frame-stream')
442
443 self.channel.unsubscribe(lambda *args: None)
444 self.channel = None
445
446 self.log.info('grpc-channel-closed')
447
448 @inlineCallbacks
449 def _get_nni_port(self):
450 ports = yield self.core_proxy.get_ports(self.device_id,
451 Port.ETHERNET_NNI)
452 returnValue(ports)
453
454 @inlineCallbacks
455 def activate(self, device):
456 try:
457 self.log.info('activating')
458 print (dir(device))
459 if not device.host_and_port:
460 device.oper_status = OperStatus.FAILED
461 device.reason = 'No host_and_port field provided'
462 self.core_proxy.device_update(device)
463 return
464 """
465 kwargs = {
466 'support_classes': OpenOltDefaults['support_classes'],
467 'adapter_agent': self.adapter_proxy,
468 'device': device,
469 'device_num': self.num_devices + 1
470 }
471 try:
472 self.devices[device.id] = OpenoltDevice(**kwargs)
473 except Exception as e:
474 log.error('Failed to adopt OpenOLT device', error=e)
475 # TODO set status to ERROR so that is clear something went wrong
476 del self.devices[device.id]
477 raise
478 else:
479 self.num_devices += 1
480
481 """
482 yield self.get_channel()
483 stub = PonSimStub(self.channel)
484 info = stub.GetDeviceInfo(Empty())
485 log.info('got-info', info=info, device_id=device.id)
486 self.ofp_port_no = info.nni_port
487
488 device.root = True
489 device.vendor = 'ponsim'
490 device.model = 'n/a'
491 device.serial_number = device.host_and_port
492 device.mac_address = "AA:BB:CC:DD:EE:FF"
493 yield self.core_proxy.device_update(device)
494
495 # Now set the initial PM configuration for this device
496 self.pm_metrics = AdapterPmMetrics(device)
497 pm_config = self.pm_metrics.make_proto()
498 log.info("initial-pm-config", pm_config=pm_config)
499 self.core_proxy.device_pm_config_update(pm_config, init=True)
500
501 # Setup alarm handler
502 self.alarms = AdapterAlarms(self.adapter, device)
503
504 nni_port = Port(
505 port_no=info.nni_port,
506 label='NNI facing Ethernet port',
507 type=Port.ETHERNET_NNI,
508 oper_status=OperStatus.ACTIVE
509 )
510 self.nni_port = nni_port
511 yield self.core_proxy.port_created(device.id, nni_port)
512 yield self.core_proxy.port_created(device.id, Port(
513 port_no=1,
514 label='PON port',
515 type=Port.PON_OLT,
516 oper_status=OperStatus.ACTIVE
517 ))
518
519 yield self.core_proxy.device_state_update(device.id,
520 connect_status=ConnectStatus.REACHABLE,
521 oper_status=OperStatus.ACTIVE)
522
523 # register ONUS
524 self.log.info('onu-found', onus=info.onus, len=len(info.onus))
525 for onu in info.onus:
526 vlan_id = onu.uni_port
527 yield self.core_proxy.child_device_detected(
528 parent_device_id=device.id,
529 parent_port_no=1,
530 child_device_type='ponsim_onu',
531 channel_id=vlan_id,
532 )
533
534 self.log.info('starting-frame-grpc-stream')
535 reactor.callInThread(self.rcv_grpc)
536 self.log.info('started-frame-grpc-stream')
537
538 # Start collecting stats from the device after a brief pause
539 self.start_kpi_collection(device.id)
540 except Exception as e:
541 log.exception("Exception-activating", e=e)
542
543 def get_ofp_device_info(self, device):
544 return SwitchCapability(
545 desc=ofp_desc(
546 hw_desc='ponsim pon',
547 sw_desc='ponsim pon',
548 serial_num=device.serial_number,
549 dp_desc='n/a'
550 ),
551 switch_features=ofp_switch_features(
552 n_buffers=256, # TODO fake for now
553 n_tables=2, # TODO ditto
554 capabilities=( # TODO and ditto
555 OFPC_FLOW_STATS
556 | OFPC_TABLE_STATS
557 | OFPC_PORT_STATS
558 | OFPC_GROUP_STATS
559 )
560 )
561 )
562
563 def get_ofp_port_info(self, device, port_no):
564 # Since the adapter created the device port then it has the reference of the port to
565 # return the capability. TODO: Do a lookup on the NNI port number and return the
566 # appropriate attributes
567 self.log.info('get_ofp_port_info', port_no=port_no,
568 info=self.ofp_port_no, device_id=device.id)
569 cap = OFPPF_1GB_FD | OFPPF_FIBER
570 return PortCapability(
571 port=LogicalPort(
572 ofp_port=ofp_port(
573 hw_addr=mac_str_to_tuple(
574 'AA:BB:CC:DD:EE:%02x' % port_no),
575 config=0,
576 state=OFPPS_LIVE,
577 curr=cap,
578 advertised=cap,
579 peer=cap,
580 curr_speed=OFPPF_1GB_FD,
581 max_speed=OFPPF_1GB_FD
582 ),
583 device_id=device.id,
584 device_port_no=port_no
585 )
586 )
587
588 # TODO - change for core 2.0
589 def reconcile(self, device):
590 self.log.info('reconciling-OLT-device')
591
592 @inlineCallbacks
593 def _rcv_frame(self, frame):
594 pkt = Ether(frame)
595
596 if pkt.haslayer(Dot1Q):
597 outer_shim = pkt.getlayer(Dot1Q)
598
599 if isinstance(outer_shim.payload, Dot1Q):
600 inner_shim = outer_shim.payload
601 cvid = inner_shim.vlan
602 popped_frame = (
603 Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
604 inner_shim.payload
605 )
606 self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
607 yield self.core_proxy.send_packet_in(device_id=self.device_id,
608 port=cvid,
609 packet=str(popped_frame))
610 elif pkt.haslayer(Raw):
611 raw_data = json.loads(pkt.getlayer(Raw).load)
612 self.alarms.send_alarm(self, raw_data)
613
614 @inlineCallbacks
615 def rcv_grpc(self):
616 """
617 This call establishes a GRPC stream to receive frames.
618 """
619 yield self.get_channel()
620 stub = PonSimStub(self.channel)
621 # stub = PonSimStub(self.get_channel())
622
623 # Attempt to establish a grpc stream with the remote ponsim service
624 self.frames = stub.ReceiveFrames(Empty())
625
626 self.log.info('start-receiving-grpc-frames')
627
628 try:
629 for frame in self.frames:
630 self.log.info('received-grpc-frame',
631 frame_len=len(frame.payload))
632 yield self._rcv_frame(frame.payload)
633
634 except _Rendezvous, e:
635 log.warn('grpc-connection-lost', message=e.message)
636
637 self.log.info('stopped-receiving-grpc-frames')
638
639 @inlineCallbacks
640 def update_flow_table(self, flows):
641 yield self.get_channel()
642 stub = PonSimStub(self.channel)
643
644 self.log.info('pushing-olt-flow-table')
645 stub.UpdateFlowTable(FlowTable(
646 port=0,
647 flows=flows
648 ))
649 self.log.info('success')
650
651 def remove_from_flow_table(self, flows):
652 self.log.debug('remove-from-flow-table', flows=flows)
653 # TODO: Update PONSIM code to accept incremental flow changes
654 # Once completed, the accepts_add_remove_flow_updates for this
655 # device type can be set to True
656
657 def add_to_flow_table(self, flows):
658 self.log.debug('add-to-flow-table', flows=flows)
659 # TODO: Update PONSIM code to accept incremental flow changes
660 # Once completed, the accepts_add_remove_flow_updates for this
661 # device type can be set to True
662
663 def update_pm_config(self, device, pm_config):
664 log.info("handler-update-pm-config", device=device,
665 pm_config=pm_config)
666 self.pm_metrics.update(pm_config)
667
668 def send_proxied_message(self, proxy_address, msg):
669 self.log.info('sending-proxied-message')
670 if isinstance(msg, FlowTable):
671 stub = PonSimStub(self.get_channel())
672 self.log.info('pushing-onu-flow-table', port=msg.port)
673 res = stub.UpdateFlowTable(msg)
674 self.core_proxy.receive_proxied_message(proxy_address, res)
675
676 @inlineCallbacks
677 def process_inter_adapter_message(self, request):
678 self.log.info('process-inter-adapter-message', msg=request)
679 try:
680 if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
681 f = FlowTable()
682 if request.body:
683 request.body.Unpack(f)
684 stub = PonSimStub(self.channel)
685 self.log.info('pushing-onu-flow-table')
686 res = stub.UpdateFlowTable(f)
687 # Send response back
688 reply = InterAdapterResponseBody()
689 reply.status = True
690 self.log.info('sending-response-back', reply=reply)
691 yield self.adapter_proxy.send_inter_adapter_message(
692 msg=reply,
693 type=InterAdapterMessageType.FLOW_RESPONSE,
694 from_adapter=self.adapter.name,
695 to_adapter=request.header.from_topic,
696 to_device_id=request.header.to_device_id,
697 message_id=request.header.id
698 )
699 elif request.header.type == InterAdapterMessageType.METRICS_REQUEST:
700 m = PonSimMetricsRequest()
701 if request.body:
702 request.body.Unpack(m)
703 stub = PonSimStub(self.channel)
704 self.log.info('proxying onu stats request', port=m.port)
705 res = stub.GetStats(m)
706 # Send response back
707 reply = InterAdapterResponseBody()
708 reply.status = True
709 reply.body.Pack(res)
710 self.log.info('sending-response-back', reply=reply)
711 yield self.adapter_proxy.send_inter_adapter_message(
712 msg=reply,
713 type=InterAdapterMessageType.METRICS_RESPONSE,
714 from_adapter=self.adapter.name,
715 to_adapter=request.header.from_topic,
716 to_device_id=request.header.to_device_id,
717 message_id=request.header.id
718 )
719 except Exception as e:
720 self.log.exception("error-processing-inter-adapter-message", e=e)
721
722 def packet_out(self, egress_port, msg):
723 self.log.info('sending-packet-out', egress_port=egress_port,
724 msg=hexify(msg))
725 try:
726 pkt = Ether(msg)
727 out_pkt = pkt
728 if egress_port != self.nni_port.port_no:
729 # don't do the vlan manipulation for the NNI port, vlans are already correct
730 out_pkt = (
731 Ether(src=pkt.src, dst=pkt.dst) /
732 Dot1Q(vlan=egress_port, type=pkt.type) /
733 pkt.payload
734 )
735
736 # TODO need better way of mapping logical ports to PON ports
737 out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
738
739 # send over grpc stream
740 stub = PonSimStub(self.channel)
741 frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
742 out_port=out_port)
743 stub.SendFrame(frame)
744 except Exception as e:
745 self.log.exception("error-processing-packet-out", e=e)
746
747
748 @inlineCallbacks
749 def reboot(self):
750 self.log.info('rebooting', device_id=self.device_id)
751
752 yield self.core_proxy.device_state_update(self.device_id,
753 connect_status=ConnectStatus.UNREACHABLE)
754
755 # Update the child devices connect state to UNREACHABLE
756 yield self.core_proxy.children_state_update(self.device_id,
757 connect_status=ConnectStatus.UNREACHABLE)
758
759 # Sleep 10 secs, simulating a reboot
760 # TODO: send alert and clear alert after the reboot
761 yield asleep(10)
762
763 # Change the connection status back to REACHABLE. With a
764 # real OLT the connection state must be the actual state
765 yield self.core_proxy.device_state_update(self.device_id,
766 connect_status=ConnectStatus.REACHABLE)
767
768 # Update the child devices connect state to REACHABLE
769 yield self.core_proxy.children_state_update(self.device_id,
770 connect_status=ConnectStatus.REACHABLE)
771
772 self.log.info('rebooted', device_id=self.device_id)
773
774 def self_test_device(self, device):
775 """
776 This is called to Self a device based on a NBI call.
777 :param device: A Voltha.Device object.
778 :return: Will return result of self test
779 """
780 log.info('self-test-device', device=device.id)
781 raise NotImplementedError()
782
783 @inlineCallbacks
784 def disable(self):
785 self.log.info('disabling', device_id=self.device_id)
786
787 self.stop_kpi_collection()
788
789 # Update the operational status to UNKNOWN and connection status to UNREACHABLE
790 yield self.core_proxy.device_state_update(self.device_id,
791 oper_status=OperStatus.UNKNOWN,
792 connect_status=ConnectStatus.UNREACHABLE)
793
794 self.close_channel()
795 self.log.info('disabled-grpc-channel')
796
797 self.stop_kpi_collection()
798
799 # TODO:
800 # 1) Remove all flows from the device
801 # 2) Remove the device from ponsim
802
803 self.log.info('disabled', device_id=self.device_id)
804
805 @inlineCallbacks
806 def reenable(self):
807 self.log.info('re-enabling', device_id=self.device_id)
808
809 # Set the ofp_port_no and nni_port in case we bypassed the reconcile
810 # process if the device was in DISABLED state on voltha restart
811 if not self.ofp_port_no and not self.nni_port:
812 yield self.get_channel()
813 stub = PonSimStub(self.channel)
814 info = stub.GetDeviceInfo(Empty())
815 log.info('got-info', info=info)
816 self.ofp_port_no = info.nni_port
817 ports = yield self._get_nni_port()
818 # For ponsim, we are using only 1 NNI port
819 if ports.items:
820 self.nni_port = ports.items[0]
821
822 # Update the state of the NNI port
823 yield self.core_proxy.port_state_update(self.device_id,
824 port_type=Port.ETHERNET_NNI,
825 port_no=self.ofp_port_no,
826 oper_status=OperStatus.ACTIVE)
827
828 # Update the state of the PON port
829 yield self.core_proxy.port_state_update(self.device_id,
830 port_type=Port.PON_OLT,
831 port_no=1,
832 oper_status=OperStatus.ACTIVE)
833
834 # Set the operational state of the device to ACTIVE and connect status to REACHABLE
835 yield self.core_proxy.device_state_update(self.device_id,
836 connect_status=ConnectStatus.REACHABLE,
837 oper_status=OperStatus.ACTIVE)
838
839 # TODO: establish frame grpc-stream
840 # yield reactor.callInThread(self.rcv_grpc)
841
842 self.start_kpi_collection(self.device_id)
843
844 self.log.info('re-enabled', device_id=self.device_id)
845
846 def delete(self):
847 self.log.info('deleting', device_id=self.device_id)
848
849 self.close_channel()
850 self.log.info('disabled-grpc-channel')
851
852 # TODO:
853 # 1) Remove all flows from the device
854 # 2) Remove the device from ponsim
855
856 self.log.info('deleted', device_id=self.device_id)
857
858 def start_kpi_collection(self, device_id):
859
860 kafka_cluster_proxy = get_kafka_proxy()
861
862 def _collect(device_id, prefix):
863
864 try:
865 # Step 1: gather metrics from device
866 port_metrics = \
867 self.pm_metrics.collect_port_metrics(self.channel)
868
869 # Step 2: prepare the KpiEvent for submission
870 # we can time-stamp them here (or could use time derived from OLT
871 ts = arrow.utcnow().timestamp
872 kpi_event = KpiEvent(
873 type=KpiEventType.slice,
874 ts=ts,
875 prefixes={
876 # OLT NNI port
877 prefix + '.nni': MetricValuePairs(
878 metrics=port_metrics['nni']),
879 # OLT PON port
880 prefix + '.pon': MetricValuePairs(
881 metrics=port_metrics['pon'])
882 }
883 )
884
885 # Step 3: submit directly to the kafka bus
886 if kafka_cluster_proxy:
887 if isinstance(kpi_event, Message):
888 kpi_event = dumps(MessageToDict(kpi_event, True, True))
889 kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
890
891 except Exception as e:
892 log.exception('failed-to-submit-kpis', e=e)
893
894 self.pm_metrics.start_collector(_collect)
895
896 def stop_kpi_collection(self):
897 self.pm_metrics.stop_collector()