blob: c99fcac3a03d7146777fdd8835745dc54727f472 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Fully simulated OLT adapter.
19"""
khenaidoob9203542018-09-17 22:56:37 -040020
21import arrow
khenaidoob9203542018-09-17 22:56:37 -040022import grpc
23import structlog
khenaidoo6fdf0ba2018-11-02 14:38:33 -040024from google.protobuf.empty_pb2 import Empty
25from google.protobuf.json_format import MessageToDict
khenaidoofdbad6e2018-11-06 22:26:38 -050026from scapy.layers.inet import Raw
27import json
khenaidoo6fdf0ba2018-11-02 14:38:33 -040028from google.protobuf.message import Message
29from grpc._channel import _Rendezvous
khenaidoob9203542018-09-17 22:56:37 -040030from scapy.layers.l2 import Ether, Dot1Q
khenaidoo6fdf0ba2018-11-02 14:38:33 -040031from simplejson import dumps
khenaidoob9203542018-09-17 22:56:37 -040032from twisted.internet import reactor
khenaidoo92e62c52018-10-03 14:02:54 -040033from twisted.internet.defer import inlineCallbacks, returnValue
khenaidoo6fdf0ba2018-11-02 14:38:33 -040034from twisted.internet.task import LoopingCall
khenaidoob9203542018-09-17 22:56:37 -040035
William Kurkianfc0dcda2019-04-08 16:54:36 -040036from pyvoltha.adapters.common.frameio.frameio import BpfProgramFilter, hexify
37from pyvoltha.common.utils.asleep import asleep
38from pyvoltha.common.utils.registry import registry
39from pyvoltha.adapters.iadapter import OltAdapter
40from pyvoltha.adapters.kafka.kafka_proxy import get_kafka_proxy
41from voltha_protos.ponsim_pb2_grpc import PonSimStub
42from voltha_protos.common_pb2 import OperStatus, ConnectStatus
43from voltha_protos.inter_container_pb2 import SwitchCapability, PortCapability, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040044 InterAdapterMessageType, InterAdapterResponseBody
William Kurkianfc0dcda2019-04-08 16:54:36 -040045from voltha_protos.device_pb2 import Port, PmConfig, PmConfigs
46from voltha_protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
47from voltha_protos.logical_device_pb2 import LogicalPort
48from voltha_protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
khenaidoob9203542018-09-17 22:56:37 -040049 OFPPF_1GB_FD, \
50 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
51 ofp_switch_features, ofp_desc
William Kurkianfc0dcda2019-04-08 16:54:36 -040052from voltha_protos.openflow_13_pb2 import ofp_port
53from voltha_protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest
khenaidoob9203542018-09-17 22:56:37 -040054
# Module-wide structured logger (handlers additionally bind their own
# per-device logger).
log = structlog.get_logger()

PACKET_IN_VLAN = 4000

# BPF expression that compares the low 12 bits of the two bytes found at
# frame offset 14 with PACKET_IN_VLAN.
_INBAND_FILTER_EXPR = '(ether[14:2] & 0xfff) = 0x{:03x}'.format(PACKET_IN_VLAN)
is_inband_frame = BpfProgramFilter(_INBAND_FILTER_EXPR)
60
khenaidoo6fdf0ba2018-11-02 14:38:33 -040061
def mac_str_to_tuple(mac):
    """
    Convert a colon-separated hexadecimal string into a tuple of integers.

    :param mac: string such as 'AA:BB:CC:DD:EE:FF'
    :return: tuple holding one int per colon-separated field, in order
    """
    octets = []
    for field in mac.split(':'):
        octets.append(int(field, 16))
    return tuple(octets)
64
khenaidoo6fdf0ba2018-11-02 14:38:33 -040065
class AdapterPmMetrics(object):
    """
    Performance-monitoring (PM) configuration and collection for one device.

    Keeps one PmConfig per packet counter for the PON side and one for the
    NNI side (all enabled initially), builds the PmConfigs proto describing
    them, extracts the enabled counters from a ponsim GetStats() reply and
    drives a twisted LoopingCall that periodically invokes a collection
    callback.

    ``default_freq`` is divided by 10 to obtain the LoopingCall interval,
    i.e. it is expressed in tenths of the LoopingCall time unit.
    """

    def __init__(self, device):
        """
        :param device: device object; only its ``id`` attribute is read here
        """
        self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
                         'tx_256_511_pkts', 'tx_512_1023_pkts',
                         'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
                         'rx_64_pkts', 'rx_65_127_pkts',
                         'rx_128_255_pkts', 'rx_256_511_pkts',
                         'rx_512_1023_pkts', 'rx_1024_1518_pkts',
                         'rx_1519_9k_pkts'}
        self.device = device
        self.id = device.id
        self.name = 'ponsim_olt'
        self.default_freq = 150
        self.grouped = False
        self.freq_override = False
        self.pon_metrics_config = dict()
        self.nni_metrics_config = dict()
        # LoopingCall created by start_collector(); None until then.
        self.lc = None
        for m in self.pm_names:
            self.pon_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)
            self.nni_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)

    def _interval(self):
        """Return the LoopingCall interval derived from default_freq."""
        # Float division on purpose: with Python 2 integer division any
        # default_freq below 10 would give interval 0 (a busy loop) and
        # non-multiples of 10 would be truncated.
        return self.default_freq / 10.0

    def update(self, pm_config):
        """
        Apply a new PM configuration.

        :param pm_config: object exposing ``default_freq`` and an iterable
                          ``metrics`` whose items have ``name`` and
                          ``enabled``
        """
        if self.default_freq != pm_config.default_freq:
            # Update the callback to the new frequency.
            self.default_freq = pm_config.default_freq
            # Only a running loop can be stopped; a collector that has not
            # been started (or was stopped) simply picks the new frequency
            # up on its next start_collector().
            if self.lc is not None and self.lc.running:
                self.lc.stop()
                self.lc.start(interval=self._interval())
        for m in pm_config.metrics:
            if m.name not in self.pon_metrics_config:
                log.warn('unknown-pm-metric', device_id=self.id,
                         metric=m.name)
                continue
            self.pon_metrics_config[m.name].enabled = m.enabled
            self.nni_metrics_config[m.name].enabled = m.enabled

    def make_proto(self):
        """
        Build the PmConfigs message describing this device's PM setup.

        :return: PmConfigs with one PmConfig per metric, sorted by name
        """
        pm_config = PmConfigs(
            id=self.id,
            default_freq=self.default_freq,
            grouped=self.grouped,
            freq_override=self.freq_override)
        for m in sorted(self.pon_metrics_config):
            pm = self.pon_metrics_config[m]  # Either will do they're the same
            pm_config.metrics.extend([PmConfig(name=pm.name,
                                               type=pm.type,
                                               enabled=pm.enabled)])
        return pm_config

    def collect_port_metrics(self, channel):
        """
        Query ponsim for its statistics and split them per port type.

        :param channel: gRPC channel handed to PonSimStub
        :return: dict with keys 'pon' and 'nni', each a name -> value dict
        """
        rtrn_port_metrics = dict()
        stub = PonSimStub(channel)
        stats = stub.GetStats(Empty())
        rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
        rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
        return rtrn_port_metrics

    @staticmethod
    def _extract_port_metrics(stats, port_name, metrics_config):
        """Return name -> value for the enabled counters of one port type."""
        extracted = dict()
        for m in stats.metrics:
            if m.port_name != port_name:
                continue
            for p in m.packets:
                # Counters without a configuration entry are treated as
                # disabled instead of raising KeyError.
                cfg = metrics_config.get(p.name)
                if cfg is not None and cfg.enabled:
                    extracted[p.name] = p.value
        return extracted

    def extract_pon_metrics(self, stats):
        """
        :param stats: object with a ``metrics`` iterable (port_name, packets)
        :return: name -> value dict of the enabled counters of port "pon"
        """
        return self._extract_port_metrics(stats, "pon",
                                          self.pon_metrics_config)

    def extract_nni_metrics(self, stats):
        """
        :param stats: object with a ``metrics`` iterable (port_name, packets)
        :return: name -> value dict of the enabled counters of port "nni"
        """
        return self._extract_port_metrics(stats, "nni",
                                          self.nni_metrics_config)

    def start_collector(self, callback):
        """
        Start periodic collection.

        :param callback: callable invoked as callback(device_id, prefix)
                         where prefix is 'voltha.<name>.<device id>'
        """
        log.info("starting-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        # Do not leave a previous loop running unreferenced.
        if self.lc is not None and self.lc.running:
            self.lc.stop()
        prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
        self.lc = LoopingCall(callback, self.device.id, prefix)
        self.lc.start(interval=self._interval())

    def stop_collector(self):
        """Stop periodic collection; safe to call when it is not running."""
        log.info("stopping-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        if self.lc is not None and self.lc.running:
            self.lc.stop()
152
153
class AdapterAlarms:
    """
    Placeholder alarm handler: remembers the adapter and device it was
    created for, but does not forward alarms anywhere yet.
    """

    def __init__(self, adapter, device):
        self.lc = None
        self.device = device
        self.adapter = adapter

    # TODO: Implement code to send to kafka cluster directly instead of
    # going through the voltha core.
    def send_alarm(self, context_data, alarm_data):
        """Log that alarm submission is not implemented; arguments unused."""
        log.debug("send-alarm-not-implemented")
khenaidoob9203542018-09-17 22:56:37 -0400165
166
class PonSimOltAdapter(OltAdapter):
    """
    OltAdapter specialisation for the 'ponsim_olt' device type, using
    PonSimOltHandler as its per-device handler class.
    """

    def __init__(self, core_proxy, adapter_proxy, config):
        adapter_settings = dict(
            core_proxy=core_proxy,
            adapter_proxy=adapter_proxy,
            config=config,
            device_handler_class=PonSimOltHandler,
            name='ponsim_olt',
            vendor='Voltha project',
            version='0.4',
            device_type='ponsim_olt',
            accepts_bulk_flow_update=True,
            accepts_add_remove_flow_updates=False,
        )
        super(PonSimOltAdapter, self).__init__(**adapter_settings)

    def update_pm_config(self, device, pm_config):
        """
        Hand a PM configuration change to the handler registered for
        ``device.id`` in ``self.devices_handlers``.
        """
        log.info("adapter-update-pm-config", device=device,
                 pm_config=pm_config)
        device_handler = self.devices_handlers[device.id]
        device_handler.update_pm_config(device, pm_config)
185
186
class PonSimOltHandler(object):
    """
    Handler for a single simulated OLT device.

    Talks to the ponsim service through a gRPC channel (PonSimStub) and
    reports device, port, child-device and packet-in information to the
    core through ``core_proxy``.
    """

    def __init__(self, adapter, device_id):
        """
        :param adapter: owning adapter; its ``core_proxy`` and
                        ``adapter_proxy`` attributes are reused here
        :param device_id: id of the device this handler manages
        """
        self.adapter = adapter
        self.core_proxy = adapter.core_proxy
        self.adapter_proxy = adapter.adapter_proxy
        self.device_id = device_id
        self.log = structlog.get_logger(device_id=device_id)
        self.channel = None
        self.io_port = None
        self.logical_device_id = None
        self.nni_port = None
        self.ofp_port_no = None
        self.interface = registry('main').get_args().interface
        self.pm_metrics = None
        self.alarms = None
        self.frames = None

    @inlineCallbacks
    def get_channel(self):
        """
        Lazily open the gRPC channel to the device's ``host_and_port``.

        Failures are logged, not raised; ``self.channel`` then stays None.

        :return: Deferred firing with ``self.channel`` (possibly None)
        """
        if self.channel is None:
            try:
                device = yield self.core_proxy.get_device(self.device_id)
                self.log.info('device-info', device=device,
                              host_port=device.host_and_port)
                self.channel = grpc.insecure_channel(device.host_and_port)
            except Exception as e:
                self.log.exception("ponsim-connection-failure", e=e)

        returnValue(self.channel)

    def close_channel(self):
        """Cancel the frame stream (if any) and drop the channel reference."""
        if self.channel is None:
            self.log.info('grpc-channel-already-closed')
            return

        if self.frames is not None:
            self.frames.cancel()
            self.frames = None
            self.log.info('cancelled-grpc-frame-stream')

        # NOTE(review): this unsubscribes a brand-new lambda that was never
        # subscribed, so it presumably has no effect and the channel is only
        # dereferenced, not closed -- confirm against the gRPC version in use.
        self.channel.unsubscribe(lambda *args: None)
        self.channel = None

        self.log.info('grpc-channel-closed')

    @inlineCallbacks
    def _get_nni_port(self):
        """Return (via Deferred) the device's ETHERNET_NNI ports."""
        ports = yield self.core_proxy.get_ports(self.device_id,
                                                Port.ETHERNET_NNI)
        returnValue(ports)

    @inlineCallbacks
    def activate(self, device):
        """
        Bring the device up: query ponsim for its info, publish the device,
        its PM configuration, its NNI and PON ports and every ONU reported
        by ponsim, then start the frame stream and KPI collection.

        Any exception is logged and swallowed.

        :param device: device object; must carry ``host_and_port``
        """
        try:
            self.log.info('activating')

            if not device.host_and_port:
                device.oper_status = OperStatus.FAILED
                device.reason = 'No host_and_port field provided'
                # Wait for the update like every other core call here, so a
                # failure is caught by the except clause below.
                yield self.core_proxy.device_update(device)
                return

            yield self.get_channel()
            stub = PonSimStub(self.channel)
            info = stub.GetDeviceInfo(Empty())
            log.info('got-info', info=info, device_id=device.id)
            self.ofp_port_no = info.nni_port

            device.root = True
            device.vendor = 'ponsim'
            device.model = 'n/a'
            device.serial_number = device.host_and_port
            device.mac_address = "AA:BB:CC:DD:EE:FF"
            yield self.core_proxy.device_update(device)

            # Now set the initial PM configuration for this device
            self.pm_metrics = AdapterPmMetrics(device)
            pm_config = self.pm_metrics.make_proto()
            log.info("initial-pm-config", pm_config=pm_config)
            yield self.core_proxy.device_pm_config_update(pm_config, init=True)

            # Setup alarm handler
            self.alarms = AdapterAlarms(self.adapter, device)

            nni_port = Port(
                port_no=info.nni_port,
                label='nni-' + str(info.nni_port),
                type=Port.ETHERNET_NNI,
                oper_status=OperStatus.ACTIVE
            )
            self.nni_port = nni_port
            yield self.core_proxy.port_created(device.id, nni_port)
            yield self.core_proxy.port_created(device.id, Port(
                port_no=1,
                label='pon-1',
                type=Port.PON_OLT,
                oper_status=OperStatus.ACTIVE
            ))

            yield self.core_proxy.device_state_update(
                device.id,
                connect_status=ConnectStatus.REACHABLE,
                oper_status=OperStatus.ACTIVE)

            # register ONUS
            self.log.info('onu-found', onus=info.onus, len=len(info.onus))
            for onu in info.onus:
                vlan_id = onu.uni_port
                yield self.core_proxy.child_device_detected(
                    parent_device_id=device.id,
                    parent_port_no=1,
                    child_device_type='ponsim_onu',
                    channel_id=vlan_id,
                    serial_number=onu.serial_number,
                )

            self.log.info('starting-frame-grpc-stream')
            reactor.callInThread(self.rcv_grpc)
            self.log.info('started-frame-grpc-stream')

            # Start collecting stats from the device
            self.start_kpi_collection(device.id)
        except Exception as e:
            log.exception("Exception-activating", e=e)

    def get_ofp_device_info(self, device):
        """
        :param device: device object; ``serial_number`` is read
        :return: SwitchCapability with fixed (placeholder) switch features
        """
        return SwitchCapability(
            desc=ofp_desc(
                hw_desc='ponsim pon',
                sw_desc='ponsim pon',
                serial_num=device.serial_number,
                mfr_desc="VOLTHA Project",
                dp_desc='n/a'
            ),
            switch_features=ofp_switch_features(
                n_buffers=256,  # TODO fake for now
                n_tables=2,  # TODO ditto
                capabilities=(  # TODO and ditto
                    OFPC_FLOW_STATS
                    | OFPC_TABLE_STATS
                    | OFPC_PORT_STATS
                    | OFPC_GROUP_STATS
                )
            )
        )

    def get_ofp_port_info(self, device, port_no):
        """
        :param device: device object; ``id`` is read
        :param port_no: device port number; also used as the last byte of
                        the reported hardware address
        :return: PortCapability wrapping a LogicalPort for ``port_no``
        """
        # Since the adapter created the device port then it has the reference
        # of the port to return the capability. TODO: Do a lookup on the NNI
        # port number and return the appropriate attributes
        self.log.info('get_ofp_port_info', port_no=port_no,
                      info=self.ofp_port_no, device_id=device.id)
        cap = OFPPF_1GB_FD | OFPPF_FIBER
        return PortCapability(
            port=LogicalPort(
                ofp_port=ofp_port(
                    hw_addr=mac_str_to_tuple(
                        'AA:BB:CC:DD:EE:%02x' % port_no),
                    config=0,
                    state=OFPPS_LIVE,
                    curr=cap,
                    advertised=cap,
                    peer=cap,
                    curr_speed=OFPPF_1GB_FD,
                    max_speed=OFPPF_1GB_FD
                ),
                device_id=device.id,
                device_port_no=port_no
            )
        )

    # TODO - change for core 2.0
    def reconcile(self, device):
        """Currently only logs; no reconciliation is performed."""
        self.log.info('reconciling-OLT-device')

    @inlineCallbacks
    def _rcv_frame(self, frame):
        """
        Handle one frame received from ponsim.

        A double-tagged frame has both tags removed and is sent to the core
        as a packet-in whose port is the inner VLAN id. A single-tagged
        frame carrying a Raw layer is parsed as JSON and handed to the
        alarm handler.

        :param frame: raw frame bytes
        """
        pkt = Ether(frame)

        if pkt.haslayer(Dot1Q):
            outer_shim = pkt.getlayer(Dot1Q)

            if isinstance(outer_shim.payload, Dot1Q):
                inner_shim = outer_shim.payload
                cvid = inner_shim.vlan
                popped_frame = (
                    Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
                    inner_shim.payload
                )
                self.log.info('sending-packet-in', device_id=self.device_id,
                              port=cvid)
                yield self.core_proxy.send_packet_in(
                    device_id=self.device_id,
                    port=cvid,
                    packet=str(popped_frame))
            elif pkt.haslayer(Raw):
                raw_data = json.loads(pkt.getlayer(Raw).load)
                self.alarms.send_alarm(self, raw_data)

    @inlineCallbacks
    def rcv_grpc(self):
        """
        This call establishes a GRPC stream to receive frames.

        It iterates the stream until it ends or the connection is lost.
        """
        # NOTE(review): activate() runs this method through
        # reactor.callInThread, so _rcv_frame() below calls core_proxy from
        # a worker thread -- confirm that is safe or hop back with
        # reactor.callFromThread.
        yield self.get_channel()
        stub = PonSimStub(self.channel)

        # Attempt to establish a grpc stream with the remote ponsim service
        self.frames = stub.ReceiveFrames(Empty())

        self.log.info('start-receiving-grpc-frames')

        try:
            for frame in self.frames:
                self.log.info('received-grpc-frame',
                              frame_len=len(frame.payload))
                self._rcv_frame(frame.payload)

        except _Rendezvous as e:
            log.warn('grpc-connection-lost', message=e.message)

        self.log.info('stopped-receiving-grpc-frames')

    @inlineCallbacks
    def update_flow_table(self, flows):
        """Push the complete flow table for port 0 to ponsim."""
        yield self.get_channel()
        stub = PonSimStub(self.channel)

        self.log.info('pushing-olt-flow-table')
        stub.UpdateFlowTable(FlowTable(
            port=0,
            flows=flows
        ))
        self.log.info('success')

    def remove_from_flow_table(self, flows):
        """Not supported yet: only logs the request."""
        self.log.debug('remove-from-flow-table', flows=flows)
        # TODO: Update PONSIM code to accept incremental flow changes
        # Once completed, the accepts_add_remove_flow_updates for this
        # device type can be set to True

    def add_to_flow_table(self, flows):
        """Not supported yet: only logs the request."""
        self.log.debug('add-to-flow-table', flows=flows)
        # TODO: Update PONSIM code to accept incremental flow changes
        # Once completed, the accepts_add_remove_flow_updates for this
        # device type can be set to True

    def update_pm_config(self, device, pm_config):
        """Forward a PM configuration change to the metrics object."""
        log.info("handler-update-pm-config", device=device,
                 pm_config=pm_config)
        self.pm_metrics.update(pm_config)

    @inlineCallbacks
    def send_proxied_message(self, proxy_address, msg):
        """
        Push a FlowTable message to ponsim and hand the reply to the core;
        any other message type is ignored.

        :return: Deferred firing once the message has been processed
        """
        self.log.info('sending-proxied-message')
        if isinstance(msg, FlowTable):
            # get_channel() returns a Deferred: wait for it and then use the
            # channel it stored, rather than passing the Deferred to the stub.
            yield self.get_channel()
            stub = PonSimStub(self.channel)
            self.log.info('pushing-onu-flow-table', port=msg.port)
            res = stub.UpdateFlowTable(msg)
            yield self.core_proxy.receive_proxied_message(proxy_address, res)

    @inlineCallbacks
    def _send_inter_adapter_response(self, request, reply, response_type):
        """Send ``reply`` back to the adapter that issued ``request``."""
        self.log.info('sending-response-back', reply=reply)
        yield self.adapter_proxy.send_inter_adapter_message(
            msg=reply,
            type=response_type,
            from_adapter=self.adapter.name,
            to_adapter=request.header.from_topic,
            to_device_id=request.header.to_device_id,
            message_id=request.header.id
        )

    @inlineCallbacks
    def process_inter_adapter_message(self, request):
        """
        Serve FLOW_REQUEST and METRICS_REQUEST messages from other adapters
        by proxying them to ponsim and sending the matching response.

        Any exception is logged and swallowed.
        """
        self.log.info('process-inter-adapter-message', msg=request)
        try:
            if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
                f = FlowTable()
                if request.body:
                    request.body.Unpack(f)
                    stub = PonSimStub(self.channel)
                    self.log.info('pushing-onu-flow-table')
                    stub.UpdateFlowTable(f)
                    # Send response back
                    reply = InterAdapterResponseBody()
                    reply.status = True
                    yield self._send_inter_adapter_response(
                        request, reply,
                        InterAdapterMessageType.FLOW_RESPONSE)
            elif request.header.type == \
                    InterAdapterMessageType.METRICS_REQUEST:
                m = PonSimMetricsRequest()
                if request.body:
                    request.body.Unpack(m)
                    stub = PonSimStub(self.channel)
                    self.log.info('proxying onu stats request', port=m.port)
                    res = stub.GetStats(m)
                    # Send response back
                    reply = InterAdapterResponseBody()
                    reply.status = True
                    reply.body.Pack(res)
                    yield self._send_inter_adapter_response(
                        request, reply,
                        InterAdapterMessageType.METRICS_RESPONSE)
        except Exception as e:
            self.log.exception("error-processing-inter-adapter-message", e=e)

    def packet_out(self, egress_port, msg):
        """
        Send a frame to ponsim.

        Frames for any port other than the NNI port get a Dot1Q tag whose
        VLAN id is ``egress_port`` and leave through port 1; frames for the
        NNI port are sent unchanged. Any exception is logged and swallowed.

        :param egress_port: port number the frame is destined to
        :param msg: raw frame bytes
        """
        self.log.info('sending-packet-out', egress_port=egress_port,
                      msg=hexify(msg))
        try:
            pkt = Ether(msg)
            out_pkt = pkt
            if egress_port != self.nni_port.port_no:
                # don't do the vlan manipulation for the NNI port, vlans are
                # already correct
                out_pkt = (
                    Ether(src=pkt.src, dst=pkt.dst) /
                    Dot1Q(vlan=egress_port, type=pkt.type) /
                    pkt.payload
                )

            # TODO need better way of mapping logical ports to PON ports
            out_port = self.nni_port.port_no \
                if egress_port == self.nni_port.port_no else 1

            # send over grpc stream
            stub = PonSimStub(self.channel)
            frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
                                out_port=out_port)
            stub.SendFrame(frame)
        except Exception as e:
            self.log.exception("error-processing-packet-out", e=e)

    @inlineCallbacks
    def reboot(self):
        """
        Simulate a reboot: mark the device and its children UNREACHABLE,
        wait 10 seconds, then mark them REACHABLE again.
        """
        self.log.info('rebooting', device_id=self.device_id)

        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.UNREACHABLE)

        # Update the child devices connect state to UNREACHABLE
        yield self.core_proxy.children_state_update(
            self.device_id,
            connect_status=ConnectStatus.UNREACHABLE)

        # Sleep 10 secs, simulating a reboot
        # TODO: send alert and clear alert after the reboot
        yield asleep(10)

        # Change the connection status back to REACHABLE. With a
        # real OLT the connection state must be the actual state
        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE)

        # Update the child devices connect state to REACHABLE
        yield self.core_proxy.children_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE)

        self.log.info('rebooted', device_id=self.device_id)

    def self_test_device(self, device):
        """
        This is called to self-test a device based on a NBI call.
        :param device: A Voltha.Device object.
        :return: Will return result of self test
        :raises NotImplementedError: always, self test is not implemented
        """
        log.info('self-test-device', device=device.id)
        raise NotImplementedError()

    @inlineCallbacks
    def disable(self):
        """
        Stop KPI collection, report the device as UNKNOWN/UNREACHABLE and
        close the gRPC channel.
        """
        self.log.info('disabling', device_id=self.device_id)

        # Collection is stopped exactly once: stopping a LoopingCall that is
        # no longer running is an error in twisted.
        self.stop_kpi_collection()

        # Update the operational status to UNKNOWN and connection status to
        # UNREACHABLE
        yield self.core_proxy.device_state_update(
            self.device_id,
            oper_status=OperStatus.UNKNOWN,
            connect_status=ConnectStatus.UNREACHABLE)

        self.close_channel()
        self.log.info('disabled-grpc-channel')

        # TODO:
        # 1) Remove all flows from the device
        # 2) Remove the device from ponsim

        self.log.info('disabled', device_id=self.device_id)

    @inlineCallbacks
    def reenable(self):
        """
        Report the NNI port, the PON port and the device as ACTIVE again and
        restart KPI collection.
        """
        self.log.info('re-enabling', device_id=self.device_id)

        # Set the ofp_port_no and nni_port in case we bypassed the reconcile
        # process if the device was in DISABLED state on voltha restart
        if not self.ofp_port_no and not self.nni_port:
            yield self.get_channel()
            stub = PonSimStub(self.channel)
            info = stub.GetDeviceInfo(Empty())
            log.info('got-info', info=info)
            self.ofp_port_no = info.nni_port
            ports = yield self._get_nni_port()
            # For ponsim, we are using only 1 NNI port
            if ports.items:
                self.nni_port = ports.items[0]

        # Update the state of the NNI port
        yield self.core_proxy.port_state_update(
            self.device_id,
            port_type=Port.ETHERNET_NNI,
            port_no=self.ofp_port_no,
            oper_status=OperStatus.ACTIVE)

        # Update the state of the PON port
        yield self.core_proxy.port_state_update(
            self.device_id,
            port_type=Port.PON_OLT,
            port_no=1,
            oper_status=OperStatus.ACTIVE)

        # Set the operational state of the device to ACTIVE and connect
        # status to REACHABLE
        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE,
            oper_status=OperStatus.ACTIVE)

        # TODO: establish frame grpc-stream
        # yield reactor.callInThread(self.rcv_grpc)

        self.start_kpi_collection(self.device_id)

        self.log.info('re-enabled', device_id=self.device_id)

    def delete(self):
        """Close the gRPC channel; nothing is removed from ponsim yet."""
        self.log.info('deleting', device_id=self.device_id)

        self.close_channel()
        self.log.info('disabled-grpc-channel')

        # TODO:
        # 1) Remove all flows from the device
        # 2) Remove the device from ponsim

        self.log.info('deleted', device_id=self.device_id)

    def start_kpi_collection(self, device_id):
        """
        Start the periodic KPI collection loop.

        Each run gathers the port metrics from ponsim, wraps them in a
        KpiEvent and, when a kafka proxy is available, publishes the event
        as JSON on the "voltha.kpis" topic.

        :param device_id: unused here; the collector passes its own device
                          id to the callback
        """
        # pm_metrics is only created by activate(); without it there is
        # nothing to start.
        if self.pm_metrics is None:
            self.log.warn('kpi-collection-not-started-no-pm-metrics',
                          device_id=self.device_id)
            return

        kafka_cluster_proxy = get_kafka_proxy()

        def _collect(device_id, prefix):

            try:
                # Step 1: gather metrics from device
                port_metrics = \
                    self.pm_metrics.collect_port_metrics(self.channel)

                # Step 2: prepare the KpiEvent for submission
                # we can time-stamp them here (or could use time derived
                # from OLT
                ts = arrow.utcnow().timestamp
                kpi_event = KpiEvent(
                    type=KpiEventType.slice,
                    ts=ts,
                    prefixes={
                        # OLT NNI port
                        prefix + '.nni': MetricValuePairs(
                            metrics=port_metrics['nni']),
                        # OLT PON port
                        prefix + '.pon': MetricValuePairs(
                            metrics=port_metrics['pon'])
                    }
                )

                # Step 3: submit directly to the kafka bus
                if kafka_cluster_proxy:
                    if isinstance(kpi_event, Message):
                        kpi_event = dumps(
                            MessageToDict(kpi_event, True, True))
                    kafka_cluster_proxy.send_message("voltha.kpis",
                                                     kpi_event)

            except Exception as e:
                log.exception('failed-to-submit-kpis', e=e)

        self.pm_metrics.start_collector(_collect)

    def stop_kpi_collection(self):
        """Stop the KPI collection loop, if one was ever set up."""
        if self.pm_metrics is None:
            self.log.info('kpi-collection-not-running',
                          device_id=self.device_id)
            return
        self.pm_metrics.stop_collector()