/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.server.container;

import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.xd.dirt.cluster.ContainerAttributes;
import org.springframework.xd.dirt.core.Job;
import org.springframework.xd.dirt.core.JobDeploymentsPath;
import org.springframework.xd.dirt.core.ModuleDeploymentsPath;
import org.springframework.xd.dirt.core.Stream;
import org.springframework.xd.dirt.core.StreamDeploymentsPath;
import org.springframework.xd.dirt.job.JobFactory;
import org.springframework.xd.dirt.module.ModuleDeployer;
import org.springframework.xd.dirt.server.admin.deployment.ModuleDeploymentStatus;
import org.springframework.xd.dirt.server.admin.deployment.zk.DeploymentLoader;
import org.springframework.xd.dirt.stream.StreamFactory;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.ModuleDeploymentProperties;
import org.springframework.xd.module.ModuleDescriptor;
import org.springframework.xd.module.ModuleType;
import org.springframework.xd.module.RuntimeModuleDeploymentProperties;
import org.springframework.xd.module.core.Module;

/**
 * Cache listener that reacts to children being added to or removed from a
 * module deployments path. An added child triggers deployment of the
 * corresponding job or stream module; a removed child triggers its
 * undeployment. Deployed modules are tracked in {@link #mapDeployedModules}.
 */
class DeploymentListener
implements PathChildrenCacheListener {
    private static final Logger logger = LoggerFactory.getLogger(DeploymentListener.class);
    // Source of the Curator client used for tap registration and by the watchers.
    private final ZooKeeperConnection zkConnection;
    // Watcher attached to stream deployment nodes created by this listener.
    private final StreamModuleWatcher streamModuleWatcher;
    // Watcher attached to job deployment nodes created by this listener.
    private final JobModuleWatcher jobModuleWatcher;
    // Creates, deploys and undeploys module instances.
    private final ModuleDeployer moduleDeployer;
    // Attributes of this container; its id is embedded in the ZooKeeper paths built here.
    private final ContainerAttributes containerAttributes;
    // Passed to DeploymentLoader when loading a job definition.
    private final JobFactory jobFactory;
    // Passed to DeploymentLoader when loading a stream definition.
    private final StreamFactory streamFactory;
    // A source channel name starting with this prefix denotes a tap.
    private static final String TAP_CHANNEL_PREFIX = "tap:";
    // Descriptors of modules handed to the deployer, keyed by group/type/label;
    // entries are added in deployModule and removed in undeployModule.
    private final Map<ModuleDescriptor.Key, ModuleDescriptor> mapDeployedModules = new ConcurrentHashMap<ModuleDescriptor.Key, ModuleDescriptor>();

    /**
     * Creates a listener bound to the given collaborators and instantiates
     * the job and stream module watchers.
     *
     * @param zkConnection        connection supplying the Curator client
     * @param moduleDeployer      deployer used to create, deploy and undeploy modules
     * @param containerAttributes attributes of this container (its id is used in paths)
     * @param jobFactory          factory handed to the deployment loader for jobs
     * @param streamFactory       factory handed to the deployment loader for streams
     */
    public DeploymentListener(ZooKeeperConnection zkConnection, ModuleDeployer moduleDeployer, ContainerAttributes containerAttributes, JobFactory jobFactory, StreamFactory streamFactory) {
        // Collaborators supplied by the caller.
        this.zkConnection = zkConnection;
        this.moduleDeployer = moduleDeployer;
        this.containerAttributes = containerAttributes;
        this.streamFactory = streamFactory;
        this.jobFactory = jobFactory;

        // Watchers owned by this listener.
        this.streamModuleWatcher = new StreamModuleWatcher();
        this.jobModuleWatcher = new JobModuleWatcher();
    }

    /**
     * Logs the cache event and dispatches it: added children are deployed,
     * removed children are undeployed, every other event type is ignored.
     *
     * @param client the Curator client that delivered the event
     * @param event  the cache event
     * @throws Exception if deployment or undeployment handling fails
     */
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        ZooKeeperUtils.logCacheEvent(logger, event);
        switch (event.getType()) {
            case CHILD_ADDED:
                this.onChildAdded(client, event.getData());
                break;
            case CHILD_REMOVED:
                this.onChildRemoved(client, event.getData());
                break;
            case INITIALIZED:
            default:
                // Nothing to do for initialization or any other event type.
                break;
        }
    }

    /**
     * Handles a newly added module deployment node: deploys the job or stream
     * module it describes, then writes the module metadata and the resulting
     * deployment status back to ZooKeeper.
     *
     * @param client the Curator client
     * @param data   the added child; its path identifies the module and its
     *               payload carries the runtime deployment properties
     * @throws Exception if writing metadata or status fails for a reason other
     *                   than the node having been removed
     */
    private void onChildAdded(CuratorFramework client, ChildData data) throws Exception {
        String path = data.getPath();
        ModuleDeploymentsPath moduleDeploymentsPath = new ModuleDeploymentsPath(path);
        String unitName = moduleDeploymentsPath.getDeploymentUnitName();
        String moduleType = moduleDeploymentsPath.getModuleType();
        String moduleLabel = moduleDeploymentsPath.getModuleLabel();
        int moduleSequence = moduleDeploymentsPath.getModuleSequence();
        ModuleDescriptor.Key key = new ModuleDescriptor.Key(unitName, ModuleType.valueOf(moduleType), moduleLabel);
        String container = moduleDeploymentsPath.getContainer();

        RuntimeModuleDeploymentProperties properties = new RuntimeModuleDeploymentProperties();
        properties.putAll(ZooKeeperUtils.bytesToMap(data.getData()));

        Module module = null;
        ModuleDeploymentStatus status;
        try {
            module = ModuleType.job.toString().equals(moduleType)
                    ? this.deployJobModule(client, unitName, moduleLabel, properties)
                    : this.deployStreamModule(client, unitName, moduleType, moduleLabel, properties);
            status = module == null
                    ? new ModuleDeploymentStatus(container, moduleSequence, key, ModuleDeploymentStatus.State.failed, "Module deployment returned null")
                    : new ModuleDeploymentStatus(container, moduleSequence, key, ModuleDeploymentStatus.State.deployed, null);
        }
        catch (Exception e) {
            // A failed deployment is reported through the status node rather than rethrown.
            status = new ModuleDeploymentStatus(container, moduleSequence, key, ModuleDeploymentStatus.State.failed, ZooKeeperUtils.getStackTrace(e));
            logger.error("Exception deploying module", (Throwable)e);
        }

        try {
            this.writeModuleMetadata(client, module, path);
            client.setData().forPath(status.buildPath(), ZooKeeperUtils.mapToBytes(status.toMap()));
        }
        catch (KeeperException.NoNodeException e) {
            // The deployment node vanished while we were deploying.
            logger.warn("During deployment of module {} of type {} for {} with sequence number {}, an undeployment request was detected; this module will be undeployed.", new Object[]{moduleLabel, moduleType, unitName, moduleSequence});
            if (logger.isTraceEnabled()) {
                logger.trace("Path " + path + " was removed", (Throwable)e);
            }
        }
    }

    /**
     * Handles removal of a module deployment node: undeploys the module and
     * deletes the matching job or stream deployment node for this container
     * if it still exists.
     *
     * @param client the Curator client
     * @param data   the removed child; its path identifies the module
     * @throws Exception if the ZooKeeper existence check or delete fails
     */
    private void onChildRemoved(CuratorFramework client, ChildData data) throws Exception {
        ModuleDeploymentsPath moduleDeploymentsPath = new ModuleDeploymentsPath(data.getPath());
        String unitName = moduleDeploymentsPath.getDeploymentUnitName();
        String moduleType = moduleDeploymentsPath.getModuleType();
        String moduleLabel = moduleDeploymentsPath.getModuleLabel();
        String moduleSequence = moduleDeploymentsPath.getModuleSequenceAsString();

        this.undeployModule(unitName, moduleType, moduleLabel);

        String containerId = this.containerAttributes.getId();
        String path;
        if (ModuleType.job.toString().equals(moduleType)) {
            // The job deployment node is created with the module sequence
            // (see deployJobModule), so the sequence must be part of the
            // path looked up here as well.
            path = new JobDeploymentsPath()
                    .setJobName(unitName)
                    .setModuleLabel(moduleLabel)
                    .setModuleSequence(moduleSequence)
                    .setContainer(containerId)
                    .build();
        } else {
            path = new StreamDeploymentsPath()
                    .setStreamName(unitName)
                    .setModuleType(moduleType)
                    .setModuleLabel(moduleLabel)
                    .setModuleSequence(moduleSequence)
                    .setContainer(containerId)
                    .build();
        }

        if (client.checkExists().forPath(path) != null) {
            logger.trace("Deleting path: {}", (Object)path);
            try {
                client.delete().forPath(path);
            }
            catch (KeeperException.NoNodeException e) {
                // The node disappeared between the existence check and the
                // delete; the desired end state is already reached.
                logger.trace("Path {} was already removed", (Object)path);
            }
        }
    }

    /**
     * Writes the module's properties as an ephemeral {@code metadata} child of
     * the given deployment path. Does nothing when {@code module} is
     * {@code null}; an already existing metadata node is logged and left as is.
     *
     * @param client the Curator client
     * @param module the deployed module, or {@code null} if deployment failed
     * @param path   the module deployment path under which metadata is stored
     * @throws Exception if creating the metadata node fails for a reason other
     *                   than the node already existing
     */
    private void writeModuleMetadata(CuratorFramework client, Module module, String path) throws Exception {
        if (module == null) {
            return;
        }
        HashMap<String, String> mapMetadata = new HashMap<String, String>();
        CollectionUtils.mergePropertiesIntoMap((Properties)module.getProperties(), mapMetadata);
        try {
            ((ACLBackgroundPathAndBytesable)client.create().withMode(CreateMode.EPHEMERAL)).forPath(Paths.build(path, "metadata"), ZooKeeperUtils.mapToBytes(mapMetadata));
        }
        catch (KeeperException.NodeExistsException ne) {
            ModuleDescriptor descriptor = module.getDescriptor();
            logger.info("The module metadata path for the module {} of type {} for {} already exists.", new Object[]{descriptor.getModuleLabel(), descriptor.getType().toString(), descriptor.getGroup()});
        }
    }

    /**
     * Loads the named job and, if it exists, deploys its module, creates the
     * ephemeral job deployment node for this container and attaches the job
     * module watcher to that node.
     *
     * @param client     the Curator client
     * @param jobName    name of the job
     * @param jobLabel   label of the job module
     * @param properties runtime deployment properties (supplies the sequence)
     * @return the deployed module, or {@code null} if the job could not be loaded
     * @throws Exception on loading, deployment or ZooKeeper failures other
     *                   than the deployment node already existing
     */
    private Module deployJobModule(CuratorFramework client, String jobName, String jobLabel, RuntimeModuleDeploymentProperties properties) throws Exception {
        logger.info("Deploying job '{}'", (Object)jobName);
        String deploymentPath = new JobDeploymentsPath()
                .setJobName(jobName)
                .setModuleLabel(jobLabel)
                .setModuleSequence(properties.getSequenceAsString())
                .setContainer(this.containerAttributes.getId())
                .build();

        Job job = DeploymentLoader.loadJob(client, jobName, this.jobFactory);
        if (job == null) {
            return null;
        }

        Module deployed = this.deployModule(job.getJobModuleDescriptor(), (ModuleDeploymentProperties)properties);
        try {
            ((ACLBackgroundPathAndBytesable)client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(deploymentPath);
            ((BackgroundPathable)client.getData().usingWatcher((CuratorWatcher)this.jobModuleWatcher)).forPath(deploymentPath);
        }
        catch (KeeperException.NodeExistsException alreadyDeployed) {
            logger.info("Module for job {} already deployed", (Object)jobName);
        }
        return deployed;
    }

    /**
     * Loads the named stream and, if it exists, deploys the module with the
     * given label, creates the ephemeral stream deployment node for this
     * container and attaches the stream module watcher to that node.
     *
     * @param client      the Curator client
     * @param streamName  name of the stream
     * @param moduleType  module type as it appears in the deployment path
     * @param moduleLabel label of the module within the stream
     * @param properties  runtime deployment properties (supplies the sequence)
     * @return the deployed module, or {@code null} if the stream could not be loaded
     * @throws Exception on loading, deployment or ZooKeeper failures other
     *                   than the deployment node already existing
     */
    private Module deployStreamModule(CuratorFramework client, String streamName, String moduleType, String moduleLabel, RuntimeModuleDeploymentProperties properties) throws Exception {
        logger.info("Deploying module '{}' for stream '{}'", (Object)moduleLabel, (Object)streamName);
        String deploymentPath = new StreamDeploymentsPath()
                .setStreamName(streamName)
                .setModuleType(moduleType)
                .setModuleLabel(moduleLabel)
                .setModuleSequence(properties.getSequenceAsString())
                .setContainer(this.containerAttributes.getId())
                .build();

        Stream stream = DeploymentLoader.loadStream(client, streamName, this.streamFactory);
        if (stream == null) {
            return null;
        }

        Module deployed = this.deployModule(stream.getModuleDescriptor(moduleLabel), (ModuleDeploymentProperties)properties);
        try {
            ((ACLBackgroundPathAndBytesable)client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(deploymentPath);
            ((BackgroundPathable)client.getData().usingWatcher((CuratorWatcher)this.streamModuleWatcher)).forPath(deploymentPath);
        }
        catch (KeeperException.NodeExistsException alreadyDeployed) {
            logger.info("Module {} for stream {} already deployed", (Object)moduleLabel, (Object)streamName);
        }
        return deployed;
    }

    /**
     * Records the descriptor as deployed, creates the module, registers its
     * tap (if any) and deploys it.
     *
     * @param moduleDescriptor     descriptor of the module to deploy
     * @param deploymentProperties deployment properties passed to the deployer
     * @return the module instance created by the deployer
     */
    private Module deployModule(ModuleDescriptor moduleDescriptor, ModuleDeploymentProperties deploymentProperties) {
        logger.info("Deploying module {}", (Object)moduleDescriptor);

        // The descriptor is tracked before creation/deployment is attempted.
        this.mapDeployedModules.put(
                new ModuleDescriptor.Key(moduleDescriptor.getGroup(), moduleDescriptor.getType(), moduleDescriptor.getModuleLabel()),
                moduleDescriptor);

        Module created = this.moduleDeployer.createModule(moduleDescriptor, deploymentProperties);
        this.registerTap(moduleDescriptor);
        this.moduleDeployer.deploy(created, moduleDescriptor);
        return created;
    }

    /**
     * Undeploys the module identified by the given coordinates, if this
     * listener still tracks it, and unregisters its tap.
     *
     * @param streamName  name of the deployment unit (stream or job) owning the module
     * @param moduleType  module type name; must be a valid {@link ModuleType} constant
     * @param moduleLabel label of the module
     */
    protected void undeployModule(String streamName, String moduleType, String moduleLabel) {
        ModuleDescriptor.Key key = new ModuleDescriptor.Key(streamName, ModuleType.valueOf(moduleType), moduleLabel);
        // Remove atomically: undeploy requests for the same module can arrive
        // from several callers, and a separate get-then-remove would let two
        // of them undeploy the same descriptor.
        ModuleDescriptor descriptor = this.mapDeployedModules.remove(key);
        if (descriptor == null) {
            logger.trace("Module {} already undeployed", (Object)moduleLabel);
            return;
        }
        logger.info("Undeploying module {}", (Object)descriptor);
        this.moduleDeployer.undeploy(descriptor);
        this.unregisterTap(descriptor);
    }

    /**
     * Creates an ephemeral tap node for the descriptor when its source channel
     * is a tap; an already existing node is tolerated.
     *
     * @param descriptor descriptor of the module being deployed
     */
    private void registerTap(ModuleDescriptor descriptor) {
        String tapChannelName = this.determineTapChannel(descriptor);
        if (tapChannelName == null) {
            // Not a tap: nothing to register.
            return;
        }
        try {
            ((ACLBackgroundPathAndBytesable)this.zkConnection.getClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(Paths.build("taps", tapChannelName, this.containerAttributes.getId(), descriptor.getGroup()));
        }
        catch (Exception failure) {
            ZooKeeperUtils.wrapAndThrowIgnoring(failure, KeeperException.NodeExistsException.class);
        }
    }

    /**
     * Removes the tap node registered for the descriptor, then attempts to
     * remove the now possibly empty container and channel nodes above it.
     * Missing or non-empty nodes are ignored; other failures are rethrown only
     * while the Curator client is still started.
     *
     * @param descriptor descriptor of the module being undeployed
     */
    private void unregisterTap(ModuleDescriptor descriptor) {
        CuratorFramework client = this.zkConnection.getClient();
        String tapChannelName = this.determineTapChannel(descriptor);
        if (tapChannelName == null) {
            // Not a tap: nothing to unregister.
            return;
        }
        try {
            try {
                client.delete().forPath(Paths.build("taps", tapChannelName, this.containerAttributes.getId(), descriptor.getGroup()));
            }
            catch (KeeperException.NoNodeException ignored) {
                // Tap node is already gone.
            }
            try {
                client.delete().forPath(Paths.build("taps", tapChannelName, this.containerAttributes.getId()));
                client.delete().forPath(Paths.build("taps", tapChannelName));
            }
            catch (KeeperException.NoNodeException ignored) {
                // Parent node is already gone.
            }
            catch (KeeperException.NotEmptyException ignored) {
                // Parent node still has other children; leave it in place.
            }
        }
        catch (Exception failure) {
            if (client.getState() != CuratorFrameworkState.STARTED) {
                logger.debug("Ignoring exception for tap un-registration due to closed ZooKeeper connection", (Throwable)failure);
                return;
            }
            throw ZooKeeperUtils.wrapThrowable(failure);
        }
    }

    /**
     * Extracts the tap channel name from the descriptor's source channel.
     *
     * @param descriptor the module descriptor to inspect
     * @return the source channel name with the tap prefix stripped, or
     *         {@code null} if the source channel is absent or not a tap
     */
    private String determineTapChannel(ModuleDescriptor descriptor) {
        String sourceChannelName = descriptor.getSourceChannelName();
        if (sourceChannelName == null) {
            return null;
        }
        if (!sourceChannelName.startsWith(TAP_CHANNEL_PREFIX)) {
            return null;
        }
        return sourceChannelName.substring(TAP_CHANNEL_PREFIX.length());
    }

    /**
     * Undeploys every module currently tracked by this listener. A failure to
     * undeploy one module is logged and does not stop the remaining modules
     * from being processed; each key is dropped from the map regardless.
     */
    void undeployAllModules() {
        for (Iterator<ModuleDescriptor.Key> keys = this.mapDeployedModules.keySet().iterator(); keys.hasNext(); ) {
            ModuleDescriptor.Key key = keys.next();
            try {
                this.undeployModule(key.getGroup(), key.getType().name(), key.getLabel());
            }
            catch (Exception failure) {
                logger.warn("Exception while undeploying " + key, (Throwable)failure);
            }
            keys.remove();
        }
    }

    class StreamModuleWatcher
    implements CuratorWatcher {
        StreamModuleWatcher() {
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void process(WatchedEvent event) throws Exception {
            CuratorFramework client = DeploymentListener.this.zkConnection.getClient();
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                StreamDeploymentsPath streamDeploymentsPath = new StreamDeploymentsPath(event.getPath());
                String streamName = streamDeploymentsPath.getStreamName();
                String moduleType = streamDeploymentsPath.getModuleType();
                String moduleLabel = streamDeploymentsPath.getModuleLabel();
                String moduleSequence = streamDeploymentsPath.getModuleSequenceAsString();
                DeploymentListener.this.undeployModule(streamName, moduleType, moduleLabel);
                String deploymentPath = new ModuleDeploymentsPath().setContainer(DeploymentListener.this.containerAttributes.getId()).setDeploymentUnitName(streamName).setModuleType(moduleType).setModuleLabel(moduleLabel).setModuleSequence(moduleSequence).build();
                try {
                    if (client.checkExists().forPath(deploymentPath) == null) return;
                    logger.trace("Deleting path: {}", (Object)deploymentPath);
                    client.delete().deletingChildrenIfNeeded().forPath(deploymentPath);
                    return;
                }
                catch (Exception e) {
                    if (client.getState() != CuratorFrameworkState.STARTED) return;
                    throw ZooKeeperUtils.wrapThrowable(e);
                }
            }
            logger.debug("Unexpected event {}, ZooKeeper state: {}", (Object)event.getType(), (Object)event.getState());
            if (!EnumSet.of(Watcher.Event.KeeperState.SyncConnected, Watcher.Event.KeeperState.SaslAuthenticated, Watcher.Event.KeeperState.ConnectedReadOnly).contains(event.getState())) return;
            try {
                ((BackgroundPathable)client.getData().usingWatcher((CuratorWatcher)this)).forPath(event.getPath());
                return;
            }
            catch (Exception e) {
                logger.error("Exception setting up watch for path '{}': {}; ZooKeeper state: {}", new Object[]{event.getPath(), e, DeploymentListener.this.zkConnection.getClient().getZookeeperClient().getZooKeeper().getState()});
                if (logger.isDebugEnabled()) {
                    logger.debug("Full stack trace", (Throwable)e);
                }
                if (client.getState() != CuratorFrameworkState.STARTED) return;
                throw ZooKeeperUtils.wrapThrowable(e);
            }
        }
    }

    class JobModuleWatcher
    implements CuratorWatcher {
        JobModuleWatcher() {
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void process(WatchedEvent event) throws Exception {
            CuratorFramework client = DeploymentListener.this.zkConnection.getClient();
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                JobDeploymentsPath jobDeploymentsPath = new JobDeploymentsPath(event.getPath());
                String jobName = jobDeploymentsPath.getJobName();
                String moduleLabel = jobDeploymentsPath.getModuleLabel();
                String moduleSequence = jobDeploymentsPath.getModuleSequenceAsString();
                DeploymentListener.this.undeployModule(jobName, ModuleType.job.toString(), moduleLabel);
                String deploymentPath = new ModuleDeploymentsPath().setContainer(DeploymentListener.this.containerAttributes.getId()).setDeploymentUnitName(jobName).setModuleType(ModuleType.job.toString()).setModuleLabel(moduleLabel).setModuleSequence(moduleSequence).build();
                try {
                    if (client.checkExists().forPath(deploymentPath) == null) return;
                    logger.trace("Deleting path: {}", (Object)deploymentPath);
                    client.delete().deletingChildrenIfNeeded().forPath(deploymentPath);
                    return;
                }
                catch (Exception e) {
                    if (client.getState() != CuratorFrameworkState.STARTED) return;
                    throw ZooKeeperUtils.wrapThrowable(e);
                }
            }
            logger.debug("Unexpected event {}, ZooKeeper state: {}", (Object)event.getType(), (Object)event.getState());
            if (!EnumSet.of(Watcher.Event.KeeperState.SyncConnected, Watcher.Event.KeeperState.SaslAuthenticated, Watcher.Event.KeeperState.ConnectedReadOnly).contains(event.getState())) return;
            try {
                ((BackgroundPathable)client.getData().usingWatcher((CuratorWatcher)this)).forPath(event.getPath());
                return;
            }
            catch (Exception e) {
                logger.error("Exception setting up watch for path '{}': {}; ZooKeeper state: {}", new Object[]{event.getPath(), e, DeploymentListener.this.zkConnection.getClient().getZookeeperClient().getZooKeeper().getState()});
                if (logger.isDebugEnabled()) {
                    logger.debug("Full stack trace", (Throwable)e);
                }
                if (client.getState() != CuratorFrameworkState.STARTED) return;
                throw ZooKeeperUtils.wrapThrowable(e);
            }
        }
    }
}

