blob: 5a0e1de7d1fe9e26ca2eee462d7f1011401556f8 [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti66862032016-11-28 14:28:39 -08003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an individual adapter.
19"""
20from uuid import uuid4
21
22import structlog
Zsolt Harasztief05ad22017-01-07 22:08:06 -080023from google.protobuf.json_format import MessageToJson
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -080024from scapy.packet import Packet
Zsolt Haraszti66862032016-11-28 14:28:39 -080025from twisted.internet.defer import inlineCallbacks, returnValue
26from zope.interface import implementer
27
Zsolt Haraszti89a27302016-12-08 16:53:06 -080028from common.event_bus import EventBusClient
Zsolt Harasztief05ad22017-01-07 22:08:06 -080029from common.frameio.frameio import hexify
Zsolt Haraszti66862032016-11-28 14:28:39 -080030from voltha.adapters.interface import IAdapterAgent
31from voltha.protos import third_party
32from voltha.protos.device_pb2 import Device, Port
Zsolt Haraszti749b0952017-01-18 09:02:35 -080033from voltha.protos.events_pb2 import KpiEvent
Zsolt Haraszti66862032016-11-28 14:28:39 -080034from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
35 LogicalPort, AdminState
36from voltha.registry import registry
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080037from voltha.core.flow_decomposer import OUTPUT
Zsolt Haraszti66862032016-11-28 14:28:39 -080038
39
Zsolt Haraszti66862032016-11-28 14:28:39 -080040@implementer(IAdapterAgent)
41class AdapterAgent(object):
42 """
43 Gate-keeper between CORE and device adapters.
44
45 On one side it interacts with Core's internal model and update/dispatch
46 mechanisms.
47
48 On the other side, it interacts with the adapters standard interface as
49 defined in
50 """
51
52 def __init__(self, adapter_name, adapter_cls):
53 self.adapter_name = adapter_name
54 self.adapter_cls = adapter_cls
55 self.core = registry('core')
56 self.adapter = None
57 self.adapter_node_proxy = None
58 self.root_proxy = self.core.get_proxy('/')
Zsolt Haraszti89a27302016-12-08 16:53:06 -080059 self._rx_event_subscriptions = {}
60 self._tx_event_subscriptions = {}
61 self.event_bus = EventBusClient()
62 self.log = structlog.get_logger(adapter_name=adapter_name)
Zsolt Haraszti66862032016-11-28 14:28:39 -080063
64 @inlineCallbacks
65 def start(self):
Zsolt Haraszti89a27302016-12-08 16:53:06 -080066 self.log.debug('starting')
Zsolt Haraszti66862032016-11-28 14:28:39 -080067 config = self._get_adapter_config() # this may be None
Zsolt Haraszti89a27302016-12-08 16:53:06 -080068 try:
69 adapter = self.adapter_cls(self, config)
70 yield adapter.start()
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080071 self.adapter = adapter
72 self.adapter_node_proxy = self._update_adapter_node()
73 self._update_device_types()
Zsolt Haraszti89a27302016-12-08 16:53:06 -080074 except Exception, e:
75 self.log.exception(e)
Zsolt Haraszti89a27302016-12-08 16:53:06 -080076 self.log.info('started')
Zsolt Haraszti66862032016-11-28 14:28:39 -080077 returnValue(self)
78
79 @inlineCallbacks
80 def stop(self):
Zsolt Haraszti89a27302016-12-08 16:53:06 -080081 self.log.debug('stopping')
Zsolt Haraszti66862032016-11-28 14:28:39 -080082 if self.adapter is not None:
83 yield self.adapter.stop()
84 self.adapter = None
Zsolt Haraszti89a27302016-12-08 16:53:06 -080085 self.log.info('stopped')
Zsolt Haraszti66862032016-11-28 14:28:39 -080086
87 def _get_adapter_config(self):
88 """
89 Opportunistically load persisted adapter configuration.
90 Return None if no configuration exists yet.
91 """
92 proxy = self.core.get_proxy('/')
93 try:
94 config = proxy.get('/adapters/' + self.adapter_name)
95 return config
96 except KeyError:
97 return None
98
99 def _update_adapter_node(self):
100 """
101 Creates or updates the adapter node object based on self
102 description from the adapter.
103 """
104
105 adapter_desc = self.adapter.adapter_descriptor()
106 assert adapter_desc.id == self.adapter_name
107 path = self._make_up_to_date(
108 '/adapters', self.adapter_name, adapter_desc)
109 return self.core.get_proxy(path)
110
111 def _update_device_types(self):
112 """
113 Make sure device types are registered in Core
114 """
115 device_types = self.adapter.device_types()
116 for device_type in device_types.items:
117 key = device_type.id
118 self._make_up_to_date('/device_types', key, device_type)
119
120 def _make_up_to_date(self, container_path, key, data):
121 full_path = container_path + '/' + str(key)
122 root_proxy = self.core.get_proxy('/')
123 try:
124 root_proxy.get(full_path)
125 root_proxy.update(full_path, data)
126 except KeyError:
127 root_proxy.add(container_path, data)
128 return full_path
129
130 # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
131
132 def adopt_device(self, device):
133 return self.adapter.adopt_device(device)
134
135 def abandon_device(self, device):
136 return self.adapter.abandon_device(device)
137
138 def deactivate_device(self, device):
139 return self.adapter.deactivate_device(device)
140
Zsolt Harasztic5c5d102016-12-07 21:12:27 -0800141 def update_flows_bulk(self, device, flows, groups):
142 return self.adapter.update_flows_bulk(device, flows, groups)
143
144 def update_flows_incrementally(self, device, flow_changes, group_changes):
145 return self.update_flows_incrementally(
146 device, flow_changes, group_changes)
147
Zsolt Haraszti66862032016-11-28 14:28:39 -0800148 # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
149
150 def get_device(self, device_id):
151 return self.root_proxy.get('/devices/{}'.format(device_id))
152
153 def add_device(self, device):
154 assert isinstance(device, Device)
155 self._make_up_to_date('/devices', device.id, device)
156
157 # TODO for now, just map everything into a single device group
158 # which we create if it does not yet exist
159
160 dg = DeviceGroup(id='1')
161 self._make_up_to_date('/device_groups', dg.id, dg)
162
163 # add device to device group
164 # TODO how to do that?
165
166 def update_device(self, device):
167 assert isinstance(device, Device)
168
169 # we run the update through the device_agent so that the change
170 # does not loop back to the adapter unnecessarily
171 device_agent = self.core.get_device_agent(device.id)
172 device_agent.update_device(device)
173
174 def remove_device(self, device_id):
175 device_agent = self.core.get_device_agent(device_id)
176 device_agent.remove_device(device_id)
177
178 def add_port(self, device_id, port):
179 assert isinstance(port, Port)
180
181 # for referential integrity, add/augment references
182 port.device_id = device_id
183 me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no)
184 for peer in port.peers:
185 peer_port_path = '/devices/{}/ports/{}'.format(
186 peer.device_id, peer.port_no)
187 peer_port = self.root_proxy.get(peer_port_path)
188 if me_as_peer not in peer_port.peers:
189 new = peer_port.peers.add()
190 new.CopyFrom(me_as_peer)
191 self.root_proxy.update(peer_port_path, peer_port)
192
193 self._make_up_to_date('/devices/{}/ports'.format(device_id),
194 port.port_no, port)
195
Zsolt Harasztid036b7e2016-12-23 15:36:01 -0800196 def _find_first_available_id(self):
197 logical_devices = self.root_proxy.get('/logical_devices')
198 existing_ids = set(ld.id for ld in logical_devices)
199 existing_datapath_ids = set(ld.datapath_id for ld in logical_devices)
200 i = 1
201 while True:
202 if i not in existing_datapath_ids and str(i) not in existing_ids:
203 return i
204 i += 1
205
Zsolt Haraszti656ecc62016-12-28 15:08:23 -0800206 def get_logical_device(self, logical_device_id):
207 return self.root_proxy.get('/logical_devices/{}'.format(
208 logical_device_id))
209
Zsolt Haraszti66862032016-11-28 14:28:39 -0800210 def create_logical_device(self, logical_device):
211 assert isinstance(logical_device, LogicalDevice)
Zsolt Harasztid036b7e2016-12-23 15:36:01 -0800212
213 if not logical_device.id:
214 id = self._find_first_available_id()
215 logical_device.id = str(id)
216 logical_device.datapath_id = id
217
Zsolt Haraszti66862032016-11-28 14:28:39 -0800218 self._make_up_to_date('/logical_devices',
219 logical_device.id, logical_device)
220
Zsolt Haraszti656ecc62016-12-28 15:08:23 -0800221 self.event_bus.subscribe(
222 topic='packet-out:{}'.format(logical_device.id),
223 callback=lambda _, p: self.receive_packet_out(logical_device.id, p)
224 )
225
Zsolt Harasztid036b7e2016-12-23 15:36:01 -0800226 return logical_device
227
Zsolt Haraszti656ecc62016-12-28 15:08:23 -0800228 def receive_packet_out(self, logical_device_id, ofp_packet_out):
229
230 def get_port_out(opo):
231 for action in opo.actions:
232 if action.type == OUTPUT:
233 return action.output.port
234
235 out_port = get_port_out(ofp_packet_out)
236 frame = ofp_packet_out.data
237 self.adapter.receive_packet_out(logical_device_id, out_port, frame)
238
Zsolt Haraszti66862032016-11-28 14:28:39 -0800239 def add_logical_port(self, logical_device_id, port):
240 assert isinstance(port, LogicalPort)
241 self._make_up_to_date(
242 '/logical_devices/{}/ports'.format(logical_device_id),
243 port.id, port)
244
245 def child_device_detected(self,
246 parent_device_id,
247 parent_port_no,
248 child_device_type,
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800249 proxy_address,
250 **kw):
Zsolt Haraszti66862032016-11-28 14:28:39 -0800251 # we create new ONU device objects and insert them into the config
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800252 # TODO should we auto-enable the freshly created device? Probably.
Zsolt Haraszti66862032016-11-28 14:28:39 -0800253 device = Device(
254 id=uuid4().hex[:12],
255 type=child_device_type,
256 parent_id=parent_device_id,
257 parent_port_no=parent_port_no,
258 admin_state=AdminState.ENABLED,
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800259 proxy_address=proxy_address,
260 **kw
Zsolt Haraszti66862032016-11-28 14:28:39 -0800261 )
262 self._make_up_to_date(
263 '/devices', device.id, device)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800264
265 topic = self._gen_tx_proxy_address_topic(proxy_address)
266 self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
267 topic, lambda t, m: self._send_proxied_message(proxy_address, m))
268
269 def _gen_rx_proxy_address_topic(self, proxy_address):
270 """Generate unique topic name specific to this proxy address for rx"""
Zsolt Harasztief05ad22017-01-07 22:08:06 -0800271 topic = 'rx:' + MessageToJson(proxy_address)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800272 return topic
273
274 def _gen_tx_proxy_address_topic(self, proxy_address):
275 """Generate unique topic name specific to this proxy address for tx"""
Zsolt Harasztief05ad22017-01-07 22:08:06 -0800276 topic = 'tx:' + MessageToJson(proxy_address)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800277 return topic
278
279 def register_for_proxied_messages(self, proxy_address):
280 topic = self._gen_rx_proxy_address_topic(proxy_address)
281 self._rx_event_subscriptions[topic] = self.event_bus.subscribe(
282 topic, lambda t, m: self._receive_proxied_message(proxy_address, m))
283
284 def _receive_proxied_message(self, proxy_address, msg):
285 self.adapter.receive_proxied_message(proxy_address, msg)
286
287 def send_proxied_message(self, proxy_address, msg):
288 topic = self._gen_tx_proxy_address_topic(proxy_address)
289 self.event_bus.publish(topic, msg)
290
291 def _send_proxied_message(self, proxy_address, msg):
292 self.adapter.send_proxied_message(proxy_address, msg)
293
294 def receive_proxied_message(self, proxy_address, msg):
295 topic = self._gen_rx_proxy_address_topic(proxy_address)
296 self.event_bus.publish(topic, msg)
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800297
298 # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
299
300 def send_packet_in(self, logical_device_id, logical_port_no, packet):
301 self.log.debug('send-packet-in', logical_device_id=logical_device_id,
Zsolt Harasztief05ad22017-01-07 22:08:06 -0800302 logical_port_no=logical_port_no, packet=hexify(packet))
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800303
304 if isinstance(packet, Packet):
305 packet = str(packet)
306
307 topic = 'packet-in:' + logical_device_id
308 self.event_bus.publish(topic, (logical_port_no, packet))
Zsolt Haraszti749b0952017-01-18 09:02:35 -0800309
310 # ~~~~~~~~~~~~~~~~~~~ Handling KPI metric submissions ~~~~~~~~~~~~~~~~~~~~~
Zsolt Harasztic5f740b2017-01-18 09:53:17 -0800311
Zsolt Haraszti749b0952017-01-18 09:02:35 -0800312 def submit_kpis(self, kpi_event_msg):
313 try:
314 assert isinstance(kpi_event_msg, KpiEvent)
315 self.event_bus.publish('kpis', kpi_event_msg)
316 except Exception as e:
317 self.log.exception('failed-kpi-submission',
318 type=type(kpi_event_msg))