blob: 3f4732de532fe2f41ac92decd98f9d12a7ad8677 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import imp
16import inspect
17import os
18import threading
19import time
20from xosconfig import Config
21from multistructlog import create_logger
22
# Module-level logger shared by every class in this file; it is created from
# the "logging" entry of the Config() object.
log = create_logger(Config().get("logging"))
24
25
class XOSPullStepScheduler:
    """ XOSPullStepScheduler

        Periodically runs a batch of pull steps. A single scheduler services
        every step it was given: on each round it calls each entry of `steps`
        with no arguments, runs the resulting object's pull_records() method
        in its own thread, and waits for all of those threads to finish
        before the next round.

        The pause between rounds defaults to five seconds and can be changed
        by passing an `interval` keyword argument (in seconds).
    """

    # Seconds to wait before each round when no "interval" keyword is given.
    DEFAULT_INTERVAL = 5

    def __init__(self, steps, *args, **kwargs):
        """ Remember the steps to run.

            steps    ... iterable of callables (normally pull step classes);
                         each is called with no arguments on every round and
                         must return an object with a pull_records() method.
            interval ... optional keyword, seconds to sleep before each round.

            Any other positional or keyword arguments are accepted and
            ignored, as before.
        """
        self.steps = steps
        self.interval = kwargs.get("interval", self.DEFAULT_INTERVAL)

    def run(self):
        """ Run rounds forever; this method never returns normally.

            The sleep happens before the round, so the first round starts
            one interval after run() is called.
        """
        while True:
            time.sleep(self.interval)
            self.run_once()

    def run_once(self):
        """ Execute one round: one thread per step, then wait for all of them. """
        log.trace("Starting pull steps", steps=self.steps)

        # Build every thread before starting any of them, so an error raised
        # while instantiating a step surfaces before any work has begun.
        threads = [
            threading.Thread(target=step().pull_records, name="pull_step")
            for step in self.steps
        ]

        for t in threads:
            t.start()

        # Block until the whole round is finished so rounds never overlap.
        for t in threads:
            t.join()

        log.trace("Done with pull steps", steps=self.steps)
56
57
class XOSPullStepEngine:
    """ XOSPullStepEngine

        Load pull step modules and run them. Two methods are defined:

        load_pull_step_modules(dir) ... look for step modules in the given directory, and collect the classes
                                        that list a class named PullStep among their direct bases.

        start() ... Hand the loaded steps to an XOSPullStepScheduler and run it. It's expected that
                    load_pull_step_modules() will be called before start().
    """

    def __init__(self):
        # Classes collected by load_pull_step_modules(); empty until then.
        self.pull_steps = []

    def load_pull_step_modules(self, pull_step_dir):
        """ Replace self.pull_steps with the step classes found in pull_step_dir.

            Every regular file in the directory whose name ends in ".py" is
            loaded as a module, except "__init__.py" and any file whose name
            contains "test". Each class visible in a loaded module is kept
            when one of its direct base classes is named "PullStep".
        """
        self.pull_steps = []
        log.info("Loading pull steps", pull_step_dir=pull_step_dir)

        # NOTE we'll load all the classes that inherit from PullStep
        for fn in os.listdir(pull_step_dir):
            pathname = os.path.join(pull_step_dir, fn)

            is_step_module = (
                os.path.isfile(pathname)
                and fn.endswith(".py")
                and (fn != "__init__.py")
                and ("test" not in fn)
            )
            if not is_step_module:
                continue

            # The module is registered under the file name minus ".py".
            event_module = imp.load_source(fn[:-3], pathname)

            for classname in dir(event_module):
                c = getattr(event_module, classname, None)
                if not inspect.isclass(c):
                    continue

                # Only direct bases are inspected, and only by name, so the
                # PullStep base class itself is not picked up.
                base_names = [b.__name__ for b in c.__bases__]
                if "PullStep" in base_names:
                    self.pull_steps.append(c)

        log.info("Loaded pull steps", steps=self.pull_steps)

    def start(self):
        """ Run the loaded pull steps; does nothing when none were loaded.

            When at least one step is loaded this call does not return
            normally, because the scheduler's run() loops forever.
        """
        log.info("Starting pull steps engine", steps=self.pull_steps)

        if not self.pull_steps:
            return

        # One scheduler services every step. The previous per-step loop could
        # never get past its first iteration because run() does not return.
        sched = XOSPullStepScheduler(steps=self.pull_steps)
        sched.run()