blob: 9fb991352fde364c7381eb5ef0303377f1ad346d [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6from typing import Any
7
8import grpc
9from lte.protos.enodebd_pb2 import (
10 AllEnodebStatus,
11 EnodebIdentity,
12 GetParameterRequest,
13 GetParameterResponse,
14 SetParameterRequest,
15 SingleEnodebStatus,
16)
17from lte.protos.enodebd_pb2_grpc import (
18 EnodebdServicer,
19 add_EnodebdServicer_to_server,
20)
21from common.rpc_utils import return_void
22from enodeb_status import (
23 get_service_status,
24 get_single_enb_status,
25)
26from state_machines.enb_acs import EnodebAcsStateMachine
27from state_machines.enb_acs_manager import StateMachineManager
28from orc8r.protos.service303_pb2 import ServiceStatus
29
30
31class EnodebdRpcServicer(EnodebdServicer):
32 """ gRPC based server for enodebd """
33
34 def __init__(self, state_machine_manager: StateMachineManager) -> None:
35 self.state_machine_manager = state_machine_manager
36
37 def add_to_server(self, server) -> None:
38 """
39 Add the servicer to a gRPC server
40 """
41 add_EnodebdServicer_to_server(self, server)
42
43 def _get_handler(self, device_serial: str) -> EnodebAcsStateMachine:
44 return self.state_machine_manager.get_handler_by_serial(device_serial)
45
46 def GetParameter(
47 self,
48 request: GetParameterRequest,
49 context: Any,
50 ) -> GetParameterResponse:
51 """
52 Sends a GetParameterValues message. Used for testing only.
53
54 Different data models will have different names for the same
55 parameter. Whatever name that the data model uses, we call the
56 'parameter path', eg. "Device.DeviceInfo.X_BAICELLS_COM_GPS_Status"
57 We denote 'ParameterName' to be a standard string name for
58 equivalent parameters between different data models
59 """
60 # Get the parameter value information
61 parameter_path = request.parameter_name
62 handler = self._get_handler(request.device_serial)
63 data_model = handler.data_model
64 param_name = data_model.get_parameter_name_from_path(parameter_path)
65 param_value = str(handler.get_parameter(param_name))
66
67 # And now construct the response to the rpc request
68 get_parameter_values_response = GetParameterResponse()
69 get_parameter_values_response.parameters.add(
70 name=parameter_path, value=param_value,
71 )
72 return get_parameter_values_response
73
74 @return_void
75 def SetParameter(self, request: SetParameterRequest, context: Any) -> None:
76 """
77 Sends a SetParameterValues message. Used for testing only.
78
79 Different data models will have different names for the same
80 parameter. Whatever name that the data model uses, we call the
81 'parameter path', eg. "Device.DeviceInfo.X_BAICELLS_COM_GPS_Status"
82 We denote 'ParameterName' to be a standard string name for
83 equivalent parameters between different data models
84 """
85 # Parse the request
86 if request.HasField('value_int'):
87 value = (request.value_int, 'int')
88 elif request.HasField('value_bool'):
89 value = (request.value_bool, 'boolean')
90 elif request.HasField('value_string'):
91 value = (request.value_string, 'string')
92 else:
93 context.set_details(
94 'SetParameter: Unsupported type %d',
95 request.type,
96 )
97 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
98 return
99
100 # Update the handler so it will set the parameter value
101 parameter_path = request.parameter_name
102 handler = self._get_handler(request.device_serial)
103 data_model = handler.data_model
104 param_name = data_model.get_parameter_name_from_path(parameter_path)
105 handler.set_parameter_asap(param_name, value)
106
107 @return_void
108 def Reboot(self, request: EnodebIdentity, context=None) -> None:
109 """ Reboot eNodeB """
110 handler = self._get_handler(request.device_serial)
111 handler.reboot_asap()
112
113 @return_void
114 def RebootAll(self, _=None, context=None) -> None:
115 """ Reboot all connected eNodeB devices """
116 serial_list = self.state_machine_manager.get_connected_serial_id_list()
117 for enb_serial in serial_list:
118 handler = self._get_handler(enb_serial)
119 handler.reboot_asap()
120
121 def GetStatus(self, _=None, context=None) -> ServiceStatus:
122 """
123 Get eNodeB status
124 Note: input variable defaults used so this can be either called locally
125 or as an RPC.
126 """
127 status = dict(get_service_status(self.state_machine_manager))
128 status_message = ServiceStatus()
129 status_message.meta.update(status)
130 return status_message
131
132 def GetAllEnodebStatus(self, _=None, context=None) -> AllEnodebStatus:
133 all_enb_status = AllEnodebStatus()
134 serial_list = self.state_machine_manager.get_connected_serial_id_list()
135 for enb_serial in serial_list:
136 enb_status = get_single_enb_status(
137 enb_serial,
138 self.state_machine_manager,
139 )
140 all_enb_status.enb_status_list.add(
141 device_serial=enb_status.device_serial,
142 ip_address=enb_status.ip_address,
143 connected=enb_status.connected,
144 configured=enb_status.configured,
145 opstate_enabled=enb_status.opstate_enabled,
146 rf_tx_on=enb_status.rf_tx_on,
147 rf_tx_desired=enb_status.rf_tx_desired,
148 gps_connected=enb_status.gps_connected,
149 ptp_connected=enb_status.ptp_connected,
150 mme_connected=enb_status.mme_connected,
151 gps_longitude=enb_status.gps_longitude,
152 gps_latitude=enb_status.gps_latitude,
153 fsm_state=enb_status.fsm_state,
154 )
155
156 return all_enb_status
157
158 def GetEnodebStatus(
159 self,
160 request: EnodebIdentity,
161 _context=None,
162 ) -> SingleEnodebStatus:
163 return get_single_enb_status(
164 request.device_serial,
165 self.state_machine_manager,
166 )