/*-
 * ============LICENSE_START=======================================================
 * OSAM
 * ================================================================================
 * Copyright (C) 2018 AT&T
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
|
| 20 |
|
| 21 |
|
| 22 |
|
| 23 | package org.onap.osam.job.impl;
|
| 24 |
|
| 25 | import org.apache.commons.lang3.StringUtils;
|
| 26 | import org.hibernate.SessionFactory;
|
| 27 | import org.onap.osam.exceptions.GenericUncheckedException;
|
| 28 | import org.onap.osam.exceptions.OperationNotAllowedException;
|
| 29 | import org.onap.osam.job.Job;
|
| 30 | import org.onap.osam.job.JobsBrokerService;
|
| 31 | import org.onap.osam.properties.VidProperties;
|
| 32 | import org.onap.osam.utils.DaoUtils;
|
| 33 | import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
|
| 34 | import org.onap.portalsdk.core.service.DataAccessService;
|
| 35 | import org.onap.portalsdk.core.util.SystemProperties;
|
| 36 | import org.springframework.beans.factory.annotation.Autowired;
|
| 37 | import org.springframework.beans.factory.annotation.Value;
|
| 38 | import org.springframework.stereotype.Service;
|
| 39 |
|
| 40 | import javax.annotation.PostConstruct;
|
| 41 | import java.nio.ByteBuffer;
|
| 42 | import java.sql.Timestamp;
|
| 43 | import java.time.LocalDateTime;
|
| 44 | import java.util.*;
|
| 45 |
|
| 46 | @Service
|
| 47 | public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
|
| 48 |
|
| 49 | static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
|
| 50 |
|
| 51 | private final DataAccessService dataAccessService;
|
| 52 |
|
| 53 | private final SessionFactory sessionFactory;
|
| 54 | private int maxOpenedInstantiationRequestsToMso;
|
| 55 | private int pollingIntervalSeconds;
|
| 56 |
|
| 57 | @Autowired
|
| 58 | public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
|
| 59 | @Value("0") int maxOpenedInstantiationRequestsToMso,
|
| 60 | @Value("10") int pollingIntervalSeconds) {
|
| 61 | // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
|
| 62 | this.dataAccessService = dataAccessService;
|
| 63 | this.sessionFactory = sessionFactory;
|
| 64 | this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
|
| 65 | this.pollingIntervalSeconds = pollingIntervalSeconds;
|
| 66 | }
|
| 67 |
|
| 68 | @PostConstruct
|
| 69 | public void configure() {
|
| 70 | maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
|
| 71 | pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
|
| 72 | }
|
| 73 |
|
| 74 | public void deleteAll() {
|
| 75 | dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
|
| 76 | }
|
| 77 |
|
| 78 | @Override
|
| 79 | public UUID add(Job job) {
|
| 80 | final JobDaoImpl jobDao = castToJobDaoImpl(job);
|
| 81 | jobDao.setUuid(UUID.randomUUID());
|
| 82 | dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
|
| 83 | return job.getUuid();
|
| 84 | }
|
| 85 |
|
| 86 | @Override
|
| 87 | public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
|
| 88 | JobDaoImpl daoJob;
|
| 89 | int updatedEntities;
|
| 90 | do {
|
| 91 | String query = sqlQueryForTopic(topic);
|
| 92 | List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
|
| 93 | if (jobs.isEmpty()) {
|
| 94 | return Optional.empty();
|
| 95 | }
|
| 96 |
|
| 97 | daoJob = jobs.get(0);
|
| 98 |
|
| 99 | final UUID uuid = daoJob.getUuid();
|
| 100 | final Integer age = daoJob.getAge();
|
| 101 |
|
| 102 | daoJob.setTakenBy(ownerId);
|
| 103 |
|
| 104 | // It might become that daoJob was taken and pushed-back already, before we
|
| 105 | // arrived here, so we're verifying the age was not pushed forward.
|
| 106 | // Age is actually forwarded upon pushBack().
|
| 107 | String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
|
| 108 | " job.id = :id" +
|
| 109 | " and job.age = :age" +
|
| 110 | " and takenBy is null";
|
| 111 | updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
|
| 112 | session.createQuery(hqlUpdate)
|
| 113 | .setText("id", uuid.toString())
|
| 114 | .setInteger("age", age)
|
| 115 | .setText("takenBy", ownerId)
|
| 116 | .executeUpdate());
|
| 117 |
|
| 118 | } while (updatedEntities == 0);
|
| 119 |
|
| 120 | return Optional.ofNullable(daoJob);
|
| 121 | }
|
| 122 |
|
| 123 | private java.sql.Timestamp nowMinusInterval() {
|
| 124 | return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
|
| 125 | }
|
| 126 |
|
| 127 | private String sqlQueryForTopic(Job.JobStatus topic) {
|
| 128 | switch (topic) {
|
| 129 | case IN_PROGRESS:
|
| 130 | return "" +
|
| 131 | "select * from VID_JOB" +
|
| 132 | " where" +
|
| 133 | // select only non-deleted in-progress jobs
|
| 134 | " JOB_STATUS = 'IN_PROGRESS'" +
|
| 135 | " and TAKEN_BY is null" +
|
| 136 | " and DELETED_AT is null" +
|
| 137 | // give some breath, don't select jos that were recently reached
|
| 138 | " and MODIFIED_DATE <= '" + nowMinusInterval() +
|
| 139 | // take the oldest handled one
|
| 140 | "' order by MODIFIED_DATE ASC" +
|
| 141 | // select only one result
|
| 142 | " limit 1";
|
| 143 |
|
| 144 | case PENDING:
|
| 145 | return "" +
|
| 146 | // select only pending jobs
|
| 147 | "select vid_job.* from VID_JOB " +
|
| 148 | // select users have in_progress jobs
|
| 149 | "left join \n" +
|
| 150 | " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
|
| 151 | "group by user_id) users_have_any_in_progress_job_tbl\n" +
|
| 152 | "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
|
| 153 | "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
|
| 154 | // job is not deleted
|
| 155 | " AND DELETED_AT is null and (\n" +
|
| 156 | // limit in-progress to some amount
|
| 157 | "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
|
| 158 | "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
|
| 159 | // don't take jobs from templates that already in-progress/failed
|
| 160 | "and TEMPLATE_Id not in \n" +
|
| 161 | "(select TEMPLATE_Id from vid_job where" +
|
| 162 | " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
|
| 163 | " or JOB_STATUS='IN_PROGRESS'" +
|
| 164 | " or TAKEN_BY IS NOT NULL)" + " \n " +
|
| 165 | // prefer older jobs, but the earlier in each bulk
|
| 166 | "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
|
| 167 | // select only one result
|
| 168 | "limit 1";
|
| 169 | default:
|
| 170 | throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
|
| 171 | }
|
| 172 | }
|
| 173 |
|
| 174 |
|
| 175 | private byte[] getUuidAsByteArray(UUID owner) {
|
| 176 | ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
|
| 177 | bb.putLong(owner.getMostSignificantBits());
|
| 178 | bb.putLong(owner.getLeastSignificantBits());
|
| 179 | return bb.array();
|
| 180 | }
|
| 181 |
|
| 182 | @Override
|
| 183 | public void pushBack(Job job) {
|
| 184 | final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
|
| 185 |
|
| 186 | if (remoteDaoJob == null) {
|
| 187 | throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
|
| 188 | }
|
| 189 |
|
| 190 | if (remoteDaoJob.getTakenBy() == null) {
|
| 191 | throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
|
| 192 | }
|
| 193 |
|
| 194 | final JobDaoImpl jobDao = castToJobDaoImpl(job);
|
| 195 |
|
| 196 | jobDao.setTakenBy(null);
|
| 197 |
|
| 198 | Integer age = jobDao.getAge();
|
| 199 | jobDao.setAge(age + 1);
|
| 200 |
|
| 201 | logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
|
| 202 |
|
| 203 | dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
|
| 204 | }
|
| 205 |
|
| 206 | private JobDaoImpl castToJobDaoImpl(Job job) {
|
| 207 | if (!(job instanceof JobDaoImpl)) {
|
| 208 | throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
|
| 209 | }
|
| 210 | return (JobDaoImpl) job;
|
| 211 | }
|
| 212 |
|
| 213 | @Override
|
| 214 | public Collection<Job> peek() {
|
| 215 | return dataAccessService.getList(JobDaoImpl.class, null);
|
| 216 | }
|
| 217 |
|
| 218 | @Override
|
| 219 | public Job peek(UUID jobId) {
|
| 220 | return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
|
| 221 | }
|
| 222 |
|
| 223 | @Override
|
| 224 | public void delete(UUID jobId) {
|
| 225 | int updatedEntities;
|
| 226 | Date now = new Date();
|
| 227 |
|
| 228 | String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
|
| 229 | " job.id = :id" +
|
| 230 | " and job.status in(:pending, :stopped)" +
|
| 231 | " and takenBy is null";
|
| 232 |
|
| 233 | updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
|
| 234 | session.createQuery(hqlUpdate)
|
| 235 | .setTimestamp("now", now)
|
| 236 | .setText("id", jobId.toString())
|
| 237 | .setText("pending", Job.JobStatus.PENDING.toString())
|
| 238 | .setText("stopped", Job.JobStatus.STOPPED.toString())
|
| 239 | .executeUpdate());
|
| 240 |
|
| 241 | if (updatedEntities == 0) {
|
| 242 | final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
|
| 243 |
|
| 244 | if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
|
| 245 | logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
|
| 246 | throw new OperationNotAllowedException("Service does not exist");
|
| 247 | }
|
| 248 |
|
| 249 | if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
|
| 250 | logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
|
| 251 | ", takenBy " + remoteDaoJob.getTakenBy());
|
| 252 | throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
|
| 253 | }
|
| 254 |
|
| 255 | throw new OperationNotAllowedException("Service deletion failed");
|
| 256 | }
|
| 257 | }
|
| 258 | }
|