blob: 672197cec577970c663bb2bae1b27dc10fbdb4d2 [file] [log] [blame]
Aharoni, Pavel (pa0916)ca3cb012018-10-22 15:29:57 +03001/*-
2 * ============LICENSE_START=======================================================
3 * OSAM
4 * ================================================================================
5 * Copyright (C) 2018 AT&T
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21
22
23package org.onap.osam.job.impl;
24
25import org.apache.commons.lang3.StringUtils;
26import org.hibernate.SessionFactory;
27import org.onap.osam.exceptions.GenericUncheckedException;
28import org.onap.osam.exceptions.OperationNotAllowedException;
29import org.onap.osam.job.Job;
30import org.onap.osam.job.JobsBrokerService;
31import org.onap.osam.properties.VidProperties;
32import org.onap.osam.utils.DaoUtils;
33import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
34import org.onap.portalsdk.core.service.DataAccessService;
35import org.onap.portalsdk.core.util.SystemProperties;
36import org.springframework.beans.factory.annotation.Autowired;
37import org.springframework.beans.factory.annotation.Value;
38import org.springframework.stereotype.Service;
39
40import javax.annotation.PostConstruct;
41import java.nio.ByteBuffer;
42import java.sql.Timestamp;
43import java.time.LocalDateTime;
44import java.util.*;
45
46@Service
47public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
48
49 static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
50
51 private final DataAccessService dataAccessService;
52
53 private final SessionFactory sessionFactory;
54 private int maxOpenedInstantiationRequestsToMso;
55 private int pollingIntervalSeconds;
56
57 @Autowired
58 public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
59 @Value("0") int maxOpenedInstantiationRequestsToMso,
60 @Value("10") int pollingIntervalSeconds) {
61 // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
62 this.dataAccessService = dataAccessService;
63 this.sessionFactory = sessionFactory;
64 this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
65 this.pollingIntervalSeconds = pollingIntervalSeconds;
66 }
67
68 @PostConstruct
69 public void configure() {
70 maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
71 pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
72 }
73
74 public void deleteAll() {
75 dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
76 }
77
78 @Override
79 public UUID add(Job job) {
80 final JobDaoImpl jobDao = castToJobDaoImpl(job);
81 jobDao.setUuid(UUID.randomUUID());
82 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
83 return job.getUuid();
84 }
85
86 @Override
87 public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
88 JobDaoImpl daoJob;
89 int updatedEntities;
90 do {
91 String query = sqlQueryForTopic(topic);
92 List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
93 if (jobs.isEmpty()) {
94 return Optional.empty();
95 }
96
97 daoJob = jobs.get(0);
98
99 final UUID uuid = daoJob.getUuid();
100 final Integer age = daoJob.getAge();
101
102 daoJob.setTakenBy(ownerId);
103
104 // It might become that daoJob was taken and pushed-back already, before we
105 // arrived here, so we're verifying the age was not pushed forward.
106 // Age is actually forwarded upon pushBack().
107 String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
108 " job.id = :id" +
109 " and job.age = :age" +
110 " and takenBy is null";
111 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
112 session.createQuery(hqlUpdate)
113 .setText("id", uuid.toString())
114 .setInteger("age", age)
115 .setText("takenBy", ownerId)
116 .executeUpdate());
117
118 } while (updatedEntities == 0);
119
120 return Optional.ofNullable(daoJob);
121 }
122
123 private java.sql.Timestamp nowMinusInterval() {
124 return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
125 }
126
127 private String sqlQueryForTopic(Job.JobStatus topic) {
128 switch (topic) {
129 case IN_PROGRESS:
130 return "" +
131 "select * from VID_JOB" +
132 " where" +
133 // select only non-deleted in-progress jobs
134 " JOB_STATUS = 'IN_PROGRESS'" +
135 " and TAKEN_BY is null" +
136 " and DELETED_AT is null" +
137 // give some breath, don't select jos that were recently reached
138 " and MODIFIED_DATE <= '" + nowMinusInterval() +
139 // take the oldest handled one
140 "' order by MODIFIED_DATE ASC" +
141 // select only one result
142 " limit 1";
143
144 case PENDING:
145 return "" +
146 // select only pending jobs
147 "select vid_job.* from VID_JOB " +
148 // select users have in_progress jobs
149 "left join \n" +
150 " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
151 "group by user_id) users_have_any_in_progress_job_tbl\n" +
152 "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
153 "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
154 // job is not deleted
155 " AND DELETED_AT is null and (\n" +
156 // limit in-progress to some amount
157 "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
158 "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
159 // don't take jobs from templates that already in-progress/failed
160 "and TEMPLATE_Id not in \n" +
161 "(select TEMPLATE_Id from vid_job where" +
162 " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
163 " or JOB_STATUS='IN_PROGRESS'" +
164 " or TAKEN_BY IS NOT NULL)" + " \n " +
165 // prefer older jobs, but the earlier in each bulk
166 "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
167 // select only one result
168 "limit 1";
169 default:
170 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
171 }
172 }
173
174
175 private byte[] getUuidAsByteArray(UUID owner) {
176 ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
177 bb.putLong(owner.getMostSignificantBits());
178 bb.putLong(owner.getLeastSignificantBits());
179 return bb.array();
180 }
181
182 @Override
183 public void pushBack(Job job) {
184 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
185
186 if (remoteDaoJob == null) {
187 throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
188 }
189
190 if (remoteDaoJob.getTakenBy() == null) {
191 throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
192 }
193
194 final JobDaoImpl jobDao = castToJobDaoImpl(job);
195
196 jobDao.setTakenBy(null);
197
198 Integer age = jobDao.getAge();
199 jobDao.setAge(age + 1);
200
201 logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
202
203 dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
204 }
205
206 private JobDaoImpl castToJobDaoImpl(Job job) {
207 if (!(job instanceof JobDaoImpl)) {
208 throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
209 }
210 return (JobDaoImpl) job;
211 }
212
213 @Override
214 public Collection<Job> peek() {
215 return dataAccessService.getList(JobDaoImpl.class, null);
216 }
217
218 @Override
219 public Job peek(UUID jobId) {
220 return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
221 }
222
223 @Override
224 public void delete(UUID jobId) {
225 int updatedEntities;
226 Date now = new Date();
227
228 String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
229 " job.id = :id" +
230 " and job.status in(:pending, :stopped)" +
231 " and takenBy is null";
232
233 updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
234 session.createQuery(hqlUpdate)
235 .setTimestamp("now", now)
236 .setText("id", jobId.toString())
237 .setText("pending", Job.JobStatus.PENDING.toString())
238 .setText("stopped", Job.JobStatus.STOPPED.toString())
239 .executeUpdate());
240
241 if (updatedEntities == 0) {
242 final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
243
244 if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
245 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
246 throw new OperationNotAllowedException("Service does not exist");
247 }
248
249 if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
250 logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
251 ", takenBy " + remoteDaoJob.getTakenBy());
252 throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
253 }
254
255 throw new OperationNotAllowedException("Service deletion failed");
256 }
257 }
258}