/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.stream;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.server.admin.deployment.DeploymentHandler;
import org.springframework.xd.dirt.stream.AbstractInstancePersistingDeployer;
import org.springframework.xd.dirt.stream.Job;
import org.springframework.xd.dirt.stream.JobDefinition;
import org.springframework.xd.dirt.stream.JobDefinitionRepository;
import org.springframework.xd.dirt.stream.JobRepository;
import org.springframework.xd.dirt.stream.ParsingContext;
import org.springframework.xd.dirt.stream.XDParser;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;

/**
 * Deployer for {@link JobDefinition}s that can also launch a deployed job by
 * sending a message carrying the job parameters to a per-job producer channel
 * bound on the {@link MessageBus}.
 *
 * <p>Producer channels are created lazily on first launch, cached by job name,
 * and unbound from the message bus when this bean is destroyed.
 */
public class JobDeployer
extends AbstractInstancePersistingDeployer<JobDefinition, Job>
implements DisposableBean {
    /** Prefix prepended to a job name to form the name its producer channel is bound under. */
    private static final String JOB_CHANNEL_PREFIX = "job:";

    /** Bus on which the per-job producer channels are bound and unbound. */
    private final MessageBus messageBus;

    /** Producer channels that have been bound on the message bus, keyed by job name. */
    private final ConcurrentMap<String, MessageChannel> jobChannels = new ConcurrentHashMap<String, MessageChannel>();

    /**
     * Creates a job deployer.
     *
     * @param zkConnection passed through to the superclass constructor
     * @param definitionRepository passed through to the superclass constructor
     * @param instanceRepository passed through to the superclass constructor
     * @param parser passed through to the superclass constructor
     * @param messageBus bus used to bind the job launch channels; must not be {@code null}
     * @param deploymentHandler passed through to the superclass constructor
     * @throws IllegalArgumentException if {@code messageBus} is {@code null}
     */
    public JobDeployer(ZooKeeperConnection zkConnection, JobDefinitionRepository definitionRepository, JobRepository instanceRepository, XDParser parser, MessageBus messageBus, DeploymentHandler deploymentHandler) {
        super(zkConnection, definitionRepository, instanceRepository, parser, deploymentHandler, ParsingContext.job);
        Assert.notNull(messageBus, "MessageBus must not be null");
        this.messageBus = messageBus;
    }

    /**
     * Wraps the given definition in a new {@link Job} instance.
     */
    @Override
    protected Job makeInstance(JobDefinition definition) {
        return new Job(definition);
    }

    /**
     * Launches the named job by sending its parameters to the job's producer channel.
     *
     * <p>The definition and the deployed instance are looked up before any channel is
     * created, so a request for an unknown or undeployed job does not leave a bound
     * producer behind. The inherited {@code throw...Exception} helpers are presumably
     * expected to throw; their implementation is not visible here.
     *
     * @param name name of the job to launch
     * @param jobParameters parameters sent as the message payload; {@code null} is sent
     *        as an empty string
     */
    public void launch(String name, String jobParameters) {
        if (this.getDefinitionRepository().findOne(name) == null) {
            this.throwNoSuchDefinitionException(name);
        }
        if (this.instanceRepository.findOne(name) == null) {
            this.throwNotDeployedException(name);
        }
        MessageChannel channel = this.getOrBindChannel(name);
        String payload = jobParameters != null ? jobParameters : "";
        channel.send(MessageBuilder.withPayload(payload).build());
    }

    /**
     * Returns the cached producer channel for the given job, creating and binding it on
     * first use.
     *
     * <p>Creation, binding and publication happen under a lock so the producer is bound
     * exactly once per job. The channel is only stored after {@code bindProducer}
     * returns, so a failed bind is retried on the next launch instead of caching an
     * unbound channel.
     */
    private MessageChannel getOrBindChannel(String name) {
        MessageChannel channel = this.jobChannels.get(name);
        if (channel != null) {
            return channel;
        }
        synchronized (this.jobChannels) {
            // Re-check: another thread may have bound the channel while we waited for the lock.
            channel = this.jobChannels.get(name);
            if (channel == null) {
                channel = new DirectChannel();
                this.messageBus.bindProducer(JOB_CHANNEL_PREFIX + name, channel, null);
                this.jobChannels.put(name, channel);
            }
            return channel;
        }
    }

    /**
     * Creates a {@link JobDefinition} from the given name and definition text.
     */
    @Override
    protected JobDefinition createDefinition(String name, String definition) {
        return new JobDefinition(name, definition);
    }

    /**
     * Builds the deployment path for the given definition from the
     * {@code deployments/jobs} root and the definition's name.
     */
    @Override
    protected String getDeploymentPath(JobDefinition definition) {
        return Paths.build("deployments/jobs", definition.getName());
    }

    /**
     * Unbinds every cached job producer channel from the message bus.
     */
    @Override
    public void destroy() throws Exception {
        for (Map.Entry<String, MessageChannel> entry : this.jobChannels.entrySet()) {
            this.messageBus.unbindProducer(JOB_CHANNEL_PREFIX + entry.getKey(), entry.getValue());
        }
    }
}

