blob: bf9194a0d5125c4f648583de59225170b418b64c [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6from typing import Any as TAny
7from typing import Dict
8
9from google.protobuf.internal.well_known_types import Any
10from configuration import service_configs
11from configuration.exceptions import LoadConfigError
12
13
def filter_configs_by_key(configs_by_key: Dict[str, TAny]) -> Dict[str, TAny]:
    """
    Given a JSON-deserialized map of mconfig protobuf Any's keyed by service
    name, filter out any entries without a corresponding service.

    The set of known services is read from the 'magmad' service config: its
    'magma_services' list, its 'registered_dynamic_services' list, plus
    'magmad' itself.

    NOTE: no check against the protobuf symbol database is performed here;
    entries are kept or dropped purely by service name.

    Args:
        configs_by_key:
            JSON-deserialized service mconfigs keyed by service name

    Returns:
        A new dict holding only the entries of the input map whose key names
        a known service. The input map is not modified.
    """
    magmad_cfg = service_configs.load_service_config('magmad')

    # Build a fresh set instead of appending to the list handed back by
    # magmad_cfg.get(): that list belongs to the loaded config, and growing
    # it in place would alter the config object itself. `or ()` also
    # tolerates a key that is present but set to None.
    services = {'magmad'}
    services.update(magmad_cfg.get('magma_services') or ())
    services.update(magmad_cfg.get('registered_dynamic_services') or ())

    return {
        srv: cfg
        for srv, cfg in configs_by_key.items()
        if srv in services
    }
39
40
def unpack_mconfig_any(mconfig_any: Any, mconfig_struct: TAny) -> TAny:
    """
    Fill an empty protobuf message struct for a service from a protobuf Any.

    Args:
        mconfig_any: protobuf Any type to unpack
        mconfig_struct: protobuf message struct that receives the contents

    Returns: The same mconfig_struct object that was passed in, after
        mconfig_any.Unpack() has reported success on it

    Raises:
        LoadConfigError: if mconfig_any.Unpack() returns a falsy result
    """
    # Success path first: Unpack() writes into mconfig_struct in place and
    # reports whether it succeeded through its return value.
    if mconfig_any.Unpack(mconfig_struct):
        return mconfig_struct

    raise LoadConfigError(
        'Cannot unpack Any type into message: %s' % mconfig_struct,
    )
57 return mconfig_struct