blob: e3157e791684c8718e0e0397b741cfe7a6489b3c [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Fully simulated OLT adapter.
19"""
khenaidoob9203542018-09-17 22:56:37 -040020
21import arrow
khenaidoob9203542018-09-17 22:56:37 -040022import grpc
23import structlog
khenaidoo6fdf0ba2018-11-02 14:38:33 -040024from google.protobuf.empty_pb2 import Empty
25from google.protobuf.json_format import MessageToDict
khenaidoofdbad6e2018-11-06 22:26:38 -050026from scapy.layers.inet import Raw
27import json
khenaidoo6fdf0ba2018-11-02 14:38:33 -040028from google.protobuf.message import Message
29from grpc._channel import _Rendezvous
khenaidoob9203542018-09-17 22:56:37 -040030from scapy.layers.l2 import Ether, Dot1Q
khenaidoo6fdf0ba2018-11-02 14:38:33 -040031from simplejson import dumps
khenaidoob9203542018-09-17 22:56:37 -040032from twisted.internet import reactor
khenaidoo92e62c52018-10-03 14:02:54 -040033from twisted.internet.defer import inlineCallbacks, returnValue
khenaidoo6fdf0ba2018-11-02 14:38:33 -040034from twisted.internet.task import LoopingCall
khenaidoob9203542018-09-17 22:56:37 -040035
khenaidoofdbad6e2018-11-06 22:26:38 -050036from python.adapters.common.frameio.frameio import BpfProgramFilter, hexify
37from python.common.utils.asleep import asleep
38from python.common.utils.registry import registry
39from python.adapters.iadapter import OltAdapter
40from python.adapters.kafka.kafka_proxy import get_kafka_proxy
41from python.protos import ponsim_pb2
42from python.protos import third_party
43from python.protos.common_pb2 import OperStatus, ConnectStatus
khenaidoo79232702018-12-04 11:00:41 -050044from python.protos.inter_container_pb2 import SwitchCapability, PortCapability, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040045 InterAdapterMessageType, InterAdapterResponseBody
khenaidoofdbad6e2018-11-06 22:26:38 -050046from python.protos.device_pb2 import Port, PmConfig, PmConfigs
47from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
48from python.protos.logical_device_pb2 import LogicalPort
49from python.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
khenaidoob9203542018-09-17 22:56:37 -040050 OFPPF_1GB_FD, \
51 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
52 ofp_switch_features, ofp_desc
khenaidoofdbad6e2018-11-06 22:26:38 -050053from python.protos.openflow_13_pb2 import ofp_port
54from python.protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest
khenaidoob9203542018-09-17 22:56:37 -040055
56_ = third_party
57log = structlog.get_logger()
58
59PACKET_IN_VLAN = 4000
60is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
61 PACKET_IN_VLAN))
62
khenaidoo6fdf0ba2018-11-02 14:38:33 -040063
khenaidoob9203542018-09-17 22:56:37 -040064def mac_str_to_tuple(mac):
65 return tuple(int(d, 16) for d in mac.split(':'))
66
khenaidoo6fdf0ba2018-11-02 14:38:33 -040067
khenaidoob9203542018-09-17 22:56:37 -040068class AdapterPmMetrics:
69 def __init__(self, device):
70 self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
71 'tx_256_511_pkts', 'tx_512_1023_pkts',
72 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
73 'rx_64_pkts', 'rx_65_127_pkts',
74 'rx_128_255_pkts', 'rx_256_511_pkts',
75 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
76 'rx_1519_9k_pkts'}
77 self.device = device
78 self.id = device.id
79 self.name = 'ponsim_olt'
80 self.default_freq = 150
81 self.grouped = False
82 self.freq_override = False
83 self.pon_metrics_config = dict()
84 self.nni_metrics_config = dict()
85 self.lc = None
86 for m in self.pm_names:
87 self.pon_metrics_config[m] = PmConfig(name=m,
88 type=PmConfig.COUNTER,
89 enabled=True)
90 self.nni_metrics_config[m] = PmConfig(name=m,
91 type=PmConfig.COUNTER,
92 enabled=True)
93
94 def update(self, pm_config):
95 if self.default_freq != pm_config.default_freq:
96 # Update the callback to the new frequency.
97 self.default_freq = pm_config.default_freq
98 self.lc.stop()
99 self.lc.start(interval=self.default_freq / 10)
100 for m in pm_config.metrics:
101 self.pon_metrics_config[m.name].enabled = m.enabled
102 self.nni_metrics_config[m.name].enabled = m.enabled
103
104 def make_proto(self):
105 pm_config = PmConfigs(
106 id=self.id,
107 default_freq=self.default_freq,
108 grouped=False,
109 freq_override=False)
110 for m in sorted(self.pon_metrics_config):
111 pm = self.pon_metrics_config[m] # Either will do they're the same
112 pm_config.metrics.extend([PmConfig(name=pm.name,
113 type=pm.type,
114 enabled=pm.enabled)])
115 return pm_config
116
117 def collect_port_metrics(self, channel):
118 rtrn_port_metrics = dict()
119 stub = ponsim_pb2.PonSimStub(channel)
120 stats = stub.GetStats(Empty())
121 rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
122 rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
123 return rtrn_port_metrics
124
125 def extract_pon_metrics(self, stats):
126 rtrn_pon_metrics = dict()
127 for m in stats.metrics:
128 if m.port_name == "pon":
129 for p in m.packets:
130 if self.pon_metrics_config[p.name].enabled:
131 rtrn_pon_metrics[p.name] = p.value
132 return rtrn_pon_metrics
133
134 def extract_nni_metrics(self, stats):
135 rtrn_pon_metrics = dict()
136 for m in stats.metrics:
137 if m.port_name == "nni":
138 for p in m.packets:
139 if self.pon_metrics_config[p.name].enabled:
140 rtrn_pon_metrics[p.name] = p.value
141 return rtrn_pon_metrics
142
143 def start_collector(self, callback):
144 log.info("starting-pm-collection", device_name=self.name,
145 device_id=self.device.id)
146 prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
147 self.lc = LoopingCall(callback, self.device.id, prefix)
148 self.lc.start(interval=self.default_freq / 10)
149
150 def stop_collector(self):
151 log.info("stopping-pm-collection", device_name=self.name,
152 device_id=self.device.id)
153 self.lc.stop()
154
155
156class AdapterAlarms:
157 def __init__(self, adapter, device):
158 self.adapter = adapter
159 self.device = device
160 self.lc = None
161
khenaidoofdbad6e2018-11-06 22:26:38 -0500162 # TODO: Implement code to send to kafka cluster directly instead of
163 # going through the voltha core.
khenaidoob9203542018-09-17 22:56:37 -0400164 def send_alarm(self, context_data, alarm_data):
khenaidoofdbad6e2018-11-06 22:26:38 -0500165 log.debug("send-alarm-not-implemented")
166 return
khenaidoob9203542018-09-17 22:56:37 -0400167
168
169class PonSimOltAdapter(OltAdapter):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400170 def __init__(self, core_proxy, adapter_proxy, config):
171 super(PonSimOltAdapter, self).__init__(core_proxy=core_proxy,
172 adapter_proxy=adapter_proxy,
khenaidoob9203542018-09-17 22:56:37 -0400173 config=config,
174 device_handler_class=PonSimOltHandler,
175 name='ponsim_olt',
176 vendor='Voltha project',
177 version='0.4',
178 device_type='ponsim_olt',
179 accepts_bulk_flow_update=True,
180 accepts_add_remove_flow_updates=False)
181
182 def update_pm_config(self, device, pm_config):
183 log.info("adapter-update-pm-config", device=device,
184 pm_config=pm_config)
185 handler = self.devices_handlers[device.id]
186 handler.update_pm_config(device, pm_config)
187
188
khenaidoob9203542018-09-17 22:56:37 -0400189class PonSimOltHandler(object):
190 def __init__(self, adapter, device_id):
191 self.adapter = adapter
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400192 self.core_proxy = adapter.core_proxy
193 self.adapter_proxy = adapter.adapter_proxy
khenaidoob9203542018-09-17 22:56:37 -0400194 self.device_id = device_id
195 self.log = structlog.get_logger(device_id=device_id)
196 self.channel = None
197 self.io_port = None
198 self.logical_device_id = None
199 self.nni_port = None
200 self.ofp_port_no = None
201 self.interface = registry('main').get_args().interface
202 self.pm_metrics = None
203 self.alarms = None
204 self.frames = None
205
206 @inlineCallbacks
207 def get_channel(self):
208 if self.channel is None:
209 try:
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400210 device = yield self.core_proxy.get_device(self.device_id)
211 self.log.info('device-info', device=device,
212 host_port=device.host_and_port)
khenaidoob9203542018-09-17 22:56:37 -0400213 self.channel = grpc.insecure_channel(device.host_and_port)
214 except Exception as e:
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400215 log.exception("ponsim-connection-failure", e=e)
khenaidoob9203542018-09-17 22:56:37 -0400216
217 # returnValue(self.channel)
218
219 def close_channel(self):
220 if self.channel is None:
221 self.log.info('grpc-channel-already-closed')
222 return
223 else:
224 if self.frames is not None:
225 self.frames.cancel()
226 self.frames = None
227 self.log.info('cancelled-grpc-frame-stream')
228
229 self.channel.unsubscribe(lambda *args: None)
230 self.channel = None
231
232 self.log.info('grpc-channel-closed')
233
khenaidoo92e62c52018-10-03 14:02:54 -0400234 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400235 def _get_nni_port(self):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400236 ports = yield self.core_proxy.get_ports(self.device_id,
237 Port.ETHERNET_NNI)
khenaidoo92e62c52018-10-03 14:02:54 -0400238 returnValue(ports)
khenaidoob9203542018-09-17 22:56:37 -0400239
240 @inlineCallbacks
241 def activate(self, device):
242 try:
243 self.log.info('activating')
244
245 if not device.host_and_port:
246 device.oper_status = OperStatus.FAILED
247 device.reason = 'No host_and_port field provided'
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400248 self.core_proxy.device_update(device)
khenaidoob9203542018-09-17 22:56:37 -0400249 return
250
251 yield self.get_channel()
252 stub = ponsim_pb2.PonSimStub(self.channel)
253 info = stub.GetDeviceInfo(Empty())
254 log.info('got-info', info=info, device_id=device.id)
255 self.ofp_port_no = info.nni_port
256
257 device.root = True
258 device.vendor = 'ponsim'
259 device.model = 'n/a'
260 device.serial_number = device.host_and_port
khenaidoo92e62c52018-10-03 14:02:54 -0400261 device.mac_address = "AA:BB:CC:DD:EE:FF"
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400262 yield self.core_proxy.device_update(device)
khenaidoob9203542018-09-17 22:56:37 -0400263
264 # Now set the initial PM configuration for this device
265 self.pm_metrics = AdapterPmMetrics(device)
266 pm_config = self.pm_metrics.make_proto()
267 log.info("initial-pm-config", pm_config=pm_config)
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400268 self.core_proxy.device_pm_config_update(pm_config, init=True)
khenaidoob9203542018-09-17 22:56:37 -0400269
270 # Setup alarm handler
271 self.alarms = AdapterAlarms(self.adapter, device)
272
273 nni_port = Port(
274 port_no=info.nni_port,
275 label='NNI facing Ethernet port',
276 type=Port.ETHERNET_NNI,
khenaidoob9203542018-09-17 22:56:37 -0400277 oper_status=OperStatus.ACTIVE
278 )
279 self.nni_port = nni_port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400280 yield self.core_proxy.port_created(device.id, nni_port)
281 yield self.core_proxy.port_created(device.id, Port(
khenaidoob9203542018-09-17 22:56:37 -0400282 port_no=1,
283 label='PON port',
284 type=Port.PON_OLT,
khenaidoob9203542018-09-17 22:56:37 -0400285 oper_status=OperStatus.ACTIVE
286 ))
khenaidoo92e62c52018-10-03 14:02:54 -0400287
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400288 yield self.core_proxy.device_state_update(device.id,
289 connect_status=ConnectStatus.REACHABLE,
290 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400291
292 # register ONUS
293 self.log.info('onu-found', onus=info.onus, len=len(info.onus))
294 for onu in info.onus:
295 vlan_id = onu.uni_port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400296 yield self.core_proxy.child_device_detected(
khenaidoob9203542018-09-17 22:56:37 -0400297 parent_device_id=device.id,
298 parent_port_no=1,
299 child_device_type='ponsim_onu',
300 channel_id=vlan_id,
301 )
302
303 self.log.info('starting-frame-grpc-stream')
304 reactor.callInThread(self.rcv_grpc)
305 self.log.info('started-frame-grpc-stream')
306
khenaidoob9203542018-09-17 22:56:37 -0400307 # Start collecting stats from the device after a brief pause
khenaidoo92e62c52018-10-03 14:02:54 -0400308 self.start_kpi_collection(device.id)
khenaidoob9203542018-09-17 22:56:37 -0400309 except Exception as e:
310 log.exception("Exception-activating", e=e)
311
khenaidoob9203542018-09-17 22:56:37 -0400312 def get_ofp_device_info(self, device):
313 return SwitchCapability(
314 desc=ofp_desc(
315 hw_desc='ponsim pon',
316 sw_desc='ponsim pon',
317 serial_num=device.serial_number,
318 dp_desc='n/a'
319 ),
320 switch_features=ofp_switch_features(
321 n_buffers=256, # TODO fake for now
322 n_tables=2, # TODO ditto
323 capabilities=( # TODO and ditto
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400324 OFPC_FLOW_STATS
325 | OFPC_TABLE_STATS
326 | OFPC_PORT_STATS
327 | OFPC_GROUP_STATS
khenaidoob9203542018-09-17 22:56:37 -0400328 )
329 )
330 )
331
332 def get_ofp_port_info(self, device, port_no):
333 # Since the adapter created the device port then it has the reference of the port to
334 # return the capability. TODO: Do a lookup on the NNI port number and return the
335 # appropriate attributes
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400336 self.log.info('get_ofp_port_info', port_no=port_no,
337 info=self.ofp_port_no, device_id=device.id)
khenaidoob9203542018-09-17 22:56:37 -0400338 cap = OFPPF_1GB_FD | OFPPF_FIBER
339 return PortCapability(
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400340 port=LogicalPort(
khenaidoob9203542018-09-17 22:56:37 -0400341 ofp_port=ofp_port(
khenaidoob9203542018-09-17 22:56:37 -0400342 hw_addr=mac_str_to_tuple(
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400343 '00:00:00:00:00:%02x' % port_no),
khenaidoob9203542018-09-17 22:56:37 -0400344 config=0,
345 state=OFPPS_LIVE,
346 curr=cap,
347 advertised=cap,
348 peer=cap,
349 curr_speed=OFPPF_1GB_FD,
350 max_speed=OFPPF_1GB_FD
khenaidoo19d7b632018-10-30 10:49:50 -0400351 ),
352 device_id=device.id,
353 device_port_no=port_no
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400354 )
khenaidoob9203542018-09-17 22:56:37 -0400355 )
356
khenaidoo4d4802d2018-10-04 21:59:49 -0400357 # TODO - change for core 2.0
khenaidoob9203542018-09-17 22:56:37 -0400358 def reconcile(self, device):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400359 self.log.info('reconciling-OLT-device')
khenaidoob9203542018-09-17 22:56:37 -0400360
khenaidoo90847922018-12-03 14:47:51 -0500361 @inlineCallbacks
khenaidoofdbad6e2018-11-06 22:26:38 -0500362 def _rcv_frame(self, frame):
363 pkt = Ether(frame)
364
365 if pkt.haslayer(Dot1Q):
366 outer_shim = pkt.getlayer(Dot1Q)
367
368 if isinstance(outer_shim.payload, Dot1Q):
369 inner_shim = outer_shim.payload
370 cvid = inner_shim.vlan
371 popped_frame = (
372 Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
373 inner_shim.payload
374 )
375 self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
khenaidoo90847922018-12-03 14:47:51 -0500376 yield self.core_proxy.send_packet_in(device_id=self.device_id,
khenaidoofdbad6e2018-11-06 22:26:38 -0500377 port=cvid,
378 packet=str(popped_frame))
379 elif pkt.haslayer(Raw):
380 raw_data = json.loads(pkt.getlayer(Raw).load)
381 self.alarms.send_alarm(self, raw_data)
382
khenaidoob9203542018-09-17 22:56:37 -0400383 @inlineCallbacks
384 def rcv_grpc(self):
385 """
386 This call establishes a GRPC stream to receive frames.
387 """
388 yield self.get_channel()
389 stub = ponsim_pb2.PonSimStub(self.channel)
390 # stub = ponsim_pb2.PonSimStub(self.get_channel())
391
392 # Attempt to establish a grpc stream with the remote ponsim service
393 self.frames = stub.ReceiveFrames(Empty())
394
395 self.log.info('start-receiving-grpc-frames')
396
397 try:
398 for frame in self.frames:
399 self.log.info('received-grpc-frame',
400 frame_len=len(frame.payload))
khenaidoo90847922018-12-03 14:47:51 -0500401 yield self._rcv_frame(frame.payload)
khenaidoob9203542018-09-17 22:56:37 -0400402
403 except _Rendezvous, e:
404 log.warn('grpc-connection-lost', message=e.message)
405
406 self.log.info('stopped-receiving-grpc-frames')
407
khenaidoo19d7b632018-10-30 10:49:50 -0400408 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400409 def update_flow_table(self, flows):
khenaidoo19d7b632018-10-30 10:49:50 -0400410 yield self.get_channel()
411 stub = ponsim_pb2.PonSimStub(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400412
khenaidoo19d7b632018-10-30 10:49:50 -0400413 self.log.info('pushing-olt-flow-table')
khenaidoob9203542018-09-17 22:56:37 -0400414 stub.UpdateFlowTable(FlowTable(
415 port=0,
416 flows=flows
417 ))
418 self.log.info('success')
419
420 def remove_from_flow_table(self, flows):
421 self.log.debug('remove-from-flow-table', flows=flows)
422 # TODO: Update PONSIM code to accept incremental flow changes
423 # Once completed, the accepts_add_remove_flow_updates for this
424 # device type can be set to True
425
426 def add_to_flow_table(self, flows):
427 self.log.debug('add-to-flow-table', flows=flows)
428 # TODO: Update PONSIM code to accept incremental flow changes
429 # Once completed, the accepts_add_remove_flow_updates for this
430 # device type can be set to True
431
432 def update_pm_config(self, device, pm_config):
433 log.info("handler-update-pm-config", device=device,
434 pm_config=pm_config)
435 self.pm_metrics.update(pm_config)
436
437 def send_proxied_message(self, proxy_address, msg):
438 self.log.info('sending-proxied-message')
439 if isinstance(msg, FlowTable):
440 stub = ponsim_pb2.PonSimStub(self.get_channel())
441 self.log.info('pushing-onu-flow-table', port=msg.port)
442 res = stub.UpdateFlowTable(msg)
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400443 self.core_proxy.receive_proxied_message(proxy_address, res)
444
445 @inlineCallbacks
446 def process_inter_adapter_message(self, request):
447 self.log.info('process-inter-adapter-message', msg=request)
448 try:
449 if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
450 f = FlowTable()
451 if request.body:
452 request.body.Unpack(f)
453 stub = ponsim_pb2.PonSimStub(self.channel)
454 self.log.info('pushing-onu-flow-table')
455 res = stub.UpdateFlowTable(f)
456 # Send response back
457 reply = InterAdapterResponseBody()
khenaidoo91ecfd62018-11-04 17:13:42 -0500458 reply.status = True
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400459 self.log.info('sending-response-back', reply=reply)
460 yield self.adapter_proxy.send_inter_adapter_message(
461 msg=reply,
462 type=InterAdapterMessageType.FLOW_RESPONSE,
463 from_adapter=self.adapter.name,
464 to_adapter=request.header.from_topic,
465 to_device_id=request.header.to_device_id,
khenaidoo91ecfd62018-11-04 17:13:42 -0500466 message_id=request.header.id
467 )
468 elif request.header.type == InterAdapterMessageType.METRICS_REQUEST:
469 m = PonSimMetricsRequest()
470 if request.body:
471 request.body.Unpack(m)
472 stub = ponsim_pb2.PonSimStub(self.channel)
473 self.log.info('proxying onu stats request', port=m.port)
474 res = stub.GetStats(m)
475 # Send response back
476 reply = InterAdapterResponseBody()
477 reply.status = True
478 reply.body.Pack(res)
479 self.log.info('sending-response-back', reply=reply)
480 yield self.adapter_proxy.send_inter_adapter_message(
481 msg=reply,
482 type=InterAdapterMessageType.METRICS_RESPONSE,
483 from_adapter=self.adapter.name,
484 to_adapter=request.header.from_topic,
485 to_device_id=request.header.to_device_id,
486 message_id=request.header.id
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400487 )
488 except Exception as e:
489 self.log.exception("error-processing-inter-adapter-message", e=e)
khenaidoob9203542018-09-17 22:56:37 -0400490
491 def packet_out(self, egress_port, msg):
492 self.log.info('sending-packet-out', egress_port=egress_port,
493 msg=hexify(msg))
494 pkt = Ether(msg)
495 out_pkt = pkt
496 if egress_port != self.nni_port.port_no:
497 # don't do the vlan manipulation for the NNI port, vlans are already correct
498 out_pkt = (
499 Ether(src=pkt.src, dst=pkt.dst) /
500 Dot1Q(vlan=egress_port, type=pkt.type) /
501 pkt.payload
502 )
503
504 # TODO need better way of mapping logical ports to PON ports
505 out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
506
507 # send over grpc stream
khenaidoofdbad6e2018-11-06 22:26:38 -0500508 stub = ponsim_pb2.PonSimStub(self.channel)
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400509 frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
510 out_port=out_port)
khenaidoob9203542018-09-17 22:56:37 -0400511 stub.SendFrame(frame)
512
khenaidoob9203542018-09-17 22:56:37 -0400513 @inlineCallbacks
514 def reboot(self):
515 self.log.info('rebooting', device_id=self.device_id)
516
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400517 yield self.core_proxy.device_state_update(self.device_id,
518 connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400519
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400520 # Update the child devices connect state to UNREACHABLE
521 yield self.core_proxy.children_state_update(self.device_id,
522 connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400523
524 # Sleep 10 secs, simulating a reboot
525 # TODO: send alert and clear alert after the reboot
526 yield asleep(10)
527
khenaidoo4d4802d2018-10-04 21:59:49 -0400528 # Change the connection status back to REACHABLE. With a
529 # real OLT the connection state must be the actual state
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400530 yield self.core_proxy.device_state_update(self.device_id,
531 connect_status=ConnectStatus.REACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400532
533 # Update the child devices connect state to REACHABLE
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400534 yield self.core_proxy.children_state_update(self.device_id,
535 connect_status=ConnectStatus.REACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400536
537 self.log.info('rebooted', device_id=self.device_id)
538
539 def self_test_device(self, device):
540 """
541 This is called to Self a device based on a NBI call.
542 :param device: A Voltha.Device object.
543 :return: Will return result of self test
544 """
545 log.info('self-test-device', device=device.id)
546 raise NotImplementedError()
547
khenaidoo92e62c52018-10-03 14:02:54 -0400548 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400549 def disable(self):
550 self.log.info('disabling', device_id=self.device_id)
551
552 self.stop_kpi_collection()
553
khenaidoo92e62c52018-10-03 14:02:54 -0400554 # Update the operational status to UNKNOWN and connection status to UNREACHABLE
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400555 yield self.core_proxy.device_state_update(self.device_id,
556 oper_status=OperStatus.UNKNOWN,
557 connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400558
559 self.close_channel()
560 self.log.info('disabled-grpc-channel')
561
khenaidoo91ecfd62018-11-04 17:13:42 -0500562 self.stop_kpi_collection()
563
khenaidoob9203542018-09-17 22:56:37 -0400564 # TODO:
565 # 1) Remove all flows from the device
566 # 2) Remove the device from ponsim
567
khenaidoo92e62c52018-10-03 14:02:54 -0400568 self.log.info('disabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400569
khenaidoo92e62c52018-10-03 14:02:54 -0400570 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400571 def reenable(self):
572 self.log.info('re-enabling', device_id=self.device_id)
573
khenaidoob9203542018-09-17 22:56:37 -0400574 # Set the ofp_port_no and nni_port in case we bypassed the reconcile
575 # process if the device was in DISABLED state on voltha restart
576 if not self.ofp_port_no and not self.nni_port:
khenaidoo92e62c52018-10-03 14:02:54 -0400577 yield self.get_channel()
578 stub = ponsim_pb2.PonSimStub(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400579 info = stub.GetDeviceInfo(Empty())
580 log.info('got-info', info=info)
581 self.ofp_port_no = info.nni_port
khenaidoo92e62c52018-10-03 14:02:54 -0400582 ports = yield self._get_nni_port()
583 # For ponsim, we are using only 1 NNI port
584 if ports.items:
585 self.nni_port = ports.items[0]
khenaidoob9203542018-09-17 22:56:37 -0400586
khenaidoo92e62c52018-10-03 14:02:54 -0400587 # Update the state of the NNI port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400588 yield self.core_proxy.port_state_update(self.device_id,
589 port_type=Port.ETHERNET_NNI,
590 port_no=self.ofp_port_no,
591 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400592
khenaidoo92e62c52018-10-03 14:02:54 -0400593 # Update the state of the PON port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400594 yield self.core_proxy.port_state_update(self.device_id,
595 port_type=Port.PON_OLT,
596 port_no=1,
597 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400598
khenaidoo92e62c52018-10-03 14:02:54 -0400599 # Set the operational state of the device to ACTIVE and connect status to REACHABLE
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400600 yield self.core_proxy.device_state_update(self.device_id,
601 connect_status=ConnectStatus.REACHABLE,
602 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400603
khenaidoo92e62c52018-10-03 14:02:54 -0400604 # TODO: establish frame grpc-stream
605 # yield reactor.callInThread(self.rcv_grpc)
khenaidoob9203542018-09-17 22:56:37 -0400606
khenaidoo92e62c52018-10-03 14:02:54 -0400607 self.start_kpi_collection(self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400608
khenaidoo92e62c52018-10-03 14:02:54 -0400609 self.log.info('re-enabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400610
611 def delete(self):
612 self.log.info('deleting', device_id=self.device_id)
613
khenaidoob9203542018-09-17 22:56:37 -0400614 self.close_channel()
615 self.log.info('disabled-grpc-channel')
616
617 # TODO:
618 # 1) Remove all flows from the device
619 # 2) Remove the device from ponsim
620
621 self.log.info('deleted', device_id=self.device_id)
622
623 def start_kpi_collection(self, device_id):
624
khenaidoo4d4802d2018-10-04 21:59:49 -0400625 kafka_cluster_proxy = get_kafka_proxy()
626
khenaidoob9203542018-09-17 22:56:37 -0400627 def _collect(device_id, prefix):
628
629 try:
630 # Step 1: gather metrics from device
631 port_metrics = \
khenaidoo92e62c52018-10-03 14:02:54 -0400632 self.pm_metrics.collect_port_metrics(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400633
634 # Step 2: prepare the KpiEvent for submission
635 # we can time-stamp them here (or could use time derived from OLT
636 ts = arrow.utcnow().timestamp
637 kpi_event = KpiEvent(
638 type=KpiEventType.slice,
639 ts=ts,
640 prefixes={
641 # OLT NNI port
642 prefix + '.nni': MetricValuePairs(
643 metrics=port_metrics['nni']),
644 # OLT PON port
645 prefix + '.pon': MetricValuePairs(
646 metrics=port_metrics['pon'])
647 }
648 )
649
khenaidoo91ecfd62018-11-04 17:13:42 -0500650 # Step 3: submit directly to the kafka bus
khenaidoo4d4802d2018-10-04 21:59:49 -0400651 if kafka_cluster_proxy:
652 if isinstance(kpi_event, Message):
653 kpi_event = dumps(MessageToDict(kpi_event, True, True))
654 kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
khenaidoob9203542018-09-17 22:56:37 -0400655
656 except Exception as e:
657 log.exception('failed-to-submit-kpis', e=e)
658
659 self.pm_metrics.start_collector(_collect)
660
661 def stop_kpi_collection(self):
662 self.pm_metrics.stop_collector()