rebase to the HEAD of master
Change-Id: I781aebffea383e2c327b81fe84711073c241471e
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index c73ea99..ca44c88 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -24,6 +24,7 @@
from voltha.extensions.omci.tasks.reboot_task import OmciRebootRequest, RebootFlags
from voltha.extensions.omci.tasks.omci_modify_request import OmciModifyRequest
from voltha.extensions.omci.omci_me import OntGFrame
+from voltha.extensions.omci.state_machines.image_agent import ImageAgent
from twisted.internet import reactor
from enum import IntEnum
@@ -114,6 +115,15 @@
alarm_synchronizer_info['tasks'],
alarm_db,
advertise_events=advertise)
+ # State machine of downloading image file from server
+ downloader_info = support_classes.get('image_downloader')
+ advertise = downloader_info['advertise-event']
+ # self._img_download_sm = downloader_info['state-machine'](self._omci_agent, device_id,
+ # downloader_info['tasks'],
+ # advertise_events=advertise)
+ self._image_agent = ImageAgent(self._omci_agent, device_id, downloader_info['state-machine'],
+ downloader_info['tasks'],
+ advertise_events=advertise)
except Exception as e:
self.log.exception('state-machine-create-failed', e=e)
raise
@@ -484,3 +494,15 @@
self.device_id,
flags=flags,
timeout=timeout))
+
+ def get_imagefile(self, local_name, local_dir, remote_url=None):
+ """
+ Return a Deferred that will be triggered if the file is locally availabe or downloaded sucessfully
+ """
+ self.log.info('start download from {}'.format(remote_url))
+
+ # for debug purpose, start runner here to queue downloading task
+ # self._runner.start()
+
+ return self._image_agent.get_image(local_name, local_dir, remote_url)
+
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index c7f3eca..8dfd4de 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -33,6 +33,8 @@
from voltha.extensions.omci.state_machines.performance_intervals import PerformanceIntervals
from voltha.extensions.omci.tasks.omci_create_pm_task import OmciCreatePMRequest
from voltha.extensions.omci.tasks.omci_delete_pm_task import OmciDeletePMRequest
+from voltha.extensions.omci.state_machines.image_agent import ImageDownloadeSTM
+from voltha.extensions.omci.tasks.file_download_task import FileDownloadTask
OpenOmciAgentDefaults = {
'mib-synchronizer': {
@@ -66,14 +68,21 @@
},
},
'alarm-synchronizer': {
- 'state-machine': AlarmSynchronizer, # Implements the Alarm sync state machine
- 'database': AlarmDbExternal, # For any State storage needs
- 'advertise-events': True, # Advertise events on OpenOMCI event bus
+ 'state-machine': AlarmSynchronizer, # Implements the Alarm sync state machine
+ 'database': AlarmDbExternal, # For any State storage needs
+ 'advertise-events': True, # Advertise events on OpenOMCI event bus
+ 'tasks': {
+ 'alarm-sync': AlarmSyncDataTask,
+ 'alarm-check': AlarmDataTask,
+ 'alarm-resync': AlarmResyncTask,
+ 'alarm-audit': AlarmDataTask
+ }
+ },
+ 'image_downloader': {
+ 'state-machine': ImageDownloadeSTM,
+ 'advertise-event': True,
'tasks': {
- 'alarm-sync': AlarmSyncDataTask,
- 'alarm-check': AlarmDataTask,
- 'alarm-resync': AlarmResyncTask,
- 'alarm-audit': AlarmDataTask
+ 'download-file': FileDownloadTask,
}
}
}
diff --git a/voltha/extensions/omci/state_machines/image_agent.py b/voltha/extensions/omci/state_machines/image_agent.py
new file mode 100755
index 0000000..ff0aa79
--- /dev/null
+++ b/voltha/extensions/omci/state_machines/image_agent.py
@@ -0,0 +1,213 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from datetime import datetime, timedelta
+from transitions import Machine
+from twisted.internet import reactor, defer
+from common.event_bus import EventBusClient
+from voltha.protos.voltha_pb2 import ImageDownload
+
+class ImageDownloadeSTM(object):
+ DEFAULT_STATES = ['disabled', 'downloading', 'validating', 'done']
+ DEFAULT_TRANSITIONS = [
+ {'trigger': 'start', 'source': 'disabled', 'dest': 'downloading'},
+ {'trigger': 'stop', 'source': ['downloading', 'validating', 'done'], 'dest': 'disabled'},
+ {'trigger': 'dw_success', 'source': 'downloading', 'dest': 'validating'},
+ {'trigger': 'dw_fail', 'source': 'downloading', 'dest': 'done'},
+ {'trigger': 'validate_success', 'source': 'validating', 'dest': 'done'},
+ ]
+ DEFAULT_TIMEOUT_RETRY = 10 # Seconds to delay after task failure/timeout
+
+ def __init__(self, omci_agent, dev_id, local_name, local_dir, remote_url, download_task,
+ states=DEFAULT_STATES,
+ transitions=DEFAULT_TRANSITIONS,
+ initial_state='disabled',
+ timeout_delay=DEFAULT_TIMEOUT_RETRY):
+ self.log = structlog.get_logger(device_id=dev_id)
+ self._agent = omci_agent
+ self._imgdw = ImageDownload()
+ self._imgdw.name = local_name
+ self._imgdw.id = dev_id
+ self._imgdw.url = remote_url
+ self._imgdw.local_dir = local_dir
+ self._imgdw.state = ImageDownload.DOWNLOAD_UNKNOWN # voltha_pb2
+
+ self._download_task = download_task
+ self._timeout_delay = timeout_delay
+
+ self._current_task = None
+ self._task_deferred = None
+ self._ret_deferred = None
+ self._timeout_deferred = None
+
+ self.machine = Machine(model=self, states=states,
+ transitions=transitions,
+ initial=initial_state,
+ queued=True,
+ name='{}-{}'.format(self.__class__.__name__, self._imgdw.id))
+ @property
+ def name(self):
+ return self._imgdw.name
+
+ @property
+ def download_state(self):
+ return self._imgdw.state
+
+ def reset(self):
+ """
+ Reset all the state machine to intial state
+ It is used to clear failed result in last downloading
+ """
+ if self._current_task is not None:
+ self._current_task.stop()
+
+ self._cancel_deferred()
+
+ if self._ret_deferred is not None:
+ self._ret_deferred.cancel()
+ self._ret_deferred = None
+
+ self.stop()
+ self._imgdw.state = ImageDownload.DOWNLOAD_UNKNOWN
+
+
+ def get_file(self):
+ """
+ return a defer.Deferred object
+ Caller will register a callback to the Deferred to get notified once the image is available
+ """
+ self.log.debug('Get to work: {}'.format(self._imgdw))
+ if self._ret_deferred is None or self._ret_deferred.called:
+ self._ret_deferred = defer.Deferred()
+
+ if self._imgdw.state == ImageDownload.DOWNLOAD_SUCCEEDED:
+ self.log.debug('Image Available')
+ reactor.callLater(0, self._ret_deferred.callback, self)
+ elif self._imgdw.state == ImageDownload.DOWNLOAD_FAILED or self._imgdw.state == ImageDownload.DOWNLOAD_UNSUPPORTED:
+ self.log.debug('Image not exist')
+ reactor.callLater(0, self._ret_deferred.errback, self)
+ elif self._imgdw.state == ImageDownload.DOWNLOAD_UNKNOWN:
+ self.log.debug('Start Image STM')
+ self._imgdw.state == ImageDownload.DOWNLOAD_STARTED
+ reactor.callLater(0, self.start)
+ else:
+ pass
+
+ return self._ret_deferred
+
+ def _cancel_deferred(self):
+ d1, self._timeout_deferred = self._timeout_deferred, None
+ d2, self._task_deferred = self._task_deferred, None
+
+ for d in [d1, d1]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def timeout(self):
+ self.log.debug('Image Download Timeout {}'.format(self._imgdw));
+ if self._task_deferred is not None and not self._task_deferred.called:
+ self._task_deferred.cancel()
+ self._current_task = None
+ self.dw_fail()
+
+
+ def on_enter_disabled(self):
+ self._imgdw.state == ImageDownload.DOWNLOAD_UNKNOWN
+
+ def on_enter_downloading(self):
+ def success(results):
+ self.log.debug('image-download-success', results=results)
+ self._imgdw.state = ImageDownload.DOWNLOAD_SUCCEEDED
+ self._current_task = None
+ self.dw_success()
+
+ def failure(reason):
+ self.log.info('image-download-failure', reason=reason)
+ self._imgdw.state = ImageDownload.FAILED
+ self._current_task = None
+ self.dw_fail()
+
+ self._device = self._agent.get_device(self._imgdw.id)
+ self._current_task = self._download_task(self._agent, self._imgdw.id, self._imgdw.url,
+ '{}/{}'.format(self._imgdw.local_dir, self._imgdw.name))
+
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ if self._timeout_delay > 0:
+ self._timeout_deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ def on_enter_validating(self):
+ self.validate_success()
+
+ def on_enter_done(self):
+ self._cancel_deferred()
+
+ if self._imgdw.state == ImageDownload.DOWNLOAD_SUCCEEDED:
+ reactor.callLater(0, self._ret_deferred.callback, self)
+ else: # failed
+ reactor.callLater(0, self._ret_deferred.errback, self)
+
+class ImageAgent(object):
+ """
+ Image Agent supports multiple state machines running at the same time:
+ """
+ def __init__(self, omci_agent, dev_id, stm_cls, img_tasks, advertise_events=True):
+ """
+ Class initialization
+
+ :param omci_agent: (OpenOmciAgent) Agent
+ :param dev_id : (str) ONU Device ID
+ :param stm_cls : (ImageDownloadeSTM) Image download state machine class
+ :param img_tasks : (FileDownloadTask) file download task
+ """
+
+ self.log = structlog.get_logger(device_id=dev_id)
+
+ self._omci_agent = omci_agent
+ self._device_id = dev_id
+ self._state_machine_cls = stm_cls
+ self._download_task = img_tasks['download-file']
+ self._advertise_events = advertise_events
+ self._images = dict()
+
+ def get_image(self, name, local_dir, remote_url, timeout_delay=ImageDownloadeSTM.DEFAULT_TIMEOUT_RETRY):
+
+ """
+ Get named image from the agent
+
+ :param name : (str) filename or image name
+ :param local_dir : (str) local directory where the image is saved. if image not exist, start downloading
+ :param remote_url : (str) the URL to download image
+ :param timeout_delay : (number) timeout for download task
+ :
+ :Return a Deferred that will be triggered if the file is locally availabe or downloaded sucessfully
+ : Caller will register callback and errback to the returned defer to get notified
+ """
+ if name not in self._images.keys():
+ self._images[name] = ImageDownloadeSTM(self._omci_agent, self._device_id, name,
+ local_dir, remote_url, self._download_task,
+ timeout_delay=timeout_delay)
+ elif self._images[name].download_state == ImageDownload.DOWNLOAD_FAILED:
+ self._images[name].reset()
+
+ d = self._images[name].get_file()
+ # reactor.callLater(0, self._images[name].start)
+ return d
+
diff --git a/voltha/extensions/omci/tasks/file_download_task.py b/voltha/extensions/omci/tasks/file_download_task.py
new file mode 100755
index 0000000..7d8719d
--- /dev/null
+++ b/voltha/extensions/omci/tasks/file_download_task.py
@@ -0,0 +1,55 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, AlreadyCalledError
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_defs import ReasonCodes
+import requests
+import os
+
+class FileDownloadTask(Task):
+ task_priority = 250
+ name = "Image File Download Task"
+
+ def __init__(self, omci_agent, device_id, url, local_path):
+ super(FileDownloadTask, self).__init__(FileDownloadTask.name, omci_agent, device_id)
+ self.url = url
+ self.local_path = local_path
+ # self.log.debug('{} running'.format(FileDownloadTask.name))
+
+ def start(self):
+ self.log.debug('{} running'.format(FileDownloadTask.name))
+ # reactor.callLater(1, self.deferred.callback, 'device {} success downloaded {} '.format(self.device_id, self.url))
+ try:
+ # local_filename = url.split('/')[-1]
+ dir_name = os.path.dirname(self.local_path)
+ if not os.path.exists(dir_name):
+ os.makedirs(dir_name)
+
+ r = requests.get(self.url, stream=True)
+ with open(self.local_path, 'wb') as f:
+ for chunk in r.iter_content(chunk_size=1024):
+ if chunk: # filter out keep-alive new chunks
+ f.write(chunk)
+ self.deferred.callback('device {} success downloaded {} '.format(self.device_id, self.url))
+ except Exception as e:
+ #self.deferred.errback(KeyError('device {} failed downloaded {} '.format(self.device_id, self.url)))
+ self.deferred.errback(failure.Failure(e))
+
+ def stop(self):
+ self.cancel_deferred()
+ super(FileDownloadTask, self).stop()
+