blob: 83b4e89b4fa24b238325ad3fa55307dbaecac041 [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
"""
Agent that acts as the gateway between CORE and an individual adapter.
"""
20from uuid import uuid4
21
22import structlog
23from twisted.internet.defer import inlineCallbacks, returnValue
24from zope.interface import implementer
25
Zsolt Haraszti89a27302016-12-08 16:53:06 -080026from common.event_bus import EventBusClient
Zsolt Haraszti66862032016-11-28 14:28:39 -080027from voltha.adapters.interface import IAdapterAgent
28from voltha.protos import third_party
29from voltha.protos.device_pb2 import Device, Port
30from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
31 LogicalPort, AdminState
32from voltha.registry import registry
33
34
@implementer(IAdapterAgent)
class AdapterAgent(object):
    """
    Gate-keeper between CORE and device adapters.

    On one side it interacts with Core's internal model and update/dispatch
    mechanisms.

    On the other side, it interacts with the adapters through the standard
    interface defined in voltha.adapters.interface.
    """

    def __init__(self, adapter_name, adapter_cls):
        """
        Prepare the agent; the adapter itself is only created in start().

        :param adapter_name: name of the adapter; used as the key of the
            adapter's node under '/adapters' and bound to the logger
        :param adapter_cls: adapter class, instantiated in start() as
            adapter_cls(self, config)
        """
        self.adapter_name = adapter_name
        self.adapter_cls = adapter_cls
        self.core = registry('core')
        self.adapter = None  # set by start(), cleared by stop()
        self.adapter_node_proxy = None  # set by start()
        self.root_proxy = self.core.get_proxy('/')
        # topic -> subscription handle returned by the event bus
        self._rx_event_subscriptions = {}
        self._tx_event_subscriptions = {}
        self.event_bus = EventBusClient()
        self.log = structlog.get_logger(adapter_name=adapter_name)

    @inlineCallbacks
    def start(self):
        """
        Instantiate and start the adapter, then register it with Core.

        Any exception raised while creating, starting or registering the
        adapter is logged and not propagated. If creating or starting the
        adapter fails, self.adapter stays None and nothing is registered.

        :return: (via Deferred) this agent
        """
        self.log.debug('starting')
        config = self._get_adapter_config()  # this may be None
        try:
            adapter = self.adapter_cls(self, config)
            yield adapter.start()
            # Publish the adapter only once it has started successfully;
            # the registration helpers below read self.adapter.
            self.adapter = adapter
            self.adapter_node_proxy = self._update_adapter_node()
            self._update_device_types()
        except Exception as e:
            self.log.exception(e)
        else:
            self.log.info('started')
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        """
        Stop the adapter (if one is running) and forget it.
        """
        self.log.debug('stopping')
        if self.adapter is not None:
            yield self.adapter.stop()
            self.adapter = None
        self.log.info('stopped')

    def _get_adapter_config(self):
        """
        Opportunistically load persisted adapter configuration.
        Return None if no configuration exists yet.
        """
        proxy = self.core.get_proxy('/')
        try:
            return proxy.get('/adapters/' + self.adapter_name)
        except KeyError:
            return None

    def _update_adapter_node(self):
        """
        Creates or updates the adapter node object based on self
        description from the adapter.

        :return: proxy obtained from Core for the adapter's node path
        """
        adapter_desc = self.adapter.adapter_descriptor()
        assert adapter_desc.id == self.adapter_name
        path = self._make_up_to_date(
            '/adapters', self.adapter_name, adapter_desc)
        return self.core.get_proxy(path)

    def _update_device_types(self):
        """
        Make sure device types are registered in Core
        """
        device_types = self.adapter.device_types()
        for device_type in device_types.items:
            self._make_up_to_date(
                '/device_types', device_type.id, device_type)

    def _make_up_to_date(self, container_path, key, data):
        """
        Store data under container_path, updating it if it already exists.

        :param container_path: path of the container, e.g. '/devices'
        :param key: key of the entry within the container
        :param data: object to store
        :return: full path of the entry (container_path + '/' + key)
        """
        full_path = container_path + '/' + str(key)
        root_proxy = self.core.get_proxy('/')
        try:
            # get() is only used as an existence probe; a KeyError means
            # that the entry has to be added rather than updated
            root_proxy.get(full_path)
            root_proxy.update(full_path, data)
        except KeyError:
            root_proxy.add(container_path, data)
        return full_path

    # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def adopt_device(self, device):
        """Delegate to the adapter's adopt_device()."""
        return self.adapter.adopt_device(device)

    def abandon_device(self, device):
        """Delegate to the adapter's abandon_device()."""
        return self.adapter.abandon_device(device)

    def deactivate_device(self, device):
        """Delegate to the adapter's deactivate_device()."""
        return self.adapter.deactivate_device(device)

    def update_flows_bulk(self, device, flows, groups):
        """Delegate to the adapter's update_flows_bulk()."""
        return self.adapter.update_flows_bulk(device, flows, groups)

    def update_flows_incrementally(self, device, flow_changes, group_changes):
        """Delegate to the adapter's update_flows_incrementally()."""
        # must go to the adapter; calling the method on self would
        # recurse without end
        return self.adapter.update_flows_incrementally(
            device, flow_changes, group_changes)

    # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def get_device(self, device_id):
        """Return the object stored at '/devices/<device_id>'."""
        return self.root_proxy.get('/devices/{}'.format(device_id))

    def add_device(self, device):
        """
        Add (or update) the device under '/devices' and make sure the
        default device group exists.
        """
        assert isinstance(device, Device)
        self._make_up_to_date('/devices', device.id, device)

        # TODO for now, just map everything into a single device group
        # which we create if it does not yet exist

        dg = DeviceGroup(id='1')
        self._make_up_to_date('/device_groups', dg.id, dg)

        # add device to device group
        # TODO how to do that?

    def update_device(self, device):
        """Update the device through its device agent."""
        assert isinstance(device, Device)

        # we run the update through the device_agent so that the change
        # does not loop back to the adapter unnecessarily
        device_agent = self.core.get_device_agent(device.id)
        device_agent.update_device(device)

    def remove_device(self, device_id):
        """Remove the device through its device agent."""
        device_agent = self.core.get_device_agent(device_id)
        device_agent.remove_device(device_id)

    def add_port(self, device_id, port):
        """
        Add (or update) a port of the given device.

        Sets port.device_id and makes sure every peer port listed in
        port.peers refers back to this port before the port is stored.
        """
        assert isinstance(port, Port)

        # for referential integrity, add/augment references
        port.device_id = device_id
        me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no)
        for peer in port.peers:
            peer_port_path = '/devices/{}/ports/{}'.format(
                peer.device_id, peer.port_no)
            peer_port = self.root_proxy.get(peer_port_path)
            if me_as_peer not in peer_port.peers:
                new = peer_port.peers.add()
                new.CopyFrom(me_as_peer)
            self.root_proxy.update(peer_port_path, peer_port)

        self._make_up_to_date('/devices/{}/ports'.format(device_id),
                              port.port_no, port)

    def create_logical_device(self, logical_device):
        """Add (or update) the logical device under '/logical_devices'."""
        assert isinstance(logical_device, LogicalDevice)
        self._make_up_to_date('/logical_devices',
                              logical_device.id, logical_device)

    def add_logical_port(self, logical_device_id, port):
        """Add (or update) a port of the given logical device."""
        assert isinstance(port, LogicalPort)
        self._make_up_to_date(
            '/logical_devices/{}/ports'.format(logical_device_id),
            port.id, port)

    def child_device_detected(self,
                              parent_device_id,
                              parent_port_no,
                              child_device_type,
                              proxy_address,
                              **kw):
        """
        Create a device object for a newly detected child device and
        subscribe to the tx topic of its proxy address.

        :param parent_device_id: id of the parent device
        :param parent_port_no: port number on the parent device
        :param child_device_type: device type of the child
        :param proxy_address: proxy address of the child
        :param kw: additional fields passed on to the Device constructor
        """
        # we create new ONU device objects and insert them into the config
        # TODO should we auto-enable the freshly created device? Probably.
        device = Device(
            id=uuid4().hex[:12],
            type=child_device_type,
            parent_id=parent_device_id,
            parent_port_no=parent_port_no,
            admin_state=AdminState.ENABLED,
            proxy_address=proxy_address,
            **kw
        )
        self._make_up_to_date(
            '/devices', device.id, device)

        # messages published on the tx topic of this proxy address are
        # handed to our adapter's send_proxied_message()
        topic = self._gen_tx_proxy_address_topic(proxy_address)
        self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
            topic, lambda t, m: self._send_proxied_message(proxy_address, m))

    def _gen_rx_proxy_address_topic(self, proxy_address):
        """Generate unique topic name specific to this proxy address for rx"""
        topic = 'rx:' + proxy_address.SerializeToString()
        return topic

    def _gen_tx_proxy_address_topic(self, proxy_address):
        """Generate unique topic name specific to this proxy address for tx"""
        topic = 'tx:' + proxy_address.SerializeToString()
        return topic

    def register_for_proxied_messages(self, proxy_address):
        """
        Subscribe to the rx topic of the proxy address; the subscription
        callback hands messages to our adapter's receive_proxied_message().
        """
        topic = self._gen_rx_proxy_address_topic(proxy_address)
        self._rx_event_subscriptions[topic] = self.event_bus.subscribe(
            topic, lambda t, m: self._receive_proxied_message(proxy_address, m))

    def _receive_proxied_message(self, proxy_address, msg):
        """Hand a received proxied message over to the adapter."""
        self.adapter.receive_proxied_message(proxy_address, msg)

    def send_proxied_message(self, proxy_address, msg):
        """Publish msg on the tx topic of the proxy address."""
        topic = self._gen_tx_proxy_address_topic(proxy_address)
        self.event_bus.publish(topic, msg)

    def _send_proxied_message(self, proxy_address, msg):
        """Hand a proxied message to be sent over to the adapter."""
        self.adapter.send_proxied_message(proxy_address, msg)

    def receive_proxied_message(self, proxy_address, msg):
        """Publish msg on the rx topic of the proxy address."""
        topic = self._gen_rx_proxy_address_topic(proxy_address)
        self.event_bus.publish(topic, msg)