blob: 2313bb1a855044afee8e2ffd72bd205c6aa7b263 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Agent to play gateway between CORE and an individual adapter.
"""
20from uuid import uuid4
21
22import structlog
Zsolt Harasztief05ad22017-01-07 22:08:06 -080023from google.protobuf.json_format import MessageToJson
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -080024from scapy.packet import Packet
Zsolt Haraszti66862032016-11-28 14:28:39 -080025from twisted.internet.defer import inlineCallbacks, returnValue
26from zope.interface import implementer
27
Zsolt Haraszti89a27302016-12-08 16:53:06 -080028from common.event_bus import EventBusClient
Zsolt Harasztief05ad22017-01-07 22:08:06 -080029from common.frameio.frameio import hexify
Zsolt Haraszti66862032016-11-28 14:28:39 -080030from voltha.adapters.interface import IAdapterAgent
31from voltha.protos import third_party
32from voltha.protos.device_pb2 import Device, Port
33from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
34 LogicalPort, AdminState
35from voltha.registry import registry
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080036from voltha.core.flow_decomposer import OUTPUT
Zsolt Haraszti66862032016-11-28 14:28:39 -080037
38
@implementer(IAdapterAgent)
class AdapterAgent(object):
    """
    Gate-keeper between CORE and device adapters.

    On one side it interacts with Core's internal model and update/dispatch
    mechanisms.

    On the other side, it interacts with the adapter through the adapter's
    standard interface (see voltha.adapters.interface, where IAdapterAgent
    is declared).
    """

    def __init__(self, adapter_name, adapter_cls):
        """
        Bind the agent to one adapter type; the adapter itself is only
        instantiated later, in start().

        :param adapter_name: name of the adapter; used as the key under
            '/adapters' and bound to the logger
        :param adapter_cls: callable invoked as adapter_cls(agent, config)
            to build the adapter instance
        """
        self.adapter_name = adapter_name
        self.adapter_cls = adapter_cls
        self.core = registry('core')
        self.adapter = None
        self.adapter_node_proxy = None
        self.root_proxy = self.core.get_proxy('/')
        # subscription handles returned by the event bus, keyed by topic
        self._rx_event_subscriptions = {}
        self._tx_event_subscriptions = {}
        self.event_bus = EventBusClient()
        self.log = structlog.get_logger(adapter_name=adapter_name)

    @inlineCallbacks
    def start(self):
        """
        Instantiate and start the adapter, then publish its descriptor and
        device types into the Core model.

        Any exception raised along the way is logged and swallowed
        (best-effort start), so the deferred always fires with this agent.
        If adapter.start() itself fails, self.adapter stays None.
        """
        self.log.debug('starting')
        config = self._get_adapter_config()  # this may be None
        try:
            adapter = self.adapter_cls(self, config)
            yield adapter.start()
            # only expose the adapter once it has started successfully
            self.adapter = adapter
            self.adapter_node_proxy = self._update_adapter_node()
            self._update_device_types()
        except Exception as e:
            self.log.exception(e)
        self.log.info('started')
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        """Stop the adapter, if one is running, and drop the reference."""
        self.log.debug('stopping')
        if self.adapter is not None:
            yield self.adapter.stop()
            self.adapter = None
        self.log.info('stopped')

    def _get_adapter_config(self):
        """
        Opportunistically load persisted adapter configuration.
        Return None if no configuration exists yet.
        """
        proxy = self.core.get_proxy('/')
        try:
            return proxy.get('/adapters/' + self.adapter_name)
        except KeyError:
            return None

    def _update_adapter_node(self):
        """
        Creates or updates the adapter node object based on self
        description from the adapter.

        :return: a proxy obtained from Core for the adapter's node path
        """
        adapter_desc = self.adapter.adapter_descriptor()
        assert adapter_desc.id == self.adapter_name
        path = self._make_up_to_date(
            '/adapters', self.adapter_name, adapter_desc)
        return self.core.get_proxy(path)

    def _update_device_types(self):
        """
        Make sure device types are registered in Core
        """
        device_types = self.adapter.device_types()
        for device_type in device_types.items:
            self._make_up_to_date(
                '/device_types', device_type.id, device_type)

    def _make_up_to_date(self, container_path, key, data):
        """
        Update the node at container_path/key with data, or add data to
        the container when the lookup raises KeyError.

        :param container_path: path of the container, e.g. '/devices'
        :param key: key of the entry within the container (stringified)
        :param data: object to store
        :return: the full path of the entry
        """
        full_path = container_path + '/' + str(key)
        root_proxy = self.core.get_proxy('/')
        try:
            # get() serves as the existence probe before updating
            root_proxy.get(full_path)
            root_proxy.update(full_path, data)
        except KeyError:
            root_proxy.add(container_path, data)
        return full_path

    # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def adopt_device(self, device):
        """Delegate to the adapter's adopt_device()."""
        return self.adapter.adopt_device(device)

    def abandon_device(self, device):
        """Delegate to the adapter's abandon_device()."""
        return self.adapter.abandon_device(device)

    def deactivate_device(self, device):
        """Delegate to the adapter's deactivate_device()."""
        return self.adapter.deactivate_device(device)

    def update_flows_bulk(self, device, flows, groups):
        """Delegate to the adapter's update_flows_bulk()."""
        return self.adapter.update_flows_bulk(device, flows, groups)

    def update_flows_incrementally(self, device, flow_changes, group_changes):
        """Delegate to the adapter's update_flows_incrementally()."""
        # Must go through self.adapter: calling self.<same method> here
        # would recurse without bound.
        return self.adapter.update_flows_incrementally(
            device, flow_changes, group_changes)

    # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def get_device(self, device_id):
        """Return the object stored at '/devices/<device_id>'."""
        return self.root_proxy.get('/devices/{}'.format(device_id))

    def add_device(self, device):
        """
        Add or update the given Device under '/devices' and make sure the
        single default device group exists.
        """
        assert isinstance(device, Device)
        self._make_up_to_date('/devices', device.id, device)

        # TODO for now, just map everything into a single device group
        # which we create if it does not yet exist

        dg = DeviceGroup(id='1')
        self._make_up_to_date('/device_groups', dg.id, dg)

        # add device to device group
        # TODO how to do that?

    def update_device(self, device):
        """Push an updated Device into Core via its device agent."""
        assert isinstance(device, Device)

        # we run the update through the device_agent so that the change
        # does not loop back to the adapter unnecessarily
        device_agent = self.core.get_device_agent(device.id)
        device_agent.update_device(device)

    def remove_device(self, device_id):
        """Ask the device's agent in Core to remove the device."""
        device_agent = self.core.get_device_agent(device_id)
        device_agent.remove_device(device_id)

    def add_port(self, device_id, port):
        """
        Add or update a Port on a device, and register this port as a
        peer on every port it lists as its own peer.

        :param device_id: id of the device owning the port
        :param port: Port message; its device_id field is overwritten
        """
        assert isinstance(port, Port)

        # for referential integrity, add/augment references
        port.device_id = device_id
        me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no)
        for peer in port.peers:
            peer_port_path = '/devices/{}/ports/{}'.format(
                peer.device_id, peer.port_no)
            peer_port = self.root_proxy.get(peer_port_path)
            if me_as_peer not in peer_port.peers:
                new = peer_port.peers.add()
                new.CopyFrom(me_as_peer)
            self.root_proxy.update(peer_port_path, peer_port)

        self._make_up_to_date('/devices/{}/ports'.format(device_id),
                              port.port_no, port)

    def _find_first_available_id(self):
        """
        Return the smallest positive integer that is used neither as a
        datapath_id nor (in string form) as an id of an existing logical
        device.
        """
        logical_devices = self.root_proxy.get('/logical_devices')
        existing_ids = set(ld.id for ld in logical_devices)
        existing_datapath_ids = set(ld.datapath_id for ld in logical_devices)
        i = 1
        while True:
            if i not in existing_datapath_ids and str(i) not in existing_ids:
                return i
            i += 1

    def get_logical_device(self, logical_device_id):
        """Return the object stored at '/logical_devices/<id>'."""
        return self.root_proxy.get('/logical_devices/{}'.format(
            logical_device_id))

    def create_logical_device(self, logical_device):
        """
        Store a LogicalDevice in Core and subscribe to its packet-out
        topic on the event bus.

        If the message has no id, the first free integer is assigned as
        both its id (stringified) and its datapath_id.

        :param logical_device: LogicalDevice message (mutated in place
            when an id is assigned)
        :return: the same logical_device object
        """
        assert isinstance(logical_device, LogicalDevice)

        if not logical_device.id:
            free_id = self._find_first_available_id()
            logical_device.id = str(free_id)
            logical_device.datapath_id = free_id

        self._make_up_to_date('/logical_devices',
                              logical_device.id, logical_device)

        # Capture the id now so the callback keeps matching the topic it
        # was subscribed on even if the caller later mutates the message.
        logical_device_id = logical_device.id
        self.event_bus.subscribe(
            topic='packet-out:{}'.format(logical_device_id),
            callback=lambda _, p: self.receive_packet_out(
                logical_device_id, p)
        )

        return logical_device

    def receive_packet_out(self, logical_device_id, ofp_packet_out):
        """
        Forward a packet-out message to the adapter.

        The egress port is taken from the first OUTPUT action; it is None
        when the message carries no such action.
        """

        def get_port_out(opo):
            for action in opo.actions:
                if action.type == OUTPUT:
                    return action.output.port
            return None

        out_port = get_port_out(ofp_packet_out)
        frame = ofp_packet_out.data
        self.adapter.receive_packet_out(logical_device_id, out_port, frame)

    def add_logical_port(self, logical_device_id, port):
        """Add or update a LogicalPort on the given logical device."""
        assert isinstance(port, LogicalPort)
        self._make_up_to_date(
            '/logical_devices/{}/ports'.format(logical_device_id),
            port.id, port)

    def child_device_detected(self,
                              parent_device_id,
                              parent_port_no,
                              child_device_type,
                              proxy_address,
                              **kw):
        """
        Create an enabled child Device with a freshly generated id, store
        it under '/devices', and subscribe to the tx topic of its proxy
        address so that published messages are handed to the adapter.

        :param parent_device_id: id of the parent device
        :param parent_port_no: port number on the parent device
        :param child_device_type: type assigned to the new device
        :param proxy_address: proxy address message of the child
        :param kw: extra fields passed through to the Device constructor
        """
        # we create new ONU device objects and insert them into the config
        # TODO should we auto-enable the freshly created device? Probably.
        device = Device(
            id=uuid4().hex[:12],
            type=child_device_type,
            parent_id=parent_device_id,
            parent_port_no=parent_port_no,
            admin_state=AdminState.ENABLED,
            proxy_address=proxy_address,
            **kw
        )
        self._make_up_to_date(
            '/devices', device.id, device)

        topic = self._gen_tx_proxy_address_topic(proxy_address)
        self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
            topic, lambda t, m: self._send_proxied_message(proxy_address, m))

    def _gen_rx_proxy_address_topic(self, proxy_address):
        """Generate unique topic name specific to this proxy address for rx"""
        topic = 'rx:' + MessageToJson(proxy_address)
        return topic

    def _gen_tx_proxy_address_topic(self, proxy_address):
        """Generate unique topic name specific to this proxy address for tx"""
        topic = 'tx:' + MessageToJson(proxy_address)
        return topic

    def register_for_proxied_messages(self, proxy_address):
        """
        Subscribe to the rx topic of proxy_address; messages published on
        it are delivered to the adapter's receive_proxied_message().
        """
        topic = self._gen_rx_proxy_address_topic(proxy_address)
        self._rx_event_subscriptions[topic] = self.event_bus.subscribe(
            topic,
            lambda t, m: self._receive_proxied_message(proxy_address, m))

    def _receive_proxied_message(self, proxy_address, msg):
        """Event-bus callback: hand a received message to the adapter."""
        self.adapter.receive_proxied_message(proxy_address, msg)

    def send_proxied_message(self, proxy_address, msg):
        """Publish msg on the tx topic of proxy_address."""
        topic = self._gen_tx_proxy_address_topic(proxy_address)
        self.event_bus.publish(topic, msg)

    def _send_proxied_message(self, proxy_address, msg):
        """Event-bus callback: ask the adapter to send a proxied message."""
        self.adapter.send_proxied_message(proxy_address, msg)

    def receive_proxied_message(self, proxy_address, msg):
        """Publish msg on the rx topic of proxy_address."""
        topic = self._gen_rx_proxy_address_topic(proxy_address)
        self.event_bus.publish(topic, msg)

    # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~

    def send_packet_in(self, logical_device_id, logical_port_no, packet):
        """
        Publish a packet-in on the 'packet-in:<logical_device_id>' topic
        as a (logical_port_no, packet) tuple.

        :param logical_device_id: id of the logical device (concatenated
            into the topic, so it must be a string)
        :param logical_port_no: logical port the packet arrived on
        :param packet: the frame; a scapy Packet is converted with str()
            before publishing
        """
        self.log.debug('send-packet-in', logical_device_id=logical_device_id,
                       logical_port_no=logical_port_no, packet=hexify(packet))

        if isinstance(packet, Packet):
            packet = str(packet)

        topic = 'packet-in:' + logical_device_id
        self.event_bus.publish(topic, (logical_port_no, packet))