blob: 18ae378427e56aea6744b19679c229bf89fceb70 [file] [log] [blame]
Aharoni, Pavel (pa0916)8c70f072018-11-18 00:07:12 +02001/*-
2 * ============LICENSE_START=======================================================
3 * OSAM
4 * ================================================================================
5 * Copyright (C) 2018 AT&T
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20package org.onap.osam.job.impl;
21
22import com.google.common.collect.Lists;
23import lombok.extern.slf4j.Slf4j;
24import org.onap.osam.common.exception.GenericUncheckedException;
25import org.onap.osam.common.exception.InvalidOperationException;
26import org.onap.osam.job.dao.job.JobStatus;
27import org.onap.osam.job.dao.job.OsamJob;
28import org.onap.osam.job.IJobsDataAccessService;
29import org.onap.osam.job.repository.job.OsamJobRepository;
30import org.springframework.beans.factory.annotation.Autowired;
31import org.springframework.beans.factory.annotation.Value;
32import org.springframework.stereotype.Service;
33import org.springframework.util.StringUtils;
34
35import javax.annotation.PostConstruct;
36import java.nio.ByteBuffer;
37import java.sql.Timestamp;
38import java.time.LocalDateTime;
39import java.util.Collection;
40import java.util.Date;
41import java.util.Optional;
42import java.util.UUID;
43
44@Slf4j
45@Service
46public class JobsDataAccessService implements IJobsDataAccessService {
47
48 private OsamJobRepository osamJobRepository;
49 private Long maxOpenedRequestsToAbstractOlt;
50 private int pollingIntervalSeconds;
51
52 @Autowired
53 public JobsDataAccessService(OsamJobRepository osamJobRepository,
54 @Value("0") Long maxOpenedRequestsToAbstractOlt,
55 @Value("10") int pollingIntervalSeconds) {
56 // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
57 this.osamJobRepository = osamJobRepository;
58 this.maxOpenedRequestsToAbstractOlt = maxOpenedRequestsToAbstractOlt;
59 this.pollingIntervalSeconds = pollingIntervalSeconds;
60 }
61
62 @PostConstruct
63 public void configure() {
64 //TODO define defaults
65 /*maxOpenedRequestsToAbstractOlt = Integer.parseInt(System.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
66 pollingIntervalSeconds = Integer.parseInt(System.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));*/
67 }
68
69 public void deleteAll() {
70 osamJobRepository.deleteAll();
71 }
72
73 @Override
74 public UUID add(OsamJob job) {
75 osamJobRepository.save(job);
76 return job.getUuid();
77 }
78
79 @Override
80 public Optional<OsamJob> pull(JobStatus topic, String ownerId) {
81 OsamJob osamJob;
82 int updatedEntities;
83 do {
84
85 Optional<OsamJob> optionalOsamJob = selectQueryByJobStatus(topic);
86 if (!optionalOsamJob.isPresent()) {
87 return optionalOsamJob;
88 }
89
90 osamJob = optionalOsamJob.get();
91 final UUID uuid = osamJob.getUuid();
92 final Integer age = osamJob.getAge();
93
94 osamJob.setTakenBy(ownerId);
95
96 // It might become that a job was taken and pushed-back already, before we
97 // arrived here, so we're verifying the age was not pushed forward.
98 // Age is actually forwarded upon pushBack().
99 updatedEntities = osamJobRepository.updateOsamCoreJobsAge(ownerId, uuid, age);
100
101 } while (updatedEntities == 0);
102
103 return Optional.ofNullable(osamJob);
104 }
105
106 private java.sql.Timestamp nowMinusInterval() {
107 return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
108 }
109
110 private Optional<OsamJob> selectQueryByJobStatus(JobStatus topic){
111 //TODO Pavel understand this interval
112 //String intervalCondition = (topic==JobStatus.CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
113 return osamJobRepository.queryFirst1ByStatusAndTakenByIsNullAndDeletedAtIsNullOrderByModifiedDateAsc(topic).stream().findFirst();
114
115 }
116
117 private Optional<OsamJob> sqlQueryForTopic(JobStatus topic) {
118 switch (topic) {
119 case IN_PROGRESS:
120 case RESOURCE_IN_PROGRESS:
121 case CREATING:
122 case PENDING:
123 return selectQueryByJobStatus(topic);
124 //TODO Pavel - at first stage, using the naive query for pending topic
125 /*case PENDING:
126 return osamJobRepository.findOsamJobsPending(maxOpenedRequestsToAbstractOlt);*/
127 default:
128 throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
129 }
130 }
131
132
133 private byte[] getUuidAsByteArray(UUID owner) {
134 ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
135 bb.putLong(owner.getMostSignificantBits());
136 bb.putLong(owner.getLeastSignificantBits());
137 return bb.array();
138 }
139
140 @Override
141 public void pushBack(OsamJob job) {
142 final Optional<OsamJob> remoteDaoJob = osamJobRepository.findByUuid(job.getUuid());
143
144 if (!remoteDaoJob.isPresent()) {
145 throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
146 }
147
148 if (remoteDaoJob.get().getTakenBy() == null) {
149 throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
150 }
151
152 job.setTakenBy(null);
153
154 Integer age = job.getAge();
155 job.setAge(age + 1);
156
157 log.debug("{}/{}", job.getStatus(), job.getType());
158
159 osamJobRepository.save(job);
160 }
161
162 /*private OsamJob castToOsamJob(OsamJob job) {
163 if (!(job instanceof OsamJob)) {
164 throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
165 }
166 return (OsamJob) job;
167 }*/
168
169 @Override
170 public Collection<OsamJob> peek() {
171 return Lists.newArrayList(osamJobRepository.findAll());
172 }
173
174 @Override
175 public OsamJob peek(UUID jobId) {
176 return osamJobRepository.findByUuid(jobId).orElse(null);
177 }
178
179 @Override
180 public void delete(UUID jobId) {
181 Date now = new Date();
182 Integer updatedEntities = osamJobRepository.updateOsamCoreJobToBeDeleted(now, jobId, JobStatus.PENDING.toString(), JobStatus.STOPPED.toString());
183
184 if (updatedEntities == 0) {
185 final Optional<OsamJob> remoteDaoJob = osamJobRepository.findByUuid(jobId);
186
187 if (!remoteDaoJob.isPresent() || remoteDaoJob.get().getUuid() == null) {
188 log.debug("jobId {}: Service does not exist", jobId);
189 throw new InvalidOperationException("Service does not exist");
190 }
191
192 if (!remoteDaoJob.get().equals(JobStatus.PENDING) && !remoteDaoJob.get().getStatus().equals(JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.get().getTakenBy())) {
193 log.debug("jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.get().getStatus() +
194 ", takenBy " + remoteDaoJob.get().getTakenBy());
195 throw new InvalidOperationException("Service status does not allow deletion from the queue");
196 }
197
198 throw new InvalidOperationException("Service deletion failed");
199 }
200 }
201
202 @Override
203 public boolean mute(UUID jobId) {
204 if (jobId == null) {
205 return false;
206 }
207 final String prefix = "DUMP";
208 Integer updatedEntities = osamJobRepository.muteOsamCoreJob(jobId, prefix);
209 return updatedEntities != 0;
210 }
211}