blob: 4e0f7b44d116b29c03a9ebe543702f0c4911dda3 [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import asyncio
7import ipaddress
8import os
9from enum import Enum
10
11import netifaces
12import snowflake
13
14
15class IpPreference(Enum):
16 IPV4_ONLY = 1
17 IPV4_PREFERRED = 2
18 IPV6_PREFERRED = 3
19 IPV6_ONLY = 4
20
21
22def get_if_ip_with_netmask(interface, preference=IpPreference.IPV4_PREFERRED):
23 """
24 Get IP address and netmask (in form /255.255.255.0)
25 from interface name and return as tuple (ip, netmask).
26 Note: If multiple v4/v6 addresses exist, the first is chosen
27
28 Raise ValueError if unable to get requested IP address.
29 """
30 # Raises ValueError if interface is unavailable
31 ip_addresses = netifaces.ifaddresses(interface)
32
33 try:
34 ipv4_address = (
35 ip_addresses[netifaces.AF_INET][0]['addr'],
36 ip_addresses[netifaces.AF_INET][0]['netmask'],
37 )
38 except KeyError:
39 ipv4_address = None
40
41 try:
42 ipv6_address = (
43 ip_addresses[netifaces.AF_INET6][0]["addr"].split("%")[0],
44 ip_addresses[netifaces.AF_INET6][0]["netmask"],
45 )
46
47 except KeyError:
48 ipv6_address = None
49
50 if preference == IpPreference.IPV4_ONLY:
51 if ipv4_address is not None:
52 return ipv4_address
53 else:
54 raise ValueError('Error getting IPv4 address for %s' % interface)
55
56 elif preference == IpPreference.IPV4_PREFERRED:
57 if ipv4_address is not None:
58 return ipv4_address
59 elif ipv6_address is not None:
60 return ipv6_address
61 else:
62 raise ValueError('Error getting IPv4/6 address for %s' % interface)
63
64 elif preference == IpPreference.IPV6_PREFERRED:
65 if ipv6_address is not None:
66 return ipv6_address
67 elif ipv4_address is not None:
68 return ipv4_address
69 else:
70 raise ValueError('Error getting IPv6/4 address for %s' % interface)
71
72 elif preference == IpPreference.IPV6_ONLY:
73 if ipv6_address is not None:
74 return ipv6_address
75 else:
76 raise ValueError('Error getting IPv6 address for %s' % interface)
77
78 else:
79 raise ValueError('Unknown IP preference %s' % preference)
80
81
82def get_all_if_ips_with_netmask(
83 interface,
84 preference=IpPreference.IPV4_PREFERRED,
85):
86 """
87 Get all IP addresses and netmasks (in form /255.255.255.0)
88 from interface name and return as a list of tuple (ip, netmask).
89
90 Raise ValueError if unable to get requested IP addresses.
91 """
92 # Raises ValueError if interface is unavailable
93 ip_addresses = netifaces.ifaddresses(interface)
94
95 try:
96 ipv4_addresses = [(ip_address['addr'], ip_address['netmask']) for
97 ip_address in ip_addresses[netifaces.AF_INET]]
98 except KeyError:
99 ipv4_addresses = None
100
101 try:
102 ipv6_addresses = [(ip_address['addr'], ip_address['netmask']) for
103 ip_address in ip_addresses[netifaces.AF_INET6]]
104 except KeyError:
105 ipv6_addresses = None
106
107 if preference == IpPreference.IPV4_ONLY:
108 if ipv4_addresses is not None:
109 return ipv4_addresses
110 else:
111 raise ValueError('Error getting IPv4 addresses for %s' % interface)
112
113 elif preference == IpPreference.IPV4_PREFERRED:
114 if ipv4_addresses is not None:
115 return ipv4_addresses
116 elif ipv6_addresses is not None:
117 return ipv6_addresses
118 else:
119 raise ValueError(
120 'Error getting IPv4/6 addresses for %s' % interface,
121 )
122
123 elif preference == IpPreference.IPV6_PREFERRED:
124 if ipv6_addresses is not None:
125 return ipv6_addresses
126 elif ipv4_addresses is not None:
127 return ipv4_addresses
128 else:
129 raise ValueError(
130 'Error getting IPv6/4 addresses for %s' % interface,
131 )
132
133 elif preference == IpPreference.IPV6_ONLY:
134 if ipv6_addresses is not None:
135 return ipv6_addresses
136 else:
137 raise ValueError('Error getting IPv6 addresses for %s' % interface)
138
139 else:
140 raise ValueError('Unknown IP preference %s' % preference)
141
142
143def get_ip_from_if(iface_name, preference=IpPreference.IPV4_PREFERRED):
144 """
145 Get ip address from interface name and return as string.
146 Extract only ip address from (ip, netmask)
147 """
148 return get_if_ip_with_netmask(iface_name, preference)[0]
149
150
151def get_all_ips_from_if(iface_name, preference=IpPreference.IPV4_PREFERRED):
152 """
153 Get all ip addresses from interface name and return as a list of string.
154 Extract only ip address from (ip, netmask)
155 """
156 return [
157 ip[0] for ip in
158 get_all_if_ips_with_netmask(iface_name, preference)
159 ]
160
161
162def get_ip_from_if_cidr(iface_name, preference=IpPreference.IPV4_PREFERRED):
163 """
164 Get IPAddress with netmask from interface name and
165 transform into CIDR (eth1 -> 192.168.60.142/24)
166 notation return as string.
167 """
168 ip, netmask = get_if_ip_with_netmask(iface_name, preference)
169 ip = '%s/%s' % (ip, netmask)
170 interface = ipaddress.ip_interface(ip).with_prefixlen # Set CIDR notation
171 return interface
172
173
174def get_all_ips_from_if_cidr(
175 iface_name,
176 preference=IpPreference.IPV4_PREFERRED,
177):
178 """
179 Get all IPAddresses with netmask from interface name and
180 transform into CIDR (eth1 -> 192.168.60.142/24) notation
181 return as a list of string.
182 """
183
184 def ip_cidr_gen():
185 for ip, netmask in get_all_if_ips_with_netmask(iface_name, preference):
186 ip = '%s/%s' % (ip, netmask)
187 # Set CIDR notation
188 ip_cidr = ipaddress.ip_interface(ip).with_prefixlen
189 yield ip_cidr
190
191 return [ip_cidr for ip_cidr in ip_cidr_gen()]
192
193
194def cidr_to_ip_netmask_tuple(cidr_network):
195 """
196 Convert CIDR-format IP network string (e.g. 10.0.0.1/24) to a tuple
197 (ip, netmask) where netmask is in the form (n.n.n.n).
198
199 Args:
200 cidr_network (str): IPv4 network in CIDR notation
201
202 Returns:
203 (str, str): 2-tuple of IP address and netmask
204 """
205 network = ipaddress.ip_network(cidr_network)
206 return '{}'.format(network.network_address), '{}'.format(network.netmask)
207
208
209def get_if_mac_address(interface):
210 """
211 Returns the MAC address of an interface.
212 Note: If multiple MAC addresses exist, the first one is chosen.
213
214 Raise ValueError if unable to get requested IP address.
215 """
216 addr = netifaces.ifaddresses(interface)
217 try:
218 return addr[netifaces.AF_LINK][0]['addr']
219 except KeyError:
220 raise ValueError('Error getting MAC address for %s' % interface)
221
222
223def get_gateway_hwid() -> str:
224 """
225 Returns the HWID of the gateway
226 Note: Currently this uses the snowflake at /etc/snowflake
227 """
228 return snowflake.snowflake()
229
230
231def is_interface_up(interface):
232 """
233 Returns whether an interface is up.
234 """
235 try:
236 addr = netifaces.ifaddresses(interface)
237 except ValueError:
238 return False
239 return netifaces.AF_INET in addr
240
241
242def call_process(cmd, callback, loop):
243 loop = loop or asyncio.get_event_loop()
244 loop.create_task(
245 loop.subprocess_shell(
246 lambda: SubprocessProtocol(callback), "nohup " + cmd,
247 preexec_fn=os.setsid,
248 ),
249 )
250
251
252class SubprocessProtocol(asyncio.SubprocessProtocol):
253 def __init__(self, callback):
254 self._callback = callback
255 self._transport = None
256
257 def connection_made(self, transport):
258 self._transport = transport
259
260 def process_exited(self):
261 self._callback(self._transport.get_returncode())