/*- | |
* ============LICENSE_START======================================================= | |
* OSAM | |
* ================================================================================ | |
* Copyright (C) 2018 AT&T | |
* ================================================================================ | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
* ============LICENSE_END========================================================= | |
*/ | |
package org.onap.osam.job.impl; | |
import com.google.common.collect.Lists; | |
import lombok.extern.slf4j.Slf4j; | |
import org.onap.osam.common.exception.GenericUncheckedException; | |
import org.onap.osam.common.exception.InvalidOperationException; | |
import org.onap.osam.job.dao.job.JobStatus; | |
import org.onap.osam.job.dao.job.OsamJob; | |
import org.onap.osam.job.IJobsDataAccessService; | |
import org.onap.osam.job.repository.job.OsamJobRepository; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.stereotype.Service; | |
import org.springframework.util.StringUtils; | |
import javax.annotation.PostConstruct; | |
import java.nio.ByteBuffer; | |
import java.sql.Timestamp; | |
import java.time.LocalDateTime; | |
import java.util.Collection; | |
import java.util.Date; | |
import java.util.Optional; | |
import java.util.UUID; | |
@Slf4j | |
@Service | |
public class JobsDataAccessService implements IJobsDataAccessService { | |
private OsamJobRepository osamJobRepository; | |
private Long maxOpenedRequestsToAbstractOlt; | |
private int pollingIntervalSeconds; | |
@Autowired | |
public JobsDataAccessService(OsamJobRepository osamJobRepository, | |
@Value("0") Long maxOpenedRequestsToAbstractOlt, | |
@Value("10") int pollingIntervalSeconds) { | |
// tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration | |
this.osamJobRepository = osamJobRepository; | |
this.maxOpenedRequestsToAbstractOlt = maxOpenedRequestsToAbstractOlt; | |
this.pollingIntervalSeconds = pollingIntervalSeconds; | |
} | |
@PostConstruct | |
public void configure() { | |
//TODO define defaults | |
/*maxOpenedRequestsToAbstractOlt = Integer.parseInt(System.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS)); | |
pollingIntervalSeconds = Integer.parseInt(System.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));*/ | |
} | |
public void deleteAll() { | |
osamJobRepository.deleteAll(); | |
} | |
@Override | |
public UUID add(OsamJob job) { | |
osamJobRepository.save(job); | |
return job.getUuid(); | |
} | |
@Override | |
public Optional<OsamJob> pull(JobStatus topic, String ownerId) { | |
OsamJob osamJob; | |
int updatedEntities; | |
do { | |
Optional<OsamJob> optionalOsamJob = selectQueryByJobStatus(topic); | |
if (!optionalOsamJob.isPresent()) { | |
return optionalOsamJob; | |
} | |
osamJob = optionalOsamJob.get(); | |
final UUID uuid = osamJob.getUuid(); | |
final Integer age = osamJob.getAge(); | |
osamJob.setTakenBy(ownerId); | |
// It might become that a job was taken and pushed-back already, before we | |
// arrived here, so we're verifying the age was not pushed forward. | |
// Age is actually forwarded upon pushBack(). | |
updatedEntities = osamJobRepository.updateOsamCoreJobsAge(ownerId, uuid, age); | |
} while (updatedEntities == 0); | |
return Optional.ofNullable(osamJob); | |
} | |
private java.sql.Timestamp nowMinusInterval() { | |
return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds)); | |
} | |
private Optional<OsamJob> selectQueryByJobStatus(JobStatus topic){ | |
//TODO Pavel understand this interval | |
//String intervalCondition = (topic==JobStatus.CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'"); | |
return osamJobRepository.queryFirst1ByStatusAndTakenByIsNullAndDeletedAtIsNullOrderByModifiedDateAsc(topic).stream().findFirst(); | |
} | |
private Optional<OsamJob> sqlQueryForTopic(JobStatus topic) { | |
switch (topic) { | |
case IN_PROGRESS: | |
case RESOURCE_IN_PROGRESS: | |
case CREATING: | |
case PENDING: | |
return selectQueryByJobStatus(topic); | |
//TODO Pavel - at first stage, using the naive query for pending topic | |
/*case PENDING: | |
return osamJobRepository.findOsamJobsPending(maxOpenedRequestsToAbstractOlt);*/ | |
default: | |
throw new GenericUncheckedException("Unsupported topic to pull from: " + topic); | |
} | |
} | |
private byte[] getUuidAsByteArray(UUID owner) { | |
ByteBuffer bb = ByteBuffer.wrap(new byte[16]); | |
bb.putLong(owner.getMostSignificantBits()); | |
bb.putLong(owner.getLeastSignificantBits()); | |
return bb.array(); | |
} | |
@Override | |
public void pushBack(OsamJob job) { | |
final Optional<OsamJob> remoteDaoJob = osamJobRepository.findByUuid(job.getUuid()); | |
if (!remoteDaoJob.isPresent()) { | |
throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()"); | |
} | |
if (remoteDaoJob.get().getTakenBy() == null) { | |
throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back."); | |
} | |
job.setTakenBy(null); | |
Integer age = job.getAge(); | |
job.setAge(age + 1); | |
log.debug("{}/{}", job.getStatus(), job.getType()); | |
osamJobRepository.save(job); | |
} | |
/*private OsamJob castToOsamJob(OsamJob job) { | |
if (!(job instanceof OsamJob)) { | |
throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass()); | |
} | |
return (OsamJob) job; | |
}*/ | |
@Override | |
public Collection<OsamJob> peek() { | |
return Lists.newArrayList(osamJobRepository.findAll()); | |
} | |
@Override | |
public OsamJob peek(UUID jobId) { | |
return osamJobRepository.findByUuid(jobId).orElse(null); | |
} | |
@Override | |
public void delete(UUID jobId) { | |
Date now = new Date(); | |
Integer updatedEntities = osamJobRepository.updateOsamCoreJobToBeDeleted(now, jobId, JobStatus.PENDING.toString(), JobStatus.STOPPED.toString()); | |
if (updatedEntities == 0) { | |
final Optional<OsamJob> remoteDaoJob = osamJobRepository.findByUuid(jobId); | |
if (!remoteDaoJob.isPresent() || remoteDaoJob.get().getUuid() == null) { | |
log.debug("jobId {}: Service does not exist", jobId); | |
throw new InvalidOperationException("Service does not exist"); | |
} | |
if (!remoteDaoJob.get().equals(JobStatus.PENDING) && !remoteDaoJob.get().getStatus().equals(JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.get().getTakenBy())) { | |
log.debug("jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.get().getStatus() + | |
", takenBy " + remoteDaoJob.get().getTakenBy()); | |
throw new InvalidOperationException("Service status does not allow deletion from the queue"); | |
} | |
throw new InvalidOperationException("Service deletion failed"); | |
} | |
} | |
@Override | |
public boolean mute(UUID jobId) { | |
if (jobId == null) { | |
return false; | |
} | |
final String prefix = "DUMP"; | |
Integer updatedEntities = osamJobRepository.muteOsamCoreJob(jobId, prefix); | |
return updatedEntities != 0; | |
} | |
} |