blob: c99fcac3a03d7146777fdd8835745dc54727f472 [file] [log] [blame]
Scott Baker12f1ef82019-10-14 13:06:14 -07001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Fully simulated OLT adapter.
19"""
20
21import arrow
22import grpc
23import structlog
24from google.protobuf.empty_pb2 import Empty
25from google.protobuf.json_format import MessageToDict
26from scapy.layers.inet import Raw
27import json
28from google.protobuf.message import Message
29from grpc._channel import _Rendezvous
30from scapy.layers.l2 import Ether, Dot1Q
31from simplejson import dumps
32from twisted.internet import reactor
33from twisted.internet.defer import inlineCallbacks, returnValue
34from twisted.internet.task import LoopingCall
35
36from pyvoltha.adapters.common.frameio.frameio import BpfProgramFilter, hexify
37from pyvoltha.common.utils.asleep import asleep
38from pyvoltha.common.utils.registry import registry
39from pyvoltha.adapters.iadapter import OltAdapter
40from pyvoltha.adapters.kafka.kafka_proxy import get_kafka_proxy
41from voltha_protos.ponsim_pb2_grpc import PonSimStub
42from voltha_protos.common_pb2 import OperStatus, ConnectStatus
43from voltha_protos.inter_container_pb2 import SwitchCapability, PortCapability, \
44 InterAdapterMessageType, InterAdapterResponseBody
45from voltha_protos.device_pb2 import Port, PmConfig, PmConfigs
46from voltha_protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
47from voltha_protos.logical_device_pb2 import LogicalPort
48from voltha_protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
49 OFPPF_1GB_FD, \
50 OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
51 ofp_switch_features, ofp_desc
52from voltha_protos.openflow_13_pb2 import ofp_port
53from voltha_protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest
54
# Module-level logger, used where no per-device logger is at hand.
log = structlog.get_logger()

# VLAN id used in the in-band frame filter below.
PACKET_IN_VLAN = 4000
# BPF expression: take the two bytes at offset 14 of the Ethernet frame,
# keep the low 12 bits and compare them with PACKET_IN_VLAN (0xfa0).
# NOTE(review): presumably this matches the 802.1Q VLAN id of in-band
# frames; the filter is not referenced elsewhere in the visible code.
is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
    PACKET_IN_VLAN))
60
61
def mac_str_to_tuple(mac):
    """Convert a colon-separated hex string into a tuple of integers.

    :param mac: string such as ``'AA:BB:CC:DD:EE:FF'``
    :return: tuple with one int per colon-separated field, parsed base 16
    :raises ValueError: if a field is not a valid hexadecimal number
    """
    octets = mac.split(':')
    return tuple([int(octet, 16) for octet in octets])
64
65
class AdapterPmMetrics:
    """Performance-monitoring (PM) configuration and collection for one OLT.

    Keeps one ``PmConfig`` per counter name for the PON side and one for the
    NNI side, and owns the ``LoopingCall`` that periodically invokes the
    collection callback.
    """

    def __init__(self, device):
        """Create the default PM configuration (every counter enabled).

        :param device: device object; its ``id`` is read here and the object
                       is kept in ``self.device``.
        """
        self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
                         'tx_256_511_pkts', 'tx_512_1023_pkts',
                         'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
                         'rx_64_pkts', 'rx_65_127_pkts',
                         'rx_128_255_pkts', 'rx_256_511_pkts',
                         'rx_512_1023_pkts', 'rx_1024_1518_pkts',
                         'rx_1519_9k_pkts'}
        self.device = device
        self.id = device.id
        self.name = 'ponsim_olt'
        # The collector interval is default_freq / 10 (see start_collector).
        self.default_freq = 150
        self.grouped = False
        self.freq_override = False
        self.pon_metrics_config = dict()
        self.nni_metrics_config = dict()
        # LoopingCall driving the collection; created by start_collector().
        self.lc = None
        for m in self.pm_names:
            self.pon_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)
            self.nni_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)

    def _collector_running(self):
        """Return True when a collector exists and is currently running."""
        return self.lc is not None and self.lc.running

    def update(self, pm_config):
        """Apply a new PM configuration.

        :param pm_config: object with ``default_freq`` and ``metrics``; each
                          metric carries ``name`` and ``enabled``.
        :raises KeyError: if a metric name is not one of ``pm_names``.
        """
        if self.default_freq != pm_config.default_freq:
            self.default_freq = pm_config.default_freq
            # Only restart a collector that is actually running: stopping a
            # LoopingCall that is not running trips an assertion, and before
            # start_collector() there is no LoopingCall at all. A stopped
            # collector picks the new frequency up on its next start.
            if self._collector_running():
                self.lc.stop()
                self.lc.start(interval=self.default_freq / 10)
        for m in pm_config.metrics:
            self.pon_metrics_config[m.name].enabled = m.enabled
            self.nni_metrics_config[m.name].enabled = m.enabled

    def make_proto(self):
        """Return a ``PmConfigs`` message describing the current settings.

        Metrics are emitted in sorted name order. Only the PON table is
        read; update() always writes the same flag to both tables.
        """
        pm_config = PmConfigs(
            id=self.id,
            default_freq=self.default_freq,
            grouped=self.grouped,
            freq_override=self.freq_override)
        for m in sorted(self.pon_metrics_config):
            pm = self.pon_metrics_config[m]
            pm_config.metrics.extend([PmConfig(name=pm.name,
                                               type=pm.type,
                                               enabled=pm.enabled)])
        return pm_config

    def collect_port_metrics(self, channel):
        """Fetch statistics over ``channel`` and split them per port.

        :param channel: gRPC channel handed to ``PonSimStub``.
        :return: dict with keys ``'pon'`` and ``'nni'``, each mapping
                 enabled counter names to their values.
        """
        rtrn_port_metrics = dict()
        stub = PonSimStub(channel)
        stats = stub.GetStats(Empty())
        rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
        rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
        return rtrn_port_metrics

    def _extract_enabled_metrics(self, stats, port_name, metrics_config):
        """Return ``{counter name: value}`` for enabled counters of a port.

        Counters whose name has no entry in ``metrics_config`` are skipped
        instead of raising ``KeyError``, so one unexpected counter does not
        discard the whole sample.
        """
        enabled_metrics = dict()
        for m in stats.metrics:
            if m.port_name != port_name:
                continue
            for p in m.packets:
                pm = metrics_config.get(p.name)
                if pm is not None and pm.enabled:
                    enabled_metrics[p.name] = p.value
        return enabled_metrics

    def extract_pon_metrics(self, stats):
        """Return the enabled counters reported for port name ``"pon"``."""
        return self._extract_enabled_metrics(stats, "pon",
                                             self.pon_metrics_config)

    def extract_nni_metrics(self, stats):
        """Return the enabled counters reported for port name ``"nni"``."""
        return self._extract_enabled_metrics(stats, "nni",
                                             self.nni_metrics_config)

    def start_collector(self, callback):
        """Start calling ``callback(device_id, prefix)`` periodically.

        A fresh ``LoopingCall`` is created on every call; the prefix is
        ``'voltha.<name>.<device id>'``.
        """
        log.info("starting-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
        self.lc = LoopingCall(callback, self.device.id, prefix)
        self.lc.start(interval=self.default_freq / 10)

    def stop_collector(self):
        """Stop the periodic collection; a no-op when it is not running."""
        log.info("stopping-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        # Guarded so that stopping twice, or before start_collector(), is
        # harmless rather than an AttributeError / AssertionError.
        if self._collector_running():
            self.lc.stop()
152
153
class AdapterAlarms:
    """Placeholder alarm sender for a ponsim OLT device."""

    def __init__(self, adapter, device):
        """Remember the owning adapter and the device alarms belong to."""
        self.lc = None
        self.device = device
        self.adapter = adapter

    def send_alarm(self, context_data, alarm_data):
        """Log that alarms are not implemented; both arguments are ignored.

        TODO: Implement code to send to kafka cluster directly instead of
        going through the voltha core.
        """
        log.debug("send-alarm-not-implemented")
165
166
class PonSimOltAdapter(OltAdapter):
    """OLT adapter for the simulated PON device type ``ponsim_olt``."""

    def __init__(self, core_proxy, adapter_proxy, config):
        """Register this adapter with ``PonSimOltHandler`` as device handler.

        Bulk flow updates are accepted; incremental add/remove flow updates
        are not.
        """
        adapter_settings = dict(
            core_proxy=core_proxy,
            adapter_proxy=adapter_proxy,
            config=config,
            device_handler_class=PonSimOltHandler,
            name='ponsim_olt',
            vendor='Voltha project',
            version='0.4',
            device_type='ponsim_olt',
            accepts_bulk_flow_update=True,
            accepts_add_remove_flow_updates=False,
        )
        super(PonSimOltAdapter, self).__init__(**adapter_settings)

    def update_pm_config(self, device, pm_config):
        """Forward a PM configuration change to the handler of ``device``.

        :raises KeyError: if no handler is registered under ``device.id``.
        """
        log.info("adapter-update-pm-config", device=device,
                 pm_config=pm_config)
        self.devices_handlers[device.id].update_pm_config(device, pm_config)
185
186
187class PonSimOltHandler(object):
188 def __init__(self, adapter, device_id):
189 self.adapter = adapter
190 self.core_proxy = adapter.core_proxy
191 self.adapter_proxy = adapter.adapter_proxy
192 self.device_id = device_id
193 self.log = structlog.get_logger(device_id=device_id)
194 self.channel = None
195 self.io_port = None
196 self.logical_device_id = None
197 self.nni_port = None
198 self.ofp_port_no = None
199 self.interface = registry('main').get_args().interface
200 self.pm_metrics = None
201 self.alarms = None
202 self.frames = None
203
204 @inlineCallbacks
205 def get_channel(self):
206 if self.channel is None:
207 try:
208 device = yield self.core_proxy.get_device(self.device_id)
209 self.log.info('device-info', device=device,
210 host_port=device.host_and_port)
211 self.channel = grpc.insecure_channel(device.host_and_port)
212 except Exception as e:
213 log.exception("ponsim-connection-failure", e=e)
214
215 # returnValue(self.channel)
216
217 def close_channel(self):
218 if self.channel is None:
219 self.log.info('grpc-channel-already-closed')
220 return
221 else:
222 if self.frames is not None:
223 self.frames.cancel()
224 self.frames = None
225 self.log.info('cancelled-grpc-frame-stream')
226
227 self.channel.unsubscribe(lambda *args: None)
228 self.channel = None
229
230 self.log.info('grpc-channel-closed')
231
232 @inlineCallbacks
233 def _get_nni_port(self):
234 ports = yield self.core_proxy.get_ports(self.device_id,
235 Port.ETHERNET_NNI)
236 returnValue(ports)
237
238 @inlineCallbacks
239 def activate(self, device):
240 try:
241 self.log.info('activating')
242
243 if not device.host_and_port:
244 device.oper_status = OperStatus.FAILED
245 device.reason = 'No host_and_port field provided'
246 self.core_proxy.device_update(device)
247 return
248
249 yield self.get_channel()
250 stub = PonSimStub(self.channel)
251 info = stub.GetDeviceInfo(Empty())
252 log.info('got-info', info=info, device_id=device.id)
253 self.ofp_port_no = info.nni_port
254
255 device.root = True
256 device.vendor = 'ponsim'
257 device.model = 'n/a'
258 device.serial_number = device.host_and_port
259 device.mac_address = "AA:BB:CC:DD:EE:FF"
260 yield self.core_proxy.device_update(device)
261
262 # Now set the initial PM configuration for this device
263 self.pm_metrics = AdapterPmMetrics(device)
264 pm_config = self.pm_metrics.make_proto()
265 log.info("initial-pm-config", pm_config=pm_config)
266 yield self.core_proxy.device_pm_config_update(pm_config, init=True)
267
268 # Setup alarm handler
269 self.alarms = AdapterAlarms(self.adapter, device)
270
271 nni_port = Port(
272 port_no=info.nni_port,
273 label='nni-'+ str(info.nni_port),
274 type=Port.ETHERNET_NNI,
275 oper_status=OperStatus.ACTIVE
276 )
277 self.nni_port = nni_port
278 yield self.core_proxy.port_created(device.id, nni_port)
279 yield self.core_proxy.port_created(device.id, Port(
280 port_no=1,
281 label='pon-1',
282 type=Port.PON_OLT,
283 oper_status=OperStatus.ACTIVE
284 ))
285
286 yield self.core_proxy.device_state_update(device.id,
287 connect_status=ConnectStatus.REACHABLE,
288 oper_status=OperStatus.ACTIVE)
289
290 # register ONUS
291 self.log.info('onu-found', onus=info.onus, len=len(info.onus))
292 for onu in info.onus:
293 vlan_id = onu.uni_port
294 yield self.core_proxy.child_device_detected(
295 parent_device_id=device.id,
296 parent_port_no=1,
297 child_device_type='ponsim_onu',
298 channel_id=vlan_id,
299 serial_number=onu.serial_number,
300 )
301
302 self.log.info('starting-frame-grpc-stream')
303 reactor.callInThread(self.rcv_grpc)
304 self.log.info('started-frame-grpc-stream')
305
306 # Start collecting stats from the device after a brief pause
307 self.start_kpi_collection(device.id)
308 except Exception as e:
309 log.exception("Exception-activating", e=e)
310
311 def get_ofp_device_info(self, device):
312 return SwitchCapability(
313 desc=ofp_desc(
314 hw_desc='ponsim pon',
315 sw_desc='ponsim pon',
316 serial_num=device.serial_number,
317 mfr_desc="VOLTHA Project",
318 dp_desc='n/a'
319 ),
320 switch_features=ofp_switch_features(
321 n_buffers=256, # TODO fake for now
322 n_tables=2, # TODO ditto
323 capabilities=( # TODO and ditto
324 OFPC_FLOW_STATS
325 | OFPC_TABLE_STATS
326 | OFPC_PORT_STATS
327 | OFPC_GROUP_STATS
328 )
329 )
330 )
331
332 def get_ofp_port_info(self, device, port_no):
333 # Since the adapter created the device port then it has the reference of the port to
334 # return the capability. TODO: Do a lookup on the NNI port number and return the
335 # appropriate attributes
336 self.log.info('get_ofp_port_info', port_no=port_no,
337 info=self.ofp_port_no, device_id=device.id)
338 cap = OFPPF_1GB_FD | OFPPF_FIBER
339 return PortCapability(
340 port=LogicalPort(
341 ofp_port=ofp_port(
342 hw_addr=mac_str_to_tuple(
343 'AA:BB:CC:DD:EE:%02x' % port_no),
344 config=0,
345 state=OFPPS_LIVE,
346 curr=cap,
347 advertised=cap,
348 peer=cap,
349 curr_speed=OFPPF_1GB_FD,
350 max_speed=OFPPF_1GB_FD
351 ),
352 device_id=device.id,
353 device_port_no=port_no
354 )
355 )
356
357 # TODO - change for core 2.0
358 def reconcile(self, device):
359 self.log.info('reconciling-OLT-device')
360
361 @inlineCallbacks
362 def _rcv_frame(self, frame):
363 pkt = Ether(frame)
364
365 if pkt.haslayer(Dot1Q):
366 outer_shim = pkt.getlayer(Dot1Q)
367
368 if isinstance(outer_shim.payload, Dot1Q):
369 inner_shim = outer_shim.payload
370 cvid = inner_shim.vlan
371 popped_frame = (
372 Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
373 inner_shim.payload
374 )
375 self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
376 yield self.core_proxy.send_packet_in(device_id=self.device_id,
377 port=cvid,
378 packet=str(popped_frame))
379 elif pkt.haslayer(Raw):
380 raw_data = json.loads(pkt.getlayer(Raw).load)
381 self.alarms.send_alarm(self, raw_data)
382
383 @inlineCallbacks
384 def rcv_grpc(self):
385 """
386 This call establishes a GRPC stream to receive frames.
387 """
388 yield self.get_channel()
389 stub = PonSimStub(self.channel)
390
391 # Attempt to establish a grpc stream with the remote ponsim service
392 self.frames = stub.ReceiveFrames(Empty())
393
394 self.log.info('start-receiving-grpc-frames')
395
396 try:
397 for frame in self.frames:
398 self.log.info('received-grpc-frame',
399 frame_len=len(frame.payload))
400 self._rcv_frame(frame.payload)
401
402 except _Rendezvous, e:
403 log.warn('grpc-connection-lost', message=e.message)
404
405 self.log.info('stopped-receiving-grpc-frames')
406
407 @inlineCallbacks
408 def update_flow_table(self, flows):
409 yield self.get_channel()
410 stub = PonSimStub(self.channel)
411
412 self.log.info('pushing-olt-flow-table')
413 stub.UpdateFlowTable(FlowTable(
414 port=0,
415 flows=flows
416 ))
417 self.log.info('success')
418
419 def remove_from_flow_table(self, flows):
420 self.log.debug('remove-from-flow-table', flows=flows)
421 # TODO: Update PONSIM code to accept incremental flow changes
422 # Once completed, the accepts_add_remove_flow_updates for this
423 # device type can be set to True
424
425 def add_to_flow_table(self, flows):
426 self.log.debug('add-to-flow-table', flows=flows)
427 # TODO: Update PONSIM code to accept incremental flow changes
428 # Once completed, the accepts_add_remove_flow_updates for this
429 # device type can be set to True
430
431 def update_pm_config(self, device, pm_config):
432 log.info("handler-update-pm-config", device=device,
433 pm_config=pm_config)
434 self.pm_metrics.update(pm_config)
435
436 def send_proxied_message(self, proxy_address, msg):
437 self.log.info('sending-proxied-message')
438 if isinstance(msg, FlowTable):
439 stub = PonSimStub(self.get_channel())
440 self.log.info('pushing-onu-flow-table', port=msg.port)
441 res = stub.UpdateFlowTable(msg)
442 self.core_proxy.receive_proxied_message(proxy_address, res)
443
444 @inlineCallbacks
445 def process_inter_adapter_message(self, request):
446 self.log.info('process-inter-adapter-message', msg=request)
447 try:
448 if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
449 f = FlowTable()
450 if request.body:
451 request.body.Unpack(f)
452 stub = PonSimStub(self.channel)
453 self.log.info('pushing-onu-flow-table')
454 res = stub.UpdateFlowTable(f)
455 # Send response back
456 reply = InterAdapterResponseBody()
457 reply.status = True
458 self.log.info('sending-response-back', reply=reply)
459 yield self.adapter_proxy.send_inter_adapter_message(
460 msg=reply,
461 type=InterAdapterMessageType.FLOW_RESPONSE,
462 from_adapter=self.adapter.name,
463 to_adapter=request.header.from_topic,
464 to_device_id=request.header.to_device_id,
465 message_id=request.header.id
466 )
467 elif request.header.type == InterAdapterMessageType.METRICS_REQUEST:
468 m = PonSimMetricsRequest()
469 if request.body:
470 request.body.Unpack(m)
471 stub = PonSimStub(self.channel)
472 self.log.info('proxying onu stats request', port=m.port)
473 res = stub.GetStats(m)
474 # Send response back
475 reply = InterAdapterResponseBody()
476 reply.status = True
477 reply.body.Pack(res)
478 self.log.info('sending-response-back', reply=reply)
479 yield self.adapter_proxy.send_inter_adapter_message(
480 msg=reply,
481 type=InterAdapterMessageType.METRICS_RESPONSE,
482 from_adapter=self.adapter.name,
483 to_adapter=request.header.from_topic,
484 to_device_id=request.header.to_device_id,
485 message_id=request.header.id
486 )
487 except Exception as e:
488 self.log.exception("error-processing-inter-adapter-message", e=e)
489
490 def packet_out(self, egress_port, msg):
491 self.log.info('sending-packet-out', egress_port=egress_port,
492 msg=hexify(msg))
493 try:
494 pkt = Ether(msg)
495 out_pkt = pkt
496 if egress_port != self.nni_port.port_no:
497 # don't do the vlan manipulation for the NNI port, vlans are already correct
498 out_pkt = (
499 Ether(src=pkt.src, dst=pkt.dst) /
500 Dot1Q(vlan=egress_port, type=pkt.type) /
501 pkt.payload
502 )
503
504 # TODO need better way of mapping logical ports to PON ports
505 out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
506
507 # send over grpc stream
508 stub = PonSimStub(self.channel)
509 frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
510 out_port=out_port)
511 stub.SendFrame(frame)
512 except Exception as e:
513 self.log.exception("error-processing-packet-out", e=e)
514
515
516 @inlineCallbacks
517 def reboot(self):
518 self.log.info('rebooting', device_id=self.device_id)
519
520 yield self.core_proxy.device_state_update(self.device_id,
521 connect_status=ConnectStatus.UNREACHABLE)
522
523 # Update the child devices connect state to UNREACHABLE
524 yield self.core_proxy.children_state_update(self.device_id,
525 connect_status=ConnectStatus.UNREACHABLE)
526
527 # Sleep 10 secs, simulating a reboot
528 # TODO: send alert and clear alert after the reboot
529 yield asleep(10)
530
531 # Change the connection status back to REACHABLE. With a
532 # real OLT the connection state must be the actual state
533 yield self.core_proxy.device_state_update(self.device_id,
534 connect_status=ConnectStatus.REACHABLE)
535
536 # Update the child devices connect state to REACHABLE
537 yield self.core_proxy.children_state_update(self.device_id,
538 connect_status=ConnectStatus.REACHABLE)
539
540 self.log.info('rebooted', device_id=self.device_id)
541
542 def self_test_device(self, device):
543 """
544 This is called to Self a device based on a NBI call.
545 :param device: A Voltha.Device object.
546 :return: Will return result of self test
547 """
548 log.info('self-test-device', device=device.id)
549 raise NotImplementedError()
550
551 @inlineCallbacks
552 def disable(self):
553 self.log.info('disabling', device_id=self.device_id)
554
555 self.stop_kpi_collection()
556
557 # Update the operational status to UNKNOWN and connection status to UNREACHABLE
558 yield self.core_proxy.device_state_update(self.device_id,
559 oper_status=OperStatus.UNKNOWN,
560 connect_status=ConnectStatus.UNREACHABLE)
561
562 self.close_channel()
563 self.log.info('disabled-grpc-channel')
564
565 self.stop_kpi_collection()
566
567 # TODO:
568 # 1) Remove all flows from the device
569 # 2) Remove the device from ponsim
570
571 self.log.info('disabled', device_id=self.device_id)
572
573 @inlineCallbacks
574 def reenable(self):
575 self.log.info('re-enabling', device_id=self.device_id)
576
577 # Set the ofp_port_no and nni_port in case we bypassed the reconcile
578 # process if the device was in DISABLED state on voltha restart
579 if not self.ofp_port_no and not self.nni_port:
580 yield self.get_channel()
581 stub = PonSimStub(self.channel)
582 info = stub.GetDeviceInfo(Empty())
583 log.info('got-info', info=info)
584 self.ofp_port_no = info.nni_port
585 ports = yield self._get_nni_port()
586 # For ponsim, we are using only 1 NNI port
587 if ports.items:
588 self.nni_port = ports.items[0]
589
590 # Update the state of the NNI port
591 yield self.core_proxy.port_state_update(self.device_id,
592 port_type=Port.ETHERNET_NNI,
593 port_no=self.ofp_port_no,
594 oper_status=OperStatus.ACTIVE)
595
596 # Update the state of the PON port
597 yield self.core_proxy.port_state_update(self.device_id,
598 port_type=Port.PON_OLT,
599 port_no=1,
600 oper_status=OperStatus.ACTIVE)
601
602 # Set the operational state of the device to ACTIVE and connect status to REACHABLE
603 yield self.core_proxy.device_state_update(self.device_id,
604 connect_status=ConnectStatus.REACHABLE,
605 oper_status=OperStatus.ACTIVE)
606
607 # TODO: establish frame grpc-stream
608 # yield reactor.callInThread(self.rcv_grpc)
609
610 self.start_kpi_collection(self.device_id)
611
612 self.log.info('re-enabled', device_id=self.device_id)
613
614 def delete(self):
615 self.log.info('deleting', device_id=self.device_id)
616
617 self.close_channel()
618 self.log.info('disabled-grpc-channel')
619
620 # TODO:
621 # 1) Remove all flows from the device
622 # 2) Remove the device from ponsim
623
624 self.log.info('deleted', device_id=self.device_id)
625
626 def start_kpi_collection(self, device_id):
627
628 kafka_cluster_proxy = get_kafka_proxy()
629
630 def _collect(device_id, prefix):
631
632 try:
633 # Step 1: gather metrics from device
634 port_metrics = \
635 self.pm_metrics.collect_port_metrics(self.channel)
636
637 # Step 2: prepare the KpiEvent for submission
638 # we can time-stamp them here (or could use time derived from OLT
639 ts = arrow.utcnow().timestamp
640 kpi_event = KpiEvent(
641 type=KpiEventType.slice,
642 ts=ts,
643 prefixes={
644 # OLT NNI port
645 prefix + '.nni': MetricValuePairs(
646 metrics=port_metrics['nni']),
647 # OLT PON port
648 prefix + '.pon': MetricValuePairs(
649 metrics=port_metrics['pon'])
650 }
651 )
652
653 # Step 3: submit directly to the kafka bus
654 if kafka_cluster_proxy:
655 if isinstance(kpi_event, Message):
656 kpi_event = dumps(MessageToDict(kpi_event, True, True))
657 kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
658
659 except Exception as e:
660 log.exception('failed-to-submit-kpis', e=e)
661
662 self.pm_metrics.start_collector(_collect)
663
664 def stop_kpi_collection(self):
665 self.pm_metrics.stop_collector()