/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.batch.tasklet;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.UnexpectedJobExecutionException;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.BusUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.stream.JobDefinition;
import org.springframework.xd.dirt.stream.JobDefinitionRepository;
import org.springframework.xd.dirt.stream.NoSuchDefinitionException;
import org.springframework.xd.dirt.stream.NotDeployedException;
import org.springframework.xd.store.DomainRepository;

/**
 * {@link Tasklet} that launches another job through the {@link MessageBus} and waits for its result.
 *
 * <p>On each execution the tasklet binds a producer to {@code job:<jobName>} and a pub/sub consumer to
 * the tap {@code tap:job:<jobName>.job}, sends the launch parameters as a JSON string, and then waits on
 * the listening channel until a {@link JobExecution} arrives that carries this tasklet's orchestration id
 * and is no longer running. Both bindings are removed again before {@code execute} returns.
 *
 * <p>NOTE(review): the orchestration id is kept in an instance field that is rewritten on every
 * {@code execute} call, so a single instance does not look safe to run concurrently - confirm how
 * instances are shared.
 */
public class JobLaunchingTasklet implements Tasklet {
    /** Key of the orchestration id, used both as job parameter and as step execution context entry. */
    public static final String XD_ORCHESTRATION_ID = "xd_orchestration_id";
    /** Base name of the job parameter that carries the id of the launching job execution. */
    public static final String XD_PARENT_JOB_EXECUTION_ID = "xd_parent_execution_id";

    /** Key of the "random" value, both as job parameter and as step execution context entry. */
    private static final String RANDOM_KEY = "random";
    /** Job parameter added to the launch request when a stored "random" value is re-sent. */
    private static final String IS_RESTART_KEY = "XD_isRestart";

    private static final Logger logger = LoggerFactory.getLogger(JobLaunchingTasklet.class);
    /** Shared mapper; only used with its default configuration to serialize the launch parameters. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Maximum wait for a result in milliseconds; values {@code <= 0} mean "wait without limit". */
    private final long timeout;
    private final String jobName;
    private final MessageBus messageBus;
    private final JobDefinitionRepository definitionRepository;
    private final DomainRepository<JobDefinition, String> instanceRepository;
    private final MessageChannel launchingChannel;
    private final PollableChannel listeningChannel;
    /** Orchestration id of the current execution; (re)assigned at the start of every {@code execute}. */
    private String orchestrationId;

    /**
     * Creates a tasklet that uses a fresh {@link DirectChannel} for launching and a fresh
     * {@link QueueChannel} for listening.
     *
     * @param messageBus bus used to bind the launching and listening channels; must not be {@code null}
     * @param jobDefinitionRepository repository consulted to verify the job is defined; must not be {@code null}
     * @param instanceRepository repository consulted to verify the job is deployed; must not be {@code null}
     * @param jobName name of the job to launch; must not be {@code null}
     * @param timeout maximum time in milliseconds to wait for a result; {@code null} is stored as
     *        {@code -1}, and any value {@code <= 0} disables the limit
     */
    public JobLaunchingTasklet(MessageBus messageBus, JobDefinitionRepository jobDefinitionRepository, DomainRepository<JobDefinition, String> instanceRepository, String jobName, Long timeout) {
        this(messageBus, jobDefinitionRepository, instanceRepository, jobName, timeout, JobLaunchingTasklet.createLaunchingChannel(jobName), JobLaunchingTasklet.createListeningChannel(jobName));
    }

    /**
     * Creates a tasklet with explicitly supplied channels.
     *
     * @param messageBus bus used to bind the launching and listening channels; must not be {@code null}
     * @param jobDefinitionRepository repository consulted to verify the job is defined; must not be {@code null}
     * @param instanceRepository repository consulted to verify the job is deployed; must not be {@code null}
     * @param jobName name of the job to launch; must not be {@code null}
     * @param timeout maximum time in milliseconds to wait for a result; {@code null} is stored as
     *        {@code -1}, and any value {@code <= 0} disables the limit
     * @param launchingChannel channel the launch request is sent on (not null-checked here)
     * @param listeningChannel channel results are received from (not null-checked here)
     */
    protected JobLaunchingTasklet(MessageBus messageBus, JobDefinitionRepository jobDefinitionRepository, DomainRepository<JobDefinition, String> instanceRepository, String jobName, Long timeout, MessageChannel launchingChannel, PollableChannel listeningChannel) {
        Assert.notNull(messageBus, "A message bus is required");
        Assert.notNull(jobDefinitionRepository, "A JobDefinitionRepository is required");
        Assert.notNull(instanceRepository, "A DomainRepository is required");
        Assert.notNull(jobName, "A job name is required");
        this.jobName = jobName;
        this.messageBus = messageBus;
        this.definitionRepository = jobDefinitionRepository;
        this.instanceRepository = instanceRepository;
        this.launchingChannel = launchingChannel;
        this.listeningChannel = listeningChannel;
        this.timeout = timeout == null ? -1L : timeout;
    }

    /** Builds the default launching channel, named {@code <jobName>:launcher}. */
    private static DirectChannel createLaunchingChannel(String jobName) {
        DirectChannel launchingChannel = new DirectChannel();
        launchingChannel.setBeanName(jobName + ":launcher");
        return launchingChannel;
    }

    /** Builds the default listening channel, named {@code <jobName>:resultListener}. */
    private static QueueChannel createListeningChannel(String jobName) {
        QueueChannel listeningChannel = new QueueChannel();
        listeningChannel.setBeanName(jobName + ":resultListener");
        return listeningChannel;
    }

    /**
     * Launches the configured job and blocks until its result arrives, the timeout elapses, or the
     * waiting thread is interrupted.
     *
     * @param contribution receives the exit status of the launched job
     * @param chunkContext supplies the step execution, its job parameters and its execution context
     * @return always {@link RepeatStatus#FINISHED} when the launched job completed successfully
     * @throws UnexpectedJobExecutionException if no result arrives in time, the wait is interrupted,
     *         or the launched job reports an unsuccessful status
     * @throws Exception any other failure raised while validating, serializing or sending the request
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        this.setOrchestrationId(chunkContext);
        String driverJobName = chunkContext.getStepContext().getStepExecution().getJobExecution().getJobInstance().getJobName();
        this.bindChannels(driverJobName);
        try {
            this.validateJobDeployment();
            String jobParametersString = this.getJobParameters(chunkContext);
            logger.debug("Launching request for {} orchestration {}", this.jobName, this.orchestrationId);
            this.launchingChannel.send(MessageBuilder.withPayload(jobParametersString).build());
            JobExecution results = this.awaitResult();
            if (results == null) {
                throw new UnexpectedJobExecutionException("The job timed out while waiting for a result");
            }
            this.processResult(contribution, chunkContext, results);
            logger.debug("Completed processing for {} orchestration {}", this.jobName, this.orchestrationId);
            return RepeatStatus.FINISHED;
        }
        finally {
            // Always release the bindings, including when validation, sending or waiting failed.
            this.unbindChannels(driverJobName);
        }
    }

    /**
     * Polls the listening channel until {@link #getResult(Message)} accepts a message or the timeout
     * is used up.
     *
     * @return the accepted job execution, or {@code null} if the timeout elapsed first
     * @throws UnexpectedJobExecutionException if the waiting thread is interrupted
     */
    private JobExecution awaitResult() {
        long startTime = System.currentTimeMillis();
        long remaining = this.timeout;
        JobExecution results = null;
        while (results == null && (this.timeout <= 0L || remaining > 0L)) {
            Message<?> resultMessage = this.timeout > 0L ? this.listeningChannel.receive(remaining) : this.listeningChannel.receive();
            if (resultMessage != null) {
                results = this.getResult(resultMessage);
            } else if (Thread.currentThread().isInterrupted()) {
                // receive() returns null when interrupted and keeps the interrupt flag set, so every
                // following receive() would return null immediately; fail instead of busy-spinning.
                throw new UnexpectedJobExecutionException(String.format("Interrupted while waiting for a result from job '%s'", this.jobName));
            }
            remaining = startTime - System.currentTimeMillis() + this.timeout;
        }
        return results;
    }

    /**
     * Builds the JSON launch request from the parameters of the running job.
     *
     * <p>The incoming "random" parameter is always dropped. The orchestration id and the id of the
     * current job execution are added. If the step execution context holds a "random" value (stored by
     * {@link #processResult} after an unsuccessful run), that value is sent together with
     * {@code XD_isRestart=true}.
     *
     * @param chunkContext supplies the step execution and its execution context
     * @return the parameters serialized as a JSON object string
     * @throws JsonProcessingException if the parameters cannot be serialized
     */
    private String getJobParameters(ChunkContext chunkContext) throws JsonProcessingException {
        StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
        JobParameters originalJobParameters = stepExecution.getJobParameters();
        Properties currentJobParameters = originalJobParameters.toProperties();
        currentJobParameters.remove(RANDOM_KEY);
        Map<String, Object> stepExecutionContext = chunkContext.getStepContext().getStepExecutionContext();
        // get() yields null when the key is absent, which is the "no stored value" case below.
        String random = (String) stepExecutionContext.get(RANDOM_KEY);
        currentJobParameters.put(XD_ORCHESTRATION_ID, this.orchestrationId);
        // NOTE(review): the leading '-' presumably marks the parameter as non-identifying for the
        // receiving job launcher - confirm against the launcher's parameter parsing.
        currentJobParameters.put("-" + XD_PARENT_JOB_EXECUTION_ID, stepExecution.getJobExecutionId());
        if (random != null) {
            currentJobParameters.put(IS_RESTART_KEY, true);
            currentJobParameters.put(RANDOM_KEY, random);
        }
        return OBJECT_MAPPER.writeValueAsString(currentJobParameters);
    }

    /**
     * Copies the launched job's exit status onto the step and fails the step if that job was
     * unsuccessful, remembering its "random" parameter in the step execution context first.
     *
     * @throws UnexpectedJobExecutionException if the launched job's status is unsuccessful
     */
    private void processResult(StepContribution contribution, ChunkContext chunkContext, JobExecution results) {
        contribution.setExitStatus(results.getExitStatus());
        if (results.getStatus().isUnsuccessful()) {
            chunkContext.getStepContext().getStepExecution().getExecutionContext().put(RANDOM_KEY, results.getJobParameters().getString(RANDOM_KEY));
            throw new UnexpectedJobExecutionException(String.format("Step failure: %s failed.", this.jobName));
        }
    }

    /** Binds the listening channel to the job's tap and the launching channel to {@code job:<jobName>}. */
    private void bindChannels(String driverJobName) {
        this.messageBus.bindPubSubConsumer(this.getEventListenerChannelName(this.jobName, driverJobName), this.listeningChannel, null);
        this.messageBus.bindProducer("job:" + this.jobName, this.launchingChannel, null);
    }

    /** Removes the bindings created by {@link #bindChannels(String)}. */
    private void unbindChannels(String driverJobName) {
        this.messageBus.unbindConsumer(this.getEventListenerChannelName(this.jobName, driverJobName), this.listeningChannel);
        this.messageBus.unbindProducer("job:" + this.jobName, this.launchingChannel);
    }

    /**
     * Verifies that the job to launch is known to both repositories.
     *
     * @throws NoSuchDefinitionException if the definition repository has no entry for the job name
     * @throws NotDeployedException if the instance repository has no entry for the job name
     */
    private void validateJobDeployment() {
        JobDefinition job = this.definitionRepository.findOne(this.jobName);
        if (job == null) {
            // '%%s' leaves a literal '%s' placeholder in the message handed to the exception.
            throw new NoSuchDefinitionException(this.jobName, String.format("There is no %s definition named '%%s'", "job"));
        }
        if (this.instanceRepository.findOne(this.jobName) == null) {
            throw new NotDeployedException(this.jobName, String.format("The %s named '%%s' is not currently deployed", "job"));
        }
    }

    /**
     * Resolves the orchestration id for this execution: an id already stored in the step execution
     * context wins; otherwise the id of the running job instance is used and stored there.
     */
    private void setOrchestrationId(ChunkContext chunkContext) {
        this.orchestrationId = String.valueOf(chunkContext.getStepContext().getStepExecution().getJobExecution().getJobInstance().getInstanceId());
        ExecutionContext stepExecutionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
        if (stepExecutionContext.containsKey(XD_ORCHESTRATION_ID)) {
            this.orchestrationId = (String) stepExecutionContext.get(XD_ORCHESTRATION_ID);
        } else {
            stepExecutionContext.put(XD_ORCHESTRATION_ID, this.orchestrationId);
        }
    }

    /**
     * Returns the tap name {@code tap:job:<jobName>.job}, passed through
     * {@link BusUtils#addGroupToPubSub} with the driver job name when the bus reports the
     * {@code DURABLE_PUBSUB} capability.
     */
    private String getEventListenerChannelName(String jobName, String driverJobName) {
        String tapName = String.format("tap:job:%s.job", jobName);
        if (this.messageBus.isCapable(MessageBus.Capability.DURABLE_PUBSUB)) {
            tapName = BusUtils.addGroupToPubSub(driverJobName, tapName);
        }
        return tapName;
    }

    /**
     * Extracts the job execution from a tap message if it belongs to the current orchestration and
     * has stopped running.
     *
     * @param message message whose payload is cast to {@link JobExecution}
     * @return the job execution when its {@code xd_orchestration_id} parameter matches this tasklet's
     *         orchestration id (ignoring case) and it is not running; {@code null} otherwise
     * @throws MessagingException declared for callers; not thrown directly by this method
     */
    public JobExecution getResult(Message<?> message) throws MessagingException {
        JobExecution jobExecution = (JobExecution) message.getPayload();
        String curOrchestrationId = jobExecution.getJobParameters().getString(XD_ORCHESTRATION_ID);
        logger.debug("Received result for {} orchestration {}", this.jobName, curOrchestrationId);
        if (StringUtils.hasText(curOrchestrationId) && curOrchestrationId.equalsIgnoreCase(this.orchestrationId) && !jobExecution.isRunning()) {
            return jobExecution;
        }
        return null;
    }
}

