blob: 39ffc0207849a14d3159f5578e4dd2e40108a3b3 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001#!/usr/bin/env python3
2
3"""
4Copyright 2020 The Magma Authors.
5
6This source code is licensed under the BSD-style license found in the
7LICENSE file in the root directory of this source tree.
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14"""
15
16
17import asyncio
18import re
19import shlex
20import subprocess
21from typing import List
22
23from common.misc_utils import (
24 IpPreference,
25 get_if_ip_with_netmask,
26 get_ip_from_if,
27)
28from configuration.service_configs import load_service_config
29from logger import EnodebdLogger as logger
30
31IPTABLES_RULE_FMT = """sudo iptables -t nat
32 -{add} PREROUTING
33 -d {public_ip}
34 -p tcp
35 --dport {port}
36 -j DNAT --to-destination {private_ip}"""
37
38EXPECTED_IP4 = ('192.168.60.142', '10.0.2.1')
39EXPECTED_MASK = '255.255.255.0'
40
41
42def get_iptables_rule(port, enodebd_public_ip, private_ip, add=True):
43 return IPTABLES_RULE_FMT.format(
44 add='A' if add else 'D',
45 public_ip=enodebd_public_ip,
46 port=port,
47 private_ip=private_ip,
48 )
49
50
51def does_iface_config_match_expected(ip: str, netmask: str) -> bool:
52 return ip in EXPECTED_IP4 and netmask == EXPECTED_MASK
53
54
55def _get_prerouting_rules(output: str) -> List[str]:
56 prerouting_rules = output.split('\n\n')[0]
57 prerouting_rules = prerouting_rules.split('\n')
58 # Skipping the first two lines since it contains only column names
59 prerouting_rules = prerouting_rules[2:]
60 return prerouting_rules
61
62
63async def check_and_apply_iptables_rules(
64 port: str,
65 enodebd_public_ip: str,
66 enodebd_ip: str,
67) -> None:
68 command = 'sudo iptables -t nat -L'
69 output = subprocess.run(command, shell=True, stdout=subprocess.PIPE, check=True)
70 command_output = output.stdout.decode('utf-8').strip()
71 prerouting_rules = _get_prerouting_rules(command_output)
72 if not prerouting_rules:
73 logger.info('Configuring Iptables rule')
74 await run(
75 get_iptables_rule(
76 port,
77 enodebd_public_ip,
78 enodebd_ip,
79 add=True,
80 ),
81 )
82 else:
83 # Checks each rule in PREROUTING Chain
84 check_rules(prerouting_rules, port, enodebd_public_ip, enodebd_ip)
85
86
87def check_rules(
88 prerouting_rules: List[str],
89 port: str,
90 enodebd_public_ip: str,
91 private_ip: str,
92) -> None:
93 unexpected_rules = []
94 pattern = r'DNAT\s+tcp\s+--\s+anywhere\s+{pub_ip}\s+tcp\s+dpt:{dport} to:{ip}'.format(
95 pub_ip=enodebd_public_ip,
96 dport=port,
97 ip=private_ip,
98 )
99 for rule in prerouting_rules:
100 match = re.search(pattern, rule)
101 if not match:
102 unexpected_rules.append(rule)
103 if unexpected_rules:
104 logger.warning('The following Prerouting rule(s) are unexpected')
105 for rule in unexpected_rules:
106 logger.warning(rule)
107
108
109async def run(cmd):
110 """Fork shell and run command NOTE: Popen is non-blocking"""
111 cmd = shlex.split(cmd)
112 proc = await asyncio.create_subprocess_shell(" ".join(cmd))
113 await proc.communicate()
114 if proc.returncode != 0:
115 # This can happen because the NAT prerouting rule didn't exist
116 logger.error(
117 'Possible error running async subprocess: %s exited with '
118 'return code [%d].', cmd, proc.returncode,
119 )
120 return proc.returncode
121
122
123async def set_enodebd_iptables_rule():
124 """
125 Remove & Set iptable rules for exposing public IP
126 for enobeb instead of private IP..
127 """
128 # Remove & Set iptable rules for exposing public ip
129 # for enobeb instead of private
130 cfg = load_service_config('enodebd')
131 port, interface = cfg['tr069']['port'], cfg['tr069']['interface']
132 enodebd_public_ip = cfg['tr069']['public_ip']
133 # IPv4 only as iptables only works for IPv4. TODO: Investigate ip6tables?
134 enodebd_ip = get_ip_from_if(interface, preference=IpPreference.IPV4_ONLY)
135 # Incoming data from 192.88.99.142 -> enodebd address (eg 192.168.60.142)
136 enodebd_netmask = get_if_ip_with_netmask(
137 interface,
138 preference=IpPreference.IPV4_ONLY,
139 )[1]
140 verify_config = does_iface_config_match_expected(
141 enodebd_ip,
142 enodebd_netmask,
143 )
144 if not verify_config:
145 logger.warning(
146 'The IP address of the %s interface is %s. The '
147 'expected IP addresses are %s',
148 interface, enodebd_ip, str(EXPECTED_IP4),
149 )
150 await check_and_apply_iptables_rules(
151 port,
152 enodebd_public_ip,
153 enodebd_ip,
154 )
155
156
157if __name__ == '__main__':
158 set_enodebd_iptables_rule()