blob: 0810376c5243e15763c10e6c4fb8a3ef82a1cae5 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
khenaidoo6fdf0ba2018-11-02 14:38:33 -040018Represents an ONU device
khenaidoob9203542018-09-17 22:56:37 -040019"""
20
khenaidoo91ecfd62018-11-04 17:13:42 -050021from uuid import uuid4
22
23import arrow
khenaidoob9203542018-09-17 22:56:37 -040024import structlog
khenaidoo91ecfd62018-11-04 17:13:42 -050025from google.protobuf.json_format import MessageToDict
26from google.protobuf.message import Message
27from simplejson import dumps
28from twisted.internet.defer import DeferredQueue, inlineCallbacks, \
29 returnValue, Deferred
30from twisted.internet.task import LoopingCall
khenaidoob9203542018-09-17 22:56:37 -040031
khenaidoofdbad6e2018-11-06 22:26:38 -050032from python.common.utils.asleep import asleep
33from python.adapters.iadapter import OnuAdapter
34from python.adapters.kafka.kafka_proxy import get_kafka_proxy
35from python.protos import third_party
36from python.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
khenaidoo79232702018-12-04 11:00:41 -050037from python.protos.inter_container_pb2 import PortCapability, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040038 InterAdapterMessageType, InterAdapterResponseBody
khenaidoofdbad6e2018-11-06 22:26:38 -050039from python.protos.device_pb2 import Port, PmConfig, PmConfigs
40from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
41from python.protos.logical_device_pb2 import LogicalPort
42from python.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
khenaidoob9203542018-09-17 22:56:37 -040043 OFPPF_1GB_FD
khenaidoofdbad6e2018-11-06 22:26:38 -050044from python.protos.openflow_13_pb2 import ofp_port
45from python.protos.ponsim_pb2 import FlowTable, PonSimMetricsRequest, PonSimMetrics
khenaidoob9203542018-09-17 22:56:37 -040046
47_ = third_party
48log = structlog.get_logger()
49
50
51def mac_str_to_tuple(mac):
52 return tuple(int(d, 16) for d in mac.split(':'))
53
khenaidoo6fdf0ba2018-11-02 14:38:33 -040054
khenaidoo91ecfd62018-11-04 17:13:42 -050055class AdapterPmMetrics:
56 def __init__(self, device):
57 self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
58 'tx_256_511_pkts', 'tx_512_1023_pkts',
59 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
60 'rx_64_pkts', 'rx_65_127_pkts',
61 'rx_128_255_pkts', 'rx_256_511_pkts',
62 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
63 'rx_1519_9k_pkts'}
64 self.device = device
65 self.id = device.id
66 self.name = 'ponsim_onu'
67 self.default_freq = 150
68 self.grouped = False
69 self.freq_override = False
70 self.pm_metrics = None
71 self.pon_metrics_config = dict()
72 self.uni_metrics_config = dict()
73 self.lc = None
74 for m in self.pm_names:
75 self.pon_metrics_config[m] = PmConfig(name=m,
76 type=PmConfig.COUNTER,
77 enabled=True)
78 self.uni_metrics_config[m] = PmConfig(name=m,
79 type=PmConfig.COUNTER,
80 enabled=True)
81
82 def update(self, pm_config):
83 if self.default_freq != pm_config.default_freq:
84 # Update the callback to the new frequency.
85 self.default_freq = pm_config.default_freq
86 self.lc.stop()
87 self.lc.start(interval=self.default_freq / 10)
88 for m in pm_config.metrics:
89 self.pon_metrics_config[m.name].enabled = m.enabled
90 self.uni_metrics_config[m.name].enabled = m.enabled
91
92 def make_proto(self):
93 pm_config = PmConfigs(
94 id=self.id,
95 default_freq=self.default_freq,
96 grouped=False,
97 freq_override=False)
98 for m in sorted(self.pon_metrics_config):
99 pm = self.pon_metrics_config[m] # Either will do they're the same
100 pm_config.metrics.extend([PmConfig(name=pm.name,
101 type=pm.type,
102 enabled=pm.enabled)])
103 return pm_config
104
105 def extract_metrics(self, stats):
106 rtrn_port_metrics = dict()
107 rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
108 rtrn_port_metrics['uni'] = self.extract_uni_metrics(stats)
109 return rtrn_port_metrics
110
111 def extract_pon_metrics(self, stats):
112 rtrn_pon_metrics = dict()
113 for m in stats.metrics:
114 if m.port_name == "pon":
115 for p in m.packets:
116 if self.pon_metrics_config[p.name].enabled:
117 rtrn_pon_metrics[p.name] = p.value
118 return rtrn_pon_metrics
119
120 def extract_uni_metrics(self, stats):
121 rtrn_pon_metrics = dict()
122 for m in stats.metrics:
123 if m.port_name == "uni":
124 for p in m.packets:
125 if self.pon_metrics_config[p.name].enabled:
126 rtrn_pon_metrics[p.name] = p.value
127 return rtrn_pon_metrics
128
129 def start_collector(self, callback):
130 log.info("starting-pm-collection", device_name=self.name,
131 device_id=self.device.id)
132 prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
133 self.lc = LoopingCall(callback, self.device.id, prefix)
134 self.lc.start(interval=self.default_freq / 10)
135
136 def stop_collector(self):
137 log.info("stopping-pm-collection", device_name=self.name,
138 device_id=self.device.id)
139 self.lc.stop()
140
141
khenaidoob9203542018-09-17 22:56:37 -0400142class PonSimOnuAdapter(OnuAdapter):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400143 def __init__(self, core_proxy, adapter_proxy, config):
khenaidoo91ecfd62018-11-04 17:13:42 -0500144 # DeviceType of ONU should be same as VENDOR ID of ONU Serial Number
145 # as specified by standard
khenaidoob9203542018-09-17 22:56:37 -0400146 # requires for identifying correct adapter or ranged ONU
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400147 super(PonSimOnuAdapter, self).__init__(core_proxy=core_proxy,
148 adapter_proxy=adapter_proxy,
khenaidoob9203542018-09-17 22:56:37 -0400149 config=config,
150 device_handler_class=PonSimOnuHandler,
151 name='ponsim_onu',
152 vendor='Voltha project',
153 version='0.4',
154 device_type='ponsim_onu',
155 vendor_id='PSMO',
156 accepts_bulk_flow_update=True,
157 accepts_add_remove_flow_updates=False)
158
159
160class PonSimOnuHandler(object):
161 def __init__(self, adapter, device_id):
162 self.adapter = adapter
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400163 self.core_proxy = adapter.core_proxy
164 self.adapter_proxy = adapter.adapter_proxy
khenaidoob9203542018-09-17 22:56:37 -0400165 self.device_id = device_id
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400166 self.device_parent_id = None
khenaidoob9203542018-09-17 22:56:37 -0400167 self.log = structlog.get_logger(device_id=device_id)
168 self.incoming_messages = DeferredQueue()
khenaidoo91ecfd62018-11-04 17:13:42 -0500169 self.inter_adapter_message_deferred_map = {}
khenaidoob9203542018-09-17 22:56:37 -0400170 self.proxy_address = None
171 # reference of uni_port is required when re-enabling the device if
172 # it was disabled previously
173 self.uni_port = None
174 self.pon_port = None
175
khenaidoo91ecfd62018-11-04 17:13:42 -0500176 def _to_string(self, unicode_str):
177 if unicode_str is not None:
178 if type(unicode_str) == unicode:
179 return unicode_str.encode('ascii', 'ignore')
180 else:
181 return unicode_str
182 else:
183 return ""
184
khenaidoob9203542018-09-17 22:56:37 -0400185 def receive_message(self, msg):
khenaidoo91ecfd62018-11-04 17:13:42 -0500186 trns_id = self._to_string(msg.header.id)
187 if trns_id in self.inter_adapter_message_deferred_map:
188 self.inter_adapter_message_deferred_map[trns_id].callback(msg)
189 # self.incoming_messages.put(msg)
khenaidoob9203542018-09-17 22:56:37 -0400190
khenaidoob9203542018-09-17 22:56:37 -0400191 @inlineCallbacks
192 def activate(self, device):
193 self.log.info('activating')
194
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400195 self.device_parent_id = device.parent_id
196 self.proxy_address = device.proxy_address
khenaidoob9203542018-09-17 22:56:37 -0400197
198 # populate device info
199 device.root = False
200 device.vendor = 'ponsim'
201 device.model = 'n/a'
khenaidoo1ce37ad2019-03-24 22:07:24 -0400202 device.serial_number = device.serial_number
khenaidoobcf205b2019-01-25 22:21:14 -0500203 device.mac_address = "AA:BB:CC:DD:E0:00"
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400204 yield self.core_proxy.device_update(device)
khenaidoob9203542018-09-17 22:56:37 -0400205
khenaidoo91ecfd62018-11-04 17:13:42 -0500206 # Now set the initial PM configuration for this device
207 self.pm_metrics = AdapterPmMetrics(device)
208 pm_config = self.pm_metrics.make_proto()
209 log.info("initial-pm-config", pm_config=pm_config)
khenaidoodf5a9752019-02-14 14:25:19 -0500210 yield self.core_proxy.device_pm_config_update(pm_config, init=True)
khenaidoo91ecfd62018-11-04 17:13:42 -0500211
khenaidoo54544ae2019-03-18 13:22:39 -0400212 # Use the channel Id, assigned by the parent device to me, as the port number
213 uni_port = 2
214 if device.proxy_address is not None:
215 if device.proxy_address.channel_id != 0:
216 uni_port = device.proxy_address.channel_id
217
khenaidoo4d4802d2018-10-04 21:59:49 -0400218 # register physical ports
khenaidoob9203542018-09-17 22:56:37 -0400219 self.uni_port = Port(
khenaidoo54544ae2019-03-18 13:22:39 -0400220 port_no=uni_port,
221 label="uni-" + str(uni_port),
khenaidoob9203542018-09-17 22:56:37 -0400222 type=Port.ETHERNET_UNI,
223 admin_state=AdminState.ENABLED,
224 oper_status=OperStatus.ACTIVE
225 )
226 self.pon_port = Port(
227 port_no=1,
khenaidoo54544ae2019-03-18 13:22:39 -0400228 label='pon-1',
khenaidoob9203542018-09-17 22:56:37 -0400229 type=Port.PON_ONU,
230 admin_state=AdminState.ENABLED,
231 oper_status=OperStatus.ACTIVE,
232 peers=[
233 Port.PeerPort(
234 device_id=device.parent_id,
235 port_no=device.parent_port_no
236 )
237 ]
238 )
khenaidoodf5a9752019-02-14 14:25:19 -0500239 yield self.core_proxy.port_created(device.id, self.uni_port)
240 yield self.core_proxy.port_created(device.id, self.pon_port)
khenaidoob9203542018-09-17 22:56:37 -0400241
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400242 yield self.core_proxy.device_state_update(device.id,
243 connect_status=ConnectStatus.REACHABLE,
244 oper_status=OperStatus.ACTIVE)
khenaidoob9203542018-09-17 22:56:37 -0400245
khenaidoo91ecfd62018-11-04 17:13:42 -0500246 # Start collecting stats from the device after a brief pause
247 self.start_kpi_collection(device.id)
248
khenaidoo19d7b632018-10-30 10:49:50 -0400249 # TODO: Return only port specific info
khenaidoob9203542018-09-17 22:56:37 -0400250 def get_ofp_port_info(self, device, port_no):
khenaidoo91ecfd62018-11-04 17:13:42 -0500251 # Since the adapter created the device port then it has the reference
252 # of the port to
253 # return the capability. TODO: Do a lookup on the UNI port number
254 # and return the
khenaidoob9203542018-09-17 22:56:37 -0400255 # appropriate attributes
256 self.log.info('get_ofp_port_info', port_no=port_no, device_id=device.id)
khenaidoob9203542018-09-17 22:56:37 -0400257 cap = OFPPF_1GB_FD | OFPPF_FIBER
258 return PortCapability(
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400259 port=LogicalPort(
khenaidoob9203542018-09-17 22:56:37 -0400260 ofp_port=ofp_port(
khenaidoobcf205b2019-01-25 22:21:14 -0500261 hw_addr=mac_str_to_tuple('AA:BB:CC:DD:E0:%02x' % port_no),
khenaidoob9203542018-09-17 22:56:37 -0400262 config=0,
263 state=OFPPS_LIVE,
264 curr=cap,
265 advertised=cap,
266 peer=cap,
267 curr_speed=OFPPF_1GB_FD,
268 max_speed=OFPPF_1GB_FD
khenaidoo19d7b632018-10-30 10:49:50 -0400269 ),
270 device_id=device.id,
271 device_port_no=port_no
khenaidoob9203542018-09-17 22:56:37 -0400272 )
273 )
274
khenaidoo92e62c52018-10-03 14:02:54 -0400275 @inlineCallbacks
276 def _get_uni_port(self):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400277 ports = yield self.core_proxy.get_ports(self.device_id,
278 Port.ETHERNET_UNI)
khenaidoo92e62c52018-10-03 14:02:54 -0400279 returnValue(ports)
280
281 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400282 def _get_pon_port(self):
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400283 ports = yield self.core_proxy.get_ports(self.device_id, Port.PON_ONU)
khenaidoo92e62c52018-10-03 14:02:54 -0400284 returnValue(ports)
285
khenaidoob9203542018-09-17 22:56:37 -0400286 def reconcile(self, device):
287 self.log.info('reconciling-ONU-device-starts')
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400288 # TODO: complete code
khenaidoob9203542018-09-17 22:56:37 -0400289
290 @inlineCallbacks
291 def update_flow_table(self, flows):
khenaidoo91ecfd62018-11-04 17:13:42 -0500292 trnsId = None
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400293 try:
294 self.log.info('update_flow_table', flows=flows)
295 # we need to proxy through the OLT to get to the ONU
khenaidoob9203542018-09-17 22:56:37 -0400296
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400297 fb = FlowTable(
298 port=self.proxy_address.channel_id,
299 flows=flows
300 )
khenaidoo91ecfd62018-11-04 17:13:42 -0500301
302 # Create a deferred to wait for the result as well as a transid
303 wait_for_result = Deferred()
304 trnsId = uuid4().hex
305 self.inter_adapter_message_deferred_map[
306 self._to_string(trnsId)] = wait_for_result
307
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400308 # Sends the request via proxy and wait for an ACK
309 yield self.adapter_proxy.send_inter_adapter_message(
310 msg=fb,
311 type=InterAdapterMessageType.FLOW_REQUEST,
312 from_adapter=self.adapter.name,
313 to_adapter=self.proxy_address.device_type,
314 to_device_id=self.device_id,
khenaidoo91ecfd62018-11-04 17:13:42 -0500315 proxy_device_id=self.proxy_address.device_id,
316 message_id=trnsId
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400317 )
318 # Wait for the full response from the proxied adapter
khenaidoo91ecfd62018-11-04 17:13:42 -0500319 res = yield wait_for_result
320 if res.header.type == InterAdapterMessageType.FLOW_RESPONSE:
321 body = InterAdapterResponseBody()
322 res.body.Unpack(body)
323 self.log.info('response-received', result=body.status)
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400324 except Exception as e:
325 self.log.exception("update-flow-error", e=e)
khenaidoo91ecfd62018-11-04 17:13:42 -0500326 finally:
327 if trnsId in self.inter_adapter_message_deferred_map:
328 del self.inter_adapter_message_deferred_map[trnsId]
khenaidoob9203542018-09-17 22:56:37 -0400329
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400330 def process_inter_adapter_message(self, msg):
khenaidoo91ecfd62018-11-04 17:13:42 -0500331 # We expect only responses on the ONU side
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400332 self.log.info('process-inter-adapter-message', msg=msg)
khenaidoo91ecfd62018-11-04 17:13:42 -0500333 self.receive_message(msg)
khenaidoob9203542018-09-17 22:56:37 -0400334
335 def remove_from_flow_table(self, flows):
336 self.log.debug('remove-from-flow-table', flows=flows)
337 # TODO: Update PONSIM code to accept incremental flow changes.
338 # Once completed, the accepts_add_remove_flow_updates for this
339 # device type can be set to True
340
341 def add_to_flow_table(self, flows):
342 self.log.debug('add-to-flow-table', flows=flows)
343 # TODO: Update PONSIM code to accept incremental flow changes
344 # Once completed, the accepts_add_remove_flow_updates for this
345 # device type can be set to True
346
347 @inlineCallbacks
348 def reboot(self):
349 self.log.info('rebooting', device_id=self.device_id)
350
khenaidoo4d4802d2018-10-04 21:59:49 -0400351 # Update the connect status to UNREACHABLE
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400352 yield self.core_proxy.device_state_update(self.device_id,
353 connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400354
355 # Sleep 10 secs, simulating a reboot
356 # TODO: send alert and clear alert after the reboot
357 yield asleep(10)
358
khenaidoo4d4802d2018-10-04 21:59:49 -0400359 # Change the connection status back to REACHABLE. With a
360 # real ONU the connection state must be the actual state
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400361 yield self.core_proxy.device_state_update(self.device_id,
362 connect_status=ConnectStatus.REACHABLE)
khenaidoo4d4802d2018-10-04 21:59:49 -0400363
khenaidoob9203542018-09-17 22:56:37 -0400364 self.log.info('rebooted', device_id=self.device_id)
365
366 def self_test_device(self, device):
367 """
368 This is called to Self a device based on a NBI call.
369 :param device: A Voltha.Device object.
370 :return: Will return result of self test
371 """
372 log.info('self-test-device', device=device.id)
373 raise NotImplementedError()
374
khenaidoo92e62c52018-10-03 14:02:54 -0400375 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400376 def disable(self):
377 self.log.info('disabling', device_id=self.device_id)
378
khenaidoob9203542018-09-17 22:56:37 -0400379 # Update the device operational status to UNKNOWN
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400380 yield self.core_proxy.device_state_update(self.device_id,
381 oper_status=OperStatus.UNKNOWN,
382 connect_status=ConnectStatus.UNREACHABLE)
khenaidoob9203542018-09-17 22:56:37 -0400383
khenaidoo91ecfd62018-11-04 17:13:42 -0500384 self.stop_kpi_collection()
385
khenaidoob9203542018-09-17 22:56:37 -0400386 # TODO:
387 # 1) Remove all flows from the device
388 # 2) Remove the device from ponsim
khenaidoo92e62c52018-10-03 14:02:54 -0400389 self.log.info('disabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400390
khenaidoo92e62c52018-10-03 14:02:54 -0400391 @inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -0400392 def reenable(self):
393 self.log.info('re-enabling', device_id=self.device_id)
394 try:
khenaidoob9203542018-09-17 22:56:37 -0400395
khenaidoo92e62c52018-10-03 14:02:54 -0400396 # Refresh the port reference - we only use one port for now
397 ports = yield self._get_uni_port()
398 self.log.info('re-enabling-uni-ports', ports=ports)
399 if ports.items:
400 self.uni_port = ports.items[0]
khenaidoob9203542018-09-17 22:56:37 -0400401
khenaidoo92e62c52018-10-03 14:02:54 -0400402 ports = yield self._get_pon_port()
403 self.log.info('re-enabling-pon-ports', ports=ports)
404 if ports.items:
405 self.pon_port = ports.items[0]
406
407 # Update the state of the UNI port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400408 yield self.core_proxy.port_state_update(self.device_id,
409 port_type=Port.ETHERNET_UNI,
410 port_no=self.uni_port.port_no,
411 oper_status=OperStatus.ACTIVE)
khenaidoo92e62c52018-10-03 14:02:54 -0400412
413 # Update the state of the PON port
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400414 yield self.core_proxy.port_state_update(self.device_id,
415 port_type=Port.PON_ONU,
416 port_no=self.pon_port.port_no,
417 oper_status=OperStatus.ACTIVE)
khenaidoo92e62c52018-10-03 14:02:54 -0400418
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400419 yield self.core_proxy.device_state_update(self.device_id,
420 oper_status=OperStatus.ACTIVE,
421 connect_status=ConnectStatus.REACHABLE)
khenaidoo92e62c52018-10-03 14:02:54 -0400422
khenaidoo91ecfd62018-11-04 17:13:42 -0500423 self.start_kpi_collection(self.device_id)
424
khenaidoo92e62c52018-10-03 14:02:54 -0400425 self.log.info('re-enabled', device_id=self.device_id)
khenaidoob9203542018-09-17 22:56:37 -0400426 except Exception, e:
427 self.log.exception('error-reenabling', e=e)
428
429 def delete(self):
430 self.log.info('deleting', device_id=self.device_id)
431
khenaidoob9203542018-09-17 22:56:37 -0400432 # TODO:
433 # 1) Remove all flows from the device
434 # 2) Remove the device from ponsim
435
436 self.log.info('deleted', device_id=self.device_id)
khenaidoo91ecfd62018-11-04 17:13:42 -0500437
438 def start_kpi_collection(self, device_id):
439 kafka_cluster_proxy = get_kafka_proxy()
440
441 @inlineCallbacks
442 def _collect(device_id, prefix):
443 try:
444 self.log.debug("pm-collection-interval")
445 # Proxy a message to ponsim_olt. The OLT will then query the ONU
446 # for statistics. The reply will
447 # arrive proxied back to us in self.receive_message().
448 msg = PonSimMetricsRequest(port=self.proxy_address.channel_id)
449
450 # Create a deferred to wait for the result as well as a transid
451 wait_for_result = Deferred()
452 trnsId = uuid4().hex
453 self.inter_adapter_message_deferred_map[
454 self._to_string(trnsId)] = wait_for_result
455
456 # Sends the request via proxy and wait for an ACK
457 yield self.adapter_proxy.send_inter_adapter_message(
458 msg=msg,
459 type=InterAdapterMessageType.METRICS_REQUEST,
460 from_adapter=self.adapter.name,
461 to_adapter=self.proxy_address.device_type,
462 to_device_id=self.device_id,
463 proxy_device_id=self.proxy_address.device_id,
464 message_id=trnsId
465 )
466 # Wait for the full response from the proxied adapter
467 res = yield wait_for_result
468 # Remove the transaction from the transaction map
469 del self.inter_adapter_message_deferred_map[self._to_string(trnsId)]
470
471 # Message is a reply to an ONU statistics request. Push it out to
472 # Kafka via adapter.submit_kpis().
473 if res.header.type == InterAdapterMessageType.METRICS_RESPONSE:
474 msg = InterAdapterResponseBody()
475 res.body.Unpack(msg)
476 self.log.debug('metrics-response-received', result=msg.status)
477 if self.pm_metrics:
478 self.log.debug('Handling incoming ONU metrics')
479 response = PonSimMetrics()
480 msg.body.Unpack(response)
481 port_metrics = self.pm_metrics.extract_metrics(response)
482 try:
483 ts = arrow.utcnow().timestamp
484 kpi_event = KpiEvent(
485 type=KpiEventType.slice,
486 ts=ts,
487 prefixes={
488 # OLT NNI port
489 prefix + '.uni': MetricValuePairs(
490 metrics=port_metrics['uni']),
491 # OLT PON port
492 prefix + '.pon': MetricValuePairs(
493 metrics=port_metrics['pon'])
494 }
495 )
496
497 self.log.debug(
498 'Submitting KPI for incoming ONU mnetrics')
499
500 # Step 3: submit directly to the kafka bus
501 if kafka_cluster_proxy:
502 if isinstance(kpi_event, Message):
503 kpi_event = dumps(
504 MessageToDict(kpi_event, True, True))
505 kafka_cluster_proxy.send_message("voltha.kpis",
506 kpi_event)
507
508 except Exception as e:
509 log.exception('failed-to-submit-kpis', e=e)
510 except Exception as e:
511 log.exception('failed-to-collect-metrics', e=e)
512
513 self.pm_metrics.start_collector(_collect)
514
515 def stop_kpi_collection(self):
516 self.pm_metrics.stop_collector()