blob: d9e2ed92789d5d05a79fd41b708853ab28ea25d6 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
Wei-Yu Chen678f0a52021-12-21 13:50:52 +080013
14import time
Wei-Yu Chen49950b92021-11-08 19:19:18 +080015from abc import ABC, abstractmethod
16from collections import namedtuple
17from typing import Any, Optional
18
Wei-Yu Chen678f0a52021-12-21 13:50:52 +080019import metrics
20
21from configuration.service_configs import load_service_config
Wei-Yu Chen49950b92021-11-08 19:19:18 +080022from data_models.data_model import InvalidTrParamPath
23from data_models.data_model_parameters import ParameterName
24from device_config.configuration_init import build_desired_config
25from exceptions import ConfigurationError, Tr069Error
26from logger import EnodebdLogger as logger
27from state_machines.acs_state_utils import (
28 does_inform_have_event,
29 get_all_objects_to_add,
30 get_all_objects_to_delete,
31 get_all_param_values_to_set,
32 get_obj_param_values_to_set,
33 get_object_params_to_get,
34 get_optional_param_to_check,
35 get_param_values_to_set,
36 get_params_to_get,
37 parse_get_parameter_values_response,
38 process_inform_message,
39)
40from state_machines.enb_acs import EnodebAcsStateMachine
41from state_machines.timer import StateMachineTimer
42from tr069 import models
43
44AcsMsgAndTransition = namedtuple(
45 'AcsMsgAndTransition', ['msg', 'next_state'],
46)
47
48AcsReadMsgResult = namedtuple(
49 'AcsReadMsgResult', ['msg_handled', 'next_state'],
50)
51
52
53class EnodebAcsState(ABC):
54 """
55 State class for the Enodeb state machine
56
57 States can transition after reading a message from the eNB, sending a
58 message out to the eNB, or when a timer completes. As such, some states
59 are only responsible for message sending, and others are only responsible
60 for reading incoming messages.
61
62 In the constructor, set up state transitions.
63 """
64
65 def __init__(self):
66 self._acs = None
67
68 def enter(self) -> None:
69 """
70 Set up your timers here. Call transition(..) on the ACS when the timer
71 completes or throw an error
72 """
73 pass
74
75 def exit(self) -> None:
76 """Destroy timers here"""
77 pass
78
79 def read_msg(self, message: Any) -> AcsReadMsgResult:
80 """
81 Args: message: tr069 message
82 Returns: name of the next state, if transition required
83 """
84 raise ConfigurationError(
85 '%s should implement read_msg() if it '
86 'needs to handle message reading' % self.__class__.__name__,
87 )
88
89 def get_msg(self, message: Any) -> AcsMsgAndTransition:
90 """
91 Produce a message to send back to the eNB.
92
93 Args:
94 message: TR-069 message which was already processed by read_msg
95
96 Returns: Message and possible transition
97 """
98 raise ConfigurationError(
99 '%s should implement get_msg() if it '
100 'needs to produce messages' % self.__class__.__name__,
101 )
102
103 @property
104 def acs(self) -> EnodebAcsStateMachine:
105 return self._acs
106
107 @acs.setter
108 def acs(self, val: EnodebAcsStateMachine) -> None:
109 self._acs = val
110
111 @abstractmethod
112 def state_description(self) -> str:
113 """ Provide a few words about what the state represents """
114 pass
115
116
117class WaitInformState(EnodebAcsState):
118 """
119 This state indicates that no Inform message has been received yet, or
120 that no Inform message has been received for a long time.
121
122 This state is used to handle an Inform message that arrived when enodebd
123 already believes that the eNB is connected. As such, it is unclear to
124 enodebd whether the eNB is just sending another Inform, or if a different
125 eNB was plugged into the same interface.
126 """
127
128 def __init__(
129 self,
130 acs: EnodebAcsStateMachine,
131 when_done: str,
132 when_boot: Optional[str] = None,
133 ):
134 super().__init__()
135 self.acs = acs
136 self.done_transition = when_done
137 self.boot_transition = when_boot
138 self.has_enb_just_booted = False
139
140 def read_msg(self, message: Any) -> AcsReadMsgResult:
141 """
142 Args:
143 message: models.Inform Tr069 Inform message
144 """
145 if not isinstance(message, models.Inform):
146 return AcsReadMsgResult(False, None)
147 process_inform_message(
148 message, self.acs.data_model,
149 self.acs.device_cfg,
150 )
151 if does_inform_have_event(message, '1 BOOT'):
152 return AcsReadMsgResult(True, self.boot_transition)
153 return AcsReadMsgResult(True, None)
154
155 def get_msg(self, message: Any) -> AcsMsgAndTransition:
156 """ Reply with InformResponse """
157 response = models.InformResponse()
158 # Set maxEnvelopes to 1, as per TR-069 spec
159 response.MaxEnvelopes = 1
160 return AcsMsgAndTransition(response, self.done_transition)
161
162 def state_description(self) -> str:
163 return 'Waiting for an Inform'
164
165
166class GetRPCMethodsState(EnodebAcsState):
167 """
168 After the first Inform message from boot, it is expected that the eNB
169 will try to learn the RPC methods of the ACS.
170 """
171
172 def __init__(self, acs: EnodebAcsStateMachine, when_done: str, when_skip: str):
173 super().__init__()
174 self.acs = acs
175 self.done_transition = when_done
176 self.skip_transition = when_skip
177
178 def read_msg(self, message: Any) -> AcsReadMsgResult:
179 # If this is a regular Inform, not after a reboot we'll get an empty
180 if isinstance(message, models.DummyInput):
181 return AcsReadMsgResult(True, self.skip_transition)
182 if not isinstance(message, models.GetRPCMethods):
183 return AcsReadMsgResult(False, self.done_transition)
184 return AcsReadMsgResult(True, None)
185
186 def get_msg(self, message: Any) -> AcsMsgAndTransition:
187 resp = models.GetRPCMethodsResponse()
188 resp.MethodList = models.MethodList()
189 RPC_METHODS = ['Inform', 'GetRPCMethods', 'TransferComplete']
190 resp.MethodList.arrayType = 'xsd:string[%d]' \
191 % len(RPC_METHODS)
192 resp.MethodList.string = RPC_METHODS
193 return AcsMsgAndTransition(resp, self.done_transition)
194
195 def state_description(self) -> str:
196 return 'Waiting for incoming GetRPC Methods after boot'
197
198
199class BaicellsRemWaitState(EnodebAcsState):
200 """
201 We've already received an Inform message. This state is to handle a
202 Baicells eNodeB issue.
203
204 After eNodeB is rebooted, hold off configuring it for some time to give
205 time for REM to run. This is a BaiCells eNodeB issue that doesn't support
206 enabling the eNodeB during initial REM.
207
208 In this state, just hang at responding to Inform, and then ending the
209 TR-069 session.
210 """
211
212 CONFIG_DELAY_AFTER_BOOT = 600
213
214 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
215 super().__init__()
216 self.acs = acs
217 self.done_transition = when_done
218 self.rem_timer = None
219
220 def enter(self):
221 self.rem_timer = StateMachineTimer(self.CONFIG_DELAY_AFTER_BOOT)
222 logger.info(
223 'Holding off of eNB configuration for %s seconds. '
224 'Will resume after eNB REM process has finished. ',
225 self.CONFIG_DELAY_AFTER_BOOT,
226 )
227
228 def exit(self):
229 self.rem_timer = None
230
231 def read_msg(self, message: Any) -> AcsReadMsgResult:
232 if not isinstance(message, models.Inform):
233 return AcsReadMsgResult(False, None)
234 process_inform_message(
235 message, self.acs.data_model,
236 self.acs.device_cfg,
237 )
238 return AcsReadMsgResult(True, None)
239
240 def get_msg(self, message: Any) -> AcsMsgAndTransition:
241 if self.rem_timer.is_done():
242 return AcsMsgAndTransition(
243 models.DummyInput(),
244 self.done_transition,
245 )
246 return AcsMsgAndTransition(models.DummyInput(), None)
247
248 def state_description(self) -> str:
249 remaining = self.rem_timer.seconds_remaining()
250 return 'Waiting for eNB REM to run for %d more seconds before ' \
251 'resuming with configuration.' % remaining
252
253
254class WaitEmptyMessageState(EnodebAcsState):
255 def __init__(
256 self,
257 acs: EnodebAcsStateMachine,
258 when_done: str,
259 when_missing: Optional[str] = None,
260 ):
261 super().__init__()
262 self.acs = acs
263 self.done_transition = when_done
264 self.unknown_param_transition = when_missing
265
266 def read_msg(self, message: Any) -> AcsReadMsgResult:
267 """
268 It's expected that we transition into this state right after receiving
269 an Inform message and replying with an InformResponse. At that point,
270 the eNB sends an empty HTTP request (aka DummyInput) to initiate the
271 rest of the provisioning process
272 """
273 if not isinstance(message, models.DummyInput):
274 logger.debug("Ignoring message %s", str(type(message)))
275 return AcsReadMsgResult(msg_handled=False, next_state=None)
276 if self.unknown_param_transition:
277 if get_optional_param_to_check(self.acs.data_model):
278 return AcsReadMsgResult(
279 msg_handled=True,
280 next_state=self.unknown_param_transition,
281 )
282 return AcsReadMsgResult(
283 msg_handled=True,
284 next_state=self.done_transition,
285 )
286
287 def get_msg(self, message: Any) -> AcsReadMsgResult:
288 """
289 Return a dummy message waiting for the empty message from CPE
290 """
291 request = models.DummyInput()
292 return AcsMsgAndTransition(msg=request, next_state=None)
293
294 def state_description(self) -> str:
295 return 'Waiting for empty message from eNodeB'
296
297
298class CheckOptionalParamsState(EnodebAcsState):
299 def __init__(
300 self,
301 acs: EnodebAcsStateMachine,
302 when_done: str,
303 ):
304 super().__init__()
305 self.acs = acs
306 self.done_transition = when_done
307 self.optional_param = None
308
309 def get_msg(self, message: Any) -> AcsMsgAndTransition:
310 self.optional_param = get_optional_param_to_check(self.acs.data_model)
311 if self.optional_param is None:
312 raise Tr069Error('Invalid State')
313 # Generate the request
314 request = models.GetParameterValues()
315 request.ParameterNames = models.ParameterNames()
316 request.ParameterNames.arrayType = 'xsd:string[1]'
317 request.ParameterNames.string = []
318 path = self.acs.data_model.get_parameter(self.optional_param).path
319 request.ParameterNames.string.append(path)
320 return AcsMsgAndTransition(request, None)
321
322 def read_msg(self, message: Any) -> AcsReadMsgResult:
323 """ Process either GetParameterValuesResponse or a Fault """
324 if type(message) == models.Fault:
325 self.acs.data_model.set_parameter_presence(
326 self.optional_param,
327 False,
328 )
329 elif type(message) == models.GetParameterValuesResponse:
330 name_to_val = parse_get_parameter_values_response(
331 self.acs.data_model,
332 message,
333 )
334 logger.debug(
335 'Received CPE parameter values: %s',
336 str(name_to_val),
337 )
338 for name, val in name_to_val.items():
339 self.acs.data_model.set_parameter_presence(
340 self.optional_param,
341 True,
342 )
343 magma_val = self.acs.data_model.transform_for_magma(name, val)
344 self.acs.device_cfg.set_parameter(name, magma_val)
345 else:
346 return AcsReadMsgResult(False, None)
347
348 if get_optional_param_to_check(self.acs.data_model) is not None:
349 return AcsReadMsgResult(True, None)
350 return AcsReadMsgResult(True, self.done_transition)
351
352 def state_description(self) -> str:
353 return 'Checking if some optional parameters exist in data model'
354
355
356class SendGetTransientParametersState(EnodebAcsState):
357 """
358 Periodically read eNodeB status. Note: keep frequency low to avoid
359 backing up large numbers of read operations if enodebd is busy.
360 Some eNB parameters are read only and updated by the eNB itself.
361 """
362 PARAMETERS = [
363 ParameterName.OP_STATE,
364 ParameterName.RF_TX_STATUS,
365 ParameterName.GPS_STATUS,
366 ParameterName.PTP_STATUS,
367 ParameterName.MME_STATUS,
368 ParameterName.GPS_LAT,
369 ParameterName.GPS_LONG,
370 ]
371
372 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
373 super().__init__()
374 self.acs = acs
375 self.done_transition = when_done
376
377 def read_msg(self, message: Any) -> AcsReadMsgResult:
378 if not isinstance(message, models.DummyInput):
379 return AcsReadMsgResult(False, None)
380 return AcsReadMsgResult(True, None)
381
382 def get_msg(self, message: Any) -> AcsMsgAndTransition:
383 request = models.GetParameterValues()
384 request.ParameterNames = models.ParameterNames()
385 request.ParameterNames.string = []
386 for name in self.PARAMETERS:
387 # Not all data models have these parameters
388 if self.acs.data_model.is_parameter_present(name):
389 path = self.acs.data_model.get_parameter(name).path
390 request.ParameterNames.string.append(path)
391 request.ParameterNames.arrayType = \
392 'xsd:string[%d]' % len(request.ParameterNames.string)
393
394 return AcsMsgAndTransition(request, self.done_transition)
395
396 def state_description(self) -> str:
397 return 'Getting transient read-only parameters'
398
399
400class WaitGetTransientParametersState(EnodebAcsState):
401 """
402 Periodically read eNodeB status. Note: keep frequency low to avoid
403 backing up large numbers of read operations if enodebd is busy
404 """
405
406 def __init__(
407 self,
408 acs: EnodebAcsStateMachine,
409 when_get: str,
410 when_get_obj_params: str,
411 when_delete: str,
412 when_add: str,
413 when_set: str,
414 when_skip: str,
415 ):
416 super().__init__()
417 self.acs = acs
418 self.done_transition = when_get
419 self.get_obj_params_transition = when_get_obj_params
420 self.rm_obj_transition = when_delete
421 self.add_obj_transition = when_add
422 self.set_transition = when_set
423 self.skip_transition = when_skip
424
425 def read_msg(self, message: Any) -> AcsReadMsgResult:
426 if not isinstance(message, models.GetParameterValuesResponse):
427 return AcsReadMsgResult(False, None)
428 # Current values of the fetched parameters
429 name_to_val = parse_get_parameter_values_response(
430 self.acs.data_model,
431 message,
432 )
433 logger.debug('Fetched Transient Params: %s', str(name_to_val))
434
435 # Update device configuration
436 for name in name_to_val:
437 magma_val = \
438 self.acs.data_model.transform_for_magma(
439 name,
440 name_to_val[name],
441 )
442 self.acs.device_cfg.set_parameter(name, magma_val)
443
444 return AcsReadMsgResult(True, self.get_next_state())
445
446 def get_next_state(self) -> str:
447 should_get_params = \
448 len(
449 get_params_to_get(
450 self.acs.device_cfg,
451 self.acs.data_model,
452 ),
453 ) > 0
454 if should_get_params:
455 return self.done_transition
456 should_get_obj_params = \
457 len(
458 get_object_params_to_get(
459 self.acs.desired_cfg,
460 self.acs.device_cfg,
461 self.acs.data_model,
462 ),
463 ) > 0
464 if should_get_obj_params:
465 return self.get_obj_params_transition
466 elif len(
467 get_all_objects_to_delete(
468 self.acs.desired_cfg,
469 self.acs.device_cfg,
470 ),
471 ) > 0:
472 return self.rm_obj_transition
473 elif len(
474 get_all_objects_to_add(
475 self.acs.desired_cfg,
476 self.acs.device_cfg,
477 ),
478 ) > 0:
479 return self.add_obj_transition
480 return self.skip_transition
481
482 def state_description(self) -> str:
483 return 'Getting transient read-only parameters'
484
485
486class GetParametersState(EnodebAcsState):
487 """
488 Get the value of most parameters of the eNB that are defined in the data
489 model. Object parameters are excluded.
490 """
491
492 def __init__(
493 self,
494 acs: EnodebAcsStateMachine,
495 when_done: str,
496 request_all_params: bool = False,
497 ):
498 super().__init__()
499 self.acs = acs
500 self.done_transition = when_done
501 # Set to True if we want to request values of all parameters, even if
502 # the ACS state machine already has recorded values of them.
503 self.request_all_params = request_all_params
504
505 def read_msg(self, message: Any) -> AcsReadMsgResult:
506 """
507 It's expected that we transition into this state right after receiving
508 an Inform message and replying with an InformResponse. At that point,
509 the eNB sends an empty HTTP request (aka DummyInput) to initiate the
510 rest of the provisioning process
511 """
512 if not isinstance(message, models.DummyInput):
513 return AcsReadMsgResult(False, None)
514 return AcsReadMsgResult(True, None)
515
516 def get_msg(self, message: Any) -> AcsMsgAndTransition:
517 """
518 Respond with GetParameterValuesRequest
519
520 Get the values of all parameters defined in the data model.
521 Also check which addable objects are present, and what the values of
522 parameters for those objects are.
523 """
524
525 # Get the names of regular parameters
526 names = get_params_to_get(
527 self.acs.device_cfg, self.acs.data_model,
528 self.request_all_params,
529 )
530
531 # Generate the request
532 request = models.GetParameterValues()
533 request.ParameterNames = models.ParameterNames()
534 request.ParameterNames.arrayType = 'xsd:string[%d]' \
535 % len(names)
536 request.ParameterNames.string = []
537 for name in names:
538 path = self.acs.data_model.get_parameter(name).path
539 if path is not InvalidTrParamPath:
540 # Only get data elements backed by tr69 path
541 request.ParameterNames.string.append(path)
542
543 return AcsMsgAndTransition(request, self.done_transition)
544
545 def state_description(self) -> str:
546 return 'Getting non-object parameters'
547
548
549class WaitGetParametersState(EnodebAcsState):
550 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
551 super().__init__()
552 self.acs = acs
553 self.done_transition = when_done
554
555 def read_msg(self, message: Any) -> AcsReadMsgResult:
556 """ Process GetParameterValuesResponse """
557 if not isinstance(message, models.GetParameterValuesResponse):
558 return AcsReadMsgResult(False, None)
559 name_to_val = parse_get_parameter_values_response(
560 self.acs.data_model,
561 message,
562 )
563 logger.debug('Received CPE parameter values: %s', str(name_to_val))
564 for name, val in name_to_val.items():
565 magma_val = self.acs.data_model.transform_for_magma(name, val)
566 self.acs.device_cfg.set_parameter(name, magma_val)
567 return AcsReadMsgResult(True, self.done_transition)
568
569 def state_description(self) -> str:
570 return 'Getting non-object parameters'
571
572
573class GetObjectParametersState(EnodebAcsState):
574 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
575 super().__init__()
576 self.acs = acs
577 self.done_transition = when_done
578
579 def get_msg(self, message: Any) -> AcsMsgAndTransition:
580 """ Respond with GetParameterValuesRequest """
581 names = get_object_params_to_get(
582 self.acs.desired_cfg,
583 self.acs.device_cfg,
584 self.acs.data_model,
585 )
586
587 # Generate the request
588 request = models.GetParameterValues()
589 request.ParameterNames = models.ParameterNames()
590 request.ParameterNames.arrayType = 'xsd:string[%d]' \
591 % len(names)
592 request.ParameterNames.string = []
593 for name in names:
594 path = self.acs.data_model.get_parameter(name).path
595 request.ParameterNames.string.append(path)
596
597 return AcsMsgAndTransition(request, self.done_transition)
598
599 def state_description(self) -> str:
600 return 'Getting object parameters'
601
602
603class WaitGetObjectParametersState(EnodebAcsState):
604 def __init__(
605 self,
606 acs: EnodebAcsStateMachine,
607 when_delete: str,
608 when_add: str,
609 when_set: str,
610 when_skip: str,
611 ):
612 super().__init__()
613 self.acs = acs
614 self.rm_obj_transition = when_delete
615 self.add_obj_transition = when_add
616 self.set_params_transition = when_set
617 self.skip_transition = when_skip
618
619 def read_msg(self, message: Any) -> AcsReadMsgResult:
620 """ Process GetParameterValuesResponse """
621 if not isinstance(message, models.GetParameterValuesResponse):
622 return AcsReadMsgResult(False, None)
623
624 path_to_val = {}
625 if hasattr(message.ParameterList, 'ParameterValueStruct') and \
626 message.ParameterList.ParameterValueStruct is not None:
627 for param_value_struct in message.ParameterList.ParameterValueStruct:
628 path_to_val[param_value_struct.Name] = \
629 param_value_struct.Value.Data
630 logger.debug('Received object parameters: %s', str(path_to_val))
631
632 # Number of PLMN objects reported can be incorrect. Let's count them
633 num_plmns = 0
634 obj_to_params = self.acs.data_model.get_numbered_param_names()
635 while True:
636 obj_name = ParameterName.PLMN_N % (num_plmns + 1)
637 if obj_name not in obj_to_params or len(obj_to_params[obj_name]) == 0:
638 logger.warning(
639 "eNB has PLMN %s but not defined in model",
640 obj_name,
641 )
642 break
643 param_name_list = obj_to_params[obj_name]
644 obj_path = self.acs.data_model.get_parameter(param_name_list[0]).path
645 if obj_path not in path_to_val:
646 break
647 if not self.acs.device_cfg.has_object(obj_name):
648 self.acs.device_cfg.add_object(obj_name)
649 num_plmns += 1
650 for name in param_name_list:
651 path = self.acs.data_model.get_parameter(name).path
652 value = path_to_val[path]
653 magma_val = \
654 self.acs.data_model.transform_for_magma(name, value)
655 self.acs.device_cfg.set_parameter_for_object(
656 name, magma_val,
657 obj_name,
658 )
659 num_plmns_reported = \
660 int(self.acs.device_cfg.get_parameter(ParameterName.NUM_PLMNS))
661 if num_plmns != num_plmns_reported:
662 logger.warning(
663 "eNB reported %d PLMNs but found %d",
664 num_plmns_reported, num_plmns,
665 )
666 self.acs.device_cfg.set_parameter(
667 ParameterName.NUM_PLMNS,
668 num_plmns,
669 )
670
671 # Now we can have the desired state
672 if self.acs.desired_cfg is None:
673 self.acs.desired_cfg = build_desired_config(
674 self.acs.mconfig,
675 self.acs.service_config,
676 self.acs.device_cfg,
677 self.acs.data_model,
678 self.acs.config_postprocessor,
679 )
680
681 if len(
682 get_all_objects_to_delete(
683 self.acs.desired_cfg,
684 self.acs.device_cfg,
685 ),
686 ) > 0:
687 return AcsReadMsgResult(True, self.rm_obj_transition)
688 elif len(
689 get_all_objects_to_add(
690 self.acs.desired_cfg,
691 self.acs.device_cfg,
692 ),
693 ) > 0:
694 return AcsReadMsgResult(True, self.add_obj_transition)
695 elif len(
696 get_all_param_values_to_set(
697 self.acs.desired_cfg,
698 self.acs.device_cfg,
699 self.acs.data_model,
700 ),
701 ) > 0:
702 return AcsReadMsgResult(True, self.set_params_transition)
703 return AcsReadMsgResult(True, self.skip_transition)
704
705 def state_description(self) -> str:
706 return 'Getting object parameters'
707
708
709class DeleteObjectsState(EnodebAcsState):
710 def __init__(
711 self,
712 acs: EnodebAcsStateMachine,
713 when_add: str,
714 when_skip: str,
715 ):
716 super().__init__()
717 self.acs = acs
718 self.deleted_param = None
719 self.add_obj_transition = when_add
720 self.skip_transition = when_skip
721
722 def get_msg(self, message: Any) -> AcsMsgAndTransition:
723 """
724 Send DeleteObject message to TR-069 and poll for response(s).
725 Input:
726 - Object name (string)
727 """
728 request = models.DeleteObject()
729 self.deleted_param = get_all_objects_to_delete(
730 self.acs.desired_cfg,
731 self.acs.device_cfg,
732 )[0]
733 request.ObjectName = \
734 self.acs.data_model.get_parameter(self.deleted_param).path
735 return AcsMsgAndTransition(request, None)
736
737 def read_msg(self, message: Any) -> AcsReadMsgResult:
738 """
739 Send DeleteObject message to TR-069 and poll for response(s).
740 Input:
741 - Object name (string)
742 """
743 if type(message) == models.DeleteObjectResponse:
744 if message.Status != 0:
745 raise Tr069Error(
746 'Received DeleteObjectResponse with '
747 'Status=%d' % message.Status,
748 )
749 elif type(message) == models.Fault:
750 raise Tr069Error(
751 'Received Fault in response to DeleteObject '
752 '(faultstring = %s)' % message.FaultString,
753 )
754 else:
755 return AcsReadMsgResult(False, None)
756
757 self.acs.device_cfg.delete_object(self.deleted_param)
758 obj_list_to_delete = get_all_objects_to_delete(
759 self.acs.desired_cfg,
760 self.acs.device_cfg,
761 )
762 if len(obj_list_to_delete) > 0:
763 return AcsReadMsgResult(True, None)
764 if len(
765 get_all_objects_to_add(
766 self.acs.desired_cfg,
767 self.acs.device_cfg,
768 ),
769 ) == 0:
770 return AcsReadMsgResult(True, self.skip_transition)
771 return AcsReadMsgResult(True, self.add_obj_transition)
772
773 def state_description(self) -> str:
774 return 'Deleting objects'
775
776
777class AddObjectsState(EnodebAcsState):
778 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
779 super().__init__()
780 self.acs = acs
781 self.done_transition = when_done
782 self.added_param = None
783
784 def get_msg(self, message: Any) -> AcsMsgAndTransition:
785 request = models.AddObject()
786 self.added_param = get_all_objects_to_add(
787 self.acs.desired_cfg,
788 self.acs.device_cfg,
789 )[0]
790 desired_param = self.acs.data_model.get_parameter(self.added_param)
791 desired_path = desired_param.path
792 path_parts = desired_path.split('.')
793 # If adding enumerated object, ie. XX.N. we should add it to the
794 # parent object XX. so strip the index
795 if len(path_parts) > 2 and \
796 path_parts[-1] == '' and path_parts[-2].isnumeric():
797 logger.debug('Stripping index from path=%s', desired_path)
798 desired_path = '.'.join(path_parts[:-2]) + '.'
799 request.ObjectName = desired_path
800 return AcsMsgAndTransition(request, None)
801
802 def read_msg(self, message: Any) -> AcsReadMsgResult:
803 if type(message) == models.AddObjectResponse:
804 if message.Status != 0:
805 raise Tr069Error(
806 'Received AddObjectResponse with '
807 'Status=%d' % message.Status,
808 )
809 elif type(message) == models.Fault:
810 raise Tr069Error(
811 'Received Fault in response to AddObject '
812 '(faultstring = %s)' % message.FaultString,
813 )
814 else:
815 return AcsReadMsgResult(False, None)
816 instance_n = message.InstanceNumber
817 self.acs.device_cfg.add_object(self.added_param % instance_n)
818 obj_list_to_add = get_all_objects_to_add(
819 self.acs.desired_cfg,
820 self.acs.device_cfg,
821 )
822 if len(obj_list_to_add) > 0:
823 return AcsReadMsgResult(True, None)
824 return AcsReadMsgResult(True, self.done_transition)
825
826 def state_description(self) -> str:
827 return 'Adding objects'
828
829
830class SetParameterValuesState(EnodebAcsState):
831 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
832 super().__init__()
833 self.acs = acs
834 self.done_transition = when_done
835
836 def get_msg(self, message: Any) -> AcsMsgAndTransition:
837 request = models.SetParameterValues()
838 request.ParameterList = models.ParameterValueList()
839 param_values = get_all_param_values_to_set(
840 self.acs.desired_cfg,
841 self.acs.device_cfg,
842 self.acs.data_model,
843 )
844 request.ParameterList.arrayType = 'cwmp:ParameterValueStruct[%d]' \
845 % len(param_values)
846 request.ParameterList.ParameterValueStruct = []
847 logger.debug(
848 'Sending TR069 request to set CPE parameter values: %s',
849 str(param_values),
850 )
851 # TODO: Match key response when we support having multiple outstanding
852 # calls.
853 if self.acs.has_version_key:
854 request.ParameterKey = models.ParameterKeyType()
855 request.ParameterKey.Data =\
856 "SetParameter-{:10.0f}".format(self.acs.parameter_version_key)
857 request.ParameterKey.type = 'xsd:string'
858
859 for name, value in param_values.items():
860 param_info = self.acs.data_model.get_parameter(name)
861 type_ = param_info.type
862 name_value = models.ParameterValueStruct()
863 name_value.Value = models.anySimpleType()
864 name_value.Name = param_info.path
865 enb_value = self.acs.data_model.transform_for_enb(name, value)
866 if type_ in ('int', 'unsignedInt'):
867 name_value.Value.type = 'xsd:%s' % type_
868 name_value.Value.Data = str(enb_value)
869 elif type_ == 'boolean':
870 # Boolean values have integral representations in spec
871 name_value.Value.type = 'xsd:boolean'
872 name_value.Value.Data = str(int(enb_value))
873 elif type_ == 'string':
874 name_value.Value.type = 'xsd:string'
875 name_value.Value.Data = str(enb_value)
876 else:
877 raise Tr069Error(
878 'Unsupported type for %s: %s' %
879 (name, type_),
880 )
881 if param_info.is_invasive:
882 self.acs.are_invasive_changes_applied = False
883 request.ParameterList.ParameterValueStruct.append(name_value)
884
885 return AcsMsgAndTransition(request, self.done_transition)
886
887 def state_description(self) -> str:
888 return 'Setting parameter values'
889
890
891class SetParameterValuesNotAdminState(EnodebAcsState):
892 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
893 super().__init__()
894 self.acs = acs
895 self.done_transition = when_done
896
897 def get_msg(self, message: Any) -> AcsMsgAndTransition:
898 request = models.SetParameterValues()
899 request.ParameterList = models.ParameterValueList()
900 param_values = get_all_param_values_to_set(
901 self.acs.desired_cfg,
902 self.acs.device_cfg,
903 self.acs.data_model,
904 exclude_admin=True,
905 )
906 request.ParameterList.arrayType = 'cwmp:ParameterValueStruct[%d]' \
907 % len(param_values)
908 request.ParameterList.ParameterValueStruct = []
909 logger.debug(
910 'Sending TR069 request to set CPE parameter values: %s',
911 str(param_values),
912 )
913 for name, value in param_values.items():
914 param_info = self.acs.data_model.get_parameter(name)
915 type_ = param_info.type
916 name_value = models.ParameterValueStruct()
917 name_value.Value = models.anySimpleType()
918 name_value.Name = param_info.path
919 enb_value = self.acs.data_model.transform_for_enb(name, value)
920 if type_ in ('int', 'unsignedInt'):
921 name_value.Value.type = 'xsd:%s' % type_
922 name_value.Value.Data = str(enb_value)
923 elif type_ == 'boolean':
924 # Boolean values have integral representations in spec
925 name_value.Value.type = 'xsd:boolean'
926 name_value.Value.Data = str(int(enb_value))
927 elif type_ == 'string':
928 name_value.Value.type = 'xsd:string'
929 name_value.Value.Data = str(enb_value)
930 else:
931 raise Tr069Error(
932 'Unsupported type for %s: %s' %
933 (name, type_),
934 )
935 if param_info.is_invasive:
936 self.acs.are_invasive_changes_applied = False
937 request.ParameterList.ParameterValueStruct.append(name_value)
938
939 return AcsMsgAndTransition(request, self.done_transition)
940
941 def state_description(self) -> str:
942 return 'Setting parameter values excluding Admin Enable'
943
944
945class WaitSetParameterValuesState(EnodebAcsState):
946 def __init__(
947 self,
948 acs: EnodebAcsStateMachine,
949 when_done: str,
950 when_apply_invasive: str,
951 status_non_zero_allowed: bool = False,
952 ):
953 super().__init__()
954 self.acs = acs
955 self.done_transition = when_done
956 self.apply_invasive_transition = when_apply_invasive
957 # Set Params can legally return zero and non zero status
958 # Per tr-196, if there are errors the method should return a fault.
959 # Make flag optional to compensate for existing radios returning non
960 # zero on error.
961 self.status_non_zero_allowed = status_non_zero_allowed
962
963 def read_msg(self, message: Any) -> AcsReadMsgResult:
964 if type(message) == models.SetParameterValuesResponse:
965 if not self.status_non_zero_allowed:
966 if message.Status != 0:
967 raise Tr069Error(
968 'Received SetParameterValuesResponse with '
969 'Status=%d' % message.Status,
970 )
971 self._mark_as_configured()
Wei-Yu Chen678f0a52021-12-21 13:50:52 +0800972
973 metrics.STAT_ENODEB_LAST_CONFIGURED.labels(
974 serial_number=self.acs.device_cfg.get_parameter("Serial number"),
975 ip_address=self.acs.device_cfg.get_parameter("ip_address"),
976 gps_lat=self.acs.device_cfg.get_parameter("GPS lat"),
977 gps_lon=self.acs.device_cfg.get_parameter("GPS long")
978 ).set(int(time.time()))
979
Wei-Yu Chen49950b92021-11-08 19:19:18 +0800980 if not self.acs.are_invasive_changes_applied:
981 return AcsReadMsgResult(True, self.apply_invasive_transition)
982 return AcsReadMsgResult(True, self.done_transition)
983 elif type(message) == models.Fault:
984 logger.error(
985 'Received Fault in response to SetParameterValues, '
986 'Code (%s), Message (%s)', message.FaultCode,
987 message.FaultString,
988 )
989 if message.SetParameterValuesFault is not None:
990 for fault in message.SetParameterValuesFault:
991 logger.error(
992 'SetParameterValuesFault Param: %s, '
993 'Code: %s, String: %s', fault.ParameterName,
994 fault.FaultCode, fault.FaultString,
995 )
996 return AcsReadMsgResult(False, None)
997
998 def _mark_as_configured(self) -> None:
999 """
1000 A successful attempt at setting parameter values means that we need to
1001 update what we think the eNB's configuration is to match what we just
1002 set the parameter values to.
1003 """
1004 # Values of parameters
1005 name_to_val = get_param_values_to_set(
1006 self.acs.desired_cfg,
1007 self.acs.device_cfg,
1008 self.acs.data_model,
1009 )
1010 for name, val in name_to_val.items():
1011 magma_val = self.acs.data_model.transform_for_magma(name, val)
1012 self.acs.device_cfg.set_parameter(name, magma_val)
1013
1014 # Values of object parameters
1015 obj_to_name_to_val = get_obj_param_values_to_set(
1016 self.acs.desired_cfg,
1017 self.acs.device_cfg,
1018 self.acs.data_model,
1019 )
1020 for obj_name, name_to_val in obj_to_name_to_val.items():
1021 for name, val in name_to_val.items():
1022 logger.debug(
1023 'Set obj: %s, name: %s, val: %s', str(obj_name),
1024 str(name), str(val),
1025 )
1026 magma_val = self.acs.data_model.transform_for_magma(name, val)
1027 self.acs.device_cfg.set_parameter_for_object(
1028 name, magma_val,
1029 obj_name,
1030 )
1031 logger.info('Successfully configured CPE parameters!')
1032
1033 def state_description(self) -> str:
1034 return 'Setting parameter values'
1035
1036
1037class EndSessionState(EnodebAcsState):
1038 """ To end a TR-069 session, send an empty HTTP response """
1039
1040 def __init__(self, acs: EnodebAcsStateMachine):
1041 super().__init__()
1042 self.acs = acs
1043
1044 def read_msg(self, message: Any) -> AcsReadMsgResult:
1045 """
1046 No message is expected after enodebd sends the eNodeB
1047 an empty HTTP response.
1048
1049 If a device sends an empty HTTP request, we can just
1050 ignore it and send another empty response.
1051 """
1052 if isinstance(message, models.DummyInput):
1053 return AcsReadMsgResult(True, None)
1054 return AcsReadMsgResult(False, None)
1055
1056 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1057 request = models.DummyInput()
1058 return AcsMsgAndTransition(request, None)
1059
1060 def state_description(self) -> str:
1061 return 'Completed provisioning eNB. Awaiting new Inform.'
1062
1063
1064class EnbSendRebootState(EnodebAcsState):
1065 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
1066 super().__init__()
1067 self.acs = acs
1068 self.done_transition = when_done
1069 self.prev_msg_was_inform = False
1070
1071 def read_msg(self, message: Any) -> AcsReadMsgResult:
1072 """
1073 This state can be transitioned into through user command.
1074 All messages received by enodebd will be ignored in this state.
1075 """
1076 if self.prev_msg_was_inform \
1077 and not isinstance(message, models.DummyInput):
1078 return AcsReadMsgResult(False, None)
1079 elif isinstance(message, models.Inform):
1080 self.prev_msg_was_inform = True
1081 process_inform_message(
1082 message, self.acs.data_model,
1083 self.acs.device_cfg,
1084 )
1085 return AcsReadMsgResult(True, None)
1086 self.prev_msg_was_inform = False
1087 return AcsReadMsgResult(True, None)
1088
1089 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1090 if self.prev_msg_was_inform:
1091 response = models.InformResponse()
1092 # Set maxEnvelopes to 1, as per TR-069 spec
1093 response.MaxEnvelopes = 1
1094 return AcsMsgAndTransition(response, None)
1095 logger.info('Sending reboot request to eNB')
1096 request = models.Reboot()
1097 request.CommandKey = ''
1098 self.acs.are_invasive_changes_applied = True
1099 return AcsMsgAndTransition(request, self.done_transition)
1100
1101 def state_description(self) -> str:
1102 return 'Rebooting eNB'
1103
1104
1105class SendRebootState(EnodebAcsState):
1106 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
1107 super().__init__()
1108 self.acs = acs
1109 self.done_transition = when_done
1110 self.prev_msg_was_inform = False
1111
1112 def read_msg(self, message: Any) -> AcsReadMsgResult:
1113 """
1114 This state can be transitioned into through user command.
1115 All messages received by enodebd will be ignored in this state.
1116 """
1117 if self.prev_msg_was_inform \
1118 and not isinstance(message, models.DummyInput):
1119 return AcsReadMsgResult(False, None)
1120 elif isinstance(message, models.Inform):
1121 self.prev_msg_was_inform = True
1122 process_inform_message(
1123 message, self.acs.data_model,
1124 self.acs.device_cfg,
1125 )
1126 return AcsReadMsgResult(True, None)
1127 self.prev_msg_was_inform = False
1128 return AcsReadMsgResult(True, None)
1129
1130 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1131 if self.prev_msg_was_inform:
1132 response = models.InformResponse()
1133 # Set maxEnvelopes to 1, as per TR-069 spec
1134 response.MaxEnvelopes = 1
1135 return AcsMsgAndTransition(response, None)
1136 logger.info('Sending reboot request to eNB')
1137 request = models.Reboot()
1138 request.CommandKey = ''
1139 return AcsMsgAndTransition(request, self.done_transition)
1140
1141 def state_description(self) -> str:
1142 return 'Rebooting eNB'
1143
1144
1145class WaitRebootResponseState(EnodebAcsState):
1146 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
1147 super().__init__()
1148 self.acs = acs
1149 self.done_transition = when_done
1150
1151 def read_msg(self, message: Any) -> AcsReadMsgResult:
1152 if not isinstance(message, models.RebootResponse):
1153 return AcsReadMsgResult(False, None)
1154 return AcsReadMsgResult(True, None)
1155
1156 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1157 """ Reply with empty message """
1158 return AcsMsgAndTransition(models.DummyInput(), self.done_transition)
1159
1160 def state_description(self) -> str:
1161 return 'Rebooting eNB'
1162
1163
1164class WaitInformMRebootState(EnodebAcsState):
1165 """
1166 After sending a reboot request, we expect an Inform request with a
1167 specific 'inform event code'
1168 """
1169
1170 # Time to wait for eNodeB reboot. The measured time
1171 # (on BaiCells indoor eNodeB)
1172 # is ~110secs, so add healthy padding on top of this.
1173 REBOOT_TIMEOUT = 300 # In seconds
1174 # We expect that the Inform we receive tells us the eNB has rebooted
1175 INFORM_EVENT_CODE = 'M Reboot'
1176
1177 def __init__(
1178 self,
1179 acs: EnodebAcsStateMachine,
1180 when_done: str,
1181 when_timeout: str,
1182 ):
1183 super().__init__()
1184 self.acs = acs
1185 self.done_transition = when_done
1186 self.timeout_transition = when_timeout
1187 self.timeout_timer = None
1188 self.timer_handle = None
1189
1190 def enter(self):
1191 self.timeout_timer = StateMachineTimer(self.REBOOT_TIMEOUT)
1192
1193 def check_timer() -> None:
1194 if self.timeout_timer.is_done():
1195 self.acs.transition(self.timeout_transition)
1196 raise Tr069Error(
1197 'Did not receive Inform response after '
1198 'rebooting',
1199 )
1200
1201 self.timer_handle = \
1202 self.acs.event_loop.call_later(
1203 self.REBOOT_TIMEOUT,
1204 check_timer,
1205 )
1206
1207 def exit(self):
1208 self.timer_handle.cancel()
1209 self.timeout_timer = None
1210
1211 def read_msg(self, message: Any) -> AcsReadMsgResult:
1212 if not isinstance(message, models.Inform):
1213 return AcsReadMsgResult(False, None)
1214 if not does_inform_have_event(message, self.INFORM_EVENT_CODE):
1215 raise Tr069Error(
1216 'Did not receive M Reboot event code in '
1217 'Inform',
1218 )
1219 process_inform_message(
1220 message, self.acs.data_model,
1221 self.acs.device_cfg,
1222 )
1223 return AcsReadMsgResult(True, self.done_transition)
1224
1225 def state_description(self) -> str:
1226 return 'Waiting for M Reboot code from Inform'
1227
1228
1229class WaitRebootDelayState(EnodebAcsState):
1230 """
1231 After receiving the Inform notifying us that the eNodeB has successfully
1232 rebooted, wait a short duration to prevent unspecified race conditions
1233 that may occur w.r.t reboot
1234 """
1235
1236 # Short delay timer to prevent race conditions w.r.t. reboot
1237 SHORT_CONFIG_DELAY = 10
1238
1239 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
1240 super().__init__()
1241 self.acs = acs
1242 self.done_transition = when_done
1243 self.config_timer = None
1244 self.timer_handle = None
1245
1246 def enter(self):
1247 self.config_timer = StateMachineTimer(self.SHORT_CONFIG_DELAY)
1248
1249 def check_timer() -> None:
1250 if self.config_timer.is_done():
1251 self.acs.transition(self.done_transition)
1252
1253 self.timer_handle = \
1254 self.acs.event_loop.call_later(
1255 self.SHORT_CONFIG_DELAY,
1256 check_timer,
1257 )
1258
1259 def exit(self):
1260 self.timer_handle.cancel()
1261 self.config_timer = None
1262
1263 def read_msg(self, message: Any) -> AcsReadMsgResult:
1264 return AcsReadMsgResult(True, None)
1265
1266 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1267 return AcsMsgAndTransition(models.DummyInput(), None)
1268
1269 def state_description(self) -> str:
1270 return 'Waiting after eNB reboot to prevent race conditions'
1271
1272
Wei-Yu Chen678f0a52021-12-21 13:50:52 +08001273class DownloadState(EnodebAcsState):
1274 """
1275 The eNB handler will enter this state when firmware version is older than desired version.
1276 """
1277
1278 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
1279 super().__init__()
1280 self.acs = acs
1281 self.done_transition = when_done
1282
1283 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1284
1285 print("ACS Device CFG")
1286 print(self.acs.device_cfg._param_to_value)
1287
1288 request = models.Download()
1289 request.CommandKey = "20220206215200"
1290 request.FileType = "1 Firmware Upgrade Image"
1291 request.URL = "http://18.116.99.179/firmware/Qproject_TEST3918_2102241222.ffw"
1292 request.Username = ""
1293 request.Password = ""
1294 request.FileSize = 57208579
1295 request.TargetFileName = "Qproject_TEST3918_2102241222.ffw"
1296 request.DelaySeconds = 0
1297 request.SuccessURL = ""
1298 request.FailureURL = ""
1299 return AcsMsgAndTransition(request, self.done_transition)
1300
1301 def state_description(self) -> str:
1302 return 'Upgrade the firmware the desired version'
1303
1304class WaitDownloadResponseState(EnodebAcsState):
1305 """
1306 The eNB handler will enter this state after the Download command sent.
1307 """
1308
1309 def __init__(self, acs: EnodebAcsStateMachine, when_done: str):
1310 super().__init__()
1311 self.acs = acs
1312 self.done_transition = when_done
1313
1314 def read_msg(self, message: Any) -> AcsReadMsgResult:
1315 if not isinstance(message, models.DownloadResponse):
1316 return AcsReadMsgResult(False, None)
1317 return AcsReadMsgResult(True, None)
1318
1319 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1320 """ Reply with empty message """
1321 logger.info("Received Download Response from eNodeB")
1322 return AcsMsgAndTransition(models.DummyInput(), self.done_transition)
1323
1324 def state_description(self) -> str:
1325 return "Wait DownloadResponse message"
1326
1327class WaitInformTransferCompleteState(EnodebAcsState):
1328 """
1329 The eNB handler will enter this state after firmware upgraded and rebooted
1330 """
1331
1332 REBOOT_TIMEOUT = 300 # In seconds
1333 INFORM_EVENT_CODE = "7 TRANSFER COMPLETE"
1334 PREIODIC_EVENT_CODE = "2 PERIODIC"
1335
1336 def __init__(self, acs: EnodebAcsStateMachine, when_done: str, when_periodic: str, when_timeout: str):
1337 super().__init__()
1338 self.acs = acs
1339 self.done_transition = when_done
1340 self.periodic_update_transition = when_periodic
1341 self.timeout_transition = when_timeout
1342 self.timeout_timer = None
1343 self.timer_handle = None
1344
1345 def enter(self):
1346 print("Get into the TransferComplete State")
1347 self.timeout_timer = StateMachineTimer(self.REBOOT_TIMEOUT)
1348
1349 def check_timer() -> None:
1350 if self.timeout_timer.is_done():
1351 self.acs.transition(self.timeout_transition)
1352 raise Tr069Error("Didn't receive Inform response after rebooting")
1353
1354 self.timer_handle = self.acs.event_loop.call_later(
1355 self.REBOOT_TIMEOUT,
1356 check_timer,
1357 )
1358
1359 def exit(self):
1360 self.timer_handle.cancel()
1361 self.timeout_timer = None
1362
1363 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1364 return AcsMsgAndTransition(models.DummyInput(), None)
1365
1366 def read_msg(self, message: Any) -> AcsReadMsgResult:
1367 if not isinstance(message, models.Inform):
1368 return AcsReadMsgResult(False, None)
1369 if does_inform_have_event(message, self.PREIODIC_EVENT_CODE):
1370 logger.info("Receive Periodic update from enodeb")
1371 return AcsReadMsgResult(True, self.periodic_update_transition)
1372 if does_inform_have_event(message, self.INFORM_EVENT_CODE):
1373 logger.info("Receive Transfer complete")
1374 return AcsReadMsgResult(True, self.done_transition)
1375
1376 # Unhandled situation
1377 return AcsReadMsgResult(False, None)
1378
1379 def state_description(self) -> str:
1380 return "Wait DownloadResponse message"
1381
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001382class ErrorState(EnodebAcsState):
1383 """
1384 The eNB handler will enter this state when an unhandled Fault is received.
1385
1386 If the inform_transition_target constructor parameter is non-null, this
1387 state will attempt to autoremediate by transitioning to the specified
1388 target state when an Inform is received.
1389 """
1390
1391 def __init__(
1392 self, acs: EnodebAcsStateMachine,
1393 inform_transition_target: Optional[str] = None,
1394 ):
1395 super().__init__()
1396 self.acs = acs
1397 self.inform_transition_target = inform_transition_target
1398
1399 def read_msg(self, message: Any) -> AcsReadMsgResult:
1400 return AcsReadMsgResult(True, None)
1401
1402 def get_msg(self, message: Any) -> AcsMsgAndTransition:
1403 if not self.inform_transition_target:
1404 return AcsMsgAndTransition(models.DummyInput(), None)
1405
1406 if isinstance(message, models.Inform):
1407 return AcsMsgAndTransition(
1408 models.DummyInput(),
1409 self.inform_transition_target,
1410 )
1411 return AcsMsgAndTransition(models.DummyInput(), None)
1412
1413 def state_description(self) -> str:
1414 return 'Error state - awaiting manual restart of enodebd service or ' \
Wei-Yu Chen678f0a52021-12-21 13:50:52 +08001415 'an Inform to be received from the eNB'