blob: 4d8e5ec5f6cc629d706d6c0f09ffa94bd710d455 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import asyncio
15import ipaddress
16import os
17from enum import Enum
18
19import netifaces
20import snowflake
21
22
23class IpPreference(Enum):
24 IPV4_ONLY = 1
25 IPV4_PREFERRED = 2
26 IPV6_PREFERRED = 3
27 IPV6_ONLY = 4
28
29
30def get_if_ip_with_netmask(interface, preference=IpPreference.IPV4_PREFERRED):
31 """
32 Get IP address and netmask (in form /255.255.255.0)
33 from interface name and return as tuple (ip, netmask).
34 Note: If multiple v4/v6 addresses exist, the first is chosen
35
36 Raise ValueError if unable to get requested IP address.
37 """
38 # Raises ValueError if interface is unavailable
39 ip_addresses = netifaces.ifaddresses(interface)
40
41 try:
42 ipv4_address = (
43 ip_addresses[netifaces.AF_INET][0]['addr'],
44 ip_addresses[netifaces.AF_INET][0]['netmask'],
45 )
46 except KeyError:
47 ipv4_address = None
48
49 try:
50 ipv6_address = (
51 ip_addresses[netifaces.AF_INET6][0]["addr"].split("%")[0],
52 ip_addresses[netifaces.AF_INET6][0]["netmask"],
53 )
54
55 except KeyError:
56 ipv6_address = None
57
58 if preference == IpPreference.IPV4_ONLY:
59 if ipv4_address is not None:
60 return ipv4_address
61 else:
62 raise ValueError('Error getting IPv4 address for %s' % interface)
63
64 elif preference == IpPreference.IPV4_PREFERRED:
65 if ipv4_address is not None:
66 return ipv4_address
67 elif ipv6_address is not None:
68 return ipv6_address
69 else:
70 raise ValueError('Error getting IPv4/6 address for %s' % interface)
71
72 elif preference == IpPreference.IPV6_PREFERRED:
73 if ipv6_address is not None:
74 return ipv6_address
75 elif ipv4_address is not None:
76 return ipv4_address
77 else:
78 raise ValueError('Error getting IPv6/4 address for %s' % interface)
79
80 elif preference == IpPreference.IPV6_ONLY:
81 if ipv6_address is not None:
82 return ipv6_address
83 else:
84 raise ValueError('Error getting IPv6 address for %s' % interface)
85
86 else:
87 raise ValueError('Unknown IP preference %s' % preference)
88
89
90def get_all_if_ips_with_netmask(
91 interface,
92 preference=IpPreference.IPV4_PREFERRED,
93):
94 """
95 Get all IP addresses and netmasks (in form /255.255.255.0)
96 from interface name and return as a list of tuple (ip, netmask).
97
98 Raise ValueError if unable to get requested IP addresses.
99 """
100 # Raises ValueError if interface is unavailable
101 ip_addresses = netifaces.ifaddresses(interface)
102
103 try:
104 ipv4_addresses = [(ip_address['addr'], ip_address['netmask']) for
105 ip_address in ip_addresses[netifaces.AF_INET]]
106 except KeyError:
107 ipv4_addresses = None
108
109 try:
110 ipv6_addresses = [(ip_address['addr'], ip_address['netmask']) for
111 ip_address in ip_addresses[netifaces.AF_INET6]]
112 except KeyError:
113 ipv6_addresses = None
114
115 if preference == IpPreference.IPV4_ONLY:
116 if ipv4_addresses is not None:
117 return ipv4_addresses
118 else:
119 raise ValueError('Error getting IPv4 addresses for %s' % interface)
120
121 elif preference == IpPreference.IPV4_PREFERRED:
122 if ipv4_addresses is not None:
123 return ipv4_addresses
124 elif ipv6_addresses is not None:
125 return ipv6_addresses
126 else:
127 raise ValueError(
128 'Error getting IPv4/6 addresses for %s' % interface,
129 )
130
131 elif preference == IpPreference.IPV6_PREFERRED:
132 if ipv6_addresses is not None:
133 return ipv6_addresses
134 elif ipv4_addresses is not None:
135 return ipv4_addresses
136 else:
137 raise ValueError(
138 'Error getting IPv6/4 addresses for %s' % interface,
139 )
140
141 elif preference == IpPreference.IPV6_ONLY:
142 if ipv6_addresses is not None:
143 return ipv6_addresses
144 else:
145 raise ValueError('Error getting IPv6 addresses for %s' % interface)
146
147 else:
148 raise ValueError('Unknown IP preference %s' % preference)
149
150
151def get_ip_from_if(iface_name, preference=IpPreference.IPV4_PREFERRED):
152 """
153 Get ip address from interface name and return as string.
154 Extract only ip address from (ip, netmask)
155 """
156 return get_if_ip_with_netmask(iface_name, preference)[0]
157
158
159def get_all_ips_from_if(iface_name, preference=IpPreference.IPV4_PREFERRED):
160 """
161 Get all ip addresses from interface name and return as a list of string.
162 Extract only ip address from (ip, netmask)
163 """
164 return [
165 ip[0] for ip in
166 get_all_if_ips_with_netmask(iface_name, preference)
167 ]
168
169
170def get_ip_from_if_cidr(iface_name, preference=IpPreference.IPV4_PREFERRED):
171 """
172 Get IPAddress with netmask from interface name and
173 transform into CIDR (eth1 -> 192.168.60.142/24)
174 notation return as string.
175 """
176 ip, netmask = get_if_ip_with_netmask(iface_name, preference)
177 ip = '%s/%s' % (ip, netmask)
178 interface = ipaddress.ip_interface(ip).with_prefixlen # Set CIDR notation
179 return interface
180
181
182def get_all_ips_from_if_cidr(
183 iface_name,
184 preference=IpPreference.IPV4_PREFERRED,
185):
186 """
187 Get all IPAddresses with netmask from interface name and
188 transform into CIDR (eth1 -> 192.168.60.142/24) notation
189 return as a list of string.
190 """
191
192 def ip_cidr_gen():
193 for ip, netmask in get_all_if_ips_with_netmask(iface_name, preference):
194 ip = '%s/%s' % (ip, netmask)
195 # Set CIDR notation
196 ip_cidr = ipaddress.ip_interface(ip).with_prefixlen
197 yield ip_cidr
198
199 return [ip_cidr for ip_cidr in ip_cidr_gen()]
200
201
202def cidr_to_ip_netmask_tuple(cidr_network):
203 """
204 Convert CIDR-format IP network string (e.g. 10.0.0.1/24) to a tuple
205 (ip, netmask) where netmask is in the form (n.n.n.n).
206
207 Args:
208 cidr_network (str): IPv4 network in CIDR notation
209
210 Returns:
211 (str, str): 2-tuple of IP address and netmask
212 """
213 network = ipaddress.ip_network(cidr_network)
214 return '{}'.format(network.network_address), '{}'.format(network.netmask)
215
216
217def get_if_mac_address(interface):
218 """
219 Returns the MAC address of an interface.
220 Note: If multiple MAC addresses exist, the first one is chosen.
221
222 Raise ValueError if unable to get requested IP address.
223 """
224 addr = netifaces.ifaddresses(interface)
225 try:
226 return addr[netifaces.AF_LINK][0]['addr']
227 except KeyError:
228 raise ValueError('Error getting MAC address for %s' % interface)
229
230
231def get_gateway_hwid() -> str:
232 """
233 Returns the HWID of the gateway
234 Note: Currently this uses the snowflake at /etc/snowflake
235 """
236 return snowflake.snowflake()
237
238
239def is_interface_up(interface):
240 """
241 Returns whether an interface is up.
242 """
243 try:
244 addr = netifaces.ifaddresses(interface)
245 except ValueError:
246 return False
247 return netifaces.AF_INET in addr
248
249
250def call_process(cmd, callback, loop):
251 loop = loop or asyncio.get_event_loop()
252 loop.create_task(
253 loop.subprocess_shell(
254 lambda: SubprocessProtocol(callback), "nohup " + cmd,
255 preexec_fn=os.setsid,
256 ),
257 )
258
259
260class SubprocessProtocol(asyncio.SubprocessProtocol):
261 def __init__(self, callback):
262 self._callback = callback
263 self._transport = None
264
265 def connection_made(self, transport):
266 self._transport = transport
267
268 def process_exited(self):
269 self._callback(self._transport.get_returncode())