blob: e47d14d06a55ae24a55eb212362f2ebfe81eb6cb [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Fully simulated OLT adapter.
19"""
20from uuid import uuid4
21
22import arrow
23import adapters.common.openflow.utils as fd
24import grpc
25import structlog
26from scapy.layers.l2 import Ether, Dot1Q
27from twisted.internet import reactor
khenaidoo92e62c52018-10-03 14:02:54 -040028from twisted.internet.defer import inlineCallbacks, returnValue
khenaidoob9203542018-09-17 22:56:37 -040029from grpc._channel import _Rendezvous
30
31from adapters.common.frameio.frameio import BpfProgramFilter, hexify
32from adapters.common.utils.asleep import asleep
33from twisted.internet.task import LoopingCall
34from adapters.iadapter import OltAdapter
35from adapters.protos import third_party
36from adapters.protos import openflow_13_pb2 as ofp
37from adapters.protos import ponsim_pb2
38from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
39from adapters.protos.device_pb2 import Port, PmConfig, PmConfigs
40from adapters.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
41from google.protobuf.empty_pb2 import Empty
42from adapters.protos.logical_device_pb2 import LogicalPort, LogicalDevice
43from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
44 OFPPF_1GB_FD, \
45 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
46 ofp_switch_features, ofp_desc
47from adapters.protos.openflow_13_pb2 import ofp_port
48from adapters.protos.ponsim_pb2 import FlowTable, PonSimFrame
49from adapters.protos.core_adapter_pb2 import SwitchCapability, PortCapability
50from adapters.common.utils.registry import registry
khenaidoo4d4802d2018-10-04 21:59:49 -040051from adapters.kafka.kafka_proxy import get_kafka_proxy
52from simplejson import dumps
53from google.protobuf.json_format import MessageToDict
54from google.protobuf.message import Message
55
khenaidoob9203542018-09-17 22:56:37 -040056
57_ = third_party
58log = structlog.get_logger()
59
60PACKET_IN_VLAN = 4000
61is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
62 PACKET_IN_VLAN))
63
64def mac_str_to_tuple(mac):
65 return tuple(int(d, 16) for d in mac.split(':'))
66
67class AdapterPmMetrics:
68 def __init__(self, device):
69 self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
70 'tx_256_511_pkts', 'tx_512_1023_pkts',
71 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
72 'rx_64_pkts', 'rx_65_127_pkts',
73 'rx_128_255_pkts', 'rx_256_511_pkts',
74 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
75 'rx_1519_9k_pkts'}
76 self.device = device
77 self.id = device.id
78 self.name = 'ponsim_olt'
79 self.default_freq = 150
80 self.grouped = False
81 self.freq_override = False
82 self.pon_metrics_config = dict()
83 self.nni_metrics_config = dict()
84 self.lc = None
85 for m in self.pm_names:
86 self.pon_metrics_config[m] = PmConfig(name=m,
87 type=PmConfig.COUNTER,
88 enabled=True)
89 self.nni_metrics_config[m] = PmConfig(name=m,
90 type=PmConfig.COUNTER,
91 enabled=True)
92
93 def update(self, pm_config):
94 if self.default_freq != pm_config.default_freq:
95 # Update the callback to the new frequency.
96 self.default_freq = pm_config.default_freq
97 self.lc.stop()
98 self.lc.start(interval=self.default_freq / 10)
99 for m in pm_config.metrics:
100 self.pon_metrics_config[m.name].enabled = m.enabled
101 self.nni_metrics_config[m.name].enabled = m.enabled
102
103 def make_proto(self):
104 pm_config = PmConfigs(
105 id=self.id,
106 default_freq=self.default_freq,
107 grouped=False,
108 freq_override=False)
109 for m in sorted(self.pon_metrics_config):
110 pm = self.pon_metrics_config[m] # Either will do they're the same
111 pm_config.metrics.extend([PmConfig(name=pm.name,
112 type=pm.type,
113 enabled=pm.enabled)])
114 return pm_config
115
116 def collect_port_metrics(self, channel):
117 rtrn_port_metrics = dict()
118 stub = ponsim_pb2.PonSimStub(channel)
119 stats = stub.GetStats(Empty())
120 rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
121 rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
122 return rtrn_port_metrics
123
124 def extract_pon_metrics(self, stats):
125 rtrn_pon_metrics = dict()
126 for m in stats.metrics:
127 if m.port_name == "pon":
128 for p in m.packets:
129 if self.pon_metrics_config[p.name].enabled:
130 rtrn_pon_metrics[p.name] = p.value
131 return rtrn_pon_metrics
132
133 def extract_nni_metrics(self, stats):
134 rtrn_pon_metrics = dict()
135 for m in stats.metrics:
136 if m.port_name == "nni":
137 for p in m.packets:
138 if self.pon_metrics_config[p.name].enabled:
139 rtrn_pon_metrics[p.name] = p.value
140 return rtrn_pon_metrics
141
142 def start_collector(self, callback):
143 log.info("starting-pm-collection", device_name=self.name,
144 device_id=self.device.id)
145 prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
146 self.lc = LoopingCall(callback, self.device.id, prefix)
147 self.lc.start(interval=self.default_freq / 10)
148
149 def stop_collector(self):
150 log.info("stopping-pm-collection", device_name=self.name,
151 device_id=self.device.id)
152 self.lc.stop()
153
154
155class AdapterAlarms:
156 def __init__(self, adapter, device):
157 self.adapter = adapter
158 self.device = device
159 self.lc = None
160
161 def send_alarm(self, context_data, alarm_data):
162 try:
163 current_context = {}
164 for key, value in context_data.__dict__.items():
165 current_context[key] = str(value)
166
167 alarm_event = self.adapter.adapter_agent.create_alarm(
168 resource_id=self.device.id,
169 description="{}.{} - {}".format(self.adapter.name,
170 self.device.id,
171 alarm_data[
172 'description']) if 'description' in alarm_data else None,
173 type=alarm_data['type'] if 'type' in alarm_data else None,
174 category=alarm_data[
175 'category'] if 'category' in alarm_data else None,
176 severity=alarm_data[
177 'severity'] if 'severity' in alarm_data else None,
178 state=alarm_data['state'] if 'state' in alarm_data else None,
179 raised_ts=alarm_data['ts'] if 'ts' in alarm_data else 0,
180 context=current_context
181 )
182
183 self.adapter.adapter_agent.submit_alarm(self.device.id,
184 alarm_event)
185
186 except Exception as e:
187 log.exception('failed-to-send-alarm', e=e)
188
189
190class PonSimOltAdapter(OltAdapter):
191 def __init__(self, adapter_agent, config):
192 super(PonSimOltAdapter, self).__init__(adapter_agent=adapter_agent,
193 config=config,
194 device_handler_class=PonSimOltHandler,
195 name='ponsim_olt',
196 vendor='Voltha project',
197 version='0.4',
198 device_type='ponsim_olt',
199 accepts_bulk_flow_update=True,
200 accepts_add_remove_flow_updates=False)
201
202 def update_pm_config(self, device, pm_config):
203 log.info("adapter-update-pm-config", device=device,
204 pm_config=pm_config)
205 handler = self.devices_handlers[device.id]
206 handler.update_pm_config(device, pm_config)
207
208
209
210class PonSimOltHandler(object):
211 def __init__(self, adapter, device_id):
212 self.adapter = adapter
213 self.adapter_agent = adapter.adapter_agent
214 self.device_id = device_id
215 self.log = structlog.get_logger(device_id=device_id)
216 self.channel = None
217 self.io_port = None
218 self.logical_device_id = None
219 self.nni_port = None
220 self.ofp_port_no = None
221 self.interface = registry('main').get_args().interface
222 self.pm_metrics = None
223 self.alarms = None
224 self.frames = None
225
226 @inlineCallbacks
227 def get_channel(self):
228 if self.channel is None:
229 try:
230 device = yield self.adapter_agent.get_device(self.device_id)
231 self.log.info('device-info', device=device, host_port=device.host_and_port)
232 self.channel = grpc.insecure_channel(device.host_and_port)
233 except Exception as e:
234 log.exception("ponsim-connection-failure", e=e)
235
236 # returnValue(self.channel)
237
238 def close_channel(self):
239 if self.channel is None:
240 self.log.info('grpc-channel-already-closed')
241 return
242 else:
243 if self.frames is not None:
244 self.frames.cancel()
245 self.frames = None
246 self.log.info('cancelled-grpc-frame-stream')
247
248 self.channel.unsubscribe(lambda *args: None)
249 self.channel = None
250
251 self.log.info('grpc-channel-closed')
252
khenaidoo92e62c52018-10-03 14:02:54 -0400253 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400254 def _get_nni_port(self):
khenaidoo92e62c52018-10-03 14:02:54 -0400255 ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
256 returnValue(ports)
khenaidoob9203542018-09-17 22:56:37 -0400257
258 @inlineCallbacks
259 def activate(self, device):
260 try:
261 self.log.info('activating')
262
263 if not device.host_and_port:
264 device.oper_status = OperStatus.FAILED
265 device.reason = 'No host_and_port field provided'
266 self.adapter_agent.device_update(device)
267 return
268
269 yield self.get_channel()
270 stub = ponsim_pb2.PonSimStub(self.channel)
271 info = stub.GetDeviceInfo(Empty())
272 log.info('got-info', info=info, device_id=device.id)
273 self.ofp_port_no = info.nni_port
274
275 device.root = True
276 device.vendor = 'ponsim'
277 device.model = 'n/a'
278 device.serial_number = device.host_and_port
khenaidoo92e62c52018-10-03 14:02:54 -0400279 device.mac_address = "AA:BB:CC:DD:EE:FF"
280 # device.connect_status = ConnectStatus.REACHABLE
khenaidoob9203542018-09-17 22:56:37 -0400281 yield self.adapter_agent.device_update(device)
282
283 # Now set the initial PM configuration for this device
284 self.pm_metrics = AdapterPmMetrics(device)
285 pm_config = self.pm_metrics.make_proto()
286 log.info("initial-pm-config", pm_config=pm_config)
287 self.adapter_agent.device_pm_config_update(pm_config, init=True)
288
289 # Setup alarm handler
290 self.alarms = AdapterAlarms(self.adapter, device)
291
292 nni_port = Port(
293 port_no=info.nni_port,
294 label='NNI facing Ethernet port',
295 type=Port.ETHERNET_NNI,
khenaidoo92e62c52018-10-03 14:02:54 -0400296 # admin_state=AdminState.ENABLED,
khenaidoob9203542018-09-17 22:56:37 -0400297 oper_status=OperStatus.ACTIVE
298 )
299 self.nni_port = nni_port
300 yield self.adapter_agent.port_created(device.id, nni_port)
301 yield self.adapter_agent.port_created(device.id, Port(
302 port_no=1,
303 label='PON port',
304 type=Port.PON_OLT,
khenaidoo92e62c52018-10-03 14:02:54 -0400305 # admin_state=AdminState.ENABLED,
khenaidoob9203542018-09-17 22:56:37 -0400306 oper_status=OperStatus.ACTIVE
307 ))
khenaidoo92e62c52018-10-03 14:02:54 -0400308
309 yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400310
311 # register ONUS
312 self.log.info('onu-found', onus=info.onus, len=len(info.onus))
313 for onu in info.onus:
314 vlan_id = onu.uni_port
315 yield self.adapter_agent.child_device_detected(
316 parent_device_id=device.id,
317 parent_port_no=1,
318 child_device_type='ponsim_onu',
319 channel_id=vlan_id,
320 )
321
322 self.log.info('starting-frame-grpc-stream')
323 reactor.callInThread(self.rcv_grpc)
324 self.log.info('started-frame-grpc-stream')
325
326 # TODO
327 # Start collecting stats from the device after a brief pause
khenaidoo92e62c52018-10-03 14:02:54 -0400328 self.start_kpi_collection(device.id)
khenaidoob9203542018-09-17 22:56:37 -0400329 except Exception as e:
330 log.exception("Exception-activating", e=e)
331
332
333 def get_ofp_device_info(self, device):
334 return SwitchCapability(
335 desc=ofp_desc(
336 hw_desc='ponsim pon',
337 sw_desc='ponsim pon',
338 serial_num=device.serial_number,
339 dp_desc='n/a'
340 ),
341 switch_features=ofp_switch_features(
342 n_buffers=256, # TODO fake for now
343 n_tables=2, # TODO ditto
344 capabilities=( # TODO and ditto
345 OFPC_FLOW_STATS
346 | OFPC_TABLE_STATS
347 | OFPC_PORT_STATS
348 | OFPC_GROUP_STATS
349 )
350 )
351 )
352
353 def get_ofp_port_info(self, device, port_no):
354 # Since the adapter created the device port then it has the reference of the port to
355 # return the capability. TODO: Do a lookup on the NNI port number and return the
356 # appropriate attributes
357 self.log.info('get_ofp_port_info', port_no=port_no, info=self.ofp_port_no, device_id=device.id)
358 cap = OFPPF_1GB_FD | OFPPF_FIBER
359 return PortCapability(
360 port = LogicalPort (
361 id='nni',
362 ofp_port=ofp_port(
363 port_no=port_no,
364 hw_addr=mac_str_to_tuple(
365 '00:00:00:00:00:%02x' % self.ofp_port_no),
366 name='nni',
367 config=0,
368 state=OFPPS_LIVE,
369 curr=cap,
370 advertised=cap,
371 peer=cap,
372 curr_speed=OFPPF_1GB_FD,
373 max_speed=OFPPF_1GB_FD
374 )
375 )
376 )
377
khenaidoo4d4802d2018-10-04 21:59:49 -0400378 # TODO - change for core 2.0
khenaidoob9203542018-09-17 22:56:37 -0400379 def reconcile(self, device):
380 self.log.info('reconciling-OLT-device-starts')
381
382 if not device.host_and_port:
383 device.oper_status = OperStatus.FAILED
384 device.reason = 'No host_and_port field provided'
385 self.adapter_agent.device_update(device)
386 return
387
388 try:
389 stub = ponsim_pb2.PonSimStub(self.get_channel())
390 info = stub.GetDeviceInfo(Empty())
391 log.info('got-info', info=info)
392 # TODO: Verify we are connected to the same device we are
393 # reconciling - not much data in ponsim to differentiate at the
394 # time
395 device.oper_status = OperStatus.ACTIVE
396 self.adapter_agent.device_update(device)
397 self.ofp_port_no = info.nni_port
398 self.nni_port = self._get_nni_port()
399 except Exception, e:
400 log.exception('device-unreachable', e=e)
401 device.connect_status = ConnectStatus.UNREACHABLE
402 device.oper_status = OperStatus.UNKNOWN
403 self.adapter_agent.device_update(device)
404 return
405
406 # Now set the initial PM configuration for this device
407 self.pm_metrics = AdapterPmMetrics(device)
408 pm_config = self.pm_metrics.make_proto()
409 log.info("initial-pm-config", pm_config=pm_config)
410 self.adapter_agent.device_update_pm_config(pm_config, init=True)
411
412 # Setup alarm handler
413 self.alarms = AdapterAlarms(self.adapter, device)
414
415 # TODO: Is there anything required to verify nni and PON ports
416
417 # Set the logical device id
418 device = self.adapter_agent.get_device(device.id)
419 if device.parent_id:
420 self.logical_device_id = device.parent_id
421 self.adapter_agent.reconcile_logical_device(device.parent_id)
422 else:
423 self.log.info('no-logical-device-set')
424
425 # Reconcile child devices
426 self.adapter_agent.reconcile_child_devices(device.id)
427
428 reactor.callInThread(self.rcv_grpc)
429
430 # Start collecting stats from the device after a brief pause
431 self.start_kpi_collection(device.id)
432
433 self.log.info('reconciling-OLT-device-ends')
434
435 @inlineCallbacks
436 def rcv_grpc(self):
437 """
438 This call establishes a GRPC stream to receive frames.
439 """
440 yield self.get_channel()
441 stub = ponsim_pb2.PonSimStub(self.channel)
442 # stub = ponsim_pb2.PonSimStub(self.get_channel())
443
444 # Attempt to establish a grpc stream with the remote ponsim service
445 self.frames = stub.ReceiveFrames(Empty())
446
447 self.log.info('start-receiving-grpc-frames')
448
449 try:
450 for frame in self.frames:
451 self.log.info('received-grpc-frame',
452 frame_len=len(frame.payload))
453 self._rcv_frame(frame.payload)
454
455 except _Rendezvous, e:
456 log.warn('grpc-connection-lost', message=e.message)
457
458 self.log.info('stopped-receiving-grpc-frames')
459
460
461 # VOLTHA's flow decomposition removes the information about which flows
462 # are trap flows where traffic should be forwarded to the controller.
463 # We'll go through the flows and change the output port of flows that we
464 # know to be trap flows to the OF CONTROLLER port.
465 def update_flow_table(self, flows):
466 stub = ponsim_pb2.PonSimStub(self.get_channel())
467 self.log.info('pushing-olt-flow-table')
468 for flow in flows:
469 classifier_info = {}
470 for field in fd.get_ofb_fields(flow):
471 if field.type == fd.ETH_TYPE:
472 classifier_info['eth_type'] = field.eth_type
473 self.log.debug('field-type-eth-type',
474 eth_type=classifier_info['eth_type'])
475 elif field.type == fd.IP_PROTO:
476 classifier_info['ip_proto'] = field.ip_proto
477 self.log.debug('field-type-ip-proto',
478 ip_proto=classifier_info['ip_proto'])
479 if ('ip_proto' in classifier_info and (
480 classifier_info['ip_proto'] == 17 or
481 classifier_info['ip_proto'] == 2)) or (
482 'eth_type' in classifier_info and
483 classifier_info['eth_type'] == 0x888e):
484 for action in fd.get_actions(flow):
485 if action.type == ofp.OFPAT_OUTPUT:
486 action.output.port = ofp.OFPP_CONTROLLER
487 self.log.info('out_port', out_port=fd.get_out_port(flow))
488
489 stub.UpdateFlowTable(FlowTable(
490 port=0,
491 flows=flows
492 ))
493 self.log.info('success')
494
495 def remove_from_flow_table(self, flows):
496 self.log.debug('remove-from-flow-table', flows=flows)
497 # TODO: Update PONSIM code to accept incremental flow changes
498 # Once completed, the accepts_add_remove_flow_updates for this
499 # device type can be set to True
500
501 def add_to_flow_table(self, flows):
502 self.log.debug('add-to-flow-table', flows=flows)
503 # TODO: Update PONSIM code to accept incremental flow changes
504 # Once completed, the accepts_add_remove_flow_updates for this
505 # device type can be set to True
506
507 def update_pm_config(self, device, pm_config):
508 log.info("handler-update-pm-config", device=device,
509 pm_config=pm_config)
510 self.pm_metrics.update(pm_config)
511
512 def send_proxied_message(self, proxy_address, msg):
513 self.log.info('sending-proxied-message')
514 if isinstance(msg, FlowTable):
515 stub = ponsim_pb2.PonSimStub(self.get_channel())
516 self.log.info('pushing-onu-flow-table', port=msg.port)
517 res = stub.UpdateFlowTable(msg)
518 self.adapter_agent.receive_proxied_message(proxy_address, res)
519
520 def packet_out(self, egress_port, msg):
521 self.log.info('sending-packet-out', egress_port=egress_port,
522 msg=hexify(msg))
523 pkt = Ether(msg)
524 out_pkt = pkt
525 if egress_port != self.nni_port.port_no:
526 # don't do the vlan manipulation for the NNI port, vlans are already correct
527 out_pkt = (
528 Ether(src=pkt.src, dst=pkt.dst) /
529 Dot1Q(vlan=egress_port, type=pkt.type) /
530 pkt.payload
531 )
532
533 # TODO need better way of mapping logical ports to PON ports
534 out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
535
536 # send over grpc stream
537 stub = ponsim_pb2.PonSimStub(self.get_channel())
538 frame = PonSimFrame(id=self.device_id, payload=str(out_pkt), out_port=out_port)
539 stub.SendFrame(frame)
540
541
542 @inlineCallbacks
543 def reboot(self):
544 self.log.info('rebooting', device_id=self.device_id)
545
khenaidoo4d4802d2018-10-04 21:59:49 -0400546 yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400547
khenaidoo4d4802d2018-10-04 21:59:49 -0400548 # Update the child devices connect state to UNREACHABLE
549 yield self.adapter_agent.children_state_update(self.device_id,
khenaidoob9203542018-09-17 22:56:37 -0400550 connect_status=ConnectStatus.UNREACHABLE)
551
552 # Sleep 10 secs, simulating a reboot
553 # TODO: send alert and clear alert after the reboot
554 yield asleep(10)
555
khenaidoo4d4802d2018-10-04 21:59:49 -0400556 # Change the connection status back to REACHABLE. With a
557 # real OLT the connection state must be the actual state
558 yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.REACHABLE)
559
khenaidoob9203542018-09-17 22:56:37 -0400560
561 # Update the child devices connect state to REACHABLE
khenaidoo4d4802d2018-10-04 21:59:49 -0400562 yield self.adapter_agent.children_state_update(self.device_id,
khenaidoob9203542018-09-17 22:56:37 -0400563 connect_status=ConnectStatus.REACHABLE)
564
565 self.log.info('rebooted', device_id=self.device_id)
566
567 def self_test_device(self, device):
568 """
569 This is called to Self a device based on a NBI call.
570 :param device: A Voltha.Device object.
571 :return: Will return result of self test
572 """
573 log.info('self-test-device', device=device.id)
574 raise NotImplementedError()
575
khenaidoo92e62c52018-10-03 14:02:54 -0400576
577 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400578 def disable(self):
579 self.log.info('disabling', device_id=self.device_id)
580
581 self.stop_kpi_collection()
582
khenaidoo92e62c52018-10-03 14:02:54 -0400583 # Update the operational status to UNKNOWN and connection status to UNREACHABLE
584 yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400585
586 self.close_channel()
587 self.log.info('disabled-grpc-channel')
588
khenaidoob9203542018-09-17 22:56:37 -0400589 # TODO:
590 # 1) Remove all flows from the device
591 # 2) Remove the device from ponsim
592
khenaidoo92e62c52018-10-03 14:02:54 -0400593 self.log.info('disabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400594
khenaidoo92e62c52018-10-03 14:02:54 -0400595 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400596 def reenable(self):
597 self.log.info('re-enabling', device_id=self.device_id)
598
khenaidoob9203542018-09-17 22:56:37 -0400599 # Set the ofp_port_no and nni_port in case we bypassed the reconcile
600 # process if the device was in DISABLED state on voltha restart
601 if not self.ofp_port_no and not self.nni_port:
khenaidoo92e62c52018-10-03 14:02:54 -0400602 yield self.get_channel()
603 stub = ponsim_pb2.PonSimStub(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400604 info = stub.GetDeviceInfo(Empty())
605 log.info('got-info', info=info)
606 self.ofp_port_no = info.nni_port
khenaidoo92e62c52018-10-03 14:02:54 -0400607 ports = yield self._get_nni_port()
608 # For ponsim, we are using only 1 NNI port
609 if ports.items:
610 self.nni_port = ports.items[0]
khenaidoob9203542018-09-17 22:56:37 -0400611
khenaidoo92e62c52018-10-03 14:02:54 -0400612 # Update the state of the NNI port
613 yield self.adapter_agent.port_state_update(self.device_id,
614 port_type=Port.ETHERNET_NNI,
615 port_no=self.ofp_port_no,
616 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400617
khenaidoo92e62c52018-10-03 14:02:54 -0400618 # Update the state of the PON port
619 yield self.adapter_agent.port_state_update(self.device_id,
620 port_type=Port.PON_OLT,
621 port_no=1,
622 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400623
khenaidoo92e62c52018-10-03 14:02:54 -0400624 # Set the operational state of the device to ACTIVE and connect status to REACHABLE
625 yield self.adapter_agent.device_state_update(self.device_id,
626 connect_status=ConnectStatus.REACHABLE,
627 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400628
khenaidoo92e62c52018-10-03 14:02:54 -0400629 # TODO: establish frame grpc-stream
630 # yield reactor.callInThread(self.rcv_grpc)
khenaidoob9203542018-09-17 22:56:37 -0400631
khenaidoo92e62c52018-10-03 14:02:54 -0400632 self.start_kpi_collection(self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400633
khenaidoo92e62c52018-10-03 14:02:54 -0400634 self.log.info('re-enabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400635
636 def delete(self):
637 self.log.info('deleting', device_id=self.device_id)
638
khenaidoob9203542018-09-17 22:56:37 -0400639 self.close_channel()
640 self.log.info('disabled-grpc-channel')
641
642 # TODO:
643 # 1) Remove all flows from the device
644 # 2) Remove the device from ponsim
645
646 self.log.info('deleted', device_id=self.device_id)
647
648 def start_kpi_collection(self, device_id):
649
khenaidoo4d4802d2018-10-04 21:59:49 -0400650 kafka_cluster_proxy = get_kafka_proxy()
651
khenaidoob9203542018-09-17 22:56:37 -0400652 def _collect(device_id, prefix):
653
654 try:
655 # Step 1: gather metrics from device
656 port_metrics = \
khenaidoo92e62c52018-10-03 14:02:54 -0400657 self.pm_metrics.collect_port_metrics(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400658
659 # Step 2: prepare the KpiEvent for submission
660 # we can time-stamp them here (or could use time derived from OLT
661 ts = arrow.utcnow().timestamp
662 kpi_event = KpiEvent(
663 type=KpiEventType.slice,
664 ts=ts,
665 prefixes={
666 # OLT NNI port
667 prefix + '.nni': MetricValuePairs(
668 metrics=port_metrics['nni']),
669 # OLT PON port
670 prefix + '.pon': MetricValuePairs(
671 metrics=port_metrics['pon'])
672 }
673 )
674
khenaidoo4d4802d2018-10-04 21:59:49 -0400675 # Step 3: submit directlt to kafka bus
676 if kafka_cluster_proxy:
677 if isinstance(kpi_event, Message):
678 kpi_event = dumps(MessageToDict(kpi_event, True, True))
679 kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
khenaidoob9203542018-09-17 22:56:37 -0400680
681 except Exception as e:
682 log.exception('failed-to-submit-kpis', e=e)
683
684 self.pm_metrics.start_collector(_collect)
685
686 def stop_kpi_collection(self):
687 self.pm_metrics.stop_collector()