blob: 224e8ec3ea9e32615110b7bd9e312f6a97e54fbb [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A device agent is instantiated for each Device and plays an important role
between the Device object and its adapter.
"""
import structlog
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from voltha.core.config.config_proxy import CallbackType
from voltha.protos.common_pb2 import AdminState, OperStatus, ConnectStatus, \
OperationResp
from voltha.protos.device_pb2 import ImageDownload
from voltha.registry import registry
from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
class InvalidStateTransition(Exception): pass
class DeviceAgent(object):
def __init__(self, core, initial_data):
self.core = core
self._tmp_initial_data = initial_data
self.last_data = None
self.calback_data = None
self.proxy = core.get_proxy('/devices/{}'.format(initial_data.id))
self.flows_proxy = core.get_proxy(
'/devices/{}/flows'.format(initial_data.id))
self.groups_proxy = core.get_proxy(
'/devices/{}/flow_groups'.format(initial_data.id))
self.pm_config_proxy = core.get_proxy(
'/devices/{}/pm_configs'.format(initial_data.id))
self.img_dnld_proxies = {}
self.proxy.register_callback(
CallbackType.PRE_UPDATE, self._validate_update)
self.proxy.register_callback(
CallbackType.POST_UPDATE, self._process_update)
self.flows_proxy.register_callback(
CallbackType.POST_UPDATE, self._flow_table_updated)
self.groups_proxy.register_callback(
CallbackType.POST_UPDATE, self._group_table_updated)
self.pm_config_proxy.register_callback(
CallbackType.POST_UPDATE, self._pm_config_updated)
# to know device capabilities
self.device_type = core.get_proxy(
'/device_types/{}'.format(initial_data.type)).get()
self.adapter_agent = None
self.log = structlog.get_logger(device_id=initial_data.id)
@inlineCallbacks
def start(self, device=None, reconcile=False):
self.log.info('starting', device=device)
self._set_adapter_agent()
if device:
# Starting from an existing data, so set the last_data
self.last_data = device
if reconcile:
self.reconcile_existing_device(device)
else:
yield self._process_update(self._tmp_initial_data)
del self._tmp_initial_data
self.log.info('started')
returnValue(self)
@inlineCallbacks
def stop(self, device):
self.log.debug('stopping', device=device)
# First, propagate this request to the device agents
yield self._delete_device(device)
self.proxy.unregister_callback(
CallbackType.PRE_UPDATE, self._validate_update)
self.proxy.unregister_callback(
CallbackType.POST_UPDATE, self._process_update)
self.log.info('stopped')
@inlineCallbacks
def reboot_device(self, device, dry_run=False):
self.log.debug('reboot-device', device=device, dry_run=dry_run)
if not dry_run:
yield self.adapter_agent.reboot_device(device)
def register_image_download(self, request):
try:
self.log.debug('register-image-download', request=request)
path = '/devices/{}/image_downloads/{}'.format(request.id, request.name)
self.img_dnld_proxies[request.name] = self.core.get_proxy(path)
self.img_dnld_proxies[request.name].register_callback(
CallbackType.POST_UPDATE, self._update_image)
# trigger update callback
request.state = ImageDownload.DOWNLOAD_REQUESTED
self.img_dnld_proxies[request.name].update('/', request)
except Exception as e:
self.log.exception(e.message)
def activate_image_update(self, request):
try:
self.log.debug('activate-image-download', request=request)
request.image_state = ImageDownload.IMAGE_ACTIVATE
self.img_dnld_proxies[request.name].update('/', request)
except Exception as e:
self.log.exception(e.message)
def revert_image_update(self, request):
try:
self.log.debug('revert-image-download', request=request)
request.image_state = ImageDownload.IMAGE_REVERT
self.img_dnld_proxies[request.name].update('/', request)
except Exception as e:
self.log.exception(e.message)
@inlineCallbacks
def _download_image(self, device, img_dnld):
try:
self.log.debug('download-image', img_dnld=img_dnld)
yield self.adapter_agent.download_image(device, img_dnld)
except Exception as e:
self.log.exception(e.message)
def get_image_download_status(self, request):
try:
self.log.debug('get-image-download-status',
request=request)
device = self.proxy.get('/')
self.adapter_agent.get_image_download_status(device, request)
except Exception as e:
self.log.exception(e.message)
def cancel_image_download(self, img_dnld):
try:
self.log.debug('cancel-image-download',
img_dnld=img_dnld)
device = self.proxy.get('/')
self.adapter_agent.cancel_image_download(device, img_dnld)
except Exception as e:
self.log.exception(e.message)
def update_device_image_download(self, img_dnld):
try:
self.log.debug('update-device-image-download',
img_dnld=img_dnld)
self.proxy.update('/image_downloads/{}'\
.format(img_dnld.name), img_dnld)
except Exception as e:
self.log.exception(e.message)
def unregister_device_image_download(self, name):
try:
self.log.debug('unregister-device-image-download',
name=name)
self.self_proxies[name].unregister_callback(
CallbackType.POST_ADD, self._download_image)
self.self_proxies[name].unregister_callback(
CallbackType.POST_UPDATE, self._process_image)
except Exception as e:
self.log.exception(e.message)
@inlineCallbacks
def _update_image(self, img_dnld):
try:
self.log.debug('update-image', img_dnld=img_dnld)
# handle download
if img_dnld.state == ImageDownload.DOWNLOAD_REQUESTED:
device = self.proxy.get('/')
yield self._download_image(device, img_dnld)
if img_dnld.image_state == ImageDownload.IMAGE_ACTIVATE:
device = self.proxy.get('/')
yield self.adapter_agent.activate_image_update(device, img_dnld)
elif img_dnld.image_state == ImageDownload.IMAGE_REVERT:
device = self.proxy.get('/')
yield self.adapter_agent.revert_image_update(device, img_dnld)
except Exception as e:
self.log.exception(e.message)
@inlineCallbacks
def self_test(self, device, dry_run=False):
self.log.debug('self-test-device', device=device, dry_run=dry_run)
if not dry_run:
result = yield self.adapter_agent.self_test(device)
returnValue(result)
@inlineCallbacks
def get_device_details(self, device, dry_run=False):
self.log.debug('get-device-details', device=device, dry_run=dry_run)
if not dry_run:
yield self.adapter_agent.get_device_details(device)
@inlineCallbacks
def suppress_alarm(self, filter):
self.log.debug('suppress-alarms')
try:
yield self.adapter_agent.suppress_alarm(filter)
except Exception as e:
self.log.exception(e.message)
@inlineCallbacks
def unsuppress_alarm(self, filter):
self.log.debug('unsuppress-alarms')
try:
yield self.adapter_agent.unsuppress_alarm(filter)
except Exception as e:
self.log.exception(e.message)
@inlineCallbacks
def reconcile_existing_device(self, device, dry_run=False):
self.log.debug('reconcile-existing-device',
device=device,
dry_run=False)
if not dry_run:
yield self.adapter_agent.reconcile_device(device)
def _set_adapter_agent(self):
adapter_name = self._tmp_initial_data.adapter
if adapter_name == '':
proxy = self.core.get_proxy('/')
known_device_types = dict(
(dt.id, dt) for dt in proxy.get('/device_types'))
device_type = known_device_types[self._tmp_initial_data.type]
adapter_name = device_type.adapter
assert adapter_name != ''
self.adapter_agent = registry('adapter_loader').get_agent(adapter_name)
@inlineCallbacks
def _validate_update(self, device):
"""
Called before each update, it allows the blocking of the update
(by raising an exception), or even the augmentation of the incoming
data.
"""
self.log.debug('device-pre-update', device=device)
yield self._process_state_transitions(device, dry_run=True)
returnValue(device)
@inlineCallbacks
def _process_update(self, device):
"""
Called after the device object was updated (individually or part of
a transaction), and it is used to propagate the change down to the
adapter
"""
self.log.debug('device-post-update', device=device)
# first, process any potential state transition
yield self._process_state_transitions(device)
# finally, store this data as last data so we can see what changed
self.last_data = device
@inlineCallbacks
def _process_state_transitions(self, device, dry_run=False):
old_admin_state = getattr(self.last_data, 'admin_state',
AdminState.UNKNOWN)
new_admin_state = device.admin_state
self.log.debug('device-admin-states', old_state=old_admin_state,
new_state=new_admin_state, dry_run=dry_run)
transition_handler = self.admin_state_fsm.get(
(old_admin_state, new_admin_state), None)
if transition_handler is None:
self.log.debug('No Operation', old_state=old_admin_state,
new_state=new_admin_state, dry_run=dry_run)
pass # no-op
elif transition_handler is False:
raise InvalidStateTransition('{} -> {}'.format(
old_admin_state, new_admin_state))
else:
assert callable(transition_handler)
yield transition_handler(self, device, dry_run)
@inlineCallbacks
def _activate_device(self, device, dry_run=False):
self.log.debug('activate-device', device=device, dry_run=dry_run)
if not dry_run:
device.oper_status = OperStatus.ACTIVATING
self.update_device(device)
yield self.adapter_agent.adopt_device(device)
def update_device(self, device):
self.last_data = device # so that we don't propagate back
self.proxy.update('/', device)
if device.oper_status == OperStatus.ACTIVE and device.connect_status == ConnectStatus.REACHABLE:
self.log.info('replay-create-interfaces ', device=device.id)
self.core.xpon_agent.replay_interface(device.id)
def update_device_pm_config(self, device_pm_config, init=False):
self.callback_data = init# so that we don't push init data
self.pm_config_proxy.update('/', device_pm_config)
def _propagate_change(self, device, dry_run=False):
self.log.debug('propagate-change', device=device, dry_run=dry_run)
if device != self.last_data:
raise NotImplementedError()
else:
self.log.debug('no-op')
def _abandon_device(self, device, dry_run=False):
self.log.debug('abandon-device', device=device, dry_run=dry_run)
raise NotImplementedError()
def _delete_all_flows(self):
""" Delete all flows on the device """
try:
self.flows_proxy.update('/', Flows(items=[]))
self.groups_proxy.update('/', FlowGroups(items=[]))
except Exception, e:
self.exception('flow-delete-exception', e=e)
@inlineCallbacks
def _disable_device(self, device, dry_run=False):
try:
self.log.debug('disable-device', device=device, dry_run=dry_run)
if not dry_run:
# Remove all flows before disabling device
self._delete_all_flows()
yield self.adapter_agent.disable_device(device)
except Exception, e:
self.log.exception('error', e=e)
@inlineCallbacks
def _reenable_device(self, device, dry_run=False):
self.log.debug('reenable-device', device=device, dry_run=dry_run)
if not dry_run:
yield self.adapter_agent.reenable_device(device)
@inlineCallbacks
def _delete_device(self, device, dry_run=False):
self.log.debug('delete-device', device=device, dry_run=dry_run)
if not dry_run:
yield self.adapter_agent.delete_device(device)
admin_state_fsm = {
# Missing entries yield no-op
# False means invalid state change
(AdminState.UNKNOWN, AdminState.ENABLED): _activate_device,
(AdminState.PREPROVISIONED, AdminState.UNKNOWN): False,
(AdminState.PREPROVISIONED, AdminState.ENABLED): _activate_device,
(AdminState.PREPROVISIONED, AdminState.DOWNLOADING_IMAGE): False,
(AdminState.ENABLED, AdminState.UNKNOWN): False,
(AdminState.ENABLED, AdminState.ENABLED): _propagate_change,
(AdminState.ENABLED, AdminState.DISABLED): _disable_device,
(AdminState.ENABLED, AdminState.PREPROVISIONED): _abandon_device,
(AdminState.DISABLED, AdminState.UNKNOWN): False,
(AdminState.DISABLED, AdminState.PREPROVISIONED): _abandon_device,
(AdminState.DISABLED, AdminState.ENABLED): _reenable_device,
(AdminState.DISABLED, AdminState.DOWNLOADING_IMAGE): False,
(AdminState.DOWNLOADING_IMAGE, AdminState.DISABLED): False
}
## <======================= PM CONFIG UPDATE HANDLING ====================
#@inlineCallbacks
def _pm_config_updated(self, pm_configs):
self.log.debug('pm-config-updated', pm_configs=pm_configs,
callback_data=self.callback_data)
device_id = self.proxy.get('/').id
if not self.callback_data:
self.adapter_agent.update_adapter_pm_config(device_id, pm_configs)
self.callback_data = None
## <======================= FLOW TABLE UPDATE HANDLING ====================
@inlineCallbacks
def _flow_table_updated(self, flows):
self.log.debug('flow-table-updated',
logical_device_id=self.last_data.id, flows=flows)
# if device accepts bulk flow update, lets just call that
if self.device_type.accepts_bulk_flow_update:
groups = self.groups_proxy.get('/') # gather flow groups
yield self.adapter_agent.update_flows_bulk(
device=self.last_data,
flows=flows,
groups=groups)
# add ability to notify called when an flow update completes
# see https://jira.opencord.org/browse/CORD-839
elif self.device_type.accepts_add_remove_flow_updates:
raise NotImplementedError()
else:
raise NotImplementedError()
## <======================= GROUP TABLE UPDATE HANDLING ===================
@inlineCallbacks
def _group_table_updated(self, groups):
self.log.debug('group-table-updated',
logical_device_id=self.last_data.id,
flow_groups=groups)
# if device accepts bulk flow update, lets just call that
if self.device_type.accepts_bulk_flow_update:
flows = self.flows_proxy.get('/') # gather flows
yield self.adapter_agent.update_flows_bulk(
device=self.last_data,
flows=flows,
groups=groups)
# add ability to notify called when an group update completes
# see https://jira.opencord.org/browse/CORD-839
elif self.device_type.accepts_add_remove_flow_updates:
raise NotImplementedError()
else:
raise NotImplementedError()