blob: 96e412f9ea92dae9ae79615ee1ee1c3688837ad0 [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
Zack Williams5c2ea232019-01-30 15:23:01 -070015from __future__ import absolute_import, print_function
16
Scott Bakerbba67b62019-01-28 17:38:21 -080017import imp
Zack Williams5c2ea232019-01-30 15:23:01 -070018import inspect
19import os
Scott Bakerbba67b62019-01-28 17:38:21 -080020import sys
21import threading
22import time
Zack Williams5c2ea232019-01-30 15:23:01 -070023
24from multistructlog import create_logger
25from xosconfig import Config
26from xossynchronizer.event_engine import XOSEventEngine
Scott Bakerbba67b62019-01-28 17:38:21 -080027from xossynchronizer.event_loop import XOSObserver
28from xossynchronizer.model_policy_loop import XOSPolicyEngine
Scott Bakerbba67b62019-01-28 17:38:21 -080029from xossynchronizer.pull_step_engine import XOSPullStepEngine
Scott Bakerbba67b62019-01-28 17:38:21 -080030
Scott Bakerbba67b62019-01-28 17:38:21 -080031log = create_logger(Config().get("logging"))
32
33
34class Backend:
Scott Bakerc2fddaa2019-01-30 15:45:03 -080035 def __init__(self, model_accessor, log=log):
36 self.model_accessor = model_accessor
Scott Bakerbba67b62019-01-28 17:38:21 -080037 self.log = log
38
39 def load_sync_step_modules(self, step_dir):
40 sync_steps = []
41
42 self.log.info("Loading sync steps", step_dir=step_dir)
43
44 for fn in os.listdir(step_dir):
45 pathname = os.path.join(step_dir, fn)
46 if (
47 os.path.isfile(pathname)
48 and fn.endswith(".py")
49 and (fn != "__init__.py")
50 and (not fn.startswith("test"))
51 ):
52
53 # we need to extend the path to load modules in the step_dir
54 sys_path_save = sys.path
55 sys.path.append(step_dir)
56 module = imp.load_source(fn[:-3], pathname)
57
58 self.log.debug("Loaded file: %s", pathname)
59
60 # reset the original path
61 sys.path = sys_path_save
62
63 for classname in dir(module):
64 c = getattr(module, classname, None)
65
66 # if classname.startswith("Sync"):
67 # print classname, c, inspect.isclass(c), issubclass(c, SyncStep), hasattr(c,"provides")
68
69 # make sure 'c' is a descendent of SyncStep and has a
70 # provides field (this eliminates the abstract base classes
71 # since they don't have a provides)
72
73 if inspect.isclass(c):
74 bases = inspect.getmro(c)
75 base_names = [b.__name__ for b in bases]
76 if (
77 ("SyncStep" in base_names)
78 and (hasattr(c, "provides") or hasattr(c, "observes"))
79 and (c not in sync_steps)
80 ):
81 sync_steps.append(c)
82
83 self.log.info("Loaded sync steps", steps=sync_steps)
84
85 return sync_steps
86
87 def run(self):
88 observer_thread = None
89 model_policy_thread = None
90 event_engine = None
91
92 steps_dir = Config.get("steps_dir")
93 if steps_dir:
94 sync_steps = []
95
96 # load sync_steps
97 if steps_dir:
98 sync_steps = self.load_sync_step_modules(steps_dir)
99
100 # if we have at least one sync_step
101 if len(sync_steps) > 0:
102 # start the observer
Zack Williams5c2ea232019-01-30 15:23:01 -0700103 self.log.info(
104 "Starting XOSObserver",
105 sync_steps=sync_steps,
106 model_accessor=self.model_accessor,
107 )
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800108 observer = XOSObserver(sync_steps, self.model_accessor, self.log)
Scott Bakerbba67b62019-01-28 17:38:21 -0800109 observer_thread = threading.Thread(
110 target=observer.run, name="synchronizer"
111 )
112 observer_thread.start()
113
114 else:
115 self.log.info("Skipping observer thread due to no steps dir.")
116
117 pull_steps_dir = Config.get("pull_steps_dir")
Scott Baker7ff8ad92019-02-15 17:02:41 -0800118 if not pull_steps_dir:
119 self.log.info("Skipping pull step engine due to no pull_steps_dir dir.")
120 elif Config.get("desired_state") == "unload":
121 self.log.info("Skipping pull steps engine due to synchronizer unloading.")
122 else:
Scott Bakerbba67b62019-01-28 17:38:21 -0800123 self.log.info("Starting XOSPullStepEngine", pull_steps_dir=pull_steps_dir)
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800124 pull_steps_engine = XOSPullStepEngine(model_accessor=self.model_accessor)
Scott Bakerbba67b62019-01-28 17:38:21 -0800125 pull_steps_engine.load_pull_step_modules(pull_steps_dir)
126 pull_steps_thread = threading.Thread(
127 target=pull_steps_engine.start, name="pull_step_engine"
128 )
129 pull_steps_thread.start()
Scott Bakerbba67b62019-01-28 17:38:21 -0800130
131 event_steps_dir = Config.get("event_steps_dir")
Scott Baker7ff8ad92019-02-15 17:02:41 -0800132 if not event_steps_dir:
133 self.log.info("Skipping event engine due to no event_steps dir.")
134 elif Config.get("desired_state") == "unload":
135 self.log.info("Skipping event engine due to synchronizer unloading.")
136 else:
Scott Bakerbba67b62019-01-28 17:38:21 -0800137 self.log.info("Starting XOSEventEngine", event_steps_dir=event_steps_dir)
Zack Williams5c2ea232019-01-30 15:23:01 -0700138 event_engine = XOSEventEngine(
139 model_accessor=self.model_accessor, log=self.log
140 )
Scott Bakerbba67b62019-01-28 17:38:21 -0800141 event_engine.load_event_step_modules(event_steps_dir)
142 event_engine.start()
Scott Bakerbba67b62019-01-28 17:38:21 -0800143
144 # start model policies thread
145 policies_dir = Config.get("model_policies_dir")
146 if policies_dir:
Zack Williams5c2ea232019-01-30 15:23:01 -0700147 policy_engine = XOSPolicyEngine(
148 policies_dir=policies_dir,
149 model_accessor=self.model_accessor,
150 log=self.log,
151 )
Scott Bakerbba67b62019-01-28 17:38:21 -0800152 model_policy_thread = threading.Thread(
153 target=policy_engine.run, name="policy_engine"
154 )
155 model_policy_thread.is_policy_thread = True
156 model_policy_thread.start()
157 else:
158 self.log.info(
159 "Skipping model policies thread due to no model_policies dir."
160 )
161
162 if (not observer_thread) and (not model_policy_thread) and (not event_engine):
163 self.log.info(
164 "No sync steps, no policies, and no event steps. Synchronizer exiting."
165 )
166 # the caller will exit with status 0
167 return
168
169 while True:
170 try:
171 time.sleep(1000)
172 except KeyboardInterrupt:
173 print("exiting due to keyboard interrupt")
174 # TODO: See about setting the threads as daemons
175 if observer_thread:
176 observer_thread._Thread__stop()
177 if model_policy_thread:
178 model_policy_thread._Thread__stop()
179 sys.exit(1)