blob: 30b3f4358929ee675454fae906e2fe633469be2e [file] [log] [blame]
"""
Copyright 2020 The Magma Authors.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
13
14from typing import Any
15
16import grpc
17from lte.protos.enodebd_pb2 import (
18 AllEnodebStatus,
19 EnodebIdentity,
20 GetParameterRequest,
21 GetParameterResponse,
22 SetParameterRequest,
23 SingleEnodebStatus,
24)
25from lte.protos.enodebd_pb2_grpc import (
26 EnodebdServicer,
27 add_EnodebdServicer_to_server,
28)
29from common.rpc_utils import return_void
30from enodeb_status import (
31 get_service_status,
32 get_single_enb_status,
33)
34from state_machines.enb_acs import EnodebAcsStateMachine
35from state_machines.enb_acs_manager import StateMachineManager
36from orc8r.protos.service303_pb2 import ServiceStatus
37
38
class EnodebdRpcServicer(EnodebdServicer):
    """
    gRPC based server for enodebd

    Every RPC is answered by delegating to the StateMachineManager handed
    to the constructor: per-device calls look up that device's state
    machine handler by serial number, fleet-wide calls iterate over the
    manager's list of connected serials.
    """

    def __init__(self, state_machine_manager: StateMachineManager) -> None:
        """
        Args:
            state_machine_manager: source of the per-device handlers and of
                the connected-serial list used by every RPC below
        """
        self.state_machine_manager = state_machine_manager

    def add_to_server(self, server) -> None:
        """
        Add the servicer to a gRPC server
        """
        add_EnodebdServicer_to_server(self, server)

    def _get_handler(self, device_serial: str) -> EnodebAcsStateMachine:
        """ Return the state machine handler for the given device serial """
        return self.state_machine_manager.get_handler_by_serial(device_serial)

    def _resolve_parameter(self, device_serial: str, parameter_path: str):
        """
        Look up the handler for a device and translate a data model
        specific parameter path into the standard parameter name.

        Returns:
            (handler, param_name) tuple
        """
        handler = self._get_handler(device_serial)
        param_name = handler.data_model.get_parameter_name_from_path(
            parameter_path,
        )
        return handler, param_name

    def GetParameter(
        self,
        request: GetParameterRequest,
        context: Any,
    ) -> GetParameterResponse:
        """
        Sends a GetParameterValues message. Used for testing only.

        Different data models will have different names for the same
        parameter. Whatever name that the data model uses, we call the
        'parameter path', eg. "Device.DeviceInfo.X_BAICELLS_COM_GPS_Status"
        We denote 'ParameterName' to be a standard string name for
        equivalent parameters between different data models

        Args:
            request: carries device_serial and the parameter path (in its
                parameter_name field)
            context: gRPC context, unused here

        Returns:
            GetParameterResponse with a single entry whose name is the
            requested parameter path and whose value is the handler's
            value converted with str()
        """
        # Get the parameter value information
        parameter_path = request.parameter_name
        handler, param_name = self._resolve_parameter(
            request.device_serial, parameter_path,
        )
        param_value = str(handler.get_parameter(param_name))

        # And now construct the response to the rpc request
        get_parameter_values_response = GetParameterResponse()
        get_parameter_values_response.parameters.add(
            name=parameter_path, value=param_value,
        )
        return get_parameter_values_response

    @return_void
    def SetParameter(self, request: SetParameterRequest, context: Any) -> None:
        """
        Sends a SetParameterValues message. Used for testing only.

        Different data models will have different names for the same
        parameter. Whatever name that the data model uses, we call the
        'parameter path', eg. "Device.DeviceInfo.X_BAICELLS_COM_GPS_Status"
        We denote 'ParameterName' to be a standard string name for
        equivalent parameters between different data models

        Args:
            request: carries device_serial, the parameter path (in its
                parameter_name field) and exactly one of value_int,
                value_bool or value_string
            context: gRPC context; receives INVALID_ARGUMENT and a details
                message when none of the value fields is set
        """
        # Parse the request into the (value, type name) pair that is handed
        # to the handler
        if request.HasField('value_int'):
            value = (request.value_int, 'int')
        elif request.HasField('value_bool'):
            value = (request.value_bool, 'boolean')
        elif request.HasField('value_string'):
            value = (request.value_string, 'string')
        else:
            # set_details() accepts one already-formatted string; passing a
            # format string plus a separate argument raises TypeError before
            # the status code below can be set.
            context.set_details(
                'SetParameter: Unsupported type; expected one of '
                'value_int, value_bool, value_string',
            )
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            return

        # Update the handler so it will set the parameter value
        handler, param_name = self._resolve_parameter(
            request.device_serial, request.parameter_name,
        )
        handler.set_parameter_asap(param_name, value)

    @return_void
    def Reboot(self, request: EnodebIdentity, context=None) -> None:
        """ Reboot eNodeB """
        handler = self._get_handler(request.device_serial)
        handler.reboot_asap()

    @return_void
    def RebootAll(self, _=None, context=None) -> None:
        """ Reboot all connected eNodeB devices """
        serial_list = self.state_machine_manager.get_connected_serial_id_list()
        for enb_serial in serial_list:
            handler = self._get_handler(enb_serial)
            handler.reboot_asap()

    def GetStatus(self, _=None, context=None) -> ServiceStatus:
        """
        Get eNodeB status
        Note: input variable defaults used so this can be either called locally
        or as an RPC.

        Returns:
            ServiceStatus whose meta map is filled from get_service_status()
        """
        status = dict(get_service_status(self.state_machine_manager))
        status_message = ServiceStatus()
        status_message.meta.update(status)
        return status_message

    def GetAllEnodebStatus(self, _=None, context=None) -> AllEnodebStatus:
        """
        Get the status of every connected eNodeB

        Returns:
            AllEnodebStatus with one enb_status_list entry per connected
            serial, each copied field by field from get_single_enb_status()
        """
        all_enb_status = AllEnodebStatus()
        serial_list = self.state_machine_manager.get_connected_serial_id_list()
        for enb_serial in serial_list:
            enb_status = get_single_enb_status(
                enb_serial,
                self.state_machine_manager,
            )
            all_enb_status.enb_status_list.add(
                device_serial=enb_status.device_serial,
                ip_address=enb_status.ip_address,
                connected=enb_status.connected,
                configured=enb_status.configured,
                opstate_enabled=enb_status.opstate_enabled,
                rf_tx_on=enb_status.rf_tx_on,
                rf_tx_desired=enb_status.rf_tx_desired,
                gps_connected=enb_status.gps_connected,
                ptp_connected=enb_status.ptp_connected,
                mme_connected=enb_status.mme_connected,
                gps_longitude=enb_status.gps_longitude,
                gps_latitude=enb_status.gps_latitude,
                fsm_state=enb_status.fsm_state,
            )

        return all_enb_status

    def GetEnodebStatus(
        self,
        request: EnodebIdentity,
        _context=None,
    ) -> SingleEnodebStatus:
        """
        Get the status of the single eNodeB named by request.device_serial
        """
        return get_single_enb_status(
            request.device_serial,
            self.state_machine_manager,
        )