/*-
 * ============LICENSE_START=======================================================
 * OSAM
 * ================================================================================
 * Copyright (C) 2018 AT&T
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.osam.job.impl; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.commons.lang3.exception.ExceptionUtils; | |
import org.onap.osam.job.exceptions.JobException; | |
import org.onap.osam.job.dao.job.JobStatus; | |
import org.onap.osam.job.dao.job.OsamJob; | |
import org.onap.osam.job.IJobCommand; | |
import org.onap.osam.job.IJobsDataAccessService; | |
import org.onap.osam.job.NextCommand; | |
import org.onap.osam.job.command.JobCommandFactory; | |
import org.quartz.JobExecutionContext; | |
import org.springframework.scheduling.quartz.QuartzJobBean; | |
import org.springframework.stereotype.Component; | |
import java.util.Optional; | |
import java.util.UUID; | |
@Slf4j | |
@Component | |
public class JobWorker extends QuartzJobBean { | |
private IJobsDataAccessService jobsDataAccessService; | |
private JobCommandFactory jobCommandFactory; | |
private JobStatus topic; | |
@Override | |
protected void executeInternal(JobExecutionContext context) { | |
Optional<OsamJob> job; | |
job = pullJob(); | |
while (job.isPresent()) { | |
OsamJob nextStateOfJob = executeJobAndGetNext(job.get()); | |
pushBack(nextStateOfJob); | |
job = pullJob(); | |
} | |
} | |
private Optional<OsamJob> pullJob() { | |
try { | |
return jobsDataAccessService.pull(topic, UUID.randomUUID().toString()); | |
} catch (Exception e) { | |
log.error("failed to pull job from queue, breaking: {}", e, e); | |
tryMutingJobFromException(e); | |
return Optional.empty(); | |
} | |
} | |
private void pushBack(OsamJob nextJob) { | |
try { | |
jobsDataAccessService.pushBack(nextJob); | |
} catch (Exception e) { | |
log.error("failed pushing back job to queue: {}", e, e); | |
} | |
} | |
protected OsamJob executeJobAndGetNext(OsamJob job) { | |
//TODO Pavel find out about teplateId | |
log.debug("going to execute job {}: {}/{}", | |
//log.debug("going to execute job {} of {}: {}/{}", | |
String.valueOf(job.getUuid()).substring(0,8), | |
//String.valueOf(job.getTemplateId()).substring(0, 8), | |
job.getStatus(), job.getType()); | |
NextCommand nextCommand = executeCommandAndGetNext(job); | |
return setNextCommandInJob(nextCommand, job); | |
} | |
private NextCommand executeCommandAndGetNext(OsamJob job) { | |
NextCommand nextCommand; | |
try { | |
final IJobCommand jobCommand = jobCommandFactory.toCommand(job); | |
nextCommand = jobCommand.call(); | |
} catch (Exception e) { | |
log.error("error while executing job from queue: {}", e); | |
nextCommand = new NextCommand(JobStatus.FAILED); | |
} | |
if (nextCommand == null) { | |
nextCommand = new NextCommand(JobStatus.STOPPED); | |
} | |
return nextCommand; | |
} | |
private OsamJob setNextCommandInJob(NextCommand nextCommand, OsamJob job) { | |
log.debug("transforming job {}: {}/{} -> {}{}", | |
String.valueOf(job.getUuid()).substring(0, 8), | |
job.getStatus(), job.getType(), | |
nextCommand.getStatus(), | |
nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : ""); | |
job.setStatus(nextCommand.getStatus()); | |
if (nextCommand.getCommand() != null) { | |
job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData()); | |
} | |
return job; | |
} | |
private void tryMutingJobFromException(Exception e) { | |
// If there's JobException in the stack, read job uuid from | |
// the exception, and mute it in DB. | |
final int indexOfJobException = | |
ExceptionUtils.indexOfThrowable(e, JobException.class); | |
if (indexOfJobException >= 0) { | |
try { | |
final JobException jobException = (JobException) ExceptionUtils.getThrowableList(e).get(indexOfJobException); | |
log.info("muting job: {} ({})", jobException.getJobUuid(), jobException.toString()); | |
final boolean success = jobsDataAccessService.mute(jobException.getJobUuid()); | |
if (!success) { | |
log.error("failed to mute job {}", jobException.getJobUuid()); | |
} | |
} catch (Exception e1) { | |
log.error("failed to mute job: {}", e1, e1); | |
} | |
} | |
} | |
//used by quartz to inject IJobsDataAccessService into the job | |
//see JobSchedulerInitializer | |
public void setJobsDataAccessService(IJobsDataAccessService jobsDataAccessService) { | |
this.jobsDataAccessService = jobsDataAccessService; | |
} | |
/*public void setFeatureManager(FeatureManager featureManager) { | |
this.featureManager = featureManager; | |
}*/ | |
public void setJobCommandFactory(JobCommandFactory jobCommandFactory) { | |
this.jobCommandFactory = jobCommandFactory; | |
} | |
public void setTopic(JobStatus topic) { | |
this.topic = topic; | |
} | |
} |