/*-
 * ============LICENSE_START=======================================================
 * OSAM
 * ================================================================================
 * Copyright (C) 2018 AT&T
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20package org.onap.osam.job.impl;
21
22import lombok.extern.slf4j.Slf4j;
23import org.apache.commons.lang3.exception.ExceptionUtils;
24import org.onap.osam.job.exceptions.JobException;
25import org.onap.osam.job.dao.job.JobStatus;
26import org.onap.osam.job.dao.job.OsamJob;
27import org.onap.osam.job.IJobCommand;
28import org.onap.osam.job.IJobsDataAccessService;
29import org.onap.osam.job.NextCommand;
30import org.onap.osam.job.command.JobCommandFactory;
31import org.quartz.JobExecutionContext;
32import org.springframework.scheduling.quartz.QuartzJobBean;
33import org.springframework.stereotype.Component;
34
35import java.util.Optional;
36import java.util.UUID;
37
38
39@Slf4j
40@Component
41public class JobWorker extends QuartzJobBean {
42
43 private IJobsDataAccessService jobsDataAccessService;
44 private JobCommandFactory jobCommandFactory;
45 private JobStatus topic;
46
47 @Override
48 protected void executeInternal(JobExecutionContext context) {
49 Optional<OsamJob> job;
50
51 job = pullJob();
52
53 while (job.isPresent()) {
54 OsamJob nextStateOfJob = executeJobAndGetNext(job.get());
55 pushBack(nextStateOfJob);
56 job = pullJob();
57 }
58 }
59
60 private Optional<OsamJob> pullJob() {
61 try {
62 return jobsDataAccessService.pull(topic, UUID.randomUUID().toString());
63 } catch (Exception e) {
64 log.error("failed to pull job from queue, breaking: {}", e, e);
65 tryMutingJobFromException(e);
66
67 return Optional.empty();
68 }
69 }
70
71 private void pushBack(OsamJob nextJob) {
72 try {
73 jobsDataAccessService.pushBack(nextJob);
74 } catch (Exception e) {
75 log.error("failed pushing back job to queue: {}", e, e);
76 }
77 }
78
79 protected OsamJob executeJobAndGetNext(OsamJob job) {
80 //TODO Pavel find out about teplateId
81 log.debug("going to execute job {}: {}/{}",
82 //log.debug("going to execute job {} of {}: {}/{}",
83 String.valueOf(job.getUuid()).substring(0,8),
84 //String.valueOf(job.getTemplateId()).substring(0, 8),
85 job.getStatus(), job.getType());
86
87 NextCommand nextCommand = executeCommandAndGetNext(job);
88
89 return setNextCommandInJob(nextCommand, job);
90 }
91
92 private NextCommand executeCommandAndGetNext(OsamJob job) {
93 NextCommand nextCommand;
94 try {
95 final IJobCommand jobCommand = jobCommandFactory.toCommand(job);
96 nextCommand = jobCommand.call();
97 } catch (Exception e) {
98 log.error("error while executing job from queue: {}", e);
99 nextCommand = new NextCommand(JobStatus.FAILED);
100 }
101
102 if (nextCommand == null) {
103 nextCommand = new NextCommand(JobStatus.STOPPED);
104 }
105 return nextCommand;
106 }
107
108 private OsamJob setNextCommandInJob(NextCommand nextCommand, OsamJob job) {
109 log.debug("transforming job {}: {}/{} -> {}{}",
110 String.valueOf(job.getUuid()).substring(0, 8),
111 job.getStatus(), job.getType(),
112 nextCommand.getStatus(),
113 nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : "");
114
115 job.setStatus(nextCommand.getStatus());
116
117 if (nextCommand.getCommand() != null) {
118 job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData());
119 }
120
121 return job;
122 }
123
124
125 private void tryMutingJobFromException(Exception e) {
126 // If there's JobException in the stack, read job uuid from
127 // the exception, and mute it in DB.
128 final int indexOfJobException =
129 ExceptionUtils.indexOfThrowable(e, JobException.class);
130
131 if (indexOfJobException >= 0) {
132 try {
133 final JobException jobException = (JobException) ExceptionUtils.getThrowableList(e).get(indexOfJobException);
134 log.info("muting job: {} ({})", jobException.getJobUuid(), jobException.toString());
135 final boolean success = jobsDataAccessService.mute(jobException.getJobUuid());
136 if (!success) {
137 log.error("failed to mute job {}", jobException.getJobUuid());
138 }
139 } catch (Exception e1) {
140 log.error("failed to mute job: {}", e1, e1);
141 }
142 }
143 }
144
145 //used by quartz to inject IJobsDataAccessService into the job
146 //see JobSchedulerInitializer
147 public void setJobsDataAccessService(IJobsDataAccessService jobsDataAccessService) {
148 this.jobsDataAccessService = jobsDataAccessService;
149 }
150
151 /*public void setFeatureManager(FeatureManager featureManager) {
152 this.featureManager = featureManager;
153 }*/
154
155 public void setJobCommandFactory(JobCommandFactory jobCommandFactory) {
156 this.jobCommandFactory = jobCommandFactory;
157 }
158
159 public void setTopic(JobStatus topic) {
160 this.topic = topic;
161 }
162}