blob: acd5bc85013c91dd9f55a0a902935735f0a3f1e0 [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18A device agent is instantiated for each Device and plays an important role
19between the Device object and its adapter.
20"""
21import structlog
22from twisted.internet import reactor
23from twisted.internet.defer import inlineCallbacks, returnValue
24
25from voltha.core.config.config_proxy import CallbackType
26from voltha.protos.common_pb2 import AdminState, OperStatus
27from voltha.registry import registry
28
Zsolt Haraszti66862032016-11-28 14:28:39 -080029
class InvalidStateTransition(Exception):
    """
    Raised when a requested admin state change is one that the device
    agent's state table marks as invalid.
    """
31
32
33class DeviceAgent(object):
34
35 def __init__(self, core, initial_data):
Zsolt Harasztic5c5d102016-12-07 21:12:27 -080036
Zsolt Haraszti66862032016-11-28 14:28:39 -080037 self.core = core
38 self._tmp_initial_data = initial_data
Zsolt Harasztic5c5d102016-12-07 21:12:27 -080039 self.last_data = None
40
Zsolt Haraszti66862032016-11-28 14:28:39 -080041 self.proxy = core.get_proxy('/devices/{}'.format(initial_data.id))
Zsolt Harasztic5c5d102016-12-07 21:12:27 -080042 self.flows_proxy = core.get_proxy(
43 '/devices/{}/flows'.format(initial_data.id))
44 self.groups_proxy = core.get_proxy(
45 '/devices/{}/flow_groups'.format(initial_data.id))
46
Zsolt Haraszti66862032016-11-28 14:28:39 -080047 self.proxy.register_callback(
48 CallbackType.PRE_UPDATE, self._validate_update)
49 self.proxy.register_callback(
50 CallbackType.POST_UPDATE, self._process_update)
Zsolt Harasztic5c5d102016-12-07 21:12:27 -080051
52 self.flows_proxy.register_callback(
53 CallbackType.POST_UPDATE, self._flow_table_updated)
54 self.groups_proxy.register_callback(
55 CallbackType.POST_UPDATE, self._group_table_updated)
56
57 # to know device capabilities
58 self.device_type = core.get_proxy(
59 '/device_types/{}'.format(initial_data.type)).get()
60
Zsolt Haraszti66862032016-11-28 14:28:39 -080061 self.adapter_agent = None
Zsolt Haraszti89a27302016-12-08 16:53:06 -080062 self.log = structlog.get_logger(device_id=initial_data.id)
Zsolt Haraszti66862032016-11-28 14:28:39 -080063
64 @inlineCallbacks
65 def start(self):
Zsolt Haraszti89a27302016-12-08 16:53:06 -080066 self.log.debug('starting')
Zsolt Haraszti66862032016-11-28 14:28:39 -080067 self._set_adapter_agent()
68 yield self._process_update(self._tmp_initial_data)
69 del self._tmp_initial_data
Zsolt Haraszti89a27302016-12-08 16:53:06 -080070 self.log.info('started')
Zsolt Haraszti66862032016-11-28 14:28:39 -080071 returnValue(self)
72
73 def stop(self):
Zsolt Haraszti89a27302016-12-08 16:53:06 -080074 self.log.debug('stopping')
Zsolt Haraszti66862032016-11-28 14:28:39 -080075 self.proxy.unregister_callback(
76 CallbackType.PRE_UPDATE, self._validate_update)
77 self.proxy.unregister_callback(
78 CallbackType.POST_UPDATE, self._process_update)
Zsolt Haraszti89a27302016-12-08 16:53:06 -080079 self.log.info('stopped')
Zsolt Haraszti66862032016-11-28 14:28:39 -080080
81 def _set_adapter_agent(self):
82 adapter_name = self._tmp_initial_data.adapter
83 if adapter_name == '':
84 proxy = self.core.get_proxy('/')
85 known_device_types = dict(
86 (dt.id, dt) for dt in proxy.get('/device_types'))
87 device_type = known_device_types[self._tmp_initial_data.type]
88 adapter_name = device_type.adapter
89 assert adapter_name != ''
90 self.adapter_agent = registry('adapter_loader').get_agent(adapter_name)
91
92 @inlineCallbacks
93 def _validate_update(self, device):
94 """
95 Called before each update, it allows the blocking of the update
96 (by raising an exception), or even the augmentation of the incoming
97 data.
98 """
Zsolt Haraszti89a27302016-12-08 16:53:06 -080099 self.log.debug('device-pre-update', device=device)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800100 yield self._process_state_transitions(device, dry_run=True)
101 returnValue(device)
102
103 @inlineCallbacks
104 def _process_update(self, device):
105 """
106 Called after the device object was updated (individually or part of
107 a transaction), and it is used to propagate the change down to the
108 adapter
109 """
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800110 self.log.debug('device-post-update', device=device)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800111
112 # first, process any potential state transition
113 yield self._process_state_transitions(device)
114
115 # finally, store this data as last data so we can see what changed
116 self.last_data = device
117
118 @inlineCallbacks
119 def _process_state_transitions(self, device, dry_run=False):
120
121 old_admin_state = getattr(self.last_data, 'admin_state',
122 AdminState.UNKNOWN)
123 new_admin_state = device.admin_state
124 transition_handler = self.admin_state_fsm.get(
125 (old_admin_state, new_admin_state), None)
126 if transition_handler is None:
127 pass # no-op
128 elif transition_handler is False:
129 raise InvalidStateTransition('{} -> {}'.format(
130 old_admin_state, new_admin_state))
131 else:
132 assert callable(transition_handler)
133 yield transition_handler(self, device, dry_run)
134
135 @inlineCallbacks
136 def _activate_device(self, device, dry_run=False):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800137 self.log.info('activate-device', device=device, dry_run=dry_run)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800138 if not dry_run:
139 device = yield self.adapter_agent.adopt_device(device)
140 device.oper_status = OperStatus.ACTIVATING
141 # successful return from this may also populated the device
142 # data, so we need to write it back
143 reactor.callLater(0, self.update_device, device)
144
145 def update_device(self, device):
146 self.last_data = device # so that we don't propagate back
147 self.proxy.update('/', device)
148
149 def remove_device(self, device_id):
150 raise NotImplementedError()
151
152 def _propagate_change(self, device, dry_run=False):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800153 self.log.info('propagate-change', device=device, dry_run=dry_run)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800154 if device != self.last_data:
155 raise NotImplementedError()
156 else:
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800157 self.log.debug('no-op')
Zsolt Haraszti66862032016-11-28 14:28:39 -0800158
159 def _abandon_device(self, device, dry_run=False):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800160 self.log.info('abandon-device', device=device, dry_run=dry_run)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800161 raise NotImplementedError()
162
163 def _disable_device(self, device, dry_run=False):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800164 self.log.info('disable-device', device=device, dry_run=dry_run)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800165 raise NotImplementedError()
166
167 def _reenable_device(self, device, dry_run=False):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800168 self.log.info('reenable-device', device=device, dry_run=dry_run)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800169 raise NotImplementedError()
170
171 admin_state_fsm = {
172
173 # Missing entries yield no-op
174 # False means invalid state change
175
176 (AdminState.UNKNOWN, AdminState.ENABLED): _activate_device,
177
178 (AdminState.PREPROVISIONED, AdminState.UNKNOWN): False,
179 (AdminState.PREPROVISIONED, AdminState.ENABLED): _activate_device,
180
181 (AdminState.ENABLED, AdminState.UNKNOWN): False,
182 (AdminState.ENABLED, AdminState.ENABLED): _propagate_change,
183 (AdminState.ENABLED, AdminState.DISABLED): _disable_device,
184 (AdminState.ENABLED, AdminState.PREPROVISIONED): _abandon_device,
185
186 (AdminState.DISABLED, AdminState.UNKNOWN): False,
187 (AdminState.DISABLED, AdminState.PREPROVISIONED): _abandon_device,
188 (AdminState.DISABLED, AdminState.ENABLED): _reenable_device
189
190 }
Zsolt Harasztic5c5d102016-12-07 21:12:27 -0800191
192 ## <======================= FLOW TABLE UPDATE HANDLING ====================
193
194 @inlineCallbacks
195 def _flow_table_updated(self, flows):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800196 self.log.debug('flow-table-updated',
Zsolt Harasztic5c5d102016-12-07 21:12:27 -0800197 logical_device_id=self.last_data.id, flows=flows)
198
199 # if device accepts bulk flow update, lets just call that
200 if self.device_type.accepts_bulk_flow_update:
201 groups = self.groups_proxy.get('/') # gather flow groups
202 yield self.adapter_agent.update_flows_bulk(
203 device=self.last_data,
204 flows=flows,
205 groups=groups)
206 # TODO place to feed back completion
207
208 elif self.accepts_add_remove_flow_updates:
209 raise NotImplementedError()
210
211 else:
212 raise NotImplementedError()
213
214 ## <======================= GROUP TABLE UPDATE HANDLING ===================
215
216 @inlineCallbacks
217 def _group_table_updated(self, groups):
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800218 self.log.debug('group-table-updated',
Zsolt Harasztic5c5d102016-12-07 21:12:27 -0800219 logical_device_id=self.last_data.id,
220 flow_groups=groups)
221
222 # if device accepts bulk flow update, lets just call that
223 if self.device_type.accepts_bulk_flow_update:
224 flows = self.flows_proxy.get('/') # gather flows
225 yield self.adapter_agent.update_flows_bulk(
226 device=self.last_data,
227 flows=flows,
228 groups=groups)
229 # TODO place to feed back completion
230
231 elif self.accepts_add_remove_flow_updates:
232 raise NotImplementedError()
233
234 else:
235 raise NotImplementedError()
236