Matteo Scandolo | d2044a4 | 2017-08-07 16:08:28 -0700 | [diff] [blame^] | 1 | |
| 2 | # Copyright 2017-present Open Networking Foundation |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | |
Scott Baker | 1b3b37b | 2017-02-21 22:53:33 -0800 | [diff] [blame] | 17 | import os |
| 18 | import inspect |
| 19 | import imp |
| 20 | import sys |
| 21 | import threading |
| 22 | import time |
| 23 | from synchronizers.new_base.syncstep import SyncStep |
| 24 | from synchronizers.new_base.event_loop import XOSObserver |
Scott Baker | e0cc832 | 2017-05-30 14:38:43 -0700 | [diff] [blame] | 25 | from synchronizers.new_base.model_policy_loop import XOSPolicyEngine |
Scott Baker | 1b3b37b | 2017-02-21 22:53:33 -0800 | [diff] [blame] | 26 | from synchronizers.new_base.modelaccessor import * |
| 27 | from xos.logger import Logger, logging |
Matteo Scandolo | 1879ce7 | 2017-05-30 15:45:26 -0700 | [diff] [blame] | 28 | from xosconfig import Config |
Scott Baker | 1b3b37b | 2017-02-21 22:53:33 -0800 | [diff] [blame] | 29 | |
# Feature flag read once, at import time, from the "enable_watchers" setting.
watchers_enabled = Config.get("enable_watchers")

# The watcher module is only imported when the flag is set, so XOSWatcher is
# an undefined name whenever watchers are disabled.
if watchers_enabled:
    from synchronizers.new_base.watchers import XOSWatcher

# Module-wide logger, created at INFO level.
logger = Logger(level=logging.INFO)
| 36 | |
class Backend:
    """Entry point that launches the synchronizer's worker threads.

    ``run`` reads the ``steps_dir`` and ``model_policies_dir`` settings from
    ``Config``, starts the observer thread (plus a watcher thread when
    ``watchers_enabled`` is set) and the model-policy thread as configured,
    then blocks until a keyboard interrupt arrives.
    """

    def __init__(self):
        pass

    def load_sync_step_modules(self, step_dir):
        """Load every ``*.py`` file in ``step_dir`` and collect its sync steps.

        A module attribute is collected when it is a class, is a subclass of
        ``SyncStep`` and has a ``provides`` attribute.  Abstract base classes
        are filtered out by the last condition since they don't have a
        ``provides``.

        :param step_dir: directory to scan; only regular files directly inside
            it are considered and ``__init__.py`` is skipped.
        :returns: list of the collected classes, without duplicates.
        """
        sync_steps = []

        logger.info("Loading sync steps from %s" % step_dir)

        for fn in os.listdir(step_dir):
            pathname = os.path.join(step_dir, fn)
            is_step_file = (os.path.isfile(pathname)
                            and fn.endswith(".py")
                            and fn != "__init__.py")
            if not is_step_file:
                continue

            # The module name is the file name without its ".py" suffix.
            module = imp.load_source(fn[:-3], pathname)
            for classname in dir(module):
                c = getattr(module, classname, None)

                # isclass() has to be tested first: issubclass() raises
                # TypeError when its first argument is not a class, which
                # would abort loading for any non-class attribute.
                if not (inspect.isclass(c) and issubclass(c, SyncStep)):
                    continue

                if hasattr(c, "provides") and (c not in sync_steps):
                    sync_steps.append(c)

        logger.info("Loaded %s sync steps" % len(sync_steps))

        return sync_steps

    def _start_daemon_thread(self, target, name):
        """Start ``target`` in a daemon thread called ``name`` and return it.

        Daemon threads do not keep the interpreter alive, so the process can
        terminate from ``run`` without reaching into private ``Thread``
        internals to stop them.
        """
        thread = threading.Thread(target=target, name=name)
        thread.daemon = True
        thread.start()
        return thread

    def run(self):
        """Start the configured worker threads and block until interrupted.

        The observer (and, if enabled, the watcher) is only started when a
        steps directory is configured and it yields at least one sync step.
        The policy engine is only started when a model policies directory is
        configured.  On ``KeyboardInterrupt`` the process exits with status 1.
        """
        model_accessor.update_diag(sync_start=time.time(), backend_status="0 - Synchronizer Start")

        steps_dir = Config.get("steps_dir")
        if steps_dir:
            sync_steps = self.load_sync_step_modules(steps_dir)
            if sync_steps:
                # start the observer
                observer = XOSObserver(sync_steps)
                self._start_daemon_thread(observer.run, 'synchronizer')

                # start the watcher thread
                if watchers_enabled:
                    watcher = XOSWatcher(sync_steps)
                    self._start_daemon_thread(watcher.run, 'watcher')
        else:
            logger.info("Skipping observer and watcher threads due to no steps dir.")

        # start model policies thread
        policies_dir = Config.get("model_policies_dir")
        if policies_dir:
            policy_engine = XOSPolicyEngine(policies_dir=policies_dir)
            self._start_daemon_thread(policy_engine.run, "policy_engine")
        else:
            logger.info("Skipping model policies thread due to no model_policies dir.")

        # All work happens in the worker threads; the main thread only waits
        # for Ctrl-C.
        while True:
            try:
                time.sleep(1000)
            except KeyboardInterrupt:
                print("exiting due to keyboard interrupt")
                # The workers are daemon threads, so exiting the main thread
                # is enough to bring the whole process down.
                sys.exit(1)
| 114 | |