blob: a58bfa2440e3ab4325f4ce2ab80246d982a87f65 [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import imp
16import inspect
17import os
18import threading
19import time
20from xosconfig import Config
21from multistructlog import create_logger
22
# Module-level logger shared by the classes below; built from the "logging" entry of the XOS Config.
log = create_logger(Config().get("logging"))
24
25
class XOSPullStepScheduler:
    """ XOSPullStepScheduler

        Periodically services a collection of pull step classes.

        run() ........ loop forever: sleep five seconds, then call run_once().
        run_once() ... instantiate every step class with the model accessor, execute each
                       instance's pull_records() in its own thread, and wait for all of the
                       threads to finish.
    """

    def __init__(self, steps, model_accessor, *args, **kwargs):
        # Any additional positional or keyword arguments are accepted and ignored.
        self.model_accessor = model_accessor
        self.steps = steps

    def run(self):
        """ Service the pull steps forever, pausing five seconds before each pass. """
        while True:
            time.sleep(5)
            self.run_once()

    def run_once(self):
        """ Perform a single pass over all pull steps and block until every one has finished. """
        log.debug("Starting pull steps", steps=self.steps)

        # Every step is instantiated (and its worker built) before any worker is started.
        workers = [
            threading.Thread(
                target=step_class(model_accessor=self.model_accessor).pull_records,
                name="pull_step",
            )
            for step_class in self.steps
        ]

        for worker in workers:
            worker.start()

        # Wait for the whole batch so that passes never overlap.
        for worker in workers:
            worker.join()

        log.debug("Done with pull steps", steps=self.steps)
Scott Bakerbba67b62019-01-28 17:38:21 -080057
58
class XOSPullStepEngine:
    """ XOSPullStepEngine

        Load pull step modules and run them. Two methods are defined:

        load_pull_step_modules(dir) ... look for step modules in the given directory, and load classes that
                                        list a class named PullStep among their direct bases.

        start() ... Run the loaded pull steps under a single XOSPullStepScheduler. The scheduler's run()
                    loops forever, so start() blocks the calling thread. It's expected that
                    load_pull_step_modules() will be called before start(); with no steps loaded,
                    start() returns immediately.
    """

    def __init__(self, model_accessor):
        self.model_accessor = model_accessor
        self.pull_steps = []

    def load_pull_step_modules(self, pull_step_dir):
        """ Replace self.pull_steps with the pull step classes found in pull_step_dir.

            Only regular files ending in ".py" are considered; "__init__.py" and any file whose name
            contains "test" are skipped. Each remaining file is loaded as a module, and every class in
            it that has a direct base class named "PullStep" is collected.
        """
        self.pull_steps = []
        log.info("Loading pull steps", pull_step_dir=pull_step_dir)

        # NOTE we'll load all the classes that inherit from PullStep
        for fn in os.listdir(pull_step_dir):
            pathname = os.path.join(pull_step_dir, fn)
            is_step_file = (
                os.path.isfile(pathname)
                and fn.endswith(".py")
                and (fn != "__init__.py")
                and ("test" not in fn)
            )
            if not is_step_file:
                continue

            # The module is named after the file, minus its ".py" suffix.
            event_module = imp.load_source(fn[:-3], pathname)

            for classname in dir(event_module):
                c = getattr(event_module, classname, None)
                if not inspect.isclass(c):
                    continue

                # Match on the base class *name* so the check does not depend on which
                # import path the step module used to reach PullStep.
                if any(b.__name__ == "PullStep" for b in c.__bases__):
                    self.pull_steps.append(c)

        log.info("Loaded pull steps", steps=self.pull_steps)

    def start(self):
        """ Run all loaded pull steps under one scheduler; blocks while the scheduler runs. """
        log.info("Starting pull steps engine", steps=self.pull_steps)

        # Nothing was loaded, so there is nothing to schedule.
        if not self.pull_steps:
            return

        # A single scheduler services every step. Its run() never returns, so creating
        # one scheduler per step (as a loop would) could never get past the first one.
        sched = XOSPullStepScheduler(steps=self.pull_steps, model_accessor=self.model_accessor)
        sched.run()