blob: 18ae378427e56aea6744b19679c229bf89fceb70 [file] [log] [blame]
/*-
* ============LICENSE_START=======================================================
* OSAM
* ================================================================================
* Copyright (C) 2018 AT&T
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.osam.job.impl;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.onap.osam.common.exception.GenericUncheckedException;
import org.onap.osam.common.exception.InvalidOperationException;
import org.onap.osam.job.dao.job.JobStatus;
import org.onap.osam.job.dao.job.OsamJob;
import org.onap.osam.job.IJobsDataAccessService;
import org.onap.osam.job.repository.job.OsamJobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Date;
import java.util.Optional;
import java.util.UUID;
/**
 * {@link IJobsDataAccessService} implementation that stores and retrieves
 * {@link OsamJob} entities through an {@link OsamJobRepository}.
 *
 * <p>Jobs are claimed with {@link #pull(JobStatus, String)} and released with
 * {@link #pushBack(OsamJob)}. The job's {@code age} is incremented on every push-back and is
 * handed to the repository when claiming, so that a concurrent pull/push-back cycle on the
 * same job can be detected.
 */
@Slf4j
@Service
public class JobsDataAccessService implements IJobsDataAccessService {

    private final OsamJobRepository osamJobRepository;

    // Only referenced by the commented-out PENDING query in sqlQueryForTopic().
    private Long maxOpenedRequestsToAbstractOlt;

    // Only referenced by nowMinusInterval(), which no active code path calls.
    private int pollingIntervalSeconds;

    /**
     * Creates the service.
     *
     * @param osamJobRepository              repository used for all job persistence
     * @param maxOpenedRequestsToAbstractOlt injected from the literal {@code "0"}
     * @param pollingIntervalSeconds         injected from the literal {@code "10"}
     */
    @Autowired
    public JobsDataAccessService(OsamJobRepository osamJobRepository,
                                 @Value("0") Long maxOpenedRequestsToAbstractOlt,
                                 @Value("10") int pollingIntervalSeconds) {
        // The @Value literals inject conservative defaults; they are meant to be overridden
        // from configuration in the @PostConstruct method (currently not implemented).
        this.osamJobRepository = osamJobRepository;
        this.maxOpenedRequestsToAbstractOlt = maxOpenedRequestsToAbstractOlt;
        this.pollingIntervalSeconds = pollingIntervalSeconds;
    }

    /**
     * Post-construction hook intended to load the tunables from configuration.
     * Currently a no-op: the property lookup below is still commented out.
     */
    @PostConstruct
    public void configure() {
        //TODO define defaults
        /*maxOpenedRequestsToAbstractOlt = Integer.parseInt(System.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
        pollingIntervalSeconds = Integer.parseInt(System.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));*/
    }

    /**
     * Removes every job through the repository's {@code deleteAll()}.
     */
    public void deleteAll() {
        osamJobRepository.deleteAll();
    }

    /**
     * Persists the given job.
     *
     * @param job the job to save
     * @return the UUID reported by the job after it was saved
     */
    @Override
    public UUID add(OsamJob job) {
        osamJobRepository.save(job);
        return job.getUuid();
    }

    /**
     * Claims one job of the given status on behalf of {@code ownerId}.
     *
     * <p>A candidate is selected, then claimed through
     * {@code updateOsamCoreJobsAge(ownerId, uuid, age)}. If that update touches no rows the
     * candidate is discarded and the selection is repeated.
     *
     * @param topic   status of the jobs to pull from
     * @param ownerId identifier recorded as the job's taker
     * @return the claimed job (with {@code takenBy} set to {@code ownerId}), or an empty
     *         {@code Optional} when no candidate is available
     */
    @Override
    public Optional<OsamJob> pull(JobStatus topic, String ownerId) {
        while (true) {
            final Optional<OsamJob> candidate = selectQueryByJobStatus(topic);
            if (!candidate.isPresent()) {
                return candidate;
            }

            final OsamJob osamJob = candidate.get();
            final UUID uuid = osamJob.getUuid();
            final Integer age = osamJob.getAge();
            osamJob.setTakenBy(ownerId);

            // It might be that the job was taken and pushed back already, before we
            // arrived here, so we verify that the age was not pushed forward.
            // Age is actually forwarded upon pushBack().
            if (osamJobRepository.updateOsamCoreJobsAge(ownerId, uuid, age) != 0) {
                return candidate;
            }
        }
    }

    /**
     * Returns the current time minus {@code pollingIntervalSeconds}.
     * Not called by any active code; kept for the interval condition noted in
     * {@link #selectQueryByJobStatus(JobStatus)}.
     */
    private Timestamp nowMinusInterval() {
        return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
    }

    /**
     * Returns the first job the repository yields for the given status.
     * Judging by the repository method name, candidates are un-taken, non-deleted jobs
     * ordered by ascending modification date.
     */
    private Optional<OsamJob> selectQueryByJobStatus(JobStatus topic) {
        //TODO Pavel understand this interval
        //String intervalCondition = (topic==JobStatus.CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
        return osamJobRepository
                .queryFirst1ByStatusAndTakenByIsNullAndDeletedAtIsNullOrderByModifiedDateAsc(topic)
                .stream()
                .findFirst();
    }

    /**
     * Selects a job for the supported topics and rejects every other topic.
     * Not called by any active code; {@link #pull(JobStatus, String)} queries directly.
     *
     * @throws GenericUncheckedException when {@code topic} is not a supported topic
     */
    private Optional<OsamJob> sqlQueryForTopic(JobStatus topic) {
        switch (topic) {
            case IN_PROGRESS:
            case RESOURCE_IN_PROGRESS:
            case CREATING:
            case PENDING:
                return selectQueryByJobStatus(topic);
            //TODO Pavel - at first stage, using the naive query for pending topic
            /*case PENDING:
                return osamJobRepository.findOsamJobsPending(maxOpenedRequestsToAbstractOlt);*/
            default:
                throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
        }
    }

    /**
     * Encodes a UUID as 16 bytes: most significant long first, then least significant long.
     * Not called by any active code in this class.
     */
    private byte[] getUuidAsByteArray(UUID owner) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[16]);
        buffer.putLong(owner.getMostSignificantBits());
        buffer.putLong(owner.getLeastSignificantBits());
        return buffer.array();
    }

    /**
     * Releases a previously pulled job: clears {@code takenBy}, increments {@code age}
     * and saves the job.
     *
     * @param job the job to release
     * @throws IllegalStateException when no stored job has this UUID, or when the stored
     *                               job has no taker
     */
    @Override
    public void pushBack(OsamJob job) {
        final Optional<OsamJob> remoteDaoJob = osamJobRepository.findByUuid(job.getUuid());
        if (!remoteDaoJob.isPresent()) {
            throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
        }
        if (remoteDaoJob.get().getTakenBy() == null) {
            throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
        }

        job.setTakenBy(null);
        // Advancing the age is what lets pull() detect a take/push-back that raced with it.
        Integer age = job.getAge();
        job.setAge(age + 1);

        log.debug("{}/{}", job.getStatus(), job.getType());
        osamJobRepository.save(job);
    }

    /**
     * Returns a new list holding every job the repository's {@code findAll()} yields.
     */
    @Override
    public Collection<OsamJob> peek() {
        return Lists.newArrayList(osamJobRepository.findAll());
    }

    /**
     * Looks up a single job.
     *
     * @param jobId UUID of the job
     * @return the job, or {@code null} when the repository has no job with that UUID
     */
    @Override
    public OsamJob peek(UUID jobId) {
        return osamJobRepository.findByUuid(jobId).orElse(null);
    }

    /**
     * Marks a job as deleted through {@code updateOsamCoreJobToBeDeleted}, passing the
     * current time and the PENDING and STOPPED status names.
     *
     * <p>When the update touches no rows, the job is looked up again only to pick the most
     * specific failure message.
     *
     * @param jobId UUID of the job to delete
     * @throws InvalidOperationException when the update touched no rows: the job does not
     *                                   exist, its status or taker forbids deletion, or the
     *                                   deletion failed for another reason
     */
    @Override
    public void delete(UUID jobId) {
        Date now = new Date();
        Integer updatedEntities = osamJobRepository.updateOsamCoreJobToBeDeleted(
                now, jobId, JobStatus.PENDING.toString(), JobStatus.STOPPED.toString());
        if (updatedEntities != 0) {
            return;
        }

        final Optional<OsamJob> remoteDaoJob = osamJobRepository.findByUuid(jobId);
        if (!remoteDaoJob.isPresent() || remoteDaoJob.get().getUuid() == null) {
            log.debug("jobId {}: Service does not exist", jobId);
            throw new InvalidOperationException("Service does not exist");
        }

        final OsamJob existingJob = remoteDaoJob.get();
        // Compare the job's STATUS with the constants (constant-first, so a null status is
        // handled). Comparing the job object itself with a JobStatus can never be equal.
        final boolean statusAllowsDeletion = JobStatus.PENDING.equals(existingJob.getStatus())
                || JobStatus.STOPPED.equals(existingJob.getStatus());
        final boolean taken = !StringUtils.isEmpty(existingJob.getTakenBy());
        if (!statusAllowsDeletion || taken) {
            log.debug("jobId {}: Service status does not allow deletion from the queue, status = {}, takenBy {}",
                    jobId, existingJob.getStatus(), existingJob.getTakenBy());
            throw new InvalidOperationException("Service status does not allow deletion from the queue");
        }

        throw new InvalidOperationException("Service deletion failed");
    }

    /**
     * Mutes a job by calling {@code muteOsamCoreJob} with the prefix {@code "DUMP"}.
     *
     * @param jobId UUID of the job; {@code null} is accepted and yields {@code false}
     * @return {@code true} when the repository reports at least one updated row
     */
    @Override
    public boolean mute(UUID jobId) {
        if (jobId == null) {
            return false;
        }
        final String prefix = "DUMP";
        Integer updatedEntities = osamJobRepository.muteOsamCoreJob(jobId, prefix);
        return updatedEntities != 0;
    }
}