Aharoni, Pavel (pa0916) | 8c70f07 | 2018-11-18 00:07:12 +0200 | [diff] [blame] | 1 | /*-
|
| 2 | * ============LICENSE_START=======================================================
|
| 3 | * OSAM
|
| 4 | * ================================================================================
|
| 5 | * Copyright (C) 2018 AT&T
|
| 6 | * ================================================================================
|
| 7 | * Licensed under the Apache License, Version 2.0 (the "License");
|
| 8 | * you may not use this file except in compliance with the License.
|
| 9 | * You may obtain a copy of the License at
|
| 10 | *
|
| 11 | * http://www.apache.org/licenses/LICENSE-2.0
|
| 12 | *
|
| 13 | * Unless required by applicable law or agreed to in writing, software
|
| 14 | * distributed under the License is distributed on an "AS IS" BASIS,
|
| 15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| 16 | * See the License for the specific language governing permissions and
|
| 17 | * limitations under the License.
|
| 18 | * ============LICENSE_END=========================================================
|
| 19 | */
|
| 20 | package org.onap.osam.job.impl;
|
| 21 |
|
| 22 | import com.google.common.collect.Lists;
|
| 23 | import lombok.extern.slf4j.Slf4j;
|
| 24 | import org.onap.osam.common.exception.GenericUncheckedException;
|
| 25 | import org.onap.osam.common.exception.InvalidOperationException;
|
| 26 | import org.onap.osam.job.dao.job.JobStatus;
|
| 27 | import org.onap.osam.job.dao.job.OsamJob;
|
| 28 | import org.onap.osam.job.IJobsDataAccessService;
|
| 29 | import org.onap.osam.job.repository.job.OsamJobRepository;
|
| 30 | import org.springframework.beans.factory.annotation.Autowired;
|
| 31 | import org.springframework.beans.factory.annotation.Value;
|
| 32 | import org.springframework.stereotype.Service;
|
| 33 | import org.springframework.util.StringUtils;
|
| 34 |
|
| 35 | import javax.annotation.PostConstruct;
|
| 36 | import java.nio.ByteBuffer;
|
| 37 | import java.sql.Timestamp;
|
| 38 | import java.time.LocalDateTime;
|
| 39 | import java.util.Collection;
|
| 40 | import java.util.Date;
|
| 41 | import java.util.Optional;
|
| 42 | import java.util.UUID;
|
| 43 |
|
| 44 | @Slf4j
|
| 45 | @Service
|
| 46 | public class JobsDataAccessService implements IJobsDataAccessService {
|
| 47 |
|
| 48 | private OsamJobRepository osamJobRepository;
|
| 49 | private Long maxOpenedRequestsToAbstractOlt;
|
| 50 | private int pollingIntervalSeconds;
|
| 51 |
|
| 52 | @Autowired
|
| 53 | public JobsDataAccessService(OsamJobRepository osamJobRepository,
|
| 54 | @Value("0") Long maxOpenedRequestsToAbstractOlt,
|
| 55 | @Value("10") int pollingIntervalSeconds) {
|
| 56 | // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
|
| 57 | this.osamJobRepository = osamJobRepository;
|
| 58 | this.maxOpenedRequestsToAbstractOlt = maxOpenedRequestsToAbstractOlt;
|
| 59 | this.pollingIntervalSeconds = pollingIntervalSeconds;
|
| 60 | }
|
| 61 |
|
| 62 | @PostConstruct
|
| 63 | public void configure() {
|
| 64 | //TODO define defaults
|
| 65 | /*maxOpenedRequestsToAbstractOlt = Integer.parseInt(System.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
|
| 66 | pollingIntervalSeconds = Integer.parseInt(System.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));*/
|
| 67 | }
|
| 68 |
|
| 69 | public void deleteAll() {
|
| 70 | osamJobRepository.deleteAll();
|
| 71 | }
|
| 72 |
|
| 73 | @Override
|
| 74 | public UUID add(OsamJob job) {
|
| 75 | osamJobRepository.save(job);
|
| 76 | return job.getUuid();
|
| 77 | }
|
| 78 |
|
| 79 | @Override
|
| 80 | public Optional<OsamJob> pull(JobStatus topic, String ownerId) {
|
| 81 | OsamJob osamJob;
|
| 82 | int updatedEntities;
|
| 83 | do {
|
| 84 |
|
| 85 | Optional<OsamJob> optionalOsamJob = selectQueryByJobStatus(topic);
|
| 86 | if (!optionalOsamJob.isPresent()) {
|
| 87 | return optionalOsamJob;
|
| 88 | }
|
| 89 |
|
| 90 | osamJob = optionalOsamJob.get();
|
| 91 | final UUID uuid = osamJob.getUuid();
|
| 92 | final Integer age = osamJob.getAge();
|
| 93 |
|
| 94 | osamJob.setTakenBy(ownerId);
|
| 95 |
|
| 96 | // It might become that a job was taken and pushed-back already, before we
|
| 97 | // arrived here, so we're verifying the age was not pushed forward.
|
| 98 | // Age is actually forwarded upon pushBack().
|
| 99 | updatedEntities = osamJobRepository.updateOsamCoreJobsAge(ownerId, uuid, age);
|
| 100 |
|
| 101 | } while (updatedEntities == 0);
|
| 102 |
|
| 103 | return Optional.ofNullable(osamJob);
|
| 104 | }
|
| 105 |
|
| 106 | private java.sql.Timestamp nowMinusInterval() {
|
| 107 | return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
|
| 108 | }
|
| 109 |
|
| 110 | private Optional<OsamJob> selectQueryByJobStatus(JobStatus topic){
|
| 111 | //TODO Pavel understand this interval
|
| 112 | //String intervalCondition = (topic==JobStatus.CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
|
| 113 | return osamJobRepository.queryFirst1ByStatusAndTakenByIsNullAndDeletedAtIsNullOrderByModifiedDateAsc(topic).stream().findFirst();
|
| 114 |
|
| 115 | }
|
| 116 |
|
| 117 | private Optional<OsamJob> sqlQueryForTopic(JobStatus topic) {
|
| 118 | switch (topic) {
|
| 119 | case IN_PROGRESS:
|
| 120 | case RESOURCE_IN_PROGRESS:
|
| 121 | case CREATING:
|
| 122 | case PENDING:
|
| 123 | return selectQueryByJobStatus(topic);
|
| 124 | //TODO Pavel - at first stage, using the naive query for pending topic
|
| 125 | /*case PENDING:
|
| 126 | return osamJobRepository.findOsamJobsPending(maxOpenedRequestsToAbstractOlt);*/
|
| 127 | default:
|
| 128 | throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
|
| 129 | }
|
| 130 | }
|
| 131 |
|
| 132 |
|
| 133 | private byte[] getUuidAsByteArray(UUID owner) {
|
| 134 | ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
|
| 135 | bb.putLong(owner.getMostSignificantBits());
|
| 136 | bb.putLong(owner.getLeastSignificantBits());
|
| 137 | return bb.array();
|
| 138 | }
|
| 139 |
|
| 140 | @Override
|
| 141 | public void pushBack(OsamJob job) {
|
| 142 | final Optional<OsamJob> remoteDaoJob = osamJobRepository.findByUuid(job.getUuid());
|
| 143 |
|
| 144 | if (!remoteDaoJob.isPresent()) {
|
| 145 | throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
|
| 146 | }
|
| 147 |
|
| 148 | if (remoteDaoJob.get().getTakenBy() == null) {
|
| 149 | throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
|
| 150 | }
|
| 151 |
|
| 152 | job.setTakenBy(null);
|
| 153 |
|
| 154 | Integer age = job.getAge();
|
| 155 | job.setAge(age + 1);
|
| 156 |
|
| 157 | log.debug("{}/{}", job.getStatus(), job.getType());
|
| 158 |
|
| 159 | osamJobRepository.save(job);
|
| 160 | }
|
| 161 |
|
| 162 | /*private OsamJob castToOsamJob(OsamJob job) {
|
| 163 | if (!(job instanceof OsamJob)) {
|
| 164 | throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
|
| 165 | }
|
| 166 | return (OsamJob) job;
|
| 167 | }*/
|
| 168 |
|
| 169 | @Override
|
| 170 | public Collection<OsamJob> peek() {
|
| 171 | return Lists.newArrayList(osamJobRepository.findAll());
|
| 172 | }
|
| 173 |
|
| 174 | @Override
|
| 175 | public OsamJob peek(UUID jobId) {
|
| 176 | return osamJobRepository.findByUuid(jobId).orElse(null);
|
| 177 | }
|
| 178 |
|
| 179 | @Override
|
| 180 | public void delete(UUID jobId) {
|
| 181 | Date now = new Date();
|
| 182 | Integer updatedEntities = osamJobRepository.updateOsamCoreJobToBeDeleted(now, jobId, JobStatus.PENDING.toString(), JobStatus.STOPPED.toString());
|
| 183 |
|
| 184 | if (updatedEntities == 0) {
|
| 185 | final Optional<OsamJob> remoteDaoJob = osamJobRepository.findByUuid(jobId);
|
| 186 |
|
| 187 | if (!remoteDaoJob.isPresent() || remoteDaoJob.get().getUuid() == null) {
|
| 188 | log.debug("jobId {}: Service does not exist", jobId);
|
| 189 | throw new InvalidOperationException("Service does not exist");
|
| 190 | }
|
| 191 |
|
| 192 | if (!remoteDaoJob.get().equals(JobStatus.PENDING) && !remoteDaoJob.get().getStatus().equals(JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.get().getTakenBy())) {
|
| 193 | log.debug("jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.get().getStatus() +
|
| 194 | ", takenBy " + remoteDaoJob.get().getTakenBy());
|
| 195 | throw new InvalidOperationException("Service status does not allow deletion from the queue");
|
| 196 | }
|
| 197 |
|
| 198 | throw new InvalidOperationException("Service deletion failed");
|
| 199 | }
|
| 200 | }
|
| 201 |
|
| 202 | @Override
|
| 203 | public boolean mute(UUID jobId) {
|
| 204 | if (jobId == null) {
|
| 205 | return false;
|
| 206 | }
|
| 207 | final String prefix = "DUMP";
|
| 208 | Integer updatedEntities = osamJobRepository.muteOsamCoreJob(jobId, prefix);
|
| 209 | return updatedEntities != 0;
|
| 210 | }
|
| 211 | }
|