blob: 0987147052e1aef03fcd44d80cbb1b18956c4233 [file] [log] [blame]
Scott Baker81739502019-10-14 16:53:10 -07001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Represents an ONU device
19"""
20
21from uuid import uuid4
22
23import arrow
24import structlog
25from google.protobuf.json_format import MessageToDict
26from google.protobuf.message import Message
27from simplejson import dumps
28from twisted.internet.defer import DeferredQueue, inlineCallbacks, \
29 returnValue, Deferred
30from twisted.internet.task import LoopingCall
31
32from pyvoltha.common.utils.asleep import asleep
33from pyvoltha.adapters.iadapter import OnuAdapter
34from pyvoltha.adapters.kafka.kafka_proxy import get_kafka_proxy
35from voltha_protos.common_pb2 import OperStatus, ConnectStatus, AdminState
36from voltha_protos.inter_container_pb2 import PortCapability, \
37 InterAdapterMessageType, InterAdapterResponseBody
38from voltha_protos.device_pb2 import Port, PmConfig, PmConfigs
39from voltha_protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
40from voltha_protos.logical_device_pb2 import LogicalPort
41from voltha_protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
42 OFPPF_1GB_FD
43from voltha_protos.openflow_13_pb2 import ofp_port
44from voltha_protos.ponsim_pb2 import FlowTable, PonSimMetricsRequest, PonSimMetrics
45
46log = structlog.get_logger()
47
48
def mac_str_to_tuple(mac):
    """Convert a colon-separated hexadecimal MAC string into a tuple of ints.

    :param mac: string such as 'AA:BB:CC:DD:E0:00'
    :return: tuple with one integer per colon-separated field
    """
    octets = []
    for field in mac.split(':'):
        # Each field is parsed as a base-16 number.
        octets.append(int(field, 16))
    return tuple(octets)
51
52
class AdapterPmMetrics:
    """Performance-monitoring (PM) configuration and collector for one device.

    Holds one PmConfig per metric name for the PON side and one for the UNI
    side, builds the PmConfigs protobuf describing them, filters raw
    statistics down to the enabled metrics, and owns the LoopingCall that
    periodically invokes the collection callback.
    """

    def __init__(self, device):
        """
        :param device: device object; its ``id`` attribute is read here and
                       when the collector is started/stopped.
        """
        self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
                         'tx_256_511_pkts', 'tx_512_1023_pkts',
                         'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
                         'rx_64_pkts', 'rx_65_127_pkts',
                         'rx_128_255_pkts', 'rx_256_511_pkts',
                         'rx_512_1023_pkts', 'rx_1024_1518_pkts',
                         'rx_1519_9k_pkts'}
        self.device = device
        self.id = device.id
        self.name = 'ponsim_onu'
        # The collector interval is default_freq / 10 seconds.
        # NOTE(review): presumably default_freq is expressed in tenths of a
        # second - confirm against the PmConfigs definition.
        self.default_freq = 150
        self.grouped = False
        self.freq_override = False
        self.pm_metrics = None
        self.pon_metrics_config = dict()
        self.uni_metrics_config = dict()
        # LoopingCall driving the collection; None until start_collector().
        self.lc = None
        for m in self.pm_names:
            self.pon_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)
            self.uni_metrics_config[m] = PmConfig(name=m,
                                                  type=PmConfig.COUNTER,
                                                  enabled=True)

    def _collection_interval(self):
        """Return the LoopingCall interval derived from default_freq.

        Float division is used so that Python 2 integer division does not
        truncate the interval (e.g. 155 -> 15.5 rather than 15).
        """
        return self.default_freq / 10.0

    def _collector_running(self):
        """Return True when a LoopingCall exists and is currently running."""
        return self.lc is not None and self.lc.running

    def update(self, pm_config):
        """Apply a new PM configuration.

        :param pm_config: object exposing ``default_freq`` and an iterable
                          ``metrics`` whose items have ``name`` and
                          ``enabled`` attributes.
        """
        if self.default_freq != pm_config.default_freq:
            # Update the callback to the new frequency.
            self.default_freq = pm_config.default_freq
            # Only restart a collector that is actually running: calling
            # stop() on a missing or idle LoopingCall raises. An idle
            # collector picks the new frequency up on its next start.
            if self._collector_running():
                self.lc.stop()
                self.lc.start(interval=self._collection_interval())
        for m in pm_config.metrics:
            known = False
            for config in (self.pon_metrics_config, self.uni_metrics_config):
                entry = config.get(m.name)
                if entry is not None:
                    entry.enabled = m.enabled
                    known = True
            if not known:
                # Previously an unknown name raised KeyError and aborted the
                # whole update; report it and carry on with the rest.
                log.warning('unknown-pm-metric', metric=m.name,
                            device_id=self.id)

    def make_proto(self):
        """Build the PmConfigs protobuf for the current configuration.

        :return: PmConfigs with one PmConfig per metric, sorted by name.
        """
        pm_config = PmConfigs(
            id=self.id,
            default_freq=self.default_freq,
            grouped=self.grouped,
            freq_override=self.freq_override)
        for m in sorted(self.pon_metrics_config):
            pm = self.pon_metrics_config[m]  # Either will do they're the same
            pm_config.metrics.extend([PmConfig(name=pm.name,
                                               type=pm.type,
                                               enabled=pm.enabled)])
        return pm_config

    def _extract_port_metrics(self, stats, port_name, metrics_config):
        """Collect the enabled counters reported for one named port.

        :param stats: object whose ``metrics`` items expose ``port_name`` and
                      ``packets`` (items with ``name`` and ``value``).
        :param port_name: value of ``port_name`` to select ("pon" or "uni").
        :param metrics_config: dict of metric name -> config with ``enabled``.
        :return: dict of metric name -> value.
        """
        extracted = dict()
        for m in stats.metrics:
            if m.port_name != port_name:
                continue
            for p in m.packets:
                # Counters without a configuration entry are not reported
                # (previously they raised KeyError and lost the whole sample).
                pm = metrics_config.get(p.name)
                if pm is not None and pm.enabled:
                    extracted[p.name] = p.value
        return extracted

    def extract_metrics(self, stats):
        """Return {'pon': {...}, 'uni': {...}} holding the enabled metrics."""
        rtrn_port_metrics = dict()
        rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
        rtrn_port_metrics['uni'] = self.extract_uni_metrics(stats)
        return rtrn_port_metrics

    def extract_pon_metrics(self, stats):
        """Return the enabled metrics reported for the port named "pon"."""
        return self._extract_port_metrics(stats, "pon",
                                          self.pon_metrics_config)

    def extract_uni_metrics(self, stats):
        """Return the enabled metrics reported for the port named "uni"."""
        return self._extract_port_metrics(stats, "uni",
                                          self.uni_metrics_config)

    def start_collector(self, callback):
        """Start periodic collection.

        :param callback: callable invoked as ``callback(device_id, prefix)``
                         on every interval.
        """
        log.info("starting-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        # Do not leave a previous loop running unreferenced when the
        # collector is started twice without an intervening stop.
        if self._collector_running():
            self.lc.stop()
        prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
        self.lc = LoopingCall(callback, self.device.id, prefix)
        self.lc.start(interval=self._collection_interval())

    def stop_collector(self):
        """Stop periodic collection; a no-op when it is not running."""
        log.info("stopping-pm-collection", device_name=self.name,
                 device_id=self.device.id)
        if self._collector_running():
            self.lc.stop()
138
139
class PonSimOnuAdapter(OnuAdapter):
    """ONU adapter that registers the 'ponsim_onu' device type.

    All behaviour comes from OnuAdapter; this class only supplies the
    static registration settings and the per-device handler class.
    """

    def __init__(self, core_proxy, adapter_proxy, config):
        # DeviceType of ONU should be same as VENDOR ID of ONU Serial Number
        # as specified by standard
        # requires for identifying correct adapter or ranged ONU
        adapter_settings = dict(
            device_handler_class=PonSimOnuHandler,
            name='ponsim_onu',
            vendor='Voltha project',
            version='0.4',
            device_type='ponsim_onu',
            vendor_id='PSMO',
            accepts_bulk_flow_update=True,
            accepts_add_remove_flow_updates=False,
        )
        super(PonSimOnuAdapter, self).__init__(
            core_proxy=core_proxy,
            adapter_proxy=adapter_proxy,
            config=config,
            **adapter_settings)
156
157
class PonSimOnuHandler(object):
    """Per-device handler for a ponsim ONU.

    Requests that must reach the ONU are sent as inter-adapter messages via
    ``adapter_proxy`` (addressed through ``proxy_address``); the matching
    response is delivered to receive_message(), which fires the Deferred
    registered under the request's transaction id.
    """

    def __init__(self, adapter, device_id):
        """
        :param adapter: owning adapter; ``core_proxy``, ``adapter_proxy`` and
                        ``name`` are read from it.
        :param device_id: id of the device this handler manages.
        """
        self.adapter = adapter
        self.core_proxy = adapter.core_proxy
        self.adapter_proxy = adapter.adapter_proxy
        self.device_id = device_id
        self.device_parent_id = None
        self.log = structlog.get_logger(device_id=device_id)
        self.incoming_messages = DeferredQueue()
        # transaction id -> Deferred fired by receive_message()
        self.inter_adapter_message_deferred_map = {}
        self.proxy_address = None
        # reference of uni_port is required when re-enabling the device if
        # it was disabled previously
        self.uni_port = None
        self.pon_port = None
        # Created in activate(); initialised here so that the KPI helpers
        # can be called safely on a handler that was never activated.
        self.pm_metrics = None

    def _to_string(self, unicode_str):
        """Return ``unicode_str`` as a native string, or "" for None.

        Text values are reduced to ASCII (non-ASCII characters dropped).
        Works on both Python 2 (where ``unicode`` exists) and Python 3;
        any other value is returned unchanged.
        """
        if unicode_str is None:
            return ""
        try:
            text_type = unicode  # Python 2
        except NameError:
            text_type = str  # Python 3
        if isinstance(unicode_str, text_type):
            ascii_value = unicode_str.encode('ascii', 'ignore')
            if isinstance(ascii_value, str):
                # Python 2: encode() already produced a native str.
                return ascii_value
            # Python 3: encode() produced bytes; return native text so the
            # value matches the str keys stored in the deferred map.
            return ascii_value.decode('ascii')
        return unicode_str

    def receive_message(self, msg):
        """Fire the Deferred waiting on ``msg.header.id``, if there is one.

        Messages whose id is not in the deferred map are ignored.
        """
        trns_id = self._to_string(msg.header.id)
        if trns_id in self.inter_adapter_message_deferred_map:
            self.inter_adapter_message_deferred_map[trns_id].callback(msg)

    @inlineCallbacks
    def activate(self, device):
        """Populate device info, register ports and start KPI collection.

        :param device: device object; ``parent_id``, ``proxy_address``,
                       ``parent_port_no`` and ``id`` are read from it and its
                       descriptive fields are overwritten.
        """
        self.log.info('activating')

        self.device_parent_id = device.parent_id
        self.proxy_address = device.proxy_address

        # populate device info (serial_number is left as received)
        device.root = False
        device.vendor = 'ponsim'
        device.model = 'n/a'
        device.mac_address = "AA:BB:CC:DD:E0:00"
        yield self.core_proxy.device_update(device)

        # Now set the initial PM configuration for this device
        self.pm_metrics = AdapterPmMetrics(device)
        pm_config = self.pm_metrics.make_proto()
        self.log.info("initial-pm-config", pm_config=pm_config)
        yield self.core_proxy.device_pm_config_update(pm_config, init=True)

        # Use the channel Id, assigned by the parent device to me, as the
        # port number; fall back to 2 when none was assigned.
        uni_port = 2
        if device.proxy_address is not None:
            if device.proxy_address.channel_id != 0:
                uni_port = device.proxy_address.channel_id

        # register physical ports
        self.uni_port = Port(
            port_no=uni_port,
            label="uni-" + str(uni_port),
            type=Port.ETHERNET_UNI,
            admin_state=AdminState.ENABLED,
            oper_status=OperStatus.ACTIVE
        )
        self.pon_port = Port(
            port_no=1,
            label='pon-1',
            type=Port.PON_ONU,
            admin_state=AdminState.ENABLED,
            oper_status=OperStatus.ACTIVE,
            peers=[
                Port.PeerPort(
                    device_id=device.parent_id,
                    port_no=device.parent_port_no
                )
            ]
        )
        yield self.core_proxy.port_created(device.id, self.uni_port)
        yield self.core_proxy.port_created(device.id, self.pon_port)

        yield self.core_proxy.device_state_update(
            device.id,
            connect_status=ConnectStatus.REACHABLE,
            oper_status=OperStatus.ACTIVE)

        # Start collecting stats from the device
        self.start_kpi_collection(device.id)

    # TODO: Return only port specific info
    def get_ofp_port_info(self, device, port_no):
        """Return a PortCapability describing ``port_no`` as a logical port.

        The same fixed capability set is returned for every port; only the
        last octet of the hardware address depends on ``port_no``.
        """
        # Since the adapter created the device port then it has the reference
        # of the port to return the capability.
        # TODO: Do a lookup on the UNI port number and return the
        # appropriate attributes
        self.log.info('get_ofp_port_info', port_no=port_no,
                      device_id=device.id)
        cap = OFPPF_1GB_FD | OFPPF_FIBER
        return PortCapability(
            port=LogicalPort(
                ofp_port=ofp_port(
                    hw_addr=mac_str_to_tuple('AA:BB:CC:DD:E0:%02x' % port_no),
                    config=0,
                    state=OFPPS_LIVE,
                    curr=cap,
                    advertised=cap,
                    peer=cap,
                    # NOTE(review): the speed fields are set to the feature
                    # flag value rather than a bit rate - confirm intent.
                    curr_speed=OFPPF_1GB_FD,
                    max_speed=OFPPF_1GB_FD
                ),
                device_id=device.id,
                device_port_no=port_no
            )
        )

    @inlineCallbacks
    def _get_uni_port(self):
        """Fetch this device's ETHERNET_UNI ports from the core."""
        ports = yield self.core_proxy.get_ports(self.device_id,
                                                Port.ETHERNET_UNI)
        returnValue(ports)

    @inlineCallbacks
    def _get_pon_port(self):
        """Fetch this device's PON_ONU ports from the core."""
        ports = yield self.core_proxy.get_ports(self.device_id, Port.PON_ONU)
        returnValue(ports)

    def reconcile(self, device):
        """Reconcile hook; currently only logs."""
        self.log.info('reconciling-ONU-device-starts')
        # TODO: complete code

    @inlineCallbacks
    def update_flow_table(self, flows):
        """Send the full flow table to the ONU and wait for the response.

        Errors are logged rather than propagated.

        :param flows: flows placed in the FlowTable request.
        """
        trns_key = None
        try:
            self.log.info('update_flow_table', flows=flows)
            # we need to proxy through the OLT to get to the ONU

            fb = FlowTable(
                port=self.proxy_address.channel_id,
                flows=flows
            )

            # Create a deferred to wait for the result as well as a transid
            wait_for_result = Deferred()
            trnsId = uuid4().hex
            trns_key = self._to_string(trnsId)
            self.inter_adapter_message_deferred_map[trns_key] = \
                wait_for_result

            # Sends the request via proxy and wait for an ACK
            yield self.adapter_proxy.send_inter_adapter_message(
                msg=fb,
                type=InterAdapterMessageType.FLOW_REQUEST,
                from_adapter=self.adapter.name,
                to_adapter=self.proxy_address.device_type,
                to_device_id=self.device_id,
                proxy_device_id=self.proxy_address.device_id,
                message_id=trnsId
            )
            # Wait for the full response from the proxied adapter
            res = yield wait_for_result
            if res.header.type == InterAdapterMessageType.FLOW_RESPONSE:
                body = InterAdapterResponseBody()
                res.body.Unpack(body)
                self.log.info('response-received', result=body.status)
        except Exception as e:
            self.log.exception("update-flow-error", e=e)
        finally:
            # Remove the entry under the same key it was stored with.
            if trns_key is not None:
                self.inter_adapter_message_deferred_map.pop(trns_key, None)

    def process_inter_adapter_message(self, msg):
        """Entry point for inter-adapter messages addressed to this ONU."""
        # We expect only responses on the ONU side
        self.log.info('process-inter-adapter-message', msg=msg)
        self.receive_message(msg)

    def remove_from_flow_table(self, flows):
        """Incremental flow removal; not implemented, only logs."""
        self.log.debug('remove-from-flow-table', flows=flows)
        # TODO: Update PONSIM code to accept incremental flow changes.
        # Once completed, the accepts_add_remove_flow_updates for this
        # device type can be set to True

    def add_to_flow_table(self, flows):
        """Incremental flow addition; not implemented, only logs."""
        self.log.debug('add-to-flow-table', flows=flows)
        # TODO: Update PONSIM code to accept incremental flow changes
        # Once completed, the accepts_add_remove_flow_updates for this
        # device type can be set to True

    @inlineCallbacks
    def reboot(self):
        """Simulate a reboot: UNREACHABLE, wait 10 seconds, REACHABLE."""
        self.log.info('rebooting', device_id=self.device_id)

        # Update the connect status to UNREACHABLE
        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.UNREACHABLE)

        # Sleep 10 secs, simulating a reboot
        # TODO: send alert and clear alert after the reboot
        yield asleep(10)

        # Change the connection status back to REACHABLE. With a
        # real ONU the connection state must be the actual state
        yield self.core_proxy.device_state_update(
            self.device_id,
            connect_status=ConnectStatus.REACHABLE)

        self.log.info('rebooted', device_id=self.device_id)

    def self_test_device(self, device):
        """
        This is called to self-test a device based on a NBI call.
        :param device: A Voltha.Device object.
        :return: Will return result of self test
        :raises NotImplementedError: always; self test is not implemented.
        """
        self.log.info('self-test-device', device=device.id)
        raise NotImplementedError()

    @inlineCallbacks
    def disable(self):
        """Mark the device UNKNOWN/UNREACHABLE and stop KPI collection."""
        self.log.info('disabling', device_id=self.device_id)

        # Update the device operational status to UNKNOWN
        yield self.core_proxy.device_state_update(
            self.device_id,
            oper_status=OperStatus.UNKNOWN,
            connect_status=ConnectStatus.UNREACHABLE)

        self.stop_kpi_collection()

        # TODO:
        # 1) Remove all flows from the device
        # 2) Remove the device from ponsim
        self.log.info('disabled', device_id=self.device_id)

    @inlineCallbacks
    def reenable(self):
        """Refresh port references, mark ports and device ACTIVE, and
        restart KPI collection. Errors are logged rather than propagated.
        """
        self.log.info('re-enabling', device_id=self.device_id)
        try:

            # Refresh the port reference - we only use one port for now
            ports = yield self._get_uni_port()
            self.log.info('re-enabling-uni-ports', ports=ports)
            if ports.items:
                self.uni_port = ports.items[0]

            ports = yield self._get_pon_port()
            self.log.info('re-enabling-pon-ports', ports=ports)
            if ports.items:
                self.pon_port = ports.items[0]

            # Update the state of the UNI port
            yield self.core_proxy.port_state_update(
                self.device_id,
                port_type=Port.ETHERNET_UNI,
                port_no=self.uni_port.port_no,
                oper_status=OperStatus.ACTIVE)

            # Update the state of the PON port
            yield self.core_proxy.port_state_update(
                self.device_id,
                port_type=Port.PON_ONU,
                port_no=self.pon_port.port_no,
                oper_status=OperStatus.ACTIVE)

            yield self.core_proxy.device_state_update(
                self.device_id,
                oper_status=OperStatus.ACTIVE,
                connect_status=ConnectStatus.REACHABLE)

            self.start_kpi_collection(self.device_id)

            self.log.info('re-enabled', device_id=self.device_id)
        except Exception as e:
            self.log.exception('error-reenabling', e=e)

    def delete(self):
        """Delete hook; currently only logs."""
        self.log.info('deleting', device_id=self.device_id)

        # TODO:
        # 1) Remove all flows from the device
        # 2) Remove the device from ponsim

        self.log.info('deleted', device_id=self.device_id)

    def start_kpi_collection(self, device_id):
        """Start the periodic metrics collection for this device.

        Each interval a METRICS_REQUEST is sent through the proxy; the
        response is converted to a KpiEvent and published on the
        "voltha.kpis" topic when a kafka proxy is available.

        :param device_id: unused here; the collector passes its own device
                          id to the callback.
        """
        kafka_cluster_proxy = get_kafka_proxy()

        @inlineCallbacks
        def _collect(device_id, prefix):
            trns_key = None
            try:
                self.log.debug("pm-collection-interval")
                # Proxy a message to ponsim_olt. The OLT will then query the
                # ONU for statistics. The reply will arrive proxied back to
                # us in self.receive_message().
                request = PonSimMetricsRequest(
                    port=self.proxy_address.channel_id)

                # Create a deferred to wait for the result as well as a
                # transid
                wait_for_result = Deferred()
                trnsId = uuid4().hex
                trns_key = self._to_string(trnsId)
                self.inter_adapter_message_deferred_map[trns_key] = \
                    wait_for_result

                # Sends the request via proxy and wait for an ACK
                yield self.adapter_proxy.send_inter_adapter_message(
                    msg=request,
                    type=InterAdapterMessageType.METRICS_REQUEST,
                    from_adapter=self.adapter.name,
                    to_adapter=self.proxy_address.device_type,
                    to_device_id=self.device_id,
                    proxy_device_id=self.proxy_address.device_id,
                    message_id=trnsId
                )
                # Wait for the full response from the proxied adapter
                res = yield wait_for_result
                # Remove the transaction from the transaction map
                self.inter_adapter_message_deferred_map.pop(trns_key, None)

                # Message is a reply to an ONU statistics request. Push it
                # out to Kafka.
                if res.header.type == \
                        InterAdapterMessageType.METRICS_RESPONSE:
                    body = InterAdapterResponseBody()
                    res.body.Unpack(body)
                    self.log.debug('metrics-response-received',
                                   result=body.status)
                    if self.pm_metrics:
                        self.log.debug('Handling incoming ONU metrics')
                        response = PonSimMetrics()
                        body.body.Unpack(response)
                        port_metrics = self.pm_metrics.extract_metrics(
                            response)
                        try:
                            # NOTE(review): arrow >= 1.0 turned `timestamp`
                            # into a method - confirm the pinned version.
                            ts = arrow.utcnow().timestamp
                            kpi_event = KpiEvent(
                                type=KpiEventType.slice,
                                ts=ts,
                                prefixes={
                                    # ONU UNI port
                                    prefix + '.uni': MetricValuePairs(
                                        metrics=port_metrics['uni']),
                                    # ONU PON port
                                    prefix + '.pon': MetricValuePairs(
                                        metrics=port_metrics['pon'])
                                }
                            )

                            self.log.debug(
                                'Submitting KPI for incoming ONU mnetrics')

                            # Step 3: submit directly to the kafka bus
                            if kafka_cluster_proxy:
                                if isinstance(kpi_event, Message):
                                    # Positional flags: include default
                                    # valued fields, keep proto field names.
                                    kpi_event = dumps(
                                        MessageToDict(kpi_event, True, True))
                                kafka_cluster_proxy.send_message(
                                    "voltha.kpis", kpi_event)

                        except Exception as e:
                            self.log.exception('failed-to-submit-kpis', e=e)
            except Exception as e:
                self.log.exception('failed-to-collect-metrics', e=e)
            finally:
                # Never leave the Deferred registered when sending or
                # waiting failed; otherwise the map grows on every error.
                if trns_key is not None:
                    self.inter_adapter_message_deferred_map.pop(trns_key,
                                                                None)

        if self.pm_metrics is None:
            # activate() has not created the PM configuration yet, so there
            # is nothing to drive the collection with.
            self.log.warning('kpi-collection-not-started',
                             reason='pm-metrics-not-initialized')
            return
        self.pm_metrics.start_collector(_collect)

    def stop_kpi_collection(self):
        """Stop the periodic metrics collection, if it was ever set up."""
        if self.pm_metrics is not None:
            self.pm_metrics.stop_collector()
514 self.pm_metrics.stop_collector()