blob: 442061b93a8b4ee22a036878624aab26a242acf9 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import json
15import os
16from collections import namedtuple
17from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
18
19from lte.protos.enodebd_pb2 import SingleEnodebStatus
20from lte.protos.mconfig import mconfigs_pb2
21from common import serialization_utils
22import metrics
23from data_models.data_model_parameters import ParameterName
24from device_config.configuration_util import (
25 find_enb_by_cell_id,
26 get_enb_rf_tx_desired,
27)
28from exceptions import ConfigurationError
29from logger import EnodebdLogger as logger
30from s1ap_client import get_all_enb_state
31from state_machines.enb_acs import EnodebAcsStateMachine
32from state_machines.enb_acs_manager import StateMachineManager
33from orc8r.protos.service303_pb2 import State
34
35# There are 2 levels of caching for GPS coordinates from the enodeB: module
36# variables (in-memory) and on disk. In the event the enodeB stops reporting
37# GPS, we will continue to report the cached coordinates from the in-memory
38# cached coordinates. If enodebd is restarted, this in-memory cache will be
39# populated by the file
40
41CACHED_GPS_COORD_FILE_PATH = os.path.join(
42 '/var/opt/magma/enodebd',
43 'gps_coords.txt',
44)
45
46# Cache GPS coordinates in memory so we don't write to the file cache if the
47# coordinates have not changed. We can read directly from here instead of the
48# file cache when the enodeB goes down unless these are unintialized.
49_gps_lat_cached = None
50_gps_lon_cached = None
51
52EnodebStatus = NamedTuple(
53 'EnodebStatus',
54 [
55 ('enodeb_configured', bool),
56 ('gps_latitude', str),
57 ('gps_longitude', str),
58 ('enodeb_connected', bool),
59 ('opstate_enabled', bool),
60 ('rf_tx_on', bool),
61 ('rf_tx_desired', bool),
62 ('gps_connected', bool),
63 ('ptp_connected', bool),
64 ('mme_connected', bool),
65 ('fsm_state', str),
66 ('cell_id', int),
67 ],
68)
69
70# TODO: Remove after checkins support multiple eNB status
71MagmaOldEnodebdStatus = namedtuple(
72 'MagmaOldEnodebdStatus',
73 [
74 'enodeb_serial',
75 'enodeb_configured',
76 'gps_latitude',
77 'gps_longitude',
78 'enodeb_connected',
79 'opstate_enabled',
80 'rf_tx_on',
81 'rf_tx_desired',
82 'gps_connected',
83 'ptp_connected',
84 'mme_connected',
85 'enodeb_state',
86 ],
87)
88
89MagmaEnodebdStatus = NamedTuple(
90 'MagmaEnodebdStatus',
91 [
92 ('n_enodeb_connected', str),
93 ('all_enodeb_configured', str),
94 ('all_enodeb_opstate_enabled', str),
95 ('all_enodeb_rf_tx_configured', str),
96 ('any_enodeb_gps_connected', str),
97 ('all_enodeb_ptp_connected', str),
98 ('all_enodeb_mme_connected', str),
99 ('gateway_gps_longitude', str),
100 ('gateway_gps_latitude', str),
101 ],
102)
103
104
105def update_status_metrics(status: EnodebStatus) -> None:
106 """ Update metrics for eNodeB status """
107 # Call every second
108 metrics_by_stat_key = {
109 'enodeb_connected': metrics.STAT_ENODEB_CONNECTED,
110 'enodeb_configured': metrics.STAT_ENODEB_CONFIGURED,
111 'opstate_enabled': metrics.STAT_OPSTATE_ENABLED,
112 'rf_tx_on': metrics.STAT_RF_TX_ENABLED,
113 'rf_tx_desired': metrics.STAT_RF_TX_DESIRED,
114 'gps_connected': metrics.STAT_GPS_CONNECTED,
115 'ptp_connected': metrics.STAT_PTP_CONNECTED,
116 'mme_connected': metrics.STAT_MME_CONNECTED,
117 }
118
119 def get_metric_value(enodeb_status: Dict[str, str], key: str):
120 # Metrics are "sticky" when synced to the cloud - if we don't
121 # receive a status update from enodeb, set the metric to 0
122 # to explicitly indicate that it was not received, otherwise the
123 # metrics collector will continue to report the last value
124 val = enodeb_status.get(key, None)
125 if val is None:
126 return 0
127 if type(val) is not bool:
128 logger.error('Could not cast metric value %s to int', val)
129 return 0
130 return int(val) # val should be either True or False
131
132 for stat_key, metric in metrics_by_stat_key.items():
133 metric.set(get_metric_value(status._asdict(), stat_key))
134
135
136# TODO: Remove after checkins support multiple eNB status
137def get_service_status_old(
138 enb_acs_manager: StateMachineManager,
139) -> Dict[str, Any]:
140 """ Get service status compatible with older controller """
141 enb_status_by_serial = get_all_enb_status(enb_acs_manager)
142 # Since we only expect users to plug in a single eNB, generate service
143 # status with the first one we find that is connected
144 for enb_serial, enb_status in enb_status_by_serial.items():
145 if enb_status.enodeb_connected:
146 return MagmaOldEnodebdStatus(
147 enodeb_serial=enb_serial,
148 enodeb_configured=_bool_to_str(enb_status.enodeb_configured),
149 gps_latitude=enb_status.gps_latitude,
150 gps_longitude=enb_status.gps_longitude,
151 enodeb_connected=_bool_to_str(enb_status.enodeb_connected),
152 opstate_enabled=_bool_to_str(enb_status.opstate_enabled),
153 rf_tx_on=_bool_to_str(enb_status.rf_tx_on),
154 rf_tx_desired=_bool_to_str(enb_status.rf_tx_desired),
155 gps_connected=_bool_to_str(enb_status.gps_connected),
156 ptp_connected=_bool_to_str(enb_status.ptp_connected),
157 mme_connected=_bool_to_str(enb_status.mme_connected),
158 enodeb_state=enb_status.fsm_state,
159 )._asdict()
160 return MagmaOldEnodebdStatus(
161 enodeb_serial='N/A',
162 enodeb_configured='0',
163 gps_latitude='0.0',
164 gps_longitude='0.0',
165 enodeb_connected='0',
166 opstate_enabled='0',
167 rf_tx_on='0',
168 rf_tx_desired='N/A',
169 gps_connected='0',
170 ptp_connected='0',
171 mme_connected='0',
172 enodeb_state='N/A',
173 )._asdict()
174
175
176def get_service_status(enb_acs_manager: StateMachineManager) -> Dict[str, Any]:
177 enodebd_status = _get_enodebd_status(enb_acs_manager)
178 return enodebd_status._asdict()
179
180
181def _get_enodebd_status(
182 enb_acs_manager: StateMachineManager,
183) -> MagmaEnodebdStatus:
184 enb_status_by_serial = get_all_enb_status(enb_acs_manager)
185 # Start from default values for enodebd status
186 n_enodeb_connected = 0
187 all_enodeb_configured = False
188 all_enodeb_opstate_enabled = False
189 all_enodeb_rf_tx_configured = False
190 any_enodeb_gps_connected = False
191 all_enodeb_ptp_connected = False
192 all_enodeb_mme_connected = False
193 gateway_gps_longitude = '0.0'
194 gateway_gps_latitude = '0.0'
195
196 def _is_rf_tx_configured(enb_status: EnodebStatus) -> bool:
197 return enb_status.rf_tx_on == enb_status.rf_tx_desired
198
199 if enb_status_by_serial:
200 enb_status_list = list(enb_status_by_serial.values())
201 # Aggregate all eNB status for enodebd status, repetitive but
202 # clearer for output purposes.
203 n_enodeb_connected = sum(
204 enb_status.enodeb_connected for enb_status in enb_status_list
205 )
206 all_enodeb_configured = all(
207 enb_status.enodeb_configured for enb_status in enb_status_list
208 )
209 all_enodeb_mme_connected = all(
210 enb_status.mme_connected for enb_status in enb_status_list
211 )
212 all_enodeb_opstate_enabled = all(
213 enb_status.opstate_enabled for enb_status in enb_status_list
214 )
215 all_enodeb_ptp_connected = all(
216 enb_status.ptp_connected for enb_status in enb_status_list
217 )
218 any_enodeb_gps_connected = any(
219 enb_status.gps_connected for enb_status in enb_status_list
220 )
221 all_enodeb_rf_tx_configured = all(
222 _is_rf_tx_configured(enb_status) for enb_status in enb_status_list
223 )
224 if n_enodeb_connected:
225 gateway_gps_longitude = enb_status_list[0].gps_longitude
226 gateway_gps_latitude = enb_status_list[0].gps_latitude
227
228 return MagmaEnodebdStatus(
229 n_enodeb_connected=str(n_enodeb_connected),
230 all_enodeb_configured=str(all_enodeb_configured),
231 all_enodeb_opstate_enabled=str(all_enodeb_opstate_enabled),
232 all_enodeb_rf_tx_configured=str(all_enodeb_rf_tx_configured),
233 any_enodeb_gps_connected=str(any_enodeb_gps_connected),
234 all_enodeb_ptp_connected=str(all_enodeb_ptp_connected),
235 all_enodeb_mme_connected=str(all_enodeb_mme_connected),
236 gateway_gps_longitude=str(gateway_gps_longitude),
237 gateway_gps_latitude=str(gateway_gps_latitude),
238 )
239
240
241def get_all_enb_status(
242 enb_acs_manager: StateMachineManager,
243) -> Dict[str, EnodebStatus]:
244 enb_status_by_serial = {}
245 serial_list = enb_acs_manager.get_connected_serial_id_list()
246 for enb_serial in serial_list:
247 handler = enb_acs_manager.get_handler_by_serial(enb_serial)
248 status = get_enb_status(handler)
249 enb_status_by_serial[enb_serial] = status
250
251 return enb_status_by_serial
252
253
254def get_enb_status(enodeb: EnodebAcsStateMachine) -> EnodebStatus:
255 """
256 Returns a dict representing the status of an enodeb
257
258 The returned dictionary will be a subset of the following keys:
259 - enodeb_connected
260 - enodeb_configured
261 - opstate_enabled
262 - rf_tx_on
263 - rf_tx_desired
264 - gps_connected
265 - ptp_connected
266 - mme_connected
267 - gps_latitude
268 - gps_longitude
269 - ip_address
270 - cell_id
271
272 The set of keys returned will depend on the connection status of the
273 enodeb. A missing key indicates that the value is unknown.
274
275 Returns:
276 Status dictionary for the enodeb state
277 """
278 enodeb_configured = enodeb.is_enodeb_configured()
279
280 # We cache GPS coordinates so try to read them before the early return
281 # if the enB is not connected
282 gps_lat, gps_lon = _get_and_cache_gps_coords(enodeb)
283
284 enodeb_connected = enodeb.is_enodeb_connected()
285 opstate_enabled = _parse_param_as_bool(enodeb, ParameterName.OP_STATE)
286 rf_tx_on = _parse_param_as_bool(enodeb, ParameterName.RF_TX_STATUS)
287 rf_tx_on = rf_tx_on and enodeb_connected
288 try:
289 enb_serial = \
290 enodeb.device_cfg.get_parameter(ParameterName.SERIAL_NUMBER)
291 enb_cell_id = int(
292 enodeb.device_cfg.get_parameter(ParameterName.CELL_ID),
293 )
294 rf_tx_desired = get_enb_rf_tx_desired(enodeb.mconfig, enb_serial)
295 except (KeyError, ConfigurationError):
296 rf_tx_desired = False
297 enb_cell_id = 0
298
299 mme_connected = _parse_param_as_bool(enodeb, ParameterName.MME_STATUS)
300 gps_connected = _get_gps_status_as_bool(enodeb)
301 try:
302 ptp_connected = _parse_param_as_bool(enodeb, ParameterName.PTP_STATUS)
303 except ConfigurationError:
304 ptp_connected = False
305
306 return EnodebStatus(
307 enodeb_configured=enodeb_configured,
308 gps_latitude=gps_lat,
309 gps_longitude=gps_lon,
310 enodeb_connected=enodeb_connected,
311 opstate_enabled=opstate_enabled,
312 rf_tx_on=rf_tx_on,
313 rf_tx_desired=rf_tx_desired,
314 gps_connected=gps_connected,
315 ptp_connected=ptp_connected,
316 mme_connected=mme_connected,
317 fsm_state=enodeb.get_state(),
318 cell_id=enb_cell_id,
319 )
320
321
322def get_single_enb_status(
323 device_serial: str,
324 state_machine_manager: StateMachineManager,
325) -> SingleEnodebStatus:
326 try:
327 handler = state_machine_manager.get_handler_by_serial(device_serial)
328 except KeyError:
329 return _empty_enb_status()
330
331 # This namedtuple is missing IP and serial info
332 status = get_enb_status(handler)
333
334 # Get IP info
335 ip = state_machine_manager.get_ip_of_serial(device_serial)
336
337 def get_status_property(status: bool) -> SingleEnodebStatus.StatusProperty:
338 if status:
339 return SingleEnodebStatus.StatusProperty.Value('ON')
340 return SingleEnodebStatus.StatusProperty.Value('OFF')
341
342 # Build the message to return through gRPC
343 enb_status = SingleEnodebStatus()
344 enb_status.device_serial = device_serial
345 enb_status.ip_address = ip
346 enb_status.connected = get_status_property(status.enodeb_connected)
347 enb_status.configured = get_status_property(status.enodeb_configured)
348 enb_status.opstate_enabled = get_status_property(status.opstate_enabled)
349 enb_status.rf_tx_on = get_status_property(status.rf_tx_on)
350 enb_status.rf_tx_desired = get_status_property(status.rf_tx_desired)
351 enb_status.gps_connected = get_status_property(status.gps_connected)
352 enb_status.ptp_connected = get_status_property(status.ptp_connected)
353 enb_status.mme_connected = get_status_property(status.mme_connected)
354 enb_status.gps_longitude = status.gps_longitude
355 enb_status.gps_latitude = status.gps_latitude
356 enb_status.fsm_state = status.fsm_state
357 return enb_status
358
359
360def get_operational_states(
361 enb_acs_manager: StateMachineManager,
362 mconfig: mconfigs_pb2.EnodebD,
363) -> List[State]:
364 """
365 Returns: A list of State with EnodebStatus encoded as JSON
366 """
367 states = []
368 configured_serial_ids = []
369 enb_status_by_serial = get_all_enb_status(enb_acs_manager)
370
371 # Get S1 connected eNBs
372 enb_statuses = get_all_enb_state()
373
374 for serial_id in enb_status_by_serial:
375 enb_status_dict = enb_status_by_serial[serial_id]._asdict()
376
377 # Add IP address to state
378 enb_status_dict['ip_address'] = enb_acs_manager.get_ip_of_serial(
379 serial_id,
380 )
381
382 # Add num of UEs connected
383 num_ue_connected = enb_statuses.get(enb_status_dict['cell_id'], 0)
384 enb_status_dict['ues_connected'] = num_ue_connected
385
386 serialized = json.dumps(enb_status_dict)
387 state = State(
388 type="single_enodeb",
389 deviceID=serial_id,
390 value=serialized.encode('utf-8'),
391 )
392 configured_serial_ids.append(serial_id)
393 states.append(state)
394
395 # Get state for externally configured enodebs
396 s1_states = get_enb_s1_connected_states(
397 enb_statuses,
398 configured_serial_ids,
399 mconfig,
400 )
401 states.extend(s1_states)
402
403 return states
404
405
406def get_enb_s1_connected_states(
407 enb_s1_state_map, configured_serial_ids,
408 mconfig,
409) -> List[State]:
410 states = []
411 for enb_id in enb_s1_state_map:
412 enb = find_enb_by_cell_id(mconfig, enb_id)
413 if enb and enb.serial_num not in configured_serial_ids:
414 status = EnodebStatus(
415 enodeb_configured=False,
416 gps_latitude='N/A',
417 gps_longitude='N/A',
418 enodeb_connected=True,
419 opstate_enabled=False,
420 rf_tx_on=False,
421 rf_tx_desired=False,
422 gps_connected=False,
423 ptp_connected=False,
424 mme_connected=True,
425 fsm_state='N/A',
426 cell_id=enb_id,
427 )
428 status_dict = status._asdict()
429
430 # Add IP address to state
431 status_dict['ip_address'] = enb.config.ip_address
432
433 # Add num of UEs connected to state, use cellID from mconfig
434 status_dict['ues_connected'] = enb_s1_state_map.get(enb_id, 0)
435
436 serialized = json.dumps(status_dict)
437 state = State(
438 type="single_enodeb",
439 deviceID=enb.serial_num,
440 value=serialized.encode('utf-8'),
441 )
442 states.append(state)
443 return states
444
445
446def _empty_enb_status() -> SingleEnodebStatus:
447 enb_status = SingleEnodebStatus()
448 enb_status.device_serial = 'N/A'
449 enb_status.ip_address = 'N/A'
450 enb_status.connected = '0'
451 enb_status.configured = '0'
452 enb_status.opstate_enabled = '0'
453 enb_status.rf_tx_on = '0'
454 enb_status.rf_tx_desired = 'N/A'
455 enb_status.gps_connected = '0'
456 enb_status.ptp_connected = '0'
457 enb_status.mme_connected = '0'
458 enb_status.gps_longitude = '0.0'
459 enb_status.gps_latitude = '0.0'
460 enb_status.fsm_state = 'N/A'
461 return enb_status
462
463
464def _parse_param_as_bool(
465 enodeb: EnodebAcsStateMachine,
466 param_name: ParameterName,
467) -> bool:
468 try:
469 return _format_as_bool(enodeb.get_parameter(param_name), param_name)
470 except (KeyError, ConfigurationError):
471 return False
472
473
474def _format_as_bool(
475 param_value: Union[bool, str, int],
476 param_name: Optional[Union[ParameterName, str]] = None,
477) -> bool:
478 """ Returns '1' for true, and '0' for false """
479 stripped_value = str(param_value).lower().strip()
480 if stripped_value in {'true', '1'}:
481 return True
482 elif stripped_value in {'false', '0'}:
483 return False
484 else:
485 logger.warning(
486 '%s parameter not understood (%s)', param_name, param_value,
487 )
488 return False
489
490
491def _get_gps_status_as_bool(enodeb: EnodebAcsStateMachine) -> bool:
492 try:
493 if not enodeb.has_parameter(ParameterName.GPS_STATUS):
494 return False
495 else:
496 param = enodeb.get_parameter(ParameterName.GPS_STATUS)
497 if isinstance(param, bool):
498 # No translation to do.
499 return param
500 stripped_value = param.lower().strip()
501 if stripped_value == '0' or stripped_value == '2':
502 # 2 = GPS locking
503 return False
504 elif stripped_value == '1':
505 return True
506 else:
507 logger.warning(
508 'GPS status parameter not understood (%s)', param,
509 )
510 return False
511 except (KeyError, ConfigurationError):
512 return False
513
514
515def _get_and_cache_gps_coords(enodeb: EnodebAcsStateMachine) -> Tuple[
516 str, str,
517]:
518 """
519 Read the GPS coordinates of the enB from its configuration or the
520 cached coordinate file if the preceding read fails. If reading from
521 enB configuration succeeds, this method will cache the new coordinates.
522
523 Returns:
524 (str, str): GPS latitude, GPS longitude
525 """
526 lat, lon = '', ''
527 try:
528 lat = enodeb.get_parameter(ParameterName.GPS_LAT)
529 lon = enodeb.get_parameter(ParameterName.GPS_LONG)
530
531 if lat != _gps_lat_cached or lon != _gps_lon_cached:
532 _cache_new_gps_coords(lat, lon)
533 return lat, lon
534 except (KeyError, ConfigurationError):
535 return _get_cached_gps_coords()
536 except ValueError:
537 logger.warning('GPS lat/long not understood (%s/%s)', lat, lon)
538 return '0', '0'
539
540
541def _get_cached_gps_coords() -> Tuple[str, str]:
542 """
543 Returns cached GPS coordinates if enB is disconnected or otherwise not
544 reporting coordinates.
545
546 Returns:
547 (str, str): (GPS lat, GPS lon)
548 """
549 # pylint: disable=global-statement
550 global _gps_lat_cached, _gps_lon_cached
551 if _gps_lat_cached is None or _gps_lon_cached is None:
552 _gps_lat_cached, _gps_lon_cached = _read_gps_coords_from_file()
553 return _gps_lat_cached, _gps_lon_cached
554
555
556def _read_gps_coords_from_file():
557 try:
558 with open(CACHED_GPS_COORD_FILE_PATH, encoding="utf-8") as f:
559 lines = f.readlines()
560 if len(lines) != 2:
561 logger.warning(
562 'Expected to find 2 lines in GPS '
563 'coordinate file but only found %d',
564 len(lines),
565 )
566 return '0', '0'
567 return tuple(map(lambda l: l.strip(), lines))
568 except OSError:
569 logger.warning('Could not open cached GPS coordinate file')
570 return '0', '0'
571
572
573def _cache_new_gps_coords(gps_lat, gps_lon):
574 """
575 Cache GPS coordinates in the module-level variables here and write them
576 to a managed file on disk.
577
578 Args:
579 gps_lat (str): latitude as a string
580 gps_lon (str): longitude as a string
581 """
582 # pylint: disable=global-statement
583 global _gps_lat_cached, _gps_lon_cached
584 _gps_lat_cached, _gps_lon_cached = gps_lat, gps_lon
585 _write_gps_coords_to_file(gps_lat, gps_lon)
586
587
588def _write_gps_coords_to_file(gps_lat, gps_lon):
589 lines = '{lat}\n{lon}'.format(lat=gps_lat, lon=gps_lon)
590 try:
591 serialization_utils.write_to_file_atomically(
592 CACHED_GPS_COORD_FILE_PATH,
593 lines,
594 )
595 except OSError:
596 pass
597
598
599def _bool_to_str(b: bool) -> str:
600 if b is True:
601 return "1"
602 return "0"