Scott Baker | bba67b6 | 2019-01-28 17:38:21 -0800 | [diff] [blame] | 1 | # Copyright 2017-present Open Networking Foundation |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | from __future__ import print_function |
| 16 | import os |
| 17 | import inspect |
| 18 | import imp |
| 19 | import sys |
| 20 | import threading |
| 21 | import time |
| 22 | from xossynchronizer.steps.syncstep import SyncStep |
| 23 | from xossynchronizer.event_loop import XOSObserver |
| 24 | from xossynchronizer.model_policy_loop import XOSPolicyEngine |
| 25 | from xossynchronizer.event_engine import XOSEventEngine |
| 26 | from xossynchronizer.pull_step_engine import XOSPullStepEngine |
Scott Baker | bba67b6 | 2019-01-28 17:38:21 -0800 | [diff] [blame] | 27 | |
| 28 | from xosconfig import Config |
| 29 | from multistructlog import create_logger |
| 30 | |
# Module-level logger built from the "logging" entry of Config; it is the
# default value of the ``log`` argument accepted by ``Backend.__init__``.
log = create_logger(Config().get("logging"))
| 32 | |
| 33 | |
class Backend(object):
    """Load a synchronizer's steps and start the engines that are configured.

    ``run`` reads ``steps_dir``, ``pull_steps_dir``, ``event_steps_dir`` and
    ``model_policies_dir`` from ``Config`` and starts the matching engine for
    every directory that is set.
    """

    def __init__(self, model_accessor, log=log):
        """Store the collaborators that ``run`` hands to each engine.

        Args:
            model_accessor: passed unchanged to every engine constructed by
                ``run``.
            log: logger used for all messages emitted by this class; defaults
                to the module-level logger.
        """
        self.model_accessor = model_accessor
        self.log = log

    def load_sync_step_modules(self, step_dir):
        """Import the step files found in ``step_dir`` and collect sync steps.

        A directory entry is loaded when it is a regular file ending in
        ``.py``, is not ``__init__.py`` and does not start with ``test``.
        From every loaded module, each class that has a class named
        ``SyncStep`` in its MRO and defines ``provides`` or ``observes`` is
        collected (once).

        Args:
            step_dir: directory to scan; sub-directories are not visited.

        Returns:
            list: the sync step classes, in discovery order.

        Raises:
            Whatever ``os.listdir`` or the import of a step file raises; no
            exception is swallowed here.
        """
        sync_steps = []

        self.log.info("Loading sync steps", step_dir=step_dir)

        for fn in os.listdir(step_dir):
            pathname = os.path.join(step_dir, fn)
            if not (
                os.path.isfile(pathname)
                and fn.endswith(".py")
                and fn != "__init__.py"
                and not fn.startswith("test")
            ):
                continue

            # step_dir must be importable while the file is loaded so that
            # the step can import modules that live next to it.
            sys.path.append(step_dir)
            try:
                module = imp.load_source(fn[:-3], pathname)
            finally:
                # Undo only our own append, even when the import fails.
                # (Saving a reference to sys.path and re-assigning it later
                # restores nothing, because append() mutates that same list.)
                # The last matching entry is removed so that a pre-existing
                # step_dir entry, or entries added by the step itself, are
                # left untouched.
                for index in range(len(sys.path) - 1, -1, -1):
                    if sys.path[index] == step_dir:
                        del sys.path[index]
                        break

            self.log.debug("Loaded file: %s", pathname)

            for classname in dir(module):
                c = getattr(module, classname, None)
                if not inspect.isclass(c):
                    continue

                # Match on the base-class *name* rather than with
                # issubclass(). Requiring provides/observes filters out
                # abstract base classes, which define neither.
                base_names = [b.__name__ for b in inspect.getmro(c)]
                if (
                    "SyncStep" in base_names
                    and (hasattr(c, "provides") or hasattr(c, "observes"))
                    and c not in sync_steps
                ):
                    sync_steps.append(c)

        self.log.info("Loaded sync steps", steps=sync_steps)

        return sync_steps

    def run(self):
        """Start every configured engine, then block until interrupted.

        Returns:
            None, immediately, when no observer thread, no model-policy
            thread and no event engine was started. Otherwise this method
            does not return.

        Raises:
            SystemExit: with status 1 when a ``KeyboardInterrupt`` is
                received while waiting.
        """
        observer_thread = None
        model_policy_thread = None
        event_engine = None

        steps_dir = Config.get("steps_dir")
        if steps_dir:
            sync_steps = self.load_sync_step_modules(steps_dir)

            # The observer is only started when at least one step was found.
            if sync_steps:
                self.log.info(
                    "Starting XOSObserver",
                    sync_steps=sync_steps,
                    model_accessor=self.model_accessor,
                )
                observer = XOSObserver(sync_steps, self.model_accessor, self.log)
                observer_thread = threading.Thread(
                    target=observer.run, name="synchronizer"
                )
                # Daemon thread: lets sys.exit() below end the process
                # without relying on the Python-2-only private
                # Thread._Thread__stop().
                observer_thread.daemon = True
                observer_thread.start()
        else:
            self.log.info("Skipping observer thread due to no steps dir.")

        pull_steps_dir = Config.get("pull_steps_dir")
        if pull_steps_dir:
            self.log.info("Starting XOSPullStepEngine", pull_steps_dir=pull_steps_dir)
            pull_steps_engine = XOSPullStepEngine(model_accessor=self.model_accessor)
            pull_steps_engine.load_pull_step_modules(pull_steps_dir)
            # NOTE(review): this thread is neither part of the "nothing to
            # run" check below nor stopped on KeyboardInterrupt; it is left
            # as a non-daemon thread to keep the existing behavior - confirm
            # whether that is intended.
            pull_steps_thread = threading.Thread(
                target=pull_steps_engine.start, name="pull_step_engine"
            )
            pull_steps_thread.start()
        else:
            self.log.info("Skipping pull step engine due to no pull_steps_dir dir.")

        event_steps_dir = Config.get("event_steps_dir")
        if event_steps_dir:
            self.log.info("Starting XOSEventEngine", event_steps_dir=event_steps_dir)
            event_engine = XOSEventEngine(
                model_accessor=self.model_accessor, log=self.log
            )
            event_engine.load_event_step_modules(event_steps_dir)
            event_engine.start()
        else:
            self.log.info("Skipping event engine due to no event_steps dir.")

        # start model policies thread
        policies_dir = Config.get("model_policies_dir")
        if policies_dir:
            policy_engine = XOSPolicyEngine(
                policies_dir=policies_dir,
                model_accessor=self.model_accessor,
                log=self.log,
            )
            model_policy_thread = threading.Thread(
                target=policy_engine.run, name="policy_engine"
            )
            model_policy_thread.is_policy_thread = True
            # Daemon for the same reason as the observer thread above.
            model_policy_thread.daemon = True
            model_policy_thread.start()
        else:
            self.log.info(
                "Skipping model policies thread due to no model_policies dir."
            )

        if (not observer_thread) and (not model_policy_thread) and (not event_engine):
            self.log.info(
                "No sync steps, no policies, and no event steps. Synchronizer exiting."
            )
            # the caller will exit with status 0
            return

        # Keep the main thread alive; the engines run in their own threads.
        while True:
            try:
                time.sleep(1000)
            except KeyboardInterrupt:
                print("exiting due to keyboard interrupt")
                # The observer and policy threads are daemon threads, so they
                # do not keep the interpreter alive after this exit.
                sys.exit(1)