blob: 1806a339b43c328c23c4f88c4c335db879c5b05b [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Fully simulated OLT adapter.
19"""
20from uuid import uuid4
21
22import arrow
23import adapters.common.openflow.utils as fd
24import grpc
25import structlog
26from scapy.layers.l2 import Ether, Dot1Q
27from twisted.internet import reactor
khenaidoo92e62c52018-10-03 14:02:54 -040028from twisted.internet.defer import inlineCallbacks, returnValue
khenaidoob9203542018-09-17 22:56:37 -040029from grpc._channel import _Rendezvous
30
31from adapters.common.frameio.frameio import BpfProgramFilter, hexify
32from adapters.common.utils.asleep import asleep
33from twisted.internet.task import LoopingCall
34from adapters.iadapter import OltAdapter
35from adapters.protos import third_party
36from adapters.protos import openflow_13_pb2 as ofp
37from adapters.protos import ponsim_pb2
38from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
39from adapters.protos.device_pb2 import Port, PmConfig, PmConfigs
40from adapters.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
41from google.protobuf.empty_pb2 import Empty
42from adapters.protos.logical_device_pb2 import LogicalPort, LogicalDevice
43from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
44 OFPPF_1GB_FD, \
45 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
46 ofp_switch_features, ofp_desc
47from adapters.protos.openflow_13_pb2 import ofp_port
48from adapters.protos.ponsim_pb2 import FlowTable, PonSimFrame
49from adapters.protos.core_adapter_pb2 import SwitchCapability, PortCapability
50from adapters.common.utils.registry import registry
khenaidoo4d4802d2018-10-04 21:59:49 -040051from adapters.kafka.kafka_proxy import get_kafka_proxy
52from simplejson import dumps
53from google.protobuf.json_format import MessageToDict
54from google.protobuf.message import Message
55
khenaidoob9203542018-09-17 22:56:37 -040056
57_ = third_party
58log = structlog.get_logger()
59
60PACKET_IN_VLAN = 4000
61is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
62 PACKET_IN_VLAN))
63
64def mac_str_to_tuple(mac):
65 return tuple(int(d, 16) for d in mac.split(':'))
66
67class AdapterPmMetrics:
68 def __init__(self, device):
69 self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
70 'tx_256_511_pkts', 'tx_512_1023_pkts',
71 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
72 'rx_64_pkts', 'rx_65_127_pkts',
73 'rx_128_255_pkts', 'rx_256_511_pkts',
74 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
75 'rx_1519_9k_pkts'}
76 self.device = device
77 self.id = device.id
78 self.name = 'ponsim_olt'
79 self.default_freq = 150
80 self.grouped = False
81 self.freq_override = False
82 self.pon_metrics_config = dict()
83 self.nni_metrics_config = dict()
84 self.lc = None
85 for m in self.pm_names:
86 self.pon_metrics_config[m] = PmConfig(name=m,
87 type=PmConfig.COUNTER,
88 enabled=True)
89 self.nni_metrics_config[m] = PmConfig(name=m,
90 type=PmConfig.COUNTER,
91 enabled=True)
92
93 def update(self, pm_config):
94 if self.default_freq != pm_config.default_freq:
95 # Update the callback to the new frequency.
96 self.default_freq = pm_config.default_freq
97 self.lc.stop()
98 self.lc.start(interval=self.default_freq / 10)
99 for m in pm_config.metrics:
100 self.pon_metrics_config[m.name].enabled = m.enabled
101 self.nni_metrics_config[m.name].enabled = m.enabled
102
103 def make_proto(self):
104 pm_config = PmConfigs(
105 id=self.id,
106 default_freq=self.default_freq,
107 grouped=False,
108 freq_override=False)
109 for m in sorted(self.pon_metrics_config):
110 pm = self.pon_metrics_config[m] # Either will do they're the same
111 pm_config.metrics.extend([PmConfig(name=pm.name,
112 type=pm.type,
113 enabled=pm.enabled)])
114 return pm_config
115
116 def collect_port_metrics(self, channel):
117 rtrn_port_metrics = dict()
118 stub = ponsim_pb2.PonSimStub(channel)
119 stats = stub.GetStats(Empty())
120 rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
121 rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
122 return rtrn_port_metrics
123
124 def extract_pon_metrics(self, stats):
125 rtrn_pon_metrics = dict()
126 for m in stats.metrics:
127 if m.port_name == "pon":
128 for p in m.packets:
129 if self.pon_metrics_config[p.name].enabled:
130 rtrn_pon_metrics[p.name] = p.value
131 return rtrn_pon_metrics
132
133 def extract_nni_metrics(self, stats):
134 rtrn_pon_metrics = dict()
135 for m in stats.metrics:
136 if m.port_name == "nni":
137 for p in m.packets:
138 if self.pon_metrics_config[p.name].enabled:
139 rtrn_pon_metrics[p.name] = p.value
140 return rtrn_pon_metrics
141
142 def start_collector(self, callback):
143 log.info("starting-pm-collection", device_name=self.name,
144 device_id=self.device.id)
145 prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
146 self.lc = LoopingCall(callback, self.device.id, prefix)
147 self.lc.start(interval=self.default_freq / 10)
148
149 def stop_collector(self):
150 log.info("stopping-pm-collection", device_name=self.name,
151 device_id=self.device.id)
152 self.lc.stop()
153
154
155class AdapterAlarms:
156 def __init__(self, adapter, device):
157 self.adapter = adapter
158 self.device = device
159 self.lc = None
160
161 def send_alarm(self, context_data, alarm_data):
162 try:
163 current_context = {}
164 for key, value in context_data.__dict__.items():
165 current_context[key] = str(value)
166
167 alarm_event = self.adapter.adapter_agent.create_alarm(
168 resource_id=self.device.id,
169 description="{}.{} - {}".format(self.adapter.name,
170 self.device.id,
171 alarm_data[
172 'description']) if 'description' in alarm_data else None,
173 type=alarm_data['type'] if 'type' in alarm_data else None,
174 category=alarm_data[
175 'category'] if 'category' in alarm_data else None,
176 severity=alarm_data[
177 'severity'] if 'severity' in alarm_data else None,
178 state=alarm_data['state'] if 'state' in alarm_data else None,
179 raised_ts=alarm_data['ts'] if 'ts' in alarm_data else 0,
180 context=current_context
181 )
182
183 self.adapter.adapter_agent.submit_alarm(self.device.id,
184 alarm_event)
185
186 except Exception as e:
187 log.exception('failed-to-send-alarm', e=e)
188
189
190class PonSimOltAdapter(OltAdapter):
191 def __init__(self, adapter_agent, config):
192 super(PonSimOltAdapter, self).__init__(adapter_agent=adapter_agent,
193 config=config,
194 device_handler_class=PonSimOltHandler,
195 name='ponsim_olt',
196 vendor='Voltha project',
197 version='0.4',
198 device_type='ponsim_olt',
199 accepts_bulk_flow_update=True,
200 accepts_add_remove_flow_updates=False)
201
202 def update_pm_config(self, device, pm_config):
203 log.info("adapter-update-pm-config", device=device,
204 pm_config=pm_config)
205 handler = self.devices_handlers[device.id]
206 handler.update_pm_config(device, pm_config)
207
208
209
210class PonSimOltHandler(object):
211 def __init__(self, adapter, device_id):
212 self.adapter = adapter
213 self.adapter_agent = adapter.adapter_agent
214 self.device_id = device_id
215 self.log = structlog.get_logger(device_id=device_id)
216 self.channel = None
217 self.io_port = None
218 self.logical_device_id = None
219 self.nni_port = None
220 self.ofp_port_no = None
221 self.interface = registry('main').get_args().interface
222 self.pm_metrics = None
223 self.alarms = None
224 self.frames = None
225
226 @inlineCallbacks
227 def get_channel(self):
228 if self.channel is None:
229 try:
230 device = yield self.adapter_agent.get_device(self.device_id)
231 self.log.info('device-info', device=device, host_port=device.host_and_port)
232 self.channel = grpc.insecure_channel(device.host_and_port)
233 except Exception as e:
234 log.exception("ponsim-connection-failure", e=e)
235
236 # returnValue(self.channel)
237
238 def close_channel(self):
239 if self.channel is None:
240 self.log.info('grpc-channel-already-closed')
241 return
242 else:
243 if self.frames is not None:
244 self.frames.cancel()
245 self.frames = None
246 self.log.info('cancelled-grpc-frame-stream')
247
248 self.channel.unsubscribe(lambda *args: None)
249 self.channel = None
250
251 self.log.info('grpc-channel-closed')
252
khenaidoo92e62c52018-10-03 14:02:54 -0400253 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400254 def _get_nni_port(self):
khenaidoo92e62c52018-10-03 14:02:54 -0400255 ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
256 returnValue(ports)
khenaidoob9203542018-09-17 22:56:37 -0400257
258 @inlineCallbacks
259 def activate(self, device):
260 try:
261 self.log.info('activating')
262
263 if not device.host_and_port:
264 device.oper_status = OperStatus.FAILED
265 device.reason = 'No host_and_port field provided'
266 self.adapter_agent.device_update(device)
267 return
268
269 yield self.get_channel()
270 stub = ponsim_pb2.PonSimStub(self.channel)
271 info = stub.GetDeviceInfo(Empty())
272 log.info('got-info', info=info, device_id=device.id)
273 self.ofp_port_no = info.nni_port
274
275 device.root = True
276 device.vendor = 'ponsim'
277 device.model = 'n/a'
278 device.serial_number = device.host_and_port
khenaidoo92e62c52018-10-03 14:02:54 -0400279 device.mac_address = "AA:BB:CC:DD:EE:FF"
280 # device.connect_status = ConnectStatus.REACHABLE
khenaidoob9203542018-09-17 22:56:37 -0400281 yield self.adapter_agent.device_update(device)
282
283 # Now set the initial PM configuration for this device
284 self.pm_metrics = AdapterPmMetrics(device)
285 pm_config = self.pm_metrics.make_proto()
286 log.info("initial-pm-config", pm_config=pm_config)
287 self.adapter_agent.device_pm_config_update(pm_config, init=True)
288
289 # Setup alarm handler
290 self.alarms = AdapterAlarms(self.adapter, device)
291
292 nni_port = Port(
293 port_no=info.nni_port,
294 label='NNI facing Ethernet port',
295 type=Port.ETHERNET_NNI,
khenaidoo92e62c52018-10-03 14:02:54 -0400296 # admin_state=AdminState.ENABLED,
khenaidoob9203542018-09-17 22:56:37 -0400297 oper_status=OperStatus.ACTIVE
298 )
299 self.nni_port = nni_port
300 yield self.adapter_agent.port_created(device.id, nni_port)
301 yield self.adapter_agent.port_created(device.id, Port(
302 port_no=1,
303 label='PON port',
304 type=Port.PON_OLT,
khenaidoo92e62c52018-10-03 14:02:54 -0400305 # admin_state=AdminState.ENABLED,
khenaidoob9203542018-09-17 22:56:37 -0400306 oper_status=OperStatus.ACTIVE
307 ))
khenaidoo92e62c52018-10-03 14:02:54 -0400308
309 yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400310
311 # register ONUS
312 self.log.info('onu-found', onus=info.onus, len=len(info.onus))
313 for onu in info.onus:
314 vlan_id = onu.uni_port
315 yield self.adapter_agent.child_device_detected(
316 parent_device_id=device.id,
317 parent_port_no=1,
318 child_device_type='ponsim_onu',
319 channel_id=vlan_id,
320 )
321
322 self.log.info('starting-frame-grpc-stream')
323 reactor.callInThread(self.rcv_grpc)
324 self.log.info('started-frame-grpc-stream')
325
326 # TODO
327 # Start collecting stats from the device after a brief pause
khenaidoo92e62c52018-10-03 14:02:54 -0400328 self.start_kpi_collection(device.id)
khenaidoob9203542018-09-17 22:56:37 -0400329 except Exception as e:
330 log.exception("Exception-activating", e=e)
331
332
333 def get_ofp_device_info(self, device):
334 return SwitchCapability(
335 desc=ofp_desc(
336 hw_desc='ponsim pon',
337 sw_desc='ponsim pon',
338 serial_num=device.serial_number,
339 dp_desc='n/a'
340 ),
341 switch_features=ofp_switch_features(
342 n_buffers=256, # TODO fake for now
343 n_tables=2, # TODO ditto
344 capabilities=( # TODO and ditto
345 OFPC_FLOW_STATS
346 | OFPC_TABLE_STATS
347 | OFPC_PORT_STATS
348 | OFPC_GROUP_STATS
349 )
350 )
351 )
352
353 def get_ofp_port_info(self, device, port_no):
354 # Since the adapter created the device port then it has the reference of the port to
355 # return the capability. TODO: Do a lookup on the NNI port number and return the
356 # appropriate attributes
357 self.log.info('get_ofp_port_info', port_no=port_no, info=self.ofp_port_no, device_id=device.id)
358 cap = OFPPF_1GB_FD | OFPPF_FIBER
359 return PortCapability(
360 port = LogicalPort (
khenaidoo19d7b632018-10-30 10:49:50 -0400361 # id='nni',
khenaidoob9203542018-09-17 22:56:37 -0400362 ofp_port=ofp_port(
khenaidoo19d7b632018-10-30 10:49:50 -0400363 # port_no=port_no,
khenaidoob9203542018-09-17 22:56:37 -0400364 hw_addr=mac_str_to_tuple(
khenaidoo19d7b632018-10-30 10:49:50 -0400365 '00:00:00:00:00:%02x' % port_no),
366 # name='nni',
khenaidoob9203542018-09-17 22:56:37 -0400367 config=0,
368 state=OFPPS_LIVE,
369 curr=cap,
370 advertised=cap,
371 peer=cap,
372 curr_speed=OFPPF_1GB_FD,
373 max_speed=OFPPF_1GB_FD
khenaidoo19d7b632018-10-30 10:49:50 -0400374 ),
375 device_id=device.id,
376 device_port_no=port_no
377 )
khenaidoob9203542018-09-17 22:56:37 -0400378 )
379
khenaidoo4d4802d2018-10-04 21:59:49 -0400380 # TODO - change for core 2.0
khenaidoob9203542018-09-17 22:56:37 -0400381 def reconcile(self, device):
382 self.log.info('reconciling-OLT-device-starts')
383
384 if not device.host_and_port:
385 device.oper_status = OperStatus.FAILED
386 device.reason = 'No host_and_port field provided'
387 self.adapter_agent.device_update(device)
388 return
389
390 try:
391 stub = ponsim_pb2.PonSimStub(self.get_channel())
392 info = stub.GetDeviceInfo(Empty())
393 log.info('got-info', info=info)
394 # TODO: Verify we are connected to the same device we are
395 # reconciling - not much data in ponsim to differentiate at the
396 # time
397 device.oper_status = OperStatus.ACTIVE
398 self.adapter_agent.device_update(device)
399 self.ofp_port_no = info.nni_port
400 self.nni_port = self._get_nni_port()
401 except Exception, e:
402 log.exception('device-unreachable', e=e)
403 device.connect_status = ConnectStatus.UNREACHABLE
404 device.oper_status = OperStatus.UNKNOWN
405 self.adapter_agent.device_update(device)
406 return
407
408 # Now set the initial PM configuration for this device
409 self.pm_metrics = AdapterPmMetrics(device)
410 pm_config = self.pm_metrics.make_proto()
411 log.info("initial-pm-config", pm_config=pm_config)
412 self.adapter_agent.device_update_pm_config(pm_config, init=True)
413
414 # Setup alarm handler
415 self.alarms = AdapterAlarms(self.adapter, device)
416
417 # TODO: Is there anything required to verify nni and PON ports
418
419 # Set the logical device id
420 device = self.adapter_agent.get_device(device.id)
421 if device.parent_id:
422 self.logical_device_id = device.parent_id
423 self.adapter_agent.reconcile_logical_device(device.parent_id)
424 else:
425 self.log.info('no-logical-device-set')
426
427 # Reconcile child devices
428 self.adapter_agent.reconcile_child_devices(device.id)
429
430 reactor.callInThread(self.rcv_grpc)
431
432 # Start collecting stats from the device after a brief pause
433 self.start_kpi_collection(device.id)
434
435 self.log.info('reconciling-OLT-device-ends')
436
437 @inlineCallbacks
438 def rcv_grpc(self):
439 """
440 This call establishes a GRPC stream to receive frames.
441 """
442 yield self.get_channel()
443 stub = ponsim_pb2.PonSimStub(self.channel)
444 # stub = ponsim_pb2.PonSimStub(self.get_channel())
445
446 # Attempt to establish a grpc stream with the remote ponsim service
447 self.frames = stub.ReceiveFrames(Empty())
448
449 self.log.info('start-receiving-grpc-frames')
450
451 try:
452 for frame in self.frames:
453 self.log.info('received-grpc-frame',
454 frame_len=len(frame.payload))
455 self._rcv_frame(frame.payload)
456
457 except _Rendezvous, e:
458 log.warn('grpc-connection-lost', message=e.message)
459
460 self.log.info('stopped-receiving-grpc-frames')
461
462
khenaidoo19d7b632018-10-30 10:49:50 -0400463 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400464 def update_flow_table(self, flows):
khenaidoo19d7b632018-10-30 10:49:50 -0400465 yield self.get_channel()
466 stub = ponsim_pb2.PonSimStub(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400467
khenaidoo19d7b632018-10-30 10:49:50 -0400468 self.log.info('pushing-olt-flow-table')
khenaidoob9203542018-09-17 22:56:37 -0400469 stub.UpdateFlowTable(FlowTable(
470 port=0,
471 flows=flows
472 ))
473 self.log.info('success')
474
475 def remove_from_flow_table(self, flows):
476 self.log.debug('remove-from-flow-table', flows=flows)
477 # TODO: Update PONSIM code to accept incremental flow changes
478 # Once completed, the accepts_add_remove_flow_updates for this
479 # device type can be set to True
480
481 def add_to_flow_table(self, flows):
482 self.log.debug('add-to-flow-table', flows=flows)
483 # TODO: Update PONSIM code to accept incremental flow changes
484 # Once completed, the accepts_add_remove_flow_updates for this
485 # device type can be set to True
486
487 def update_pm_config(self, device, pm_config):
488 log.info("handler-update-pm-config", device=device,
489 pm_config=pm_config)
490 self.pm_metrics.update(pm_config)
491
492 def send_proxied_message(self, proxy_address, msg):
493 self.log.info('sending-proxied-message')
494 if isinstance(msg, FlowTable):
495 stub = ponsim_pb2.PonSimStub(self.get_channel())
496 self.log.info('pushing-onu-flow-table', port=msg.port)
497 res = stub.UpdateFlowTable(msg)
498 self.adapter_agent.receive_proxied_message(proxy_address, res)
499
500 def packet_out(self, egress_port, msg):
501 self.log.info('sending-packet-out', egress_port=egress_port,
502 msg=hexify(msg))
503 pkt = Ether(msg)
504 out_pkt = pkt
505 if egress_port != self.nni_port.port_no:
506 # don't do the vlan manipulation for the NNI port, vlans are already correct
507 out_pkt = (
508 Ether(src=pkt.src, dst=pkt.dst) /
509 Dot1Q(vlan=egress_port, type=pkt.type) /
510 pkt.payload
511 )
512
513 # TODO need better way of mapping logical ports to PON ports
514 out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
515
516 # send over grpc stream
517 stub = ponsim_pb2.PonSimStub(self.get_channel())
518 frame = PonSimFrame(id=self.device_id, payload=str(out_pkt), out_port=out_port)
519 stub.SendFrame(frame)
520
521
522 @inlineCallbacks
523 def reboot(self):
524 self.log.info('rebooting', device_id=self.device_id)
525
khenaidoo4d4802d2018-10-04 21:59:49 -0400526 yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400527
khenaidoo4d4802d2018-10-04 21:59:49 -0400528 # Update the child devices connect state to UNREACHABLE
529 yield self.adapter_agent.children_state_update(self.device_id,
khenaidoob9203542018-09-17 22:56:37 -0400530 connect_status=ConnectStatus.UNREACHABLE)
531
532 # Sleep 10 secs, simulating a reboot
533 # TODO: send alert and clear alert after the reboot
534 yield asleep(10)
535
khenaidoo4d4802d2018-10-04 21:59:49 -0400536 # Change the connection status back to REACHABLE. With a
537 # real OLT the connection state must be the actual state
538 yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.REACHABLE)
539
khenaidoob9203542018-09-17 22:56:37 -0400540
541 # Update the child devices connect state to REACHABLE
khenaidoo4d4802d2018-10-04 21:59:49 -0400542 yield self.adapter_agent.children_state_update(self.device_id,
khenaidoob9203542018-09-17 22:56:37 -0400543 connect_status=ConnectStatus.REACHABLE)
544
545 self.log.info('rebooted', device_id=self.device_id)
546
547 def self_test_device(self, device):
548 """
549 This is called to Self a device based on a NBI call.
550 :param device: A Voltha.Device object.
551 :return: Will return result of self test
552 """
553 log.info('self-test-device', device=device.id)
554 raise NotImplementedError()
555
khenaidoo92e62c52018-10-03 14:02:54 -0400556
557 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400558 def disable(self):
559 self.log.info('disabling', device_id=self.device_id)
560
561 self.stop_kpi_collection()
562
khenaidoo92e62c52018-10-03 14:02:54 -0400563 # Update the operational status to UNKNOWN and connection status to UNREACHABLE
564 yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400565
566 self.close_channel()
567 self.log.info('disabled-grpc-channel')
568
khenaidoob9203542018-09-17 22:56:37 -0400569 # TODO:
570 # 1) Remove all flows from the device
571 # 2) Remove the device from ponsim
572
khenaidoo92e62c52018-10-03 14:02:54 -0400573 self.log.info('disabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400574
khenaidoo92e62c52018-10-03 14:02:54 -0400575 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400576 def reenable(self):
577 self.log.info('re-enabling', device_id=self.device_id)
578
khenaidoob9203542018-09-17 22:56:37 -0400579 # Set the ofp_port_no and nni_port in case we bypassed the reconcile
580 # process if the device was in DISABLED state on voltha restart
581 if not self.ofp_port_no and not self.nni_port:
khenaidoo92e62c52018-10-03 14:02:54 -0400582 yield self.get_channel()
583 stub = ponsim_pb2.PonSimStub(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400584 info = stub.GetDeviceInfo(Empty())
585 log.info('got-info', info=info)
586 self.ofp_port_no = info.nni_port
khenaidoo92e62c52018-10-03 14:02:54 -0400587 ports = yield self._get_nni_port()
588 # For ponsim, we are using only 1 NNI port
589 if ports.items:
590 self.nni_port = ports.items[0]
khenaidoob9203542018-09-17 22:56:37 -0400591
khenaidoo92e62c52018-10-03 14:02:54 -0400592 # Update the state of the NNI port
593 yield self.adapter_agent.port_state_update(self.device_id,
594 port_type=Port.ETHERNET_NNI,
595 port_no=self.ofp_port_no,
596 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400597
khenaidoo92e62c52018-10-03 14:02:54 -0400598 # Update the state of the PON port
599 yield self.adapter_agent.port_state_update(self.device_id,
600 port_type=Port.PON_OLT,
601 port_no=1,
602 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400603
khenaidoo92e62c52018-10-03 14:02:54 -0400604 # Set the operational state of the device to ACTIVE and connect status to REACHABLE
605 yield self.adapter_agent.device_state_update(self.device_id,
606 connect_status=ConnectStatus.REACHABLE,
607 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400608
khenaidoo92e62c52018-10-03 14:02:54 -0400609 # TODO: establish frame grpc-stream
610 # yield reactor.callInThread(self.rcv_grpc)
khenaidoob9203542018-09-17 22:56:37 -0400611
khenaidoo92e62c52018-10-03 14:02:54 -0400612 self.start_kpi_collection(self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400613
khenaidoo92e62c52018-10-03 14:02:54 -0400614 self.log.info('re-enabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400615
616 def delete(self):
617 self.log.info('deleting', device_id=self.device_id)
618
khenaidoob9203542018-09-17 22:56:37 -0400619 self.close_channel()
620 self.log.info('disabled-grpc-channel')
621
622 # TODO:
623 # 1) Remove all flows from the device
624 # 2) Remove the device from ponsim
625
626 self.log.info('deleted', device_id=self.device_id)
627
628 def start_kpi_collection(self, device_id):
629
khenaidoo4d4802d2018-10-04 21:59:49 -0400630 kafka_cluster_proxy = get_kafka_proxy()
631
khenaidoob9203542018-09-17 22:56:37 -0400632 def _collect(device_id, prefix):
633
634 try:
635 # Step 1: gather metrics from device
636 port_metrics = \
khenaidoo92e62c52018-10-03 14:02:54 -0400637 self.pm_metrics.collect_port_metrics(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400638
639 # Step 2: prepare the KpiEvent for submission
640 # we can time-stamp them here (or could use time derived from OLT
641 ts = arrow.utcnow().timestamp
642 kpi_event = KpiEvent(
643 type=KpiEventType.slice,
644 ts=ts,
645 prefixes={
646 # OLT NNI port
647 prefix + '.nni': MetricValuePairs(
648 metrics=port_metrics['nni']),
649 # OLT PON port
650 prefix + '.pon': MetricValuePairs(
651 metrics=port_metrics['pon'])
652 }
653 )
654
khenaidoo4d4802d2018-10-04 21:59:49 -0400655 # Step 3: submit directlt to kafka bus
656 if kafka_cluster_proxy:
657 if isinstance(kpi_event, Message):
658 kpi_event = dumps(MessageToDict(kpi_event, True, True))
659 kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
khenaidoob9203542018-09-17 22:56:37 -0400660
661 except Exception as e:
662 log.exception('failed-to-submit-kpis', e=e)
663
664 self.pm_metrics.start_collector(_collect)
665
666 def stop_kpi_collection(self):
667 self.pm_metrics.stop_collector()