blob: 7f19f9bab173a5527fe9f75ce17e0e35a9a3be16 [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import json
7import os
8from collections import namedtuple
9from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
10
11from lte.protos.enodebd_pb2 import SingleEnodebStatus
12from lte.protos.mconfig import mconfigs_pb2
13from common import serialization_utils
14import metrics
15from data_models.data_model_parameters import ParameterName
16from device_config.configuration_util import (
17 find_enb_by_cell_id,
18 get_enb_rf_tx_desired,
19)
20from exceptions import ConfigurationError
21from logger import EnodebdLogger as logger
22from s1ap_client import get_all_enb_state
23from state_machines.enb_acs import EnodebAcsStateMachine
24from state_machines.enb_acs_manager import StateMachineManager
25from orc8r.protos.service303_pb2 import State
26
27# There are 2 levels of caching for GPS coordinates from the enodeB: module
28# variables (in-memory) and on disk. In the event the enodeB stops reporting
29# GPS, we will continue to report the cached coordinates from the in-memory
30# cached coordinates. If enodebd is restarted, this in-memory cache will be
31# populated by the file
32
# File used as the on-disk cache for the last GPS coordinates that were read.
CACHED_GPS_COORD_FILE_PATH = os.path.join('/var/opt/magma/enodebd', 'gps_coords.txt')
37
# Cache GPS coordinates in memory so we don't write to the file cache if the
# coordinates have not changed. We can read directly from here instead of the
# file cache when the enodeB goes down unless these are uninitialized.
# None means "not loaded yet"; _get_cached_gps_coords() fills both values from
# the file cache the first time they are needed.
_gps_lat_cached = _gps_lon_cached = None
43
class EnodebStatus(NamedTuple):
    # Status snapshot of a single eNodeB, as assembled by get_enb_status().
    enodeb_configured: bool
    gps_latitude: str
    gps_longitude: str
    enodeb_connected: bool
    opstate_enabled: bool
    rf_tx_on: bool
    rf_tx_desired: bool
    gps_connected: bool
    ptp_connected: bool
    mme_connected: bool
    fsm_state: str
    cell_id: int
61
62# TODO: Remove after checkins support multiple eNB status
# Single-eNodeB status record in the layout used by get_service_status_old().
# Field names are given as one whitespace-separated string.
MagmaOldEnodebdStatus = namedtuple(
    'MagmaOldEnodebdStatus',
    (
        'enodeb_serial enodeb_configured gps_latitude gps_longitude '
        'enodeb_connected opstate_enabled rf_tx_on rf_tx_desired '
        'gps_connected ptp_connected mme_connected enodeb_state'
    ),
)
80
class MagmaEnodebdStatus(NamedTuple):
    # Gateway-wide summary over all eNodeBs; every value is carried as a
    # string (see _get_enodebd_status, which stringifies each aggregate).
    n_enodeb_connected: str
    all_enodeb_configured: str
    all_enodeb_opstate_enabled: str
    all_enodeb_rf_tx_configured: str
    any_enodeb_gps_connected: str
    all_enodeb_ptp_connected: str
    all_enodeb_mme_connected: str
    gateway_gps_longitude: str
    gateway_gps_latitude: str
95
96
def update_status_metrics(status: EnodebStatus) -> None:
    """
    Export the boolean fields of an eNodeB status as 0/1 metric values.

    Args:
        status: eNodeB status whose fields are published

    Each tracked field is written to its metric as 1 (True) or 0 (False).
    A field that is missing, None, or not a bool is written as 0.
    """
    # Call every second
    tracked_stats = (
        ('enodeb_connected', metrics.STAT_ENODEB_CONNECTED),
        ('enodeb_configured', metrics.STAT_ENODEB_CONFIGURED),
        ('opstate_enabled', metrics.STAT_OPSTATE_ENABLED),
        ('rf_tx_on', metrics.STAT_RF_TX_ENABLED),
        ('rf_tx_desired', metrics.STAT_RF_TX_DESIRED),
        ('gps_connected', metrics.STAT_GPS_CONNECTED),
        ('ptp_connected', metrics.STAT_PTP_CONNECTED),
        ('mme_connected', metrics.STAT_MME_CONNECTED),
    )
    status_fields = status._asdict()

    for field_name, stat_metric in tracked_stats:
        field_value = status_fields.get(field_name, None)
        if field_value is None:
            # Metrics are "sticky" when synced to the cloud - if we don't
            # receive a status update from enodeb, set the metric to 0
            # to explicitly indicate that it was not received, otherwise the
            # metrics collector will continue to report the last value
            metric_value = 0
        elif not isinstance(field_value, bool):
            logger.error('Could not cast metric value %s to int', field_value)
            metric_value = 0
        else:
            # True -> 1, False -> 0
            metric_value = int(field_value)
        stat_metric.set(metric_value)
126
127
128# TODO: Remove after checkins support multiple eNB status
def get_service_status_old(
    enb_acs_manager: StateMachineManager,
) -> Dict[str, Any]:
    """
    Get service status compatible with older controller.

    Args:
        enb_acs_manager: manager used to look up the status of every eNodeB

    Returns:
        The fields of a MagmaOldEnodebdStatus as a dictionary. Flags are
        encoded as '1'/'0' strings. When no eNodeB reports itself as
        connected, a fixed placeholder record is returned instead.
    """
    all_statuses = get_all_enb_status(enb_acs_manager)

    # Since we only expect users to plug in a single eNB, generate service
    # status with the first one we find that is connected
    first_connected = next(
        (
            (serial, enb_status)
            for serial, enb_status in all_statuses.items()
            if enb_status.enodeb_connected
        ),
        None,
    )

    if first_connected is None:
        placeholder = MagmaOldEnodebdStatus(
            enodeb_serial='N/A',
            enodeb_configured='0',
            gps_latitude='0.0',
            gps_longitude='0.0',
            enodeb_connected='0',
            opstate_enabled='0',
            rf_tx_on='0',
            rf_tx_desired='N/A',
            gps_connected='0',
            ptp_connected='0',
            mme_connected='0',
            enodeb_state='N/A',
        )
        return placeholder._asdict()

    serial, enb_status = first_connected
    record = MagmaOldEnodebdStatus(
        enodeb_serial=serial,
        enodeb_configured=_bool_to_str(enb_status.enodeb_configured),
        gps_latitude=enb_status.gps_latitude,
        gps_longitude=enb_status.gps_longitude,
        enodeb_connected=_bool_to_str(enb_status.enodeb_connected),
        opstate_enabled=_bool_to_str(enb_status.opstate_enabled),
        rf_tx_on=_bool_to_str(enb_status.rf_tx_on),
        rf_tx_desired=_bool_to_str(enb_status.rf_tx_desired),
        gps_connected=_bool_to_str(enb_status.gps_connected),
        ptp_connected=_bool_to_str(enb_status.ptp_connected),
        mme_connected=_bool_to_str(enb_status.mme_connected),
        enodeb_state=enb_status.fsm_state,
    )
    return record._asdict()
166
167
def get_service_status(enb_acs_manager: StateMachineManager) -> Dict[str, Any]:
    """Return the aggregated enodebd status as a dict keyed by field name."""
    return _get_enodebd_status(enb_acs_manager)._asdict()
171
172
def _get_enodebd_status(
    enb_acs_manager: StateMachineManager,
) -> MagmaEnodebdStatus:
    """
    Aggregate the status of every eNodeB into one gateway-level summary.

    Args:
        enb_acs_manager: manager used to look up the status of every eNodeB

    Returns:
        MagmaEnodebdStatus whose values are all converted with str(). With
        no eNodeB status available the defaults are reported: a count of 0,
        every flag False, and '0.0' for both GPS coordinates.
    """
    statuses = list(get_all_enb_status(enb_acs_manager).values())

    # Start from default values for enodebd status
    summary = {
        'n_enodeb_connected': 0,
        'all_enodeb_configured': False,
        'all_enodeb_opstate_enabled': False,
        'all_enodeb_rf_tx_configured': False,
        'any_enodeb_gps_connected': False,
        'all_enodeb_ptp_connected': False,
        'all_enodeb_mme_connected': False,
        'gateway_gps_longitude': '0.0',
        'gateway_gps_latitude': '0.0',
    }

    if statuses:
        connected_count = sum(st.enodeb_connected for st in statuses)
        summary.update(
            n_enodeb_connected=connected_count,
            all_enodeb_configured=all(
                st.enodeb_configured for st in statuses
            ),
            all_enodeb_mme_connected=all(
                st.mme_connected for st in statuses
            ),
            all_enodeb_opstate_enabled=all(
                st.opstate_enabled for st in statuses
            ),
            all_enodeb_ptp_connected=all(
                st.ptp_connected for st in statuses
            ),
            any_enodeb_gps_connected=any(
                st.gps_connected for st in statuses
            ),
            # RF TX counts as configured when the actual state matches the
            # desired state.
            all_enodeb_rf_tx_configured=all(
                st.rf_tx_on == st.rf_tx_desired for st in statuses
            ),
        )
        if connected_count:
            # The gateway coordinates are taken from the first status entry.
            first_status = statuses[0]
            summary['gateway_gps_longitude'] = first_status.gps_longitude
            summary['gateway_gps_latitude'] = first_status.gps_latitude

    return MagmaEnodebdStatus(
        **{field: str(value) for field, value in summary.items()},
    )
231
232
def get_all_enb_status(
    enb_acs_manager: StateMachineManager,
) -> Dict[str, EnodebStatus]:
    """
    Collect the status of every eNodeB the manager lists as connected.

    Args:
        enb_acs_manager: manager providing the serial list and the handlers

    Returns:
        Mapping of eNodeB serial to its EnodebStatus
    """
    return {
        serial: get_enb_status(enb_acs_manager.get_handler_by_serial(serial))
        for serial in enb_acs_manager.get_connected_serial_id_list()
    }
244
245
def get_enb_status(enodeb: EnodebAcsStateMachine) -> EnodebStatus:
    """
    Build a status snapshot for a single eNodeB.

    Every field of the returned EnodebStatus is always populated. Values
    that cannot be read fall back to a default: False for the boolean
    flags and 0 for cell_id. GPS coordinates come from
    _get_and_cache_gps_coords(), which may return cached values.

    Args:
        enodeb: state machine of the eNodeB to inspect

    Returns:
        EnodebStatus with the fields enodeb_configured, gps_latitude,
        gps_longitude, enodeb_connected, opstate_enabled, rf_tx_on,
        rf_tx_desired, gps_connected, ptp_connected, mme_connected,
        fsm_state and cell_id
    """
    enodeb_configured = enodeb.is_enodeb_configured()

    # GPS coordinates are cached, so they are read regardless of whether
    # the eNB is currently connected.
    gps_lat, gps_lon = _get_and_cache_gps_coords(enodeb)

    enodeb_connected = enodeb.is_enodeb_connected()
    opstate_enabled = _parse_param_as_bool(enodeb, ParameterName.OP_STATE)
    # RF TX is only reported as on while the eNB is connected.
    rf_tx_on = (
        _parse_param_as_bool(enodeb, ParameterName.RF_TX_STATUS)
        and enodeb_connected
    )

    # The cell ID is resolved on its own so that a failed serial/mconfig
    # lookup below cannot discard a cell ID that was read successfully.
    # A missing or non-numeric value falls back to 0.
    try:
        enb_cell_id = int(
            enodeb.device_cfg.get_parameter(ParameterName.CELL_ID),
        )
    except (KeyError, ConfigurationError, TypeError, ValueError):
        enb_cell_id = 0

    try:
        enb_serial = \
            enodeb.device_cfg.get_parameter(ParameterName.SERIAL_NUMBER)
        rf_tx_desired = get_enb_rf_tx_desired(enodeb.mconfig, enb_serial)
    except (KeyError, ConfigurationError):
        rf_tx_desired = False

    mme_connected = _parse_param_as_bool(enodeb, ParameterName.MME_STATUS)
    gps_connected = _get_gps_status_as_bool(enodeb)
    # _parse_param_as_bool already maps KeyError/ConfigurationError to
    # False, so no extra handling is needed here.
    ptp_connected = _parse_param_as_bool(enodeb, ParameterName.PTP_STATUS)

    return EnodebStatus(
        enodeb_configured=enodeb_configured,
        gps_latitude=gps_lat,
        gps_longitude=gps_lon,
        enodeb_connected=enodeb_connected,
        opstate_enabled=opstate_enabled,
        rf_tx_on=rf_tx_on,
        rf_tx_desired=rf_tx_desired,
        gps_connected=gps_connected,
        ptp_connected=ptp_connected,
        mme_connected=mme_connected,
        fsm_state=enodeb.get_state(),
        cell_id=enb_cell_id,
    )
312
313
def get_single_enb_status(
    device_serial: str,
    state_machine_manager: StateMachineManager,
) -> SingleEnodebStatus:
    """
    Build the gRPC status message for one eNodeB, looked up by serial.

    Args:
        device_serial: serial number of the eNodeB
        state_machine_manager: manager holding the handlers and IP addresses

    Returns:
        SingleEnodebStatus filled from the eNodeB status, or the result of
        _empty_enb_status() when the manager raises KeyError for the serial
    """
    try:
        handler = state_machine_manager.get_handler_by_serial(device_serial)
    except KeyError:
        return _empty_enb_status()

    # The status tuple carries neither the IP nor the serial; both are
    # added to the message separately below.
    status = get_enb_status(handler)
    ip = state_machine_manager.get_ip_of_serial(device_serial)

    def _as_property(flag: bool) -> SingleEnodebStatus.StatusProperty:
        return SingleEnodebStatus.StatusProperty.Value('ON' if flag else 'OFF')

    # Build the message to return through gRPC
    enb_status = SingleEnodebStatus()
    enb_status.device_serial = device_serial
    enb_status.ip_address = ip

    flag_fields = (
        ('connected', status.enodeb_connected),
        ('configured', status.enodeb_configured),
        ('opstate_enabled', status.opstate_enabled),
        ('rf_tx_on', status.rf_tx_on),
        ('rf_tx_desired', status.rf_tx_desired),
        ('gps_connected', status.gps_connected),
        ('ptp_connected', status.ptp_connected),
        ('mme_connected', status.mme_connected),
    )
    for message_field, flag in flag_fields:
        setattr(enb_status, message_field, _as_property(flag))

    enb_status.gps_longitude = status.gps_longitude
    enb_status.gps_latitude = status.gps_latitude
    enb_status.fsm_state = status.fsm_state
    return enb_status
350
351
def get_operational_states(
    enb_acs_manager: StateMachineManager,
    mconfig: mconfigs_pb2.EnodebD,
) -> List[State]:
    """
    Build the operational State entries for all known eNodeBs.

    Args:
        enb_acs_manager: manager used for eNodeB statuses and IP addresses
        mconfig: enodebd mconfig, used to resolve externally configured eNBs

    Returns: A list of State with EnodebStatus encoded as JSON
    """
    status_by_serial = get_all_enb_status(enb_acs_manager)

    # Get S1 connected eNBs; the map is looked up by cell ID to obtain the
    # number of connected UEs.
    s1_state_by_cell_id = get_all_enb_state()

    states = []
    for serial_id, enb_status in status_by_serial.items():
        payload = enb_status._asdict()
        # Add IP address to state
        payload['ip_address'] = enb_acs_manager.get_ip_of_serial(serial_id)
        # Add num of UEs connected
        payload['ues_connected'] = s1_state_by_cell_id.get(
            payload['cell_id'], 0,
        )
        states.append(
            State(
                type="single_enodeb",
                deviceID=serial_id,
                value=json.dumps(payload).encode('utf-8'),
            ),
        )

    # Get state for externally configured enodebs, skipping every serial
    # that was already reported above.
    states.extend(
        get_enb_s1_connected_states(
            s1_state_by_cell_id,
            list(status_by_serial),
            mconfig,
        ),
    )
    return states
396
397
def get_enb_s1_connected_states(
    enb_s1_state_map, configured_serial_ids,
    mconfig,
) -> List[State]:
    """
    Build State entries for S1-connected eNodeBs not already reported.

    Args:
        enb_s1_state_map: mapping of cell ID to the value reported as
            'ues_connected'
        configured_serial_ids: serials to skip because they already have a
            state entry
        mconfig: configuration searched with find_enb_by_cell_id()

    Returns:
        One State per cell ID that resolves to an eNodeB in mconfig whose
        serial is not in configured_serial_ids
    """
    states = []
    for cell_id, ues_connected in enb_s1_state_map.items():
        enb = find_enb_by_cell_id(mconfig, cell_id)
        if not enb or enb.serial_num in configured_serial_ids:
            continue

        payload = EnodebStatus(
            enodeb_configured=False,
            gps_latitude='N/A',
            gps_longitude='N/A',
            enodeb_connected=True,
            opstate_enabled=False,
            rf_tx_on=False,
            rf_tx_desired=False,
            gps_connected=False,
            ptp_connected=False,
            mme_connected=True,
            fsm_state='N/A',
            cell_id=cell_id,
        )._asdict()
        # Add IP address to state
        payload['ip_address'] = enb.config.ip_address
        # Add num of UEs connected to state, use cellID from mconfig
        payload['ues_connected'] = ues_connected

        states.append(
            State(
                type="single_enodeb",
                deviceID=enb.serial_num,
                value=json.dumps(payload).encode('utf-8'),
            ),
        )
    return states
436
437
def _empty_enb_status() -> SingleEnodebStatus:
    """
    Build the placeholder status message used when an eNodeB is not found.

    The flag fields are set with SingleEnodebStatus.StatusProperty values,
    the same way get_single_enb_status() fills them, instead of the plain
    strings used previously: an enum field cannot be assigned a string.

    Returns:
        SingleEnodebStatus with 'N/A' text fields, '0.0' coordinates and
        every flag set to OFF
    """
    off = SingleEnodebStatus.StatusProperty.Value('OFF')

    enb_status = SingleEnodebStatus()
    enb_status.device_serial = 'N/A'
    enb_status.ip_address = 'N/A'
    # NOTE(review): rf_tx_desired used to be 'N/A'. OFF is used because only
    # ON/OFF are visible in this file; switch to an "unknown" member if the
    # StatusProperty enum defines one.
    for flag_field in (
        'connected',
        'configured',
        'opstate_enabled',
        'rf_tx_on',
        'rf_tx_desired',
        'gps_connected',
        'ptp_connected',
        'mme_connected',
    ):
        setattr(enb_status, flag_field, off)
    enb_status.gps_longitude = '0.0'
    enb_status.gps_latitude = '0.0'
    enb_status.fsm_state = 'N/A'
    return enb_status
454
455
def _parse_param_as_bool(
    enodeb: EnodebAcsStateMachine,
    param_name: ParameterName,
) -> bool:
    """
    Read a parameter from the eNodeB and interpret it as a boolean.

    Args:
        enodeb: state machine to read the parameter from
        param_name: parameter to read

    Returns:
        The value as interpreted by _format_as_bool(), or False when a
        KeyError or ConfigurationError is raised
    """
    try:
        raw_value = enodeb.get_parameter(param_name)
        parsed = _format_as_bool(raw_value, param_name)
    except (KeyError, ConfigurationError):
        parsed = False
    return parsed
464
465
466def _format_as_bool(
467 param_value: Union[bool, str, int],
468 param_name: Optional[Union[ParameterName, str]] = None,
469) -> bool:
470 """ Returns '1' for true, and '0' for false """
471 stripped_value = str(param_value).lower().strip()
472 if stripped_value in {'true', '1'}:
473 return True
474 elif stripped_value in {'false', '0'}:
475 return False
476 else:
477 logger.warning(
478 '%s parameter not understood (%s)', param_name, param_value,
479 )
480 return False
481
482
def _get_gps_status_as_bool(enodeb: EnodebAcsStateMachine) -> bool:
    """
    Report whether the eNodeB's GPS status parameter indicates a connection.

    Args:
        enodeb: state machine to read the GPS status from

    Returns:
        The parameter itself when it is already a bool; otherwise True only
        for the value '1'. '0' and '2' (GPS locking) yield False, any other
        value logs a warning and yields False. False is also returned when
        the parameter is absent or reading it raises KeyError or
        ConfigurationError.
    """
    try:
        if not enodeb.has_parameter(ParameterName.GPS_STATUS):
            return False
        param = enodeb.get_parameter(ParameterName.GPS_STATUS)
    except (KeyError, ConfigurationError):
        return False

    if isinstance(param, bool):
        # No translation to do.
        return param

    # Convert through str() first, as _format_as_bool does, so a
    # non-string value (e.g. an int) does not fail on .lower().
    stripped_value = str(param).lower().strip()
    if stripped_value == '1':
        return True
    if stripped_value not in {'0', '2'}:
        # 2 = GPS locking; anything besides 0/1/2 is unexpected
        logger.warning(
            'GPS status parameter not understood (%s)', param,
        )
    return False
505
506
def _get_and_cache_gps_coords(enodeb: EnodebAcsStateMachine) -> Tuple[
    str, str,
]:
    """
    Return the eNB's GPS coordinates, falling back to the cached ones.

    The coordinates are read from the eNB first. When they differ from the
    in-memory cache they are stored as the new cached coordinates. When the
    read raises KeyError or ConfigurationError the cached coordinates are
    returned instead; on ValueError a warning is logged and ('0', '0') is
    returned.

    Returns:
        (str, str): GPS latitude, GPS longitude
    """
    latitude = longitude = ''
    try:
        latitude = enodeb.get_parameter(ParameterName.GPS_LAT)
        longitude = enodeb.get_parameter(ParameterName.GPS_LONG)
        fresh_coords = (latitude, longitude)

        # Only touch the caches when the coordinates actually changed
        if fresh_coords != (_gps_lat_cached, _gps_lon_cached):
            _cache_new_gps_coords(latitude, longitude)
        return fresh_coords
    except (KeyError, ConfigurationError):
        return _get_cached_gps_coords()
    except ValueError:
        logger.warning(
            'GPS lat/long not understood (%s/%s)', latitude, longitude,
        )
        return '0', '0'
531
532
def _get_cached_gps_coords() -> Tuple[str, str]:
    """
    Return the cached GPS coordinates, loading them from file if needed.

    Used when the enB is disconnected or otherwise not reporting
    coordinates. If either in-memory value is still None, both are first
    populated from the coordinate file.

    Returns:
        (str, str): (GPS lat, GPS lon)
    """
    # pylint: disable=global-statement
    global _gps_lat_cached, _gps_lon_cached
    both_loaded = (
        _gps_lat_cached is not None and _gps_lon_cached is not None
    )
    if not both_loaded:
        coords_from_file = _read_gps_coords_from_file()
        _gps_lat_cached, _gps_lon_cached = coords_from_file
    return _gps_lat_cached, _gps_lon_cached
546
547
def _read_gps_coords_from_file():
    """
    Read the cached GPS coordinates from the coordinate file.

    The file is expected to hold exactly two lines, which are returned
    stripped, in file order.

    Returns:
        Tuple of the two stripped lines, or ('0', '0') when the file cannot
        be opened or does not contain exactly two lines (a warning is
        logged in both cases)
    """
    fallback = ('0', '0')
    try:
        with open(CACHED_GPS_COORD_FILE_PATH, encoding="utf-8") as coord_file:
            raw_lines = coord_file.readlines()
            line_count = len(raw_lines)
            if line_count == 2:
                return tuple(raw_line.strip() for raw_line in raw_lines)
            logger.warning(
                'Expected to find 2 lines in GPS '
                'coordinate file but only found %d',
                line_count,
            )
            return fallback
    except OSError:
        logger.warning('Could not open cached GPS coordinate file')
        return fallback
563
564
def _cache_new_gps_coords(gps_lat, gps_lon):
    """
    Store new GPS coordinates in both caches.

    The module-level variables are updated first, then the coordinates are
    written to the managed file on disk.

    Args:
        gps_lat (str): latitude as a string
        gps_lon (str): longitude as a string
    """
    # pylint: disable=global-statement
    global _gps_lat_cached, _gps_lon_cached
    _gps_lat_cached = gps_lat
    _gps_lon_cached = gps_lon
    _write_gps_coords_to_file(gps_lat, gps_lon)
578
579
def _write_gps_coords_to_file(gps_lat, gps_lon):
    """
    Write the GPS coordinates to the cache file, latitude on the first line
    and longitude on the second.

    The write is best effort: an OSError is logged as a warning and not
    re-raised, so a failing disk cache never breaks status reporting.

    Args:
        gps_lat (str): latitude as a string
        gps_lon (str): longitude as a string
    """
    lines = '{lat}\n{lon}'.format(lat=gps_lat, lon=gps_lon)
    try:
        serialization_utils.write_to_file_atomically(
            CACHED_GPS_COORD_FILE_PATH,
            lines,
        )
    except OSError as err:
        # Previously swallowed silently; log it like the read path does.
        logger.warning(
            'Could not write cached GPS coordinate file: %s', err,
        )
589
590
591def _bool_to_str(b: bool) -> str:
592 if b is True:
593 return "1"
594 return "0"