blob: 4f5af65eb43a50e34fada982ce2c59bc197807c9 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Fully simulated OLT adapter.
19"""
khenaidoob9203542018-09-17 22:56:37 -040020
21import arrow
khenaidoob9203542018-09-17 22:56:37 -040022import grpc
23import structlog
khenaidoo6fdf0ba2018-11-02 14:38:33 -040024from google.protobuf.empty_pb2 import Empty
25from google.protobuf.json_format import MessageToDict
khenaidoofdbad6e2018-11-06 22:26:38 -050026from scapy.layers.inet import Raw
27import json
khenaidoo6fdf0ba2018-11-02 14:38:33 -040028from google.protobuf.message import Message
29from grpc._channel import _Rendezvous
khenaidoob9203542018-09-17 22:56:37 -040030from scapy.layers.l2 import Ether, Dot1Q
khenaidoo6fdf0ba2018-11-02 14:38:33 -040031from simplejson import dumps
khenaidoob9203542018-09-17 22:56:37 -040032from twisted.internet import reactor
khenaidoo92e62c52018-10-03 14:02:54 -040033from twisted.internet.defer import inlineCallbacks, returnValue
khenaidoo6fdf0ba2018-11-02 14:38:33 -040034from twisted.internet.task import LoopingCall
khenaidoob9203542018-09-17 22:56:37 -040035
khenaidoofdbad6e2018-11-06 22:26:38 -050036from python.adapters.common.frameio.frameio import BpfProgramFilter, hexify
37from python.common.utils.asleep import asleep
38from python.common.utils.registry import registry
39from python.adapters.iadapter import OltAdapter
40from python.adapters.kafka.kafka_proxy import get_kafka_proxy
41from python.protos import ponsim_pb2
42from python.protos import third_party
43from python.protos.common_pb2 import OperStatus, ConnectStatus
khenaidoo79232702018-12-04 11:00:41 -050044from python.protos.inter_container_pb2 import SwitchCapability, PortCapability, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040045 InterAdapterMessageType, InterAdapterResponseBody
khenaidoofdbad6e2018-11-06 22:26:38 -050046from python.protos.device_pb2 import Port, PmConfig, PmConfigs
47from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
48from python.protos.logical_device_pb2 import LogicalPort
49from python.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
khenaidoob9203542018-09-17 22:56:37 -040050 OFPPF_1GB_FD, \
51 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
52 ofp_switch_features, ofp_desc
khenaidoofdbad6e2018-11-06 22:26:38 -050053from python.protos.openflow_13_pb2 import ofp_port
54from python.protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest
khenaidoob9203542018-09-17 22:56:37 -040055
56_ = third_party
57log = structlog.get_logger()
58
59PACKET_IN_VLAN = 4000
60is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
61 PACKET_IN_VLAN))
62
khenaidoo6fdf0ba2018-11-02 14:38:33 -040063
khenaidoob9203542018-09-17 22:56:37 -040064def mac_str_to_tuple(mac):
65 return tuple(int(d, 16) for d in mac.split(':'))
66
khenaidoo6fdf0ba2018-11-02 14:38:33 -040067
khenaidoob9203542018-09-17 22:56:37 -040068class AdapterPmMetrics:
69 def __init__(self, device):
70 self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
71 'tx_256_511_pkts', 'tx_512_1023_pkts',
72 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
73 'rx_64_pkts', 'rx_65_127_pkts',
74 'rx_128_255_pkts', 'rx_256_511_pkts',
75 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
76 'rx_1519_9k_pkts'}
77 self.device = device
78 self.id = device.id
79 self.name = 'ponsim_olt'
80 self.default_freq = 150
81 self.grouped = False
82 self.freq_override = False
83 self.pon_metrics_config = dict()
84 self.nni_metrics_config = dict()
85 self.lc = None
86 for m in self.pm_names:
87 self.pon_metrics_config[m] = PmConfig(name=m,
88 type=PmConfig.COUNTER,
89 enabled=True)
90 self.nni_metrics_config[m] = PmConfig(name=m,
91 type=PmConfig.COUNTER,
92 enabled=True)
93
94 def update(self, pm_config):
95 if self.default_freq != pm_config.default_freq:
96 # Update the callback to the new frequency.
97 self.default_freq = pm_config.default_freq
98 self.lc.stop()
99 self.lc.start(interval=self.default_freq / 10)
100 for m in pm_config.metrics:
101 self.pon_metrics_config[m.name].enabled = m.enabled
102 self.nni_metrics_config[m.name].enabled = m.enabled
103
104 def make_proto(self):
105 pm_config = PmConfigs(
106 id=self.id,
107 default_freq=self.default_freq,
108 grouped=False,
109 freq_override=False)
110 for m in sorted(self.pon_metrics_config):
111 pm = self.pon_metrics_config[m] # Either will do they're the same
112 pm_config.metrics.extend([PmConfig(name=pm.name,
113 type=pm.type,
114 enabled=pm.enabled)])
115 return pm_config
116
117 def collect_port_metrics(self, channel):
118 rtrn_port_metrics = dict()
119 stub = ponsim_pb2.PonSimStub(channel)
120 stats = stub.GetStats(Empty())
121 rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
122 rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
123 return rtrn_port_metrics
124
125 def extract_pon_metrics(self, stats):
126 rtrn_pon_metrics = dict()
127 for m in stats.metrics:
128 if m.port_name == "pon":
129 for p in m.packets:
130 if self.pon_metrics_config[p.name].enabled:
131 rtrn_pon_metrics[p.name] = p.value
132 return rtrn_pon_metrics
133
134 def extract_nni_metrics(self, stats):
135 rtrn_pon_metrics = dict()
136 for m in stats.metrics:
137 if m.port_name == "nni":
138 for p in m.packets:
139 if self.pon_metrics_config[p.name].enabled:
140 rtrn_pon_metrics[p.name] = p.value
141 return rtrn_pon_metrics
142
143 def start_collector(self, callback):
144 log.info("starting-pm-collection", device_name=self.name,
145 device_id=self.device.id)
146 prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
147 self.lc = LoopingCall(callback, self.device.id, prefix)
148 self.lc.start(interval=self.default_freq / 10)
149
150 def stop_collector(self):
151 log.info("stopping-pm-collection", device_name=self.name,
152 device_id=self.device.id)
153 self.lc.stop()
154
155
156class AdapterAlarms:
157 def __init__(self, adapter, device):
158 self.adapter = adapter
159 self.device = device
160 self.lc = None
161
khenaidoofdbad6e2018-11-06 22:26:38 -0500162 # TODO: Implement code to send to kafka cluster directly instead of
163 # going through the voltha core.
khenaidoob9203542018-09-17 22:56:37 -0400164 def send_alarm(self, context_data, alarm_data):
khenaidoofdbad6e2018-11-06 22:26:38 -0500165 log.debug("send-alarm-not-implemented")
166 return
khenaidoob9203542018-09-17 22:56:37 -0400167
168
169class PonSimOltAdapter(OltAdapter):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400170 def __init__(self, core_proxy, adapter_proxy, config):
171 super(PonSimOltAdapter, self).__init__(core_proxy=core_proxy,
172 adapter_proxy=adapter_proxy,
khenaidoob9203542018-09-17 22:56:37 -0400173 config=config,
174 device_handler_class=PonSimOltHandler,
175 name='ponsim_olt',
176 vendor='Voltha project',
177 version='0.4',
178 device_type='ponsim_olt',
179 accepts_bulk_flow_update=True,
180 accepts_add_remove_flow_updates=False)
181
182 def update_pm_config(self, device, pm_config):
183 log.info("adapter-update-pm-config", device=device,
184 pm_config=pm_config)
185 handler = self.devices_handlers[device.id]
186 handler.update_pm_config(device, pm_config)
187
188
khenaidoob9203542018-09-17 22:56:37 -0400189class PonSimOltHandler(object):
190 def __init__(self, adapter, device_id):
191 self.adapter = adapter
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400192 self.core_proxy = adapter.core_proxy
193 self.adapter_proxy = adapter.adapter_proxy
khenaidoob9203542018-09-17 22:56:37 -0400194 self.device_id = device_id
195 self.log = structlog.get_logger(device_id=device_id)
196 self.channel = None
197 self.io_port = None
198 self.logical_device_id = None
199 self.nni_port = None
200 self.ofp_port_no = None
201 self.interface = registry('main').get_args().interface
202 self.pm_metrics = None
203 self.alarms = None
204 self.frames = None
205
206 @inlineCallbacks
207 def get_channel(self):
208 if self.channel is None:
209 try:
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400210 device = yield self.core_proxy.get_device(self.device_id)
211 self.log.info('device-info', device=device,
212 host_port=device.host_and_port)
khenaidoob9203542018-09-17 22:56:37 -0400213 self.channel = grpc.insecure_channel(device.host_and_port)
214 except Exception as e:
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400215 log.exception("ponsim-connection-failure", e=e)
khenaidoob9203542018-09-17 22:56:37 -0400216
217 # returnValue(self.channel)
218
219 def close_channel(self):
220 if self.channel is None:
221 self.log.info('grpc-channel-already-closed')
222 return
223 else:
224 if self.frames is not None:
225 self.frames.cancel()
226 self.frames = None
227 self.log.info('cancelled-grpc-frame-stream')
228
229 self.channel.unsubscribe(lambda *args: None)
230 self.channel = None
231
232 self.log.info('grpc-channel-closed')
233
khenaidoo92e62c52018-10-03 14:02:54 -0400234 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400235 def _get_nni_port(self):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400236 ports = yield self.core_proxy.get_ports(self.device_id,
237 Port.ETHERNET_NNI)
khenaidoo92e62c52018-10-03 14:02:54 -0400238 returnValue(ports)
khenaidoob9203542018-09-17 22:56:37 -0400239
240 @inlineCallbacks
241 def activate(self, device):
242 try:
243 self.log.info('activating')
244
245 if not device.host_and_port:
246 device.oper_status = OperStatus.FAILED
247 device.reason = 'No host_and_port field provided'
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400248 self.core_proxy.device_update(device)
khenaidoob9203542018-09-17 22:56:37 -0400249 return
250
251 yield self.get_channel()
252 stub = ponsim_pb2.PonSimStub(self.channel)
253 info = stub.GetDeviceInfo(Empty())
254 log.info('got-info', info=info, device_id=device.id)
255 self.ofp_port_no = info.nni_port
256
257 device.root = True
258 device.vendor = 'ponsim'
259 device.model = 'n/a'
260 device.serial_number = device.host_and_port
khenaidoo92e62c52018-10-03 14:02:54 -0400261 device.mac_address = "AA:BB:CC:DD:EE:FF"
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400262 yield self.core_proxy.device_update(device)
khenaidoob9203542018-09-17 22:56:37 -0400263
264 # Now set the initial PM configuration for this device
265 self.pm_metrics = AdapterPmMetrics(device)
266 pm_config = self.pm_metrics.make_proto()
267 log.info("initial-pm-config", pm_config=pm_config)
khenaidoodf5a9752019-02-14 14:25:19 -0500268 yield self.core_proxy.device_pm_config_update(pm_config, init=True)
khenaidoob9203542018-09-17 22:56:37 -0400269
270 # Setup alarm handler
271 self.alarms = AdapterAlarms(self.adapter, device)
272
273 nni_port = Port(
274 port_no=info.nni_port,
275 label='NNI facing Ethernet port',
276 type=Port.ETHERNET_NNI,
khenaidoob9203542018-09-17 22:56:37 -0400277 oper_status=OperStatus.ACTIVE
278 )
279 self.nni_port = nni_port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400280 yield self.core_proxy.port_created(device.id, nni_port)
281 yield self.core_proxy.port_created(device.id, Port(
khenaidoob9203542018-09-17 22:56:37 -0400282 port_no=1,
283 label='PON port',
284 type=Port.PON_OLT,
khenaidoob9203542018-09-17 22:56:37 -0400285 oper_status=OperStatus.ACTIVE
286 ))
khenaidoo92e62c52018-10-03 14:02:54 -0400287
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400288 yield self.core_proxy.device_state_update(device.id,
289 connect_status=ConnectStatus.REACHABLE,
290 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400291
292 # register ONUS
293 self.log.info('onu-found', onus=info.onus, len=len(info.onus))
294 for onu in info.onus:
295 vlan_id = onu.uni_port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400296 yield self.core_proxy.child_device_detected(
khenaidoob9203542018-09-17 22:56:37 -0400297 parent_device_id=device.id,
298 parent_port_no=1,
299 child_device_type='ponsim_onu',
300 channel_id=vlan_id,
301 )
302
303 self.log.info('starting-frame-grpc-stream')
304 reactor.callInThread(self.rcv_grpc)
305 self.log.info('started-frame-grpc-stream')
306
khenaidoob9203542018-09-17 22:56:37 -0400307 # Start collecting stats from the device after a brief pause
khenaidoo92e62c52018-10-03 14:02:54 -0400308 self.start_kpi_collection(device.id)
khenaidoob9203542018-09-17 22:56:37 -0400309 except Exception as e:
310 log.exception("Exception-activating", e=e)
311
khenaidoob9203542018-09-17 22:56:37 -0400312 def get_ofp_device_info(self, device):
313 return SwitchCapability(
314 desc=ofp_desc(
315 hw_desc='ponsim pon',
316 sw_desc='ponsim pon',
317 serial_num=device.serial_number,
khenaidoo54e0ddf2019-02-27 16:21:33 -0500318 mfr_desc="VOLTHA Project",
khenaidoob9203542018-09-17 22:56:37 -0400319 dp_desc='n/a'
320 ),
321 switch_features=ofp_switch_features(
322 n_buffers=256, # TODO fake for now
323 n_tables=2, # TODO ditto
324 capabilities=( # TODO and ditto
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400325 OFPC_FLOW_STATS
326 | OFPC_TABLE_STATS
327 | OFPC_PORT_STATS
328 | OFPC_GROUP_STATS
khenaidoob9203542018-09-17 22:56:37 -0400329 )
330 )
331 )
332
333 def get_ofp_port_info(self, device, port_no):
334 # Since the adapter created the device port then it has the reference of the port to
335 # return the capability. TODO: Do a lookup on the NNI port number and return the
336 # appropriate attributes
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400337 self.log.info('get_ofp_port_info', port_no=port_no,
338 info=self.ofp_port_no, device_id=device.id)
khenaidoob9203542018-09-17 22:56:37 -0400339 cap = OFPPF_1GB_FD | OFPPF_FIBER
340 return PortCapability(
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400341 port=LogicalPort(
khenaidoob9203542018-09-17 22:56:37 -0400342 ofp_port=ofp_port(
khenaidoob9203542018-09-17 22:56:37 -0400343 hw_addr=mac_str_to_tuple(
khenaidoobcf205b2019-01-25 22:21:14 -0500344 'AA:BB:CC:DD:EE:%02x' % port_no),
khenaidoob9203542018-09-17 22:56:37 -0400345 config=0,
346 state=OFPPS_LIVE,
347 curr=cap,
348 advertised=cap,
349 peer=cap,
350 curr_speed=OFPPF_1GB_FD,
351 max_speed=OFPPF_1GB_FD
khenaidoo19d7b632018-10-30 10:49:50 -0400352 ),
353 device_id=device.id,
354 device_port_no=port_no
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400355 )
khenaidoob9203542018-09-17 22:56:37 -0400356 )
357
khenaidoo4d4802d2018-10-04 21:59:49 -0400358 # TODO - change for core 2.0
khenaidoob9203542018-09-17 22:56:37 -0400359 def reconcile(self, device):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400360 self.log.info('reconciling-OLT-device')
khenaidoob9203542018-09-17 22:56:37 -0400361
khenaidoo90847922018-12-03 14:47:51 -0500362 @inlineCallbacks
khenaidoofdbad6e2018-11-06 22:26:38 -0500363 def _rcv_frame(self, frame):
364 pkt = Ether(frame)
365
366 if pkt.haslayer(Dot1Q):
367 outer_shim = pkt.getlayer(Dot1Q)
368
369 if isinstance(outer_shim.payload, Dot1Q):
370 inner_shim = outer_shim.payload
371 cvid = inner_shim.vlan
372 popped_frame = (
373 Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
374 inner_shim.payload
375 )
376 self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
khenaidoo90847922018-12-03 14:47:51 -0500377 yield self.core_proxy.send_packet_in(device_id=self.device_id,
khenaidoofdbad6e2018-11-06 22:26:38 -0500378 port=cvid,
379 packet=str(popped_frame))
380 elif pkt.haslayer(Raw):
381 raw_data = json.loads(pkt.getlayer(Raw).load)
382 self.alarms.send_alarm(self, raw_data)
383
khenaidoob9203542018-09-17 22:56:37 -0400384 @inlineCallbacks
385 def rcv_grpc(self):
386 """
387 This call establishes a GRPC stream to receive frames.
388 """
389 yield self.get_channel()
390 stub = ponsim_pb2.PonSimStub(self.channel)
391 # stub = ponsim_pb2.PonSimStub(self.get_channel())
392
393 # Attempt to establish a grpc stream with the remote ponsim service
394 self.frames = stub.ReceiveFrames(Empty())
395
396 self.log.info('start-receiving-grpc-frames')
397
398 try:
399 for frame in self.frames:
400 self.log.info('received-grpc-frame',
401 frame_len=len(frame.payload))
khenaidoo90847922018-12-03 14:47:51 -0500402 yield self._rcv_frame(frame.payload)
khenaidoob9203542018-09-17 22:56:37 -0400403
404 except _Rendezvous, e:
405 log.warn('grpc-connection-lost', message=e.message)
406
407 self.log.info('stopped-receiving-grpc-frames')
408
khenaidoo19d7b632018-10-30 10:49:50 -0400409 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400410 def update_flow_table(self, flows):
khenaidoo19d7b632018-10-30 10:49:50 -0400411 yield self.get_channel()
412 stub = ponsim_pb2.PonSimStub(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400413
khenaidoo19d7b632018-10-30 10:49:50 -0400414 self.log.info('pushing-olt-flow-table')
khenaidoob9203542018-09-17 22:56:37 -0400415 stub.UpdateFlowTable(FlowTable(
416 port=0,
417 flows=flows
418 ))
419 self.log.info('success')
420
421 def remove_from_flow_table(self, flows):
422 self.log.debug('remove-from-flow-table', flows=flows)
423 # TODO: Update PONSIM code to accept incremental flow changes
424 # Once completed, the accepts_add_remove_flow_updates for this
425 # device type can be set to True
426
427 def add_to_flow_table(self, flows):
428 self.log.debug('add-to-flow-table', flows=flows)
429 # TODO: Update PONSIM code to accept incremental flow changes
430 # Once completed, the accepts_add_remove_flow_updates for this
431 # device type can be set to True
432
433 def update_pm_config(self, device, pm_config):
434 log.info("handler-update-pm-config", device=device,
435 pm_config=pm_config)
436 self.pm_metrics.update(pm_config)
437
438 def send_proxied_message(self, proxy_address, msg):
439 self.log.info('sending-proxied-message')
440 if isinstance(msg, FlowTable):
441 stub = ponsim_pb2.PonSimStub(self.get_channel())
442 self.log.info('pushing-onu-flow-table', port=msg.port)
443 res = stub.UpdateFlowTable(msg)
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400444 self.core_proxy.receive_proxied_message(proxy_address, res)
445
446 @inlineCallbacks
447 def process_inter_adapter_message(self, request):
448 self.log.info('process-inter-adapter-message', msg=request)
449 try:
450 if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
451 f = FlowTable()
452 if request.body:
453 request.body.Unpack(f)
454 stub = ponsim_pb2.PonSimStub(self.channel)
455 self.log.info('pushing-onu-flow-table')
456 res = stub.UpdateFlowTable(f)
457 # Send response back
458 reply = InterAdapterResponseBody()
khenaidoo91ecfd62018-11-04 17:13:42 -0500459 reply.status = True
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400460 self.log.info('sending-response-back', reply=reply)
461 yield self.adapter_proxy.send_inter_adapter_message(
462 msg=reply,
463 type=InterAdapterMessageType.FLOW_RESPONSE,
464 from_adapter=self.adapter.name,
465 to_adapter=request.header.from_topic,
466 to_device_id=request.header.to_device_id,
khenaidoo91ecfd62018-11-04 17:13:42 -0500467 message_id=request.header.id
468 )
469 elif request.header.type == InterAdapterMessageType.METRICS_REQUEST:
470 m = PonSimMetricsRequest()
471 if request.body:
472 request.body.Unpack(m)
473 stub = ponsim_pb2.PonSimStub(self.channel)
474 self.log.info('proxying onu stats request', port=m.port)
475 res = stub.GetStats(m)
476 # Send response back
477 reply = InterAdapterResponseBody()
478 reply.status = True
479 reply.body.Pack(res)
480 self.log.info('sending-response-back', reply=reply)
481 yield self.adapter_proxy.send_inter_adapter_message(
482 msg=reply,
483 type=InterAdapterMessageType.METRICS_RESPONSE,
484 from_adapter=self.adapter.name,
485 to_adapter=request.header.from_topic,
486 to_device_id=request.header.to_device_id,
487 message_id=request.header.id
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400488 )
489 except Exception as e:
490 self.log.exception("error-processing-inter-adapter-message", e=e)
khenaidoob9203542018-09-17 22:56:37 -0400491
492 def packet_out(self, egress_port, msg):
493 self.log.info('sending-packet-out', egress_port=egress_port,
494 msg=hexify(msg))
khenaidoobcf205b2019-01-25 22:21:14 -0500495 try:
496 pkt = Ether(msg)
497 out_pkt = pkt
498 if egress_port != self.nni_port.port_no:
499 # don't do the vlan manipulation for the NNI port, vlans are already correct
500 out_pkt = (
501 Ether(src=pkt.src, dst=pkt.dst) /
502 Dot1Q(vlan=egress_port, type=pkt.type) /
503 pkt.payload
504 )
khenaidoob9203542018-09-17 22:56:37 -0400505
khenaidoobcf205b2019-01-25 22:21:14 -0500506 # TODO need better way of mapping logical ports to PON ports
507 out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
khenaidoob9203542018-09-17 22:56:37 -0400508
khenaidoobcf205b2019-01-25 22:21:14 -0500509 # send over grpc stream
510 stub = ponsim_pb2.PonSimStub(self.channel)
511 frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
512 out_port=out_port)
513 stub.SendFrame(frame)
514 except Exception as e:
515 self.log.exception("error-processing-packet-out", e=e)
516
khenaidoob9203542018-09-17 22:56:37 -0400517
khenaidoob9203542018-09-17 22:56:37 -0400518 @inlineCallbacks
519 def reboot(self):
520 self.log.info('rebooting', device_id=self.device_id)
521
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400522 yield self.core_proxy.device_state_update(self.device_id,
523 connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400524
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400525 # Update the child devices connect state to UNREACHABLE
526 yield self.core_proxy.children_state_update(self.device_id,
527 connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400528
529 # Sleep 10 secs, simulating a reboot
530 # TODO: send alert and clear alert after the reboot
531 yield asleep(10)
532
khenaidoo4d4802d2018-10-04 21:59:49 -0400533 # Change the connection status back to REACHABLE. With a
534 # real OLT the connection state must be the actual state
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400535 yield self.core_proxy.device_state_update(self.device_id,
536 connect_status=ConnectStatus.REACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400537
538 # Update the child devices connect state to REACHABLE
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400539 yield self.core_proxy.children_state_update(self.device_id,
540 connect_status=ConnectStatus.REACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400541
542 self.log.info('rebooted', device_id=self.device_id)
543
544 def self_test_device(self, device):
545 """
546 This is called to Self a device based on a NBI call.
547 :param device: A Voltha.Device object.
548 :return: Will return result of self test
549 """
550 log.info('self-test-device', device=device.id)
551 raise NotImplementedError()
552
khenaidoo92e62c52018-10-03 14:02:54 -0400553 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400554 def disable(self):
555 self.log.info('disabling', device_id=self.device_id)
556
557 self.stop_kpi_collection()
558
khenaidoo92e62c52018-10-03 14:02:54 -0400559 # Update the operational status to UNKNOWN and connection status to UNREACHABLE
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400560 yield self.core_proxy.device_state_update(self.device_id,
561 oper_status=OperStatus.UNKNOWN,
562 connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400563
564 self.close_channel()
565 self.log.info('disabled-grpc-channel')
566
khenaidoo91ecfd62018-11-04 17:13:42 -0500567 self.stop_kpi_collection()
568
khenaidoob9203542018-09-17 22:56:37 -0400569 # TODO:
570 # 1) Remove all flows from the device
571 # 2) Remove the device from ponsim
572
khenaidoo92e62c52018-10-03 14:02:54 -0400573 self.log.info('disabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400574
khenaidoo92e62c52018-10-03 14:02:54 -0400575 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400576 def reenable(self):
577 self.log.info('re-enabling', device_id=self.device_id)
578
khenaidoob9203542018-09-17 22:56:37 -0400579 # Set the ofp_port_no and nni_port in case we bypassed the reconcile
580 # process if the device was in DISABLED state on voltha restart
581 if not self.ofp_port_no and not self.nni_port:
khenaidoo92e62c52018-10-03 14:02:54 -0400582 yield self.get_channel()
583 stub = ponsim_pb2.PonSimStub(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400584 info = stub.GetDeviceInfo(Empty())
585 log.info('got-info', info=info)
586 self.ofp_port_no = info.nni_port
khenaidoo92e62c52018-10-03 14:02:54 -0400587 ports = yield self._get_nni_port()
588 # For ponsim, we are using only 1 NNI port
589 if ports.items:
590 self.nni_port = ports.items[0]
khenaidoob9203542018-09-17 22:56:37 -0400591
khenaidoo92e62c52018-10-03 14:02:54 -0400592 # Update the state of the NNI port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400593 yield self.core_proxy.port_state_update(self.device_id,
594 port_type=Port.ETHERNET_NNI,
595 port_no=self.ofp_port_no,
596 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400597
khenaidoo92e62c52018-10-03 14:02:54 -0400598 # Update the state of the PON port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400599 yield self.core_proxy.port_state_update(self.device_id,
600 port_type=Port.PON_OLT,
601 port_no=1,
602 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400603
khenaidoo92e62c52018-10-03 14:02:54 -0400604 # Set the operational state of the device to ACTIVE and connect status to REACHABLE
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400605 yield self.core_proxy.device_state_update(self.device_id,
606 connect_status=ConnectStatus.REACHABLE,
607 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400608
khenaidoo92e62c52018-10-03 14:02:54 -0400609 # TODO: establish frame grpc-stream
610 # yield reactor.callInThread(self.rcv_grpc)
khenaidoob9203542018-09-17 22:56:37 -0400611
khenaidoo92e62c52018-10-03 14:02:54 -0400612 self.start_kpi_collection(self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400613
khenaidoo92e62c52018-10-03 14:02:54 -0400614 self.log.info('re-enabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400615
616 def delete(self):
617 self.log.info('deleting', device_id=self.device_id)
618
khenaidoob9203542018-09-17 22:56:37 -0400619 self.close_channel()
620 self.log.info('disabled-grpc-channel')
621
622 # TODO:
623 # 1) Remove all flows from the device
624 # 2) Remove the device from ponsim
625
626 self.log.info('deleted', device_id=self.device_id)
627
628 def start_kpi_collection(self, device_id):
629
khenaidoo4d4802d2018-10-04 21:59:49 -0400630 kafka_cluster_proxy = get_kafka_proxy()
631
khenaidoob9203542018-09-17 22:56:37 -0400632 def _collect(device_id, prefix):
633
634 try:
635 # Step 1: gather metrics from device
636 port_metrics = \
khenaidoo92e62c52018-10-03 14:02:54 -0400637 self.pm_metrics.collect_port_metrics(self.channel)
khenaidoob9203542018-09-17 22:56:37 -0400638
639 # Step 2: prepare the KpiEvent for submission
640 # we can time-stamp them here (or could use time derived from OLT
641 ts = arrow.utcnow().timestamp
642 kpi_event = KpiEvent(
643 type=KpiEventType.slice,
644 ts=ts,
645 prefixes={
646 # OLT NNI port
647 prefix + '.nni': MetricValuePairs(
648 metrics=port_metrics['nni']),
649 # OLT PON port
650 prefix + '.pon': MetricValuePairs(
651 metrics=port_metrics['pon'])
652 }
653 )
654
khenaidoo91ecfd62018-11-04 17:13:42 -0500655 # Step 3: submit directly to the kafka bus
khenaidoo4d4802d2018-10-04 21:59:49 -0400656 if kafka_cluster_proxy:
657 if isinstance(kpi_event, Message):
658 kpi_event = dumps(MessageToDict(kpi_event, True, True))
659 kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
khenaidoob9203542018-09-17 22:56:37 -0400660
661 except Exception as e:
662 log.exception('failed-to-submit-kpis', e=e)
663
664 self.pm_metrics.start_collector(_collect)
665
666 def stop_kpi_collection(self):
667 self.pm_metrics.stop_collector()