blob: 0c5448fab4c06775fa56ea757bd1c1c0a0150756 [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an individual adapter.
19"""
20from uuid import uuid4
21
22import structlog
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -080023from scapy.packet import Packet
Zsolt Haraszti66862032016-11-28 14:28:39 -080024from twisted.internet.defer import inlineCallbacks, returnValue
25from zope.interface import implementer
26
Zsolt Haraszti89a27302016-12-08 16:53:06 -080027from common.event_bus import EventBusClient
Zsolt Haraszti66862032016-11-28 14:28:39 -080028from voltha.adapters.interface import IAdapterAgent
29from voltha.protos import third_party
30from voltha.protos.device_pb2 import Device, Port
31from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
32 LogicalPort, AdminState
33from voltha.registry import registry
34
35
Zsolt Haraszti66862032016-11-28 14:28:39 -080036@implementer(IAdapterAgent)
37class AdapterAgent(object):
38 """
39 Gate-keeper between CORE and device adapters.
40
41 On one side it interacts with Core's internal model and update/dispatch
42 mechanisms.
43
On the other side, it interacts with the adapter's standard interface as
defined in voltha.adapters.interface.
46 """
47
def __init__(self, adapter_name, adapter_cls):
    """
    Bind this agent to a named adapter class and to Core.

    No adapter object is created here; self.adapter stays None until
    start() instantiates adapter_cls.

    :param adapter_name: name of the adapter; also bound into the logger
    :param adapter_cls: callable used by start() to build the adapter
    """
    core = registry('core')

    # identity of the adapter fronted by this agent
    self.adapter_name = adapter_name
    self.adapter_cls = adapter_cls

    # handles into Core and the root of its data model
    self.core = core
    self.root_proxy = core.get_proxy('/')

    # filled in by start()
    self.adapter = None
    self.adapter_node_proxy = None

    # event-bus plumbing; subscriptions are kept per topic
    self._rx_event_subscriptions = {}
    self._tx_event_subscriptions = {}
    self.event_bus = EventBusClient()

    self.log = structlog.get_logger(adapter_name=adapter_name)
Zsolt Haraszti66862032016-11-28 14:28:39 -080059
@inlineCallbacks
def start(self):
    """
    Instantiate and start the adapter, then register it with Core.

    The adapter is published on self.adapter, and its adapter node and
    device types are written to the model, only after it has been
    constructed and started successfully. If any of these steps fails, the
    exception is logged and the agent is left without an adapter instead
    of tripping over a half-initialized one.

    Fires (via returnValue) with this agent in either case.
    """
    self.log.debug('starting')
    config = self._get_adapter_config()  # this may be None
    try:
        adapter = self.adapter_cls(self, config)
        yield adapter.start()
        # publish only once the adapter is known to be up; the two helpers
        # below read self.adapter
        self.adapter = adapter
        self.adapter_node_proxy = self._update_adapter_node()
        self._update_device_types()
    except Exception as e:
        self.log.exception(e)
        # do not leave a partially registered adapter behind
        self.adapter = None
        self.adapter_node_proxy = None
    else:
        self.log.info('started')
    returnValue(self)
74
@inlineCallbacks
def stop(self):
    """
    Stop the adapter, if there is one, and forget it.

    Waits for the adapter's stop() to complete before clearing
    self.adapter. Does nothing beyond logging when no adapter is set.
    """
    self.log.debug('stopping')
    running = self.adapter
    if running is not None:
        yield running.stop()
        self.adapter = None
    self.log.info('stopped')
Zsolt Haraszti66862032016-11-28 14:28:39 -080082
83 def _get_adapter_config(self):
84 """
85 Opportunistically load persisted adapter configuration.
86 Return None if no configuration exists yet.
87 """
88 proxy = self.core.get_proxy('/')
89 try:
90 config = proxy.get('/adapters/' + self.adapter_name)
91 return config
92 except KeyError:
93 return None
94
95 def _update_adapter_node(self):
96 """
97 Creates or updates the adapter node object based on self
98 description from the adapter.
99 """
100
101 adapter_desc = self.adapter.adapter_descriptor()
102 assert adapter_desc.id == self.adapter_name
103 path = self._make_up_to_date(
104 '/adapters', self.adapter_name, adapter_desc)
105 return self.core.get_proxy(path)
106
107 def _update_device_types(self):
108 """
109 Make sure device types are registered in Core
110 """
111 device_types = self.adapter.device_types()
112 for device_type in device_types.items:
113 key = device_type.id
114 self._make_up_to_date('/device_types', key, device_type)
115
116 def _make_up_to_date(self, container_path, key, data):
117 full_path = container_path + '/' + str(key)
118 root_proxy = self.core.get_proxy('/')
119 try:
120 root_proxy.get(full_path)
121 root_proxy.update(full_path, data)
122 except KeyError:
123 root_proxy.add(container_path, data)
124 return full_path
125
126 # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
127
def adopt_device(self, device):
    """
    Forward an adopt request for device to the adapter.

    Returns whatever the adapter's adopt_device() returns.
    """
    adapter = self.adapter
    return adapter.adopt_device(device)
130
def abandon_device(self, device):
    """
    Forward an abandon request for device to the adapter.

    Returns whatever the adapter's abandon_device() returns.
    """
    adapter = self.adapter
    return adapter.abandon_device(device)
133
def deactivate_device(self, device):
    """
    Forward a deactivate request for device to the adapter.

    Returns whatever the adapter's deactivate_device() returns.
    """
    adapter = self.adapter
    return adapter.deactivate_device(device)
136
def update_flows_bulk(self, device, flows, groups):
    """
    Forward a bulk flow/group update for device to the adapter.

    Returns whatever the adapter's update_flows_bulk() returns.
    """
    adapter = self.adapter
    return adapter.update_flows_bulk(device, flows, groups)
139
def update_flows_incrementally(self, device, flow_changes, group_changes):
    """
    Forward an incremental flow/group update for device to the adapter.

    Returns whatever the adapter's update_flows_incrementally() returns.
    """
    # delegate to the adapter (as the other core-facing calls do); calling
    # the method on self here would recurse without end
    return self.adapter.update_flows_incrementally(
        device, flow_changes, group_changes)
143
Zsolt Haraszti66862032016-11-28 14:28:39 -0800144 # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
145
def get_device(self, device_id):
    """
    Fetch the entry stored at '/devices/<device_id>' via the root proxy.
    """
    device_path = '/devices/{}'.format(device_id)
    return self.root_proxy.get(device_path)
148
def add_device(self, device):
    """
    Store device under '/devices' and make sure device group '1' exists.

    device must be a Device instance. The device is not attached to the
    group yet (see TODO below).
    """
    assert isinstance(device, Device)
    self._make_up_to_date('/devices', device.id, device)

    # TODO for now, just map everything into a single device group
    # which we create if it does not yet exist
    default_group = DeviceGroup(id='1')
    self._make_up_to_date(
        '/device_groups', default_group.id, default_group)

    # add device to device group
    # TODO how to do that?
161
def update_device(self, device):
    """
    Push an updated Device into Core via the device's agent.

    device must be a Device instance.
    """
    assert isinstance(device, Device)

    # going through the device agent (rather than writing to the model
    # directly) keeps the change from looping back to the adapter
    # unnecessarily
    self.core.get_device_agent(device.id).update_device(device)
169
def remove_device(self, device_id):
    """
    Ask the device agent responsible for device_id to remove the device.
    """
    self.core.get_device_agent(device_id).remove_device(device_id)
173
def add_port(self, device_id, port):
    """
    Store port under the device's ports and back-link it from its peers.

    port must be a Port instance; its device_id field is overwritten with
    device_id. For every peer listed on the port, the peer port is read
    from the model and given a reference back to this port unless it
    already has one.
    """
    assert isinstance(port, Port)

    # for referential integrity, add/augment references
    port.device_id = device_id
    back_ref = Port.PeerPort(device_id=device_id, port_no=port.port_no)
    for peer in port.peers:
        peer_path = '/devices/{}/ports/{}'.format(
            peer.device_id, peer.port_no)
        peer_port = self.root_proxy.get(peer_path)
        if back_ref not in peer_port.peers:
            peer_port.peers.add().CopyFrom(back_ref)
        # NOTE(review): the peer is written back on every iteration here;
        # confirm against the original layout that the update is not meant
        # to be conditional on a reference having been added
        self.root_proxy.update(peer_path, peer_port)

    ports_path = '/devices/{}/ports'.format(device_id)
    self._make_up_to_date(ports_path, port.port_no, port)
191
Zsolt Harasztid036b7e2016-12-23 15:36:01 -0800192 def _find_first_available_id(self):
193 logical_devices = self.root_proxy.get('/logical_devices')
194 existing_ids = set(ld.id for ld in logical_devices)
195 existing_datapath_ids = set(ld.datapath_id for ld in logical_devices)
196 i = 1
197 while True:
198 if i not in existing_datapath_ids and str(i) not in existing_ids:
199 return i
200 i += 1
201
def create_logical_device(self, logical_device):
    """
    Store logical_device under '/logical_devices' and return it.

    logical_device must be a LogicalDevice instance. When it arrives
    without an id, the first free number is assigned as both its id (as a
    string) and its datapath_id before it is stored.
    """
    assert isinstance(logical_device, LogicalDevice)

    if not logical_device.id:
        free_number = self._find_first_available_id()
        logical_device.id = str(free_number)
        logical_device.datapath_id = free_number

    self._make_up_to_date(
        '/logical_devices', logical_device.id, logical_device)
    return logical_device
214
def add_logical_port(self, logical_device_id, port):
    """
    Store port under the ports of the given logical device.

    port must be a LogicalPort instance; it is keyed by its id.
    """
    assert isinstance(port, LogicalPort)
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    self._make_up_to_date(ports_path, port.id, port)
220
def child_device_detected(self,
                          parent_device_id,
                          parent_port_no,
                          child_device_type,
                          proxy_address,
                          **kw):
    """
    Create a device entry for a newly detected child device.

    The device gets a fresh 12-hex-digit id, is created in the ENABLED
    admin state and is stored under '/devices'. Extra keyword arguments
    are passed on to the Device constructor. The agent then subscribes to
    the tx topic of proxy_address so that messages published there are
    handed to _send_proxied_message().
    """
    # we create new ONU device objects and insert them into the config
    # TODO should we auto-enable the freshly created device? Probably.
    child = Device(
        id=uuid4().hex[:12],
        type=child_device_type,
        parent_id=parent_device_id,
        parent_port_no=parent_port_no,
        admin_state=AdminState.ENABLED,
        proxy_address=proxy_address,
        **kw
    )
    self._make_up_to_date('/devices', child.id, child)

    def forward_tx(_topic, msg):
        return self._send_proxied_message(proxy_address, msg)

    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    subscription = self.event_bus.subscribe(tx_topic, forward_tx)
    self._tx_event_subscriptions[tx_topic] = subscription
244
245 def _gen_rx_proxy_address_topic(self, proxy_address):
246 """Generate unique topic name specific to this proxy address for rx"""
247 topic = 'rx:' + proxy_address.SerializeToString()
248 return topic
249
250 def _gen_tx_proxy_address_topic(self, proxy_address):
251 """Generate unique topic name specific to this proxy address for tx"""
252 topic = 'tx:' + proxy_address.SerializeToString()
253 return topic
254
def register_for_proxied_messages(self, proxy_address):
    """
    Subscribe to the rx topic of proxy_address.

    Messages published on that topic are handed to
    _receive_proxied_message() together with proxy_address. The
    subscription is remembered under its topic.
    """
    def deliver_rx(_topic, msg):
        return self._receive_proxied_message(proxy_address, msg)

    rx_topic = self._gen_rx_proxy_address_topic(proxy_address)
    subscription = self.event_bus.subscribe(rx_topic, deliver_rx)
    self._rx_event_subscriptions[rx_topic] = subscription
259
260 def _receive_proxied_message(self, proxy_address, msg):
261 self.adapter.receive_proxied_message(proxy_address, msg)
262
def send_proxied_message(self, proxy_address, msg):
    """
    Publish msg on the tx topic of proxy_address.
    """
    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    bus = self.event_bus
    bus.publish(tx_topic, msg)
266
267 def _send_proxied_message(self, proxy_address, msg):
268 self.adapter.send_proxied_message(proxy_address, msg)
269
def receive_proxied_message(self, proxy_address, msg):
    """
    Publish msg on the rx topic of proxy_address.
    """
    rx_topic = self._gen_rx_proxy_address_topic(proxy_address)
    bus = self.event_bus
    bus.publish(rx_topic, msg)
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800273
274 # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
275
def send_packet_in(self, logical_device_id, logical_port_no, packet):
    """
    Publish a packet-in event for a logical device.

    The event goes out on topic 'packet-in:<logical_device_id>' as a
    (logical_port_no, packet) tuple. A scapy Packet is converted with
    str() first; anything else is published as given.
    """
    self.log.debug('send-packet-in', logical_device_id=logical_device_id,
                   logical_port_no=logical_port_no, packet=packet)

    payload = str(packet) if isinstance(packet, Packet) else packet
    self.event_bus.publish(
        'packet-in:' + logical_device_id, (logical_port_no, payload))