blob: 672197cec577970c663bb2bae1b27dc10fbdb4d2 [file] [log] [blame]
/*-
* ============LICENSE_START=======================================================
* OSAM
* ================================================================================
* Copyright (C) 2018 AT&T
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.osam.job.impl;
import org.apache.commons.lang3.StringUtils;
import org.hibernate.SessionFactory;
import org.onap.osam.exceptions.GenericUncheckedException;
import org.onap.osam.exceptions.OperationNotAllowedException;
import org.onap.osam.job.Job;
import org.onap.osam.job.JobsBrokerService;
import org.onap.osam.properties.VidProperties;
import org.onap.osam.utils.DaoUtils;
import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
import org.onap.portalsdk.core.service.DataAccessService;
import org.onap.portalsdk.core.util.SystemProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
@Service
@Service
public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {

    // Package-private and non-final on purpose (matches surrounding codebase style;
    // presumably swappable by tests — TODO confirm before tightening to private final).
    static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);

    private final DataAccessService dataAccessService;
    private final SessionFactory sessionFactory;

    // Both values are conservative defaults injected via @Value and then
    // overwritten from system properties in configure() (@PostConstruct).
    private int maxOpenedInstantiationRequestsToMso;
    private int pollingIntervalSeconds;

    @Autowired
    public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
                                           @Value("0") int maxOpenedInstantiationRequestsToMso,
                                           @Value("10") int pollingIntervalSeconds) {
        // the @Value annotations inject conservative defaults; overridden in @PostConstruct from configuration
        this.dataAccessService = dataAccessService;
        this.sessionFactory = sessionFactory;
        this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
        this.pollingIntervalSeconds = pollingIntervalSeconds;
    }

    /**
     * Loads the real limits from system properties once the Spring context is up,
     * replacing the conservative constructor defaults.
     */
    @PostConstruct
    public void configure() {
        maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
        pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
    }

    /** Deletes every job row. Intended for tests/cleanup; the "1=1" criterion matches all rows. */
    public void deleteAll() {
        dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
    }

    /**
     * Persists a new job, assigning it a fresh random UUID.
     *
     * @param job must be a {@link JobDaoImpl}; otherwise {@link UnsupportedOperationException} is thrown
     * @return the UUID assigned to the stored job
     */
    @Override
    public UUID add(Job job) {
        final JobDaoImpl jobDao = castToJobDaoImpl(job);
        jobDao.setUuid(UUID.randomUUID());
        dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
        return job.getUuid();
    }

    /**
     * Atomically claims the next eligible job for the given topic on behalf of {@code ownerId}.
     * <p>
     * Uses optimistic concurrency: the candidate is selected with a plain SQL query, then the
     * claim is attempted with a guarded HQL update (same id, same age, still unclaimed). If the
     * update touches zero rows, another worker won the race and we retry with a fresh candidate.
     *
     * @return the claimed job, or empty when no eligible job exists
     */
    @Override
    public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
        JobDaoImpl daoJob;
        int updatedEntities;
        do {
            String query = sqlQueryForTopic(topic);
            List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
            if (jobs.isEmpty()) {
                return Optional.empty();
            }
            daoJob = jobs.get(0);
            final UUID uuid = daoJob.getUuid();
            final Integer age = daoJob.getAge();
            daoJob.setTakenBy(ownerId);
            // It might become that daoJob was taken and pushed-back already, before we
            // arrived here, so we're verifying the age was not pushed forward.
            // Age is actually forwarded upon pushBack().
            String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
                    " job.id = :id" +
                    " and job.age = :age" +
                    " and takenBy is null";
            updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
                    session.createQuery(hqlUpdate)
                            .setText("id", uuid.toString())
                            .setInteger("age", age)
                            .setText("takenBy", ownerId)
                            .executeUpdate());
        } while (updatedEntities == 0);
        // daoJob is guaranteed non-null here (the loop returns early on an empty result),
        // so Optional.of documents the invariant better than the previous ofNullable.
        return Optional.of(daoJob);
    }

    /** @return now minus the polling interval, as the cutoff for "recently touched" jobs. */
    private java.sql.Timestamp nowMinusInterval() {
        return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
    }

    /**
     * Builds the candidate-selection SQL for a topic.
     *
     * @throws GenericUncheckedException for any topic other than IN_PROGRESS/PENDING
     */
    private String sqlQueryForTopic(Job.JobStatus topic) {
        switch (topic) {
            case IN_PROGRESS:
                return "" +
                        "select * from VID_JOB" +
                        " where" +
                        // select only non-deleted in-progress jobs
                        " JOB_STATUS = 'IN_PROGRESS'" +
                        " and TAKEN_BY is null" +
                        " and DELETED_AT is null" +
                        // give some breath, don't select jobs that were recently reached
                        " and MODIFIED_DATE <= '" + nowMinusInterval() +
                        // take the oldest handled one
                        "' order by MODIFIED_DATE ASC" +
                        // select only one result
                        " limit 1";
            case PENDING:
                return "" +
                        // select only pending jobs
                        "select vid_job.* from VID_JOB " +
                        // select users have in_progress jobs
                        "left join \n" +
                        " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
                        "group by user_id) users_have_any_in_progress_job_tbl\n" +
                        "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
                        "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
                        // job is not deleted
                        " AND DELETED_AT is null and (\n" +
                        // limit in-progress to some amount
                        "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
                        "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
                        // don't take jobs from templates that already in-progress/failed
                        "and TEMPLATE_Id not in \n" +
                        "(select TEMPLATE_Id from vid_job where" +
                        " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
                        " or JOB_STATUS='IN_PROGRESS'" +
                        " or TAKEN_BY IS NOT NULL)" + " \n " +
                        // prefer older jobs, but the earlier in each bulk
                        "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
                        // select only one result
                        "limit 1";
            default:
                throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
        }
    }

    // NOTE(review): this private helper is not referenced anywhere in this class —
    // apparent dead code; kept for now, consider removing in a follow-up.
    private byte[] getUuidAsByteArray(UUID owner) {
        ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
        bb.putLong(owner.getMostSignificantBits());
        bb.putLong(owner.getLeastSignificantBits());
        return bb.array();
    }

    /**
     * Returns a previously pulled job to the queue: clears its owner and bumps its age so that
     * any in-flight claim guarded by the old age (see {@link #pull}) fails.
     *
     * @throws IllegalStateException if the job was never added, or is not currently taken
     */
    @Override
    public void pushBack(Job job) {
        final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
        if (remoteDaoJob == null) {
            throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
        }
        if (remoteDaoJob.getTakenBy() == null) {
            throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
        }
        final JobDaoImpl jobDao = castToJobDaoImpl(job);
        jobDao.setTakenBy(null);
        // advance the age so stale optimistic claims (matched on the old age) are rejected
        Integer age = jobDao.getAge();
        jobDao.setAge(age + 1);
        logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
        dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
    }

    /** Narrows the public {@link Job} interface to the DAO implementation this broker stores. */
    private JobDaoImpl castToJobDaoImpl(Job job) {
        if (!(job instanceof JobDaoImpl)) {
            throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
        }
        return (JobDaoImpl) job;
    }

    /** @return all jobs, without claiming any of them. */
    @Override
    public Collection<Job> peek() {
        return dataAccessService.getList(JobDaoImpl.class, null);
    }

    /** @return the job with the given id (or null if absent), without claiming it. */
    @Override
    public Job peek(UUID jobId) {
        return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
    }

    /**
     * Soft-deletes a job (sets deletedAt) — allowed only while the job is PENDING or STOPPED
     * and not currently taken by a worker. The guarded HQL update enforces this atomically;
     * when it affects no rows, the job is re-read to produce a precise error.
     *
     * @throws OperationNotAllowedException if the job does not exist, its state forbids
     *         deletion, or the guarded update failed for any other reason
     */
    @Override
    public void delete(UUID jobId) {
        int updatedEntities;
        Date now = new Date();
        String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
                " job.id = :id" +
                " and job.status in(:pending, :stopped)" +
                " and takenBy is null";
        updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
                session.createQuery(hqlUpdate)
                        .setTimestamp("now", now)
                        .setText("id", jobId.toString())
                        .setText("pending", Job.JobStatus.PENDING.toString())
                        .setText("stopped", Job.JobStatus.STOPPED.toString())
                        .executeUpdate());
        if (updatedEntities == 0) {
            // the guarded update matched nothing — diagnose why and fail loudly
            final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
            if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
                logger.debug(EELFLoggerDelegate.debugLogger, "jobId {}: Service does not exist", jobId);
                throw new OperationNotAllowedException("Service does not exist");
            }
            // explicit parentheses: deletable means (PENDING or STOPPED) and not taken
            if ((!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED)) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy())) {
                logger.debug(EELFLoggerDelegate.debugLogger, "jobId {}: Service status does not allow deletion from the queue, status = {}, takenBy {}", jobId, remoteDaoJob.getStatus(), remoteDaoJob.getTakenBy());
                throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
            }
            throw new OperationNotAllowedException("Service deletion failed");
        }
    }
}