# blob: 63da4279b037ec8c1507a36d6d9a071ae1091d46 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from task import Task
from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, AlreadyCalledError
from twisted.internet import reactor
from voltha.extensions.omci.omci_defs import ReasonCodes
import requests
import os
import time
class FileDownloadTask(Task):
    """
    Task that fetches an image file over HTTP(S) and writes it to local disk.

    The download description object handed to the constructor supplies the
    ``url`` to fetch and the ``local_dir`` / ``name`` that together form the
    destination path.  While the transfer runs, its ``file_size`` attribute
    is updated with the number of bytes written so far.

    On success the task's deferred is fired with the download description
    object; on any error it is fired with a ``Failure`` wrapping the
    exception.

    NOTE(review): ``requests`` performs blocking I/O, so the transfer runs
    synchronously in whatever thread invokes ``perform_download_data``
    (scheduled through ``callLater`` in ``start``) -- confirm that this is
    acceptable for the image sizes expected.
    """
    name = "Image File Download Task"

    # Number of bytes requested from the HTTP response per iteration.
    CHUNK_SIZE = 1024

    # Seconds 'requests' waits for the connection and between received
    # bytes before giving up.  Chosen to equal the watchdog_timeout passed
    # to Task.__init__ below so a stalled server cannot block forever.
    REQUEST_TIMEOUT = 45

    def __init__(self, omci_agent, img_dnld, clock=None):
        """
        Class initialization

        :param omci_agent: passed through unchanged to the ``Task`` base class
        :param img_dnld: download description; this class reads its ``id``,
                         ``url``, ``local_dir`` and ``name`` attributes and
                         writes its ``file_size`` attribute
        :param clock: optional reactor replacement providing ``callLater``
                      (e.g. for unit tests); the global Twisted reactor is
                      used when omitted
        """
        super(FileDownloadTask, self).__init__(FileDownloadTask.name,
                                               omci_agent,
                                               img_dnld.id,
                                               exclusive=False,
                                               watchdog_timeout=45)
        self._image_download = img_dnld
        self.reactor = clock if clock is not None else reactor
        # Delayed call returned by callLater() in start(); kept so that
        # cancel_deferred() can cancel a download that has not begun yet.
        self._local_deferred = None

    def _report_success(self):
        """
        Fire the task deferred with the download description object.

        A deferred that has already fired (for example because the task was
        cancelled while the transfer was running) is left alone.
        """
        try:
            self.deferred.callback(self._image_download)
        except AlreadyCalledError:
            self.log.debug('FileDownloadTask result dropped, deferred already fired')

    def _report_failure(self, fail):
        """
        Fire the task deferred with ``fail`` (a ``Failure``).

        A deferred that has already fired is left alone instead of raising
        a second, unhandled ``AlreadyCalledError``.
        """
        try:
            self.deferred.errback(fail)
        except AlreadyCalledError:
            self.log.debug('FileDownloadTask failure dropped, deferred already fired')

    @inlineCallbacks
    def perform_download_data(self):
        """
        Stream the image from its URL into ``<local_dir>/<name>``.

        The watchdog is strobed for every chunk received and ``file_size``
        on the download description is advanced by every chunk written.
        The outcome is reported through the task deferred, never raised.
        """
        response = None
        try:
            response = requests.get(self._image_download.url,
                                    stream=True,
                                    timeout=FileDownloadTask.REQUEST_TIMEOUT)
            # Without this check the body of a 4xx/5xx error page would be
            # stored on disk and reported as a successful image download.
            response.raise_for_status()

            target = os.path.join(self._image_download.local_dir,
                                  self._image_download.name)
            with open(target, 'wb') as f:
                for chunk in response.iter_content(chunk_size=FileDownloadTask.CHUNK_SIZE):
                    self.strobe_watchdog()
                    if chunk:  # filter out keep-alive new chunks
                        yield f.write(chunk)
                        self._image_download.file_size += len(chunk)

        except Exception as e:
            self._report_failure(failure.Failure(e))

        else:
            # Reached only after the 'with' block has closed the file, so
            # the data is flushed before anyone is told it is available.
            self._report_success()

        finally:
            # A streamed response keeps its connection checked out until it
            # is fully consumed or explicitly closed.
            if response is not None:
                response.close()

    def start(self):
        """
        Start the task: make sure the destination directory exists, reset
        the byte counter and schedule the transfer.
        """
        super(FileDownloadTask, self).start()
        if not os.path.exists(self._image_download.local_dir):
            os.makedirs(self._image_download.local_dir)

        self.strobe_watchdog()
        self._image_download.file_size = 0
        self._local_deferred = self.reactor.callLater(0, self.perform_download_data)

    def cancel_deferred(self):
        """
        Cancel the base-class deferred(s) and the scheduled download call,
        if it has not run yet.  Cancellation is best effort and never raises.
        """
        self.log.debug('FileDownloadTask cancel_deferred')
        super(FileDownloadTask, self).cancel_deferred()

        d, self._local_deferred = self._local_deferred, None
        if d is None:
            return

        try:
            if not d.called:
                d.cancel()
        except Exception as e:
            # Still swallowed on purpose (e.g. the call fired or was
            # cancelled in the meantime), but no longer with a bare
            # 'except' that would also hide KeyboardInterrupt/SystemExit.
            self.log.debug('FileDownloadTask cancel_deferred ignored: {}'.format(e))