blob: 7af023e4284ccb16690b9ea8be14b96e60b7c1a4 [file] [log] [blame]
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -08001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Openolt adapter.
19"""
20import arrow
21import grpc
22import structlog
23from google.protobuf.empty_pb2 import Empty
24from google.protobuf.json_format import MessageToDict
25from scapy.layers.inet import Raw
26import json
27from google.protobuf.message import Message
28from grpc._channel import _Rendezvous
29from scapy.layers.l2 import Ether, Dot1Q
30from simplejson import dumps
31from twisted.internet import reactor
32from twisted.internet.defer import inlineCallbacks, returnValue
33from twisted.internet.task import LoopingCall
34
35from python.adapters.common.frameio.frameio import BpfProgramFilter, hexify
William Kurkiandaa6bb22019-03-07 12:26:28 -050036from python.adapters.iadapter import OltAdapter
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -080037from python.common.utils.asleep import asleep
38from python.common.utils.registry import registry
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -080039from python.adapters.kafka.kafka_proxy import get_kafka_proxy
40from python.protos import openolt_pb2
41from python.protos import third_party
42from python.protos.common_pb2 import OperStatus, ConnectStatus
43from python.protos.common_pb2 import LogLevel
44from python.protos.common_pb2 import OperationResp
45from python.protos.inter_container_pb2 import SwitchCapability, PortCapability, \
46 InterAdapterMessageType, InterAdapterResponseBody
Arun Arora5f89fb62018-12-19 08:25:54 +000047from python.protos.device_pb2 import Port, PmConfig, PmConfigs, \
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -080048 DeviceType, DeviceTypes
49from python.protos.adapter_pb2 import Adapter
50from python.protos.adapter_pb2 import AdapterConfig
51
52
53from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
54from python.protos.logical_device_pb2 import LogicalPort
55from python.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
56 OFPPF_1GB_FD, \
57 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
58 ofp_switch_features, ofp_desc
59from python.protos.openflow_13_pb2 import ofp_port
William Kurkiandaa6bb22019-03-07 12:26:28 -050060from python.protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest, PonSimStub
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -080061
62_ = third_party
63log = structlog.get_logger()
64#OpenOltDefaults = {
65# 'support_classes': {
66# 'platform': OpenOltPlatform,
67# 'resource_mgr': OpenOltResourceMgr,
68# 'flow_mgr': OpenOltFlowMgr,
69# 'alarm_mgr': OpenOltAlarmMgr,
70# 'stats_mgr': OpenOltStatisticsMgr,
71# 'bw_mgr': OpenOltBW
72# }
73#}
74
William Kurkiandaa6bb22019-03-07 12:26:28 -050075class AdapterPmMetrics:
76 def __init__(self, device):
77 self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
78 'tx_256_511_pkts', 'tx_512_1023_pkts',
79 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
80 'rx_64_pkts', 'rx_65_127_pkts',
81 'rx_128_255_pkts', 'rx_256_511_pkts',
82 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
83 'rx_1519_9k_pkts'}
84 self.device = device
85 self.id = device.id
86 self.name = 'ponsim_olt'
87 self.default_freq = 150
88 self.grouped = False
89 self.freq_override = False
90 self.pon_metrics_config = dict()
91 self.nni_metrics_config = dict()
92 self.lc = None
93 for m in self.pm_names:
94 self.pon_metrics_config[m] = PmConfig(name=m,
95 type=PmConfig.COUNTER,
96 enabled=True)
97 self.nni_metrics_config[m] = PmConfig(name=m,
98 type=PmConfig.COUNTER,
99 enabled=True)
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -0800100
William Kurkiandaa6bb22019-03-07 12:26:28 -0500101 def update(self, pm_config):
102 if self.default_freq != pm_config.default_freq:
103 # Update the callback to the new frequency.
104 self.default_freq = pm_config.default_freq
105 self.lc.stop()
106 self.lc.start(interval=self.default_freq / 10)
107 for m in pm_config.metrics:
108 self.pon_metrics_config[m.name].enabled = m.enabled
109 self.nni_metrics_config[m.name].enabled = m.enabled
110
111 def make_proto(self):
112 pm_config = PmConfigs(
113 id=self.id,
114 default_freq=self.default_freq,
115 grouped=False,
116 freq_override=False)
117 for m in sorted(self.pon_metrics_config):
118 pm = self.pon_metrics_config[m] # Either will do they're the same
119 pm_config.metrics.extend([PmConfig(name=pm.name,
120 type=pm.type,
121 enabled=pm.enabled)])
122 return pm_config
123
124 def collect_port_metrics(self, channel):
125 rtrn_port_metrics = dict()
126 stub = ponsim_pb2.PonSimStub(channel)
127 stats = stub.GetStats(Empty())
128 rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
129 rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
130 return rtrn_port_metrics
131
132 def extract_pon_metrics(self, stats):
133 rtrn_pon_metrics = dict()
134 for m in stats.metrics:
135 if m.port_name == "pon":
136 for p in m.packets:
137 if self.pon_metrics_config[p.name].enabled:
138 rtrn_pon_metrics[p.name] = p.value
139 return rtrn_pon_metrics
140
141 def extract_nni_metrics(self, stats):
142 rtrn_pon_metrics = dict()
143 for m in stats.metrics:
144 if m.port_name == "nni":
145 for p in m.packets:
146 if self.pon_metrics_config[p.name].enabled:
147 rtrn_pon_metrics[p.name] = p.value
148 return rtrn_pon_metrics
149
150 def start_collector(self, callback):
151 log.info("starting-pm-collection", device_name=self.name,
152 device_id=self.device.id)
153 prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
154 self.lc = LoopingCall(callback, self.device.id, prefix)
155 self.lc.start(interval=self.default_freq / 10)
156
157 def stop_collector(self):
158 log.info("stopping-pm-collection", device_name=self.name,
159 device_id=self.device.id)
160 self.lc.stop()
161
162
163class AdapterAlarms:
164 def __init__(self, adapter, device):
165 self.adapter = adapter
166 self.device = device
167 self.lc = None
168
169 # TODO: Implement code to send to kafka cluster directly instead of
170 # going through the voltha core.
171 def send_alarm(self, context_data, alarm_data):
172 log.debug("send-alarm-not-implemented")
173 return
174
175
176
177class OpenoltAdapter(OltAdapter):
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -0800178 name = 'openolt'
179
180 supported_device_types = [
181 DeviceType(
182 id=name,
183 adapter=name,
184 accepts_bulk_flow_update=True,
William Kurkiandaa6bb22019-03-07 12:26:28 -0500185 accepts_add_remove_flow_updates=True
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -0800186 )
187 ]
188
Arun Arora5f89fb62018-12-19 08:25:54 +0000189 # System Init Methods #
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -0800190 def __init__(self, core_proxy, adapter_proxy, config):
William Kurkiandaa6bb22019-03-07 12:26:28 -0500191 super(OpenoltAdapter, self).__init__(core_proxy=core_proxy,
192 adapter_proxy=adapter_proxy,
193 config=config,
194 device_handler_class=OpenoltHandler,
195 name='openolt',
196 vendor='Voltha project',
197 version='0.4',
198 device_type='openolt',
199 accepts_bulk_flow_update=True,
200 accepts_add_remove_flow_updates=False)
Arun Arora5f89fb62018-12-19 08:25:54 +0000201 self.adapter_proxy = adapter_proxy
202 self.core_proxy = core_proxy
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -0800203 self.config = config
204 self.descriptor = Adapter(
205 id=self.name,
206 vendor='OLT white box vendor',
207 version='0.1',
208 config=config
209 )
Arun Arora5f89fb62018-12-19 08:25:54 +0000210 log.debug('openolt.__init__', adapter_proxy=adapter_proxy)
Dinesh Belwalkard1cbf4c2018-12-13 13:34:05 -0800211 self.devices = dict() # device_id -> OpenoltDevice()
212 self.interface = registry('main').get_args().interface
213 self.logical_device_id_to_root_device_id = dict()
214 self.num_devices = 0
215
Arun Arora5f89fb62018-12-19 08:25:54 +0000216 def start(self):
217 log.info('started', interface=self.interface)
218
219 def stop(self):
220 log.info('stopped', interface=self.interface)
221
222
223 # Info Methods #
224 def adapter_descriptor(self):
225 log.debug('get descriptor', interface=self.interface)
226 return self.descriptor
227
228 def device_types(self):
229 log.debug('get device_types', interface=self.interface,
230 items=self.supported_device_types)
231 return DeviceTypes(items=self.supported_device_types)
232
233 def health(self):
234 log.debug('get health', interface=self.interface)
235 raise NotImplementedError()
236
237 def get_device_details(self, device):
238 log.debug('get_device_details', device=device)
239 raise NotImplementedError()
240
241
242 # Device Operation Methods #
243 def change_master_state(self, master):
244 log.debug('change_master_state', interface=self.interface,
245 master=master)
246 raise NotImplementedError()
247
248 def abandon_device(self, device):
249 log.info('abandon-device', device=device)
250 raise NotImplementedError()
251
252
253 # Configuration Methods #
254 def update_flows_incrementally(self, device, flow_changes, group_changes):
255 log.debug('update_flows_incrementally', device=device,
256 flow_changes=flow_changes, group_changes=group_changes)
257 log.info('This device does not allow this, therefore it is Not '
258 'implemented')
259 raise NotImplementedError()
260
261 def update_pm_config(self, device, pm_configs):
262 log.info('update_pm_config - Not implemented yet', device=device,
263 pm_configs=pm_configs)
264 raise NotImplementedError()
265
266 def receive_proxied_message(self, proxy_address, msg):
267 log.debug('receive_proxied_message - Not implemented',
268 proxy_address=proxy_address,
269 proxied_msg=msg)
270 raise NotImplementedError()
271
272 def receive_inter_adapter_message(self, msg):
273 log.info('rx_inter_adapter_msg - Not implemented')
274 raise NotImplementedError()
275
276
277 # Image Operations Methods #
278 def download_image(self, device, request):
279 log.info('image_download - Not implemented yet', device=device,
280 request=request)
281 raise NotImplementedError()
282
283 def get_image_download_status(self, device, request):
284 log.info('get_image_download - Not implemented yet', device=device,
285 request=request)
286 raise NotImplementedError()
287
288 def cancel_image_download(self, device, request):
289 log.info('cancel_image_download - Not implemented yet', device=device)
290 raise NotImplementedError()
291
292 def activate_image_update(self, device, request):
293 log.info('activate_image_update - Not implemented yet',
294 device=device, request=request)
295 raise NotImplementedError()
296
297 def revert_image_update(self, device, request):
298 log.info('revert_image_update - Not implemented yet',
299 device=device, request=request)
300 raise NotImplementedError()
301
302 def self_test_device(self, device):
303 # from voltha.protos.voltha_pb2 import SelfTestResponse
304 log.info('Not implemented yet')
305 raise NotImplementedError()
306
307
308 # PON Operations Methods #
309 def create_interface(self, device, data):
310 log.debug('create-interface - Not implemented - We do not use this',
311 data=data)
312 raise NotImplementedError()
313
314 def update_interface(self, device, data):
315 log.debug('update-interface - Not implemented - We do not use this',
316 data=data)
317 raise NotImplementedError()
318
319 def remove_interface(self, device, data):
320 log.debug('remove-interface - Not implemented - We do not use this',
321 data=data)
322 raise NotImplementedError()
323
324 def receive_onu_detect_state(self, proxy_address, state):
325 log.debug('receive-onu-detect-state - Not implemented - We do not '
326 'use this', proxy_address=proxy_address,
327 state=state)
328 raise NotImplementedError()
329
330 def create_tcont(self, device, tcont_data, traffic_descriptor_data):
331 log.info('create-tcont - Not implemented - We do not use this',
332 tcont_data=tcont_data,
333 traffic_descriptor_data=traffic_descriptor_data)
334 raise NotImplementedError()
335
336 def update_tcont(self, device, tcont_data, traffic_descriptor_data):
337 log.info('update-tcont - Not implemented - We do not use this',
338 tcont_data=tcont_data,
339 traffic_descriptor_data=traffic_descriptor_data)
340 raise NotImplementedError()
341
342 def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
343 log.info('remove-tcont - Not implemented - We do not use this',
344 tcont_data=tcont_data,
345 traffic_descriptor_data=traffic_descriptor_data)
346 raise NotImplementedError()
347
348 def create_gemport(self, device, data):
349 log.info('create-gemport - Not implemented - We do not use this',
350 data=data)
351 raise NotImplementedError()
352
353 def update_gemport(self, device, data):
354 log.info('update-gemport - Not implemented - We do not use this',
355 data=data)
356 raise NotImplementedError()
357
358 def remove_gemport(self, device, data):
359 log.info('remove-gemport - Not implemented - We do not use this',
360 data=data)
361 raise NotImplementedError()
362
363 def create_multicast_gemport(self, device, data):
364 log.info('create-mcast-gemport - Not implemented - We do not use '
365 'this', data=data)
366 raise NotImplementedError()
367
368 def update_multicast_gemport(self, device, data):
369 log.info('update-mcast-gemport - Not implemented - We do not use '
370 'this', data=data)
371 raise NotImplementedError()
372
373 def remove_multicast_gemport(self, device, data):
374 log.info('remove-mcast-gemport - Not implemented - We do not use '
375 'this', data=data)
376 raise NotImplementedError()
377
378 def create_multicast_distribution_set(self, device, data):
379 log.info('create-mcast-distribution-set - Not implemented - We do '
380 'not use this', data=data)
381 raise NotImplementedError()
382
383 def update_multicast_distribution_set(self, device, data):
384 log.info('update-mcast-distribution-set - Not implemented - We do '
385 'not use this', data=data)
386 raise NotImplementedError()
387
388 def remove_multicast_distribution_set(self, device, data):
389 log.info('remove-mcast-distribution-set - Not implemented - We do '
390 'not use this', data=data)
391 raise NotImplementedError()
392
393
394 # Alarm Methods #
395 def suppress_alarm(self, filter):
396 log.info('suppress_alarm - Not implemented yet', filter=filter)
397 raise NotImplementedError()
398
399 def unsuppress_alarm(self, filter):
400 log.info('unsuppress_alarm - Not implemented yet', filter=filter)
401 raise NotImplementedError()
William Kurkiandaa6bb22019-03-07 12:26:28 -0500402
403class OpenoltHandler(object):
404 def __init__(self, adapter, device_id):
405 self.adapter = adapter
406 self.core_proxy = adapter.core_proxy
407 self.adapter_proxy = adapter.adapter_proxy
408 self.device_id = device_id
409 self.log = structlog.get_logger(device_id=device_id)
410 self.channel = None
411 self.io_port = None
412 self.logical_device_id = None
413 self.nni_port = None
414 self.ofp_port_no = None
415 self.interface = registry('main').get_args().interface
416 self.pm_metrics = None
417 self.alarms = None
418 self.frames = None
419
420 @inlineCallbacks
421 def get_channel(self):
422 if self.channel is None:
423 try:
424 device = yield self.core_proxy.get_device(self.device_id)
425 self.log.info('device-info', device=device,
426 host_port=device.host_and_port)
427 self.channel = grpc.insecure_channel(device.host_and_port)
428 except Exception as e:
429 log.exception("ponsim-connection-failure", e=e)
430
431 # returnValue(self.channel)
432
433 def close_channel(self):
434 if self.channel is None:
435 self.log.info('grpc-channel-already-closed')
436 return
437 else:
438 if self.frames is not None:
439 self.frames.cancel()
440 self.frames = None
441 self.log.info('cancelled-grpc-frame-stream')
442
443 self.channel.unsubscribe(lambda *args: None)
444 self.channel = None
445
446 self.log.info('grpc-channel-closed')
447
448 @inlineCallbacks
449 def _get_nni_port(self):
450 ports = yield self.core_proxy.get_ports(self.device_id,
451 Port.ETHERNET_NNI)
452 returnValue(ports)
453
454 @inlineCallbacks
455 def activate(self, device):
456 try:
457 self.log.info('activating')
458 print (dir(device))
459 if not device.host_and_port:
460 device.oper_status = OperStatus.FAILED
461 device.reason = 'No host_and_port field provided'
462 self.core_proxy.device_update(device)
463 return
464 yield self.get_channel()
465 stub = PonSimStub(self.channel)
466 info = stub.GetDeviceInfo(Empty())
467 log.info('got-info', info=info, device_id=device.id)
468 self.ofp_port_no = info.nni_port
469
470 device.root = True
471 device.vendor = 'ponsim'
472 device.model = 'n/a'
473 device.serial_number = device.host_and_port
474 device.mac_address = "AA:BB:CC:DD:EE:FF"
475 yield self.core_proxy.device_update(device)
476
477 # Now set the initial PM configuration for this device
478 self.pm_metrics = AdapterPmMetrics(device)
479 pm_config = self.pm_metrics.make_proto()
480 log.info("initial-pm-config", pm_config=pm_config)
481 self.core_proxy.device_pm_config_update(pm_config, init=True)
482
483 # Setup alarm handler
484 self.alarms = AdapterAlarms(self.adapter, device)
485
486 nni_port = Port(
487 port_no=info.nni_port,
488 label='NNI facing Ethernet port',
489 type=Port.ETHERNET_NNI,
490 oper_status=OperStatus.ACTIVE
491 )
492 self.nni_port = nni_port
493 yield self.core_proxy.port_created(device.id, nni_port)
494 yield self.core_proxy.port_created(device.id, Port(
495 port_no=1,
496 label='PON port',
497 type=Port.PON_OLT,
498 oper_status=OperStatus.ACTIVE
499 ))
500
501 yield self.core_proxy.device_state_update(device.id,
502 connect_status=ConnectStatus.REACHABLE,
503 oper_status=OperStatus.ACTIVE)
504
505 # register ONUS
506 self.log.info('onu-found', onus=info.onus, len=len(info.onus))
507 for onu in info.onus:
508 vlan_id = onu.uni_port
509 yield self.core_proxy.child_device_detected(
510 parent_device_id=device.id,
511 parent_port_no=1,
512 child_device_type='ponsim_onu',
513 channel_id=vlan_id,
514 )
515
516 self.log.info('starting-frame-grpc-stream')
517 reactor.callInThread(self.rcv_grpc)
518 self.log.info('started-frame-grpc-stream')
519
520 # Start collecting stats from the device after a brief pause
521 self.start_kpi_collection(device.id)
522 except Exception as e:
523 log.exception("Exception-activating", e=e)
524
525 def get_ofp_device_info(self, device):
526 return SwitchCapability(
527 desc=ofp_desc(
528 hw_desc='ponsim pon',
529 sw_desc='ponsim pon',
530 serial_num=device.serial_number,
531 dp_desc='n/a'
532 ),
533 switch_features=ofp_switch_features(
534 n_buffers=256, # TODO fake for now
535 n_tables=2, # TODO ditto
536 capabilities=( # TODO and ditto
537 OFPC_FLOW_STATS
538 | OFPC_TABLE_STATS
539 | OFPC_PORT_STATS
540 | OFPC_GROUP_STATS
541 )
542 )
543 )
544
545 def get_ofp_port_info(self, device, port_no):
546 # Since the adapter created the device port then it has the reference of the port to
547 # return the capability. TODO: Do a lookup on the NNI port number and return the
548 # appropriate attributes
549 self.log.info('get_ofp_port_info', port_no=port_no,
550 info=self.ofp_port_no, device_id=device.id)
551 cap = OFPPF_1GB_FD | OFPPF_FIBER
552 return PortCapability(
553 port=LogicalPort(
554 ofp_port=ofp_port(
555 hw_addr=mac_str_to_tuple(
556 'AA:BB:CC:DD:EE:%02x' % port_no),
557 config=0,
558 state=OFPPS_LIVE,
559 curr=cap,
560 advertised=cap,
561 peer=cap,
562 curr_speed=OFPPF_1GB_FD,
563 max_speed=OFPPF_1GB_FD
564 ),
565 device_id=device.id,
566 device_port_no=port_no
567 )
568 )
569
570 # TODO - change for core 2.0
571 def reconcile(self, device):
572 self.log.info('reconciling-OLT-device')
573
574 @inlineCallbacks
575 def _rcv_frame(self, frame):
576 pkt = Ether(frame)
577
578 if pkt.haslayer(Dot1Q):
579 outer_shim = pkt.getlayer(Dot1Q)
580
581 if isinstance(outer_shim.payload, Dot1Q):
582 inner_shim = outer_shim.payload
583 cvid = inner_shim.vlan
584 popped_frame = (
585 Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
586 inner_shim.payload
587 )
588 self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
589 yield self.core_proxy.send_packet_in(device_id=self.device_id,
590 port=cvid,
591 packet=str(popped_frame))
592 elif pkt.haslayer(Raw):
593 raw_data = json.loads(pkt.getlayer(Raw).load)
594 self.alarms.send_alarm(self, raw_data)
595
596 @inlineCallbacks
597 def rcv_grpc(self):
598 """
599 This call establishes a GRPC stream to receive frames.
600 """
601 yield self.get_channel()
602 stub = PonSimStub(self.channel)
603 # stub = PonSimStub(self.get_channel())
604
605 # Attempt to establish a grpc stream with the remote ponsim service
606 self.frames = stub.ReceiveFrames(Empty())
607
608 self.log.info('start-receiving-grpc-frames')
609
610 try:
611 for frame in self.frames:
612 self.log.info('received-grpc-frame',
613 frame_len=len(frame.payload))
614 yield self._rcv_frame(frame.payload)
615
616 except _Rendezvous, e:
617 log.warn('grpc-connection-lost', message=e.message)
618
619 self.log.info('stopped-receiving-grpc-frames')
620
621 @inlineCallbacks
622 def update_flow_table(self, flows):
623 yield self.get_channel()
624 stub = PonSimStub(self.channel)
625
626 self.log.info('pushing-olt-flow-table')
627 stub.UpdateFlowTable(FlowTable(
628 port=0,
629 flows=flows
630 ))
631 self.log.info('success')
632
633 def remove_from_flow_table(self, flows):
634 self.log.debug('remove-from-flow-table', flows=flows)
635 # TODO: Update PONSIM code to accept incremental flow changes
636 # Once completed, the accepts_add_remove_flow_updates for this
637 # device type can be set to True
638
639 def add_to_flow_table(self, flows):
640 self.log.debug('add-to-flow-table', flows=flows)
641 # TODO: Update PONSIM code to accept incremental flow changes
642 # Once completed, the accepts_add_remove_flow_updates for this
643 # device type can be set to True
644
645 def update_pm_config(self, device, pm_config):
646 log.info("handler-update-pm-config", device=device,
647 pm_config=pm_config)
648 self.pm_metrics.update(pm_config)
649
650 def send_proxied_message(self, proxy_address, msg):
651 self.log.info('sending-proxied-message')
652 if isinstance(msg, FlowTable):
653 stub = PonSimStub(self.get_channel())
654 self.log.info('pushing-onu-flow-table', port=msg.port)
655 res = stub.UpdateFlowTable(msg)
656 self.core_proxy.receive_proxied_message(proxy_address, res)
657
658 @inlineCallbacks
659 def process_inter_adapter_message(self, request):
660 self.log.info('process-inter-adapter-message', msg=request)
661 try:
662 if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
663 f = FlowTable()
664 if request.body:
665 request.body.Unpack(f)
666 stub = PonSimStub(self.channel)
667 self.log.info('pushing-onu-flow-table')
668 res = stub.UpdateFlowTable(f)
669 # Send response back
670 reply = InterAdapterResponseBody()
671 reply.status = True
672 self.log.info('sending-response-back', reply=reply)
673 yield self.adapter_proxy.send_inter_adapter_message(
674 msg=reply,
675 type=InterAdapterMessageType.FLOW_RESPONSE,
676 from_adapter=self.adapter.name,
677 to_adapter=request.header.from_topic,
678 to_device_id=request.header.to_device_id,
679 message_id=request.header.id
680 )
681 elif request.header.type == InterAdapterMessageType.METRICS_REQUEST:
682 m = PonSimMetricsRequest()
683 if request.body:
684 request.body.Unpack(m)
685 stub = PonSimStub(self.channel)
686 self.log.info('proxying onu stats request', port=m.port)
687 res = stub.GetStats(m)
688 # Send response back
689 reply = InterAdapterResponseBody()
690 reply.status = True
691 reply.body.Pack(res)
692 self.log.info('sending-response-back', reply=reply)
693 yield self.adapter_proxy.send_inter_adapter_message(
694 msg=reply,
695 type=InterAdapterMessageType.METRICS_RESPONSE,
696 from_adapter=self.adapter.name,
697 to_adapter=request.header.from_topic,
698 to_device_id=request.header.to_device_id,
699 message_id=request.header.id
700 )
701 except Exception as e:
702 self.log.exception("error-processing-inter-adapter-message", e=e)
703
704 def packet_out(self, egress_port, msg):
705 self.log.info('sending-packet-out', egress_port=egress_port,
706 msg=hexify(msg))
707 try:
708 pkt = Ether(msg)
709 out_pkt = pkt
710 if egress_port != self.nni_port.port_no:
711 # don't do the vlan manipulation for the NNI port, vlans are already correct
712 out_pkt = (
713 Ether(src=pkt.src, dst=pkt.dst) /
714 Dot1Q(vlan=egress_port, type=pkt.type) /
715 pkt.payload
716 )
717
718 # TODO need better way of mapping logical ports to PON ports
719 out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
720
721 # send over grpc stream
722 stub = PonSimStub(self.channel)
723 frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
724 out_port=out_port)
725 stub.SendFrame(frame)
726 except Exception as e:
727 self.log.exception("error-processing-packet-out", e=e)
728
729
730 @inlineCallbacks
731 def reboot(self):
732 self.log.info('rebooting', device_id=self.device_id)
733
734 yield self.core_proxy.device_state_update(self.device_id,
735 connect_status=ConnectStatus.UNREACHABLE)
736
737 # Update the child devices connect state to UNREACHABLE
738 yield self.core_proxy.children_state_update(self.device_id,
739 connect_status=ConnectStatus.UNREACHABLE)
740
741 # Sleep 10 secs, simulating a reboot
742 # TODO: send alert and clear alert after the reboot
743 yield asleep(10)
744
745 # Change the connection status back to REACHABLE. With a
746 # real OLT the connection state must be the actual state
747 yield self.core_proxy.device_state_update(self.device_id,
748 connect_status=ConnectStatus.REACHABLE)
749
750 # Update the child devices connect state to REACHABLE
751 yield self.core_proxy.children_state_update(self.device_id,
752 connect_status=ConnectStatus.REACHABLE)
753
754 self.log.info('rebooted', device_id=self.device_id)
755
756 def self_test_device(self, device):
757 """
758 This is called to Self a device based on a NBI call.
759 :param device: A Voltha.Device object.
760 :return: Will return result of self test
761 """
762 log.info('self-test-device', device=device.id)
763 raise NotImplementedError()
764
765 @inlineCallbacks
766 def disable(self):
767 self.log.info('disabling', device_id=self.device_id)
768
769 self.stop_kpi_collection()
770
771 # Update the operational status to UNKNOWN and connection status to UNREACHABLE
772 yield self.core_proxy.device_state_update(self.device_id,
773 oper_status=OperStatus.UNKNOWN,
774 connect_status=ConnectStatus.UNREACHABLE)
775
776 self.close_channel()
777 self.log.info('disabled-grpc-channel')
778
779 self.stop_kpi_collection()
780
781 # TODO:
782 # 1) Remove all flows from the device
783 # 2) Remove the device from ponsim
784
785 self.log.info('disabled', device_id=self.device_id)
786
787 @inlineCallbacks
788 def reenable(self):
789 self.log.info('re-enabling', device_id=self.device_id)
790
791 # Set the ofp_port_no and nni_port in case we bypassed the reconcile
792 # process if the device was in DISABLED state on voltha restart
793 if not self.ofp_port_no and not self.nni_port:
794 yield self.get_channel()
795 stub = PonSimStub(self.channel)
796 info = stub.GetDeviceInfo(Empty())
797 log.info('got-info', info=info)
798 self.ofp_port_no = info.nni_port
799 ports = yield self._get_nni_port()
800 # For ponsim, we are using only 1 NNI port
801 if ports.items:
802 self.nni_port = ports.items[0]
803
804 # Update the state of the NNI port
805 yield self.core_proxy.port_state_update(self.device_id,
806 port_type=Port.ETHERNET_NNI,
807 port_no=self.ofp_port_no,
808 oper_status=OperStatus.ACTIVE)
809
810 # Update the state of the PON port
811 yield self.core_proxy.port_state_update(self.device_id,
812 port_type=Port.PON_OLT,
813 port_no=1,
814 oper_status=OperStatus.ACTIVE)
815
816 # Set the operational state of the device to ACTIVE and connect status to REACHABLE
817 yield self.core_proxy.device_state_update(self.device_id,
818 connect_status=ConnectStatus.REACHABLE,
819 oper_status=OperStatus.ACTIVE)
820
821 # TODO: establish frame grpc-stream
822 # yield reactor.callInThread(self.rcv_grpc)
823
824 self.start_kpi_collection(self.device_id)
825
826 self.log.info('re-enabled', device_id=self.device_id)
827
828 def delete(self):
829 self.log.info('deleting', device_id=self.device_id)
830
831 self.close_channel()
832 self.log.info('disabled-grpc-channel')
833
834 # TODO:
835 # 1) Remove all flows from the device
836 # 2) Remove the device from ponsim
837
838 self.log.info('deleted', device_id=self.device_id)
839
840 def start_kpi_collection(self, device_id):
841
842 kafka_cluster_proxy = get_kafka_proxy()
843
844 def _collect(device_id, prefix):
845
846 try:
847 # Step 1: gather metrics from device
848 port_metrics = \
849 self.pm_metrics.collect_port_metrics(self.channel)
850
851 # Step 2: prepare the KpiEvent for submission
852 # we can time-stamp them here (or could use time derived from OLT
853 ts = arrow.utcnow().timestamp
854 kpi_event = KpiEvent(
855 type=KpiEventType.slice,
856 ts=ts,
857 prefixes={
858 # OLT NNI port
859 prefix + '.nni': MetricValuePairs(
860 metrics=port_metrics['nni']),
861 # OLT PON port
862 prefix + '.pon': MetricValuePairs(
863 metrics=port_metrics['pon'])
864 }
865 )
866
867 # Step 3: submit directly to the kafka bus
868 if kafka_cluster_proxy:
869 if isinstance(kpi_event, Message):
870 kpi_event = dumps(MessageToDict(kpi_event, True, True))
871 kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
872
873 except Exception as e:
874 log.exception('failed-to-submit-kpis', e=e)
875
876 self.pm_metrics.start_collector(_collect)
877
878 def stop_kpi_collection(self):
879 self.pm_metrics.stop_collector()