blob: 54899ec75795afc8d4e1bc2795126f70de7c36b8 [file] [log] [blame]
Aharoni, Pavel (pa0916)ca3cb012018-10-22 15:29:57 +03001/*-
2 * ============LICENSE_START=======================================================
3 * OSAM
4 * ================================================================================
5 * Copyright (C) 2018 AT&T
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21
22
23package org.onap.osam.job.impl;
24
25import org.apache.commons.lang3.StringUtils;
26import org.onap.osam.job.Job;
27import org.onap.osam.job.JobCommand;
28import org.onap.osam.job.JobsBrokerService;
29import org.onap.osam.job.NextCommand;
30import org.onap.osam.job.command.JobCommandFactory;
31import org.onap.osam.properties.Features;
32import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
33import org.quartz.JobExecutionContext;
34import org.springframework.scheduling.quartz.QuartzJobBean;
35import org.springframework.stereotype.Component;
36import org.togglz.core.manager.FeatureManager;
37
38import java.util.Optional;
39import java.util.UUID;
40
41import static org.onap.osam.job.Job.JobStatus.FAILED;
42import static org.onap.osam.job.Job.JobStatus.STOPPED;
43
44@Component
45public class JobWorker extends QuartzJobBean {
46
47 private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(JobWorker.class);
48
49 private JobsBrokerService jobsBrokerService;
50 private FeatureManager featureManager;
51 private JobCommandFactory jobCommandFactory;
52 private Job.JobStatus topic;
53
54 @Override
55 protected void executeInternal(JobExecutionContext context) {
56 Optional<Job> job;
57
58 if (!isMsoNewApiActive()) {
59 return;
60 }
61
62 job = pullJob();
63
64 while (job.isPresent()) {
65 Job nextJob = executeJobAndGetNext(job.get());
66 pushBack(nextJob);
67
68 job = pullJob();
69 }
70 }
71
72 private Optional<Job> pullJob() {
73 try {
74 return jobsBrokerService.pull(topic, UUID.randomUUID().toString());
75 } catch (Exception e) {
76 LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e);
77 return Optional.empty();
78 }
79 }
80
81 private void pushBack(Job nextJob) {
82 try {
83 jobsBrokerService.pushBack(nextJob);
84 } catch (Exception e) {
85 LOGGER.error(EELFLoggerDelegate.errorLogger, "failed pushing back job to queue: {}", e, e);
86 }
87 }
88
89 protected Job executeJobAndGetNext(Job job) {
90 LOGGER.debug(EELFLoggerDelegate.debugLogger, "going to execute job {} of {}: {}/{}",
91 StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
92 StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
93 job.getStatus(), job.getType());
94
95 NextCommand nextCommand = executeCommandAndGetNext(job);
96
97 Job nextJob = setNextCommandInJob(nextCommand, job);
98
99 return nextJob;
100 }
101
102 private NextCommand executeCommandAndGetNext(Job job) {
103 NextCommand nextCommand;
104 try {
105 final JobCommand jobCommand = jobCommandFactory.toCommand(job);
106 nextCommand = jobCommand.call();
107 } catch (Exception e) {
108 LOGGER.error(EELFLoggerDelegate.errorLogger, "error while executing job from queue: {}", e, e);
109 nextCommand = new NextCommand(FAILED);
110 }
111
112 if (nextCommand == null) {
113 nextCommand = new NextCommand(STOPPED);
114 }
115 return nextCommand;
116 }
117
118 private Job setNextCommandInJob(NextCommand nextCommand, Job job) {
119 LOGGER.debug(EELFLoggerDelegate.debugLogger, "transforming job {} of {}: {}/{} -> {}{}",
120 StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
121 StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
122 job.getStatus(), job.getType(),
123 nextCommand.getStatus(),
124 nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : "");
125
126 job.setStatus(nextCommand.getStatus());
127
128 if (nextCommand.getCommand() != null) {
129 job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData());
130 }
131
132 return job;
133 }
134
135 private boolean isMsoNewApiActive() {
136 return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION);
137 }
138
139
140 //used by quartz to inject JobsBrokerService into the job
141 //see JobSchedulerInitializer
142 public void setJobsBrokerService(JobsBrokerService jobsBrokerService) {
143 this.jobsBrokerService = jobsBrokerService;
144 }
145
146 public void setFeatureManager(FeatureManager featureManager) {
147 this.featureManager = featureManager;
148 }
149
150 public void setJobCommandFactory(JobCommandFactory jobCommandFactory) {
151 this.jobCommandFactory = jobCommandFactory;
152 }
153
154 public void setTopic(Job.JobStatus topic) {
155 this.topic = topic;
156 }
157}